[Spice-devel] [spice-gtk PATCH v4 2/2] spice-channel: check message queue size
Victor Toso
victortoso at redhat.com
Wed Oct 21 03:38:12 PDT 2015
When a channel wants to send much more data than the wire can handle, the
queue grows fast. This patch does not limit the queue growth but
introduces an internal API to check whether the queue size is too big.
This internal API is used with the usbredirhost_can_write_iso callback from
usbredir. This way, we guarantee that we are only dropping data from
streaming devices.
An easy way to test locally is sharing a webcam and simulating high
latency with tc:
tc qdisc add dev lo root netem delay 100ms
tc qdisc change dev lo root netem delay 1000ms
tc qdisc del dev lo root netem
Resolves: https://bugzilla.redhat.com/show_bug.cgi?id=1264156
---
src/channel-usbredir.c | 43 +++++++++++++++++++++++++++++++++++++++++++
src/spice-channel-priv.h | 2 ++
src/spice-channel.c | 19 ++++++++++++++++++-
3 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/src/channel-usbredir.c b/src/channel-usbredir.c
index 89f5c9d..69c565b 100644
--- a/src/channel-usbredir.c
+++ b/src/channel-usbredir.c
@@ -91,6 +91,7 @@ static void usbredir_log(void *user_data, int level, const char *msg);
static int usbredir_read_callback(void *user_data, uint8_t *data, int count);
static int usbredir_write_callback(void *user_data, uint8_t *data, int count);
static void usbredir_write_flush_callback(void *user_data);
+static uint64_t usbredir_can_write_iso_callback(void *user_data);
static void *usbredir_alloc_lock(void);
static void usbredir_lock_lock(void *user_data);
@@ -200,11 +201,22 @@ static void channel_set_handlers(SpiceChannelClass *klass)
/* ------------------------------------------------------------------ */
/* private api */
+/* FIXME: This is temporary. We should instead use the number of seconds
+ * we want to buffer at most and let usbredir pick the HIGHER and LOWER
+ * thresholds. We might want to know the HIGHER threshold for limiting the
+ * queue size and handling memory pool issues that we still need to
+ * investigate: https://bugs.freedesktop.org/show_bug.cgi?id=92354
+ */
+#define USBREDIR_THRESHOLD_HIGHER (3 * 1024 * 1024)
+#define USBREDIR_THRESHOLD_LOWER (1 * 1024 * 1024)
+
G_GNUC_INTERNAL
void spice_usbredir_channel_set_context(SpiceUsbredirChannel *channel,
libusb_context *context)
{
SpiceUsbredirChannelPrivate *priv = channel->priv;
+ const gchar *env;
+ guint64 threshold_higher, threshold_lower;
g_return_if_fail(priv->host == NULL);
@@ -224,6 +236,31 @@ void spice_usbredir_channel_set_context(SpiceUsbredirChannel *channel,
usbredirhost_fl_write_cb_owns_buffer);
if (!priv->host)
g_error("Out of memory allocating usbredirhost");
+
+ /* FIXME: threshold could be channel-usbredir property?
+ * envvar for now should be enough */
+ env = g_getenv("USBREDIR_CHANNEL_THRESHOLD_HIGHER");
+ if (env != NULL) {
+ /* we expect the env var to be in MB */
+ threshold_higher = g_ascii_strtoull(env, NULL, 10);
+ threshold_higher = threshold_higher * 1024 * 1024;
+ } else {
+ threshold_higher = USBREDIR_THRESHOLD_HIGHER;
+ }
+
+ env = g_getenv("USBREDIR_CHANNEL_THRESHOLD_LOWER");
+ if (env != NULL) {
+ /* we expect the env var to be in MB */
+ threshold_lower = g_ascii_strtoull(env, NULL, 10);
+ threshold_lower = threshold_lower * 1024 * 1024;
+ } else {
+ threshold_lower = USBREDIR_THRESHOLD_LOWER;
+ }
+
+ CHANNEL_DEBUG(channel, "Max threshold is %lu bytes | Min threshold is %lu bytes",
+ threshold_higher, threshold_lower);
+ usbredirhost_set_cb_can_write_iso(priv->host, usbredir_can_write_iso_callback,
+ threshold_higher, threshold_lower);
}
static gboolean spice_usbredir_channel_open_device(
@@ -461,6 +498,12 @@ void spice_usbredir_channel_get_guest_filter(
/* ------------------------------------------------------------------ */
/* callbacks (any context) */
+/* Callback registered with usbredirhost_set_cb_can_write_iso(): reports the
+ * current size of this channel's transmit queue, as returned by
+ * spice_channel_get_queue_size().  Yields 0 when user_data is not a
+ * SpiceUsbredirChannel. */
+static uint64_t usbredir_can_write_iso_callback(void *user_data)
+{
+    SpiceChannel *channel;
+
+    g_return_val_if_fail(SPICE_IS_USBREDIR_CHANNEL(user_data), 0);
+
+    channel = SPICE_CHANNEL(user_data);
+    return spice_channel_get_queue_size(channel);
+}
+
/* Note that this function must be re-entrant safe, as it can get called
from both the main thread as well as from the usb event handling thread */
static void usbredir_write_flush_callback(void *user_data)
diff --git a/src/spice-channel-priv.h b/src/spice-channel-priv.h
index 436a521..4b2d1e6 100644
--- a/src/spice-channel-priv.h
+++ b/src/spice-channel-priv.h
@@ -111,6 +111,7 @@ struct _SpiceChannelPrivate {
gboolean xmit_queue_blocked;
STATIC_MUTEX xmit_queue_lock;
guint xmit_queue_wakeup_id;
+ guint64 xmit_queue_size;
char name[16];
enum spice_channel_state state;
@@ -171,6 +172,7 @@ void spice_channel_wakeup(SpiceChannel *channel, gboolean cancel);
SpiceSession* spice_channel_get_session(SpiceChannel *channel);
enum spice_channel_state spice_channel_get_state(SpiceChannel *channel);
+guint64 spice_channel_get_queue_size (SpiceChannel *channel);
/* coroutine context */
typedef void (*handler_msg_in)(SpiceChannel *channel, SpiceMsgIn *msg, gpointer data);
diff --git a/src/spice-channel.c b/src/spice-channel.c
index 2ce52c7..8b6125f 100644
--- a/src/spice-channel.c
+++ b/src/spice-channel.c
@@ -697,10 +697,12 @@ void spice_msg_out_send(SpiceMsgOut *out)
{
SpiceChannelPrivate *c;
gboolean was_empty;
+ guint32 size;
g_return_if_fail(out != NULL);
g_return_if_fail(out->channel != NULL);
c = out->channel->priv;
+ size = spice_marshaller_get_total_size(out->marshaller);
STATIC_MUTEX_LOCK(c->xmit_queue_lock);
if (c->xmit_queue_blocked) {
@@ -710,6 +712,7 @@ void spice_msg_out_send(SpiceMsgOut *out)
was_empty = g_queue_is_empty(&c->xmit_queue);
g_queue_push_tail(&c->xmit_queue, out);
+ c->xmit_queue_size = (was_empty) ? size : c->xmit_queue_size + size;
/* One wakeup is enough to empty the entire queue -> only do a wakeup
if the queue was empty, and there isn't one pending already. */
@@ -2104,8 +2107,11 @@ static void spice_channel_iterate_write(SpiceChannel *channel)
STATIC_MUTEX_LOCK(c->xmit_queue_lock);
out = g_queue_pop_head(&c->xmit_queue);
STATIC_MUTEX_UNLOCK(c->xmit_queue_lock);
- if (out)
+ if (out) {
+ guint32 size = spice_marshaller_get_total_size(out->marshaller);
+ c->xmit_queue_size = (c->xmit_queue_size < size) ? 0 : c->xmit_queue_size - size;
spice_channel_write_msg(channel, out);
+ }
} while (out);
spice_channel_flushed(channel, TRUE);
@@ -2814,6 +2820,17 @@ enum spice_channel_state spice_channel_get_state(SpiceChannel *channel)
}
G_GNUC_INTERNAL
+guint64 spice_channel_get_queue_size (SpiceChannel *channel)
+{
+ guint64 size;
+ SpiceChannelPrivate *c = channel->priv;
+ STATIC_MUTEX_LOCK(c->xmit_queue_lock);
+ size = c->xmit_queue_size;
+ STATIC_MUTEX_UNLOCK(c->xmit_queue_lock);
+ return size;
+}
+
+G_GNUC_INTERNAL
void spice_channel_swap(SpiceChannel *channel, SpiceChannel *swap, gboolean swap_msgs)
{
SpiceChannelPrivate *c = channel->priv;
--
2.5.0
More information about the Spice-devel
mailing list