[Spice-devel] [RFC PATCH 15/15] stream-device: Limit sending queue from guest to server

Frediano Ziglio fziglio at redhat.com
Wed Apr 19 15:31:14 UTC 2017


Do not allow the guest to fill host memory.
Also having a huge queue mainly cause to have a higher video
latency.

Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
---
 server/stream-channel.c | 41 ++++++++++++++++++++++++++++++++++++++++-
 server/stream-channel.h | 10 ++++++++++
 server/stream-device.c  | 34 +++++++++++++++++++++++++++++++++-
 3 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/server/stream-channel.c b/server/stream-channel.c
index 58c550e..966dd77 100644
--- a/server/stream-channel.c
+++ b/server/stream-channel.c
@@ -68,9 +68,15 @@ struct StreamChannel {
     /* size of the current video stream */
     unsigned width, height;
 
+    StreamQueueStat queue_stat;
+
     /* callback to notify when a stream should be started or stopped */
     stream_channel_start_proc start_cb;
     void *start_opaque;
+
+    /* callback to notify when queue statistics changes */
+    stream_channel_queue_stat_proc queue_cb;
+    void *queue_opaque;
 };
 
 struct StreamChannelClass {
@@ -95,6 +101,7 @@ typedef struct StreamCreateItem {
 
 typedef struct StreamDataItem {
     RedPipeItem base;
+    StreamChannel *channel;
     // NOTE: this must be the last field in the structure
     SpiceMsgDisplayStreamData data;
 } StreamDataItem;
@@ -450,6 +457,27 @@ stream_channel_change_format(StreamChannel *channel, const StreamMsgFormat *fmt)
     red_pipe_item_unref(&item->base);
 }
 
+static inline void
+stream_channel_update_queue_stat(StreamChannel *channel,
+                                 int32_t num_diff, int32_t size_diff)
+{
+    channel->queue_stat.num_items += num_diff;
+    channel->queue_stat.size += size_diff;
+    if (channel->queue_cb) {
+        channel->queue_cb(channel->queue_opaque, &channel->queue_stat, channel);
+    }
+}
+
+static void
+data_item_free(RedPipeItem *base)
+{
+    StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
+
+    stream_channel_update_queue_stat(pipe_item->channel, -1, -pipe_item->data.data_size);
+
+    free(pipe_item);
+}
+
 void
 stream_channel_send_data(StreamChannel *channel, const void *data, size_t size)
 {
@@ -460,10 +488,13 @@ stream_channel_send_data(StreamChannel *channel, const void *data, size_t size)
     RedChannel *red_channel = RED_CHANNEL(channel);
 
     StreamDataItem *item = spice_malloc(sizeof(*item) + size);
-    red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA);
+    red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
+                            data_item_free);
     item->data.base.id = channel->stream_id;
     item->data.base.multi_media_time = reds_get_mm_time();
     item->data.data_size = size;
+    item->channel = channel;
+    stream_channel_update_queue_stat(channel, 1, size);
     // TODO try to optimize avoiding the copy
     memcpy(item->data.data, data, size);
     red_channel_pipes_new_add(red_channel, pipe_item_new_ref, item);
@@ -479,6 +510,14 @@ stream_channel_register_start_cb(StreamChannel *channel,
 }
 
 void
+stream_channel_register_queue_stat_cb(StreamChannel *channel,
+                                      stream_channel_queue_stat_proc cb, void *opaque)
+{
+    channel->queue_cb = cb;
+    channel->queue_opaque = opaque;
+}
+
+void
 stream_channel_reset(StreamChannel *channel)
 {
     RedChannel *red_channel = RED_CHANNEL(channel);
diff --git a/server/stream-channel.h b/server/stream-channel.h
index 8c9c0f1..ca20b6a 100644
--- a/server/stream-channel.h
+++ b/server/stream-channel.h
@@ -65,6 +65,16 @@ typedef void (*stream_channel_start_proc)(void *opaque, struct StreamMsgStartSto
 void stream_channel_register_start_cb(StreamChannel *channel,
                                       stream_channel_start_proc cb, void *opaque);
 
+typedef struct StreamQueueStat {
+    uint32_t num_items;
+    uint32_t size;
+} StreamQueueStat;
+
+typedef void (*stream_channel_queue_stat_proc)(void *opaque, const StreamQueueStat *stats,
+                                               StreamChannel *channel);
+void stream_channel_register_queue_stat_cb(StreamChannel *channel,
+                                           stream_channel_queue_stat_proc cb, void *opaque);
+
 G_END_DECLS
 
 #endif /* STREAM_CHANNEL_H_ */
diff --git a/server/stream-device.c b/server/stream-device.c
index df28ffc..2c0f7ae 100644
--- a/server/stream-device.c
+++ b/server/stream-device.c
@@ -48,6 +48,7 @@ struct StreamDevice {
     uint8_t hdr_pos;
     bool has_error;
     bool opened;
+    bool flow_stopped;
     StreamChannel *channel;
 };
 
@@ -71,7 +72,7 @@ stream_device_read_msg_from_dev(RedCharDevice *self, SpiceCharDeviceInstance *si
     SpiceCharDeviceInterface *sif;
     int n;
 
-    if (dev->has_error) {
+    if (dev->has_error || dev->flow_stopped) {
         return NULL;
     }
 
@@ -169,6 +170,9 @@ handle_msg_data(StreamDevice *dev, SpiceCharDeviceInstance *sin)
         if (n <= 0) {
             break;
         }
+        // TODO collect all message ??
+        // up: we send a single frame together
+        // down: guest can cause a crash
         stream_channel_send_data(dev->channel, buf, n);
         dev->hdr.size -= n;
     }
@@ -222,6 +226,33 @@ stream_device_stream_start(void *opaque, StreamMsgStartStop *start,
     red_char_device_write_buffer_add(char_dev, buf);
 }
 
+static void
+stream_device_stream_queue_stat(void *opaque, const StreamQueueStat *stats G_GNUC_UNUSED,
+                                StreamChannel *channel G_GNUC_UNUSED)
+{
+    StreamDevice *dev = (StreamDevice *) opaque;
+
+    if (!dev->opened) {
+        return;
+    }
+
+    // very easy control flow... if any data stop
+    // this seems a very small queue but as we use tcp
+    // there's already that queue
+    if (stats->num_items) {
+        dev->flow_stopped = true;
+        return;
+    }
+
+    if (dev->flow_stopped) {
+        dev->flow_stopped = false;
+        // TODO resume flow...
+        // avoid recursion if we need to call get data from data handling from
+        // data handling
+        red_char_device_wakeup(&dev->parent);
+    }
+}
+
 RedCharDevice *
 stream_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin)
 {
@@ -232,6 +263,7 @@ stream_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin)
     StreamDevice *dev = stream_device_new(sin, reds);
     dev->channel = channel;
     stream_channel_register_start_cb(channel, stream_device_stream_start, dev);
+    stream_channel_register_queue_stat_cb(channel, stream_device_stream_queue_stat, dev);
 
     sif = spice_char_device_get_interface(sin);
     if (sif->state) {
-- 
2.9.3



More information about the Spice-devel mailing list