[Spice-devel] [PATCH spice-gtk v2 1/3] channel-display-gst: Prevent accumulating output queue
Frediano Ziglio
fziglio at redhat.com
Tue Jul 3 15:22:00 UTC 2018
>
> Hi,
>
>
> On 05/25/2018 03:55 PM, Frediano Ziglio wrote:
> > display_queue is queued with decoded frames ready to be displayed.
> > However current code can insert a timeout before displaying and
> > removing the queued frames. As the frames are not compressed the
> > usage of memory by this queue can become in some cases quite
> > huge (in the order of different gigabytes).
> Using the word "different" sounds a bit misplaced to me, but
> I may be wrong.
>
Maybe "multiple" (meaning "more than one")?
> > To prevent this do not queue all the frames but just one and count the
> > pending frames.
> > This way GStreamer will stop queuing after a while.
> >
> > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > --
> > Changes since v1:
> > - fix pending_samples if some sample get dropped;
> > - limit output queue;
> > - drop RFC.
> > ---
> > src/channel-display-gst.c | 82 +++++++++++++++++++++++++--------------
> > 1 file changed, 53 insertions(+), 29 deletions(-)
> >
> > diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> > index 3d0827a4..c626d3fd 100644
> > --- a/src/channel-display-gst.c
> > +++ b/src/channel-display-gst.c
> > @@ -29,6 +29,8 @@
> > #include <gst/video/gstvideometa.h>
> >
> >
> > +typedef struct SpiceGstFrame SpiceGstFrame;
> > +
> > /* GStreamer decoder implementation */
> >
> > typedef struct SpiceGstDecoder {
> > @@ -49,8 +51,9 @@ typedef struct SpiceGstDecoder {
> >
> > GMutex queues_mutex;
> > GQueue *decoding_queue;
> > - GQueue *display_queue;
> > + SpiceGstFrame *display_frame;
> > guint timer_id;
> > + guint pending_samples;
> > } SpiceGstDecoder;
> >
> > #define VALID_VIDEO_CODEC_TYPE(codec) \
> > @@ -76,11 +79,11 @@ typedef enum {
> >
> > /* ---------- SpiceGstFrame ---------- */
> >
> > -typedef struct SpiceGstFrame {
> > +struct SpiceGstFrame {
> > GstClockTime timestamp;
> > SpiceFrame *frame;
> > GstSample *sample;
> > -} SpiceGstFrame;
> > +};
> >
> > static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame
> > *frame)
> > {
> > @@ -104,6 +107,7 @@ static void free_gst_frame(SpiceGstFrame *gstframe)
> > /* ---------- GStreamer pipeline ---------- */
> >
> > static void schedule_frame(SpiceGstDecoder *decoder);
> > +static void fetch_pending_sample(SpiceGstDecoder *decoder);
> >
> > static int spice_gst_buffer_get_stride(GstBuffer *buffer)
> > {
> > @@ -124,7 +128,8 @@ static gboolean display_frame(gpointer video_decoder)
> >
> > g_mutex_lock(&decoder->queues_mutex);
> > decoder->timer_id = 0;
> > - gstframe = g_queue_pop_head(decoder->display_queue);
> > + gstframe = decoder->display_frame;
> > + decoder->display_frame = NULL;
> > g_mutex_unlock(&decoder->queues_mutex);
> > /* If the queue is empty we don't even need to reschedule */
> > g_return_val_if_fail(gstframe, G_SOURCE_REMOVE);
> > @@ -170,7 +175,11 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > g_mutex_lock(&decoder->queues_mutex);
> >
> > while (!decoder->timer_id) {
> > - SpiceGstFrame *gstframe =
> > g_queue_peek_head(decoder->display_queue);
> > + while (decoder->display_frame == NULL && decoder->pending_samples)
> > {
> > + fetch_pending_sample(decoder);
> > + }
> > +
> > + SpiceGstFrame *gstframe = decoder->display_frame;
> > if (!gstframe) {
> > break;
> > }
> > @@ -178,7 +187,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > if (spice_mmtime_diff(now, gstframe->frame->mm_time) < 0) {
> > decoder->timer_id = g_timeout_add(gstframe->frame->mm_time -
> > now,
> > display_frame, decoder);
> > - } else if (g_queue_get_length(decoder->display_queue) == 1) {
> > + } else if (decoder->display_frame && !decoder->pending_samples) {
> > /* Still attempt to display the least out of date frame so
> > the
> > * video is not completely frozen for an extended period of
> > time.
> > */
> > @@ -188,7 +197,7 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > __FUNCTION__, now - gstframe->frame->mm_time,
> > gstframe->frame->mm_time, now);
> > stream_dropped_frame_on_playback(decoder->base.stream);
> > - g_queue_pop_head(decoder->display_queue);
> > + decoder->display_frame = NULL;
> > free_gst_frame(gstframe);
> > }
> > }
> > @@ -196,22 +205,14 @@ static void schedule_frame(SpiceGstDecoder *decoder)
> > g_mutex_unlock(&decoder->queues_mutex);
> > }
> >
> > -/* GStreamer thread
> > - *
> > - * We cannot use GStreamer's signals because they are not always run in
> > - * the main context. So use a callback (lower overhead) and have it pull
> > - * the sample to avoid a race with free_pipeline(). This means queuing the
> > - * decoded frames outside GStreamer. So while we're at it, also schedule
> > - * the frame display ourselves in schedule_frame().
> > - */
> > -static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> > video_decoder)
> > +static void fetch_pending_sample(SpiceGstDecoder *decoder)
> > {
> > - SpiceGstDecoder *decoder = video_decoder;
> > -
> > GstSample *sample = gst_app_sink_pull_sample(decoder->appsink);
> > if (sample) {
> > + // account for the fetched sample
> > + decoder->pending_samples--;
> > +
> > GstBuffer *buffer = gst_sample_get_buffer(sample);
> > - g_mutex_lock(&decoder->queues_mutex);
> >
> > /* gst_app_sink_pull_sample() sometimes returns the same buffer
> > twice
> > * or buffers that have a modified, and thus unrecognizable, PTS.
> > @@ -227,7 +228,7 @@ static GstFlowReturn new_sample(GstAppSink *gstappsink,
> > gpointer video_decoder)
> > if (gstframe->timestamp == GST_BUFFER_PTS(buffer)) {
> > /* The frame is now ready for display */
> > gstframe->sample = sample;
> > - g_queue_push_tail(decoder->display_queue, gstframe);
> > + decoder->display_frame = gstframe;
> >
> > /* Now that we know there is a match, remove it and the
> > older
> > * frames from the decoding queue.
> > @@ -250,12 +251,35 @@ static GstFlowReturn new_sample(GstAppSink
> > *gstappsink, gpointer video_decoder)
> > spice_warning("got an unexpected decoded buffer!");
> > gst_sample_unref(sample);
> > }
> > -
> > - g_mutex_unlock(&decoder->queues_mutex);
> > - schedule_frame(decoder);
> > } else {
> > + // no more samples to get, possibly some sample was drop
>
> was dropped
>
I'll update
> > + decoder->pending_samples = 0;
> > spice_warning("GStreamer error: could not pull sample");
> > }
> > +}
> > +
> > +/* GStreamer thread
> > + *
> > + * We cannot use GStreamer's signals because they are not always run in
> > + * the main context. So use a callback (lower overhead) and have it pull
> > + * the sample to avoid a race with free_pipeline(). This means queuing the
> > + * decoded frames outside GStreamer. So while we're at it, also schedule
> > + * the frame display ourselves in schedule_frame().
> > + */
> > +static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer
> > video_decoder)
> > +{
> > + SpiceGstDecoder *decoder = video_decoder;
> > +
> > + g_mutex_lock(&decoder->queues_mutex);
> > + decoder->pending_samples++;
> > + if (decoder->timer_id && decoder->display_frame) {
> > + g_mutex_unlock(&decoder->queues_mutex);
> > + return GST_FLOW_OK;
> > + }
> > + g_mutex_unlock(&decoder->queues_mutex);
> > +
> > + schedule_frame(decoder);
> > +
> > return GST_FLOW_OK;
> > }
> >
> > @@ -463,6 +487,8 @@ static gboolean create_pipeline(SpiceGstDecoder
> > *decoder)
> > GstAppSinkCallbacks appsink_cbs = { NULL };
> > appsink_cbs.new_sample = new_sample;
> > gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs,
> > decoder, NULL);
> > + gst_app_sink_set_max_buffers(decoder->appsink, 2);
>
> 1. I'm wondering whether limiting max buffers to only 2 is enough; might it
> slow down GStreamer's frame processing? (E.g. is GStreamer capable
> of pipelining some of the frame decoding/converting process? What
> happens when P-frames are in use and the decoding process needs access
> to more than 2 frames? Just a theory, but it may work faster with a higher
> number of max_buffers.)
>
No, the P-frames needed for this purpose are kept in another queue/array;
they are in the decoder element.
I tested with 1 and had no issues; 2 is already a safer number :-)
> 2. Currently, when the number of pending samples reaches the buffer limit
> (currently 2), it stays there (probably because of the timing; it is fixed
> by the hack in patch 3/3).
>
> However, it seems to work well, so I'll ack this one (I'd also raise the limit
> just in
> case).
>
Do you want to raise it anyway? To what value? Honestly, I think 2 is enough.
> Snir.
>
> > + gst_app_sink_set_drop(decoder->appsink, FALSE);
> > }
> > bus = gst_pipeline_get_bus(GST_PIPELINE(decoder->pipeline));
> > gst_bus_add_watch(bus, handle_pipeline_message, decoder);
> > @@ -521,10 +547,9 @@ static void spice_gst_decoder_destroy(VideoDecoder
> > *video_decoder)
> > free_gst_frame(gstframe);
> > }
> > g_queue_free(decoder->decoding_queue);
> > - while ((gstframe = g_queue_pop_head(decoder->display_queue))) {
> > - free_gst_frame(gstframe);
> > + if (decoder->display_frame) {
> > + free_gst_frame(decoder->display_frame);
> > }
> > - g_queue_free(decoder->display_queue);
> >
> > g_free(decoder);
> >
> > @@ -550,11 +575,11 @@ static void spice_gst_decoder_destroy(VideoDecoder
> > *video_decoder)
> > * the decoding queue using the GStreamer timestamp information to
> > deal with
> > * dropped frames. The SpiceGstFrame is popped from the
> > decoding_queue.
> > * 6) new_sample() then attaches the decompressed frame to the
> > SpiceGstFrame,
> > - * pushes it to the display_queue and calls schedule_frame().
> > + * set into display_frame and calls schedule_frame().
> > * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange for
> > * display_frame() to be called, in the main thread, at the right time
> > for
> > * the next frame.
> > - * 8) display_frame() pops the first SpiceGstFrame from the display_queue
> > and
> > + * 8) display_frame() use SpiceGstFrame from display_frame and
> > * calls stream_display_frame().
> > * 9) display_frame() then frees the SpiceGstFrame, which frees the
> > SpiceFrame
> > * and decompressed frame with it.
> > @@ -661,7 +686,6 @@ VideoDecoder* create_gstreamer_decoder(int codec_type,
> > display_stream *stream)
> > decoder->base.stream = stream;
> > g_mutex_init(&decoder->queues_mutex);
> > decoder->decoding_queue = g_queue_new();
> > - decoder->display_queue = g_queue_new();
> >
> > if (!create_pipeline(decoder)) {
> > decoder->base.destroy((VideoDecoder*)decoder);
Frediano
More information about the Spice-devel
mailing list