[Spice-devel] [RFC server 1/3] Handle LZ4 compressed messages in rcc
Snir Sheriber
ssheribe at redhat.com
Thu Mar 2 16:53:24 UTC 2017
If the LZ4 library is available, handle LZ4-compressed and LZ4
stream-compressed messages on any channel.
In stream compression mode, decompressed messages are stored
sequentially in a pre-allocated buffer, which the decompression
mechanism then uses when decompressing the following messages.
Also update the spice-common submodule.
---
server/red-channel-client.c | 128 ++++++++++++++++++++++++++++++++++++++++++++
spice-common | 2 +-
2 files changed, 129 insertions(+), 1 deletion(-)
diff --git a/server/red-channel-client.c b/server/red-channel-client.c
index cd4b64e..600a9f2 100644
--- a/server/red-channel-client.c
+++ b/server/red-channel-client.c
@@ -31,6 +31,9 @@
#ifdef HAVE_LINUX_SOCKIOS_H
#include <linux/sockios.h> /* SIOCOUTQ */
#endif
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
#include <common/generated_server_marshallers.h>
#include "red-channel-client.h"
@@ -99,6 +102,11 @@ typedef struct IncomingMessageBuffer {
uint32_t header_pos;
uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
uint32_t msg_pos;
+#ifdef USE_LZ4
+ LZ4_streamDecode_t *lz4StreamDecode;
+ char *stream_buf;
+ size_t stream_offset;
+#endif
} IncomingMessageBuffer;
struct RedChannelClientPrivate
@@ -209,6 +217,14 @@ enum ConnectivityState {
CONNECTIVITY_STATE_DISCONNECTED,
};
+#ifdef USE_LZ4
+/* Sizing constants for LZ4 stream (de)compression. */
+enum {
+    /* NOTE(review): not referenced by the code visible in this patch;
+     * presumably a lower bound on the size of messages worth stream
+     * compressing -- confirm against the sending side. */
+    COMPRESS_STREAM_MIN_THRESHOLD = 6,
+
+    /* Maximum message size for LZ4 stream compression */
+    COMPRESS_STREAM_MAX_MSG_SIZE = 64 * 1024,
+
+    /* Recommended buffer size according to the api */
+    STREAM_BUF_SIZE = 64 * 1024 + COMPRESS_STREAM_MAX_MSG_SIZE + 8
+};
+#endif
+
typedef struct RedEmptyMsgPipeItem {
RedPipeItem base;
int msg;
@@ -365,6 +381,13 @@ red_channel_client_finalize(GObject *object)
reds_stream_free(self->priv->stream);
self->priv->stream = NULL;
+#ifdef USE_LZ4
+ if (self->priv->incoming.lz4StreamDecode) {
+ LZ4_freeStreamDecode(self->priv->incoming.lz4StreamDecode);
+ }
+
+ free(self->priv->incoming.stream_buf);
+#endif
if (self->priv->send_data.main.marshaller) {
spice_marshaller_destroy(self->priv->send_data.main.marshaller);
@@ -404,6 +427,11 @@ static void red_channel_client_constructed(GObject *object)
self->priv->is_mini_header = FALSE;
}
self->priv->incoming.header.data = self->priv->incoming.header_buf;
+#ifdef USE_LZ4
+ self->priv->incoming.lz4StreamDecode = LZ4_createStreamDecode();
+ self->priv->incoming.stream_buf = spice_malloc(STREAM_BUF_SIZE);
+ self->priv->incoming.stream_offset = 0;
+#endif
}
static void red_channel_client_class_init(RedChannelClientClass *klass)
@@ -1233,6 +1261,94 @@ static uint8_t *red_channel_client_parse(RedChannelClient *rcc, uint8_t *message
return parsed_message;
}
+#ifdef USE_LZ4
+/* Return the current position inside the stream buffer, i.e. stream_buf
+ * advanced by stream_offset, or NULL when no stream buffer is set. */
+static char* in_stream_get_ptr(IncomingMessageBuffer *buffer)
+{
+    if (buffer->stream_buf == NULL) {
+        return NULL;
+    }
+    return buffer->stream_buf + buffer->stream_offset;
+}
+
+/* Advance the stream buffer offset by `size` bytes.  Once the offset
+ * reaches STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE it is rewound to
+ * the start of the buffer, so that room for a new message of up to
+ * COMPRESS_STREAM_MAX_MSG_SIZE bytes is always left past the offset. */
+static void in_stream_update(IncomingMessageBuffer *buffer, int size)
+{
+    const size_t wrap_limit = STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE;
+    const size_t advanced = buffer->stream_offset + size;
+
+    buffer->stream_offset = (advanced >= wrap_limit) ? 0 : advanced;
+}
+
+#endif
+
+/*
+ * Decompress the payload of a SPICE_MSGC_COMPRESSED_DATA message and make
+ * the message it carries the current incoming message of rcc: the incoming
+ * header and msg buffer are replaced by the decompressed header and body,
+ * so that the caller can parse the result like any other message.
+ *
+ * Every size involved is supplied by the peer, so all of them are checked
+ * before any buffer is written or released:
+ *  - compressed and uncompressed size must fit the int parameters of the
+ *    LZ4 API;
+ *  - a stream message may not be larger than COMPRESS_STREAM_MAX_MSG_SIZE,
+ *    which is the room in_stream_update() keeps free past the stream
+ *    offset;
+ *  - the decompressed data must contain a complete header, and the body
+ *    size announced by that header must not exceed what was decompressed.
+ *
+ * Returns TRUE on success and FALSE otherwise.  On FALSE the incoming
+ * header and msg buffer are untouched, with one exception: if allocating
+ * the buffer for the decompressed message fails, the compressed message
+ * buffer has already been released, the header has been replaced and
+ * buffer->msg is NULL.
+ * NOTE(review): the caller should stop processing the connection on FALSE.
+ */
+static int red_channel_client_handle_compressed_msg(RedChannelClient *rcc,
+                                                    SpiceMsgCompressedData *compressed_data_msg)
+{
+    IncomingMessageBuffer *buffer = &rcc->priv->incoming;
+    /* Local copies, so that compressed_data_msg is not consulted any more
+     * once the buffer of the compressed message has been released. */
+    const int compression_type = compressed_data_msg->type;
+    const uint32_t compressed_size = compressed_data_msg->compressed_size;
+    const uint32_t uncompressed_size = compressed_data_msg->uncompressed_size;
+    const uint32_t header_size = buffer->header.header_size;
+    uint8_t *incoming_header_data;
+    uint32_t compressed_msg_size;
+    uint32_t msg_size;
+    uint16_t msg_type;
+    char *decompressed = NULL;
+    int decompressed_size = -1;
+    int ret = FALSE;
+
+    /* The LZ4 API takes int sizes; refuse anything that would be narrowed
+     * (and an empty result, which cannot hold a message header). */
+    if (uncompressed_size == 0 || uncompressed_size > G_MAXINT ||
+        compressed_size > G_MAXINT) {
+        spice_warning("Invalid compressed message size");
+        return FALSE;
+    }
+
+    switch (compression_type) {
+#ifdef USE_LZ4
+    case SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4:
+        /* Only COMPRESS_STREAM_MAX_MSG_SIZE bytes are guaranteed to be
+         * free past the stream offset; a larger capacity would let the
+         * decoder write beyond the end of stream_buf. */
+        if (uncompressed_size > COMPRESS_STREAM_MAX_MSG_SIZE) {
+            spice_warning("Compressed stream message too large");
+            return FALSE;
+        }
+        decompressed = in_stream_get_ptr(buffer);
+        if (decompressed == NULL || buffer->lz4StreamDecode == NULL) {
+            return FALSE;
+        }
+        decompressed_size = LZ4_decompress_safe_continue(buffer->lz4StreamDecode,
+                                                         (const char *)compressed_data_msg->compressed_data,
+                                                         decompressed,
+                                                         (int)compressed_size,
+                                                         (int)uncompressed_size);
+        break;
+    case SPICE_DATA_COMPRESSION_TYPE_LZ4:
+        decompressed = spice_malloc(uncompressed_size);
+        if (decompressed == NULL) {
+            return FALSE;
+        }
+        decompressed_size = LZ4_decompress_safe((const char *)compressed_data_msg->compressed_data,
+                                                decompressed,
+                                                (int)compressed_size,
+                                                (int)uncompressed_size);
+        break;
+#endif
+    default:
+        spice_warning("Invalid Compression Type");
+        return FALSE;
+    }
+
+    if (decompressed_size <= 0 || (uint32_t)decompressed_size != uncompressed_size) {
+        spice_warning("Decompression Failed");
+        goto out;
+    }
+
+#ifdef USE_LZ4
+    if (compression_type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4) {
+        /* The stream decoder has consumed this block, so the offset is
+         * advanced even if the message is rejected below.  The data just
+         * written stays in place; only the offset moves. */
+        in_stream_update(buffer, decompressed_size);
+    }
+#endif
+
+    if (uncompressed_size < header_size) {
+        spice_warning("Decompressed message is shorter than a message header");
+        goto out;
+    }
+
+    /* Size of the compressed message, needed to release its buffer. */
+    compressed_msg_size = buffer->header.get_msg_size(&buffer->header);
+
+    /* Read type and size of the decompressed message through the header
+     * accessors by pointing the header at the decompressed bytes for a
+     * moment.  This leaves the incoming header intact should the
+     * decompressed message be rejected. */
+    incoming_header_data = buffer->header.data;
+    buffer->header.data = (uint8_t *)decompressed;
+    msg_size = buffer->header.get_msg_size(&buffer->header);
+    msg_type = buffer->header.get_msg_type(&buffer->header);
+    buffer->header.data = incoming_header_data;
+
+    /* msg_size is used as a copy length below: never read more than what
+     * was actually decompressed. */
+    if (msg_size > uncompressed_size - header_size) {
+        spice_warning("Decompressed message size exceeds decompressed data");
+        goto out;
+    }
+
+    /* Lay out decompressed message into incoming buffer */
+    red_channel_client_release_msg_buf(rcc, SPICE_MSGC_COMPRESSED_DATA,
+                                       compressed_msg_size, buffer->msg);
+    /* copy decompressed msg header to incoming */
+    memcpy(buffer->header.data, decompressed, header_size);
+    /* copy decompressed msg data to incoming buffer */
+    buffer->msg = red_channel_client_alloc_msg_buf(rcc, msg_type, msg_size);
+    if (buffer->msg == NULL) {
+        spice_warning("Failed to allocate buffer for decompressed message");
+        goto out;
+    }
+    memcpy(buffer->msg, decompressed + header_size, msg_size); /* TODO: Avoid that */
+    ret = TRUE;
+
+out:
+    /* Only the one-shot LZ4 path owns a heap buffer; the stream path
+     * decompresses into the persistent stream buffer. */
+    if (compression_type == SPICE_DATA_COMPRESSION_TYPE_LZ4) {
+        free(decompressed);
+    }
+    return ret;
+}
+
+
// TODO: this implementation, as opposed to the old implementation in red_worker,
// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
// arithmetic for the case where a single cb_read could return multiple messages. But
@@ -1304,6 +1420,7 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc)
}
}
+try_parse:
parsed = red_channel_client_parse(rcc,
buffer->msg, msg_size,
msg_type,
@@ -1317,6 +1434,17 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc)
red_channel_client_disconnect(rcc);
return;
}
+ if (msg_type == SPICE_MSGC_COMPRESSED_DATA) {
+ if (red_channel_client_handle_compressed_msg(rcc, (SpiceMsgCompressedData*)parsed)) {
+ msg_size = buffer->header.get_msg_size(&buffer->header);
+ msg_type = buffer->header.get_msg_type(&buffer->header);
+ if (parsed_free != NULL)
+ parsed_free(parsed);
+ goto try_parse; /* parse decompressed msg now */
+ } else {
+ spice_warning("Decompression failed");
+ }
+ }
ret_handle = klass->handle_message(rcc, msg_type,
parsed_size, parsed);
if (parsed_free != NULL) {
diff --git a/spice-common b/spice-common
index 6439bec..cde4a8b 160000
--- a/spice-common
+++ b/spice-common
@@ -1 +1 @@
-Subproject commit 6439bec0deed0fcf251d4d77514b7c4a87384097
+Subproject commit cde4a8bb67330006ee26e1c0d5892c824edce327
--
2.9.3
More information about the Spice-devel
mailing list