[Spice-devel] [RFC server 1/3] Handle LZ4 compressed messages in rcc

Snir Sheriber ssheribe at redhat.com
Thu Mar 2 16:53:24 UTC 2017


If the LZ4 library is available, handle LZ4-compressed and
LZ4-stream-compressed messages on any channel.

In stream compression mode, decompressed messages are saved
sequentially in a pre-allocated buffer, which the decompression
mechanism then uses when decompressing the following messages.

Update spice-common
---
 server/red-channel-client.c | 128 ++++++++++++++++++++++++++++++++++++++++++++
 spice-common                |   2 +-
 2 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/server/red-channel-client.c b/server/red-channel-client.c
index cd4b64e..600a9f2 100644
--- a/server/red-channel-client.c
+++ b/server/red-channel-client.c
@@ -31,6 +31,9 @@
 #ifdef HAVE_LINUX_SOCKIOS_H
 #include <linux/sockios.h> /* SIOCOUTQ */
 #endif
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
 #include <common/generated_server_marshallers.h>
 
 #include "red-channel-client.h"
@@ -99,6 +102,11 @@ typedef struct IncomingMessageBuffer {
     uint32_t header_pos;
     uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
     uint32_t msg_pos;
+#ifdef USE_LZ4
+    LZ4_streamDecode_t *lz4StreamDecode;
+    char *stream_buf;
+    size_t stream_offset;
+#endif
 } IncomingMessageBuffer;
 
 struct RedChannelClientPrivate
@@ -209,6 +217,14 @@ enum ConnectivityState {
     CONNECTIVITY_STATE_DISCONNECTED,
 };
 
+#ifdef USE_LZ4
+enum {
+    COMPRESS_STREAM_MIN_THRESHOLD = 6, /* NOTE(review): not referenced in the visible part of this patch; meaning to be confirmed */
+    COMPRESS_STREAM_MAX_MSG_SIZE = 1024 * 64, /* Maximum message size for LZ4 stream compression */
+    STREAM_BUF_SIZE = 1024 * 64 + COMPRESS_STREAM_MAX_MSG_SIZE + 8 /* Size of the stream decompression buffer; recommended size according to the LZ4 API */
+};
+#endif
+
 typedef struct RedEmptyMsgPipeItem {
     RedPipeItem base;
     int msg;
@@ -365,6 +381,13 @@ red_channel_client_finalize(GObject *object)
 
     reds_stream_free(self->priv->stream);
     self->priv->stream = NULL;
+#ifdef USE_LZ4
+    if (self->priv->incoming.lz4StreamDecode) {
+        LZ4_freeStreamDecode(self->priv->incoming.lz4StreamDecode);
+    }
+
+    free(self->priv->incoming.stream_buf);
+#endif
 
     if (self->priv->send_data.main.marshaller) {
         spice_marshaller_destroy(self->priv->send_data.main.marshaller);
@@ -404,6 +427,11 @@ static void red_channel_client_constructed(GObject *object)
         self->priv->is_mini_header = FALSE;
     }
     self->priv->incoming.header.data = self->priv->incoming.header_buf;
+#ifdef USE_LZ4
+    self->priv->incoming.lz4StreamDecode = LZ4_createStreamDecode();
+    self->priv->incoming.stream_buf = spice_malloc(STREAM_BUF_SIZE);
+    self->priv->incoming.stream_offset = 0;
+#endif
 }
 
 static void red_channel_client_class_init(RedChannelClientClass *klass)
@@ -1233,6 +1261,137 @@ static uint8_t *red_channel_client_parse(RedChannelClient *rcc, uint8_t *message
     return parsed_message;
 }
 
+#ifdef USE_LZ4
+/* Return the address inside the LZ4 stream buffer at which the next
+ * decompressed message is written, or NULL when there is no stream buffer. */
+static char* in_stream_get_ptr(IncomingMessageBuffer *buffer)
+{
+    if (buffer->stream_buf) {
+        return &buffer->stream_buf[buffer->stream_offset];
+    } else {
+        return NULL;
+    }
+}
+
+/* Account for size bytes having been decompressed at the address returned by
+ * in_stream_get_ptr(). */
+static void in_stream_update(IncomingMessageBuffer *buffer, int size)
+{
+    /* Add size to the stream buffer offset & reset if needed so that
+     * place for new message is always available */
+    buffer->stream_offset += size;
+    if (buffer->stream_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE) {
+        buffer->stream_offset = 0;
+    }
+}
+
+#endif
+
+/*
+ * Decompress the payload of a SPICE_MSGC_COMPRESSED_DATA message and install
+ * the message it carries (header followed by body) as the current incoming
+ * message of rcc, so that the caller can parse it like any other message.
+ *
+ * Returns TRUE on success and FALSE otherwise.  Every check that depends on
+ * sizes taken from the message is done before the compressed message buffer
+ * is released, so a rejected payload leaves buffer->msg and the stored header
+ * untouched.  If allocating the buffer for the decompressed message fails,
+ * FALSE is returned with buffer->msg set to NULL (the compressed one is gone).
+ */
+static int red_channel_client_handle_compressed_msg(RedChannelClient *rcc,
+                                                    SpiceMsgCompressedData *compressed_data_msg)
+{
+    IncomingMessageBuffer *buffer = &rcc->priv->incoming;
+    const uint32_t header_size = buffer->header.header_size;
+    int decompressed_size = -1;
+    uint32_t msg_size;
+    uint16_t msg_type;
+    char *decompressed = NULL;
+    void *wire_header;
+    int ret = FALSE;
+
+    switch (compressed_data_msg->type) {
+#ifdef USE_LZ4
+    case SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4: {
+        /* in_stream_update() only guarantees room for COMPRESS_STREAM_MAX_MSG_SIZE
+         * bytes after the write position; anything larger would overrun stream_buf */
+        if (compressed_data_msg->uncompressed_size > COMPRESS_STREAM_MAX_MSG_SIZE) {
+            spice_warning("Stream compressed message is too large");
+            return FALSE;
+        }
+        if (!buffer->lz4StreamDecode || !(decompressed = in_stream_get_ptr(buffer))) {
+            return FALSE;
+        }
+        decompressed_size = LZ4_decompress_safe_continue(buffer->lz4StreamDecode,
+                                                         (char *)compressed_data_msg->compressed_data,
+                                                         decompressed,
+                                                         compressed_data_msg->compressed_size,
+                                                         compressed_data_msg->uncompressed_size);
+        break;
+    }
+    case SPICE_DATA_COMPRESSION_TYPE_LZ4: {
+        if (!(decompressed = spice_malloc(compressed_data_msg->uncompressed_size))) {
+            return FALSE;
+        }
+        decompressed_size = LZ4_decompress_safe((char *)compressed_data_msg->compressed_data,
+                                                decompressed,
+                                                compressed_data_msg->compressed_size,
+                                                compressed_data_msg->uncompressed_size);
+        break;
+    }
+#endif
+    default:
+        spice_warning("Invalid Compression Type");
+        return FALSE;
+    }
+
+    if (decompressed_size <= 0 ||
+        decompressed_size != compressed_data_msg->uncompressed_size) {
+        spice_warning("Decompression Failed");
+        goto out;
+    }
+    if ((uint32_t)decompressed_size < header_size) {
+        spice_warning("Decompressed data is too short for a message header");
+        goto out;
+    }
+
+    /* Read size and type of the embedded message straight from the decompressed
+     * data, so that nothing has been modified yet if they turn out to be bogus */
+    wire_header = buffer->header.data;
+    buffer->header.data = (void *)decompressed;
+    msg_size = buffer->header.get_msg_size(&buffer->header);
+    msg_type = buffer->header.get_msg_type(&buffer->header);
+    buffer->header.data = wire_header;
+    if (msg_size > (uint32_t)decompressed_size - header_size) {
+        spice_warning("Decompressed message is shorter than its header claims");
+        goto out;
+    }
+
+    /* Replace the compressed message with the decompressed one */
+    red_channel_client_release_msg_buf(rcc, SPICE_MSGC_COMPRESSED_DATA,
+                                       buffer->header.get_msg_size(&buffer->header),
+                                       buffer->msg);
+    buffer->msg = red_channel_client_alloc_msg_buf(rcc, msg_type, msg_size);
+    if (!buffer->msg) {
+        spice_warning("Failed to allocate buffer for decompressed message");
+        goto out;
+    }
+    memcpy(buffer->header.data, decompressed, header_size);
+    memcpy(buffer->msg, decompressed + header_size, msg_size); /* TODO: Avoid that */
+#ifdef USE_LZ4
+    if (compressed_data_msg->type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4) {
+        in_stream_update(buffer, decompressed_size);
+    }
+#endif
+    ret = TRUE;
+
+out:
+    if (compressed_data_msg->type == SPICE_DATA_COMPRESSION_TYPE_LZ4) {
+        free(decompressed);
+    }
+    return ret;
+}
+
 // TODO: this implementation, as opposed to the old implementation in red_worker,
 // does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
 // arithmetic for the case where a single cb_read could return multiple messages. But
@@ -1304,6 +1420,7 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc)
             }
         }
 
+try_parse:
         parsed = red_channel_client_parse(rcc,
                                           buffer->msg, msg_size,
                                           msg_type,
@@ -1317,6 +1434,17 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc)
             red_channel_client_disconnect(rcc);
             return;
         }
+        if (msg_type == SPICE_MSGC_COMPRESSED_DATA) {
+            if (red_channel_client_handle_compressed_msg(rcc, (SpiceMsgCompressedData*)parsed)) {
+                msg_size = buffer->header.get_msg_size(&buffer->header);
+                msg_type = buffer->header.get_msg_type(&buffer->header);
+                if (parsed_free != NULL)
+                    parsed_free(parsed);
+                goto try_parse; /* parse decopressed msg now */
+            } else {
+                spice_warning("Decompression failed");
+            }
+        }
         ret_handle = klass->handle_message(rcc, msg_type,
                                            parsed_size, parsed);
         if (parsed_free != NULL) {
diff --git a/spice-common b/spice-common
index 6439bec..cde4a8b 160000
--- a/spice-common
+++ b/spice-common
@@ -1 +1 @@
-Subproject commit 6439bec0deed0fcf251d4d77514b7c4a87384097
+Subproject commit cde4a8bb67330006ee26e1c0d5892c824edce327
-- 
2.9.3



More information about the Spice-devel mailing list