[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue

Frediano Ziglio fziglio at redhat.com
Mon May 28 13:20:02 UTC 2018


> 
> On Fri, May 25, 2018 at 01:55:20PM +0100, Frediano Ziglio wrote:
> > display_queue holds decoded frames that are ready to be displayed.
> > However, the current code can insert a timeout before displaying and
> > removing the queued frames. As the frames are not compressed, the
> > memory used by this queue can in some cases become quite
> > huge (on the order of several gigabytes).
> > To prevent this, do not queue all the frames but just one, and count the
> > pending frames.
> > This way GStreamer will stop queuing after a while.
> 
> It has been a long time since I looked at that streaming code, but the
> reason we do the queueing ourselves is to preserve the lip sync between
> the mjpeg encoded frames and the audio channel, right?

Yes.

> When streaming full screen, do we have lip sync (yet?)? Will we ever

Yes, we still do lip sync for full screen as well. Note that with the
direct streaming patch, if streaming-mode is detected, the sync is done
by the screen sync element using PTS time; here we are handling all
the other cases.

> get it? In other words, why aren't we letting gstreamer handle all the
> queueing and the frame dropping when doing full screen streaming?
> 

GStreamer handles PTS only with the "presentation" element. When
there is an appsink (so no presentation element), GStreamer just sends
buffers to the appsink without any flow control. It is the appsink that
can be configured to impose some limit.
In streaming mode there is no appsink, so PTS is taken into account
by the presentation element.
In non-streaming mode with full screen we need to render on the canvas,
so we use the appsink.

> Christophe
> 

Frediano

> > 
> > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > --
> > Changes since v1:
> > - fix pending_samples if some sample gets dropped;
> > - limit output queue;
> > - drop RFC.
> > ---
> >  src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
> >  1 file changed, 53 insertions(+), 29 deletions(-)
> > 
> > diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> > index 3d0827a4..c626d3fd 100644
> > --- a/src/channel-display-gst.c
> > +++ b/src/channel-display-gst.c
> > @@ -29,6 +29,8 @@
> >  #include <gst/video/gstvideometa.h>
> >  
> >  
> > +typedef struct SpiceGstFrame SpiceGstFrame;
> > +
> >  /* GStreamer decoder implementation */
> >  
> >  typedef struct SpiceGstDecoder {
> > @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
> >  
> >      GMutex queues_mutex;
> >      GQueue *decoding_queue;
> > -    GQueue *display_queue;
> > +    SpiceGstFrame *display_frame;
> >      guint timer_id;
> > +    guint pending_samples;
> >  } SpiceGstDecoder;
> >  
> >  #define VALID_VIDEO_CODEC_TYPE(codec) \
> > @@ -76,11 +79,11 @@ typedef enum {
> >  
> >  /* ---------- SpiceGstFrame ---------- */
> >  
> > -typedef struct SpiceGstFrame {
> > +struct SpiceGstFrame {
> >      GstClockTime timestamp;
> >      SpiceFrame *frame;
> >      GstSample *sample;
> > -} SpiceGstFrame;
> > +};
> >  
> >  static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
> >  *frame)
> >  {
> > @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
> >  /* ---------- GStreamer pipeline ---------- */
> >  
> >  static void schedule_frame(SpiceGstDecoder *decoder);
> > +static void fetch_pending_sample(SpiceGstDecoder *decoder);
> >  
> >  static int spice_gst_buffer_get_stride(GstBuffer *buffer)
> >  {
> > @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
> >  
> >      g_mutex_lock(&decoder->queues_mutex);
> >      decoder->timer_id = 0;
> > -    gstframe = g_queue_pop_head(decoder->display_queue);
> > +    gstframe = decoder->display_frame;
> > +    decoder->display_frame = NULL;
> >      g_mutex_unlock(&decoder->queues_mutex);
> >      /* If the queue is empty we don't even need to reschedule */
> >      g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
> > @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >      g_mutex_lock(&decoder->queues_mutex);
> >  
> >      while (!decoder->timer_id) {
> > -        SpiceGstFrame *gstframe =
> > g_queue_peek_head(decoder->display_queue);
> > +        while (decoder->display_frame == NULL && decoder->pending_samples)
> > {
> > +            fetch_pending_sample(decoder);
> > +        }
> > +
> > +        SpiceGstFrame *gstframe = decoder->display_frame;
> >          if (!gstframe) {
> >              break;
> >          }
> > @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >          if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
> >              decoder->timer_id = g_timeout_add(gstframe->frame->mm_time -
> >              now,
> >                                                display_frame, decoder);
> > -        } else if (g_queue_get_length(decoder->display_queue) == 1) {
> > +        } else if (decoder->display_frame && !decoder->pending_samples) {
> >              /* Still attempt to display the least out of date frame so the
> >               * video is not completely frozen for an extended period of
> >               time.
> >               */
> > @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >                          __FUNCTION__, now - gstframe->frame->mm_time,
> >                          gstframe->frame->mm_time, now);
> >              stream_dropped_frame_on_playback(decoder->base.stream);
> > -            g_queue_pop_head(decoder->display_queue);
> > +            decoder->display_frame = NULL;
> >              free_gst_frame(gstframe);
> >          }
> >      }
> > @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >      g_mutex_unlock(&decoder->queues_mutex);
> >  }
> >  
> > -/* GStreamer thread
> > - *
> > - * We cannot use GStreamer's signals because they are not always run in
> > - * the main context. So use a callback (lower overhead) and have it pull
> > - * the sample to avoid a race with free_pipeline(). This means queuing the
> > - * decoded frames outside GStreamer. So while we're at it, also schedule
> > - * the frame display ourselves in schedule_frame().
> > - */
> > -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> > video_decoder)
> > +static void fetch_pending_sample(SpiceGstDecoder *decoder)
> >  {
> > -    SpiceGstDecoder *decoder = video_decoder;
> > -
> >      GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
> >      if (sample) {
> > +        // account for the fetched sample
> > +        decoder->pending_samples--;
> > +
> >          GstBuffer *buffer = gst_sample_get_buffer(sample);
> > -        g_mutex_lock(&decoder->queues_mutex);
> >  
> >          /* gst_app_sink_pull_sample() sometimes returns the same buffer
> >          twice
> >           * or buffers that have a modified, and thus unrecognizable, PTS.
> > @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink *gstappsink,
> > gpointer video_decoder)
> >              if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
> >                  /* The frame is now ready for display */
> >                  gstframe->sample = sample;
> > -                g_queue_push_tail(decoder->display_queue, gstframe);
> > +                decoder->display_frame = gstframe;
> >  
> >                  /* Now that we know there is a match, remove it and the
> >                  older
> >                   * frames from the decoding queue.
> > @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
> > *gstappsink, gpointer video_decoder)
> >              spice_warning("got an unexpected decoded buffer!");
> >              gst_sample_unref(sample);
> >          }
> > -
> > -        g_mutex_unlock(&decoder->queues_mutex);
> > -        schedule_frame(decoder);
> >      } else {
> > +        // no more samples to get, possibly some sample was drop
> > +        decoder->pending_samples = 0;
> >          spice_warning("GStreamer error: could not pull sample");
> >      }
> > +}
> > +
> > +/* GStreamer thread
> > + *
> > + * We cannot use GStreamer's signals because they are not always run in
> > + * the main context. So use a callback (lower overhead) and have it pull
> > + * the sample to avoid a race with free_pipeline(). This means queuing the
> > + * decoded frames outside GStreamer. So while we're at it, also schedule
> > + * the frame display ourselves in schedule_frame().
> > + */
> > +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> > video_decoder)
> > +{
> > +    SpiceGstDecoder *decoder = video_decoder;
> > +
> > +    g_mutex_lock(&decoder->queues_mutex);
> > +    decoder->pending_samples++;
> > +    if (decoder->timer_id && decoder->display_frame) {
> > +        g_mutex_unlock(&decoder->queues_mutex);
> > +        return GST_FLOW_OK;
> > +    }
> > +    g_mutex_unlock(&decoder->queues_mutex);
> > +
> > +    schedule_frame(decoder);
> > +
> >      return GST_FLOW_OK;
> >  }
> >  
> > @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
> > *decoder)
> >          GstAppSinkCallbacks appsink_cbs = { NULL };
> >          appsink_cbs.new_sample = new_sample;
> >          gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
> >          decoder, NULL);
> > +        gst_app_sink_set_max_buffers(decoder->appsink, 2);
> > +        gst_app_sink_set_drop(decoder->appsink, FALSE);
> >      }
> >      bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
> >      gst_bus_add_watch(bus, handle_pipeline_message, decoder);
> > @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
> > *video_decoder)
> >          free_gst_frame(gstframe);
> >      }
> >      g_queue_free(decoder->decoding_queue);
> > -    while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
> > -        free_gst_frame(gstframe);
> > +    if (decoder->display_frame) {
> > +        free_gst_frame(decoder->display_frame);
> >      }
> > -    g_queue_free(decoder->display_queue);
> >  
> >      g_free(decoder);
> >  
> > @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
> > *video_decoder)
> >   *    the decoding queue using the GStreamer timestamp information to deal
> >   with
> >   *    dropped frames. The SpiceGstFrame is popped from the decoding_queue.
> >   * 6) new_sample() then attaches the decompressed frame to the
> >   SpiceGstFrame,
> > - *    pushes it to the display_queue and calls schedule_frame().
> > + *    set into display_frame and calls schedule_frame().
> >   * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange for
> >   *    display_frame() to be called, in the main thread, at the right time
> >   for
> >   *    the next frame.
> > - * 8) display_frame() pops the first SpiceGstFrame from the display_queue
> > and
> > + * 8) display_frame() use SpiceGstFrame from display_frame and
> >   *    calls stream_display_frame().
> >   * 9) display_frame() then frees the SpiceGstFrame, which frees the
> >   SpiceFrame
> >   *    and decompressed frame with it.
> > @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int codec_type,
> > display_stream *stream)
> >          decoder->base.stream = stream;
> >          g_mutex_init(&decoder->queues_mutex);
> >          decoder->decoding_queue = g_queue_new();
> > -        decoder->display_queue = g_queue_new();
> >  
> >          if (!create_pipeline(decoder)) {
> >              decoder->base.destroy((VideoDecoder*)decoder);
> > --
> > 2.17.0
> > 
> > _______________________________________________
> > Spice-devel mailing list
> > Spice-devel at lists.freedesktop.org
> > https://lists.freedesktop.org/mailman/listinfo/spice-devel
> 


More information about the Spice-devel mailing list