[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue

Snir Sheriber ssheribe at redhat.com
Wed Jul 4 08:09:03 UTC 2018


Hi,


On 07/03/2018 06:22 PM, Frediano Ziglio wrote:
>> Hi,
>>
>>
>> On 05/25/2018 03:55 PM, Frediano Ziglio wrote:
>>> display_queue is queued with decoded frames ready to be displayed.
>>> However current code can insert a timeout before displaying and
>>> removing the queued frames. As the frames are not compressed the
>>> usage of memory by this queue can become in some cases quite
>>> huge (in the order of different gigabytes).
>> Using the word "different" sounds a bit misplaced to me, but
>> I may be wrong.
>>
> Maybe multiple (meaning "more than one") ?

Sounds good

>
>>> To prevent this do not queue all the frames but just one and count the
>>> pending frames.
>>> This way GStreamer will stop queuing after a while.
>>>
>>> Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
>>> --
>>> Changes since v1:
>>> - fix pending_samples if some sample get dropped;
>>> - limit output queue;
>>> - drop RFC.
>>> ---
>>>    src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
>>>    1 file changed, 53 insertions(+), 29 deletions(-)
>>>
>>> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
>>> index 3d0827a4..c626d3fd 100644
>>> --- a/src/channel-display-gst.c
>>> +++ b/src/channel-display-gst.c
>>> @@ -29,6 +29,8 @@
>>>    #include <gst/video/gstvideometa.h>
>>>    
>>>    
>>> +typedef struct SpiceGstFrame SpiceGstFrame;
>>> +
>>>    /* GStreamer decoder implementation */
>>>    
>>>    typedef struct SpiceGstDecoder {
>>> @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
>>>    
>>>        GMutex queues_mutex;
>>>        GQueue *decoding_queue;
>>> -    GQueue *display_queue;
>>> +    SpiceGstFrame *display_frame;
>>>        guint timer_id;
>>> +    guint pending_samples;
>>>    } SpiceGstDecoder;
>>>    
>>>    #define VALID_VIDEO_CODEC_TYPE(codec) \
>>> @@ -76,11 +79,11 @@ typedef enum {
>>>    
>>>    /* ---------- SpiceGstFrame ---------- */
>>>    
>>> -typedef struct SpiceGstFrame {
>>> +struct SpiceGstFrame {
>>>        GstClockTime timestamp;
>>>        SpiceFrame *frame;
>>>        GstSample *sample;
>>> -} SpiceGstFrame;
>>> +};
>>>    
>>>    static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
>>>    *frame)
>>>    {
>>> @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
>>>    /* ---------- GStreamer pipeline ---------- */
>>>    
>>>    static void schedule_frame(SpiceGstDecoder *decoder);
>>> +static void fetch_pending_sample(SpiceGstDecoder *decoder);
>>>    
>>>    static int spice_gst_buffer_get_stride(GstBuffer *buffer)
>>>    {
>>> @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
>>>    
>>>        g_mutex_lock(&decoder->queues_mutex);
>>>        decoder->timer_id = 0;
>>> -    gstframe = g_queue_pop_head(decoder->display_queue);
>>> +    gstframe = decoder->display_frame;
>>> +    decoder->display_frame = NULL;
>>>        g_mutex_unlock(&decoder->queues_mutex);
>>>        /* If the queue is empty we don't even need to reschedule */
>>>        g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
>>> @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>        g_mutex_lock(&decoder->queues_mutex);
>>>    
>>>        while (!decoder->timer_id) {
>>> -        SpiceGstFrame *gstframe =
>>> g_queue_peek_head(decoder->display_queue);
>>> +        while (decoder->display_frame == NULL && decoder->pending_samples)
>>> {
>>> +            fetch_pending_sample(decoder);
>>> +        }
>>> +
>>> +        SpiceGstFrame *gstframe = decoder->display_frame;
>>>            if (!gstframe) {
>>>                break;
>>>            }
>>> @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>            if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
>>>                decoder->timer_id = g_timeout_add(gstframe->frame->mm_time -
>>>                now,
>>>                                                  display_frame, decoder);
>>> -        } else if (g_queue_get_length(decoder->display_queue) == 1) {
>>> +        } else if (decoder->display_frame && !decoder->pending_samples) {
>>>                /* Still attempt to display the least out of date frame so
>>>                the
>>>                 * video is not completely frozen for an extended period of
>>>                 time.
>>>                 */
>>> @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>                            __FUNCTION__, now - gstframe->frame->mm_time,
>>>                            gstframe->frame->mm_time, now);
>>>                stream_dropped_frame_on_playback(decoder->base.stream);
>>> -            g_queue_pop_head(decoder->display_queue);
>>> +            decoder->display_frame = NULL;
>>>                free_gst_frame(gstframe);
>>>            }
>>>        }
>>> @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>        g_mutex_unlock(&decoder->queues_mutex);
>>>    }
>>>    
>>> -/* GStreamer thread
>>> - *
>>> - * We cannot use GStreamer's signals because they are not always run in
>>> - * the main context. So use a callback (lower overhead) and have it pull
>>> - * the sample to avoid a race with free_pipeline(). This means queuing the
>>> - * decoded frames outside GStreamer. So while we're at it, also schedule
>>> - * the frame display ourselves in schedule_frame().
>>> - */
>>> -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
>>> video_decoder)
>>> +static void fetch_pending_sample(SpiceGstDecoder *decoder)
>>>    {
>>> -    SpiceGstDecoder *decoder = video_decoder;
>>> -
>>>        GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
>>>        if (sample) {
>>> +        // account for the fetched sample
>>> +        decoder->pending_samples--;
>>> +
>>>            GstBuffer *buffer = gst_sample_get_buffer(sample);
>>> -        g_mutex_lock(&decoder->queues_mutex);
>>>    
>>>            /* gst_app_sink_pull_sample() sometimes returns the same buffer
>>>            twice
>>>             * or buffers that have a modified, and thus unrecognizable, PTS.
>>> @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink *gstappsink,
>>> gpointer video_decoder)
>>>                if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
>>>                    /* The frame is now ready for display */
>>>                    gstframe->sample = sample;
>>> -                g_queue_push_tail(decoder->display_queue, gstframe);
>>> +                decoder->display_frame = gstframe;
>>>    
>>>                    /* Now that we know there is a match, remove it and the
>>>                    older
>>>                     * frames from the decoding queue.
>>> @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
>>> *gstappsink, gpointer video_decoder)
>>>                spice_warning("got an unexpected decoded buffer!");
>>>                gst_sample_unref(sample);
>>>            }
>>> -
>>> -        g_mutex_unlock(&decoder->queues_mutex);
>>> -        schedule_frame(decoder);
>>>        } else {
>>> +        // no more samples to get, possibly some sample was drop
>> was dropped
>>
> I'll update
>
>>> +        decoder->pending_samples = 0;
>>>            spice_warning("GStreamer error: could not pull sample");
>>>        }
>>> +}
>>> +
>>> +/* GStreamer thread
>>> + *
>>> + * We cannot use GStreamer's signals because they are not always run in
>>> + * the main context. So use a callback (lower overhead) and have it pull
>>> + * the sample to avoid a race with free_pipeline(). This means queuing the
>>> + * decoded frames outside GStreamer. So while we're at it, also schedule
>>> + * the frame display ourselves in schedule_frame().
>>> + */
>>> +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
>>> video_decoder)
>>> +{
>>> +    SpiceGstDecoder *decoder = video_decoder;
>>> +
>>> +    g_mutex_lock(&decoder->queues_mutex);
>>> +    decoder->pending_samples++;
>>> +    if (decoder->timer_id && decoder->display_frame) {
>>> +        g_mutex_unlock(&decoder->queues_mutex);
>>> +        return GST_FLOW_OK;
>>> +    }
>>> +    g_mutex_unlock(&decoder->queues_mutex);
>>> +
>>> +    schedule_frame(decoder);
>>> +
>>>        return GST_FLOW_OK;
>>>    }
>>>    
>>> @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
>>> *decoder)
>>>            GstAppSinkCallbacks appsink_cbs = { NULL };
>>>            appsink_cbs.new_sample = new_sample;
>>>            gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
>>>            decoder, NULL);
>>> +        gst_app_sink_set_max_buffers(decoder->appsink, 2);
>> 1. I'm wondering whether limiting max buffers to only 2 is enough; might it
>> slow down GStreamer's frame processing? (e.g. is GStreamer capable
>> of pipelining some of the frame decoding/converting process? What
>> happens when P-frames are in use and the decoding process needs access
>> to more than 2 frames - just a theory, but it may work faster with a higher
>> number of max_buffers)
>>
> No, p frames for this purpose are in another queue/array, they are in
> the decoder element.

Max buffers is indeed a sink (appsink) property, but at least in the
pipeline being built on my Fedora, the pending_samples value (which counts
the number of frames currently being processed by the pipeline)
seems to correspond to the sink's max_buffers value.
So I suspect this max_buffers property may propagate and affect
the whole pipeline.


(What I did was apply only this patch, disable gst-video-overlay,
and print the pending_samples value before app_src_push is called, which
seems to block if more than max_buffers (2) are currently being
processed. Suspending the client for a few seconds helps to reach
this max value of buffers.)

Snir.

> I tested with 1, no issues, 2 is already a safer number :-)
>
>> 2. Currently, when pending samples reaches the buffer limit, it stays
>> there (currently 2) (probably because of the timing, and it's fixed by the
>> hack - patch 3/3)
>>
>> However seems to work well, I'll ack this one (I'd also raise the limit
>> just in
>> case)
>>
> Do you want to raise it anyways? Like what? Honestly I think 2 is enough.
>
>> Snir.
>>
>>> +        gst_app_sink_set_drop(decoder->appsink, FALSE);
>>>        }
>>>        bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
>>>        gst_bus_add_watch(bus, handle_pipeline_message, decoder);
>>> @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
>>> *video_decoder)
>>>            free_gst_frame(gstframe);
>>>        }
>>>        g_queue_free(decoder->decoding_queue);
>>> -    while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
>>> -        free_gst_frame(gstframe);
>>> +    if (decoder->display_frame) {
>>> +        free_gst_frame(decoder->display_frame);
>>>        }
>>> -    g_queue_free(decoder->display_queue);
>>>    
>>>        g_free(decoder);
>>>    
>>> @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
>>> *video_decoder)
>>>     *    the decoding queue using the GStreamer timestamp information to
>>>     deal with
>>>     *    dropped frames. The SpiceGstFrame is popped from the
>>>     decoding_queue.
>>>     * 6) new_sample() then attaches the decompressed frame to the
>>>     SpiceGstFrame,
>>> - *    pushes it to the display_queue and calls schedule_frame().
>>> + *    set into display_frame and calls schedule_frame().
>>>     * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange for
>>>     *    display_frame() to be called, in the main thread, at the right time
>>>     for
>>>     *    the next frame.
>>> - * 8) display_frame() pops the first SpiceGstFrame from the display_queue
>>> and
>>> + * 8) display_frame() use SpiceGstFrame from display_frame and
>>>     *    calls stream_display_frame().
>>>     * 9) display_frame() then frees the SpiceGstFrame, which frees the
>>>     SpiceFrame
>>>     *    and decompressed frame with it.
>>> @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int codec_type,
>>> display_stream *stream)
>>>            decoder->base.stream = stream;
>>>            g_mutex_init(&decoder->queues_mutex);
>>>            decoder->decoding_queue = g_queue_new();
>>> -        decoder->display_queue = g_queue_new();
>>>    
>>>            if (!create_pipeline(decoder)) {
>>>                decoder->base.destroy((VideoDecoder*)decoder);
> Frediano



More information about the Spice-devel mailing list