[Spice-devel] [client v14 25/29] spice-gtk: Let the video decoder queue, schedule and drop the frames
Pavel Grunt
pgrunt at redhat.com
Tue May 17 07:14:59 UTC 2016
On Wed, 2016-05-04 at 11:44 +0200, Francois Gouget wrote:
> Signed-off-by: Francois Gouget <fgouget at codeweavers.com>
Acked-by: Pavel Grunt <pgrunt at redhat.com>
> ---
> src/channel-display-mjpeg.c | 142 ++++++++++++++++++++++++++++---
> src/channel-display-priv.h | 10 ++-
> src/channel-display.c | 201 +++++++++++------------------------------
> ---
> 3 files changed, 189 insertions(+), 164 deletions(-)
>
> diff --git a/src/channel-display-mjpeg.c b/src/channel-display-mjpeg.c
> index 927827b..1238b41 100644
> --- a/src/channel-display-mjpeg.c
> +++ b/src/channel-display-mjpeg.c
> @@ -31,11 +31,16 @@ typedef struct MJpegDecoder {
>
> /* ---------- The builtin mjpeg decoder ---------- */
>
> - SpiceMsgIn *frame_msg;
> struct jpeg_source_mgr mjpeg_src;
> struct jpeg_decompress_struct mjpeg_cinfo;
> struct jpeg_error_mgr mjpeg_jerr;
>
> + /* ---------- Frame queue ---------- */
> +
> + GQueue *msgq;
> + SpiceMsgIn *cur_frame_msg;
> + guint timer_id;
> +
> /* ---------- Output frame data ---------- */
>
> uint8_t *out_frame;
> @@ -50,7 +55,7 @@ static void mjpeg_src_init(struct jpeg_decompress_struct
> *cinfo)
> MJpegDecoder *decoder = SPICE_CONTAINEROF(cinfo->src, MJpegDecoder,
> mjpeg_src);
>
> uint8_t *data;
> - cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->frame_msg,
> &data);
> + cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder-
> >cur_frame_msg, &data);
> cinfo->src->next_input_byte = data;
> }
>
> @@ -72,10 +77,12 @@ static void mjpeg_src_term(struct jpeg_decompress_struct
> *cinfo)
> }
>
>
> -/* ---------- VideoDecoder's public API ---------- */
> +/* ---------- Decoder proper ---------- */
>
> -static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder,
> - SpiceMsgIn *frame_msg)
> +static void mjpeg_decoder_schedule(MJpegDecoder *decoder);
> +
> +/* main context */
> +static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder)
> {
> MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
> gboolean back_compat = decoder->base.stream->channel->priv-
> >peer_hdr.major_version == 1;
> @@ -84,8 +91,7 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder
> *video_decoder,
> uint8_t *dest;
> uint8_t *lines[4];
>
> - decoder->frame_msg = frame_msg;
> - stream_get_dimensions(decoder->base.stream, frame_msg, &width, &height);
> + stream_get_dimensions(decoder->base.stream, decoder->cur_frame_msg,
> &width, &height);
> if (decoder->out_size < width * height * 4) {
> g_free(decoder->out_frame);
> decoder->out_size = width * height * 4;
> @@ -118,7 +124,7 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder
> *video_decoder,
> */
> if (decoder->mjpeg_cinfo.rec_outbuf_height > G_N_ELEMENTS(lines)) {
> jpeg_abort_decompress(&decoder->mjpeg_cinfo);
> - g_return_val_if_reached(NULL);
> + g_return_val_if_reached(G_SOURCE_REMOVE);
> }
>
> while (decoder->mjpeg_cinfo.output_scanline < decoder-
> >mjpeg_cinfo.output_height) {
> @@ -161,12 +167,125 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder
> *video_decoder,
> }
> jpeg_finish_decompress(&decoder->mjpeg_cinfo);
>
> - return decoder->out_frame;
> + /* Display the frame and dispose of it */
> + stream_display_frame(decoder->base.stream, decoder->cur_frame_msg,
> decoder->out_frame);
> + spice_msg_in_unref(decoder->cur_frame_msg);
> + decoder->cur_frame_msg = NULL;
> + decoder->timer_id = 0;
> +
> + /* Schedule the next frame */
> + mjpeg_decoder_schedule(decoder);
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +/* ---------- VideoDecoder's queue scheduling ---------- */
> +
> +static void mjpeg_decoder_schedule(MJpegDecoder *decoder)
> +{
> + SPICE_DEBUG("%s", __FUNCTION__);
> + if (decoder->timer_id) {
> + return;
> + }
> +
> + guint32 time = stream_get_time(decoder->base.stream);
> + SpiceMsgIn *frame_msg = decoder->cur_frame_msg;
> + decoder->cur_frame_msg = NULL;
> + do {
> + if (frame_msg) {
> + SpiceStreamDataHeader *op = spice_msg_in_parsed(frame_msg);
> + if (time <= op->multi_media_time) {
> + guint32 d = op->multi_media_time - time;
> + decoder->cur_frame_msg = frame_msg;
> + decoder->timer_id = g_timeout_add(d,
> mjpeg_decoder_decode_frame, decoder);
> + break;
> + }
> +
> + SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime:
> %u), dropping ",
> + __FUNCTION__, time - op->multi_media_time,
> + op->multi_media_time, time);
> + stream_dropped_frame_on_playback(decoder->base.stream);
> + spice_msg_in_unref(frame_msg);
> + }
> + frame_msg = g_queue_pop_head(decoder->msgq);
> + } while (frame_msg);
> +}
> +
> +
> +/* mjpeg_decoder_drop_queue() helper */
> +static void _msg_in_unref_func(gpointer data, gpointer user_data)
> +{
> + spice_msg_in_unref(data);
> +}
> +
> +static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
> +{
> + if (decoder->timer_id != 0) {
> + g_source_remove(decoder->timer_id);
> + decoder->timer_id = 0;
> + }
> + if (decoder->cur_frame_msg) {
> + spice_msg_in_unref(decoder->cur_frame_msg);
> + decoder->cur_frame_msg = NULL;
> + }
> + g_queue_foreach(decoder->msgq, _msg_in_unref_func, NULL);
> + g_queue_clear(decoder->msgq);
> +}
> +
> +/* ---------- VideoDecoder's public API ---------- */
> +
> +static void mjpeg_decoder_queue_frame(VideoDecoder *video_decoder,
> + SpiceMsgIn *frame_msg, int32_t latency)
> +{
> + MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
> + SpiceMsgIn *last_msg;
> +
> + SPICE_DEBUG("%s", __FUNCTION__);
> +
> + last_msg = g_queue_peek_tail(decoder->msgq);
> + if (last_msg) {
> + SpiceStreamDataHeader *last_op, *frame_op;
> + last_op = spice_msg_in_parsed(last_msg);
> + frame_op = spice_msg_in_parsed(frame_msg);
> + if (frame_op->multi_media_time < last_op->multi_media_time) {
> + /* This should really not happen */
> + SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):"
> + " resetting stream, id %d",
> + frame_op->multi_media_time,
> + last_op->multi_media_time, frame_op->id);
> + mjpeg_decoder_drop_queue(decoder);
> + }
> + }
> +
> + /* Dropped MJPEG frames don't impact the ones that come after.
> + * So drop late frames as early as possible to save on processing time.
> + */
> + if (latency < 0) {
> + return;
> + }
> +
> + spice_msg_in_ref(frame_msg);
> + g_queue_push_tail(decoder->msgq, frame_msg);
> + mjpeg_decoder_schedule(decoder);
> +}
> +
> +static void mjpeg_decoder_reschedule(VideoDecoder *video_decoder)
> +{
> + MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
> +
> + SPICE_DEBUG("%s", __FUNCTION__);
> + if (decoder->timer_id != 0) {
> + g_source_remove(decoder->timer_id);
> + decoder->timer_id = 0;
> + }
> + mjpeg_decoder_schedule(decoder);
> }
>
> static void mjpeg_decoder_destroy(VideoDecoder* video_decoder)
> {
> MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
> +
> + mjpeg_decoder_drop_queue(decoder);
> jpeg_destroy_decompress(&decoder->mjpeg_cinfo);
> g_free(decoder->out_frame);
> free(decoder);
> @@ -180,10 +299,13 @@ VideoDecoder* create_mjpeg_decoder(int codec_type,
> display_stream *stream)
> MJpegDecoder *decoder = spice_new0(MJpegDecoder, 1);
>
> decoder->base.destroy = mjpeg_decoder_destroy;
> - decoder->base.decode_frame = mjpeg_decoder_decode_frame;
> + decoder->base.reschedule = mjpeg_decoder_reschedule;
> + decoder->base.queue_frame = mjpeg_decoder_queue_frame;
> decoder->base.codec_type = codec_type;
> decoder->base.stream = stream;
>
> + decoder->msgq = g_queue_new();
> +
> decoder->mjpeg_cinfo.err = jpeg_std_error(&decoder->mjpeg_jerr);
> jpeg_create_decompress(&decoder->mjpeg_cinfo);
>
> diff --git a/src/channel-display-priv.h b/src/channel-display-priv.h
> index 5256ad9..ec48f51 100644
> --- a/src/channel-display-priv.h
> +++ b/src/channel-display-priv.h
> @@ -41,6 +41,9 @@ struct VideoDecoder {
> /* Releases the video decoder's resources */
> void (*destroy)(VideoDecoder *decoder);
>
> + /* Notifies the decoder that the mm-time clock changed. */
> + void (*reschedule)(VideoDecoder *video_decoder);
> +
> /* Decompresses the specified frame.
> *
> * @decoder: The video decoder.
> @@ -49,7 +52,7 @@ struct VideoDecoder {
> * buffer will be invalidated by the next call to
> * decode_frame().
> */
> - uint8_t* (*decode_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg);
> + void (*queue_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg, int32_t
> latency);
>
> /* The format of the encoded video. */
> int codec_type;
> @@ -98,8 +101,6 @@ struct display_stream {
>
> VideoDecoder *video_decoder;
>
> - GQueue *msgq;
> - guint timeout;
> SpiceChannel *channel;
>
> /* stats */
> @@ -127,6 +128,9 @@ struct display_stream {
> };
>
> void stream_get_dimensions(display_stream *st, SpiceMsgIn *frame_msg, int
> *width, int *height);
> +guint32 stream_get_time(display_stream *st);
> +void stream_dropped_frame_on_playback(display_stream *st);
> +void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg, uint8_t*
> data);
> uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data);
>
>
> diff --git a/src/channel-display.c b/src/channel-display.c
> index 4a06193..338a522 100644
> --- a/src/channel-display.c
> +++ b/src/channel-display.c
> @@ -106,11 +106,9 @@ static void channel_set_handlers(SpiceChannelClass
> *klass);
> static void clear_surfaces(SpiceChannel *channel, gboolean keep_primary);
> static void clear_streams(SpiceChannel *channel);
> static display_surface *find_surface(SpiceDisplayChannelPrivate *c, guint32
> surface_id);
> -static gboolean display_stream_render(display_stream *st);
> static void spice_display_channel_reset(SpiceChannel *channel, gboolean
> migrating);
> static void spice_display_channel_reset_capabilities(SpiceChannel *channel);
> static void destroy_canvas(display_surface *surface);
> -static void _msg_in_unref_func(gpointer data, gpointer user_data);
> static void display_session_mm_time_reset_cb(SpiceSession *session, gpointer
> data);
> static SpiceGlScanout* spice_gl_scanout_copy(const SpiceGlScanout *scanout);
>
> @@ -1087,7 +1085,6 @@ static void display_handle_stream_create(SpiceChannel
> *channel, SpiceMsgIn *in)
> spice_msg_in_ref(in);
> st->clip = &op->clip;
> st->surface = find_surface(c, op->surface_id);
> - st->msgq = g_queue_new();
> st->channel = channel;
> st->drops_seqs_stats_arr = g_array_new(FALSE, FALSE,
> sizeof(drops_sequence_stats));
>
> @@ -1106,45 +1103,6 @@ static void display_handle_stream_create(SpiceChannel
> *channel, SpiceMsgIn *in)
> }
> }
>
> -/* coroutine or main context */
> -static gboolean display_stream_schedule(display_stream *st)
> -{
> - SpiceSession *session = spice_channel_get_session(st->channel);
> - guint32 time, d;
> - SpiceStreamDataHeader *op;
> - SpiceMsgIn *in;
> -
> - SPICE_DEBUG("%s", __FUNCTION__);
> - if (st->timeout || !session)
> - return TRUE;
> -
> - time = spice_session_get_mm_time(session);
> - in = g_queue_peek_head(st->msgq);
> -
> - if (in == NULL) {
> - return TRUE;
> - }
> -
> - op = spice_msg_in_parsed(in);
> - if (time < op->multi_media_time) {
> - d = op->multi_media_time - time;
> - SPICE_DEBUG("scheduling next stream render in %u ms", d);
> - st->timeout = g_timeout_add(d, (GSourceFunc)display_stream_render,
> st);
> - return TRUE;
> - } else {
> - SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u),
> dropping ",
> - __FUNCTION__, time - op->multi_media_time,
> - op->multi_media_time, time);
> - in = g_queue_pop_head(st->msgq);
> - spice_msg_in_unref(in);
> - st->num_drops_on_playback++;
> - if (g_queue_get_length(st->msgq) == 0)
> - return TRUE;
> - }
> -
> - return FALSE;
> -}
> -
> static SpiceRect *stream_get_dest(display_stream *st, SpiceMsgIn *frame_msg)
> {
> if (frame_msg == NULL ||
> @@ -1207,66 +1165,54 @@ void stream_get_dimensions(display_stream *st,
> SpiceMsgIn *frame_msg, int *width
> }
> }
>
> -/* main context */
> -static gboolean display_stream_render(display_stream *st)
> +G_GNUC_INTERNAL
> +guint32 stream_get_time(display_stream *st)
> {
> - SpiceMsgIn *in;
> + SpiceSession *session = spice_channel_get_session(st->channel);
> + return session ? spice_session_get_mm_time(session) : 0;
> +}
>
> - st->timeout = 0;
> - do {
> - in = g_queue_pop_head(st->msgq);
> +/* coroutine or main context */
> +G_GNUC_INTERNAL
> +void stream_dropped_frame_on_playback(display_stream *st)
> +{
> + st->num_drops_on_playback++;
> +}
>
> - g_return_val_if_fail(in != NULL, FALSE);
> +/* main context */
> +G_GNUC_INTERNAL
> +void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg,
> + uint8_t* data)
> +{
> + int width, height;
> + SpiceRect *dest;
> + int stride;
>
> - uint8_t *out_frame = NULL;
> - if (st->video_decoder) {
> - out_frame = st->video_decoder->decode_frame(st->video_decoder,
> in);
> - }
> - if (out_frame) {
> - int width;
> - int height;
> - SpiceRect *dest;
> - uint8_t *data;
> - int stride;
> -
> - stream_get_dimensions(st, in, &width, &height);
> - dest = stream_get_dest(st, in);
> -
> - data = out_frame;
> - stride = width * sizeof(uint32_t);
> - if (!(stream_get_flags(st) & SPICE_STREAM_FLAGS_TOP_DOWN)) {
> - data += stride * (height - 1);
> - stride = -stride;
> - }
> + stream_get_dimensions(st, frame_msg, &width, &height);
> + dest = stream_get_dest(st, frame_msg);
>
> - st->surface->canvas->ops->put_image(
> - st->surface->canvas,
> + stride = width * sizeof(uint32_t);
> + if (!(stream_get_flags(st) & SPICE_STREAM_FLAGS_TOP_DOWN)) {
> + data += stride * (height - 1);
> + stride = -stride;
> + }
> +
> + st->surface->canvas->ops->put_image(
> + st->surface->canvas,
> #ifdef G_OS_WIN32
> - SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc,
> + SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc,
> #endif
> - dest, data,
> - width, height, stride,
> - st->have_region ? &st->region : NULL);
> -
> - if (st->surface->primary)
> - g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE],
> 0,
> - dest->left, dest->top,
> - dest->right - dest->left,
> - dest->bottom - dest->top);
> - }
> + dest, data,
> + width, height, stride,
> + st->have_region ? &st->region : NULL);
>
> - spice_msg_in_unref(in);
> -
> - in = g_queue_peek_head(st->msgq);
> - if (in == NULL)
> - break;
> -
> - if (display_stream_schedule(st))
> - return FALSE;
> - } while (1);
> -
> - return FALSE;
> + if (st->surface->primary)
> + g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0,
> + dest->left, dest->top,
> + dest->right - dest->left,
> + dest->bottom - dest->top);
> }
> +
> /* after a sequence of 3 drops, push a report to the server, even
> * if the report window is bigger */
> #define STREAM_REPORT_DROP_SEQ_LEN_LIMIT 3
> @@ -1327,17 +1273,6 @@ static void
> display_update_stream_report(SpiceDisplayChannel *channel, uint32_t
> }
> }
>
> -static void display_stream_reset_rendering_timer(display_stream *st)
> -{
> - SPICE_DEBUG("%s", __FUNCTION__);
> - if (st->timeout != 0) {
> - g_source_remove(st->timeout);
> - st->timeout = 0;
> - }
> - while (!display_stream_schedule(st)) {
> - }
> -}
> -
> /*
> * Migration can occur between 2 spice-servers with different mm-times.
> * Then, the following cases can happen after migration completes:
> @@ -1367,8 +1302,9 @@ static void
> display_stream_reset_rendering_timer(display_stream *st)
> * case 2 is less likely, since at takes at least 20 frames till the dst-
> server re-identifies
> * the video stream and starts sending stream data
> *
> - * display_session_mm_time_reset_cb handles case 1.a, and
> - * display_stream_test_frames_mm_time_reset handles case 2.b
> + * display_session_mm_time_reset_cb handles case 1.a by notifying the
> + * video decoders through their reschedule() method, and case 2.b is handled
> + * directly by the video decoders in their queue_frame() method
> */
>
> /* main context */
> @@ -1388,36 +1324,7 @@ static void
> display_session_mm_time_reset_cb(SpiceSession *session, gpointer dat
> }
> SPICE_DEBUG("%s: stream-id %d", __FUNCTION__, i);
> st = c->streams[i];
> - display_stream_reset_rendering_timer(st);
> - }
> -}
> -
> -/* coroutine context */
> -static void display_stream_test_frames_mm_time_reset(display_stream *st,
> - SpiceMsgIn
> *new_frame_msg,
> - guint32 mm_time)
> -{
> - SpiceStreamDataHeader *tail_op, *new_op;
> - SpiceMsgIn *tail_msg;
> -
> - SPICE_DEBUG("%s", __FUNCTION__);
> - g_return_if_fail(new_frame_msg != NULL);
> - tail_msg = g_queue_peek_tail(st->msgq);
> - if (!tail_msg) {
> - return;
> - }
> - tail_op = spice_msg_in_parsed(tail_msg);
> - new_op = spice_msg_in_parsed(new_frame_msg);
> -
> - if (new_op->multi_media_time < tail_op->multi_media_time) {
> - SPICE_DEBUG("new-frame-time < tail-frame-time (%u < %u):"
> - " reseting stream, id %d",
> - new_op->multi_media_time,
> - tail_op->multi_media_time,
> - new_op->id);
> - g_queue_foreach(st->msgq, _msg_in_unref_func, NULL);
> - g_queue_clear(st->msgq);
> - display_stream_reset_rendering_timer(st);
> + st->video_decoder->reschedule(st->video_decoder);
> }
> }
>
> @@ -1437,7 +1344,7 @@ static void display_handle_stream_data(SpiceChannel
> *channel, SpiceMsgIn *in)
> g_return_if_fail(c->nstreams > op->id);
>
> st = c->streams[op->id];
> - mmtime = spice_session_get_mm_time(spice_channel_get_session(channel));
> + mmtime = stream_get_time(st);
>
> if (spice_msg_in_type(in) == SPICE_MSG_DISPLAY_STREAM_DATA_SIZED) {
> CHANNEL_DEBUG(channel, "stream %d contains sized data", op->id);
> @@ -1467,11 +1374,6 @@ static void display_handle_stream_data(SpiceChannel
> *channel, SpiceMsgIn *in)
> st->playback_sync_drops_seq_len++;
> } else {
> CHANNEL_DEBUG(channel, "video latency: %d", latency);
> - spice_msg_in_ref(in);
> - display_stream_test_frames_mm_time_reset(st, in, mmtime);
> - g_queue_push_tail(st->msgq, in);
> - while (!display_stream_schedule(st)) {
> - }
> if (st->cur_drops_seq_stats.len) {
> st->cur_drops_seq_stats.duration = op->multi_media_time -
> st-
> >cur_drops_seq_stats.start_mm_time;
> @@ -1481,6 +1383,12 @@ static void display_handle_stream_data(SpiceChannel
> *channel, SpiceMsgIn *in)
> }
> st->playback_sync_drops_seq_len = 0;
> }
> +
> + /* Let the video decoder queue the frames so it can optimize their
> + * decoding and best decide if/when to drop them when they are late,
> + * taking into account the impact on later frames.
> + */
> + st->video_decoder->queue_frame(st->video_decoder, in, latency);
> if (c->enable_adaptive_streaming) {
> display_update_stream_report(SPICE_DISPLAY_CHANNEL(channel), op->id,
> op->multi_media_time, latency);
> @@ -1513,11 +1421,6 @@ static void display_handle_stream_clip(SpiceChannel
> *channel, SpiceMsgIn *in)
> display_update_stream_region(st);
> }
>
> -static void _msg_in_unref_func(gpointer data, gpointer user_data)
> -{
> - spice_msg_in_unref(data);
> -}
> -
> static void destroy_stream(SpiceChannel *channel, int id)
> {
> SpiceDisplayChannelPrivate *c = SPICE_DISPLAY_CHANNEL(channel)->priv;
> @@ -1571,10 +1474,6 @@ static void destroy_stream(SpiceChannel *channel, int
> id)
> spice_msg_in_unref(st->msg_clip);
> spice_msg_in_unref(st->msg_create);
>
> - g_queue_foreach(st->msgq, _msg_in_unref_func, NULL);
> - g_queue_free(st->msgq);
> - if (st->timeout != 0)
> - g_source_remove(st->timeout);
> g_free(st);
> c->streams[id] = NULL;
> }
More information about the Spice-devel
mailing list