[Spice-devel] [RFC spice-gtk 1/3] Handle LZ4 compressed msgs in any channel
Snir Sheriber
ssheribe at redhat.com
Thu Mar 2 16:53:27 UTC 2017
If the LZ4 library is available, handle LZ4-compressed and
LZ4-stream-compressed messages on any channel.

In stream compression mode, decompressed messages are stored
sequentially in a pre-allocated buffer, which the decompression
mechanism then uses for subsequent decompressions.

Also update the spice-common submodule.
---
spice-common | 2 +-
src/spice-channel-priv.h | 12 ++++
src/spice-channel.c | 150 +++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 163 insertions(+), 1 deletion(-)
diff --git a/spice-common b/spice-common
index 30e8237..cde4a8b 160000
--- a/spice-common
+++ b/spice-common
@@ -1 +1 @@
-Subproject commit 30e8237934b4060513bd0bdcc5ea0caa1dcc7b59
+Subproject commit cde4a8bb67330006ee26e1c0d5892c824edce327
diff --git a/src/spice-channel-priv.h b/src/spice-channel-priv.h
index 50aca5c..2eaf816 100644
--- a/src/spice-channel-priv.h
+++ b/src/spice-channel-priv.h
@@ -27,6 +27,10 @@
#include <sasl/sasl.h>
#endif
+#ifdef USE_LZ4
+#include<lz4.h>
+#endif
+
#include "spice-channel.h"
#include "spice-util-priv.h"
#include "coroutine.h"
@@ -62,6 +66,8 @@ struct _SpiceMsgIn {
size_t psize;
message_destructor_t pfree;
SpiceMsgIn *parent;
+ bool stream_buf_in_use;
+ uint8_t header_offset;
};
enum spice_channel_state {
@@ -100,6 +106,12 @@ struct _SpiceChannelPrivate {
uint64_t out_serial;
uint64_t in_serial;
+#ifdef USE_LZ4
+ LZ4_streamDecode_t *lz4_in_stream;
+ char *in_stream_buf;
+ size_t in_stream_offset;
+#endif
+
/* not swapped */
SpiceSession *session;
GCoroutine coroutine;
diff --git a/src/spice-channel.c b/src/spice-channel.c
index af67931..5f07bd6 100644
--- a/src/spice-channel.c
+++ b/src/spice-channel.c
@@ -47,6 +47,14 @@
#include "gio-coroutine.h"
+#ifdef USE_LZ4
+enum {
+ COMPRESS_STREAM_MIN_THRESHOLD = 6, /* NOTE(review): not referenced in the visible hunks; presumably the smallest message worth stream-compressing -- confirm against the sending side */
+ COMPRESS_STREAM_MAX_MSG_SIZE = 1024 * 64, /* Maximum message size for LZ4 stream compression; in_stream_update() rewinds the offset so that this much room is always left in the stream buffer */
+ STREAM_BUF_SIZE = 1024 *64 + COMPRESS_STREAM_MAX_MSG_SIZE + 8 /* Allocation size of in_stream_buf: 64 KiB + one maximum-size message + 8 bytes (recommended buffer size according to the LZ4 API) */
+};
+#endif
+
static void spice_channel_handle_msg(SpiceChannel *channel, SpiceMsgIn *msg);
static void spice_channel_write_msg(SpiceChannel *channel, SpiceMsgOut *out);
static void spice_channel_send_link(SpiceChannel *channel);
@@ -149,6 +157,11 @@ static void spice_channel_constructed(GObject *gobject)
if (disabled && strstr(disabled, desc))
c->disable_channel_msg = TRUE;
+#ifdef USE_LZ4
+ c->lz4_in_stream = LZ4_createStreamDecode();
+ c->in_stream_buf = g_malloc(STREAM_BUF_SIZE);
+ c->in_stream_offset = 0;
+#endif
spice_session_channel_new(c->session, channel);
/* Chain up to the parent class */
@@ -185,6 +198,13 @@ static void spice_channel_finalize(GObject *gobject)
g_mutex_clear(&c->xmit_queue_lock);
+#ifdef USE_LZ4
+ if(c->lz4_in_stream)
+ LZ4_freeStreamDecode(c->lz4_in_stream);
+
+ g_free(c->in_stream_buf);
+#endif
+
if (c->caps)
g_array_free(c->caps, TRUE);
@@ -504,6 +524,10 @@ SpiceMsgIn *spice_msg_in_new(SpiceChannel *channel)
in = g_new0(SpiceMsgIn, 1);
in->refcount = 1;
in->channel = channel;
+#ifdef USE_LZ4
+ in->stream_buf_in_use = FALSE;
+ in->header_offset = 0;
+#endif
return in;
}
@@ -546,7 +570,15 @@ void spice_msg_in_unref(SpiceMsgIn *in)
in->pfree(in->parsed);
if (in->parent) {
spice_msg_in_unref(in->parent);
+#ifdef USE_LZ4
+ } else if(in->header_offset > 0) {
+ /* points to the allocated buffer in header_offset offset */
+ g_free(in->data - in->header_offset);
+ } else if(!in->stream_buf_in_use) {
+ /* points to the stream buffer */
+#else
} else {
+#endif
g_free(in->data);
}
g_free(in);
@@ -1939,6 +1971,119 @@ gboolean spice_channel_get_read_only(SpiceChannel *channel)
return spice_session_get_read_only(channel->priv->session);
}
+#ifdef USE_LZ4
+/* Return the current write position inside the channel's LZ4 stream
+ * buffer, i.e. in_stream_buf advanced by in_stream_offset. */
+static char* in_stream_get_ptr(SpiceChannel *channel)
+{
+    SpiceChannelPrivate *priv = SPICE_CHANNEL(channel)->priv;
+
+    return priv->in_stream_buf + priv->in_stream_offset;
+}
+
+/* Advance the stream buffer offset by size bytes. Once the offset reaches
+ * STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE it is rewound to 0, so
+ * that room for a new message is always available. */
+static void in_stream_update(SpiceChannel *channel, int size)
+{
+    SpiceChannelPrivate *priv = SPICE_CHANNEL(channel)->priv;
+    size_t next_offset = priv->in_stream_offset + size;
+
+    if (next_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE) {
+        next_offset = 0;
+    }
+    priv->in_stream_offset = next_offset;
+}
+
+/* Return the LZ4 streaming decoder state owned by this channel. */
+static LZ4_streamDecode_t* in_stream_get_stream(SpiceChannel *channel)
+{
+    SpiceChannelPrivate *priv = SPICE_CHANNEL(channel)->priv;
+
+    return priv->lz4_in_stream;
+}
+#endif
+
+/*
+ * Unwrap a SPICE_MSG_COMPRESSED_DATA message in place.
+ *
+ * The payload of @in is parsed as a SpiceMsgCompressedData and decompressed
+ * (plain LZ4 or LZ4 stream mode, when built with USE_LZ4). The decompressed
+ * bytes are a complete message: a header followed by its data. On success
+ * in->header, in->data and in->dpos are replaced by the inner message, and
+ * *msg_size / *msg_type are updated from the inner header.
+ *
+ * Ownership of the new in->data:
+ *  - plain LZ4: a heap buffer; in->header_offset records how far in->data
+ *    is from the start of the allocation so that it can be freed later.
+ *  - stream LZ4: points into the channel's stream buffer and
+ *    in->stream_buf_in_use is set; it must not be freed.
+ *
+ * All sizes come from the peer and are validated before use.
+ *
+ * Returns TRUE on success. Returns FALSE on any parse, validation or
+ * decompression error; in->data is then left untouched (in->header may
+ * already hold the inner header if the embedded size was rejected).
+ */
+static int spice_channel_recv_compressed_msg(SpiceChannel *channel,
+                                             SpiceMsgIn *in,
+                                             int *msg_size,
+                                             int *msg_type)
+{
+    SpiceChannelPrivate *c = channel->priv;
+    const int header_size = spice_header_get_header_size(c->use_mini_header);
+    SpiceMsgCompressedData *compressed_data_msg;
+    char *decompressed = NULL;
+    gboolean on_heap = FALSE; /* TRUE when decompressed is owned by us */
+    uint32_t uncompressed_size;
+    int decompressed_size = 0;
+    int payload_size;
+
+    compressed_data_msg = (SpiceMsgCompressedData*)c->parser(in->data,
+                                                             in->data + *msg_size,
+                                                             *msg_type,
+                                                             c->peer_hdr.minor_version,
+                                                             &in->psize,
+                                                             &in->pfree);
+    if (compressed_data_msg == NULL) {
+        g_critical("failed to parse compressed message: %s type %d",
+                   c->name, *msg_type);
+        return FALSE;
+    }
+
+    uncompressed_size = compressed_data_msg->uncompressed_size;
+    if (uncompressed_size == 0) {
+        spice_warning("Invalid uncompressed_size");
+        goto fail;
+    }
+    /* The LZ4 API takes sizes as int; larger values would turn negative. */
+    if (uncompressed_size > G_MAXINT ||
+        compressed_data_msg->compressed_size > G_MAXINT) {
+        spice_warning("Compressed message size out of range");
+        goto fail;
+    }
+
+    switch (compressed_data_msg->type) {
+#ifdef USE_LZ4
+    case SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4:
+        /* Only COMPRESS_STREAM_MAX_MSG_SIZE bytes are guaranteed to be
+         * free behind the current stream buffer offset. */
+        if (uncompressed_size > COMPRESS_STREAM_MAX_MSG_SIZE) {
+            spice_warning("Stream compressed message too large: %u",
+                          uncompressed_size);
+            goto fail;
+        }
+        decompressed = in_stream_get_ptr(channel);
+        decompressed_size = LZ4_decompress_safe_continue(in_stream_get_stream(channel),
+                                                         (char*)compressed_data_msg->compressed_data,
+                                                         decompressed,
+                                                         (int)compressed_data_msg->compressed_size,
+                                                         (int)uncompressed_size);
+        break;
+    case SPICE_DATA_COMPRESSION_TYPE_LZ4:
+        /* Size is peer controlled: fail gracefully instead of aborting. */
+        decompressed = g_try_malloc(uncompressed_size);
+        if (decompressed == NULL) {
+            spice_warning("Failed to allocate %u bytes for decompression",
+                          uncompressed_size);
+            goto fail;
+        }
+        on_heap = TRUE;
+        decompressed_size = LZ4_decompress_safe((char*)compressed_data_msg->compressed_data,
+                                                decompressed,
+                                                (int)compressed_data_msg->compressed_size,
+                                                (int)uncompressed_size);
+        break;
+#endif
+    default:
+        spice_warning("Unknown Compression Type");
+        goto fail;
+    }
+
+    if (decompressed_size <= 0 ||
+        (uint32_t)decompressed_size != uncompressed_size) {
+        spice_warning("Decompress Error decompressed_size=%d expected=%u",
+                      decompressed_size, uncompressed_size);
+        goto fail;
+    }
+
+#ifdef USE_LZ4
+    if (!on_heap) {
+        /* The stream decoder now references these bytes; move the write
+         * position past them even if the inner message is rejected below. */
+        in_stream_update(channel, decompressed_size);
+    }
+#endif
+
+    if (decompressed_size < header_size) {
+        spice_warning("Decompressed message shorter than its header: %d",
+                      decompressed_size);
+        goto fail;
+    }
+
+    /* copy new header and validate the size it announces */
+    memcpy(in->header, decompressed, header_size);
+    payload_size = spice_header_get_msg_size(in->header, c->use_mini_header);
+    if (payload_size < 0 || payload_size > decompressed_size - header_size) {
+        spice_warning("Decompressed message size mismatch: header=%d available=%d",
+                      payload_size, decompressed_size - header_size);
+        goto fail;
+    }
+
+    /* Release the parsed wrapper before the buffer it may point into. */
+    if (in->pfree != NULL)
+        in->pfree((uint8_t*)compressed_data_msg);
+    g_free(in->data);
+
+    /* point in->data to the data location (after the header) */
+    in->data = (uint8_t*)decompressed + header_size;
+    if (on_heap) {
+        in->header_offset = header_size;
+    } else {
+        in->stream_buf_in_use = TRUE;
+    }
+    in->dpos = *msg_size = payload_size;
+    *msg_type = spice_header_get_msg_type(in->header, c->use_mini_header);
+    return TRUE;
+
+fail:
+    if (on_heap)
+        g_free(decompressed);
+    if (in->pfree != NULL)
+        in->pfree((uint8_t*)compressed_data_msg);
+    return FALSE;
+}
+
/* coroutine context */
G_GNUC_INTERNAL
void spice_channel_recv_msg(SpiceChannel *channel,
@@ -1969,6 +2114,11 @@ void spice_channel_recv_msg(SpiceChannel *channel,
in->dpos = msg_size;
msg_type = spice_header_get_msg_type(in->header, c->use_mini_header);
+ if (msg_type == SPICE_MSG_COMPRESSED_DATA) {
+ if(!spice_channel_recv_compressed_msg(channel, in, &msg_size, &msg_type))
+ goto end;
+ }
+
sub_list_offset = spice_header_get_msg_sub_list(in->header, c->use_mini_header);
if (msg_type == SPICE_MSG_LIST || sub_list_offset) {
--
2.9.3
More information about the Spice-devel
mailing list