[0.11] gst-plugins-base: audiodecoder: Fix thread safety issues if both pads have different streaming threads

Wim Taymans wtay at kemper.freedesktop.org
Mon Sep 26 10:23:02 PDT 2011


Module: gst-plugins-base
Branch: 0.11
Commit: b767be2f689970f3d0ff4875f4254c43e4991fa1
URL:    http://cgit.freedesktop.org/gstreamer/gst-plugins-base/commit/?id=b767be2f689970f3d0ff4875f4254c43e4991fa1

Author: Sebastian Dröge <sebastian.droege at collabora.co.uk>
Date:   Mon Sep 26 16:22:00 2011 +0200

audiodecoder: Fix thread safety issues if both pads have different streaming threads

---

 gst-libs/gst/audio/gstaudiodecoder.c |   57 +++++++++++++++++++++++++--------
 gst-libs/gst/audio/gstaudiodecoder.h |    9 +++++-
 2 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/gst-libs/gst/audio/gstaudiodecoder.c b/gst-libs/gst/audio/gstaudiodecoder.c
index 84729cb..3a910a8 100644
--- a/gst-libs/gst/audio/gstaudiodecoder.c
+++ b/gst-libs/gst/audio/gstaudiodecoder.c
@@ -385,6 +385,8 @@ gst_audio_decoder_init (GstAudioDecoder * dec, GstAudioDecoderClass * klass)
   dec->priv->adapter_out = gst_adapter_new ();
   g_queue_init (&dec->priv->frames);
 
+  g_static_rec_mutex_init (&dec->stream_lock);
+
   /* property default */
   dec->priv->latency = DEFAULT_LATENCY;
   dec->priv->tolerance = DEFAULT_TOLERANCE;
@@ -400,7 +402,7 @@ gst_audio_decoder_reset (GstAudioDecoder * dec, gboolean full)
 {
   GST_DEBUG_OBJECT (dec, "gst_audio_decoder_reset");
 
-  GST_OBJECT_LOCK (dec);
+  GST_AUDIO_DECODER_STREAM_LOCK (dec);
 
   if (full) {
     dec->priv->active = FALSE;
@@ -438,7 +440,7 @@ gst_audio_decoder_reset (GstAudioDecoder * dec, gboolean full)
   dec->priv->discont = TRUE;
   dec->priv->sync_flush = FALSE;
 
-  GST_OBJECT_UNLOCK (dec);
+  GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
 }
 
 static void
@@ -456,6 +458,8 @@ gst_audio_decoder_finalize (GObject * object)
     g_object_unref (dec->priv->adapter_out);
   }
 
+  g_static_rec_mutex_free (&dec->stream_lock);
+
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -472,6 +476,8 @@ gst_audio_decoder_src_setcaps (GstPad * pad, GstCaps * caps)
 
   GST_DEBUG_OBJECT (dec, "setting src caps %" GST_PTR_FORMAT, caps);
 
+  GST_AUDIO_DECODER_STREAM_LOCK (dec);
+
   /* parse caps here to check subclass;
    * also makes us aware of output format */
   if (!gst_caps_is_fixed (caps))
@@ -488,6 +494,9 @@ gst_audio_decoder_src_setcaps (GstPad * pad, GstCaps * caps)
   if (!gst_audio_info_from_caps (&dec->priv->ctx.info, caps))
     goto refuse_caps;
 
+done:
+  GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
+
   gst_object_unref (dec);
   return res;
 
@@ -495,8 +504,8 @@ gst_audio_decoder_src_setcaps (GstPad * pad, GstCaps * caps)
 refuse_caps:
   {
     GST_WARNING_OBJECT (dec, "rejected caps %" GST_PTR_FORMAT, caps);
-    gst_object_unref (dec);
-    return res;
+    res = FALSE;
+    goto done;
   }
 }
 
@@ -512,6 +521,7 @@ gst_audio_decoder_sink_setcaps (GstPad * pad, GstCaps * caps)
 
   GST_DEBUG_OBJECT (dec, "caps: %" GST_PTR_FORMAT, caps);
 
+  GST_AUDIO_DECODER_STREAM_LOCK (dec);
   /* NOTE pbutils only needed here */
   /* TODO maybe (only) upstream demuxer/parser etc should handle this ? */
   if (dec->priv->taglist)
@@ -523,6 +533,8 @@ gst_audio_decoder_sink_setcaps (GstPad * pad, GstCaps * caps)
   if (klass->set_format)
     res = klass->set_format (dec, caps);
 
+  GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
+
   g_object_unref (dec);
   return res;
 }
@@ -696,6 +708,7 @@ gst_audio_decoder_finish_frame (GstAudioDecoder * dec, GstBuffer * buf,
   GstAudioDecoderContext *ctx;
   gint samples = 0;
   GstClockTime ts, next_ts;
+  GstFlowReturn ret = GST_FLOW_OK;
 
   /* subclass should know what it is producing by now */
   g_return_val_if_fail (buf == NULL || GST_PAD_CAPS (dec->srcpad) != NULL,
@@ -713,13 +726,13 @@ gst_audio_decoder_finish_frame (GstAudioDecoder * dec, GstBuffer * buf,
       buf ? GST_BUFFER_SIZE (buf) : -1,
       buf ? GST_BUFFER_SIZE (buf) / ctx->info.bpf : -1, frames);
 
+  GST_AUDIO_DECODER_STREAM_LOCK (dec);
+
   if (priv->pending_events) {
     GList *pending_events, *l;
 
-    GST_OBJECT_LOCK (dec);
     pending_events = priv->pending_events;
     priv->pending_events = NULL;
-    GST_OBJECT_UNLOCK (dec);
 
     GST_DEBUG_OBJECT (dec, "Pushing pending events");
     for (l = priv->pending_events; l; l = l->next)
@@ -833,7 +846,11 @@ gst_audio_decoder_finish_frame (GstAudioDecoder * dec, GstBuffer * buf,
     dec->priv->error_count--;
 
 exit:
-  return gst_audio_decoder_output (dec, buf);
+  ret = gst_audio_decoder_output (dec, buf);
+
+  GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
+
+  return ret;
 
   /* ERRORS */
 wrong_buffer:
@@ -842,7 +859,8 @@ wrong_buffer:
         ("buffer size %d not a multiple of %d", GST_BUFFER_SIZE (buf),
             ctx->info.bpf));
     gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+    ret = GST_FLOW_ERROR;
+    goto exit;
   }
 overflow:
   {
@@ -851,7 +869,8 @@ overflow:
             priv->frames.length), (NULL));
     if (buf)
       gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+    ret = GST_FLOW_ERROR;
+    goto exit;
   }
 }
 
@@ -1255,6 +1274,8 @@ gst_audio_decoder_chain (GstPad * pad, GstBuffer * buffer)
       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
 
+  GST_AUDIO_DECODER_STREAM_LOCK (dec);
+
   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
     gint64 samples, ts;
 
@@ -1281,6 +1302,8 @@ gst_audio_decoder_chain (GstPad * pad, GstBuffer * buffer)
   else
     ret = gst_audio_decoder_chain_reverse (dec, buffer);
 
+  GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
+
   return ret;
 }
 
@@ -1306,6 +1329,7 @@ gst_audio_decoder_sink_eventfunc (GstAudioDecoder * dec, GstEvent * event)
       gint64 start, stop, time;
       gboolean update;
 
+      GST_AUDIO_DECODER_STREAM_LOCK (dec);
       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
           &start, &stop, &time);
 
@@ -1341,6 +1365,7 @@ gst_audio_decoder_sink_eventfunc (GstAudioDecoder * dec, GstEvent * event)
               GST_FORMAT_TIME, start, stop, time);
         } else {
           GST_DEBUG_OBJECT (dec, "unsupported format; ignoring");
+          GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
           break;
         }
       }
@@ -1383,8 +1408,10 @@ gst_audio_decoder_sink_eventfunc (GstAudioDecoder * dec, GstEvent * event)
       gst_segment_set_newsegment_full (&dec->segment, update, rate, arate,
           format, start, stop, time);
 
-      gst_pad_push_event (dec->srcpad, event);
+      dec->priv->pending_events =
+          g_list_append (dec->priv->pending_events, event);
       handled = TRUE;
+      GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
       break;
     }
 
@@ -1392,18 +1419,20 @@ gst_audio_decoder_sink_eventfunc (GstAudioDecoder * dec, GstEvent * event)
       break;
 
     case GST_EVENT_FLUSH_STOP:
+      GST_AUDIO_DECODER_STREAM_LOCK (dec);
       /* prepare for fresh start */
       gst_audio_decoder_flush (dec, TRUE);
 
-      GST_OBJECT_LOCK (dec);
       g_list_foreach (dec->priv->pending_events, (GFunc) gst_event_unref, NULL);
       g_list_free (dec->priv->pending_events);
       dec->priv->pending_events = NULL;
-      GST_OBJECT_UNLOCK (dec);
+      GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
       break;
 
     case GST_EVENT_EOS:
+      GST_AUDIO_DECODER_STREAM_LOCK (dec);
       gst_audio_decoder_drain (dec);
+      GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
       break;
 
     default:
@@ -1447,10 +1476,10 @@ gst_audio_decoder_sink_event (GstPad * pad, GstEvent * event)
         || GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) {
       ret = gst_pad_event_default (pad, event);
     } else {
-      GST_OBJECT_LOCK (dec);
+      GST_AUDIO_DECODER_STREAM_LOCK (dec);
       dec->priv->pending_events =
           g_list_append (dec->priv->pending_events, event);
-      GST_OBJECT_UNLOCK (dec);
+      GST_AUDIO_DECODER_STREAM_UNLOCK (dec);
       ret = TRUE;
     }
   }
diff --git a/gst-libs/gst/audio/gstaudiodecoder.h b/gst-libs/gst/audio/gstaudiodecoder.h
index 1c47e1a..783f83e 100644
--- a/gst-libs/gst/audio/gstaudiodecoder.h
+++ b/gst-libs/gst/audio/gstaudiodecoder.h
@@ -20,7 +20,6 @@
  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  * Boston, MA 02111-1307, USA.
  */
-
 #ifndef _GST_AUDIO_DECODER_H_
 #define _GST_AUDIO_DECODER_H_
 
@@ -85,6 +84,9 @@ G_BEGIN_DECLS
  */
 #define GST_AUDIO_DECODER_SINK_PAD(obj)        (((GstAudioDecoder *) (obj))->sinkpad)
 
+#define GST_AUDIO_DECODER_STREAM_LOCK(dec) g_static_rec_mutex_lock (&GST_AUDIO_DECODER (dec)->stream_lock)
+#define GST_AUDIO_DECODER_STREAM_UNLOCK(dec) g_static_rec_mutex_unlock (&GST_AUDIO_DECODER (dec)->stream_lock)
+
 typedef struct _GstAudioDecoder GstAudioDecoder;
 typedef struct _GstAudioDecoderClass GstAudioDecoderClass;
 
@@ -146,6 +148,11 @@ struct _GstAudioDecoder
   GstPad         *sinkpad;
   GstPad         *srcpad;
 
+  /* protects all data processing, i.e. is locked
+   * in the chain function, finish_frame and when
+   * processing serialized events */
+  GStaticRecMutex stream_lock;
+
   /* MT-protected (with STREAM_LOCK) */
   GstSegment      segment;
 



More information about the gstreamer-commits mailing list