[Spice-devel] [RFC spice-gtk 2/3] Sending LZ4 compressed msgs over selected channels

Snir Sheriber ssheribe at redhat.com
Thu Mar 2 16:53:28 UTC 2017


If channel number is included in the CHANNEL_COMPRESS environment
variable, LZ4 compression will be applied on the channel messages
if possible.

First LZ4 stream compression is trying to be applied, if message
size is too large, regular LZ4 compression is applied, in stream
compression mode messages are being saved sequentially in pre-allocated
buffer which will be utilized by the compression mechanism in the
following compressions
---
 src/spice-channel-priv.h |   4 ++
 src/spice-channel.c      | 163 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 165 insertions(+), 2 deletions(-)

diff --git a/src/spice-channel-priv.h b/src/spice-channel-priv.h
index 2eaf816..6315626 100644
--- a/src/spice-channel-priv.h
+++ b/src/spice-channel-priv.h
@@ -110,6 +110,10 @@ struct _SpiceChannelPrivate {
     LZ4_streamDecode_t          *lz4_in_stream;
     char                        *in_stream_buf;
     size_t                      in_stream_offset;
+
+    LZ4_stream_t                *lz4_out_stream;
+    char                        *out_stream_buf;
+    size_t                      out_stream_offset;
 #endif
 
     /* not swapped */
diff --git a/src/spice-channel.c b/src/spice-channel.c
index 5f07bd6..8e83f70 100644
--- a/src/spice-channel.c
+++ b/src/spice-channel.c
@@ -143,6 +143,39 @@ static void spice_channel_init(SpiceChannel *channel)
     g_mutex_init(&c->xmit_queue_lock);
 }
 
+#ifdef USE_LZ4
+static bool spice_channel_asked_for_compression(SpiceChannel *channel)
+{
+    /*
+     * Check if current channel was included by the user in
+     * the COMPRESS_CHANNELS environment variable. The format
+     * to ask for specific channel compression is as follows:
+     * COMPRESS_CHANNELS=<channel number>,<channel number>,<...
+     */
+    const char* pChannels;
+
+    pChannels = g_getenv("COMPRESS_CHANNELS");
+    if (pChannels != NULL) {
+        SpiceChannelPrivate *c;
+        char **result = NULL;
+        int i = 0;
+
+        c = channel->priv = SPICE_CHANNEL_GET_PRIVATE(channel);
+        result = g_strsplit_set(pChannels,",.:;-",-1);
+
+        while (result[i] != NULL) {
+            if (c->channel_type == atoi(result[i])) {
+                g_strfreev(result);
+                return true;
+            }
+            i++;
+        }
+        g_strfreev(result);
+    }
+    return false;
+}
+#endif
+
 static void spice_channel_constructed(GObject *gobject)
 {
     SpiceChannel *channel = SPICE_CHANNEL(gobject);
@@ -161,6 +194,13 @@ static void spice_channel_constructed(GObject *gobject)
     c->lz4_in_stream = LZ4_createStreamDecode();
     c->in_stream_buf = g_malloc(STREAM_BUF_SIZE);
     c->in_stream_offset = 0;
+
+    if(spice_channel_asked_for_compression(channel) && g_socket_get_family(c->sock) != G_SOCKET_FAMILY_UNIX) {
+        spice_channel_set_common_capability(channel, SPICE_COMMON_CAP_LZ4_COMPRESSION);
+        c->lz4_out_stream = LZ4_createStream();
+        c->out_stream_buf = g_malloc(STREAM_BUF_SIZE);
+        c->out_stream_offset = 0;
+    }
 #endif
     spice_session_channel_new(c->session, channel);
 
@@ -203,6 +243,11 @@ static void spice_channel_finalize(GObject *gobject)
         LZ4_freeStreamDecode(c->lz4_in_stream);
 
     g_free(c->in_stream_buf);
+
+    if(c->lz4_out_stream)
+        LZ4_freeStream(c->lz4_out_stream);
+
+    g_free(c->out_stream_buf);
 #endif
 
     if (c->caps)
@@ -902,6 +947,113 @@ static void spice_channel_write(SpiceChannel *channel, const void *data, size_t
         spice_channel_flush_wire(channel, data, len);
 }
 
+#ifdef USE_LZ4
+static char* out_stream_get_ptr(SpiceChannel *channel) {
+
+    SpiceChannelPrivate *c;
+
+    c = SPICE_CHANNEL(channel)->priv;
+
+    return &c->out_stream_buf[c->out_stream_offset];
+
+}
+
+static void out_stream_update(SpiceChannel *channel, int size)
+{
+    SpiceChannelPrivate *c;
+
+    c = SPICE_CHANNEL(channel)->priv;
+    /* Add size to the stream buffer offset & reset if needed
+     * so that place for new message is always available */
+    c->out_stream_offset += size;
+    if (c->out_stream_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE)
+        c->out_stream_offset = 0;
+}
+
+static LZ4_stream_t* out_stream_get_stream(SpiceChannel *channel)
+{
+    SpiceChannelPrivate *c;
+
+    c = SPICE_CHANNEL(channel)->priv;
+
+    return c->lz4_out_stream;
+}
+
+static int spice_channel_try_write_compressed_msg(SpiceChannel *channel, uint8_t *data, int count)
+{
+    SpiceMsgOut *msg_out_compressed;
+    int bound, compressed_data_count;
+    uint8_t type;
+    char *compressed_buf;
+
+    if (out_stream_get_stream(channel) == NULL) {
+        /* Allocation failed or capability is disabled */
+        return FALSE;
+    }
+    if (!spice_channel_test_common_capability(channel, SPICE_COMMON_CAP_LZ4_COMPRESSION)) {
+        /* Server doesn't support stream compression in this channel */
+        return FALSE;
+    }
+    if (count < COMPRESS_STREAM_MIN_THRESHOLD) {
+        /* Not worth compression */
+        return FALSE;
+    }
+    if ((bound = LZ4_compressBound(count)) == 0) {
+        /* Invalid bound - data will not be compressed */
+        return FALSE;
+    }
+
+    compressed_buf = g_malloc(bound);
+
+    if (count <= COMPRESS_STREAM_MAX_MSG_SIZE) { /* Try to compress in stream mode */
+        LZ4_stream_t *lz4Stream = out_stream_get_stream(channel);
+        char *cur_msg = out_stream_get_ptr(channel);
+
+        memcpy(cur_msg, data, count);/* TODO Avoid that */
+        compressed_data_count = LZ4_compress_fast_continue(lz4Stream,
+                                                           cur_msg,
+                                                           compressed_buf,
+                                                           count,
+                                                           bound,
+                                                           1);
+        type = SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4;
+    } else { /* Try to compress regular lz4 */
+        compressed_data_count = LZ4_compress_default((char*)data,
+                                                     compressed_buf,
+                                                     count,
+                                                     bound);
+        type = SPICE_DATA_COMPRESSION_TYPE_LZ4;
+    }
+
+    if (compressed_data_count > 0) { /* Compression Succeeded - send it */
+        SpiceMsgCompressedData compressed_data_msg = {
+            .type = type,
+            .uncompressed_size = count
+        };
+
+        compressed_data_msg.compressed_data = (uint8_t*)compressed_buf;
+        msg_out_compressed = spice_msg_out_new(SPICE_CHANNEL(channel),
+                                               SPICE_MSGC_COMPRESSED_DATA);
+        msg_out_compressed->marshallers->msg_SpiceMsgCompressedData(msg_out_compressed->marshaller,
+                                                                    &compressed_data_msg);
+        spice_marshaller_add_by_ref_full(msg_out_compressed->marshaller,
+                                         compressed_data_msg.compressed_data,
+                                         compressed_data_count,
+                                         (spice_marshaller_item_free_func)g_free,
+                                         channel);
+
+        if (type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4)
+            out_stream_update(channel, count);
+
+        spice_channel_write_msg(channel, msg_out_compressed);
+        return TRUE;
+    }
+    /* if not - free & fallback to sending the message uncompressed */
+    g_free(compressed_buf);
+    return FALSE;
+}
+#endif
+
 /* coroutine context */
 static void spice_channel_write_msg(SpiceChannel *channel, SpiceMsgOut *out)
 {
@@ -925,8 +1077,15 @@ static void spice_channel_write_msg(SpiceChannel *channel, SpiceMsgOut *out)
                spice_header_get_header_size(channel->priv->use_mini_header);
     spice_header_set_msg_size(out->header, channel->priv->use_mini_header, msg_size);
     data = spice_marshaller_linearize(out->marshaller, 0, &len, &free_data);
-    /* spice_msg_out_hexdump(out, data, len); */
-    spice_channel_write(channel, data, len);
+#ifdef USE_LZ4
+    uint16_t msg_type = spice_header_get_msg_type(out->header, channel->priv->use_mini_header);
+    if (msg_type == SPICE_MSGC_COMPRESSED_DATA ||
+        !spice_channel_try_write_compressed_msg(channel,data,len))
+#endif
+    {
+        /* spice_msg_out_hexdump(out, data, len); */
+        spice_channel_write(channel, data, len);
+    }
 
     if (free_data)
         g_free(data);
-- 
2.9.3



More information about the Spice-devel mailing list