[farsight2/master] rtpsubstream: Use rw-lock to make sure the substream really stops
Olivier Crête
olivier.crete at collabora.co.uk
Mon Dec 21 14:10:43 PST 2009
---
gst/fsrtpconference/fs-rtp-substream.c | 138 +++++++++++++++++---------------
1 files changed, 75 insertions(+), 63 deletions(-)
diff --git a/gst/fsrtpconference/fs-rtp-substream.c b/gst/fsrtpconference/fs-rtp-substream.c
index e7b9753..86c02bc 100644
--- a/gst/fsrtpconference/fs-rtp-substream.c
+++ b/gst/fsrtpconference/fs-rtp-substream.c
@@ -77,7 +77,6 @@ enum
#define DEFAULT_NO_RTCP_TIMEOUT (7000)
struct _FsRtpSubStreamPrivate {
- gboolean disposed;
/* These are only pointers, we don't own references */
FsRtpConference *conference;
@@ -117,18 +116,17 @@ struct _FsRtpSubStreamPrivate {
*/
gboolean receiving;
- /* Both of these are sub-stream mutex protected */
- /* This is TRUE while someone is modifying the recv pipeline */
- gboolean modifying;
- /* This becomes true when the substream is stopped */
- gboolean stopped;
-
/* Protected by the this mutex */
GMutex *mutex;
GstClockID no_rtcp_timeout_id;
GstClockTime next_no_rtcp_timeout;
GThread *no_rtcp_timeout_thread;
+ /* Can only be used while using the lock */
+ GStaticRWLock stopped_lock;
+ gboolean stopped;
+
+
GError *construction_error;
};
@@ -166,8 +164,27 @@ fs_rtp_sub_stream_emit_error (FsRtpSubStream *substream,
gchar *error_msg,
gchar *debug_msg);
+
+static gboolean
+fs_rtp_sub_stream_has_stopped_enter (FsRtpSubStream *self)
+{
+ g_static_rw_lock_reader_lock (&self->priv->stopped_lock);
+
+ if (self->priv->stopped)
+ {
+ g_static_rw_lock_reader_unlock (&self->priv->stopped_lock);
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+
static void
-fs_rtp_sub_stream_try_stop (FsRtpSubStream *substream);
+fs_rtp_sub_stream_has_stopped_exit (FsRtpSubStream *self)
+{
+ g_static_rw_lock_reader_unlock (&self->priv->stopped_lock);
+}
static void
@@ -399,9 +416,10 @@ static void
fs_rtp_sub_stream_init (FsRtpSubStream *self)
{
self->priv = FS_RTP_SUB_STREAM_GET_PRIVATE (self);
- self->priv->disposed = FALSE;
self->priv->receiving = TRUE;
self->priv->mutex = g_mutex_new ();
+
+ g_static_rw_lock_init (&self->priv->stopped_lock);
}
@@ -734,7 +752,6 @@ fs_rtp_sub_stream_dispose (GObject *object)
self->priv->rtpbin_pad = NULL;
}
- self->priv->disposed = TRUE;
G_OBJECT_CLASS (fs_rtp_sub_stream_parent_class)->dispose (object);
}
@@ -893,7 +910,6 @@ fs_rtp_sub_stream_set_codecbin (FsRtpSubStream *substream,
substream->pt);
gst_object_unref (codecbin);
fs_codec_destroy (codec);
- fs_rtp_sub_stream_try_stop (substream);
return FALSE;
}
@@ -1045,16 +1061,20 @@ do_nothing_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
{
}
-static void
-fs_rtp_sub_stream_try_stop (FsRtpSubStream *substream)
+
+/**
+ * fs_rtp_sub_stream_stop:
+ *
+ * Stops all of the elements on a #FsRtpSubstream
+ */
+
+void
+fs_rtp_sub_stream_stop (FsRtpSubStream *substream)
{
- FS_RTP_SUB_STREAM_LOCK (substream);
- if (!substream->priv->stopped || substream->priv->modifying)
- {
- FS_RTP_SUB_STREAM_UNLOCK (substream);
- return;
- }
- FS_RTP_SUB_STREAM_UNLOCK (substream);
+ substream->priv->stopped = TRUE;
+ g_static_rw_lock_writer_lock (&substream->priv->stopped_lock);
+ substream->priv->stopped = TRUE;
+ g_static_rw_lock_writer_unlock (&substream->priv->stopped_lock);
if (substream->priv->rtpbin_unlinked_sig) {
g_signal_handler_disconnect (substream->priv->rtpbin_pad,
@@ -1094,22 +1114,6 @@ fs_rtp_sub_stream_try_stop (FsRtpSubStream *substream)
}
/**
- * fs_rtp_sub_stream_stop:
- *
- * Stops all of the elements on a #FsRtpSubstream
- */
-
-void
-fs_rtp_sub_stream_stop (FsRtpSubStream *substream)
-{
- FS_RTP_SUB_STREAM_LOCK (substream);
- substream->priv->stopped = TRUE;
- FS_RTP_SUB_STREAM_UNLOCK (substream);
-
- fs_rtp_sub_stream_try_stop (substream);
-}
-
-/**
* fs_rtp_sub_stream_add_output_ghostpad_unlock:
*
* Creates and adds an output ghostpad for this substreams
@@ -1128,12 +1132,18 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
GstPad *ghostpad = NULL;
FsCodec *codec = NULL;
- if (substream->priv->adding_output_ghostpad)
+ if (fs_rtp_sub_stream_has_stopped_enter (substream))
{
FS_RTP_SESSION_UNLOCK (substream->priv->session);
return TRUE;
}
+ if (substream->priv->adding_output_ghostpad)
+ {
+ FS_RTP_SESSION_UNLOCK (substream->priv->session);
+ goto out;
+ }
+
g_assert (substream->priv->output_ghostpad == NULL);
substream->priv->adding_output_ghostpad = TRUE;
@@ -1161,8 +1171,7 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not build ghostpad src_%u_%u_%d", substream->priv->session->id,
substream->ssrc, substream->pt);
- substream->priv->adding_output_ghostpad = FALSE;
- return FALSE;
+ goto error;
}
if (!gst_pad_set_active (ghostpad, TRUE))
@@ -1171,8 +1180,7 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
"Could not activate the src_%u_%u_%d", substream->priv->session->id,
substream->ssrc, substream->pt);
gst_object_unref (ghostpad);
- substream->priv->adding_output_ghostpad = FALSE;
- return FALSE;
+ goto error;
}
if (!gst_element_add_pad (GST_ELEMENT (substream->priv->conference),
@@ -1182,8 +1190,7 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
"Could add build ghostpad src_%u_%u_%d to the conference",
substream->priv->session->id, substream->ssrc, substream->pt);
gst_object_unref (ghostpad);
- substream->priv->adding_output_ghostpad = FALSE;
- return FALSE;
+ goto error;
}
FS_RTP_SESSION_LOCK (substream->priv->session);
@@ -1205,9 +1212,16 @@ fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
g_object_set (substream->priv->output_valve, "drop", FALSE, NULL);
- fs_rtp_sub_stream_try_stop (substream);
+ out:
+ fs_rtp_sub_stream_has_stopped_exit (substream);
return TRUE;
+
+ error:
+
+ substream->priv->adding_output_ghostpad = FALSE;
+ fs_rtp_sub_stream_has_stopped_exit (substream);
+ return FALSE;
}
/**
@@ -1228,6 +1242,9 @@ _rtpbin_pad_have_data_callback (GstPad *pad, GstMiniObject *miniobj,
gboolean ret = TRUE;
gboolean remove = FALSE;
+ if (fs_rtp_sub_stream_has_stopped_enter (self))
+ return FALSE;
+
FS_RTP_SESSION_LOCK (self->priv->session);
if (!self->priv->codecbin || !self->codec || !self->priv->caps)
@@ -1261,6 +1278,8 @@ _rtpbin_pad_have_data_callback (GstPad *pad, GstMiniObject *miniobj,
FS_RTP_SESSION_UNLOCK (self->priv->session);
+ fs_rtp_sub_stream_has_stopped_exit (self);
+
return ret;
}
@@ -1272,16 +1291,8 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
FsCodec *codec = NULL;
GError *error = NULL;
- FS_RTP_SUB_STREAM_LOCK (substream);
- if (substream->priv->stopped || substream->priv->modifying)
- {
- FS_RTP_SUB_STREAM_UNLOCK (substream);
- FS_RTP_SESSION_UNLOCK (substream->priv->session);
- fs_rtp_sub_stream_try_stop (substream);
+ if (fs_rtp_sub_stream_has_stopped_enter (substream))
return;
- }
- substream->priv->modifying = TRUE;
- FS_RTP_SUB_STREAM_UNLOCK (substream);
GST_DEBUG ("Substream blocked for codec change (session:%d SSRC:%x pt:%d)",
substream->priv->session->id, substream->ssrc, substream->pt);
@@ -1303,14 +1314,8 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
codec, codecbin, &error))
goto error;
-
- FS_RTP_SUB_STREAM_LOCK (substream);
if (substream->priv->stopped)
- {
- FS_RTP_SUB_STREAM_UNLOCK (substream);
goto out;
- }
- FS_RTP_SUB_STREAM_UNLOCK (substream);
}
out:
@@ -1321,10 +1326,7 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
gst_pad_set_blocked_async (pad, FALSE, do_nothing_blocked_callback, NULL);
- FS_RTP_SUB_STREAM_LOCK (substream);
- substream->priv->modifying = FALSE;
- FS_RTP_SUB_STREAM_UNLOCK (substream);
- fs_rtp_sub_stream_try_stop (substream);
+ fs_rtp_sub_stream_has_stopped_exit (substream);
return;
@@ -1348,10 +1350,15 @@ _rtpbin_pad_blocked_callback (GstPad *pad, gboolean blocked, gpointer user_data)
static void
fs_rtp_sub_stream_add_probe_locked (FsRtpSubStream *substream)
{
+ if (fs_rtp_sub_stream_has_stopped_enter (substream))
+ return;
+
if (!substream->priv->blocking_id)
substream->priv->blocking_id = gst_pad_add_data_probe (
substream->priv->rtpbin_pad,
G_CALLBACK (_rtpbin_pad_have_data_callback), substream);
+
+ fs_rtp_sub_stream_has_stopped_exit (substream);
}
/**
@@ -1367,6 +1374,9 @@ fs_rtp_sub_stream_add_probe_locked (FsRtpSubStream *substream)
void
fs_rtp_sub_stream_verify_codec_locked (FsRtpSubStream *substream)
{
+ if (fs_rtp_sub_stream_has_stopped_enter (substream))
+ return;
+
GST_LOG ("Starting codec verification process for substream with"
" SSRC:%x pt:%d", substream->ssrc, substream->pt);
@@ -1375,6 +1385,8 @@ fs_rtp_sub_stream_verify_codec_locked (FsRtpSubStream *substream)
gst_pad_set_blocked_async (substream->priv->rtpbin_pad, TRUE,
_rtpbin_pad_blocked_callback, substream);
+
+ fs_rtp_sub_stream_has_stopped_exit (substream);
}
--
1.5.6.5
More information about the farsight-commits mailing list