[Spice-devel] [PATCH spice-gtk 1/3] RFC channel-display-gst: Prevent accumulating output queue

Frediano Ziglio fziglio at redhat.com
Tue May 22 08:26:24 UTC 2018


> 
> Hi,
> 
> 
> On 05/18/2018 02:50 PM, Frediano Ziglio wrote:
> >> Hi,
> >>
> >>
> >> On 04/19/2018 03:28 PM, Frediano Ziglio wrote:
> >>> display_queue is queued with decoded frames ready to be displayed.
> >>> However current code can insert a timeout before displaying and
> >>> removing the queued frames. As the frames are not compressed the
> >>> usage of memory by this queue can became in some cases quite
> >>> huge (in the order of different gigabytes).
> >>> To prevent this do not queue all the frames but just one and count the
> >>> pending frames.
> >>> This way GStreamer will stop queuing after a while.
> >>>
> >>> Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> >>> ---
> >>>    src/channel-display-gst.c | 76
> >>>    +++++++++++++++++++++++++++++------------------
> >>>    1 file changed, 47 insertions(+), 29 deletions(-)
> >>>
> >>> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> >>> index 0d7aabb..7ba8cd0 100644
> >>> --- a/src/channel-display-gst.c
> >>> +++ b/src/channel-display-gst.c
> >>> @@ -29,6 +29,8 @@
> >>>    #include <gst/video/gstvideometa.h>
> >>>    
> >>>    
> >>> +typedef struct SpiceGstFrame SpiceGstFrame;
> >>> +
> >>>    /* GStreamer decoder implementation */
> >>>    
> >>>    typedef struct SpiceGstDecoder {
> >>> @@ -47,8 +49,9 @@ typedef struct SpiceGstDecoder {
> >>>    
> >>>        GMutex queues_mutex;
> >>>        GQueue *decoding_queue;
> >>> -    GQueue *display_queue;
> >>> +    SpiceGstFrame *display_frame;
> >>>        guint timer_id;
> >>> +    guint pending_samples;
> >>>    } SpiceGstDecoder;
> >>>    
> >>>    #define VALID_VIDEO_CODEC_TYPE(codec) \
> >>> @@ -74,11 +77,11 @@ typedef enum {
> >>>    
> >>>    /* ---------- SpiceGstFrame ---------- */
> >>>    
> >>> -typedef struct SpiceGstFrame {
> >>> +struct SpiceGstFrame {
> >>>        GstClockTime timestamp;
> >>>        SpiceFrame *frame;
> >>>        GstSample *sample;
> >>> -} SpiceGstFrame;
> >>> +};
> >>>    
> >>>    static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
> >>>    *frame)
> >>>    {
> >>> @@ -102,6 +105,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
> >>>    /* ---------- GStreamer pipeline ---------- */
> >>>    
> >>>    static void schedule_frame(SpiceGstDecoder *decoder);
> >>> +static void fetch_pending_sample(SpiceGstDecoder *decoder);
> >>>    
> >>>    static int spice_gst_buffer_get_stride(GstBuffer *buffer)
> >>>    {
> >>> @@ -122,7 +126,8 @@ static gboolean display_frame(gpointer video_decoder)
> >>>    
> >>>        g_mutex_lock(&decoder->queues_mutex);
> >>>        decoder->timer_id = 0;
> >>> -    gstframe = g_queue_pop_head(decoder->display_queue);
> >>> +    gstframe = decoder->display_frame;
> >>> +    decoder->display_frame = NULL;
> >>>        g_mutex_unlock(&decoder->queues_mutex);
> >>>        /* If the queue is empty we don't even need to reschedule */
> >>>        g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
> >>> @@ -168,7 +173,12 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >>>        g_mutex_lock(&decoder->queues_mutex);
> >>>    
> >>>        while (!decoder->timer_id) {
> >>> -        SpiceGstFrame *gstframe =
> >>> g_queue_peek_head(decoder->display_queue);
> >>> +        while (decoder->display_frame == NULL &&
> >>> decoder->pending_samples)
> >>> {
> >>> +            decoder->pending_samples--;
> >>> +            fetch_pending_sample(decoder);
> >> Maybe pending_samples--; should be done after making sure a
> >> pulling\fetching
> >> of a sample was completed successfully.
> >>
> > If there are pending samples fetch_pending_sample is supposed to get a
> > sample anyway. Not sure what could have been so wrong that gstreamer
> > told us "there is a sample" but there is not... "ghost" sample?
> 
> I'm not sure it could happen but i was wondering if there is a chance to
> wrongly count EOS (in the middle of a stream?) or state change (iirc
> those can also release the gst_app_sink_pull_sample blocking..  the
> latter is async event)
> 

I hope not; there is a specific callback for EOS.
And there should be no state changes, besides errors stopping the stream,
which should already be handled.

I did some experiments. It looks like the default is an infinite queue and
no drop, so:
1) if I don't pull new samples, memory goes up until the OOM killer triggers;
2) if I limit the queue (2 buffers) and don't pull, decoding also stops; memory
   increases slightly due to the pushes on the input queue, but slowly enough;
3) if I limit the queue, enable drop (gst_app_sink_set_drop) and don't pull,
   the program keeps decoding frames (I can see CPU usage) and memory stays
   constant;
4) if I limit the queue, don't drop and pull slowly (once a second), I can see
   that CPU usage is lower than in case 3 and memory does not grow.
This seems to indicate that limiting the sink queue is a good idea after all.
Obviously there can still be cases where GStreamer creates an unbounded queue
in the middle of the pipeline, consuming a lot of memory. I think that setting
the DTS time in this case could help.
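
To make the four cases above easier to compare, here is a toy model in plain
C. It is not GStreamer code and does not claim to match its internals: ToySink
and the toy_sink_* functions are made-up names, max_buffers == 0 stands for
"unlimited", and one loop iteration stands for one decode attempt.

```c
#include <stdbool.h>
#include <stddef.h>

/* Toy model of a sink queue; all names here are hypothetical. */
typedef struct {
    size_t max_buffers; /* 0 means unlimited (the observed default) */
    bool drop;          /* when full: true = discard oldest, false = block */
    size_t queued;      /* frames currently held in the sink queue */
    size_t decoded;     /* frames the decoder managed to produce */
    size_t dropped;     /* frames discarded because the queue was full */
} ToySink;

/* The decoder tries to push one frame; returns false if it has to stall. */
static bool toy_sink_push(ToySink *sink)
{
    if (sink->max_buffers != 0 && sink->queued >= sink->max_buffers) {
        if (!sink->drop) {
            return false;  /* producer stalls, decoding stops (cases 2, 4) */
        }
        sink->queued--;    /* oldest frame is discarded (case 3) */
        sink->dropped++;
    }
    sink->queued++;
    sink->decoded++;
    return true;
}

/* The consumer pulls one frame, if there is one. */
static bool toy_sink_pull(ToySink *sink)
{
    if (sink->queued == 0) {
        return false;
    }
    sink->queued--;
    return true;
}

/* Run `ticks` decode attempts, pulling once every `pull_every` ticks
 * (0 = never pull). Returns the peak queue length that was reached. */
static size_t toy_sink_run(ToySink *sink, size_t ticks, size_t pull_every)
{
    size_t peak = 0;
    for (size_t t = 1; t <= ticks; t++) {
        toy_sink_push(sink);
        if (sink->queued > peak) {
            peak = sink->queued;
        }
        if (pull_every != 0 && t % pull_every == 0) {
            toy_sink_pull(sink);
        }
    }
    return peak;
}
```

In this model only the unlimited queue grows with the number of ticks; the
limited ones stay at max_buffers, and only the drop variant keeps "decoding"
at full speed, which is consistent with the CPU usage seen in case 3.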

About the pull and the counting: setting pending_samples to 0 if the pull
fails, and decrementing it only on success, seems like a good idea.
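
A minimal sketch of that counting, with the GStreamer parts stubbed out. Only
pending_samples is a name from the patch; ToyDecoder, toy_pull_sample and
toy_fetch_next are hypothetical, and unlike the real fetch_pending_sample a
successful pull here always yields a displayable frame.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the relevant SpiceGstDecoder fields. */
typedef struct {
    unsigned pending_samples; /* samples announced by new_sample() */
    unsigned available;       /* samples the stubbed sink can really deliver */
    bool has_frame;           /* stands in for display_frame != NULL */
} ToyDecoder;

/* Stub for the pull: succeeds only while samples remain. */
static bool toy_pull_sample(ToyDecoder *decoder)
{
    if (decoder->available == 0) {
        return false;
    }
    decoder->available--;
    return true;
}

/* Fetch the next frame; decrement the counter only on a successful pull and
 * reset it to 0 on failure, so a wrong count cannot keep the loop spinning. */
static void toy_fetch_next(ToyDecoder *decoder)
{
    while (!decoder->has_frame && decoder->pending_samples > 0) {
        if (toy_pull_sample(decoder)) {
            decoder->pending_samples--;
            decoder->has_frame = true;
        } else {
            decoder->pending_samples = 0;
        }
    }
}
```

With this, a "ghost" sample costs one failed pull and then the counter is
back in sync with what the sink really holds.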

> >>> +        }
> >>> +
> >>> +        SpiceGstFrame *gstframe = decoder->display_frame;
> >>>            if (!gstframe) {
> >>>                break;
> >>>            }
> >>> @@ -176,7 +186,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >>>            if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
> >>>                decoder->timer_id = g_timeout_add(gstframe->frame->mm_time
> >>>                -
> >>>                now,
> >>>                                                  display_frame, decoder);
> >>> -        } else if (g_queue_get_length(decoder->display_queue) == 1) {
> >>> +        } else if (decoder->display_frame && !decoder->pending_samples)
> >>> {
> >>>                /* Still attempt to display the least out of date frame so
> >>>                the
> >>>                 * video is not completely frozen for an extended period
> >>>                 of
> >>>                 time.
> >>>                 */
> >> Is this comment still true? if there's no pending samples this
> >> is the only frame available (which is probably also out of date)
> >>
> > I suppose can still happen if we just got last frame from queue after
> > rendering another.
> >
> >>> @@ -186,7 +196,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> >>>                            __FUNCTION__, now - gstframe->frame->mm_time,
> >>>                            gstframe->frame->mm_time, now);
> >>>                stream_dropped_frame_on_playback(decoder->base.stream);
> >>> -            g_queue_pop_head(decoder->display_queue);
> >>> +            decoder->display_frame = NULL;
> >>>                free_gst_frame(gstframe);
> >>>            }
> >>>        }
> >>> @@ -194,22 +204,11 @@ static void schedule_frame(SpiceGstDecoder
> >>> *decoder)
> >>>        g_mutex_unlock(&decoder->queues_mutex);
> >>>    }
> >>>    
> >>> -/* GStreamer thread
> >>> - *
> >>> - * We cannot use GStreamer's signals because they are not always run in
> >>> - * the main context. So use a callback (lower overhead) and have it pull
> >>> - * the sample to avoid a race with free_pipeline(). This means queuing
> >>> the
> >>> - * decoded frames outside GStreamer. So while we're at it, also schedule
> >>> - * the frame display ourselves in schedule_frame().
> >>> - */
> >>> -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> >>> video_decoder)
> >>> +static void fetch_pending_sample(SpiceGstDecoder *decoder)
> >>>    {
> >>> -    SpiceGstDecoder *decoder = video_decoder;
> >>> -
> >>>        GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
> >>>        if (sample) {
> >>>            GstBuffer *buffer = gst_sample_get_buffer(sample);
> >>> -        g_mutex_lock(&decoder->queues_mutex);
> >>>    
> >>>            /* gst_app_sink_pull_sample() sometimes returns the same
> >>>            buffer
> >>>            twice
> >>>             * or buffers that have a modified, and thus unrecognizable,
> >>>             PTS.
> >>> @@ -225,7 +224,7 @@ static GstFlowReturn new_sample(GstAppSink
> >>> *gstappsink,
> >>> gpointer video_decoder)
> >>>                if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
> >>>                    /* The frame is now ready for display */
> >>>                    gstframe->sample = sample;
> >>> -                g_queue_push_tail(decoder->display_queue, gstframe);
> >>> +                decoder->display_frame = gstframe;
> >>>    
> >>>                    /* Now that we know there is a match, remove it and
> >>>                    the
> >>>                    older
> >>>                     * frames from the decoding queue.
> >>> @@ -248,12 +247,33 @@ static GstFlowReturn new_sample(GstAppSink
> >>> *gstappsink, gpointer video_decoder)
> >>>                spice_warning("got an unexpected decoded buffer!");
> >>>                gst_sample_unref(sample);
> >>>            }
> >>> -
> >>> -        g_mutex_unlock(&decoder->queues_mutex);
> >>> -        schedule_frame(decoder);
> >>>        } else {
> >>>            spice_warning("GStreamer error: could not pull sample");
> >>>        }
> >>> +}
> >>> +
> >>> +/* GStreamer thread
> >>> + *
> >>> + * We cannot use GStreamer's signals because they are not always run in
> >>> + * the main context. So use a callback (lower overhead) and have it pull
> >>> + * the sample to avoid a race with free_pipeline(). This means queuing
> >>> the
> >>> + * decoded frames outside GStreamer. So while we're at it, also schedule
> >>> + * the frame display ourselves in schedule_frame().
> >>> + */
> >>> +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> >>> video_decoder)
> >>> +{
> >>> +    SpiceGstDecoder *decoder = video_decoder;
> >>> +
> >>> +    g_mutex_lock(&decoder->queues_mutex);
> >>> +    decoder->pending_samples++;
> >>> +    if (decoder->timer_id && decoder->display_frame) {
> >>> +        g_mutex_unlock(&decoder->queues_mutex);
> >>> +        return GST_FLOW_OK;
> >> Should it return GST_FLOW_OK also when the new sample is not being pulled?
> >> (although i assume it's fine)
> >>
> > Looking at the enumeration looks like any other option will disable
> > the pipeline.
> >
> > I have to try but I think would be a good idea to call
> > gst_app_sink_set_max_buffers
> > limiting the gstreamer queue to... I would say 2 (at the end would be 3
> > considering
> > we retrieve a sample as soon as possible).
> > I think this patch reduce the display queue we build after gstreamer
> > (AppSink)
> > but the queue maybe moved just on the step before.
> 
> Yes, if pulling rate is not fast enough the gstreamer's queue will grow.
> but i'm not sure what will happen if we will limit the sink's queue, will
> it throw frames or it will signal the pipeline to slow down (and than
> queuing may move to an earlier step)
> 

Yes, from the experiments above the default settings are an unlimited queue
and no drop, so the queue will grow until memory is exhausted.

> Snir.
> >
> >>> +    }
> >>> +    g_mutex_unlock(&decoder->queues_mutex);
> >>> +
> >>> +    schedule_frame(decoder);
> >>> +
> >>>        return GST_FLOW_OK;
> >>>    }
> >>>    
> >>> @@ -474,10 +494,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
> >>> *video_decoder)
> >>>            free_gst_frame(gstframe);
> >>>        }
> >>>        g_queue_free(decoder->decoding_queue);
> >>> -    while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
> >>> -        free_gst_frame(gstframe);
> >>> +    if (decoder->display_frame) {
> >>> +        free_gst_frame(decoder->display_frame);
> >>>        }
> >>> -    g_queue_free(decoder->display_queue);
> >>>    
> >>>        g_free(decoder);
> >>>    
> >>> @@ -503,11 +522,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
> >>> *video_decoder)
> >>>     *    the decoding queue using the GStreamer timestamp information to
> >>>     deal with
> >>>     *    dropped frames. The SpiceGstFrame is popped from the
> >>>     decoding_queue.
> >>>     * 6) new_sample() then attaches the decompressed frame to the
> >>>     SpiceGstFrame,
> >>> - *    pushes it to the display_queue and calls schedule_frame().
> >>> + *    set into display_frame and calls schedule_frame().
> >>>     * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange
> >>>     for
> >>>     *    display_frame() to be called, in the main thread, at the right
> >>>     time
> >>>     for
> >>>     *    the next frame.
> >>> - * 8) display_frame() pops the first SpiceGstFrame from the
> >>> display_queue
> >>> and
> >>> + * 8) display_frame() use SpiceGstFrame from display_frame and
> >>>     *    calls stream_display_frame().
> >>>     * 9) display_frame() then frees the SpiceGstFrame, which frees the
> >>>     SpiceFrame
> >>>     *    and decompressed frame with it.
> >>> @@ -609,7 +628,6 @@ VideoDecoder* create_gstreamer_decoder(int
> >>> codec_type,
> >>> display_stream *stream)
> >>>            decoder->base.stream = stream;
> >>>            g_mutex_init(&decoder->queues_mutex);
> >>>            decoder->decoding_queue = g_queue_new();
> >>> -        decoder->display_queue = g_queue_new();
> >>>    
> >>>            if (!create_pipeline(decoder)) {
> >>>                decoder->base.destroy((VideoDecoder*)decoder);
> >> Looks good!! tried it and seems to work well (i haven't re-based) i
> >> really prefer
> >> it like that- avoiding the display queue.
> >> I want to believe the different pipelines playbin may build in other
> >> cases will also
> >> behave nicely :| , anyway i'd say these patches should go upstream.
> >>
> > About next patch I was thinking that even better instead of having a
> > fixed size limit and pushing buffers another idea would be to instead
> > handle need-data signal (either with signals or callbacks).
> > 64MB can be really large causing big delays.
> >
> > Frediano
> 
> 

