[Spice-devel] [client v2 4/5] streaming: Remove the video decoder's dependency on SpiceMsgIn messages

Francois Gouget fgouget at codeweavers.com
Thu Apr 6 14:02:53 UTC 2017


This improves the separation between networking and the video decoding
components.
It also makes it easier to reuse the latter should the client one day
receive video streams through other messages.

Signed-off-by: Francois Gouget <fgouget at codeweavers.com>
---
 src/channel-display-gst.c   | 79 +++++++++++++++++++--------------------------
 src/channel-display-mjpeg.c | 78 ++++++++++++++++++++++----------------------
 src/channel-display-priv.h  | 24 +++++++++++---
 src/channel-display.c       | 35 +++++++++++---------
 4 files changed, 113 insertions(+), 103 deletions(-)

diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
index 2c002eb0..9b79403e 100644
--- a/src/channel-display-gst.c
+++ b/src/channel-display-gst.c
@@ -90,23 +90,22 @@ G_STATIC_ASSERT(G_N_ELEMENTS(gst_opts) <= SPICE_VIDEO_CODEC_TYPE_ENUM_END);
 
 typedef struct SpiceGstFrame {
     GstClockTime timestamp;
-    SpiceMsgIn *msg;
+    SpiceFrame *frame;
     GstSample *sample;
 } SpiceGstFrame;
 
-static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceMsgIn *msg)
+static SpiceGstFrame *create_gst_frame(GstBuffer *buffer, SpiceFrame *frame)
 {
     SpiceGstFrame *gstframe = spice_new(SpiceGstFrame, 1);
     gstframe->timestamp = GST_BUFFER_PTS(buffer);
-    gstframe->msg = msg;
-    spice_msg_in_ref(msg);
+    gstframe->frame = frame;
     gstframe->sample = NULL;
     return gstframe;
 }
 
 static void free_gst_frame(SpiceGstFrame *gstframe)
 {
-    spice_msg_in_unref(gstframe->msg);
+    gstframe->frame->free(gstframe->frame);
     if (gstframe->sample) {
         gst_sample_unref(gstframe->sample);
     }
@@ -160,7 +159,7 @@ static gboolean display_frame(gpointer video_decoder)
         goto error;
     }
 
-    stream_display_frame(decoder->base.stream, gstframe->msg,
+    stream_display_frame(decoder->base.stream, gstframe->frame,
                          width, height, mapinfo.data);
     gst_buffer_unmap(buffer, &mapinfo);
 
@@ -182,9 +181,8 @@ static void schedule_frame(SpiceGstDecoder *decoder)
             break;
         }
 
-        SpiceStreamDataHeader *op = spice_msg_in_parsed(gstframe->msg);
-        if (now < op->multi_media_time) {
-            decoder->timer_id = g_timeout_add(op->multi_media_time - now,
+        if (now < gstframe->frame->mm_time) {
+            decoder->timer_id = g_timeout_add(gstframe->frame->mm_time - now,
                                               display_frame, decoder);
         } else if (g_queue_get_length(decoder->display_queue) == 1) {
             /* Still attempt to display the least out of date frame so the
@@ -193,8 +191,8 @@ static void schedule_frame(SpiceGstDecoder *decoder)
             decoder->timer_id = g_timeout_add(0, display_frame, decoder);
         } else {
             SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping",
-                        __FUNCTION__, now - op->multi_media_time,
-                        op->multi_media_time, now);
+                        __FUNCTION__, now - gstframe->frame->mm_time,
+                        gstframe->frame->mm_time, now);
             stream_dropped_frame_on_playback(decoder->base.stream);
             g_queue_pop_head(decoder->display_queue);
             free_gst_frame(gstframe);
@@ -411,23 +409,17 @@ static void spice_gst_decoder_destroy(VideoDecoder *video_decoder)
      */
 }
 
-static void release_buffer_data(gpointer data)
-{
-    SpiceMsgIn* frame_msg = (SpiceMsgIn*)data;
-    spice_msg_in_unref(frame_msg);
-}
 
-/* spice_gst_decoder_queue_frame() queues the SpiceMsgIn message for decoding
- * and displaying. The steps it goes through are as follows:
+/* spice_gst_decoder_queue_frame() queues the SpiceFrame for decoding and
+ * displaying. The steps it goes through are as follows:
  *
- * 1) A SpiceGstFrame is created to keep track of SpiceMsgIn and some additional
- *    metadata. SpiceMsgIn is reffed. The SpiceFrame is then pushed to the
- *    decoding_queue.
- * 2) The data part of SpiceMsgIn, which contains the compressed frame data,
- *    is wrapped in a GstBuffer and is pushed to the GStreamer pipeline for
- *    decoding. SpiceMsgIn is reffed.
+ * 1) A SpiceGstFrame is created to keep track of SpiceFrame and some additional
+ *    metadata. The SpiceGstFrame is then pushed to the decoding_queue.
+ * 2) frame->data, which contains the compressed frame data, is reffed and
+ *    wrapped in a GstBuffer which is pushed to the GStreamer pipeline for
+ *    decoding.
  * 3) As soon as the GStreamer pipeline no longer needs the compressed frame it
- *    calls release_buffer_data() to unref SpiceMsgIn.
+ *    will call frame->unref_data() to free it.
  * 4) Once the decompressed frame is available the GStreamer pipeline calls
  *    new_sample() in the GStreamer thread.
  * 5) new_sample() then matches the decompressed frame to a SpiceGstFrame from
@@ -435,36 +427,32 @@ static void release_buffer_data(gpointer data)
  *    dropped frames. The SpiceGstFrame is popped from the decoding_queue.
  * 6) new_sample() then attaches the decompressed frame to the SpiceGstFrame,
  *    pushes it to the display_queue and calls schedule_frame().
- * 7) schedule_frame() then uses the SpiceMsgIn's mm_time to arrange for
+ * 7) schedule_frame() then uses gstframe->frame->mm_time to arrange for
  *    display_frame() to be called, in the main thread, at the right time for
  *    the next frame.
  * 8) display_frame() pops the first SpiceGstFrame from the display_queue and
  *    calls stream_display_frame().
- * 9) display_frame() then frees the SpiceGstFrame and the decompressed frame.
- *    SpiceMsgIn is unreffed.
+ * 9) display_frame() then frees the SpiceGstFrame, which frees the SpiceFrame
+ *    and decompressed frame with it.
  */
 static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
-                                              SpiceMsgIn *frame_msg,
-                                              int32_t latency)
+                                              SpiceFrame *frame, int latency)
 {
     SpiceGstDecoder *decoder = (SpiceGstDecoder*)video_decoder;
 
-    uint8_t *data;
-    uint32_t size = spice_msg_in_frame_data(frame_msg, &data);
-    if (size == 0) {
+    if (frame->size == 0) {
         SPICE_DEBUG("got an empty frame buffer!");
+        frame->free(frame);
         return TRUE;
     }
 
-    SpiceStreamDataHeader *frame_op = spice_msg_in_parsed(frame_msg);
-    if (frame_op->multi_media_time < decoder->last_mm_time) {
+    if (frame->mm_time < decoder->last_mm_time) {
         SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):"
-                    " resetting stream, id %u",
-                    frame_op->multi_media_time,
-                    decoder->last_mm_time, frame_op->id);
+                    " resetting stream",
+                    frame->mm_time, decoder->last_mm_time);
         /* Let GStreamer deal with the frame anyway */
     }
-    decoder->last_mm_time = frame_op->multi_media_time;
+    decoder->last_mm_time = frame->mm_time;
 
     if (latency < 0 &&
         decoder->base.codec_type == SPICE_VIDEO_CODEC_TYPE_MJPEG) {
@@ -472,6 +460,7 @@ static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
          * saves CPU so do it.
          */
         SPICE_DEBUG("dropping a late MJPEG frame");
+        frame->free(frame);
         return TRUE;
     }
 
@@ -481,22 +470,22 @@ static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
         return FALSE;
     }
 
-    /* ref() the frame_msg for the buffer */
-    spice_msg_in_ref(frame_msg);
+    /* ref() the frame data for the buffer */
+    frame->ref_data(frame->data_opaque);
     GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
-                                                    data, size, 0, size,
-                                                    frame_msg, &release_buffer_data);
+                                                    frame->data, frame->size, 0, frame->size,
+                                                    frame->data_opaque, frame->unref_data);
 
     GST_BUFFER_DURATION(buffer) = GST_CLOCK_TIME_NONE;
     GST_BUFFER_DTS(buffer) = GST_CLOCK_TIME_NONE;
     GST_BUFFER_PTS(buffer) = gst_clock_get_time(decoder->clock) - gst_element_get_base_time(decoder->pipeline) + ((uint64_t)MAX(0, latency)) * 1000 * 1000;
 
     g_mutex_lock(&decoder->queues_mutex);
-    g_queue_push_tail(decoder->decoding_queue, create_gst_frame(buffer, frame_msg));
+    g_queue_push_tail(decoder->decoding_queue, create_gst_frame(buffer, frame));
     g_mutex_unlock(&decoder->queues_mutex);
 
     if (gst_app_src_push_buffer(decoder->appsrc, buffer) != GST_FLOW_OK) {
-        SPICE_DEBUG("GStreamer error: unable to push frame of size %u", size);
+        SPICE_DEBUG("GStreamer error: unable to push frame of size %u", frame->size);
         stream_dropped_frame_on_playback(decoder->base.stream);
     }
     return TRUE;
diff --git a/src/channel-display-mjpeg.c b/src/channel-display-mjpeg.c
index 722494ee..3ae9d211 100644
--- a/src/channel-display-mjpeg.c
+++ b/src/channel-display-mjpeg.c
@@ -38,7 +38,7 @@ typedef struct MJpegDecoder {
     /* ---------- Frame queue ---------- */
 
     GQueue *msgq;
-    SpiceMsgIn *cur_frame_msg;
+    SpiceFrame *cur_frame;
     guint timer_id;
 
     /* ---------- Output frame data ---------- */
@@ -53,10 +53,8 @@ typedef struct MJpegDecoder {
 static void mjpeg_src_init(struct jpeg_decompress_struct *cinfo)
 {
     MJpegDecoder *decoder = SPICE_CONTAINEROF(cinfo->src, MJpegDecoder, mjpeg_src);
-
-    uint8_t *data;
-    cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->cur_frame_msg, &data);
-    cinfo->src->next_input_byte = data;
+    cinfo->src->bytes_in_buffer = decoder->cur_frame->size;
+    cinfo->src->next_input_byte = decoder->cur_frame->data;
 }
 
 static boolean mjpeg_src_fill(struct jpeg_decompress_struct *cinfo)
@@ -77,6 +75,15 @@ static void mjpeg_src_term(struct jpeg_decompress_struct *cinfo)
 }
 
 
+/* ---------- A SpiceFrame helper ---------- */
+
+static void free_spice_frame(SpiceFrame *frame)
+{
+    frame->unref_data(frame->data_opaque);
+    frame->free(frame);
+}
+
+
 /* ---------- Decoder proper ---------- */
 
 static void mjpeg_decoder_schedule(MJpegDecoder *decoder);
@@ -168,10 +175,10 @@ static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder)
     jpeg_finish_decompress(&decoder->mjpeg_cinfo);
 
     /* Display the frame and dispose of it */
-    stream_display_frame(decoder->base.stream, decoder->cur_frame_msg,
+    stream_display_frame(decoder->base.stream, decoder->cur_frame,
                          width, height, decoder->out_frame);
-    spice_msg_in_unref(decoder->cur_frame_msg);
-    decoder->cur_frame_msg = NULL;
+    free_spice_frame(decoder->cur_frame);
+    decoder->cur_frame = NULL;
     decoder->timer_id = 0;
 
     /* Schedule the next frame */
@@ -190,33 +197,32 @@ static void mjpeg_decoder_schedule(MJpegDecoder *decoder)
     }
 
     guint32 time = stream_get_time(decoder->base.stream);
-    SpiceMsgIn *frame_msg = decoder->cur_frame_msg;
-    decoder->cur_frame_msg = NULL;
+    SpiceFrame *frame = decoder->cur_frame;
+    decoder->cur_frame = NULL;
     do {
-        if (frame_msg) {
-            SpiceStreamDataHeader *op = spice_msg_in_parsed(frame_msg);
-            if (time <= op->multi_media_time) {
-                guint32 d = op->multi_media_time - time;
-                decoder->cur_frame_msg = frame_msg;
+        if (frame) {
+            if (time <= frame->mm_time) {
+                guint32 d = frame->mm_time - time;
+                decoder->cur_frame = frame;
                 decoder->timer_id = g_timeout_add(d, mjpeg_decoder_decode_frame, decoder);
                 break;
             }
 
             SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping ",
-                        __FUNCTION__, time - op->multi_media_time,
-                        op->multi_media_time, time);
+                        __FUNCTION__, time - frame->mm_time,
+                        frame->mm_time, time);
             stream_dropped_frame_on_playback(decoder->base.stream);
-            spice_msg_in_unref(frame_msg);
+            free_spice_frame(frame);
         }
-        frame_msg = g_queue_pop_head(decoder->msgq);
-    } while (frame_msg);
+        frame = g_queue_pop_head(decoder->msgq);
+    } while (frame);
 }
 
 
 /* mjpeg_decoder_drop_queue() helper */
 static void _msg_in_unref_func(gpointer data, gpointer user_data)
 {
-    spice_msg_in_unref(data);
+    free_spice_frame((SpiceFrame*)data);
 }
 
 static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
@@ -225,9 +231,9 @@ static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
         g_source_remove(decoder->timer_id);
         decoder->timer_id = 0;
     }
-    if (decoder->cur_frame_msg) {
-        spice_msg_in_unref(decoder->cur_frame_msg);
-        decoder->cur_frame_msg = NULL;
+    if (decoder->cur_frame) {
+        free_spice_frame(decoder->cur_frame);
+        decoder->cur_frame = NULL;
     }
     g_queue_foreach(decoder->msgq, _msg_in_unref_func, NULL);
     g_queue_clear(decoder->msgq);
@@ -236,25 +242,21 @@ static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
 /* ---------- VideoDecoder's public API ---------- */
 
 static gboolean mjpeg_decoder_queue_frame(VideoDecoder *video_decoder,
-                                          SpiceMsgIn *frame_msg,
-                                          int32_t latency)
+                                          SpiceFrame *frame, int32_t latency)
 {
     MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
-    SpiceMsgIn *last_msg;
+    SpiceFrame *last_frame;
 
     SPICE_DEBUG("%s", __FUNCTION__);
 
-    last_msg = g_queue_peek_tail(decoder->msgq);
-    if (last_msg) {
-        SpiceStreamDataHeader *last_op, *frame_op;
-        last_op = spice_msg_in_parsed(last_msg);
-        frame_op = spice_msg_in_parsed(frame_msg);
-        if (frame_op->multi_media_time < last_op->multi_media_time) {
+    last_frame = g_queue_peek_tail(decoder->msgq);
+    if (last_frame) {
+        if (frame->mm_time < last_frame->mm_time) {
             /* This should really not happen */
             SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):"
-                        " resetting stream, id %u",
-                        frame_op->multi_media_time,
-                        last_op->multi_media_time, frame_op->id);
+                        " resetting stream",
+                        frame->mm_time,
+                        last_frame->mm_time);
             mjpeg_decoder_drop_queue(decoder);
         }
     }
@@ -266,8 +268,8 @@ static gboolean mjpeg_decoder_queue_frame(VideoDecoder *video_decoder,
         return TRUE;
     }
 
-    spice_msg_in_ref(frame_msg);
-    g_queue_push_tail(decoder->msgq, frame_msg);
+    frame->ref_data(frame->data_opaque);
+    g_queue_push_tail(decoder->msgq, frame);
     mjpeg_decoder_schedule(decoder);
     return TRUE;
 }
diff --git a/src/channel-display-priv.h b/src/channel-display-priv.h
index c5622f1a..3c9d1193 100644
--- a/src/channel-display-priv.h
+++ b/src/channel-display-priv.h
@@ -36,6 +36,20 @@ G_BEGIN_DECLS
 
 typedef struct display_stream display_stream;
 
+typedef struct SpiceFrame SpiceFrame;
+struct SpiceFrame {
+    uint32_t mm_time;
+    SpiceRect dest;
+
+    uint8_t *data;
+    uint32_t size;
+    gpointer data_opaque;
+    void (*ref_data)(gpointer data_opaque);
+    void (*unref_data)(gpointer data_opaque);
+
+    void (*free)(SpiceFrame *frame);
+};
+
 typedef struct VideoDecoder VideoDecoder;
 struct VideoDecoder {
     /* Releases the video decoder's resources */
@@ -44,16 +58,17 @@ struct VideoDecoder {
     /* Notifies the decoder that the mm-time clock changed. */
     void (*reschedule)(VideoDecoder *decoder);
 
-    /* Decompresses the specified frame.
+    /* Takes ownership of the specified frame, decompresses it,
+     * and displays it at the right time.
      *
      * @decoder:   The video decoder.
-     * @frame_msg: The Spice message containing the compressed frame.
+     * @frame:     The compressed Spice frame.
      * @latency:   How long in milliseconds until the frame should be
      *             displayed. Negative values mean the frame is late.
      * @return:    False if the decoder can no longer decode frames,
      *             True otherwise.
      */
-    gboolean (*queue_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg, int32_t latency);
+    gboolean (*queue_frame)(VideoDecoder *video_decoder, SpiceFrame *frame, int latency);
 
     /* The format of the encoded video. */
     int codec_type;
@@ -137,8 +152,7 @@ struct display_stream {
 
 guint32 stream_get_time(display_stream *st);
 void stream_dropped_frame_on_playback(display_stream *st);
-void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg, uint32_t width, uint32_t height, uint8_t *data);
-uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data);
+void stream_display_frame(display_stream *st, SpiceFrame *frame, uint32_t width, uint32_t height, uint8_t* data);
 
 
 G_END_DECLS
diff --git a/src/channel-display.c b/src/channel-display.c
index 2423fb0e..00b54406 100644
--- a/src/channel-display.c
+++ b/src/channel-display.c
@@ -1234,8 +1234,7 @@ static const SpiceRect *stream_get_dest(display_stream *st, SpiceMsgIn *frame_ms
 
 }
 
-G_GNUC_INTERNAL
-uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data)
+static uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data)
 {
     switch (spice_msg_in_type(frame_msg)) {
     case SPICE_MSG_DISPLAY_STREAM_DATA: {
@@ -1270,15 +1269,10 @@ void stream_dropped_frame_on_playback(display_stream *st)
 
 /* main context */
 G_GNUC_INTERNAL
-void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg,
+void stream_display_frame(display_stream *st, SpiceFrame *frame,
                           uint32_t width, uint32_t height, uint8_t *data)
 {
-    const SpiceRect *dest;
-    int stride;
-
-    dest = stream_get_dest(st, frame_msg);
-
-    stride = width * sizeof(uint32_t);
+    int stride = width * sizeof(uint32_t);
     if (!(st->flags & SPICE_STREAM_FLAGS_TOP_DOWN)) {
         data += stride * (height - 1);
         stride = -stride;
@@ -1288,15 +1282,16 @@ void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg,
 #ifdef G_OS_WIN32
                                         SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc,
 #endif
-                                        dest, data,
+                                        &frame->dest, data,
                                         width, height, stride,
                                         st->have_region ? &st->region : NULL);
 
-    if (st->surface->primary)
+    if (st->surface->primary) {
         g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0,
-                      dest->left, dest->top,
-                      dest->right - dest->left,
-                      dest->bottom - dest->top);
+                      frame->dest.left, frame->dest.top,
+                      frame->dest.right - frame->dest.left,
+                      frame->dest.bottom - frame->dest.top);
+    }
 }
 
 /* after a sequence of 3 drops, push a report to the server, even
@@ -1425,6 +1420,7 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
     display_stream *st = get_stream_by_id(channel, op->id);
     guint32 mmtime;
     int32_t latency;
+    SpiceFrame *frame;
 
     g_return_if_fail(st != NULL);
     mmtime = stream_get_time(st);
@@ -1471,11 +1467,20 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
      * decoding and best decide if/when to drop them when they are late,
      * taking into account the impact on later frames.
      */
-    if (!st->video_decoder->queue_frame(st->video_decoder, in, latency)) {
+    frame = spice_new(SpiceFrame, 1);
+    frame->mm_time = op->multi_media_time;
+    frame->dest = *stream_get_dest(st, in);
+    frame->size = spice_msg_in_frame_data(in, &frame->data);
+    frame->data_opaque = in;
+    frame->ref_data = (void*)spice_msg_in_ref;
+    frame->unref_data = (void*)spice_msg_in_unref;
+    frame->free = (void*)free;
+    if (!st->video_decoder->queue_frame(st->video_decoder, frame, latency)) {
         destroy_stream(channel, op->id);
         report_invalid_stream(channel, op->id);
         return;
     }
+
     if (c->enable_adaptive_streaming) {
         display_update_stream_report(SPICE_DISPLAY_CHANNEL(channel), op->id,
                                      op->multi_media_time, latency);
-- 
2.11.0



More information about the Spice-devel mailing list