[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue
Snir Sheriber
ssheribe at redhat.com
Thu Jul 5 12:24:46 UTC 2018
On 07/05/2018 12:18 PM, Frediano Ziglio wrote:
>
> On 07/04/2018 07:00 PM, Frediano Ziglio wrote:
>
> Hi,
>
>
> On 07/03/2018 06:22 PM, Frediano Ziglio wrote:
>
> Hi,
>
>
> On 05/25/2018 03:55 PM, Frediano Ziglio wrote:
>
> display_queue is queued with decoded frames ready to be displayed.
> However current code can insert a timeout before displaying and
> removing the queued frames. As the frames are not compressed the
> usage of memory by this queue can became in some cases quite
> huge (in the order of different gigabytes).
>
> Using the word different sounds a bit misplaced to me but
> i may be wrong..
>
> Maybe multiple (meaning "more than one") ?
>
> Sounds good
>
> To prevent this do not queue all the frames but just one and count the
> pending frames.
> This way GStreamer will stop queuing after a while.
>
> Signed-off-by: Frediano Ziglio<fziglio at redhat.com>
> --
> Changes since v1:
> - fix pending_samples if some sample get dropped;
> - limit output queue;
> - drop RFC.
> ---
> src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
> 1 file changed, 53 insertions(+), 29 deletions(-)
>
> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> index 3d0827a4..c626d3fd 100644
> --- a/src/channel-display-gst.c
> +++ b/src/channel-display-gst.c
> @@ -29,6 +29,8 @@
> #include <gst/video/gstvideometa.h>
>
>
> +typedef struct SpiceGstFrame SpiceGstFrame;
> +
> /* GStreamer decoder implementation */
>
> typedef struct SpiceGstDecoder {
> @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
>
> GMutex queues_mutex;
> GQueue *decoding_queue;
> - GQueue *display_queue;
> + SpiceGstFrame *display_frame;
> guint timer_id;
> + guint pending_samples;
> } SpiceGstDecoder;
>
> #define VALID_VIDEO_CODEC_TYPE(codec) \
> @@ -76,11 +79,11 @@ typedef enum {
>
> /* ---------- SpiceGstFrame ---------- */
>
> -typedef struct SpiceGstFrame {
> +struct SpiceGstFrame {
> GstClockTime timestamp;
> SpiceFrame *frame;
> GstSample *sample;
> -} SpiceGstFrame;
> +};
>
> static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
> *frame)
> {
> @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
> /* ---------- GStreamer pipeline ---------- */
>
> static void schedule_frame(SpiceGstDecoder *decoder);
> +static void fetch_pending_sample(SpiceGstDecoder *decoder);
>
> static int spice_gst_buffer_get_stride(GstBuffer *buffer)
> {
> @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
>
> g_mutex_lock(&decoder->queues_mutex);
> decoder->timer_id = 0;
> - gstframe = g_queue_pop_head(decoder->display_queue);
> + gstframe = decoder->display_frame;
> + decoder->display_frame = NULL;
> g_mutex_unlock(&decoder->queues_mutex);
> /* If the queue is empty we don't even need to reschedule */
> g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
> @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> g_mutex_lock(&decoder->queues_mutex);
>
> while (!decoder->timer_id) {
> - SpiceGstFrame *gstframe =
> g_queue_peek_head(decoder->display_queue);
> + while (decoder->display_frame == NULL &&
> decoder->pending_samples)
> {
> + fetch_pending_sample(decoder);
> + }
> +
> + SpiceGstFrame *gstframe = decoder->display_frame;
> if (!gstframe) {
> break;
> }
> @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
> decoder->timer_id = g_timeout_add(gstframe->frame->mm_time
> -
> now,
> display_frame, decoder);
> - } else if (g_queue_get_length(decoder->display_queue) == 1) {
> + } else if (decoder->display_frame && !decoder->pending_samples)
> {
> /* Still attempt to display the least out of date frame so
> the
> * video is not completely frozen for an extended period
> of
> time.
> */
> @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> __FUNCTION__, now - gstframe->frame->mm_time,
> gstframe->frame->mm_time, now);
> stream_dropped_frame_on_playback(decoder->base.stream);
> - g_queue_pop_head(decoder->display_queue);
> + decoder->display_frame = NULL;
> free_gst_frame(gstframe);
> }
> }
> @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder
> *decoder)
> g_mutex_unlock(&decoder->queues_mutex);
> }
>
> -/* GStreamer thread
> - *
> - * We cannot use GStreamer's signals because they are not always run in
> - * the main context. So use a callback (lower overhead) and have it pull
> - * the sample to avoid a race with free_pipeline(). This means queuing
> the
> - * decoded frames outside GStreamer. So while we're at it, also schedule
> - * the frame display ourselves in schedule_frame().
> - */
> -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> video_decoder)
> +static void fetch_pending_sample(SpiceGstDecoder *decoder)
> {
> - SpiceGstDecoder *decoder = video_decoder;
> -
> GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
> if (sample) {
> + // account for the fetched sample
> + decoder->pending_samples--;
> +
> GstBuffer *buffer = gst_sample_get_buffer(sample);
> - g_mutex_lock(&decoder->queues_mutex);
>
> /* gst_app_sink_pull_sample() sometimes returns the same
> buffer
> twice
> * or buffers that have a modified, and thus unrecognizable,
> PTS.
> @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink
> *gstappsink,
> gpointer video_decoder)
> if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
> /* The frame is now ready for display */
> gstframe->sample = sample;
> - g_queue_push_tail(decoder->display_queue, gstframe);
> + decoder->display_frame = gstframe;
>
> /* Now that we know there is a match, remove it and
> the
> older
> * frames from the decoding queue.
> @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
> *gstappsink, gpointer video_decoder)
> spice_warning("got an unexpected decoded buffer!");
> gst_sample_unref(sample);
> }
> -
> - g_mutex_unlock(&decoder->queues_mutex);
> - schedule_frame(decoder);
> } else {
> + // no more samples to get, possibly some sample was drop
>
> was dropped
>
> I'll update
>
> + decoder->pending_samples = 0;
> spice_warning("GStreamer error: could not pull sample");
> }
> +}
> +
> +/* GStreamer thread
> + *
> + * We cannot use GStreamer's signals because they are not always run in
> + * the main context. So use a callback (lower overhead) and have it pull
> + * the sample to avoid a race with free_pipeline(). This means queuing
> the
> + * decoded frames outside GStreamer. So while we're at it, also schedule
> + * the frame display ourselves in schedule_frame().
> + */
> +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> video_decoder)
> +{
> + SpiceGstDecoder *decoder = video_decoder;
> +
> + g_mutex_lock(&decoder->queues_mutex);
> + decoder->pending_samples++;
> + if (decoder->timer_id && decoder->display_frame) {
> + g_mutex_unlock(&decoder->queues_mutex);
> + return GST_FLOW_OK;
> + }
> + g_mutex_unlock(&decoder->queues_mutex);
> +
> + schedule_frame(decoder);
> +
> return GST_FLOW_OK;
> }
>
> @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
> *decoder)
> GstAppSinkCallbacks appsink_cbs = { NULL };
> appsink_cbs.new_sample = new_sample;
> gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
> decoder, NULL);
> + gst_app_sink_set_max_buffers(decoder->appsink, 2);
>
> 1. I'm wondering whether limit max buffer to only 2 is enough, may it
> slow down gstreamer's frame processing? (e.g. is gstremaer capable
> to do pipelining of some frame decoding\converting process? what
> happens when p frames are in use and decoding process needs access
> to more then 2 frames - just a theory, but may work faster with higher
> number of max_buffers )
>
> No, p frames for this purpose are in another queue/array, they are in
> the decoder element.
>
> Max buffers is indeed sink(appsink) property but at least in the
> pipeline being built on my fedora pending_samples (which counts
> number of frames are currently processed by the pipeline) value
> seems to be corresponding with the sink's max_buffers value.
> So i suspect this max_buffers property may propagate and affect
> the whole pipeline.
>
> Even if the value is not propagated in the sense of value copied
> if an element is not accepting more data the element feeding it
> will probably stop (and this is recursive) which is actually what
> we want.
>
> Hi,
>
> Ok I've just read docs again, it says the queue is indeed only in
> the sink and if it's
> full it will block the other elements (appsrc will block because
> the sink queue
> is full? i guess it happens to me because pipeline has no other
> queues in its
> elements) so i'm fine with 2.
>
> So when latency is built pending_samples reaches max_buffers and
> stays there
> (once it happens max_buffers value does not really have a meaning)
> , is it possible
> to somehow fetch these to make it 0 again?
>
> What do you mean by "once it happens max_buffers value does not really
> have a meaning"?
> pending_samples should have the count of the samples in the queue,
> just that this queue is
> limited by max_buffers.
Yes, i meant that in the case it's becoming full max_buffers could be
also be 1.
> Samples will be fetched again after we display the pending frame, in
> this case we remove
> from the queue possibly restarting the pipeline. pending_samples will
> eventually go back to
> 0.
>
> You simply cannot continue to put stuff in your mouth if is full,
> isn't it? I mean, for
> me the problem is simple as that.
I see, i thought to avoid pushing frames in certain cases but in second
thought it wouldn't help much.
I'm fine with pushing it :)
Snir.
>
> Frediano
>
>
> Snir.
>
> (what i did is applying only this patch, disabling gst-video-overlay
> and print pending_samples value before app_src_push is called, which
> seems to block if more then max_buffers(2) are currently being
> processed. suspending the client for a few seconds helps to reach
> this max value of buffers.)
>
> This supports my theory that 2 is more than enough. Also consider that
> an additional frame is "queued" for display.
>
> Snir.
>
> I tested with 1, no issues, 2 is already a safer number :-)
>
> 2. Currently when pending samples arrives to the buffers limit it stays
> there (currently 2) (probably because of the timing and its fixed by the
> hack- 3/3 patch)
>
> However seems to work well, I'll ack this one (I'd also raise the limit
> just in
> case)
>
> Do you want to raise it anyways? Like what? Honestly I think 2 is enough.
>
> Snir.
>
> + gst_app_sink_set_drop(decoder->appsink, FALSE);
> }
> bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
> gst_bus_add_watch(bus, handle_pipeline_message, decoder);
> @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
> *video_decoder)
> free_gst_frame(gstframe);
> }
> g_queue_free(decoder->decoding_queue);
> - while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
> - free_gst_frame(gstframe);
> + if (decoder->display_frame) {
> + free_gst_frame(decoder->display_frame);
> }
> - g_queue_free(decoder->display_queue);
>
> g_free(decoder);
>
> @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
> *video_decoder)
> * the decoding queue using the GStreamer timestamp information to
> deal with
> * dropped frames. The SpiceGstFrame is popped from the
> decoding_queue.
> * 6) new_sample() then attaches the decompressed frame to the
> SpiceGstFrame,
> - * pushes it to the display_queue and calls schedule_frame().
> + * set into display_frame and calls schedule_frame().
> * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange
> for
> * display_frame() to be called, in the main thread, at the right
> time
> for
> * the next frame.
> - * 8) display_frame() pops the first SpiceGstFrame from the
> display_queue
> and
> + * 8) display_frame() use SpiceGstFrame from display_frame and
> * calls stream_display_frame().
> * 9) display_frame() then frees the SpiceGstFrame, which frees the
> SpiceFrame
> * and decompressed frame with it.
> @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int
> codec_type,
> display_stream *stream)
> decoder->base.stream = stream;
> g_mutex_init(&decoder->queues_mutex);
> decoder->decoding_queue = g_queue_new();
> - decoder->display_queue = g_queue_new();
>
> if (!create_pipeline(decoder)) {
> decoder->base.destroy((VideoDecoder*)decoder);
>
> Frediano
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/spice-devel/attachments/20180705/105187d9/attachment-0001.html>
More information about the Spice-devel
mailing list