[Spice-devel] [client v10 23/27] spice-gtk: Let the video decoder queue, schedule and drop the frames

Francois Gouget fgouget at codeweavers.com
Tue Mar 1 15:54:19 UTC 2016


Signed-off-by: Francois Gouget <fgouget at codeweavers.com>
---
 src/channel-display-mjpeg.c | 141 ++++++++++++++++++++++++++++---
 src/channel-display-priv.h  |  10 ++-
 src/channel-display.c       | 201 +++++++++++---------------------------------
 3 files changed, 188 insertions(+), 164 deletions(-)

diff --git a/src/channel-display-mjpeg.c b/src/channel-display-mjpeg.c
index 6271cfb..63ef4cf 100644
--- a/src/channel-display-mjpeg.c
+++ b/src/channel-display-mjpeg.c
@@ -31,11 +31,16 @@ typedef struct MJpegDecoder {
 
     /* ---------- The builtin mjpeg decoder ---------- */
 
-    SpiceMsgIn *frame_msg;
     struct jpeg_source_mgr         mjpeg_src;
     struct jpeg_decompress_struct  mjpeg_cinfo;
     struct jpeg_error_mgr          mjpeg_jerr;
 
+    /* ---------- Frame queue ---------- */
+
+    GQueue *msgq;
+    SpiceMsgIn *cur_frame_msg;
+    guint timer_id;
+
     /* ---------- Output frame data ---------- */
 
     uint8_t *out_frame;
@@ -50,7 +55,7 @@ static void mjpeg_src_init(struct jpeg_decompress_struct *cinfo)
     MJpegDecoder *decoder = SPICE_CONTAINEROF(cinfo->src, MJpegDecoder, mjpeg_src);
 
     uint8_t *data;
-    cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->frame_msg, &data);
+    cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->cur_frame_msg, &data);
     cinfo->src->next_input_byte = data;
 }
 
@@ -72,10 +77,12 @@ static void mjpeg_src_term(struct jpeg_decompress_struct *cinfo)
 }
 
 
-/* ---------- VideoDecoder's public API ---------- */
+/* ---------- Decoder proper ---------- */
 
-static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder,
-                                           SpiceMsgIn *frame_msg)
+static void mjpeg_decoder_schedule(MJpegDecoder *decoder);
+
+/* main context */
+static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder)
 {
     MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
     gboolean back_compat = decoder->base.stream->channel->priv->peer_hdr.major_version == 1;
@@ -84,8 +91,7 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder,
     uint8_t *dest;
     uint8_t *lines[4];
 
-    decoder->frame_msg = frame_msg;
-    stream_get_dimensions(decoder->base.stream, frame_msg, &width, &height);
+    stream_get_dimensions(decoder->base.stream, decoder->cur_frame_msg, &width, &height);
     if (decoder->out_size < width * height * 4) {
         g_free(decoder->out_frame);
         decoder->out_size = width * height * 4;
@@ -118,7 +124,7 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder,
      */
     if (decoder->mjpeg_cinfo.rec_outbuf_height > G_N_ELEMENTS(lines)) {
         jpeg_abort_decompress(&decoder->mjpeg_cinfo);
-        g_return_val_if_reached(NULL);
+        g_return_val_if_reached(G_SOURCE_REMOVE);
     }
 
     while (decoder->mjpeg_cinfo.output_scanline < decoder->mjpeg_cinfo.output_height) {
@@ -161,12 +167,124 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder,
     }
     jpeg_finish_decompress(&decoder->mjpeg_cinfo);
 
-    return decoder->out_frame;
+    /* Display the frame and dispose of it */
+    stream_display_frame(decoder->base.stream, decoder->cur_frame_msg, decoder->out_frame);
+    spice_msg_in_unref(decoder->cur_frame_msg);
+    decoder->cur_frame_msg = NULL;
+    decoder->timer_id = 0;
+
+    /* Schedule the next frame */
+    mjpeg_decoder_schedule(decoder);
+
+    return G_SOURCE_REMOVE;
+}
+
+/* ---------- VideoDecoder's queue scheduling ---------- */
+
+static void mjpeg_decoder_schedule(MJpegDecoder *decoder)
+{   /* Arms a timer for the next frame that is still on time; late frames are dropped. */
+    SPICE_DEBUG("%s", __FUNCTION__);
+    if (decoder->timer_id) {
+        return; /* a decode is already scheduled */
+    }
+    /* Start from the pending frame, if any (its timer may have been cancelled by reschedule()). */
+    guint32 time = stream_get_time(decoder->base.stream);
+    SpiceMsgIn *frame_msg = decoder->cur_frame_msg;
+    decoder->cur_frame_msg = NULL; /* never keep a pointer to a frame that may be unref'd below */
+    do {
+        if (frame_msg) {
+            SpiceStreamDataHeader *op = spice_msg_in_parsed(frame_msg);
+            if (time < op->multi_media_time) {
+                guint32 d = op->multi_media_time - time; /* delay until the frame is due */
+                decoder->cur_frame_msg = frame_msg;
+                decoder->timer_id = g_timeout_add(d, mjpeg_decoder_decode_frame, decoder);
+                break;
+            }
+            SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping ",
+                        __FUNCTION__, time - op->multi_media_time,
+                        op->multi_media_time, time);
+            stream_dropped_frame(decoder->base.stream); /* frame time already passed: count the drop */
+            spice_msg_in_unref(frame_msg);
+        }
+        frame_msg = g_queue_pop_head(decoder->msgq);
+    } while (frame_msg);
+}
+
+
+/* mjpeg_decoder_drop_queue() helper: g_queue_foreach() callback that unrefs one queued message; user_data is unused */
+static void _msg_in_unref_func(gpointer data, gpointer user_data)
+{
+    spice_msg_in_unref(data);
+}
+
+static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
+{   /* Discards all queued frames, cancels the pending decode timer and releases the current frame. */
+    g_queue_foreach(decoder->msgq, _msg_in_unref_func, NULL); /* unref every queued message */
+    g_queue_clear(decoder->msgq); /* NOTE(review): empties the queue only; no g_queue_free() is visible in this patch - confirm the destroy path */
+    if (decoder->timer_id != 0) {
+        g_source_remove(decoder->timer_id);
+        decoder->timer_id = 0; /* 0 means "no timer armed" to mjpeg_decoder_schedule() */
+    }
+    if (decoder->cur_frame_msg) {
+        spice_msg_in_unref(decoder->cur_frame_msg);
+        decoder->cur_frame_msg = NULL;
+    }
+}
+
+/* ---------- VideoDecoder's public API ---------- */
+
+static void mjpeg_decoder_queue_frame(VideoDecoder *video_decoder,
+                                      SpiceMsgIn *frame_msg, int32_t latency)
+{
+    MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
+    SpiceMsgIn *last_msg;
+    /* Queues frame_msg for timed decoding; frames with a negative latency are not queued. */
+    SPICE_DEBUG("%s", __FUNCTION__);
+    /* Compare with the newest queued frame to detect the frame times going backwards. */
+    last_msg = g_queue_peek_tail(decoder->msgq);
+    if (last_msg) {
+        SpiceStreamDataHeader *last_op, *frame_op;
+        last_op = spice_msg_in_parsed(last_msg);
+        frame_op = spice_msg_in_parsed(frame_msg);
+        if (frame_op->multi_media_time < last_op->multi_media_time) {
+            /* This should really not happen */
+            SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):"
+                        " resetting stream, id %d",
+                        frame_op->multi_media_time,
+                        last_op->multi_media_time, frame_op->id);
+            mjpeg_decoder_drop_queue(decoder);
+        }
+    }
+    /* NOTE(review): only the queue tail is compared; a frame already moved to cur_frame_msg is not - confirm this is intended. */
+    /* Dropped MJPEG frames don't impact the ones that come after.
+     * So drop late frames as early as possible to save on processing time.
+     */
+    if (latency < 0) {
+        return;
+    }
+    /* The queue holds its own reference on the message. */
+    spice_msg_in_ref(frame_msg);
+    g_queue_push_tail(decoder->msgq, frame_msg);
+    mjpeg_decoder_schedule(decoder);
+}
+
+static void mjpeg_decoder_reschedule(VideoDecoder *video_decoder)
+{
+    MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
+    /* Cancel any armed decode timer, then schedule again against the current stream time. */
+    SPICE_DEBUG("%s", __FUNCTION__);
+    if (decoder->timer_id != 0) {
+        g_source_remove(decoder->timer_id);
+        decoder->timer_id = 0; /* lets mjpeg_decoder_schedule() run instead of returning early */
+    }
+    mjpeg_decoder_schedule(decoder);
 }
 
 static void mjpeg_decoder_destroy(VideoDecoder* video_decoder)
 {
     MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
+
+    mjpeg_decoder_drop_queue(decoder);
     jpeg_destroy_decompress(&decoder->mjpeg_cinfo);
     g_free(decoder->out_frame);
     free(decoder);
@@ -180,10 +298,13 @@ VideoDecoder* create_mjpeg_decoder(int codec_type, display_stream *stream)
     MJpegDecoder *decoder = spice_new0(MJpegDecoder, 1);
 
     decoder->base.destroy = &mjpeg_decoder_destroy;
-    decoder->base.decode_frame = &mjpeg_decoder_decode_frame;
+    decoder->base.reschedule = &mjpeg_decoder_reschedule;
+    decoder->base.queue_frame = &mjpeg_decoder_queue_frame;
     decoder->base.codec_type = codec_type;
     decoder->base.stream = stream;
 
+    decoder->msgq = g_queue_new();
+
     decoder->mjpeg_cinfo.err = jpeg_std_error(&decoder->mjpeg_jerr);
     jpeg_create_decompress(&decoder->mjpeg_cinfo);
 
diff --git a/src/channel-display-priv.h b/src/channel-display-priv.h
index d074119..690addc 100644
--- a/src/channel-display-priv.h
+++ b/src/channel-display-priv.h
@@ -41,6 +41,9 @@ struct VideoDecoder {
     /* Releases the video decoder's resources */
     void (*destroy)(VideoDecoder *decoder);
 
+    /* Notifies the decoder that the mm-time clock changed. */
+    void (*reschedule)(VideoDecoder *video_decoder);
+
     /* Decompresses the specified frame.
      *
      * @decoder:   The video decoder.
@@ -49,7 +52,7 @@ struct VideoDecoder {
      *             buffer will be invalidated by the next call to
      *             decode_frame().
      */
-    uint8_t* (*decode_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg);
+    void (*queue_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg, int32_t latency);
 
     /* The format of the encoded video. */
     int codec_type;
@@ -99,8 +102,6 @@ struct display_stream {
 
     VideoDecoder                *video_decoder;
 
-    GQueue                      *msgq;
-    guint                       timeout;
     SpiceChannel                *channel;
 
     /* stats */
@@ -128,6 +129,9 @@ struct display_stream {
 };
 
 void stream_get_dimensions(display_stream *st, SpiceMsgIn *frame_msg, int *width, int *height);
+guint32 stream_get_time(display_stream *st);
+void stream_dropped_frame(display_stream *st);
+void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg, uint8_t* data);
 uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data);
 
 
diff --git a/src/channel-display.c b/src/channel-display.c
index d3072a7..958413b 100644
--- a/src/channel-display.c
+++ b/src/channel-display.c
@@ -115,11 +115,9 @@ static void channel_set_handlers(SpiceChannelClass *klass);
 static void clear_surfaces(SpiceChannel *channel, gboolean keep_primary);
 static void clear_streams(SpiceChannel *channel);
 static display_surface *find_surface(SpiceDisplayChannelPrivate *c, guint32 surface_id);
-static gboolean display_stream_render(display_stream *st);
 static void spice_display_channel_reset(SpiceChannel *channel, gboolean migrating);
 static void spice_display_channel_reset_capabilities(SpiceChannel *channel);
 static void destroy_canvas(display_surface *surface);
-static void _msg_in_unref_func(gpointer data, gpointer user_data);
 static void display_session_mm_time_reset_cb(SpiceSession *session, gpointer data);
 static SpiceGlScanout* spice_gl_scanout_copy(const SpiceGlScanout *scanout);
 
@@ -1120,7 +1118,6 @@ static void display_handle_stream_create(SpiceChannel *channel, SpiceMsgIn *in)
     spice_msg_in_ref(in);
     st->clip = &op->clip;
     st->surface = find_surface(c, op->surface_id);
-    st->msgq = g_queue_new();
     st->channel = channel;
     st->drops_seqs_stats_arr = g_array_new(FALSE, FALSE, sizeof(drops_sequence_stats));
 
@@ -1139,45 +1136,6 @@ static void display_handle_stream_create(SpiceChannel *channel, SpiceMsgIn *in)
     }
 }
 
-/* coroutine or main context */
-static gboolean display_stream_schedule(display_stream *st)
-{
-    SpiceSession *session = spice_channel_get_session(st->channel);
-    guint32 time, d;
-    SpiceStreamDataHeader *op;
-    SpiceMsgIn *in;
-
-    SPICE_DEBUG("%s", __FUNCTION__);
-    if (st->timeout || !session)
-        return TRUE;
-
-    time = spice_session_get_mm_time(session);
-    in = g_queue_peek_head(st->msgq);
-
-    if (in == NULL) {
-        return TRUE;
-    }
-
-    op = spice_msg_in_parsed(in);
-    if (time < op->multi_media_time) {
-        d = op->multi_media_time - time;
-        SPICE_DEBUG("scheduling next stream render in %u ms", d);
-        st->timeout = g_timeout_add(d, (GSourceFunc)display_stream_render, st);
-        return TRUE;
-    } else {
-        SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping ",
-                    __FUNCTION__, time - op->multi_media_time,
-                    op->multi_media_time, time);
-        in = g_queue_pop_head(st->msgq);
-        spice_msg_in_unref(in);
-        st->num_drops_on_playback++;
-        if (g_queue_get_length(st->msgq) == 0)
-            return TRUE;
-    }
-
-    return FALSE;
-}
-
 static SpiceRect *stream_get_dest(display_stream *st, SpiceMsgIn *frame_msg)
 {
     if (frame_msg == NULL ||
@@ -1240,66 +1198,54 @@ void stream_get_dimensions(display_stream *st, SpiceMsgIn *frame_msg, int *width
    }
 }
 
-/* main context */
-static gboolean display_stream_render(display_stream *st)
+G_GNUC_INTERNAL
+guint32 stream_get_time(display_stream *st)
 {
-    SpiceMsgIn *in;
+    SpiceSession *session = spice_channel_get_session(st->channel);
+    return session ? spice_session_get_mm_time(session) : 0;
+}
 
-    st->timeout = 0;
-    do {
-        in = g_queue_pop_head(st->msgq);
+/* coroutine or main context: increments the stream's num_drops_on_playback counter by one */
+G_GNUC_INTERNAL
+void stream_dropped_frame(display_stream *st)
+{
+    st->num_drops_on_playback++;
+}
 
-        g_return_val_if_fail(in != NULL, FALSE);
+/* main context */
+G_GNUC_INTERNAL
+void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg,
+                          uint8_t* data)
+{
+    int width, height;
+    SpiceRect *dest;
+    int stride;
 
-        uint8_t *out_frame = NULL;
-        if (st->video_decoder) {
-            out_frame = st->video_decoder->decode_frame(st->video_decoder, in);
-        }
-        if (out_frame) {
-            int width;
-            int height;
-            SpiceRect *dest;
-            uint8_t *data;
-            int stride;
-
-            stream_get_dimensions(st, in, &width, &height);
-            dest = stream_get_dest(st, in);
-
-            data = out_frame;
-            stride = width * sizeof(uint32_t);
-            if (!(stream_get_flags(st) & SPICE_STREAM_FLAGS_TOP_DOWN)) {
-                data += stride * (height - 1);
-                stride = -stride;
-            }
+    stream_get_dimensions(st, frame_msg, &width, &height);
+    dest = stream_get_dest(st, frame_msg);
 
-            st->surface->canvas->ops->put_image(
-                st->surface->canvas,
+    stride = width * sizeof(uint32_t);
+    if (!(stream_get_flags(st) & SPICE_STREAM_FLAGS_TOP_DOWN)) {
+        data += stride * (height - 1);
+        stride = -stride;
+    }
+
+    st->surface->canvas->ops->put_image(
+        st->surface->canvas,
 #ifdef G_OS_WIN32
-                SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc,
+        SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc,
 #endif
-                dest, data,
-                width, height, stride,
-                st->have_region ? &st->region : NULL);
-
-            if (st->surface->primary)
-                g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0,
-                    dest->left, dest->top,
-                    dest->right - dest->left,
-                    dest->bottom - dest->top);
-        }
+        dest, data,
+        width, height, stride,
+        st->have_region ? &st->region : NULL);
 
-        spice_msg_in_unref(in);
-
-        in = g_queue_peek_head(st->msgq);
-        if (in == NULL)
-            break;
-
-        if (display_stream_schedule(st))
-            return FALSE;
-    } while (1);
-
-    return FALSE;
+    if (st->surface->primary)
+        g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0,
+                      dest->left, dest->top,
+                      dest->right - dest->left,
+                      dest->bottom - dest->top);
 }
+
 /* after a sequence of 3 drops, push a report to the server, even
  * if the report window is bigger */
 #define STREAM_REPORT_DROP_SEQ_LEN_LIMIT 3
@@ -1360,17 +1306,6 @@ static void display_update_stream_report(SpiceDisplayChannel *channel, uint32_t
     }
 }
 
-static void display_stream_reset_rendering_timer(display_stream *st)
-{
-    SPICE_DEBUG("%s", __FUNCTION__);
-    if (st->timeout != 0) {
-        g_source_remove(st->timeout);
-        st->timeout = 0;
-    }
-    while (!display_stream_schedule(st)) {
-    }
-}
-
 /*
  * Migration can occur between 2 spice-servers with different mm-times.
  * Then, the following cases can happen after migration completes:
@@ -1400,8 +1335,9 @@ static void display_stream_reset_rendering_timer(display_stream *st)
  * case 2 is less likely, since at takes at least 20 frames till the dst-server re-identifies
  * the video stream and starts sending stream data
  *
- * display_session_mm_time_reset_cb handles case 1.a, and
- * display_stream_test_frames_mm_time_reset handles case 2.b
+ * display_session_mm_time_reset_cb handles case 1.a by notifying the
+ * video decoders through their reschedule() method, and case 2.b is handled
+ * directly by the video decoders in their queue_frame() method
  */
 
 /* main context */
@@ -1421,36 +1357,7 @@ static void display_session_mm_time_reset_cb(SpiceSession *session, gpointer dat
         }
         SPICE_DEBUG("%s: stream-id %d", __FUNCTION__, i);
         st = c->streams[i];
-        display_stream_reset_rendering_timer(st);
-    }
-}
-
-/* coroutine context */
-static void display_stream_test_frames_mm_time_reset(display_stream *st,
-                                                     SpiceMsgIn *new_frame_msg,
-                                                     guint32 mm_time)
-{
-    SpiceStreamDataHeader *tail_op, *new_op;
-    SpiceMsgIn *tail_msg;
-
-    SPICE_DEBUG("%s", __FUNCTION__);
-    g_return_if_fail(new_frame_msg != NULL);
-    tail_msg = g_queue_peek_tail(st->msgq);
-    if (!tail_msg) {
-        return;
-    }
-    tail_op = spice_msg_in_parsed(tail_msg);
-    new_op = spice_msg_in_parsed(new_frame_msg);
-
-    if (new_op->multi_media_time < tail_op->multi_media_time) {
-        SPICE_DEBUG("new-frame-time < tail-frame-time (%u < %u):"
-                    " reseting stream, id %d",
-                    new_op->multi_media_time,
-                    tail_op->multi_media_time,
-                    new_op->id);
-        g_queue_foreach(st->msgq, _msg_in_unref_func, NULL);
-        g_queue_clear(st->msgq);
-        display_stream_reset_rendering_timer(st);
+        st->video_decoder->reschedule(st->video_decoder);
     }
 }
 
@@ -1470,7 +1377,7 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
     g_return_if_fail(c->nstreams > op->id);
 
     st =  c->streams[op->id];
-    mmtime = spice_session_get_mm_time(spice_channel_get_session(channel));
+    mmtime = stream_get_time(st);
 
     if (spice_msg_in_type(in) == SPICE_MSG_DISPLAY_STREAM_DATA_SIZED) {
         CHANNEL_DEBUG(channel, "stream %d contains sized data", op->id);
@@ -1500,11 +1407,6 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
         st->playback_sync_drops_seq_len++;
     } else {
         CHANNEL_DEBUG(channel, "video latency: %d", latency);
-        spice_msg_in_ref(in);
-        display_stream_test_frames_mm_time_reset(st, in, mmtime);
-        g_queue_push_tail(st->msgq, in);
-        while (!display_stream_schedule(st)) {
-        }
         if (st->cur_drops_seq_stats.len) {
             st->cur_drops_seq_stats.duration = op->multi_media_time -
                                                st->cur_drops_seq_stats.start_mm_time;
@@ -1514,6 +1416,12 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
         }
         st->playback_sync_drops_seq_len = 0;
     }
+
+    /* Let the video decoder queue the frames so it can optimize their
+     * decoding and best decide if/when to drop them when they are late,
+     * taking into account the impact on later frames.
+     */
+    st->video_decoder->queue_frame(st->video_decoder, in,  latency);
     if (c->enable_adaptive_streaming) {
         display_update_stream_report(SPICE_DISPLAY_CHANNEL(channel), op->id,
                                      op->multi_media_time, latency);
@@ -1546,11 +1454,6 @@ static void display_handle_stream_clip(SpiceChannel *channel, SpiceMsgIn *in)
     display_update_stream_region(st);
 }
 
-static void _msg_in_unref_func(gpointer data, gpointer user_data)
-{
-    spice_msg_in_unref(data);
-}
-
 static void destroy_stream(SpiceChannel *channel, int id)
 {
     SpiceDisplayChannelPrivate *c = SPICE_DISPLAY_CHANNEL(channel)->priv;
@@ -1604,10 +1507,6 @@ static void destroy_stream(SpiceChannel *channel, int id)
         spice_msg_in_unref(st->msg_clip);
     spice_msg_in_unref(st->msg_create);
 
-    g_queue_foreach(st->msgq, _msg_in_unref_func, NULL);
-    g_queue_free(st->msgq);
-    if (st->timeout != 0)
-        g_source_remove(st->timeout);
     g_free(st);
     c->streams[id] = NULL;
 }
-- 
2.7.0



More information about the Spice-devel mailing list