[telepathy-stream-engine/refs/tags/telepathy-stream-engine_0.5.10] Reset the pipeline on error
Olivier Crête
olivier.crete at collabora.co.uk
Mon Oct 26 09:28:19 PDT 2009
---
src/tp-stream-engine.c | 136 +++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 130 insertions(+), 6 deletions(-)
diff --git a/src/tp-stream-engine.c b/src/tp-stream-engine.c
index 23e9ee0..b585f03 100644
--- a/src/tp-stream-engine.c
+++ b/src/tp-stream-engine.c
@@ -140,10 +140,16 @@ struct _TpStreamEnginePrivate
gint video_source_use_count;
+ gboolean failcount;
+ guint reset_failcount_id;
+ guint restart_pipeline_id;
+
/* Everything below this is protected by the mutex */
GList *output_sinks;
GList *preview_sinks;
+ GList *audio_streams;
+
TpStreamEngineVideoPreview *preview;
guint bus_async_source_id;
@@ -332,6 +338,20 @@ tp_stream_engine_dispose (GObject *object)
if (priv->dispose_has_run)
return;
+
+ if (self->priv->reset_failcount_id)
+ {
+ g_source_remove (self->priv->reset_failcount_id);
+ self->priv->reset_failcount_id = 0;
+ }
+
+
+ if (self->priv->restart_pipeline_id)
+ {
+ g_source_remove (self->priv->restart_pipeline_id);
+ self->priv->restart_pipeline_id = 0;
+ }
+
if (priv->channels)
{
guint i;
@@ -576,6 +596,14 @@ stream_closed (TfStream *stream, gpointer user_data)
gst_object_unref (pad);
}
+ else if (TP_STREAM_ENGINE_IS_AUDIO_STREAM (sestream))
+ {
+ g_mutex_lock (self->priv->mutex);
+ self->priv->audio_streams = g_list_remove (self->priv->audio_streams,
+ sestream);
+ g_mutex_unlock (self->priv->mutex);
+ }
+
g_object_unref (sestream);
g_object_set_data (G_OBJECT (stream), "se-stream", NULL);
}
@@ -630,6 +658,12 @@ channel_stream_created (TfChannel *chan G_GNUC_UNUSED,
}
g_clear_error (&error);
g_object_set_data (G_OBJECT (stream), "se-stream", audiostream);
+
+ g_mutex_lock (self->priv->mutex);
+ self->priv->audio_streams = g_list_prepend (self->priv->audio_streams,
+ audiostream);
+ g_mutex_unlock (self->priv->mutex);
+
}
else if (media_type == TP_MEDIA_STREAM_TYPE_VIDEO)
{
@@ -784,6 +818,97 @@ error_all_streams (TpStreamEngine *self, const gchar *message)
}
static gboolean
+reset_failcount (gpointer data)
+{
+  TpStreamEngine *engine = TP_STREAM_ENGINE (data);
+
+  /* One-shot timeout callback: forget the source id and clear the count
+   * of restart attempts. */
+  engine->priv->reset_failcount_id = 0;
+  engine->priv->failcount = 0;
+
+  /* FALSE: do not call this source again. */
+  return FALSE;
+}
+
+/*
+ * restart_pipeline:
+ * @data: the #TpStreamEngine whose pipeline is to be restarted
+ *
+ * Takes the pipeline and the video source down to GST_STATE_NULL, stops
+ * every audio stream, then brings the pipeline (and the video source, if
+ * it is in use) back to PLAYING and restarts the audio streams.
+ *
+ * The function also serves as the callback of the 200 ms retry timeout it
+ * installs itself when an attempt fails.
+ *
+ * Every call increments priv->failcount.  A successful restart arms a
+ * 4 second timeout that clears the counter again.  Once the counter
+ * exceeds five, all streams are errored and g_error() is raised.
+ *
+ * Returns: FALSE after a successful restart, TRUE after a failed attempt
+ * so that an installed retry timeout keeps firing.
+ */
+static gboolean
+restart_pipeline (gpointer data)
+{
+  TpStreamEngine *self = TP_STREAM_ENGINE (data);
+  GstStateChangeReturn ret;
+  GList *item;
+
+  /* A new attempt is starting: cancel any pending reset of the counter. */
+  if (self->priv->reset_failcount_id)
+    {
+      g_source_remove (self->priv->reset_failcount_id);
+      self->priv->reset_failcount_id = 0;
+    }
+
+  self->priv->failcount++;
+
+  ret = gst_element_set_state (self->priv->pipeline, GST_STATE_NULL);
+  g_assert (ret != GST_STATE_CHANGE_FAILURE);
+  ret = gst_element_set_state (self->priv->videosrc, GST_STATE_NULL);
+  g_assert (ret != GST_STATE_CHANGE_FAILURE);
+
+  /* audio_streams is protected by the mutex; drop it before jumping out. */
+  g_mutex_lock (self->priv->mutex);
+  for (item = self->priv->audio_streams; item ; item = item->next)
+    {
+      if (!tp_stream_engine_audio_stream_set_playing (
+              TP_STREAM_ENGINE_AUDIO_STREAM (item->data), FALSE))
+        {
+          g_mutex_unlock (self->priv->mutex);
+          goto fail;
+        }
+    }
+  g_mutex_unlock (self->priv->mutex);
+
+  ret = gst_element_set_state (self->priv->pipeline, GST_STATE_PLAYING);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    goto fail;
+  if (self->priv->video_source_use_count)
+    {
+      /* Keep the result: the check below must look at the video source's
+       * own state change, not at the pipeline's earlier one. */
+      ret = gst_element_set_state (self->priv->videosrc, GST_STATE_PLAYING);
+      if (ret == GST_STATE_CHANGE_FAILURE)
+        goto fail;
+    }
+
+  g_mutex_lock (self->priv->mutex);
+  for (item = self->priv->audio_streams; item ; item = item->next)
+    {
+      if (!tp_stream_engine_audio_stream_set_playing (
+              TP_STREAM_ENGINE_AUDIO_STREAM (item->data), TRUE))
+        {
+          g_mutex_unlock (self->priv->mutex);
+          goto fail;
+        }
+    }
+  g_mutex_unlock (self->priv->mutex);
+
+  /* Success: no retry is pending any more; clear the failure count if
+   * the pipeline stays up for 4 seconds. */
+  self->priv->restart_pipeline_id = 0;
+  self->priv->reset_failcount_id =
+      g_timeout_add_seconds (4, reset_failcount, self);
+
+  return FALSE;
+
+ fail:
+
+  g_warning ("Failed to restart the pipeline after an error");
+
+  if (self->priv->failcount > 5)
+    {
+      error_all_streams (self, "Could not restart the pipeline after an error");
+      g_error ("Failed five times to restart the pipeline after an error");
+      return FALSE;
+    }
+
+  /* Install the retry timeout only once; when we are already running from
+   * it, returning TRUE below keeps it alive. */
+  if (!self->priv->restart_pipeline_id)
+    self->priv->restart_pipeline_id = g_timeout_add (200, restart_pipeline,
+        self);
+
+  return TRUE;
+}
+
+static gboolean
bus_async_handler (GstBus *bus G_GNUC_UNUSED,
GstMessage *message,
gpointer data)
@@ -796,7 +921,6 @@ bus_async_handler (GstBus *bus G_GNUC_UNUSED,
GstObject *source = GST_OBJECT (GST_MESSAGE_SRC (message));
gchar *name = NULL;
-
for (i = 0; i < priv->channels->len; i++)
if (tf_channel_bus_message (
g_ptr_array_index (priv->channels, i), message))
@@ -809,15 +933,15 @@ bus_async_handler (GstBus *bus G_GNUC_UNUSED,
case GST_MESSAGE_ERROR:
gst_message_parse_error (message, &error, &error_string);
- error_all_streams (engine, error->message);
- g_error ("%s: got error from %s: %s: %s (%d %d), stopping pipeline",
+ //error_all_streams (engine, error->message);
+
+ g_warning ("%s: got error from %s: %s: %s (%d %d), stopping pipeline",
G_STRFUNC, name, error->message, error_string,
error->domain, error->code);
- gst_element_set_state (engine->priv->pipeline, GST_STATE_NULL);
- gst_element_set_state (engine->priv->videosrc, GST_STATE_NULL);
- gst_element_set_state (engine->priv->pipeline, GST_STATE_PLAYING);
+ if (priv->restart_pipeline_id == 0)
+ restart_pipeline (engine);
g_free (error_string);
g_error_free (error);
--
1.5.6.5
More information about the telepathy-commits
mailing list