[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue

Snir Sheriber ssheribe at redhat.com
Thu Jul 5 08:02:51 UTC 2018



On 07/04/2018 07:00 PM, Frediano Ziglio wrote:
>> Hi,
>>
>>
>> On 07/03/2018 06:22 PM, Frediano Ziglio wrote:
>>>> Hi,
>>>>
>>>>
>>>> On 05/25/2018 03:55 PM, Frediano Ziglio wrote:
>>>>> display_queue is queued with decoded frames ready to be displayed.
>>>>> However current code can insert a timeout before displaying and
>>>>> removing the queued frames. As the frames are not compressed the
>>>>> usage of memory by this queue can became in some cases quite
>>>>> huge (in the order of different gigabytes).
>>>> Using the word different sounds a bit misplaced to me but
>>>> i may be wrong..
>>>>
>>> Maybe multiple (meaning "more than one") ?
>> Sounds good
>>
>>>>> To prevent this do not queue all the frames but just one and count the
>>>>> pending frames.
>>>>> This way GStreamer will stop queuing after a while.
>>>>>
>>>>> Signed-off-by: Frediano Ziglio<fziglio at redhat.com>
>>>>> --
>>>>> Changes since v1:
>>>>> - fix pending_samples if some sample get dropped;
>>>>> - limit output queue;
>>>>> - drop RFC.
>>>>> ---
>>>>>     src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
>>>>>     1 file changed, 53 insertions(+), 29 deletions(-)
>>>>>
>>>>> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
>>>>> index 3d0827a4..c626d3fd 100644
>>>>> --- a/src/channel-display-gst.c
>>>>> +++ b/src/channel-display-gst.c
>>>>> @@ -29,6 +29,8 @@
>>>>>     #include <gst/video/gstvideometa.h>
>>>>>     
>>>>>     
>>>>> +typedef struct SpiceGstFrame SpiceGstFrame;
>>>>> +
>>>>>     /* GStreamer decoder implementation */
>>>>>     
>>>>>     typedef struct SpiceGstDecoder {
>>>>> @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
>>>>>     
>>>>>         GMutex queues_mutex;
>>>>>         GQueue *decoding_queue;
>>>>> -    GQueue *display_queue;
>>>>> +    SpiceGstFrame *display_frame;
>>>>>         guint timer_id;
>>>>> +    guint pending_samples;
>>>>>     } SpiceGstDecoder;
>>>>>     
>>>>>     #define VALID_VIDEO_CODEC_TYPE(codec) \
>>>>> @@ -76,11 +79,11 @@ typedef enum {
>>>>>     
>>>>>     /* ---------- SpiceGstFrame ---------- */
>>>>>     
>>>>> -typedef struct SpiceGstFrame {
>>>>> +struct SpiceGstFrame {
>>>>>         GstClockTime timestamp;
>>>>>         SpiceFrame *frame;
>>>>>         GstSample *sample;
>>>>> -} SpiceGstFrame;
>>>>> +};
>>>>>     
>>>>>     static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
>>>>>     *frame)
>>>>>     {
>>>>> @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
>>>>>     /* ---------- GStreamer pipeline ---------- */
>>>>>     
>>>>>     static void schedule_frame(SpiceGstDecoder *decoder);
>>>>> +static void fetch_pending_sample(SpiceGstDecoder *decoder);
>>>>>     
>>>>>     static int spice_gst_buffer_get_stride(GstBuffer *buffer)
>>>>>     {
>>>>> @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
>>>>>     
>>>>>         g_mutex_lock(&decoder->queues_mutex);
>>>>>         decoder->timer_id = 0;
>>>>> -    gstframe = g_queue_pop_head(decoder->display_queue);
>>>>> +    gstframe = decoder->display_frame;
>>>>> +    decoder->display_frame = NULL;
>>>>>         g_mutex_unlock(&decoder->queues_mutex);
>>>>>         /* If the queue is empty we don't even need to reschedule */
>>>>>         g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
>>>>> @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>>>         g_mutex_lock(&decoder->queues_mutex);
>>>>>     
>>>>>         while (!decoder->timer_id) {
>>>>> -        SpiceGstFrame *gstframe =
>>>>> g_queue_peek_head(decoder->display_queue);
>>>>> +        while (decoder->display_frame == NULL &&
>>>>> decoder->pending_samples)
>>>>> {
>>>>> +            fetch_pending_sample(decoder);
>>>>> +        }
>>>>> +
>>>>> +        SpiceGstFrame *gstframe = decoder->display_frame;
>>>>>             if (!gstframe) {
>>>>>                 break;
>>>>>             }
>>>>> @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>>>             if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
>>>>>                 decoder->timer_id = g_timeout_add(gstframe->frame->mm_time
>>>>>                 -
>>>>>                 now,
>>>>>                                                   display_frame, decoder);
>>>>> -        } else if (g_queue_get_length(decoder->display_queue) == 1) {
>>>>> +        } else if (decoder->display_frame && !decoder->pending_samples)
>>>>> {
>>>>>                 /* Still attempt to display the least out of date frame so
>>>>>                 the
>>>>>                  * video is not completely frozen for an extended period
>>>>>                  of
>>>>>                  time.
>>>>>                  */
>>>>> @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>>>>                             __FUNCTION__, now - gstframe->frame->mm_time,
>>>>>                             gstframe->frame->mm_time, now);
>>>>>                 stream_dropped_frame_on_playback(decoder->base.stream);
>>>>> -            g_queue_pop_head(decoder->display_queue);
>>>>> +            decoder->display_frame = NULL;
>>>>>                 free_gst_frame(gstframe);
>>>>>             }
>>>>>         }
>>>>> @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder
>>>>> *decoder)
>>>>>         g_mutex_unlock(&decoder->queues_mutex);
>>>>>     }
>>>>>     
>>>>> -/* GStreamer thread
>>>>> - *
>>>>> - * We cannot use GStreamer's signals because they are not always run in
>>>>> - * the main context. So use a callback (lower overhead) and have it pull
>>>>> - * the sample to avoid a race with free_pipeline(). This means queuing
>>>>> the
>>>>> - * decoded frames outside GStreamer. So while we're at it, also schedule
>>>>> - * the frame display ourselves in schedule_frame().
>>>>> - */
>>>>> -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
>>>>> video_decoder)
>>>>> +static void fetch_pending_sample(SpiceGstDecoder *decoder)
>>>>>     {
>>>>> -    SpiceGstDecoder *decoder = video_decoder;
>>>>> -
>>>>>         GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
>>>>>         if (sample) {
>>>>> +        // account for the fetched sample
>>>>> +        decoder->pending_samples--;
>>>>> +
>>>>>             GstBuffer *buffer = gst_sample_get_buffer(sample);
>>>>> -        g_mutex_lock(&decoder->queues_mutex);
>>>>>     
>>>>>             /* gst_app_sink_pull_sample() sometimes returns the same
>>>>>             buffer
>>>>>             twice
>>>>>              * or buffers that have a modified, and thus unrecognizable,
>>>>>              PTS.
>>>>> @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink
>>>>> *gstappsink,
>>>>> gpointer video_decoder)
>>>>>                 if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
>>>>>                     /* The frame is now ready for display */
>>>>>                     gstframe->sample = sample;
>>>>> -                g_queue_push_tail(decoder->display_queue, gstframe);
>>>>> +                decoder->display_frame = gstframe;
>>>>>     
>>>>>                     /* Now that we know there is a match, remove it and
>>>>>                     the
>>>>>                     older
>>>>>                      * frames from the decoding queue.
>>>>> @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
>>>>> *gstappsink, gpointer video_decoder)
>>>>>                 spice_warning("got an unexpected decoded buffer!");
>>>>>                 gst_sample_unref(sample);
>>>>>             }
>>>>> -
>>>>> -        g_mutex_unlock(&decoder->queues_mutex);
>>>>> -        schedule_frame(decoder);
>>>>>         } else {
>>>>> +        // no more samples to get, possibly some sample was drop
>>>> was dropped
>>>>
>>> I'll update
>>>
>>>>> +        decoder->pending_samples = 0;
>>>>>             spice_warning("GStreamer error: could not pull sample");
>>>>>         }
>>>>> +}
>>>>> +
>>>>> +/* GStreamer thread
>>>>> + *
>>>>> + * We cannot use GStreamer's signals because they are not always run in
>>>>> + * the main context. So use a callback (lower overhead) and have it pull
>>>>> + * the sample to avoid a race with free_pipeline(). This means queuing
>>>>> the
>>>>> + * decoded frames outside GStreamer. So while we're at it, also schedule
>>>>> + * the frame display ourselves in schedule_frame().
>>>>> + */
>>>>> +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
>>>>> video_decoder)
>>>>> +{
>>>>> +    SpiceGstDecoder *decoder = video_decoder;
>>>>> +
>>>>> +    g_mutex_lock(&decoder->queues_mutex);
>>>>> +    decoder->pending_samples++;
>>>>> +    if (decoder->timer_id && decoder->display_frame) {
>>>>> +        g_mutex_unlock(&decoder->queues_mutex);
>>>>> +        return GST_FLOW_OK;
>>>>> +    }
>>>>> +    g_mutex_unlock(&decoder->queues_mutex);
>>>>> +
>>>>> +    schedule_frame(decoder);
>>>>> +
>>>>>         return GST_FLOW_OK;
>>>>>     }
>>>>>     
>>>>> @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
>>>>> *decoder)
>>>>>             GstAppSinkCallbacks appsink_cbs = { NULL };
>>>>>             appsink_cbs.new_sample = new_sample;
>>>>>             gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
>>>>>             decoder, NULL);
>>>>> +        gst_app_sink_set_max_buffers(decoder->appsink, 2);
>>>> 1. I'm wondering whether limit max buffer to only 2 is enough, may it
>>>> slow down gstreamer's frame processing? (e.g. is gstremaer capable
>>>> to do pipelining of some frame decoding\converting process? what
>>>> happens when p frames are in use and decoding process needs access
>>>> to more then 2 frames - just a theory, but may work faster with higher
>>>> number of max_buffers  )
>>>>
>>> No, p frames for this purpose are in another queue/array, they are in
>>> the decoder element.
>> Max buffers is indeed sink(appsink) property but at least in the
>> pipeline being built on my fedora pending_samples (which counts
>> number of frames are currently processed by the pipeline) value
>> seems to be corresponding with the sink's max_buffers value.
>> So i suspect this max_buffers property may propagate and affect
>> the whole pipeline.
>>
> Even if the value is not propagated in the sense of value copied
> if an element is not accepting more data the element feeding it
> will probably stop (and this is recursive) which is actually what
> we want.
Hi,

Ok I've just read docs again, it says the queue is indeed only in the 
sink and if it's
full it will block the other elements (appsrc will block because the 
sink queue
is full? i guess it happens to me because pipeline has no other queues 
in its
elements) so i'm fine with 2.

So when latency is built pending_samples reaches max_buffers and stays there
(once it happens max_buffers value does not really have a meaning) , is 
it possible
to somehow fetch these to make it 0 again?


Snir.

>> (what i did is applying only this patch, disabling gst-video-overlay
>> and print pending_samples value before app_src_push is called, which
>> seems to block if more then max_buffers(2) are currently being
>> processed. suspending the client for a few seconds helps to reach
>> this max value of buffers.)
>>
> This supports my theory that 2 is more than enough. Also consider that
> an additional frame is "queued" for display.
>
>> Snir.
>>
>>> I tested with 1, no issues, 2 is already a safer number :-)
>>>
>>>> 2. Currently when pending samples arrives to the buffers limit it stays
>>>> there (currently 2) (probably because of the timing and its fixed by the
>>>> hack- 3/3 patch)
>>>>
>>>> However seems to work well, I'll ack this one (I'd also raise the limit
>>>> just in
>>>> case)
>>>>
>>> Do you want to raise it anyways? Like what? Honestly I think 2 is enough.
>>>
>>>> Snir.
>>>>
>>>>> +        gst_app_sink_set_drop(decoder->appsink, FALSE);
>>>>>         }
>>>>>         bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
>>>>>         gst_bus_add_watch(bus, handle_pipeline_message, decoder);
>>>>> @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
>>>>> *video_decoder)
>>>>>             free_gst_frame(gstframe);
>>>>>         }
>>>>>         g_queue_free(decoder->decoding_queue);
>>>>> -    while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
>>>>> -        free_gst_frame(gstframe);
>>>>> +    if (decoder->display_frame) {
>>>>> +        free_gst_frame(decoder->display_frame);
>>>>>         }
>>>>> -    g_queue_free(decoder->display_queue);
>>>>>     
>>>>>         g_free(decoder);
>>>>>     
>>>>> @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
>>>>> *video_decoder)
>>>>>      *    the decoding queue using the GStreamer timestamp information to
>>>>>      deal with
>>>>>      *    dropped frames. The SpiceGstFrame is popped from the
>>>>>      decoding_queue.
>>>>>      * 6) new_sample() then attaches the decompressed frame to the
>>>>>      SpiceGstFrame,
>>>>> - *    pushes it to the display_queue and calls schedule_frame().
>>>>> + *    set into display_frame and calls schedule_frame().
>>>>>      * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange
>>>>>      for
>>>>>      *    display_frame() to be called, in the main thread, at the right
>>>>>      time
>>>>>      for
>>>>>      *    the next frame.
>>>>> - * 8) display_frame() pops the first SpiceGstFrame from the
>>>>> display_queue
>>>>> and
>>>>> + * 8) display_frame() use SpiceGstFrame from display_frame and
>>>>>      *    calls stream_display_frame().
>>>>>      * 9) display_frame() then frees the SpiceGstFrame, which frees the
>>>>>      SpiceFrame
>>>>>      *    and decompressed frame with it.
>>>>> @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int
>>>>> codec_type,
>>>>> display_stream *stream)
>>>>>             decoder->base.stream = stream;
>>>>>             g_mutex_init(&decoder->queues_mutex);
>>>>>             decoder->decoding_queue = g_queue_new();
>>>>> -        decoder->display_queue = g_queue_new();
>>>>>     
>>>>>             if (!create_pipeline(decoder)) {
>>>>>                 decoder->base.destroy((VideoDecoder*)decoder);
>>> Frediano

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/spice-devel/attachments/20180705/addcb524/attachment-0001.html>


More information about the Spice-devel mailing list