[Spice-devel] [PATCH spice-server v3 2/2] stream-channel: Send the full frame in a single message

Lukáš Hrázký lhrazky at redhat.com
Wed May 9 11:20:19 UTC 2018


On Wed, 2018-05-09 at 05:18 -0400, Frediano Ziglio wrote:
> > 
> > On Tue, 2018-05-08 at 10:26 +0100, Frediano Ziglio wrote:
> > > The current implementation of server and client assumes that a single
> > > data message contains an encoded frame.
> > > This is not a problem for most encoding but for MJPEG this causes
> > > the client to fail decoding.
> > > Collapse frame data into a single message before sending to the client.
> > > This is done in the channel as the channel code is responsible to take care
> > > of client protocol details. This allows for instance to support chunked
> > > transfer to client if implemented.
> > 
> > So the proper fix if we didn't care about compatibility with old/other
> > clients would be to implement the "chunked transfer" in the client,
> > right? But we don't want to break the compatibility. So it is basically
> > a property of the protocol that a frame must be sent in a single
> > message.
> > 
> 
> Yes, there are different assumptions in the code about this, not clear if
> this was due by MJPEG encoding (first and only encoding for long time), or
> something by design, the documentation on the protocol is basically
> nothing (just the "stream_data" message with no description at all).

Ok, so perhaps defining this (using the limitations of the current
implementation) should be part of the series?

> > Why not also send it in a single message from the streaming agent to
> > the server, so that the whole message can be forwarded and the messages
> > are 1:1? Instead of adding this accumulation code and having to copy
> > the frame in the process?
> > 
> 
> The agent send a single message, but reads/writes to the device are
> not atomic. Note that the current protocol introduce additional
> delays as the frames cannot be partially decoded but must wait for the
> full message (maybe the client can change its read code to handle this,
> at the moment it does nothing about, on the server is less of a problem
> as the message is build quickly from device to memory so not much delay
> is added).

This is the part I don't understand... AFAICS, you read the whole
message in red-stream-device.c:handle_msg_data(). That should be the
whole frame? Then you send the whole frame with
stream_channel_send_data(). So it should never be partial?

Also see below.

> > If I don't understand this properly, please excuse my ignorace...
> > 
> > Cheers,
> > Lukas
> > 
> > > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > > ---
> > >  server/red-stream-device.c | 10 +++--
> > >  server/stream-channel.c    | 83 +++++++++++++++++++++++++++++++++-----
> > >  server/stream-channel.h    | 12 ++++++
> > >  3 files changed, 90 insertions(+), 15 deletions(-)
> > > 
> > > diff --git a/server/red-stream-device.c b/server/red-stream-device.c
> > > index df6a366f..864be99e 100644
> > > --- a/server/red-stream-device.c
> > > +++ b/server/red-stream-device.c
> > > @@ -314,11 +314,13 @@ handle_msg_data(StreamDevice *dev,
> > > SpiceCharDeviceInstance *sin)
> > >          if (n <= 0) {
> > >              break;
> > >          }
> > > -        // TODO collect all message ??
> > > -        // up: we send a single frame together
> > > -        // down: guest can cause a crash
> > > -        stream_channel_send_data(dev->stream_channel, buf, n,
> > > reds_get_mm_time());
> > > +        uint32_t mm_time = reds_get_mm_time();
> > > +        if (dev->msg_pos == 0) {
> > > +            stream_channel_start_data(dev->stream_channel, dev->hdr.size,
> > > mm_time);
> > > +        }
> > > +        stream_channel_send_data(dev->stream_channel, buf, n, mm_time);
> > >          dev->hdr.size -= n;
> > > +        dev->msg_pos += n;
> > >      }
> > >  
> > >      return dev->hdr.size == 0;
> > > diff --git a/server/stream-channel.c b/server/stream-channel.c
> > > index fc409ee6..0ac7ba1d 100644
> > > --- a/server/stream-channel.c
> > > +++ b/server/stream-channel.c
> > > @@ -44,6 +44,7 @@
> > >  
> > >  typedef struct StreamChannelClient StreamChannelClient;
> > >  typedef struct StreamChannelClientClass StreamChannelClientClass;
> > > +typedef struct StreamDataItem StreamDataItem;
> > >  
> > >  /* we need to inherit from CommonGraphicsChannelClient
> > >   * to get buffer handling */
> > > @@ -74,6 +75,10 @@ struct StreamChannel {
> > >  
> > >      StreamQueueStat queue_stat;
> > >  
> > > +    /* pending partial data item */
> > > +    StreamDataItem *data_item;
> > > +    uint32_t data_item_pos;
> > > +
> > >      /* callback to notify when a stream should be started or stopped */
> > >      stream_channel_start_proc start_cb;
> > >      void *start_opaque;
> > > @@ -105,12 +110,12 @@ typedef struct StreamCreateItem {
> > >      SpiceMsgDisplayStreamCreate stream_create;
> > >  } StreamCreateItem;
> > >  
> > > -typedef struct StreamDataItem {
> > > +struct StreamDataItem {
> > >      RedPipeItem base;
> > >      StreamChannel *channel;
> > >      // NOTE: this must be the last field in the structure
> > >      SpiceMsgDisplayStreamData data;
> > > -} StreamDataItem;
> > > +};
> > >  
> > >  #define PRIMARY_SURFACE_ID 0
> > >  
> > > @@ -130,6 +135,16 @@ stream_channel_client_init(StreamChannelClient
> > > *client)
> > >      client->stream_id = -1;
> > >  }
> > >  
> > > +static void
> > > +stream_channel_unref_data_item(StreamChannel *channel)
> > > +{
> > > +    if (channel->data_item) {
> > > +        red_pipe_item_unref(&channel->data_item->base);
> > > +        channel->data_item = NULL;
> > > +        channel->data_item_pos = 0;
> > > +    }
> > > +}
> > > +
> > >  static void
> > >  request_new_stream(StreamChannel *channel, StreamMsgStartStop *start)
> > >  {
> > > @@ -153,6 +168,7 @@ stream_channel_client_on_disconnect(RedChannelClient
> > > *rcc)
> > >      channel->stream_id = -1;
> > >      channel->width = 0;
> > >      channel->height = 0;
> > > +    stream_channel_unref_data_item(channel);
> > >  
> > >      // send stream stop to device
> > >      StreamMsgStartStop stop = { 0, };
> > > @@ -452,6 +468,16 @@ stream_channel_constructed(GObject *object)
> > >      reds_register_channel(reds, red_channel);
> > >  }
> > >  
> > > +static void
> > > +stream_channel_finalize(GObject *object)
> > > +{
> > > +    StreamChannel *channel = STREAM_CHANNEL(object);
> > > +
> > > +    stream_channel_unref_data_item(channel);
> > > +
> > > +    G_OBJECT_CLASS(stream_channel_parent_class)->finalize(object);
> > > +}
> > > +
> > >  static void
> > >  stream_channel_class_init(StreamChannelClass *klass)
> > >  {
> > > @@ -459,6 +485,7 @@ stream_channel_class_init(StreamChannelClass *klass)
> > >      RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
> > >  
> > >      object_class->constructed = stream_channel_constructed;
> > > +    object_class->finalize = stream_channel_finalize;
> > >  
> > >      channel_class->parser =
> > >      spice_get_client_channel_parser(SPICE_CHANNEL_DISPLAY, NULL);
> > >      channel_class->handle_message = handle_message;
> > > @@ -532,14 +559,18 @@ data_item_free(RedPipeItem *base)
> > >  {
> > >      StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
> > >  
> > > -    stream_channel_update_queue_stat(pipe_item->channel, -1,
> > > -pipe_item->data.data_size);
> > > +    if (pipe_item->channel->data_item != pipe_item) {
> > > +        stream_channel_update_queue_stat(pipe_item->channel, -1,
> > > -pipe_item->data.data_size);
> > > +    }
> > >  
> > >      g_free(pipe_item);
> > >  }
> > >  
> > > -static StreamDataItem*
> > > -stream_data_item_new(StreamChannel *channel, size_t size, uint32_t
> > > mm_time)
> > > +static void
> > > +stream_channel_init_data_item(StreamChannel *channel, size_t size,
> > > uint32_t mm_time)
> > >  {
> > > +    stream_channel_unref_data_item(channel);
> > > +
> > >      StreamDataItem *item = g_malloc(sizeof(*item) + size);
> > >      red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
> > >                              data_item_free);
> > > @@ -548,7 +579,22 @@ stream_data_item_new(StreamChannel *channel, size_t
> > > size, uint32_t mm_time)
> > >      item->data.data_size = size;
> > >      item->channel = channel;
> > >  
> > > -    return item;
> > > +    channel->data_item = item;
> > > +    channel->data_item_pos = 0;
> > > +}
> > > +
> > > +void
> > > +stream_channel_start_data(StreamChannel *channel, size_t size, uint32_t
> > > mm_time)
> > > +{
> > > +    // see stream_channel_send_data comment
> > > +    if (channel->stream_id < 0) {
> > > +        return;
> > > +    }
> > > +
> > > +    // TODO this collects all chunks in a single message
> > > +    // up: we send a single frame together (more compatible)
> > > +    // down: guest can cause a crash due to DoS. As a safe measure we
> > > limit the maximum message
> > > +    stream_channel_init_data_item(channel, MIN(size, 32*1024*1024),
> > > mm_time);
> > >  }
> > >  
> > >  void
> > > @@ -563,11 +609,25 @@ stream_channel_send_data(StreamChannel *channel,
> > > const void *data, size_t size,
> > >  
> > >      RedChannel *red_channel = RED_CHANNEL(channel);
> > >  
> > > -    StreamDataItem *item = stream_data_item_new(channel, size, mm_time);
> > > -    stream_channel_update_queue_stat(channel, 1, size);
> > > -    // TODO try to optimize avoiding the copy
> > > -    memcpy(item->data.data, data, size);
> > > -    red_channel_pipes_add(red_channel, &item->base);
> > > +    while (size) {
> > > +        if (channel->data_item == NULL) {
> > > +            stream_channel_init_data_item(channel, size, mm_time);
> > > +        }
> > > +
> > > +        StreamDataItem *item = channel->data_item;
> > > +
> > > +        size_t copy_size = item->data.data_size - channel->data_item_pos;
> > > +        copy_size = MIN(copy_size, size);
> > > +        // TODO try to optimize avoiding the copy
> > > +        memcpy(item->data.data + channel->data_item_pos, data, copy_size);
> > > +        size -= copy_size;
> > > +        channel->data_item_pos += copy_size;
> > > +        if (channel->data_item_pos == item->data.data_size) {
> > > +            channel->data_item = NULL;
> > > +            stream_channel_update_queue_stat(channel, 1,
> > > item->data.data_size);
> > > +            red_channel_pipes_add(red_channel, &item->base);
> > > +        }
> > > +    }

What does the while (size) loop do here? It will do more than one
iteration only if copy_size < size, which means there is not enough
space in the item buffer and in that case it seems to me it will loop
forever? Am I missing something?

> > >  }
> > >  
> > >  void
> > > @@ -607,6 +667,7 @@ stream_channel_reset(StreamChannel *channel)
> > >      channel->stream_id = -1;
> > >      channel->width = 0;
> > >      channel->height = 0;
> > > +    stream_channel_unref_data_item(channel);
> > >  
> > >      if (!red_channel_is_connected(red_channel)) {
> > >          return;
> > > diff --git a/server/stream-channel.h b/server/stream-channel.h
> > > index e8bec80b..18a1bdea 100644
> > > --- a/server/stream-channel.h
> > > +++ b/server/stream-channel.h
> > > @@ -60,6 +60,18 @@ struct StreamMsgStartStop;
> > >  
> > >  void stream_channel_change_format(StreamChannel *channel,
> > >                                    const struct StreamMsgFormat *fmt);
> > > +
> > > +/**
> > > + * Tell the channel that a new data packet is starting.
> > > + * This can be used to group all chunks together.
> > > + */
> > > +void stream_channel_start_data(StreamChannel *channel,
> > > +                               size_t size,
> > > +                               uint32_t mm_time);
> > > +
> > > +/**
> > > + * Send to channel a chunk of data.
> > > + */
> > >  void stream_channel_send_data(StreamChannel *channel,
> > >                                const void *data, size_t size,
> > >                                uint32_t mm_time);


More information about the Spice-devel mailing list