[Telepathy-commits] [telepathy-gabble/master] GabbleMediaChannel: don't return from RequestStreams until the stream is actually created
Simon McVittie
simon.mcvittie at collabora.co.uk
Tue Mar 10 05:12:59 PDT 2009
Now that creating a stream from a content is never synchronous, we can
safely assume that the stream can't be created until the main loop is
re-entered.
---
src/media-channel.c | 215 ++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 186 insertions(+), 29 deletions(-)
diff --git a/src/media-channel.c b/src/media-channel.c
index 1736d56..35bfba1 100644
--- a/src/media-channel.c
+++ b/src/media-channel.c
@@ -146,7 +146,11 @@ struct _GabbleMediaChannelPrivate
GabbleMediaFactory *factory;
GabbleJingleSession *session;
+
+ /* array of referenced GabbleMediaStream* */
GPtrArray *streams;
+ /* list of PendingStreamRequest* in no particular order */
+ GList *pending_stream_requests;
guint next_stream_id;
@@ -1341,6 +1345,96 @@ CHOOSE_TRANSPORT:
return resource;
}
+typedef struct {
+ /* borrowed content => itself, used as a set */
+ GHashTable *contents;
+ /* accumulates borrowed pointers to streams */
+ GPtrArray *streams;
+ DBusGMethodInvocation *context;
+} PendingStreamRequest;
+
+static PendingStreamRequest *
+pending_stream_request_new (GPtrArray *contents,
+ DBusGMethodInvocation *context)
+{
+ PendingStreamRequest *p = g_slice_new0 (PendingStreamRequest);
+ guint i;
+
+ p->contents = g_hash_table_new (NULL, NULL);
+ p->streams = g_ptr_array_sized_new (contents->len);
+ p->context = context;
+
+ for (i = 0; i < contents->len; i++)
+ {
+ g_hash_table_insert (p->contents, g_ptr_array_index (contents, i),
+ g_ptr_array_index (contents, i));
+ }
+
+ return p;
+}
+
+static gboolean
+pending_stream_request_maybe_satisfy (PendingStreamRequest *p,
+ GabbleMediaChannel *channel,
+ GabbleJingleContent *content,
+ GabbleMediaStream *stream)
+{
+ g_hash_table_remove (p->contents, content);
+
+ if (g_hash_table_size (p->contents) == 0 && p->context != NULL)
+ {
+ GPtrArray *ret = make_stream_list (channel, p->streams);
+
+ tp_svc_channel_type_streamed_media_return_from_request_streams (
+ p->context, ret);
+ g_ptr_array_free (ret, TRUE);
+ p->context = NULL;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+static gboolean
+pending_stream_request_maybe_fail (PendingStreamRequest *p,
+ GabbleMediaChannel *channel,
+ GabbleJingleContent *content)
+{
+ g_hash_table_remove (p->contents, content);
+
+ if (g_hash_table_size (p->contents) == 0 && p->context != NULL)
+ {
+ GError e = { TP_ERRORS, TP_ERROR_NOT_AVAILABLE,
+ "The stream was removed before it could be fully set up" };
+
+ dbus_g_method_return_error (p->context, &e);
+ p->context = NULL;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+static void
+pending_stream_request_free (gpointer data)
+{
+ PendingStreamRequest *p = data;
+
+ if (p->context != NULL)
+ {
+ GError e = { TP_ERRORS, TP_ERROR_CANCELLED,
+ "The session terminated before the requested stream could be added"
+ };
+
+ dbus_g_method_return_error (p->context, &e);
+ }
+
+ g_hash_table_destroy (p->contents);
+ g_ptr_array_free (p->streams, TRUE);
+
+ g_slice_free (PendingStreamRequest, p);
+}
+
static gboolean
_gabble_media_channel_request_contents (GabbleMediaChannel *chan,
const GArray *media_types,
@@ -1495,9 +1589,9 @@ _gabble_media_channel_request_contents (GabbleMediaChannel *chan,
content_ns, transport_ns);
/* The stream is created in "new-content" callback, and appended to
- * priv->streams. This may or may not happen synchronously (adding
- * streams can take time due to the relay info lookup) so here we just
- * return the contents. */
+ * priv->streams. This is now guaranteed to happen asynchronously (adding
+ * streams can take time due to the relay info lookup, and if it doesn't,
+ * we use an idle so it does). */
g_assert (c != NULL);
g_ptr_array_add (*ret, c);
}
@@ -1622,11 +1716,8 @@ gabble_media_channel_request_streams (TpSvcChannelTypeStreamedMedia *iface,
TpBaseConnection *conn;
GPtrArray *contents;
GError *error = NULL;
- GPtrArray *streams;
- GPtrArray *ret;
TpHandleRepoIface *contact_handles;
gboolean wait;
- guint i;
g_assert (GABBLE_IS_MEDIA_CHANNEL (self));
@@ -1688,26 +1779,10 @@ gabble_media_channel_request_streams (TpSvcChannelTypeStreamedMedia *iface,
&error))
goto error;
- streams = g_ptr_array_sized_new (types->len);
-
- for (i = 0; i < types->len; i++)
- {
- /* For now, preserve the incorrect assumption that streams are added
- * synchronously. The n last streams are the ones we just created,
- * where n = types->len. */
- GabbleMediaStream *stream = g_ptr_array_index (priv->streams,
- priv->streams->len - types->len + i);
-
- g_ptr_array_add (streams, stream);
- }
-
- ret = make_stream_list (self, streams);
-
- g_ptr_array_free (streams, TRUE);
+ priv->pending_stream_requests = g_list_prepend (
+ priv->pending_stream_requests,
+ pending_stream_request_new (contents, context));
g_ptr_array_free (contents, TRUE);
-
- tp_svc_channel_type_streamed_media_return_from_request_streams (context, ret);
- g_ptr_array_free (ret, TRUE);
return;
error:
@@ -1955,6 +2030,12 @@ session_terminated_cb (GabbleJingleSession *session,
TP_CHANNEL_GROUP_FLAG_CAN_ADD,
TP_CHANNEL_GROUP_FLAG_CAN_REMOVE);
+ /* any contents that we were waiting for have now lost */
+ g_list_foreach (priv->pending_stream_requests,
+ (GFunc) pending_stream_request_free, NULL);
+ g_list_free (priv->pending_stream_requests);
+ priv->pending_stream_requests = NULL;
+
/* unref streams */
if (priv->streams != NULL)
{
@@ -2369,6 +2450,30 @@ construct_stream (GabbleMediaChannel *chan,
g_ptr_array_add (priv->streams, stream);
+ /* return from RequestStreams */
+ {
+ GList *link = priv->pending_stream_requests;
+
+ while (link != NULL)
+ {
+ if (pending_stream_request_maybe_satisfy (link->data,
+ chan, c, stream))
+ {
+ GList *dead = link;
+
+ pending_stream_request_free (dead->data);
+
+ link = dead->next;
+ priv->pending_stream_requests = g_list_delete_link (
+ priv->pending_stream_requests, dead);
+ }
+ else
+ {
+ link = link->next;
+ }
+ }
+ }
+
gabble_signal_connect_weak (stream, "close", (GCallback) stream_close_cb,
chan_o);
gabble_signal_connect_weak (stream, "error", (GCallback) stream_error_cb,
@@ -2413,6 +2518,7 @@ construct_stream (GabbleMediaChannel *chan,
typedef struct {
GabbleMediaChannel *self;
GabbleJingleContent *content;
+ gulong removed_id;
gchar *nat_traversal;
gchar *name;
} StreamCreationData;
@@ -2424,7 +2530,13 @@ stream_creation_data_free (gpointer p)
g_free (d->name);
g_free (d->nat_traversal);
- g_object_unref (d->content);
+
+ if (d->content != NULL)
+ {
+ g_signal_handler_disconnect (d->content, d->removed_id);
+ g_object_unref (d->content);
+ }
+
g_object_unref (d->self);
g_slice_free (StreamCreationData, d);
}
@@ -2434,7 +2546,9 @@ construct_stream_later_cb (gpointer user_data)
{
StreamCreationData *d = user_data;
- construct_stream (d->self, d->content, d->name, d->nat_traversal, NULL);
+ if (d->content != NULL)
+ construct_stream (d->self, d->content, d->name, d->nat_traversal, NULL);
+
return FALSE;
}
@@ -2444,11 +2558,48 @@ google_relay_session_cb (GPtrArray *relays,
{
StreamCreationData *d = user_data;
- construct_stream (d->self, d->content, d->name, d->nat_traversal, relays);
+ if (d->content != NULL)
+ construct_stream (d->self, d->content, d->name, d->nat_traversal, relays);
+
stream_creation_data_free (d);
}
static void
+content_removed_cb (GabbleJingleContent *content,
+ StreamCreationData *d)
+{
+ if (d->content != NULL)
+ {
+ GList *link = d->self->priv->pending_stream_requests;
+
+ g_signal_handler_disconnect (d->content, d->removed_id);
+
+ /* return from RequestStreams unsuccessfully */
+ while (link != NULL)
+ {
+ if (pending_stream_request_maybe_fail (link->data,
+ d->self, d->content))
+ {
+ GList *dead = link;
+
+ pending_stream_request_free (dead->data);
+
+ link = dead->next;
+ d->self->priv->pending_stream_requests = g_list_delete_link (
+ d->self->priv->pending_stream_requests, dead);
+ }
+ else
+ {
+ link = link->next;
+ }
+ }
+
+ g_object_unref (d->content);
+ d->content = NULL;
+ }
+}
+
+static void
create_stream_from_content (GabbleMediaChannel *self,
GabbleJingleContent *c)
{
@@ -2470,13 +2621,19 @@ create_stream_from_content (GabbleMediaChannel *self,
"nat-traversal", &nat_traversal,
NULL);
- StreamCreationData *d = g_slice_new0 (StreamCreationData);
+ d = g_slice_new0 (StreamCreationData);
d->self = g_object_ref (self);
d->nat_traversal = nat_traversal;
d->name = name;
d->content = g_object_ref (c);
+ /* If the content gets removed before we've finished looking up its
+ * relay (can this happen?) we need to cancel the creation of the stream,
+ * and make any PendingStreamRequests fail */
+ d->removed_id = g_signal_connect (c, "removed",
+ G_CALLBACK (content_removed_cb), d);
+
if (!tp_strdiff (nat_traversal, "gtalk-p2p"))
{
/* See if our server is Google, and if it is, ask them for a relay.
--
1.5.6.5
More information about the telepathy-commits
mailing list