[Spice-devel] [PATCH spice-gtk v2 2/3] RFC channel-display-gst: Limit input queue

Frediano Ziglio fziglio at redhat.com
Fri May 25 12:55:21 UTC 2018


Whenever there are bytes from the network we queue them to
GStreamer. The pipeline is set to not have a limit so the queue
can grow indefinitely.
Limit the queue using need_data callback and stopping reading.

Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
--
Changes since v1:
- use need_data instead of a fixed limit of 64mb.

Note that stopping reading cause the entire coroutine to stop.
Also the coroutine keeps reading till there is data so limiting
the speed to read can cause the coroutine to not send data either,
so the additional check in spice-channel.c.
---
 src/channel-display-gst.c | 59 ++++++++++++++++++++++++++++++++++++++-
 src/spice-channel.c       |  3 +-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
index c626d3fd..977a864a 100644
--- a/src/channel-display-gst.c
+++ b/src/channel-display-gst.c
@@ -54,6 +54,8 @@ typedef struct SpiceGstDecoder {
     SpiceGstFrame *display_frame;
     guint timer_id;
     guint pending_samples;
+
+    GstBuffer *pending_src_buffer;
 } SpiceGstDecoder;
 
 #define VALID_VIDEO_CODEC_TYPE(codec) \
@@ -353,6 +355,36 @@ static gboolean handle_pipeline_message(GstBus *bus, GstMessage *msg, gpointer v
     return TRUE;
 }
 
+// callback when appsrc needs data, possibly called from another
+// thread
+static void
+appsrc_need_data(GstAppSrc *src, guint length, gpointer user_data)
+{
+    SpiceGstDecoder *decoder = user_data;
+    bool must_wakeup = false;
+    g_mutex_lock(&decoder->queues_mutex);
+    if (decoder->pending_src_buffer) {
+        // TODO errors
+        gst_app_src_push_buffer(decoder->appsrc, decoder->pending_src_buffer);
+        decoder->pending_src_buffer = NULL;
+        must_wakeup = true;
+    }
+    g_mutex_unlock(&decoder->queues_mutex);
+
+    if (must_wakeup) {
+        g_main_context_wakeup(g_main_context_default());
+    }
+}
+
+static void
+appsrc_setup_callbacks(SpiceGstDecoder *decoder)
+{
+    if (decoder->appsrc) {
+        GstAppSrcCallbacks appsrc_cbs = { appsrc_need_data, NULL, NULL };
+        gst_app_src_set_callbacks(decoder->appsrc, &appsrc_cbs, decoder, NULL);
+    }
+}
+
 #if GST_CHECK_VERSION(1,9,0)
 static void app_source_setup(GstElement *pipeline G_GNUC_UNUSED,
                              GstElement *source,
@@ -377,6 +409,8 @@ static void app_source_setup(GstElement *pipeline G_GNUC_UNUSED,
                  NULL);
     gst_caps_unref(caps);
     decoder->appsrc = GST_APP_SRC(gst_object_ref(source));
+
+    appsrc_setup_callbacks(decoder);
 }
 #endif
 
@@ -483,6 +517,7 @@ static gboolean create_pipeline(SpiceGstDecoder *decoder)
     decoder->appsink = GST_APP_SINK(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "sink"));
 #endif
 
+    appsrc_setup_callbacks(decoder);
     if (decoder->appsink) {
         GstAppSinkCallbacks appsink_cbs = { NULL };
         appsink_cbs.new_sample = new_sample;
@@ -558,6 +593,16 @@ static void spice_gst_decoder_destroy(VideoDecoder *video_decoder)
      */
 }
 
+static gboolean wait_push(gpointer data)
+{
+    SpiceGstDecoder *decoder = data;
+
+    g_mutex_lock(&decoder->queues_mutex);
+    gboolean res = (decoder->pending_src_buffer == NULL);
+    g_mutex_unlock(&decoder->queues_mutex);
+
+    return res;
+}
 
 /* spice_gst_decoder_queue_frame() queues the SpiceFrame for decoding and
  * displaying. The steps it goes through are as follows:
@@ -584,6 +629,7 @@ static void spice_gst_decoder_destroy(VideoDecoder *video_decoder)
  * 9) display_frame() then frees the SpiceGstFrame, which frees the SpiceFrame
  *    and decompressed frame with it.
  */
+/* coroutine context */
 static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
                                               SpiceFrame *frame, int latency)
 {
@@ -647,7 +693,18 @@ static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
         frame = NULL;
     }
 
-    if (gst_app_src_push_buffer(decoder->appsrc, buffer) != GST_FLOW_OK) {
+    g_coroutine_condition_wait(g_coroutine_self(), wait_push, decoder);
+    g_assert(decoder->pending_src_buffer == NULL);
+
+    GstFlowReturn flow = GST_FLOW_OK;
+    g_mutex_lock(&decoder->queues_mutex);
+    if (gst_app_src_get_current_level_bytes(decoder->appsrc) == 0) {
+        flow = gst_app_src_push_buffer(decoder->appsrc, buffer);
+    } else {
+        decoder->pending_src_buffer = buffer;
+    }
+    g_mutex_unlock(&decoder->queues_mutex);
+    if (flow != GST_FLOW_OK) {
         SPICE_DEBUG("GStreamer error: unable to push frame");
         stream_dropped_frame_on_playback(decoder->base.stream);
     }
diff --git a/src/spice-channel.c b/src/spice-channel.c
index 7e3e3b72..7b45fa35 100644
--- a/src/spice-channel.c
+++ b/src/spice-channel.c
@@ -2336,7 +2336,8 @@ static void spice_channel_iterate_read(SpiceChannel *channel)
     /* treat all incoming data (block on message completion) */
     while (!c->has_error &&
            c->state != SPICE_CHANNEL_STATE_MIGRATING &&
-           g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(c->in))
+           g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(c->in)) &&
+           spice_channel_get_queue_size(channel) == 0
     ) { do
             spice_channel_recv_msg(channel,
                                    (handler_msg_in)SPICE_CHANNEL_GET_CLASS(channel)->handle_msg, NULL);
-- 
2.17.0



More information about the Spice-devel mailing list