[Telepathy-commits] [telepathy-gabble/master] GabbleMediaChannel: don't return from RequestStreams until the stream is actually created

Simon McVittie simon.mcvittie at collabora.co.uk
Tue Mar 10 05:12:59 PDT 2009


Now that creating a stream from a content is never synchronous, we can
safely assume that the stream can't be created until the main loop is
re-entered.
---
 src/media-channel.c |  215 ++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 186 insertions(+), 29 deletions(-)

diff --git a/src/media-channel.c b/src/media-channel.c
index 1736d56..35bfba1 100644
--- a/src/media-channel.c
+++ b/src/media-channel.c
@@ -146,7 +146,11 @@ struct _GabbleMediaChannelPrivate
 
   GabbleMediaFactory *factory;
   GabbleJingleSession *session;
+
+  /* array of referenced GabbleMediaStream* */
   GPtrArray *streams;
+  /* list of PendingStreamRequest* in no particular order */
+  GList *pending_stream_requests;
 
   guint next_stream_id;
 
@@ -1341,6 +1345,96 @@ CHOOSE_TRANSPORT:
   return resource;
 }
 
+/* State for one RequestStreams call whose D-Bus reply is deferred until
+ * the contents it asked for have been dealt with (each one either gets a
+ * stream or is removed), or until the request is freed. */
+typedef struct {
+    /* Contents this request is still waiting on:
+     * borrowed content => itself, used as a set.
+     * An entry is removed when its content gets a stream or is removed. */
+    GHashTable *contents;
+    /* accumulates borrowed pointers to streams; handed to
+     * make_stream_list() to build the successful reply */
+    GPtrArray *streams;
+    /* The method call awaiting a reply. Set to NULL once a reply (success
+     * or error) has been sent, so that no second reply is attempted. */
+    DBusGMethodInvocation *context;
+} PendingStreamRequest;
+
+/* Create a PendingStreamRequest that will answer @context once every
+ * element of @contents has been dealt with.
+ *
+ * The contents are borrowed (no reference is taken) and the @contents array
+ * itself is not retained, so the caller may free it straight away.
+ * Free the result with pending_stream_request_free(). */
+static PendingStreamRequest *
+pending_stream_request_new (GPtrArray *contents,
+                            DBusGMethodInvocation *context)
+{
+  PendingStreamRequest *req = g_slice_new0 (PendingStreamRequest);
+  guint n;
+
+  req->context = context;
+  req->streams = g_ptr_array_sized_new (contents->len);
+  /* NULL hash/equal functions: keys are compared as plain pointers */
+  req->contents = g_hash_table_new (NULL, NULL);
+
+  for (n = 0; n < contents->len; n++)
+    {
+      gpointer content = g_ptr_array_index (contents, n);
+
+      g_hash_table_insert (req->contents, content, content);
+    }
+
+  return req;
+}
+
+/* Record that @stream has been created for @content and, if @p is then no
+ * longer waiting for any content, reply to the RequestStreams call with the
+ * streams collected so far.
+ *
+ * Returns TRUE if the reply was sent (p->context is then NULL, and the
+ * caller is expected to free @p and drop it from the pending list), or
+ * FALSE if @p is still waiting or has already been answered. */
+static gboolean
+pending_stream_request_maybe_satisfy (PendingStreamRequest *p,
+                                      GabbleMediaChannel *channel,
+                                      GabbleJingleContent *content,
+                                      GabbleMediaStream *stream)
+{
+  /* Remember the stream, but only if this request actually asked for its
+   * content: without this p->streams stays empty and the reply lists no
+   * streams at all, and without the check it would pick up streams that
+   * belong to other requests. */
+  if (g_hash_table_remove (p->contents, content))
+    g_ptr_array_add (p->streams, stream);
+
+  if (g_hash_table_size (p->contents) == 0 && p->context != NULL)
+    {
+      GPtrArray *ret = make_stream_list (channel, p->streams);
+
+      tp_svc_channel_type_streamed_media_return_from_request_streams (
+          p->context, ret);
+      g_ptr_array_free (ret, TRUE);
+      /* mark as answered so pending_stream_request_free() does not also
+       * send an error for this call */
+      p->context = NULL;
+      return TRUE;
+    }
+
+  return FALSE;
+}
+
+/* Note that @content was removed before a stream could be created for it.
+ * If @p is then no longer waiting for any content and has not been answered
+ * yet, fail the call with NotAvailable.
+ *
+ * Returns TRUE if the error reply was sent (p->context is then NULL), FALSE
+ * otherwise. @channel is currently unused. */
+static gboolean
+pending_stream_request_maybe_fail (PendingStreamRequest *p,
+                                   GabbleMediaChannel *channel,
+                                   GabbleJingleContent *content)
+{
+  g_hash_table_remove (p->contents, content);
+
+  /* still waiting for other contents, or already answered: nothing to do */
+  if (g_hash_table_size (p->contents) != 0 || p->context == NULL)
+    return FALSE;
+
+    {
+      GError e = { TP_ERRORS, TP_ERROR_NOT_AVAILABLE,
+          "The stream was removed before it could be fully set up" };
+
+      dbus_g_method_return_error (p->context, &e);
+    }
+
+  p->context = NULL;
+  return TRUE;
+}
+
+/* Destroy a PendingStreamRequest. @data is the PendingStreamRequest; it is
+ * taken as a gpointer so the function can be used as a generic callback.
+ *
+ * If the D-Bus call has not been answered yet, it is failed with Cancelled
+ * before the request is released. */
+static void
+pending_stream_request_free (gpointer data)
+{
+  PendingStreamRequest *req = data;
+  DBusGMethodInvocation *unanswered = req->context;
+
+  if (unanswered != NULL)
+    {
+      GError cancelled = { TP_ERRORS, TP_ERROR_CANCELLED,
+          "The session terminated before the requested stream could be added"
+      };
+
+      dbus_g_method_return_error (unanswered, &cancelled);
+    }
+
+  /* both containers hold borrowed pointers only, so just drop them */
+  g_ptr_array_free (req->streams, TRUE);
+  g_hash_table_destroy (req->contents);
+
+  g_slice_free (PendingStreamRequest, req);
+}
+
 static gboolean
 _gabble_media_channel_request_contents (GabbleMediaChannel *chan,
                                         const GArray *media_types,
@@ -1495,9 +1589,9 @@ _gabble_media_channel_request_contents (GabbleMediaChannel *chan,
             content_ns, transport_ns);
 
       /* The stream is created in "new-content" callback, and appended to
-       * priv->streams. This may or may not happen synchronously (adding
-       * streams can take time due to the relay info lookup) so here we just
-       * return the contents. */
+       * priv->streams. This is now guaranteed to happen asynchronously (adding
+       * streams can take time due to the relay info lookup, and if it doesn't,
+       * we use an idle so it does). */
       g_assert (c != NULL);
       g_ptr_array_add (*ret, c);
     }
@@ -1622,11 +1716,8 @@ gabble_media_channel_request_streams (TpSvcChannelTypeStreamedMedia *iface,
   TpBaseConnection *conn;
   GPtrArray *contents;
   GError *error = NULL;
-  GPtrArray *streams;
-  GPtrArray *ret;
   TpHandleRepoIface *contact_handles;
   gboolean wait;
-  guint i;
 
   g_assert (GABBLE_IS_MEDIA_CHANNEL (self));
 
@@ -1688,26 +1779,10 @@ gabble_media_channel_request_streams (TpSvcChannelTypeStreamedMedia *iface,
         &error))
     goto error;
 
-  streams = g_ptr_array_sized_new (types->len);
-
-  for (i = 0; i < types->len; i++)
-    {
-      /* For now, preserve the incorrect assumption that streams are added
-       * synchronously. The n last streams are the ones we just created,
-       * where n = types->len. */
-      GabbleMediaStream *stream = g_ptr_array_index (priv->streams,
-          priv->streams->len - types->len + i);
-
-      g_ptr_array_add (streams, stream);
-    }
-
-  ret = make_stream_list (self, streams);
-
-  g_ptr_array_free (streams, TRUE);
+  priv->pending_stream_requests = g_list_prepend (
+      priv->pending_stream_requests,
+      pending_stream_request_new (contents, context));
   g_ptr_array_free (contents, TRUE);
-
-  tp_svc_channel_type_streamed_media_return_from_request_streams (context, ret);
-  g_ptr_array_free (ret, TRUE);
   return;
 
 error:
@@ -1955,6 +2030,12 @@ session_terminated_cb (GabbleJingleSession *session,
       TP_CHANNEL_GROUP_FLAG_CAN_ADD,
       TP_CHANNEL_GROUP_FLAG_CAN_REMOVE);
 
+  /* any contents that we were waiting for are now lost */
+  g_list_foreach (priv->pending_stream_requests,
+      (GFunc) pending_stream_request_free, NULL);
+  g_list_free (priv->pending_stream_requests);
+  priv->pending_stream_requests = NULL;
+
   /* unref streams */
   if (priv->streams != NULL)
     {
@@ -2369,6 +2450,30 @@ construct_stream (GabbleMediaChannel *chan,
 
   g_ptr_array_add (priv->streams, stream);
 
+  /* return from RequestStreams */
+    {
+      GList *link = priv->pending_stream_requests;
+
+      while (link != NULL)
+        {
+          if (pending_stream_request_maybe_satisfy (link->data,
+                chan, c, stream))
+            {
+              GList *dead = link;
+
+              pending_stream_request_free (dead->data);
+
+              link = dead->next;
+              priv->pending_stream_requests = g_list_delete_link (
+                  priv->pending_stream_requests, dead);
+            }
+          else
+            {
+              link = link->next;
+            }
+        }
+    }
+
   gabble_signal_connect_weak (stream, "close", (GCallback) stream_close_cb,
       chan_o);
   gabble_signal_connect_weak (stream, "error", (GCallback) stream_error_cb,
@@ -2413,6 +2518,7 @@ construct_stream (GabbleMediaChannel *chan,
+/* Everything needed to construct the stream for a content later on, from
+ * an idle callback or once a relay lookup has finished. */
 typedef struct {
+    /* the channel; a reference is held */
     GabbleMediaChannel *self;
+    /* the content, with a reference held, or NULL if the content was
+     * removed before its stream could be constructed */
     GabbleJingleContent *content;
+    /* handler ID of content_removed_cb on content's "removed" signal */
+    gulong removed_id;
+    /* owned strings, released with g_free() when this struct is freed */
     gchar *nat_traversal;
     gchar *name;
 } StreamCreationData;
@@ -2424,7 +2530,13 @@ stream_creation_data_free (gpointer p)
 
   g_free (d->name);
   g_free (d->nat_traversal);
-  g_object_unref (d->content);
+
+  if (d->content != NULL)
+    {
+      g_signal_handler_disconnect (d->content, d->removed_id);
+      g_object_unref (d->content);
+    }
+
   g_object_unref (d->self);
   g_slice_free (StreamCreationData, d);
 }
@@ -2434,7 +2546,9 @@ construct_stream_later_cb (gpointer user_data)
 {
   StreamCreationData *d = user_data;
 
-  construct_stream (d->self, d->content, d->name, d->nat_traversal, NULL);
+  if (d->content != NULL)
+    construct_stream (d->self, d->content, d->name, d->nat_traversal, NULL);
+
   return FALSE;
 }
 
@@ -2444,11 +2558,48 @@ google_relay_session_cb (GPtrArray *relays,
 {
   StreamCreationData *d = user_data;
 
-  construct_stream (d->self, d->content, d->name, d->nat_traversal, relays);
+  if (d->content != NULL)
+    construct_stream (d->self, d->content, d->name, d->nat_traversal, relays);
+
   stream_creation_data_free (d);
 }
 
 static void
+content_removed_cb (GabbleJingleContent *content,
+                    StreamCreationData *d)
+{
+  if (d->content != NULL)
+    {
+      GList *link = d->self->priv->pending_stream_requests;
+
+      g_signal_handler_disconnect (d->content, d->removed_id);
+
+      /* return from RequestStreams unsuccessfully */
+      while (link != NULL)
+        {
+          if (pending_stream_request_maybe_fail (link->data,
+                d->self, d->content))
+            {
+              GList *dead = link;
+
+              pending_stream_request_free (dead->data);
+
+              link = dead->next;
+              d->self->priv->pending_stream_requests = g_list_delete_link (
+                  d->self->priv->pending_stream_requests, dead);
+            }
+          else
+            {
+              link = link->next;
+            }
+        }
+
+      g_object_unref (d->content);
+      d->content = NULL;
+    }
+}
+
+static void
 create_stream_from_content (GabbleMediaChannel *self,
                             GabbleJingleContent *c)
 {
@@ -2470,13 +2621,19 @@ create_stream_from_content (GabbleMediaChannel *self,
       "nat-traversal", &nat_traversal,
       NULL);
 
-  StreamCreationData *d = g_slice_new0 (StreamCreationData);
+  d = g_slice_new0 (StreamCreationData);
 
   d->self = g_object_ref (self);
   d->nat_traversal = nat_traversal;
   d->name = name;
   d->content = g_object_ref (c);
 
+  /* If the content gets removed before we've finished looking up its
+   * relay (can this happen?) we need to cancel the creation of the stream,
+   * and make any PendingStreamRequests fail */
+  d->removed_id = g_signal_connect (c, "removed",
+      G_CALLBACK (content_removed_cb), d);
+
   if (!tp_strdiff (nat_traversal, "gtalk-p2p"))
     {
       /* See if our server is Google, and if it is, ask them for a relay.
-- 
1.5.6.5




More information about the telepathy-commits mailing list