[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue
Snir Sheriber
ssheribe at redhat.com
Wed Jul 4 08:09:03 UTC 2018
Hi,
On 07/03/2018 06:22 PM, Frediano Ziglio wrote:
>> Hi,
>>
>>
>> On 05/25/2018 03:55 PM, Frediano Ziglio wrote:
>>> display_queue is filled with decoded frames ready to be displayed.
>>> However, the current code can insert a timeout before displaying and
>>> removing the queued frames. As the frames are not compressed, the
>>> memory used by this queue can in some cases become quite
>>> huge (in the order of different gigabytes).
>> Using the word "different" sounds a bit out of place to me, but
>> I may be wrong.
>>
> Maybe "multiple" (meaning "more than one")?
Sounds good
>
>>> To prevent this, do not queue all the frames but just one, and count the
>>> pending frames.
>>> This way GStreamer will stop queuing after a while.
>>>
>>> Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
>>> --
>>> Changes since v1:
>>> - fix pending_samples if some samples get dropped;
>>> - limit output queue;
>>> - drop RFC.
>>> ---
>>> src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
>>> 1 file changed, 53 insertions(+), 29 deletions(-)
>>>
>>> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
>>> index 3d0827a4..c626d3fd 100644
>>> --- a/src/channel-display-gst.c
>>> +++ b/src/channel-display-gst.c
>>> @@ -29,6 +29,8 @@
>>> #include <gst/video/gstvideometa.h>
>>>
>>>
>>> +typedef struct SpiceGstFrame SpiceGstFrame;
>>> +
>>> /* GStreamer decoder implementation */
>>>
>>> typedef struct SpiceGstDecoder {
>>> @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
>>>
>>> GMutex queues_mutex;
>>> GQueue *decoding_queue;
>>> - GQueue *display_queue;
>>> + SpiceGstFrame *display_frame;
>>> guint timer_id;
>>> + guint pending_samples;
>>> } SpiceGstDecoder;
>>>
>>> #define VALID_VIDEO_CODEC_TYPE(codec) \
>>> @@ -76,11 +79,11 @@ typedef enum {
>>>
>>> /* ---------- SpiceGstFrame ---------- */
>>>
>>> -typedef struct SpiceGstFrame {
>>> +struct SpiceGstFrame {
>>> GstClockTime timestamp;
>>> SpiceFrame *frame;
>>> GstSample *sample;
>>> -} SpiceGstFrame;
>>> +};
>>>
>>> static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
>>> *frame)
>>> {
>>> @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
>>> /* ---------- GStreamer pipeline ---------- */
>>>
>>> static void schedule_frame(SpiceGstDecoder *decoder);
>>> +static void fetch_pending_sample(SpiceGstDecoder *decoder);
>>>
>>> static int spice_gst_buffer_get_stride(GstBuffer *buffer)
>>> {
>>> @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
>>>
>>> g_mutex_lock(&decoder->queues_mutex);
>>> decoder->timer_id = 0;
>>> - gstframe = g_queue_pop_head(decoder->display_queue);
>>> + gstframe = decoder->display_frame;
>>> + decoder->display_frame = NULL;
>>> g_mutex_unlock(&decoder->queues_mutex);
>>> /* If the queue is empty we don't even need to reschedule */
>>> g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
>>> @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>> g_mutex_lock(&decoder->queues_mutex);
>>>
>>> while (!decoder->timer_id) {
>>> - SpiceGstFrame *gstframe =
>>> g_queue_peek_head(decoder->display_queue);
>>> + while (decoder->display_frame == NULL && decoder->pending_samples)
>>> {
>>> + fetch_pending_sample(decoder);
>>> + }
>>> +
>>> + SpiceGstFrame *gstframe = decoder->display_frame;
>>> if (!gstframe) {
>>> break;
>>> }
>>> @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>> if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
>>> decoder->timer_id = g_timeout_add(gstframe->frame->mm_time -
>>> now,
>>> display_frame, decoder);
>>> - } else if (g_queue_get_length(decoder->display_queue) == 1) {
>>> + } else if (decoder->display_frame && !decoder->pending_samples) {
>>> /* Still attempt to display the least out of date frame so
>>> the
>>> * video is not completely frozen for an extended period of
>>> time.
>>> */
>>> @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>> __FUNCTION__, now - gstframe->frame->mm_time,
>>> gstframe->frame->mm_time, now);
>>> stream_dropped_frame_on_playback(decoder->base.stream);
>>> - g_queue_pop_head(decoder->display_queue);
>>> + decoder->display_frame = NULL;
>>> free_gst_frame(gstframe);
>>> }
>>> }
>>> @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder *decoder)
>>> g_mutex_unlock(&decoder->queues_mutex);
>>> }
>>>
>>> -/* GStreamer thread
>>> - *
>>> - * We cannot use GStreamer's signals because they are not always run in
>>> - * the main context. So use a callback (lower overhead) and have it pull
>>> - * the sample to avoid a race with free_pipeline(). This means queuing the
>>> - * decoded frames outside GStreamer. So while we're at it, also schedule
>>> - * the frame display ourselves in schedule_frame().
>>> - */
>>> -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
>>> video_decoder)
>>> +static void fetch_pending_sample(SpiceGstDecoder *decoder)
>>> {
>>> - SpiceGstDecoder *decoder = video_decoder;
>>> -
>>> GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
>>> if (sample) {
>>> + // account for the fetched sample
>>> + decoder->pending_samples--;
>>> +
>>> GstBuffer *buffer = gst_sample_get_buffer(sample);
>>> - g_mutex_lock(&decoder->queues_mutex);
>>>
>>> /* gst_app_sink_pull_sample() sometimes returns the same buffer
>>> twice
>>> * or buffers that have a modified, and thus unrecognizable, PTS.
>>> @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink *gstappsink,
>>> gpointer video_decoder)
>>> if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
>>> /* The frame is now ready for display */
>>> gstframe->sample = sample;
>>> - g_queue_push_tail(decoder->display_queue, gstframe);
>>> + decoder->display_frame = gstframe;
>>>
>>> /* Now that we know there is a match, remove it and the
>>> older
>>> * frames from the decoding queue.
>>> @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
>>> *gstappsink, gpointer video_decoder)
>>> spice_warning("got an unexpected decoded buffer!");
>>> gst_sample_unref(sample);
>>> }
>>> -
>>> - g_mutex_unlock(&decoder->queues_mutex);
>>> - schedule_frame(decoder);
>>> } else {
>>> + // no more samples to get, possibly some sample was drop
>> was dropped
>>
> I'll update
>
>>> + decoder->pending_samples = 0;
>>> spice_warning("GStreamer error: could not pull sample");
>>> }
>>> +}
>>> +
>>> +/* GStreamer thread
>>> + *
>>> + * We cannot use GStreamer's signals because they are not always run in
>>> + * the main context. So use a callback (lower overhead) and have it pull
>>> + * the sample to avoid a race with free_pipeline(). This means queuing the
>>> + * decoded frames outside GStreamer. So while we're at it, also schedule
>>> + * the frame display ourselves in schedule_frame().
>>> + */
>>> +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
>>> video_decoder)
>>> +{
>>> + SpiceGstDecoder *decoder = video_decoder;
>>> +
>>> + g_mutex_lock(&decoder->queues_mutex);
>>> + decoder->pending_samples++;
>>> + if (decoder->timer_id && decoder->display_frame) {
>>> + g_mutex_unlock(&decoder->queues_mutex);
>>> + return GST_FLOW_OK;
>>> + }
>>> + g_mutex_unlock(&decoder->queues_mutex);
>>> +
>>> + schedule_frame(decoder);
>>> +
>>> return GST_FLOW_OK;
>>> }
>>>
>>> @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
>>> *decoder)
>>> GstAppSinkCallbacks appsink_cbs = { NULL };
>>> appsink_cbs.new_sample = new_sample;
>>> gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
>>> decoder, NULL);
>>> + gst_app_sink_set_max_buffers(decoder->appsink, 2);
>> 1. I'm wondering whether limiting max buffers to only 2 is enough; could it
>> slow down GStreamer's frame processing? (E.g. is GStreamer capable
>> of pipelining some of the frame decoding/converting process? What
>> happens when P-frames are in use and the decoding process needs access
>> to more than 2 frames? Just a theory, but it may work faster with a higher
>> max_buffers value.)
>>
> No, P-frames for this purpose are in another queue/array; they are in
> the decoder element.
Max buffers is indeed a sink (appsink) property, but at least in the
pipeline built on my Fedora machine, the pending_samples value (which
counts the number of frames currently being processed by the pipeline)
seems to correspond to the sink's max_buffers value.
So I suspect this max_buffers property may propagate and affect
the whole pipeline.
(What I did was apply only this patch, disable gst-video-overlay,
and print the pending_samples value before app_src_push is called;
the push seems to block if more than max_buffers (2) frames are
currently being processed. Suspending the client for a few seconds
helps to reach this maximum number of buffers.)
Snir.
> I tested with 1 and had no issues; 2 is already a safer number :-)
>
>> 2. Currently, when pending_samples reaches the buffer limit (currently 2),
>> it stays there (probably because of the timing; this is fixed by the
>> hack in patch 3/3).
>>
>> However, it seems to work well, so I'll ack this one (I'd also raise the
>> limit just
>> in case).
>>
> Do you want to raise it anyway? To what? Honestly, I think 2 is enough.
>
>> Snir.
>>
>>> + gst_app_sink_set_drop(decoder->appsink, FALSE);
>>> }
>>> bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
>>> gst_bus_add_watch(bus, handle_pipeline_message, decoder);
>>> @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
>>> *video_decoder)
>>> free_gst_frame(gstframe);
>>> }
>>> g_queue_free(decoder->decoding_queue);
>>> - while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
>>> - free_gst_frame(gstframe);
>>> + if (decoder->display_frame) {
>>> + free_gst_frame(decoder->display_frame);
>>> }
>>> - g_queue_free(decoder->display_queue);
>>>
>>> g_free(decoder);
>>>
>>> @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
>>> *video_decoder)
>>> * the decoding queue using the GStreamer timestamp information to
>>> deal with
>>> * dropped frames. The SpiceGstFrame is popped from the
>>> decoding_queue.
>>> * 6) new_sample() then attaches the decompressed frame to the
>>> SpiceGstFrame,
>>> - * pushes it to the display_queue and calls schedule_frame().
>>> + * set into display_frame and calls schedule_frame().
>>> * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange for
>>> * display_frame() to be called, in the main thread, at the right time
>>> for
>>> * the next frame.
>>> - * 8) display_frame() pops the first SpiceGstFrame from the display_queue
>>> and
>>> + * 8) display_frame() use SpiceGstFrame from display_frame and
>>> * calls stream_display_frame().
>>> * 9) display_frame() then frees the SpiceGstFrame, which frees the
>>> SpiceFrame
>>> * and decompressed frame with it.
>>> @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int codec_type,
>>> display_stream *stream)
>>> decoder->base.stream = stream;
>>> g_mutex_init(&decoder->queues_mutex);
>>> decoder->decoding_queue = g_queue_new();
>>> - decoder->display_queue = g_queue_new();
>>>
>>> if (!create_pipeline(decoder)) {
>>> decoder->base.destroy((VideoDecoder*)decoder);
> Frediano
More information about the Spice-devel
mailing list