[telepathy-stream-engine/refs/tags/telepathy-stream-engine_0.5.10] Keep a list of all objects linked to an audio sink

Olivier Crête olivier.crete at collabora.co.uk
Mon Oct 26 09:28:24 PDT 2009


---
 src/tp-stream-engine.c |  146 +++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 138 insertions(+), 8 deletions(-)

diff --git a/src/tp-stream-engine.c b/src/tp-stream-engine.c
index 69ff796..9323fde 100644
--- a/src/tp-stream-engine.c
+++ b/src/tp-stream-engine.c
@@ -55,6 +55,9 @@ static void
 _create_pipeline (TpStreamEngine *self);
 
 static void
+setup_realtime_thread (pthread_t thread);
+
+static void
 register_dbus_signal_marshallers()
 {
   /*register a marshaller for the NewMediaStreamHandler signal*/
@@ -155,8 +158,7 @@ struct _TpStreamEnginePrivate
 
   guint bus_async_source_id;
 
-  GPtrArray *audio_elements;
-  GPtrArray *audio_pads;
+  GPtrArray *audio_objects;
 
   GHashTable *object_threads;
 };
@@ -235,8 +237,7 @@ tp_stream_engine_init (TpStreamEngine *self)
   priv->channels_by_path = g_hash_table_new_full (g_str_hash, g_str_equal,
       g_free, NULL);
 
-  priv->audio_pads = g_ptr_array_new ();
-  priv->audio_elements = g_ptr_array_new ();
+  priv->audio_objects = g_ptr_array_new ();
   priv->object_threads = g_hash_table_new (g_direct_hash, g_direct_equal);
 
   priv->mutex = g_mutex_new ();
@@ -421,8 +422,7 @@ tp_stream_engine_finalize (GObject *object)
 
   g_mutex_free (self->priv->mutex);
 
-  g_ptr_array_free (self->priv->audio_elements, TRUE);
-  g_ptr_array_free (self->priv->audio_pads, TRUE);
+  g_ptr_array_free (self->priv->audio_objects, TRUE);
   g_hash_table_destroy (self->priv->object_threads);
 
   G_OBJECT_CLASS (tp_stream_engine_parent_class)->finalize (object);
@@ -645,6 +645,120 @@ stream_receiving (TpStreamEngineVideoStream *videostream, gpointer user_data)
   g_free (channel_path);
 }
 
+static void
+remove_audio_object (gpointer data, GObject *where_the_obj_was)
+{
+  TpStreamEngine *self = data;
+  /* Weak-ref notify set by insert_audio_object_locked(): untrack the object. */
+  g_mutex_lock (self->priv->mutex);
+  g_ptr_array_remove (self->priv->audio_objects, where_the_obj_was);
+  g_mutex_unlock (self->priv->mutex);
+}
+
+static void
+audio_pad_linked (GstPad *pad, GstPad *peer, TpStreamEngine *self);
+
+static gboolean
+insert_audio_object_locked (TpStreamEngine *self, gpointer object)
+{
+  guint i;
+  pthread_t thread;
+  /* Called with priv->mutex held.  FALSE means object was already tracked. */
+  for (i = 0; i < self->priv->audio_objects->len; i++)
+    if (g_ptr_array_index (self->priv->audio_objects, i) == object)
+      return FALSE;
+
+  g_ptr_array_add (self->priv->audio_objects, object);
+  g_object_weak_ref (G_OBJECT (object), remove_audio_object, self);
+  /* Full-width conversion: GPOINTER_TO_UINT would truncate a 64-bit id. */
+  thread = GPOINTER_TO_SIZE (g_hash_table_lookup (self->priv->object_threads,
+          object));
+  if (thread)
+    setup_realtime_thread (thread);
+
+  return TRUE;
+}
+
+static void
+add_audio_pad (TpStreamEngine *self, GstPad *pad)
+{
+  GstElement *el;
+  /* Called with priv->mutex held.  Only a newly tracked pad gets a handler. */
+  gboolean is_new = insert_audio_object_locked (self, pad);
+
+  el = gst_pad_get_parent_element (pad);
+
+  if (el)
+    {
+      insert_audio_object_locked (self, el);
+      gst_object_unref (el);
+    }
+
+  if (is_new && gst_pad_get_direction (pad) == GST_PAD_SINK)
+    g_signal_connect (pad, "linked", G_CALLBACK (audio_pad_linked), self);
+}
+
+static void
+audio_sinkpad_iter (gpointer data, gpointer user_data)
+{
+  TpStreamEngine *self = user_data;
+  GstPad *sinkpad = data;
+  GstPad *srcpad = gst_pad_get_peer (sinkpad);
+  /* gst_iterator_foreach() callback; callers hold priv->mutex. */
+  add_audio_pad (self, sinkpad);
+
+  if (srcpad)
+    {
+      GstIterator *iter;
+      add_audio_pad (self, srcpad);
+      /* Recurse into the pads internally linked to the peer. */
+      iter = gst_pad_iterate_internal_links (srcpad);
+
+      while (gst_iterator_foreach (iter, audio_sinkpad_iter, self) ==
+          GST_ITERATOR_RESYNC)
+        gst_iterator_resync (iter);
+      gst_iterator_free (iter);
+      gst_object_unref (srcpad);
+    }
+  /* The 0.10 iterator passes each item with its own reference: drop it. */
+  gst_object_unref (sinkpad);
+}
+
+static void
+audio_pad_linked (GstPad *pad, GstPad *peer, TpStreamEngine *self)
+{
+  GstIterator *iter = gst_pad_iterate_internal_links (pad);
+  /* "linked" handler connected by add_audio_pad(): track the new peer. */
+  g_mutex_lock (self->priv->mutex);
+  add_audio_pad (self, peer);
+  /* NOTE(review): the walk below starts from pad's internal links, not
+   * peer's -- confirm this is the intended direction. */
+  while (gst_iterator_foreach (iter, audio_sinkpad_iter, self) ==
+      GST_ITERATOR_RESYNC)
+    gst_iterator_resync (iter);
+
+  gst_iterator_free (iter);
+  g_mutex_unlock (self->priv->mutex);
+}
+
+static void
+audio_sink_added_cb (TpStreamEngineAudioStream *audiostream,
+    GstElement *sink, TpStreamEngine *self)
+{
+  GstIterator *iter;
+  /* "sink-added" handler: run audio_sinkpad_iter() on each sink pad of sink. */
+  g_mutex_lock (self->priv->mutex);
+
+  iter = gst_element_iterate_sink_pads (sink);
+
+  while (gst_iterator_foreach (iter, audio_sinkpad_iter, self) ==
+      GST_ITERATOR_RESYNC)
+    gst_iterator_resync (iter);
+
+  gst_iterator_free (iter);
+
+  g_mutex_unlock (self->priv->mutex);
+}
 
 static void
 channel_stream_created (TfChannel *chan G_GNUC_UNUSED,
@@ -678,6 +792,8 @@ channel_stream_created (TfChannel *chan G_GNUC_UNUSED,
           audiostream);
       g_mutex_unlock (self->priv->mutex);
 
+      g_signal_connect (audiostream, "sink-added",
+          G_CALLBACK (audio_sink_added_cb), self);
     }
   else if (media_type == TP_MEDIA_STREAM_TYPE_VIDEO)
     {
@@ -982,13 +1098,27 @@ bus_async_handler (GstBus *bus G_GNUC_UNUSED,
 }
 
 static void
+setup_realtime_thread (pthread_t thread)
+{ /* Empty stub in this patch: callers pass the thread of an audio object. */
+}
+
+static void
 enter_thread (TpStreamEngine *self, GstObject *src, GstElement *owner)
 {
-  pthread_t selfthread = pthread_self ();
+  pthread_t thread = pthread_self ();
+  guint i;
 
   g_mutex_lock (self->priv->mutex);
   g_hash_table_insert (self->priv->object_threads, src,
-      GINT_TO_POINTER (selfthread));
+      GSIZE_TO_POINTER (thread));
+  /* src may already be a tracked audio object: set up its thread now. */
+  for (i = 0; i < self->priv->audio_objects->len; i++)
+    if (g_ptr_array_index (self->priv->audio_objects, i) == src)
+      {
+        setup_realtime_thread (thread);
+        break;
+      }
+
   g_mutex_unlock (self->priv->mutex);
 }
 
-- 
1.5.6.5




More information about the telepathy-commits mailing list