[Spice-devel] [PATCH spice-server v3 2/2] stream-channel: Send the full frame in a single message
Lukáš Hrázký
lhrazky at redhat.com
Wed May 9 08:58:57 UTC 2018
On Tue, 2018-05-08 at 10:26 +0100, Frediano Ziglio wrote:
> The current implementation of server and client assumes that a single
> data message contains an encoded frame.
> This is not a problem for most encoding but for MJPEG this causes
> the client to fail decoding.
> Collapse frame data into a single message before sending to the client.
> This is done in the channel as the channel code is responsible to take care
> of client protocol details. This allows for instance to support chunked
> transfer to client if implemented.
So the proper fix, if we didn't care about compatibility with old/other
clients, would be to implement the "chunked transfer" in the client,
right? But we don't want to break compatibility. So it is basically
a property of the protocol that a frame must be sent in a single
message.
Why not also send it in a single message from the streaming agent to
the server, so that the whole message can be forwarded and the messages
are 1:1, instead of adding this accumulation code and having to copy
the frame in the process?
If I don't understand this properly, please excuse my ignorance...
Cheers,
Lukas
> Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> ---
> server/red-stream-device.c | 10 +++--
> server/stream-channel.c | 83 +++++++++++++++++++++++++++++++++-----
> server/stream-channel.h | 12 ++++++
> 3 files changed, 90 insertions(+), 15 deletions(-)
>
> diff --git a/server/red-stream-device.c b/server/red-stream-device.c
> index df6a366f..864be99e 100644
> --- a/server/red-stream-device.c
> +++ b/server/red-stream-device.c
> @@ -314,11 +314,13 @@ handle_msg_data(StreamDevice *dev, SpiceCharDeviceInstance *sin)
> if (n <= 0) {
> break;
> }
> - // TODO collect all message ??
> - // up: we send a single frame together
> - // down: guest can cause a crash
> - stream_channel_send_data(dev->stream_channel, buf, n, reds_get_mm_time());
> + uint32_t mm_time = reds_get_mm_time();
> + if (dev->msg_pos == 0) {
> + stream_channel_start_data(dev->stream_channel, dev->hdr.size, mm_time);
> + }
> + stream_channel_send_data(dev->stream_channel, buf, n, mm_time);
> dev->hdr.size -= n;
> + dev->msg_pos += n;
> }
>
> return dev->hdr.size == 0;
> diff --git a/server/stream-channel.c b/server/stream-channel.c
> index fc409ee6..0ac7ba1d 100644
> --- a/server/stream-channel.c
> +++ b/server/stream-channel.c
> @@ -44,6 +44,7 @@
>
> typedef struct StreamChannelClient StreamChannelClient;
> typedef struct StreamChannelClientClass StreamChannelClientClass;
> +typedef struct StreamDataItem StreamDataItem;
>
> /* we need to inherit from CommonGraphicsChannelClient
> * to get buffer handling */
> @@ -74,6 +75,10 @@ struct StreamChannel {
>
> StreamQueueStat queue_stat;
>
> + /* pending partial data item */
> + StreamDataItem *data_item;
> + uint32_t data_item_pos;
> +
> /* callback to notify when a stream should be started or stopped */
> stream_channel_start_proc start_cb;
> void *start_opaque;
> @@ -105,12 +110,12 @@ typedef struct StreamCreateItem {
> SpiceMsgDisplayStreamCreate stream_create;
> } StreamCreateItem;
>
> -typedef struct StreamDataItem {
> +struct StreamDataItem {
> RedPipeItem base;
> StreamChannel *channel;
> // NOTE: this must be the last field in the structure
> SpiceMsgDisplayStreamData data;
> -} StreamDataItem;
> +};
>
> #define PRIMARY_SURFACE_ID 0
>
> @@ -130,6 +135,16 @@ stream_channel_client_init(StreamChannelClient *client)
> client->stream_id = -1;
> }
>
> +static void
> +stream_channel_unref_data_item(StreamChannel *channel)
> +{
> + if (channel->data_item) {
> + red_pipe_item_unref(&channel->data_item->base);
> + channel->data_item = NULL;
> + channel->data_item_pos = 0;
> + }
> +}
> +
> static void
> request_new_stream(StreamChannel *channel, StreamMsgStartStop *start)
> {
> @@ -153,6 +168,7 @@ stream_channel_client_on_disconnect(RedChannelClient *rcc)
> channel->stream_id = -1;
> channel->width = 0;
> channel->height = 0;
> + stream_channel_unref_data_item(channel);
>
> // send stream stop to device
> StreamMsgStartStop stop = { 0, };
> @@ -452,6 +468,16 @@ stream_channel_constructed(GObject *object)
> reds_register_channel(reds, red_channel);
> }
>
> +static void
> +stream_channel_finalize(GObject *object)
> +{
> + StreamChannel *channel = STREAM_CHANNEL(object);
> +
> + stream_channel_unref_data_item(channel);
> +
> + G_OBJECT_CLASS(stream_channel_parent_class)->finalize(object);
> +}
> +
> static void
> stream_channel_class_init(StreamChannelClass *klass)
> {
> @@ -459,6 +485,7 @@ stream_channel_class_init(StreamChannelClass *klass)
> RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
>
> object_class->constructed = stream_channel_constructed;
> + object_class->finalize = stream_channel_finalize;
>
> channel_class->parser = spice_get_client_channel_parser(SPICE_CHANNEL_DISPLAY, NULL);
> channel_class->handle_message = handle_message;
> @@ -532,14 +559,18 @@ data_item_free(RedPipeItem *base)
> {
> StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
>
> - stream_channel_update_queue_stat(pipe_item->channel, -1, -pipe_item->data.data_size);
> + if (pipe_item->channel->data_item != pipe_item) {
> + stream_channel_update_queue_stat(pipe_item->channel, -1, -pipe_item->data.data_size);
> + }
>
> g_free(pipe_item);
> }
>
> -static StreamDataItem*
> -stream_data_item_new(StreamChannel *channel, size_t size, uint32_t mm_time)
> +static void
> +stream_channel_init_data_item(StreamChannel *channel, size_t size, uint32_t mm_time)
> {
> + stream_channel_unref_data_item(channel);
> +
> StreamDataItem *item = g_malloc(sizeof(*item) + size);
> red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
> data_item_free);
> @@ -548,7 +579,22 @@ stream_data_item_new(StreamChannel *channel, size_t size, uint32_t mm_time)
> item->data.data_size = size;
> item->channel = channel;
>
> - return item;
> + channel->data_item = item;
> + channel->data_item_pos = 0;
> +}
> +
> +void
> +stream_channel_start_data(StreamChannel *channel, size_t size, uint32_t mm_time)
> +{
> + // see stream_channel_send_data comment
> + if (channel->stream_id < 0) {
> + return;
> + }
> +
> + // TODO this collects all chunks in a single message
> + // up: we send a single frame together (more compatible)
> + // down: guest can cause a crash due to DoS. As a safe measure we limit the maximum message
> + stream_channel_init_data_item(channel, MIN(size, 32*1024*1024), mm_time);
> }
>
> void
> @@ -563,11 +609,25 @@ stream_channel_send_data(StreamChannel *channel, const void *data, size_t size,
>
> RedChannel *red_channel = RED_CHANNEL(channel);
>
> - StreamDataItem *item = stream_data_item_new(channel, size, mm_time);
> - stream_channel_update_queue_stat(channel, 1, size);
> - // TODO try to optimize avoiding the copy
> - memcpy(item->data.data, data, size);
> - red_channel_pipes_add(red_channel, &item->base);
> + while (size) {
> + if (channel->data_item == NULL) {
> + stream_channel_init_data_item(channel, size, mm_time);
> + }
> +
> + StreamDataItem *item = channel->data_item;
> +
> + size_t copy_size = item->data.data_size - channel->data_item_pos;
> + copy_size = MIN(copy_size, size);
> + // TODO try to optimize avoiding the copy
> + memcpy(item->data.data + channel->data_item_pos, data, copy_size);
> + size -= copy_size;
> + channel->data_item_pos += copy_size;
> + if (channel->data_item_pos == item->data.data_size) {
> + channel->data_item = NULL;
> + stream_channel_update_queue_stat(channel, 1, item->data.data_size);
> + red_channel_pipes_add(red_channel, &item->base);
> + }
> + }
> }
>
> void
> @@ -607,6 +667,7 @@ stream_channel_reset(StreamChannel *channel)
> channel->stream_id = -1;
> channel->width = 0;
> channel->height = 0;
> + stream_channel_unref_data_item(channel);
>
> if (!red_channel_is_connected(red_channel)) {
> return;
> diff --git a/server/stream-channel.h b/server/stream-channel.h
> index e8bec80b..18a1bdea 100644
> --- a/server/stream-channel.h
> +++ b/server/stream-channel.h
> @@ -60,6 +60,18 @@ struct StreamMsgStartStop;
>
> void stream_channel_change_format(StreamChannel *channel,
> const struct StreamMsgFormat *fmt);
> +
> +/**
> + * Tell the channel that a new data packet is starting.
> + * This can be used to group all chunks together.
> + */
> +void stream_channel_start_data(StreamChannel *channel,
> + size_t size,
> + uint32_t mm_time);
> +
> +/**
> + * Send to channel a chunk of data.
> + */
> void stream_channel_send_data(StreamChannel *channel,
> const void *data, size_t size,
> uint32_t mm_time);
More information about the Spice-devel
mailing list