[Spice-devel] [RFC spice-gtk 1/3] Handle LZ4 compressed msgs in any channel

Snir Sheriber ssheribe at redhat.com
Thu Mar 2 16:53:27 UTC 2017


If LZ4 lib exists, handle LZ4 compressed & stream compressed
messages in any channel

In stream compression mode decompressed messages are being saved
sequentially in a pre-allocated buffer which will be utilized
by the decompression mechanism in the following decompressions

Update spice-common
---
 spice-common             |   2 +-
 src/spice-channel-priv.h |  12 ++++
 src/spice-channel.c      | 150 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/spice-common b/spice-common
index 30e8237..cde4a8b 160000
--- a/spice-common
+++ b/spice-common
@@ -1 +1 @@
-Subproject commit 30e8237934b4060513bd0bdcc5ea0caa1dcc7b59
+Subproject commit cde4a8bb67330006ee26e1c0d5892c824edce327
diff --git a/src/spice-channel-priv.h b/src/spice-channel-priv.h
index 50aca5c..2eaf816 100644
--- a/src/spice-channel-priv.h
+++ b/src/spice-channel-priv.h
@@ -27,6 +27,10 @@
 #include <sasl/sasl.h>
 #endif
 
+#ifdef USE_LZ4
+#include<lz4.h>
+#endif
+
 #include "spice-channel.h"
 #include "spice-util-priv.h"
 #include "coroutine.h"
@@ -62,6 +66,8 @@ struct _SpiceMsgIn {
     size_t                psize;
     message_destructor_t  pfree;
     SpiceMsgIn            *parent;
+    bool                  stream_buf_in_use;
+    uint8_t               header_offset;
 };
 
 enum spice_channel_state {
@@ -100,6 +106,12 @@ struct _SpiceChannelPrivate {
     uint64_t                    out_serial;
     uint64_t                    in_serial;
 
+#ifdef USE_LZ4
+    LZ4_streamDecode_t          *lz4_in_stream;
+    char                        *in_stream_buf;
+    size_t                      in_stream_offset;
+#endif
+
     /* not swapped */
     SpiceSession                *session;
     GCoroutine                  coroutine;
diff --git a/src/spice-channel.c b/src/spice-channel.c
index af67931..5f07bd6 100644
--- a/src/spice-channel.c
+++ b/src/spice-channel.c
@@ -47,6 +47,14 @@
 
 #include "gio-coroutine.h"
 
+#ifdef USE_LZ4
+enum {
+    COMPRESS_STREAM_MIN_THRESHOLD = 6,
+    COMPRESS_STREAM_MAX_MSG_SIZE = 1024 * 64, /* Maximum message size for LZ4 stream compression */
+    STREAM_BUF_SIZE = 1024 *64 + COMPRESS_STREAM_MAX_MSG_SIZE + 8 /* Recommended buffer size according to the api */
+};
+#endif
+
 static void spice_channel_handle_msg(SpiceChannel *channel, SpiceMsgIn *msg);
 static void spice_channel_write_msg(SpiceChannel *channel, SpiceMsgOut *out);
 static void spice_channel_send_link(SpiceChannel *channel);
@@ -149,6 +157,11 @@ static void spice_channel_constructed(GObject *gobject)
     if (disabled && strstr(disabled, desc))
         c->disable_channel_msg = TRUE;
 
+#ifdef USE_LZ4
+    c->lz4_in_stream = LZ4_createStreamDecode();
+    c->in_stream_buf = g_malloc(STREAM_BUF_SIZE);
+    c->in_stream_offset = 0;
+#endif
     spice_session_channel_new(c->session, channel);
 
     /* Chain up to the parent class */
@@ -185,6 +198,13 @@ static void spice_channel_finalize(GObject *gobject)
 
     g_mutex_clear(&c->xmit_queue_lock);
 
+#ifdef USE_LZ4
+    if(c->lz4_in_stream)
+        LZ4_freeStreamDecode(c->lz4_in_stream);
+
+    g_free(c->in_stream_buf);
+#endif
+
     if (c->caps)
         g_array_free(c->caps, TRUE);
 
@@ -504,6 +524,10 @@ SpiceMsgIn *spice_msg_in_new(SpiceChannel *channel)
     in = g_new0(SpiceMsgIn, 1);
     in->refcount = 1;
     in->channel  = channel;
+#ifdef USE_LZ4
+    in->stream_buf_in_use = FALSE;
+    in->header_offset = 0;
+#endif
 
     return in;
 }
@@ -546,7 +570,15 @@ void spice_msg_in_unref(SpiceMsgIn *in)
         in->pfree(in->parsed);
     if (in->parent) {
         spice_msg_in_unref(in->parent);
+#ifdef USE_LZ4
+    } else if(in->header_offset > 0) {
+	/* points to the allocated buffer in header_offset offset */
+	g_free(in->data - in->header_offset);
+    } else if(!in->stream_buf_in_use) {
+	/* points to the stream buffer */
+#else
     } else {
+#endif
         g_free(in->data);
     }
     g_free(in);
@@ -1939,6 +1971,119 @@ gboolean spice_channel_get_read_only(SpiceChannel *channel)
     return spice_session_get_read_only(channel->priv->session);
 }
 
+#ifdef USE_LZ4
+static char* in_stream_get_ptr(SpiceChannel *channel)
+{
+    SpiceChannelPrivate *c;
+
+    c = SPICE_CHANNEL(channel)->priv;
+
+    return &c->in_stream_buf[c->in_stream_offset];
+
+}
+
+static void in_stream_update(SpiceChannel *channel, int size)
+{
+    SpiceChannelPrivate *c;
+
+    c = SPICE_CHANNEL(channel)->priv;
+    /* Add size to the stream buffer offset & reset if needed
+     * so that place for new message is always available */
+    c->in_stream_offset += size;
+    if (c->in_stream_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE)
+        c->in_stream_offset = 0;
+}
+
+static LZ4_streamDecode_t* in_stream_get_stream(SpiceChannel *channel)
+{
+    SpiceChannelPrivate *c;
+
+    c = SPICE_CHANNEL(channel)->priv;
+
+    return c->lz4_in_stream;
+}
+#endif
+
+static int spice_channel_recv_compressed_msg(SpiceChannel *channel,
+                                             SpiceMsgIn *in,
+                                             int *msg_size,
+                                             int *msg_type)
+{
+    SpiceChannelPrivate *c = channel->priv;
+    int decompressed_size = 0;
+    char *decompressed;
+    uint32_t uncompressed_size;
+    SpiceMsgCompressedData *compressed_data_msg = (SpiceMsgCompressedData*)c->parser(in->data,
+                                                                                     in->data + *msg_size,
+                                                                                     *msg_type,
+                                                                                     c->peer_hdr.minor_version,
+                                                                                     &in->psize,
+                                                                                     &in->pfree);
+
+    if (compressed_data_msg == NULL) {
+        g_critical("failed to parse decompressed message: %s type %d",
+                   c->name, *msg_type);
+        return FALSE;
+    }
+    if (compressed_data_msg->uncompressed_size == 0) {
+        spice_warning("Invalid uncompressed_size");
+        return FALSE;
+    }
+
+    uncompressed_size = compressed_data_msg->uncompressed_size;
+    switch (compressed_data_msg->type) {
+#ifdef USE_LZ4
+    case SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4:
+        decompressed = in_stream_get_ptr(channel);
+        decompressed_size = LZ4_decompress_safe_continue (in_stream_get_stream(channel),
+                                                          (char*)compressed_data_msg->compressed_data,
+                                                          decompressed,
+                                                          compressed_data_msg->compressed_size,
+                                                          compressed_data_msg->uncompressed_size);
+        break;
+    case SPICE_DATA_COMPRESSION_TYPE_LZ4:
+        decompressed = g_malloc(compressed_data_msg->uncompressed_size);
+        decompressed_size = LZ4_decompress_safe((char*)compressed_data_msg->compressed_data,
+                                                decompressed,
+                                                compressed_data_msg->compressed_size,
+                                                compressed_data_msg->uncompressed_size);
+        break;
+#endif
+    default:
+        spice_warning("Unknown Compression Type");
+        return FALSE;
+    }
+    if (decompressed_size != uncompressed_size) {
+        spice_warning("Decompress Error decompressed_size=%d expected=%u",
+                      decompressed_size, uncompressed_size);
+        return FALSE;
+    }
+
+    if(decompressed_size > 0) { /* Decompression success */
+        free(in->data);
+        /* point in->data the data location (after the header) */
+        in->data = (uint8_t*)decompressed + spice_header_get_header_size(c->use_mini_header);
+#ifdef USE_LZ4
+        if(compressed_data_msg->type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4) {
+            in->stream_buf_in_use = TRUE;
+            in_stream_update(channel, decompressed_size);
+        } else { /* SPICE_DATA_COMPRESSION_TYPE_LZ4 */
+            in->header_offset = spice_header_get_header_size(c->use_mini_header);
+        }
+#endif
+        if(in->pfree != NULL)
+            in->pfree((uint8_t*)compressed_data_msg);
+        /* copy new header and update variables */
+        memcpy(in->header,decompressed, spice_header_get_header_size(c->use_mini_header));
+        in->dpos = *msg_size = spice_header_get_msg_size(in->header, c->use_mini_header);
+        *msg_type = spice_header_get_msg_type(in->header, c->use_mini_header);
+        return TRUE;
+    } else { /* Decompression fail */
+        spice_warning("Decompression Failed");
+        return FALSE;
+    }
+}
+
 /* coroutine context */
 G_GNUC_INTERNAL
 void spice_channel_recv_msg(SpiceChannel *channel,
@@ -1969,6 +2114,11 @@ void spice_channel_recv_msg(SpiceChannel *channel,
     in->dpos = msg_size;
 
     msg_type = spice_header_get_msg_type(in->header, c->use_mini_header);
+    if (msg_type == SPICE_MSG_COMPRESSED_DATA) {
+        if(!spice_channel_recv_compressed_msg(channel, in, &msg_size, &msg_type))
+            goto end;
+    }
+
     sub_list_offset = spice_header_get_msg_sub_list(in->header, c->use_mini_header);
 
     if (msg_type == SPICE_MSG_LIST || sub_list_offset) {
-- 
2.9.3



More information about the Spice-devel mailing list