[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue

Frediano Ziglio fziglio at redhat.com
Fri Jun 22 10:47:40 UTC 2018


ping

> 
> > 
> > On Fri, May 25, 2018 at 01:55:20PM +0100, Frediano Ziglio wrote:
> > > display_queue is queued with decoded frames ready to be displayed.
> > > However, the current code can insert a timeout before displaying and
> > > removing the queued frames. As the frames are not compressed, the
> > > memory used by this queue can in some cases become quite
> > > huge (on the order of several gigabytes).
> > > To prevent this do not queue all the frames but just one and count the
> > > pending frames.
> > > This way GStreamer will stop queuing after a while.
> > 
> > It has been a long time since I looked at that streaming code, but the
> > reason we do the queueing ourselves is to preserve the lip sync between
> > the mjpeg encoded frames and the audio channel, right?
> 
> Yes.
> 
> > When streaming full screen, do we have lip sync (yet?)? Will we ever
> 
> Yes, we still do lip sync for full screen too. Note that with the
> direct streaming patch, if streaming-mode is detected the sync is done
> by the screen sync element using PTS time; here we are handling all
> other cases.
> 
> > get it? In other words, why aren't we letting gstreamer handle all the
> > queueing and the frame dropping when doing full screen streaming?
> > 
> 
> GStreamer handles PTS only with the "presentation" element. In case
> there is an appsink (so no presentation element), GStreamer just sends
> buffers to the appsink without any flow control. It is the appsink that
> can be configured to impose some limit.
> In streaming mode there's no appsink, so PTS is taken into account
> by the presentation element.
> In non-streaming mode with full screen we need to render on the canvas,
> so we use the appsink.
> 
> > Christophe
> > 
> 
> Frediano
> 
> > > 
> > > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > > --
> > > Changes since v1:
> > > - fix pending_samples if some samples get dropped;
> > > - limit output queue;
> > > - drop RFC.
> > > ---
> > >  src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
> > >  1 file changed, 53 insertions(+), 29 deletions(-)
> > > 
> > > diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> > > index 3d0827a4..c626d3fd 100644
> > > --- a/src/channel-display-gst.c
> > > +++ b/src/channel-display-gst.c
> > > @@ -29,6 +29,8 @@
> > >  #include <gst/video/gstvideometa.h>
> > >  
> > >  
> > > +typedef struct SpiceGstFrame SpiceGstFrame;
> > > +
> > >  /* GStreamer decoder implementation */
> > >  
> > >  typedef struct SpiceGstDecoder {
> > > @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
> > >  
> > >      GMutex queues_mutex;
> > >      GQueue *decoding_queue;
> > > -    GQueue *display_queue;
> > > +    SpiceGstFrame *display_frame;
> > >      guint timer_id;
> > > +    guint pending_samples;
> > >  } SpiceGstDecoder;
> > >  
> > >  #define VALID_VIDEO_CODEC_TYPE(codec) \
> > > @@ -76,11 +79,11 @@ typedef enum {
> > >  
> > >  /* ---------- SpiceGstFrame ---------- */
> > >  
> > > -typedef struct SpiceGstFrame {
> > > +struct SpiceGstFrame {
> > >      GstClockTime timestamp;
> > >      SpiceFrame *frame;
> > >      GstSample *sample;
> > > -} SpiceGstFrame;
> > > +};
> > >  
> > >  static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
> > >  *frame)
> > >  {
> > > @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
> > >  /* ---------- GStreamer pipeline ---------- */
> > >  
> > >  static void schedule_frame(SpiceGstDecoder *decoder);
> > > +static void fetch_pending_sample(SpiceGstDecoder *decoder);
> > >  
> > >  static int spice_gst_buffer_get_stride(GstBuffer *buffer)
> > >  {
> > > @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
> > >  
> > >      g_mutex_lock(&decoder->queues_mutex);
> > >      decoder->timer_id = 0;
> > > -    gstframe = g_queue_pop_head(decoder->display_queue);
> > > +    gstframe = decoder->display_frame;
> > > +    decoder->display_frame = NULL;
> > >      g_mutex_unlock(&decoder->queues_mutex);
> > >      /* If the queue is empty we don't even need to reschedule */
> > >      g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
> > > @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > >      g_mutex_lock(&decoder->queues_mutex);
> > >  
> > >      while (!decoder->timer_id) {
> > > -        SpiceGstFrame *gstframe =
> > > g_queue_peek_head(decoder->display_queue);
> > > +        while (decoder->display_frame == NULL &&
> > > decoder->pending_samples)
> > > {
> > > +            fetch_pending_sample(decoder);
> > > +        }
> > > +
> > > +        SpiceGstFrame *gstframe = decoder->display_frame;
> > >          if (!gstframe) {
> > >              break;
> > >          }
> > > @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > >          if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
> > >              decoder->timer_id = g_timeout_add(gstframe->frame->mm_time -
> > >              now,
> > >                                                display_frame, decoder);
> > > -        } else if (g_queue_get_length(decoder->display_queue) == 1) {
> > > +        } else if (decoder->display_frame && !decoder->pending_samples)
> > > {
> > >              /* Still attempt to display the least out of date frame so
> > >              the
> > >               * video is not completely frozen for an extended period of
> > >               time.
> > >               */
> > > @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > >                          __FUNCTION__, now - gstframe->frame->mm_time,
> > >                          gstframe->frame->mm_time, now);
> > >              stream_dropped_frame_on_playback(decoder->base.stream);
> > > -            g_queue_pop_head(decoder->display_queue);
> > > +            decoder->display_frame = NULL;
> > >              free_gst_frame(gstframe);
> > >          }
> > >      }
> > > @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder
> > > *decoder)
> > >      g_mutex_unlock(&decoder->queues_mutex);
> > >  }
> > >  
> > > -/* GStreamer thread
> > > - *
> > > - * We cannot use GStreamer's signals because they are not always run in
> > > - * the main context. So use a callback (lower overhead) and have it pull
> > > - * the sample to avoid a race with free_pipeline(). This means queuing
> > > the
> > > - * decoded frames outside GStreamer. So while we're at it, also schedule
> > > - * the frame display ourselves in schedule_frame().
> > > - */
> > > -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> > > video_decoder)
> > > +static void fetch_pending_sample(SpiceGstDecoder *decoder)
> > >  {
> > > -    SpiceGstDecoder *decoder = video_decoder;
> > > -
> > >      GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
> > >      if (sample) {
> > > +        // account for the fetched sample
> > > +        decoder->pending_samples--;
> > > +
> > >          GstBuffer *buffer = gst_sample_get_buffer(sample);
> > > -        g_mutex_lock(&decoder->queues_mutex);
> > >  
> > >          /* gst_app_sink_pull_sample() sometimes returns the same buffer
> > >          twice
> > >           * or buffers that have a modified, and thus unrecognizable,
> > >           PTS.
> > > @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink
> > > *gstappsink,
> > > gpointer video_decoder)
> > >              if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
> > >                  /* The frame is now ready for display */
> > >                  gstframe->sample = sample;
> > > -                g_queue_push_tail(decoder->display_queue, gstframe);
> > > +                decoder->display_frame = gstframe;
> > >  
> > >                  /* Now that we know there is a match, remove it and the
> > >                  older
> > >                   * frames from the decoding queue.
> > > @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
> > > *gstappsink, gpointer video_decoder)
> > >              spice_warning("got an unexpected decoded buffer!");
> > >              gst_sample_unref(sample);
> > >          }
> > > -
> > > -        g_mutex_unlock(&decoder->queues_mutex);
> > > -        schedule_frame(decoder);
> > >      } else {
> > > +        // no more samples to get, possibly some sample was dropped
> > > +        decoder->pending_samples = 0;
> > >          spice_warning("GStreamer error: could not pull sample");
> > >      }
> > > +}
> > > +
> > > +/* GStreamer thread
> > > + *
> > > + * We cannot use GStreamer's signals because they are not always run in
> > > + * the main context. So use a callback (lower overhead) and have it pull
> > > + * the sample to avoid a race with free_pipeline(). This means queuing
> > > the
> > > + * decoded frames outside GStreamer. So while we're at it, also schedule
> > > + * the frame display ourselves in schedule_frame().
> > > + */
> > > +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> > > video_decoder)
> > > +{
> > > +    SpiceGstDecoder *decoder = video_decoder;
> > > +
> > > +    g_mutex_lock(&decoder->queues_mutex);
> > > +    decoder->pending_samples++;
> > > +    if (decoder->timer_id && decoder->display_frame) {
> > > +        g_mutex_unlock(&decoder->queues_mutex);
> > > +        return GST_FLOW_OK;
> > > +    }
> > > +    g_mutex_unlock(&decoder->queues_mutex);
> > > +
> > > +    schedule_frame(decoder);
> > > +
> > >      return GST_FLOW_OK;
> > >  }
> > >  
> > > @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
> > > *decoder)
> > >          GstAppSinkCallbacks appsink_cbs = { NULL };
> > >          appsink_cbs.new_sample = new_sample;
> > >          gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
> > >          decoder, NULL);
> > > +        gst_app_sink_set_max_buffers(decoder->appsink, 2);
> > > +        gst_app_sink_set_drop(decoder->appsink, FALSE);
> > >      }
> > >      bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
> > >      gst_bus_add_watch(bus, handle_pipeline_message, decoder);
> > > @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
> > > *video_decoder)
> > >          free_gst_frame(gstframe);
> > >      }
> > >      g_queue_free(decoder->decoding_queue);
> > > -    while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
> > > -        free_gst_frame(gstframe);
> > > +    if (decoder->display_frame) {
> > > +        free_gst_frame(decoder->display_frame);
> > >      }
> > > -    g_queue_free(decoder->display_queue);
> > >  
> > >      g_free(decoder);
> > >  
> > > @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
> > > *video_decoder)
> > >   *    the decoding queue using the GStreamer timestamp information to
> > >   deal
> > >   with
> > >   *    dropped frames. The SpiceGstFrame is popped from the
> > >   decoding_queue.
> > >   * 6) new_sample() then attaches the decompressed frame to the
> > >   SpiceGstFrame,
> > > - *    pushes it to the display_queue and calls schedule_frame().
> > > + *    stores it in display_frame and calls schedule_frame().
> > >   * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange for
> > >   *    display_frame() to be called, in the main thread, at the right
> > >   time
> > >   for
> > >   *    the next frame.
> > > - * 8) display_frame() pops the first SpiceGstFrame from the
> > > display_queue
> > > and
> > > + * 8) display_frame() takes the SpiceGstFrame from display_frame and
> > >   *    calls stream_display_frame().
> > >   * 9) display_frame() then frees the SpiceGstFrame, which frees the
> > >   SpiceFrame
> > >   *    and decompressed frame with it.
> > > @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int
> > > codec_type,
> > > display_stream *stream)
> > >          decoder->base.stream = stream;
> > >          g_mutex_init(&decoder->queues_mutex);
> > >          decoder->decoding_queue = g_queue_new();
> > > -        decoder->display_queue = g_queue_new();
> > >  
> > >          if (!create_pipeline(decoder)) {
> > >              decoder->base.destroy((VideoDecoder*)decoder);


More information about the Spice-devel mailing list