[Spice-devel] [RFC PATCH spice-server v2 15/19] stream-device: Limit sending queue from guest to server
Frediano Ziglio
fziglio at redhat.com
Tue Aug 22 15:26:51 UTC 2017
>
> So I was reviewing this code and had a few comments but then I realized
> that we already have a basic flow control mechanism for char devices.
> RedCharDeviceClient has an implementation based on 'tokens'. Is there
> any reason that we can't use this instead of re-implementing it here?
>
I think the main reason is that the token implementation requires
to use the characters device as a pass-through device for the client
while this device is not a pass-through.
This implementation does not surely solve the entire problem.
The streaming in the DisplayChannel protocol is supposed to use
stream reports on the server <-> client chat and this should be
propagated to the guest trying to reduce the bandwidth usage
(reducing frames and/or quality). Currently I enable these
report but there's no handling of it. Also there's no messages
for the guest for these information.
Basically this patch solve one part of the issue. It avoids
the queue guest <-> server to grow undefinitely.
Frediano
>
>
> On Wed, 2017-06-14 at 16:40 +0100, Frediano Ziglio wrote:
> > Do not allow the guest to fill host memory.
> > Also having a huge queue mainly cause to have a higher video
> > latency.
> >
> > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > ---
> > server/stream-channel.c | 41
> > ++++++++++++++++++++++++++++++++++++++++-
> > server/stream-channel.h | 10 ++++++++++
> > server/stream-device.c | 34 +++++++++++++++++++++++++++++++++-
> > 3 files changed, 83 insertions(+), 2 deletions(-)
> >
> > diff --git a/server/stream-channel.c b/server/stream-channel.c
> > index 58c550e..966dd77 100644
> > --- a/server/stream-channel.c
> > +++ b/server/stream-channel.c
> > @@ -68,9 +68,15 @@ struct StreamChannel {
> > /* size of the current video stream */
> > unsigned width, height;
> >
> > + StreamQueueStat queue_stat;
> > +
> > /* callback to notify when a stream should be started or stopped
> > */
> > stream_channel_start_proc start_cb;
> > void *start_opaque;
> > +
> > + /* callback to notify when queue statistics changes */
> > + stream_channel_queue_stat_proc queue_cb;
> > + void *queue_opaque;
> > };
> >
> > struct StreamChannelClass {
> > @@ -95,6 +101,7 @@ typedef struct StreamCreateItem {
> >
> > typedef struct StreamDataItem {
> > RedPipeItem base;
> > + StreamChannel *channel;
> > // NOTE: this must be the last field in the structure
> > SpiceMsgDisplayStreamData data;
> > } StreamDataItem;
> > @@ -450,6 +457,27 @@ stream_channel_change_format(StreamChannel
> > *channel, const StreamMsgFormat *fmt)
> > red_pipe_item_unref(&item->base);
> > }
> >
> > +static inline void
> > +stream_channel_update_queue_stat(StreamChannel *channel,
> > + int32_t num_diff, int32_t
> > size_diff)
> > +{
> > + channel->queue_stat.num_items += num_diff;
> > + channel->queue_stat.size += size_diff;
> > + if (channel->queue_cb) {
> > + channel->queue_cb(channel->queue_opaque, &channel-
> > >queue_stat, channel);
> > + }
> > +}
> > +
> > +static void
> > +data_item_free(RedPipeItem *base)
> > +{
> > + StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
> > +
> > + stream_channel_update_queue_stat(pipe_item->channel, -1,
> > -pipe_item->data.data_size);
> > +
> > + free(pipe_item);
> > +}
> > +
> > void
> > stream_channel_send_data(StreamChannel *channel, const void *data,
> > size_t size)
> > {
> > @@ -460,10 +488,13 @@ stream_channel_send_data(StreamChannel
> > *channel, const void *data, size_t size)
> > RedChannel *red_channel = RED_CHANNEL(channel);
> >
> > StreamDataItem *item = spice_malloc(sizeof(*item) + size);
> > - red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA);
> > + red_pipe_item_init_full(&item->base,
> > RED_PIPE_ITEM_TYPE_STREAM_DATA,
> > + data_item_free);
> > item->data.base.id = channel->stream_id;
> > item->data.base.multi_media_time = reds_get_mm_time();
> > item->data.data_size = size;
> > + item->channel = channel;
> > + stream_channel_update_queue_stat(channel, 1, size);
> > // TODO try to optimize avoiding the copy
> > memcpy(item->data.data, data, size);
> > red_channel_pipes_new_add(red_channel, pipe_item_new_ref, item);
> > @@ -479,6 +510,14 @@ stream_channel_register_start_cb(StreamChannel
> > *channel,
> > }
> >
> > void
> > +stream_channel_register_queue_stat_cb(StreamChannel *channel,
> > + stream_channel_queue_stat_proc
> > cb, void *opaque)
> > +{
> > + channel->queue_cb = cb;
> > + channel->queue_opaque = opaque;
> > +}
> > +
> > +void
> > stream_channel_reset(StreamChannel *channel)
> > {
> > RedChannel *red_channel = RED_CHANNEL(channel);
> > diff --git a/server/stream-channel.h b/server/stream-channel.h
> > index 8c9c0f1..ca20b6a 100644
> > --- a/server/stream-channel.h
> > +++ b/server/stream-channel.h
> > @@ -65,6 +65,16 @@ typedef void (*stream_channel_start_proc)(void
> > *opaque, struct StreamMsgStartSto
> > void stream_channel_register_start_cb(StreamChannel *channel,
> > stream_channel_start_proc cb,
> > void *opaque);
> >
> > +typedef struct StreamQueueStat {
> > + uint32_t num_items;
> > + uint32_t size;
> > +} StreamQueueStat;
> > +
> > +typedef void (*stream_channel_queue_stat_proc)(void *opaque, const
> > StreamQueueStat *stats,
> > + StreamChannel
> > *channel);
> > +void stream_channel_register_queue_stat_cb(StreamChannel *channel,
> > + stream_channel_queue_stat
> > _proc cb, void *opaque);
> > +
> > G_END_DECLS
> >
> > #endif /* STREAM_CHANNEL_H_ */
> > diff --git a/server/stream-device.c b/server/stream-device.c
> > index b8a9dac..2b1e2f2 100644
> > --- a/server/stream-device.c
> > +++ b/server/stream-device.c
> > @@ -44,6 +44,7 @@ struct StreamDevice {
> > uint8_t hdr_pos;
> > bool has_error;
> > bool opened;
> > + bool flow_stopped;
> > StreamChannel *channel;
> > };
> >
> > @@ -67,7 +68,7 @@ stream_device_read_msg_from_dev(RedCharDevice
> > *self, SpiceCharDeviceInstance *si
> > SpiceCharDeviceInterface *sif;
> > int n;
> >
> > - if (dev->has_error) {
> > + if (dev->has_error || dev->flow_stopped) {
> > return NULL;
> > }
> >
> > @@ -165,6 +166,9 @@ handle_msg_data(StreamDevice *dev,
> > SpiceCharDeviceInstance *sin)
> > if (n <= 0) {
> > break;
> > }
> > + // TODO collect all message ??
> > + // up: we send a single frame together
> > + // down: guest can cause a crash
> > stream_channel_send_data(dev->channel, buf, n);
> > dev->hdr.size -= n;
> > }
> > @@ -218,6 +222,33 @@ stream_device_stream_start(void *opaque,
> > StreamMsgStartStop *start,
> > red_char_device_write_buffer_add(char_dev, buf);
> > }
> >
> > +static void
> > +stream_device_stream_queue_stat(void *opaque, const StreamQueueStat
> > *stats G_GNUC_UNUSED,
> > + StreamChannel *channel
> > G_GNUC_UNUSED)
> > +{
> > + StreamDevice *dev = (StreamDevice *) opaque;
> > +
> > + if (!dev->opened) {
> > + return;
> > + }
> > +
> > + // very easy control flow... if any data stop
> > + // this seems a very small queue but as we use tcp
> > + // there's already that queue
> > + if (stats->num_items) {
> > + dev->flow_stopped = true;
> > + return;
> > + }
> > +
> > + if (dev->flow_stopped) {
> > + dev->flow_stopped = false;
> > + // TODO resume flow...
> > + // avoid recursion if we need to call get data from data
> > handling from
> > + // data handling
> > + red_char_device_wakeup(&dev->parent);
> > + }
> > +}
> > +
> > RedCharDevice *
> > stream_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin)
> > {
> > @@ -228,6 +259,7 @@ stream_device_connect(RedsState *reds,
> > SpiceCharDeviceInstance *sin)
> > StreamDevice *dev = stream_device_new(sin, reds);
> > dev->channel = channel;
> > stream_channel_register_start_cb(channel,
> > stream_device_stream_start, dev);
> > + stream_channel_register_queue_stat_cb(channel,
> > stream_device_stream_queue_stat, dev);
> >
> > sif = spice_char_device_get_interface(sin);
> > if (sif->state) {
>
More information about the Spice-devel
mailing list