[telepathy-stream-engine/refs/tags/telepathy-stream-engine_0.5.10] Reset the pipeline on error

Olivier Crête olivier.crete at collabora.co.uk
Mon Oct 26 09:28:19 PDT 2009


---
 src/tp-stream-engine.c |  136 +++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 130 insertions(+), 6 deletions(-)

diff --git a/src/tp-stream-engine.c b/src/tp-stream-engine.c
index 23e9ee0..b585f03 100644
--- a/src/tp-stream-engine.c
+++ b/src/tp-stream-engine.c
@@ -140,10 +140,16 @@ struct _TpStreamEnginePrivate
 
   gint video_source_use_count;
 
+  gboolean failcount;
+  guint reset_failcount_id;
+  guint restart_pipeline_id;
+
   /* Everything below this is protected by the mutex */
   GList *output_sinks;
   GList *preview_sinks;
 
+  GList *audio_streams;
+
   TpStreamEngineVideoPreview *preview;
 
   guint bus_async_source_id;
@@ -332,6 +338,20 @@ tp_stream_engine_dispose (GObject *object)
   if (priv->dispose_has_run)
     return;
 
+
+  if (self->priv->reset_failcount_id)
+    {
+      g_source_remove (self->priv->reset_failcount_id);
+      self->priv->reset_failcount_id = 0;
+    }
+
+
+  if (self->priv->restart_pipeline_id)
+    {
+      g_source_remove (self->priv->restart_pipeline_id);
+      self->priv->restart_pipeline_id = 0;
+    }
+
   if (priv->channels)
     {
       guint i;
@@ -576,6 +596,14 @@ stream_closed (TfStream *stream, gpointer user_data)
 
       gst_object_unref (pad);
     }
+  else if (TP_STREAM_ENGINE_IS_AUDIO_STREAM (sestream))
+    {
+      g_mutex_lock (self->priv->mutex);
+      self->priv->audio_streams = g_list_remove (self->priv->audio_streams,
+          sestream);
+      g_mutex_unlock (self->priv->mutex);
+    }
+
   g_object_unref (sestream);
   g_object_set_data (G_OBJECT (stream), "se-stream", NULL);
 }
@@ -630,6 +658,12 @@ channel_stream_created (TfChannel *chan G_GNUC_UNUSED,
         }
       g_clear_error (&error);
       g_object_set_data (G_OBJECT (stream), "se-stream", audiostream);
+
+      g_mutex_lock (self->priv->mutex);
+      self->priv->audio_streams = g_list_prepend (self->priv->audio_streams,
+          audiostream);
+      g_mutex_unlock (self->priv->mutex);
+
     }
   else if (media_type == TP_MEDIA_STREAM_TYPE_VIDEO)
     {
@@ -784,6 +818,97 @@ error_all_streams (TpStreamEngine *self, const gchar *message)
 }
 
 static gboolean
+reset_failcount (gpointer data)
+{
+  TpStreamEngine *self = TP_STREAM_ENGINE (data);
+
+  self->priv->failcount = 0;
+  self->priv->reset_failcount_id = 0;
+
+  return FALSE;
+}
+
+static gboolean
+restart_pipeline (gpointer data)
+{
+  TpStreamEngine *self = TP_STREAM_ENGINE (data);
+  GstStateChangeReturn ret;
+  GList *item;
+
+  if (self->priv->reset_failcount_id)
+    {
+      g_source_remove (self->priv->reset_failcount_id);
+      self->priv->reset_failcount_id = 0;
+    }
+
+  self->priv->failcount++;
+
+  ret = gst_element_set_state (self->priv->pipeline, GST_STATE_NULL);
+  g_assert (ret != GST_STATE_CHANGE_FAILURE);
+  ret = gst_element_set_state (self->priv->videosrc, GST_STATE_NULL);
+  g_assert (ret != GST_STATE_CHANGE_FAILURE);
+
+  g_mutex_lock (self->priv->mutex);
+  for (item = self->priv->audio_streams; item ; item = item->next)
+    {
+      if (!tp_stream_engine_audio_stream_set_playing (
+              TP_STREAM_ENGINE_AUDIO_STREAM (item->data), FALSE))
+        {
+          g_mutex_unlock (self->priv->mutex);
+          goto fail;
+        }
+    }
+  g_mutex_unlock (self->priv->mutex);
+
+  ret = gst_element_set_state (self->priv->pipeline, GST_STATE_PLAYING);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    goto fail;
+  if (self->priv->video_source_use_count)
+    {
+      gst_element_set_state (self->priv->videosrc, GST_STATE_PLAYING);
+      if (ret == GST_STATE_CHANGE_FAILURE)
+        goto fail;
+    }
+
+  g_mutex_lock (self->priv->mutex);
+  for (item = self->priv->audio_streams; item ; item = item->next)
+    {
+      if (!tp_stream_engine_audio_stream_set_playing (
+              TP_STREAM_ENGINE_AUDIO_STREAM (item->data), TRUE))
+        {
+          g_mutex_unlock (self->priv->mutex);
+          goto fail;
+        }
+    }
+  g_mutex_unlock (self->priv->mutex);
+
+  self->priv->restart_pipeline_id = 0;
+  self->priv->reset_failcount_id =
+      g_timeout_add_seconds (4, reset_failcount, self);
+
+
+  return FALSE;
+
+ fail:
+
+  g_warning ("Failed to restart the pipeline after an error");
+
+  if (self->priv->failcount > 5)
+    {
+      error_all_streams (self, "Could not restart the pipeline after an error");
+      g_error ("Failed five times to restart the pipeline after an error");
+      return FALSE;
+    }
+
+  if (!self->priv->restart_pipeline_id)
+    self->priv->restart_pipeline_id = g_timeout_add (200, restart_pipeline,
+        self);
+
+
+  return TRUE;
+}
+
+static gboolean
 bus_async_handler (GstBus *bus G_GNUC_UNUSED,
     GstMessage *message,
     gpointer data)
@@ -796,7 +921,6 @@ bus_async_handler (GstBus *bus G_GNUC_UNUSED,
   GstObject *source = GST_OBJECT (GST_MESSAGE_SRC (message));
   gchar *name = NULL;
 
-
   for (i = 0; i < priv->channels->len; i++)
     if (tf_channel_bus_message (
             g_ptr_array_index (priv->channels, i), message))
@@ -809,15 +933,15 @@ bus_async_handler (GstBus *bus G_GNUC_UNUSED,
       case GST_MESSAGE_ERROR:
         gst_message_parse_error (message, &error, &error_string);
 
-        error_all_streams (engine, error->message);
 
-        g_error ("%s: got error from %s: %s: %s (%d %d), stopping pipeline",
+        //error_all_streams (engine, error->message);
+
+        g_warning ("%s: got error from %s: %s: %s (%d %d), stopping pipeline",
             G_STRFUNC, name, error->message, error_string,
             error->domain, error->code);
 
-        gst_element_set_state (engine->priv->pipeline, GST_STATE_NULL);
-        gst_element_set_state (engine->priv->videosrc, GST_STATE_NULL);
-        gst_element_set_state (engine->priv->pipeline, GST_STATE_PLAYING);
+        if (priv->restart_pipeline_id == 0)
+          restart_pipeline (engine);
 
         g_free (error_string);
         g_error_free (error);
-- 
1.5.6.5




More information about the telepathy-commits mailing list