[Spice-devel] [RFC PATCH spice-server v4 17/22] stream-device: Limit sending queue from guest to server
Frediano Ziglio
fziglio at redhat.com
Fri Aug 25 09:54:04 UTC 2017
Do not allow the guest to fill host memory.
Also having a huge queue mainly cause to have a higher video
latency.
Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
---
server/stream-channel.c | 41 ++++++++++++++++++++++++++++++++++++++++-
server/stream-channel.h | 10 ++++++++++
server/stream-device.c | 35 ++++++++++++++++++++++++++++++++++-
3 files changed, 84 insertions(+), 2 deletions(-)
diff --git a/server/stream-channel.c b/server/stream-channel.c
index 53a06f91..34c8a351 100644
--- a/server/stream-channel.c
+++ b/server/stream-channel.c
@@ -69,9 +69,15 @@ struct StreamChannel {
/* size of the current video stream */
unsigned width, height;
+ StreamQueueStat queue_stat;
+
/* callback to notify when a stream should be started or stopped */
stream_channel_start_proc start_cb;
void *start_opaque;
+
+ /* callback to notify when queue statistics changes */
+ stream_channel_queue_stat_proc queue_cb;
+ void *queue_opaque;
};
struct StreamChannelClass {
@@ -96,6 +102,7 @@ typedef struct StreamCreateItem {
typedef struct StreamDataItem {
RedPipeItem base;
+ StreamChannel *channel;
// NOTE: this must be the last field in the structure
SpiceMsgDisplayStreamData data;
} StreamDataItem;
@@ -453,6 +460,27 @@ stream_channel_change_format(StreamChannel *channel, const StreamMsgFormat *fmt)
red_pipe_item_unref(&item->base);
}
+static inline void
+stream_channel_update_queue_stat(StreamChannel *channel,
+ int32_t num_diff, int32_t size_diff)
+{
+ channel->queue_stat.num_items += num_diff;
+ channel->queue_stat.size += size_diff;
+ if (channel->queue_cb) {
+ channel->queue_cb(channel->queue_opaque, &channel->queue_stat, channel);
+ }
+}
+
+static void
+data_item_free(RedPipeItem *base)
+{
+ StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
+
+ stream_channel_update_queue_stat(pipe_item->channel, -1, -pipe_item->data.data_size);
+
+ free(pipe_item);
+}
+
void
stream_channel_send_data(StreamChannel *channel, const void *data, size_t size, uint32_t mm_time)
{
@@ -466,10 +494,13 @@ stream_channel_send_data(StreamChannel *channel, const void *data, size_t size,
RedChannel *red_channel = RED_CHANNEL(channel);
StreamDataItem *item = spice_malloc(sizeof(*item) + size);
- red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA);
+ red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
+ data_item_free);
item->data.base.id = channel->stream_id;
item->data.base.multi_media_time = mm_time;
item->data.data_size = size;
+ item->channel = channel;
+ stream_channel_update_queue_stat(channel, 1, size);
// TODO try to optimize avoiding the copy
memcpy(item->data.data, data, size);
red_channel_pipes_new_add(red_channel, pipe_item_new_ref, item);
@@ -485,6 +516,14 @@ stream_channel_register_start_cb(StreamChannel *channel,
}
void
+stream_channel_register_queue_stat_cb(StreamChannel *channel,
+ stream_channel_queue_stat_proc cb, void *opaque)
+{
+ channel->queue_cb = cb;
+ channel->queue_opaque = opaque;
+}
+
+void
stream_channel_reset(StreamChannel *channel)
{
RedChannel *red_channel = RED_CHANNEL(channel);
diff --git a/server/stream-channel.h b/server/stream-channel.h
index bd075a95..f961d715 100644
--- a/server/stream-channel.h
+++ b/server/stream-channel.h
@@ -67,6 +67,16 @@ typedef void (*stream_channel_start_proc)(void *opaque, struct StreamMsgStartSto
void stream_channel_register_start_cb(StreamChannel *channel,
stream_channel_start_proc cb, void *opaque);
+typedef struct StreamQueueStat {
+ uint32_t num_items;
+ uint32_t size;
+} StreamQueueStat;
+
+typedef void (*stream_channel_queue_stat_proc)(void *opaque, const StreamQueueStat *stats,
+ StreamChannel *channel);
+void stream_channel_register_queue_stat_cb(StreamChannel *channel,
+ stream_channel_queue_stat_proc cb, void *opaque);
+
G_END_DECLS
#endif /* STREAM_CHANNEL_H_ */
diff --git a/server/stream-device.c b/server/stream-device.c
index f72140ab..360ee60f 100644
--- a/server/stream-device.c
+++ b/server/stream-device.c
@@ -44,6 +44,7 @@ struct StreamDevice {
uint8_t hdr_pos;
bool has_error;
bool opened;
+ bool flow_stopped;
StreamChannel *stream_channel;
};
@@ -67,7 +68,7 @@ stream_device_read_msg_from_dev(RedCharDevice *self, SpiceCharDeviceInstance *si
SpiceCharDeviceInterface *sif;
int n;
- if (dev->has_error || !dev->stream_channel) {
+ if (dev->has_error || dev->flow_stopped || !dev->stream_channel) {
return NULL;
}
@@ -165,6 +166,9 @@ handle_msg_data(StreamDevice *dev, SpiceCharDeviceInstance *sin)
if (n <= 0) {
break;
}
+ // TODO collect all message ??
+ // up: we send a single frame together
+ // down: guest can cause a crash
stream_channel_send_data(dev->stream_channel, buf, n, reds_get_mm_time());
dev->hdr.size -= n;
}
@@ -218,6 +222,33 @@ stream_device_stream_start(void *opaque, StreamMsgStartStop *start,
red_char_device_write_buffer_add(char_dev, buf);
}
+static void
+stream_device_stream_queue_stat(void *opaque, const StreamQueueStat *stats G_GNUC_UNUSED,
+ StreamChannel *stream_channel G_GNUC_UNUSED)
+{
+ StreamDevice *dev = (StreamDevice *) opaque;
+
+ if (!dev->opened) {
+ return;
+ }
+
+ // very easy control flow... if any data stop
+ // this seems a very small queue but as we use tcp
+ // there's already that queue
+ if (stats->num_items) {
+ dev->flow_stopped = true;
+ return;
+ }
+
+ if (dev->flow_stopped) {
+ dev->flow_stopped = false;
+ // TODO resume flow...
+ // avoid recursion if we need to call get data from data handling from
+ // data handling
+ red_char_device_wakeup(&dev->parent);
+ }
+}
+
RedCharDevice *
stream_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin)
{
@@ -262,6 +293,7 @@ allocate_channels(StreamDevice *dev)
dev->stream_channel = stream_channel;
stream_channel_register_start_cb(stream_channel, stream_device_stream_start, dev);
+ stream_channel_register_queue_stat_cb(stream_channel, stream_device_stream_queue_stat, dev);
}
static void
@@ -280,6 +312,7 @@ stream_device_port_event(RedCharDevice *char_dev, uint8_t event)
}
device->hdr_pos = 0;
device->has_error = false;
+ device->flow_stopped = false;
red_char_device_reset(char_dev);
if (device->stream_channel) {
stream_channel_reset(device->stream_channel);
--
2.13.5
More information about the Spice-devel
mailing list