[Spice-devel] [server PATCH v4] LZ4 compression is now available at the Spicevmc channel
Victor Toso
lists at victortoso.com
Tue May 10 10:42:09 UTC 2016
Hi,
On Thu, May 05, 2016 at 02:43:34PM +0300, Snir Sheriber wrote:
> The compressed message type is CompressedData, which contains the
> compression type (1 byte), followed by the uncompressed data size
> (4 bytes), followed by the compressed data size (4 bytes), followed by
> the compressed data.
>
> If the SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4 capability is available and
> data_size > COMPRESS_THRESHOLD, the data will be sent compressed;
> otherwise the data will be sent uncompressed (this is also the fallback
> if compression fails).
It works and the code looks good to me. I'm not 100% comfortable acking
this on the server side, so I'd wait for others' opinions to confirm that
it seems fine.
Reviewed-by: Victor Toso <victortoso at redhat.com>
> ---
> server/spicevmc.c | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++----
> 1 file changed, 145 insertions(+), 10 deletions(-)
>
> diff --git a/server/spicevmc.c b/server/spicevmc.c
> index 54dfe4a..896eb3c 100644
> --- a/server/spicevmc.c
> +++ b/server/spicevmc.c
> @@ -34,6 +34,9 @@
> #include "red-channel.h"
> #include "reds.h"
> #include "migration-protocol.h"
> +#ifdef USE_LZ4
> +#include <lz4.h>
> +#endif
>
> /* todo: add flow control. i.e.,
> * (a) limit the tokens available for the client
> @@ -41,10 +44,13 @@
> */
> /* 64K should be enough for all but the largest writes + 32 bytes hdr */
> #define BUF_SIZE (64 * 1024 + 32)
> +#define COMPRESS_THRESHOLD 1000
>
> typedef struct RedVmcPipeItem {
> RedPipeItem base;
>
> + SpiceDataCompressionType type;
> + uint32_t uncompressed_data_size;
> /* writes which don't fit this will get split, this is not a problem */
> uint8_t buf[BUF_SIZE];
> uint32_t buf_used;
> @@ -105,6 +111,55 @@ enum {
> RED_PIPE_ITEM_TYPE_PORT_EVENT,
> };
>
> +static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
> + uint16_t type,
> + uint32_t size);
> +
> +static void spicevmc_red_channel_release_msg_rcv_buf(RedChannelClient *rcc,
> + uint16_t type,
> + uint32_t size,
> + uint8_t *msg);
> +
> +static RedVmcPipeItem* try_compress_lz4(SpiceVmcState *state, int n, RedVmcPipeItem *msg_item) {
> + RedVmcPipeItem *msg_item_compressed;
> + int bound, compressed_data_count;
> +
> + if (reds_stream_get_family(state->rcc->stream) == AF_UNIX) {
> + /* AF_LOCAL - data will not be compressed */
> + return NULL;
> + }
> + if (n <= COMPRESS_THRESHOLD) {
> + /* n <= threshold - data will not be compressed */
> + return NULL;
> + }
> + if (!red_channel_test_remote_cap(&state->channel, SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4)) {
> + /* Client doesn't has compression cap - data will not be compressed */
> + return NULL;
> + }
> + bound = LZ4_compressBound(n);
> + if (bound == 0 || bound >= BUF_SIZE) {
> + /* bound is invalid - data will not be compressed */
> + return NULL;
> + }
> + msg_item_compressed = spice_new0(RedVmcPipeItem, 1);
> + red_pipe_item_init(&msg_item_compressed->base, RED_PIPE_ITEM_TYPE_SPICEVMC_DATA);
> + compressed_data_count = LZ4_compress_default((char*)&msg_item->buf,
> + (char*)&msg_item_compressed->buf,
> + n,
> + bound);
> +
> + if (compressed_data_count > 0) {
> + msg_item_compressed->type = SPICE_DATA_COMPRESSION_TYPE_LZ4;
> + msg_item_compressed->uncompressed_data_size = n;
> + msg_item_compressed->buf_used = compressed_data_count;
> + free(msg_item);
> + return msg_item_compressed;
> + }/* LZ4 compression failed-fallback a non-compressed data is to be sent */
> + spice_warning("Compress Error");
> + free(msg_item_compressed);
> + return NULL;
> +}
> +
> static RedPipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *sin,
> void *opaque)
> {
> @@ -121,6 +176,7 @@ static RedPipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *
>
> if (!state->pipe_item) {
> msg_item = spice_new0(RedVmcPipeItem, 1);
> + msg_item->type = SPICE_DATA_COMPRESSION_TYPE_NONE;
> red_pipe_item_init(&msg_item->base, RED_PIPE_ITEM_TYPE_SPICEVMC_DATA);
> } else {
> spice_assert(state->pipe_item->buf_used == 0);
> @@ -132,6 +188,15 @@ static RedPipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *
> sizeof(msg_item->buf));
> if (n > 0) {
> spice_debug("read from dev %d", n);
> +#ifdef USE_LZ4
> + RedVmcPipeItem *msg_item_compressed;
> +
> + msg_item_compressed = try_compress_lz4(state, n, msg_item);
> + if (msg_item_compressed != NULL) {
> + return (RedPipeItem *)msg_item_compressed;
> + }
> +#endif
> + msg_item->uncompressed_data_size = n;
> msg_item->buf_used = n;
> return (RedPipeItem *)msg_item;
> } else {
> @@ -278,11 +343,52 @@ static int spicevmc_channel_client_handle_migrate_data(RedChannelClient *rcc,
> return red_char_device_restore(state->chardev, &mig_data->base);
> }
>
> -static int spicevmc_red_channel_client_handle_message(RedChannelClient *rcc,
> - uint16_t type,
> +static int try_handle_compressed_msg(RedChannelClient *rcc, SpiceMsgCompressedData *compressed_data_msg) {
> + /*NOTE: *decompressed is free by the char-device */
> + SpiceVmcState *state;
> + uint32_t decompressed_size;
> + char *decompressed;
> +
> + state = spicevmc_red_channel_client_get_state(rcc);
> + decompressed = (char*)spicevmc_red_channel_alloc_msg_rcv_buf(rcc,SPICE_MSGC_SPICEVMC_DATA,
> + compressed_data_msg->uncompressed_size);
> + switch (compressed_data_msg->type) {
> +#ifdef USE_LZ4
> + case SPICE_DATA_COMPRESSION_TYPE_LZ4:
> + decompressed_size = LZ4_decompress_safe ((char*)compressed_data_msg->compressed_data,
> + decompressed,
> + compressed_data_msg->compressed_size,
> + compressed_data_msg->uncompressed_size);
> + break;
> +#endif
> + default:
> + spice_warning("Invalid Compression Type");
> + spicevmc_red_channel_release_msg_rcv_buf(rcc,SPICE_MSGC_SPICEVMC_DATA,
> + compressed_data_msg->uncompressed_size,
> + (uint8_t*)decompressed);
> + return FALSE;
> + }
> + if (decompressed_size != compressed_data_msg->uncompressed_size) {
> + spice_warning("Decompression Error");
> + spicevmc_red_channel_release_msg_rcv_buf(rcc, SPICE_MSGC_SPICEVMC_DATA,
> + compressed_data_msg->uncompressed_size,
> + (uint8_t*)decompressed);
> + return FALSE;
> + }
> + spice_assert(state->recv_from_client_buf->buf == (uint8_t*)decompressed);
> + state->recv_from_client_buf->buf_used = decompressed_size;
> + red_char_device_write_buffer_add(state->chardev, state->recv_from_client_buf);
> + state->recv_from_client_buf = NULL;
> + return TRUE;
> +
> +}
> +
> +static int spicevmc_red_channel_client_handle_message_parsed(RedChannelClient *rcc,
> uint32_t size,
> - uint8_t *msg)
> -{
> + uint16_t type,
> + void *msg)
> +{ /*NOTE: *msg free by free() (when cb to spicevmc_red_channel_release_msg_rcv_buf
> + *with the compressed msg type)*/
> SpiceVmcState *state;
> SpiceCharDeviceInterface *sif;
>
> @@ -296,16 +402,22 @@ static int spicevmc_red_channel_client_handle_message(RedChannelClient *rcc,
> red_char_device_write_buffer_add(state->chardev, state->recv_from_client_buf);
> state->recv_from_client_buf = NULL;
> break;
> + case SPICE_MSGC_SPICEVMC_COMPRESSED_DATA: {
> + if (!try_handle_compressed_msg(rcc, (SpiceMsgCompressedData*)msg)) {
> + return FALSE;
> + }
> + break;
> + }
> case SPICE_MSGC_PORT_EVENT:
> if (size != sizeof(uint8_t)) {
> spice_warning("bad port event message size");
> return FALSE;
> }
> if (sif->base.minor_version >= 2 && sif->event != NULL)
> - sif->event(state->chardev_sin, *msg);
> + sif->event(state->chardev_sin, *(uint8_t*)msg);
> break;
> default:
> - return red_channel_client_handle_message(rcc, size, type, msg);
> + return red_channel_client_handle_message(rcc, size, type, (uint8_t*)msg);
> }
>
> return TRUE;
> @@ -371,8 +483,27 @@ static void spicevmc_red_channel_send_data(RedChannelClient *rcc,
> {
> RedVmcPipeItem *i = SPICE_CONTAINEROF(item, RedVmcPipeItem, base);
>
> - red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_DATA, item);
> - spice_marshaller_add_ref(m, i->buf, i->buf_used);
> + switch (i->type){
> + case SPICE_DATA_COMPRESSION_TYPE_NONE:
> + red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_DATA, item);
> + spice_marshaller_add_ref(m, i->buf, i->buf_used);
> + break;
> + case SPICE_DATA_COMPRESSION_TYPE_LZ4: {
> + SpiceMsgCompressedData compressed_msg;
> +
> + red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_COMPRESSED_DATA, item);
> + compressed_msg.type = SPICE_DATA_COMPRESSION_TYPE_LZ4;
> + compressed_msg.uncompressed_size = i->uncompressed_data_size;
> + compressed_msg.compressed_size = i->buf_used;
> +
> + spice_marshall_SpiceMsgCompressedData(m, &compressed_msg);
> + spice_marshaller_add_ref(m, i->buf, i->buf_used);
> + break;
> + }
> + default:
> + g_assert_not_reached();
> + }
> +
> }
>
> static void spicevmc_red_channel_send_migrate_data(RedChannelClient *rcc,
> @@ -519,16 +650,20 @@ RedCharDevice *spicevmc_device_connect(RedsState *reds,
> channel_cbs.handle_migrate_flush_mark = spicevmc_channel_client_handle_migrate_flush_mark;
> channel_cbs.handle_migrate_data = spicevmc_channel_client_handle_migrate_data;
>
> - state = (SpiceVmcState*)red_channel_create(sizeof(SpiceVmcState), reds,
> + state = (SpiceVmcState*)red_channel_create_parser(sizeof(SpiceVmcState), reds,
> reds_get_core_interface(reds), channel_type, id[channel_type]++,
> FALSE /* handle_acks */,
> - spicevmc_red_channel_client_handle_message,
> + spice_get_client_channel_parser(SPICE_CHANNEL_USBREDIR, NULL),
> + spicevmc_red_channel_client_handle_message_parsed,
> &channel_cbs,
> SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER);
> red_channel_init_outgoing_messages_window(&state->channel);
>
> client_cbs.connect = spicevmc_connect;
> red_channel_register_client_cbs(&state->channel, &client_cbs, NULL);
> +#ifdef USE_LZ4
> + red_channel_set_cap(&state->channel, SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4);
> +#endif
>
> state->chardev = red_char_device_spicevmc_new(sin, reds, state);
> state->chardev_sin = sin;
> --
> 2.5.5
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel
More information about the Spice-devel
mailing list