[farsight2/master] rtpsubstream: Use rw-lock to make sure the substream really stops

Olivier Crête olivier.crete at collabora.co.uk
Mon Dec 21 14:10:43 PST 2009


---
 gst/fsrtpconference/fs-rtp-substream.c |  138 +++++++++++++++++---------------
 1 files changed, 75 insertions(+), 63 deletions(-)

diff --git a/gst/fsrtpconference/fs-rtp-substream.c b/gst/fsrtpconference/fs-rtp-substream.c
index e7b9753..86c02bc 100644
--- a/gst/fsrtpconference/fs-rtp-substream.c
+++ b/gst/fsrtpconference/fs-rtp-substream.c
@@ -77,7 +77,6 @@ enum
 #define DEFAULT_NO_RTCP_TIMEOUT (7000)
 
 struct _FsRtpSubStreamPrivate {
-  gboolean disposed;
 
   /* These are only pointers, we don't own references */
   FsRtpConference *conference;
@@ -117,18 +116,17 @@ struct _FsRtpSubStreamPrivate {
    */
   gboolean receiving;
 
-  /* Both of these are sub-stream mutex protected */
-  /* This is TRUE while someone is modifying the recv pipeline */
-  gboolean modifying;
-  /* This becomes true when the substream is stopped */
-  gboolean stopped;
-
   /* Protected by the this mutex */
   GMutex *mutex;
   GstClockID no_rtcp_timeout_id;
   GstClockTime next_no_rtcp_timeout;
   GThread *no_rtcp_timeout_thread;
 
+  /* Can only be used while using the lock */
+  GStaticRWLock stopped_lock;
+  gboolean stopped;
+
+
   GError *construction_error;
 };
 
@@ -166,8 +164,27 @@ fs_rtp_sub_stream_emit_error (FsRtpSubStream *substream,
     gchar *error_msg,
     gchar *debug_msg);
 
+
+static gboolean
+fs_rtp_sub_stream_has_stopped_enter (FsRtpSubStream *self)
+{
+  g_static_rw_lock_reader_lock (&self->priv->stopped_lock);
+
+  if (self->priv->stopped)
+  {
+    g_static_rw_lock_reader_unlock (&self->priv->stopped_lock);
+    return TRUE;
+  }
+
+  return FALSE;
+}
+
+
 static void
-fs_rtp_sub_stream_try_stop (FsRtpSubStream *substream);
+fs_rtp_sub_stream_has_stopped_exit (FsRtpSubStream *self)
+{
+  g_static_rw_lock_reader_unlock (&self->priv->stopped_lock);
+}
 
 
 static void
@@ -399,9 +416,10 @@ static void
 fs_rtp_sub_stream_init (FsRtpSubStream *self)
 {
   self->priv = FS_RTP_SUB_STREAM_GET_PRIVATE (self);
-  self->priv->disposed = FALSE;
   self->priv->receiving = TRUE;
   self->priv->mutex = g_mutex_new ();
+
+  g_static_rw_lock_init (&self->priv->stopped_lock);
 }
 
 
@@ -734,7 +752,6 @@ fs_rtp_sub_stream_dispose (GObject *object)
     self->priv->rtpbin_pad = NULL;
   }
 
-  self->priv->disposed = TRUE;
   G_OBJECT_CLASS (fs_rtp_sub_stream_parent_class)->dispose (object);
 }
 
@@ -893,7 +910,6 @@ fs_rtp_sub_stream_set_codecbin (FsRtpSubStream *substream,
           substream->pt);
       gst_object_unref (codecbin);
       fs_codec_destroy (codec);
-      fs_rtp_sub_stream_try_stop (substream);
       return FALSE;
     }
 
@@ -1045,16 +1061,20 @@ do_nothing_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
 {
 }
 
-static void
-fs_rtp_sub_stream_try_stop (FsRtpSubStream *substream)
+
+/**
+ * fs_rtp_sub_stream_stop:
+ *
+ * Stops all of the elements on a #FsRtpSubstream
+ */
+
+void
+fs_rtp_sub_stream_stop (FsRtpSubStream *substream)
 {
-  FS_RTP_SUB_STREAM_LOCK (substream);
-  if (!substream->priv->stopped || substream->priv->modifying)
-  {
-    FS_RTP_SUB_STREAM_UNLOCK (substream);
-    return;
-  }
-  FS_RTP_SUB_STREAM_UNLOCK (substream);
+  substream->priv->stopped = TRUE;
+  g_static_rw_lock_writer_lock (&substream->priv->stopped_lock);
+  substream->priv->stopped = TRUE;
+  g_static_rw_lock_writer_unlock (&substream->priv->stopped_lock);
 
   if (substream->priv->rtpbin_unlinked_sig) {
     g_signal_handler_disconnect (substream->priv->rtpbin_pad,
@@ -1094,22 +1114,6 @@ fs_rtp_sub_stream_try_stop (FsRtpSubStream *substream)
 }
 
 /**
- * fs_rtp_sub_stream_stop:
- *
- * Stops all of the elements on a #FsRtpSubstream
- */
-
-void
-fs_rtp_sub_stream_stop (FsRtpSubStream *substream)
-{
-  FS_RTP_SUB_STREAM_LOCK (substream);
-  substream->priv->stopped = TRUE;
-  FS_RTP_SUB_STREAM_UNLOCK (substream);
-
-  fs_rtp_sub_stream_try_stop (substream);
-}
-
-/**
  * fs_rtp_sub_stream_add_output_ghostpad_unlock:
  *
  * Creates and adds an output ghostpad for this substreams
@@ -1128,12 +1132,18 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
   GstPad *ghostpad = NULL;
   FsCodec *codec = NULL;
 
-  if (substream->priv->adding_output_ghostpad)
+  if (fs_rtp_sub_stream_has_stopped_enter (substream))
   {
     FS_RTP_SESSION_UNLOCK (substream->priv->session);
     return TRUE;
   }
 
+  if (substream->priv->adding_output_ghostpad)
+  {
+    FS_RTP_SESSION_UNLOCK (substream->priv->session);
+    goto out;
+  }
+
   g_assert (substream->priv->output_ghostpad == NULL);
 
   substream->priv->adding_output_ghostpad = TRUE;
@@ -1161,8 +1171,7 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
     g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
         "Could not build ghostpad src_%u_%u_%d", substream->priv->session->id,
         substream->ssrc, substream->pt);
-    substream->priv->adding_output_ghostpad = FALSE;
-    return FALSE;
+    goto error;
   }
 
   if (!gst_pad_set_active (ghostpad, TRUE))
@@ -1171,8 +1180,7 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
         "Could not activate the src_%u_%u_%d", substream->priv->session->id,
         substream->ssrc, substream->pt);
     gst_object_unref (ghostpad);
-    substream->priv->adding_output_ghostpad = FALSE;
-    return FALSE;
+    goto error;
   }
 
   if (!gst_element_add_pad (GST_ELEMENT (substream->priv->conference),
@@ -1182,8 +1190,7 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
         "Could add build ghostpad src_%u_%u_%d to the conference",
         substream->priv->session->id, substream->ssrc, substream->pt);
     gst_object_unref (ghostpad);
-    substream->priv->adding_output_ghostpad = FALSE;
-    return FALSE;
+    goto error;
   }
 
   FS_RTP_SESSION_LOCK (substream->priv->session);
@@ -1205,9 +1212,16 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
 
   g_object_set (substream->priv->output_valve, "drop", FALSE, NULL);
 
-  fs_rtp_sub_stream_try_stop (substream);
+ out:
 
+  fs_rtp_sub_stream_has_stopped_exit (substream);
   return TRUE;
+
+ error:
+
+  substream->priv->adding_output_ghostpad = FALSE;
+  fs_rtp_sub_stream_has_stopped_exit (substream);
+  return FALSE;
 }
 
 /**
@@ -1228,6 +1242,9 @@ _rtpbin_pad_have_data_callback (GstPad *pad, GstMiniObject *miniobj,
   gboolean ret = TRUE;
   gboolean remove = FALSE;
 
+  if (fs_rtp_sub_stream_has_stopped_enter (self))
+    return FALSE;
+
   FS_RTP_SESSION_LOCK (self->priv->session);
 
   if (!self->priv->codecbin || !self->codec || !self->priv->caps)
@@ -1261,6 +1278,8 @@ _rtpbin_pad_have_data_callback (GstPad *pad, GstMiniObject *miniobj,
 
   FS_RTP_SESSION_UNLOCK (self->priv->session);
 
+  fs_rtp_sub_stream_has_stopped_exit (self);
+
   return ret;
 }
 
@@ -1272,16 +1291,8 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
   FsCodec *codec = NULL;
   GError *error = NULL;
 
-  FS_RTP_SUB_STREAM_LOCK (substream);
-  if (substream->priv->stopped || substream->priv->modifying)
-  {
-    FS_RTP_SUB_STREAM_UNLOCK (substream);
-    FS_RTP_SESSION_UNLOCK (substream->priv->session);
-    fs_rtp_sub_stream_try_stop (substream);
+  if (fs_rtp_sub_stream_has_stopped_enter (substream))
     return;
-  }
-  substream->priv->modifying = TRUE;
-  FS_RTP_SUB_STREAM_UNLOCK (substream);
 
   GST_DEBUG ("Substream blocked for codec change (session:%d SSRC:%x pt:%d)",
       substream->priv->session->id, substream->ssrc, substream->pt);
@@ -1303,14 +1314,8 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
             codec, codecbin, &error))
       goto error;
 
-
-    FS_RTP_SUB_STREAM_LOCK (substream);
     if (substream->priv->stopped)
-    {
-      FS_RTP_SUB_STREAM_UNLOCK (substream);
       goto out;
-    }
-    FS_RTP_SUB_STREAM_UNLOCK (substream);
   }
 
  out:
@@ -1321,10 +1326,7 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
 
   gst_pad_set_blocked_async (pad, FALSE, do_nothing_blocked_callback, NULL);
 
-  FS_RTP_SUB_STREAM_LOCK (substream);
-  substream->priv->modifying = FALSE;
-  FS_RTP_SUB_STREAM_UNLOCK (substream);
-  fs_rtp_sub_stream_try_stop (substream);
+  fs_rtp_sub_stream_has_stopped_exit (substream);
 
   return;
 
@@ -1348,10 +1350,15 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
 static void
 fs_rtp_sub_stream_add_probe_locked (FsRtpSubStream *substream)
 {
+  if (fs_rtp_sub_stream_has_stopped_enter (substream))
+    return;
+
   if (!substream->priv->blocking_id)
     substream->priv->blocking_id = gst_pad_add_data_probe (
         substream->priv->rtpbin_pad,
         G_CALLBACK (_rtpbin_pad_have_data_callback), substream);
+
+  fs_rtp_sub_stream_has_stopped_exit (substream);
 }
 
 /**
@@ -1367,6 +1374,9 @@ fs_rtp_sub_stream_add_probe_locked (FsRtpSubStream *substream)
 void
 fs_rtp_sub_stream_verify_codec_locked (FsRtpSubStream *substream)
 {
+  if (fs_rtp_sub_stream_has_stopped_enter (substream))
+    return;
+
   GST_LOG ("Starting codec verification process for substream with"
       " SSRC:%x pt:%d", substream->ssrc, substream->pt);
 
@@ -1375,6 +1385,8 @@ fs_rtp_sub_stream_verify_codec_locked (FsRtpSubStream *substream)
 
   gst_pad_set_blocked_async (substream->priv->rtpbin_pad, TRUE,
       _rtpbin_pad_blocked_callback, substream);
+
+  fs_rtp_sub_stream_has_stopped_exit (substream);
 }
 
 
-- 
1.5.6.5




More information about the farsight-commits mailing list