[Spice-devel] [RFC PATCH spice-server v2 15/19] stream-device: Limit sending queue from guest to server
Jonathon Jongsma
jjongsma at redhat.com
Tue Aug 22 21:34:18 UTC 2017
On Tue, 2017-08-22 at 11:26 -0400, Frediano Ziglio wrote:
> >
> > So I was reviewing this code and had a few comments but then I
> > realized
> > that we already have a basic flow control mechanism for char
> > devices.
> > RedCharDeviceClient has an implementation based on 'tokens'. Is
> > there
> > any reason that we can't use this instead of re-implementing it
> > here?
> >
>
> I think the main reason is that the token implementation requires
> using the character device as a pass-through device for the client,
> whereas this device is not a pass-through.
Yes, it's not a pass-through, but when I looked, I had thought that it
would be possible to use the RedCharDeviceClient for our purpose. It's
not explicitly a pass-through: the data can be transformed (or dropped)
by the implementation of
RedCharDeviceClass::read_one_msg_from_device().
But now I see that some of our char device reads may result in many
RedPipeItems being sent (e.g. reading a STREAM_TYPE_FORMAT message
results in us sending the following pipe items over the stream channel:
STREAM_DESTROY, SURFACE_DESTROY, SURFACE_CREATE, DISPLAY_MARK,
STREAM_CREATE, and STREAM_ACTIVATE_REPORT)...
>
> This implementation surely does not solve the entire problem.
> The streaming in the DisplayChannel protocol is supposed to use
> stream reports in the server <-> client exchange, and these should be
> propagated to the guest to try to reduce the bandwidth usage
> (reducing frames and/or quality). Currently I enable these
> reports but there's no handling of them. Also, there are no messages
> to the guest carrying this information.
>
> Basically this patch solves one part of the issue. It prevents
> the guest <-> server queue from growing indefinitely.
OK. Even so, it seems a bit of a shame to have to sort of re-implement
flow control here. Maybe there's no choice...
>
> Frediano
>
> >
> >
> > On Wed, 2017-06-14 at 16:40 +0100, Frediano Ziglio wrote:
> > > Do not allow the guest to fill host memory.
> > > > Also, having a huge queue mainly causes higher video
> > > > latency.
> > >
> > > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > > ---
> > > server/stream-channel.c | 41
> > > ++++++++++++++++++++++++++++++++++++++++-
> > > server/stream-channel.h | 10 ++++++++++
> > > server/stream-device.c | 34 +++++++++++++++++++++++++++++++++-
> > > 3 files changed, 83 insertions(+), 2 deletions(-)
> > >
> > > diff --git a/server/stream-channel.c b/server/stream-channel.c
> > > index 58c550e..966dd77 100644
> > > --- a/server/stream-channel.c
> > > +++ b/server/stream-channel.c
> > > @@ -68,9 +68,15 @@ struct StreamChannel {
> > > /* size of the current video stream */
> > > unsigned width, height;
> > >
> > > + StreamQueueStat queue_stat;
> > > +
> > > /* callback to notify when a stream should be started or
> > > stopped
> > > */
> > > stream_channel_start_proc start_cb;
> > > void *start_opaque;
> > > +
> > > + /* callback to notify when queue statistics changes */
> > > + stream_channel_queue_stat_proc queue_cb;
> > > + void *queue_opaque;
> > > };
> > >
> > > struct StreamChannelClass {
> > > @@ -95,6 +101,7 @@ typedef struct StreamCreateItem {
> > >
> > > typedef struct StreamDataItem {
> > > RedPipeItem base;
> > > + StreamChannel *channel;
> > > // NOTE: this must be the last field in the structure
> > > SpiceMsgDisplayStreamData data;
> > > } StreamDataItem;
> > > @@ -450,6 +457,27 @@ stream_channel_change_format(StreamChannel
> > > *channel, const StreamMsgFormat *fmt)
> > > red_pipe_item_unref(&item->base);
> > > }
> > >
> > > +static inline void
> > > +stream_channel_update_queue_stat(StreamChannel *channel,
> > > + int32_t num_diff, int32_t
> > > size_diff)
> > > +{
> > > + channel->queue_stat.num_items += num_diff;
> > > + channel->queue_stat.size += size_diff;
> > > + if (channel->queue_cb) {
> > > > + channel->queue_cb(channel->queue_opaque, &channel->queue_stat, channel);
> > > + }
> > > +}
> > > +
> > > +static void
> > > +data_item_free(RedPipeItem *base)
> > > +{
> > > + StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem,
> > > base);
> > > +
> > > + stream_channel_update_queue_stat(pipe_item->channel, -1,
> > > -pipe_item->data.data_size);
> > > +
> > > + free(pipe_item);
> > > +}
> > > +
> > > void
> > > stream_channel_send_data(StreamChannel *channel, const void
> > > *data,
> > > size_t size)
> > > {
> > > @@ -460,10 +488,13 @@ stream_channel_send_data(StreamChannel
> > > *channel, const void *data, size_t size)
> > > RedChannel *red_channel = RED_CHANNEL(channel);
> > >
> > > StreamDataItem *item = spice_malloc(sizeof(*item) + size);
> > > - red_pipe_item_init(&item->base,
> > > RED_PIPE_ITEM_TYPE_STREAM_DATA);
> > > + red_pipe_item_init_full(&item->base,
> > > RED_PIPE_ITEM_TYPE_STREAM_DATA,
> > > + data_item_free);
> > > item->data.base.id = channel->stream_id;
> > > item->data.base.multi_media_time = reds_get_mm_time();
> > > item->data.data_size = size;
> > > + item->channel = channel;
> > > + stream_channel_update_queue_stat(channel, 1, size);
> > > // TODO try to optimize avoiding the copy
> > > memcpy(item->data.data, data, size);
> > > red_channel_pipes_new_add(red_channel, pipe_item_new_ref,
> > > item);
> > > @@ -479,6 +510,14 @@
> > > stream_channel_register_start_cb(StreamChannel
> > > *channel,
> > > }
> > >
> > > void
> > > +stream_channel_register_queue_stat_cb(StreamChannel *channel,
> > > + stream_channel_queue_stat_
> > > proc
> > > cb, void *opaque)
> > > +{
> > > + channel->queue_cb = cb;
> > > + channel->queue_opaque = opaque;
> > > +}
> > > +
> > > +void
> > > stream_channel_reset(StreamChannel *channel)
> > > {
> > > RedChannel *red_channel = RED_CHANNEL(channel);
> > > diff --git a/server/stream-channel.h b/server/stream-channel.h
> > > index 8c9c0f1..ca20b6a 100644
> > > --- a/server/stream-channel.h
> > > +++ b/server/stream-channel.h
> > > @@ -65,6 +65,16 @@ typedef void (*stream_channel_start_proc)(void
> > > *opaque, struct StreamMsgStartSto
> > > void stream_channel_register_start_cb(StreamChannel *channel,
> > > stream_channel_start_proc
> > > cb,
> > > void *opaque);
> > >
> > > +typedef struct StreamQueueStat {
> > > + uint32_t num_items;
> > > + uint32_t size;
> > > +} StreamQueueStat;
> > > +
> > > +typedef void (*stream_channel_queue_stat_proc)(void *opaque,
> > > const
> > > StreamQueueStat *stats,
> > > + StreamChannel
> > > *channel);
> > > +void stream_channel_register_queue_stat_cb(StreamChannel
> > > *channel,
> > > + stream_channel_queue_
> > > stat
> > > _proc cb, void *opaque);
> > > +
> > > G_END_DECLS
> > >
> > > #endif /* STREAM_CHANNEL_H_ */
> > > diff --git a/server/stream-device.c b/server/stream-device.c
> > > index b8a9dac..2b1e2f2 100644
> > > --- a/server/stream-device.c
> > > +++ b/server/stream-device.c
> > > @@ -44,6 +44,7 @@ struct StreamDevice {
> > > uint8_t hdr_pos;
> > > bool has_error;
> > > bool opened;
> > > + bool flow_stopped;
> > > StreamChannel *channel;
> > > };
> > >
> > > @@ -67,7 +68,7 @@ stream_device_read_msg_from_dev(RedCharDevice
> > > *self, SpiceCharDeviceInstance *si
> > > SpiceCharDeviceInterface *sif;
> > > int n;
> > >
> > > - if (dev->has_error) {
> > > + if (dev->has_error || dev->flow_stopped) {
> > > return NULL;
> > > }
> > >
> > > @@ -165,6 +166,9 @@ handle_msg_data(StreamDevice *dev,
> > > SpiceCharDeviceInstance *sin)
> > > if (n <= 0) {
> > > break;
> > > }
> > > + // TODO collect all message ??
> > > + // up: we send a single frame together
> > > + // down: guest can cause a crash
> > > stream_channel_send_data(dev->channel, buf, n);
> > > dev->hdr.size -= n;
> > > }
> > > @@ -218,6 +222,33 @@ stream_device_stream_start(void *opaque,
> > > StreamMsgStartStop *start,
> > > red_char_device_write_buffer_add(char_dev, buf);
> > > }
> > >
> > > +static void
> > > +stream_device_stream_queue_stat(void *opaque, const
> > > StreamQueueStat
> > > *stats G_GNUC_UNUSED,
> > > + StreamChannel *channel
> > > G_GNUC_UNUSED)
> > > +{
> > > + StreamDevice *dev = (StreamDevice *) opaque;
> > > +
> > > + if (!dev->opened) {
> > > + return;
> > > + }
> > > +
> > > + // very easy control flow... if any data stop
> > > + // this seems a very small queue but as we use tcp
> > > + // there's already that queue
> > > + if (stats->num_items) {
> > > + dev->flow_stopped = true;
> > > + return;
> > > + }
> > > +
> > > + if (dev->flow_stopped) {
> > > + dev->flow_stopped = false;
> > > + // TODO resume flow...
> > > > + // avoid recursion if we need to fetch data from within
> > > > + // data handling
> > > + red_char_device_wakeup(&dev->parent);
> > > + }
> > > +}
> > > +
> > > RedCharDevice *
> > > stream_device_connect(RedsState *reds, SpiceCharDeviceInstance
> > > *sin)
> > > {
> > > @@ -228,6 +259,7 @@ stream_device_connect(RedsState *reds,
> > > SpiceCharDeviceInstance *sin)
> > > StreamDevice *dev = stream_device_new(sin, reds);
> > > dev->channel = channel;
> > > stream_channel_register_start_cb(channel,
> > > stream_device_stream_start, dev);
> > > + stream_channel_register_queue_stat_cb(channel,
> > > stream_device_stream_queue_stat, dev);
> > >
> > > sif = spice_char_device_get_interface(sin);
> > > if (sif->state) {
More information about the Spice-devel
mailing list