[Telepathy-commits] [telepathy-gabble/master] Implement SI fallback

Marco Barisione marco at barisione.org
Tue Jan 6 08:41:30 PST 2009


---
 src/Makefile.am                         |    2 +
 src/bytestream-factory.c                |  173 +++++++--
 src/bytestream-factory.h                |    6 +
 src/bytestream-ibb.c                    |   23 ++
 src/bytestream-iface.c                  |    8 +
 src/bytestream-multiple.c               |  604 +++++++++++++++++++++++++++++++
 src/bytestream-multiple.h               |   74 ++++
 src/bytestream-socks5.c                 |   67 +++-
 src/tube-stream.c                       |    8 +
 tests/twisted/Makefile.am               |    1 +
 tests/twisted/tubes/test-si-fallback.py |  332 +++++++++++++++++
 11 files changed, 1241 insertions(+), 57 deletions(-)
 create mode 100644 src/bytestream-multiple.c
 create mode 100644 src/bytestream-multiple.h
 create mode 100644 tests/twisted/tubes/test-si-fallback.py

diff --git a/src/Makefile.am b/src/Makefile.am
index 8fef055..501dacc 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -28,6 +28,8 @@ libgabble_convenience_la_SOURCES = \
     bytestream-iface.c \
     bytestream-muc.h \
     bytestream-muc.c \
+    bytestream-multiple.h \
+    bytestream-multiple.c \
     bytestream-socks5.h \
     bytestream-socks5.c \
     capabilities.h \
diff --git a/src/bytestream-factory.c b/src/bytestream-factory.c
index 65fcfd0..1713d1a 100644
--- a/src/bytestream-factory.c
+++ b/src/bytestream-factory.c
@@ -33,6 +33,7 @@
 #include "bytestream-ibb.h"
 #include "bytestream-iface.h"
 #include "bytestream-muc.h"
+#include "bytestream-multiple.h"
 #include "bytestream-socks5.h"
 #include "connection.h"
 #include "debug.h"
@@ -124,6 +125,11 @@ struct _GabbleBytestreamFactoryPrivate
    * BytestreamIdentifier -> GabbleBytestreamMuc */
   GHashTable *muc_bytestreams;
 
+  /* SI-initiated bytestreams - real data sent through another bytestream.
+   *
+   * BytestreamIdentifier -> GabbleBytestreamMultiple */
+  GHashTable *multiple_bytestreams;
+
   gboolean dispose_has_run;
 };
 
@@ -161,6 +167,9 @@ gabble_bytestream_factory_init (GabbleBytestreamFactory *self)
 
   priv->socks5_bytestreams = g_hash_table_new_full (bytestream_id_hash,
       bytestream_id_equal, bytestream_id_free, g_object_unref);
+
+  priv->multiple_bytestreams = g_hash_table_new_full (bytestream_id_hash,
+      bytestream_id_equal, bytestream_id_free, g_object_unref);
 }
 
 static GObject *
@@ -239,6 +248,9 @@ gabble_bytestream_factory_dispose (GObject *object)
   g_hash_table_destroy (priv->socks5_bytestreams);
   priv->socks5_bytestreams = NULL;
 
+  g_hash_table_destroy (priv->multiple_bytestreams);
+  priv->multiple_bytestreams = NULL;
+
   if (G_OBJECT_CLASS (gabble_bytestream_factory_parent_class)->dispose)
     G_OBJECT_CLASS (gabble_bytestream_factory_parent_class)->dispose (object);
 }
@@ -319,7 +331,7 @@ remove_bytestream (GabbleBytestreamFactory *self,
     (self);
   BytestreamIdentifier bsid = { NULL, NULL };
   guint handle_type;
-  GHashTable *table;
+  GHashTable *table = NULL;
 
   g_object_get (bytestream,
       "stream-id", &(bsid.stream),
@@ -337,6 +349,8 @@ remove_bytestream (GabbleBytestreamFactory *self,
         table = priv->ibb_bytestreams;
       else if (GABBLE_IS_BYTESTREAM_SOCKS5 (bytestream))
         table = priv->socks5_bytestreams;
+      else if (GABBLE_IS_BYTESTREAM_MULTIPLE (bytestream))
+        table = priv->multiple_bytestreams;
     }
 
   if (table == NULL)
@@ -366,10 +380,12 @@ streaminit_parse_request (LmMessage *message,
                           const gchar **stream_id,
                           const gchar **stream_init_id,
                           const gchar **mime_type,
-                          GSList **stream_methods)
+                          GSList **stream_methods,
+                          gboolean *use_fallback)
 {
   LmMessageNode *iq = message->node;
   LmMessageNode *feature, *x, *field, *stream_method;
+  const gchar *list_type;
 
   *stream_init_id = lm_message_node_get_attribute (iq, "id");
 
@@ -423,8 +439,16 @@ streaminit_parse_request (LmMessage *message,
         /* some future field, ignore it */
         continue;
 
-      if (tp_strdiff (lm_message_node_get_attribute (field, "type"),
-            "list-single"))
+      list_type = lm_message_node_get_attribute (field, "type");
+      if (!tp_strdiff (list_type, "list-single"))
+        {
+          *use_fallback = FALSE;
+        }
+      else if (!tp_strdiff (list_type, "list-multiple"))
+        {
+          *use_fallback = TRUE;
+        }
+      else
         {
           NODE_DEBUG (message->node, "SI request's stream-method field was "
               "not of type list-single");
@@ -486,6 +510,7 @@ gabble_bytestream_factory_make_stream_init_iq (const gchar *full_jid,
                                                const gchar *stream_id,
                                                const gchar *profile)
 {
+  /* FIXME list-multiple is not supported by old versions of gabble */
   return lm_message_build (full_jid, LM_MESSAGE_TYPE_IQ,
       '@', "type", "set",
       '(', "si", "",
@@ -499,7 +524,7 @@ gabble_bytestream_factory_make_stream_init_iq (const gchar *full_jid,
             '@', "type", "form",
             '(', "field", "",
               '@', "var", "stream-method",
-              '@', "type", "list-single",
+              '@', "type", "list-multiple",
               '(', "option", "",
                 '(', "value", NS_BYTESTREAMS,
                 ')',
@@ -538,6 +563,8 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
   GSList *l;
   const gchar *profile, *from, *stream_id, *stream_init_id, *mime_type;
   GSList *stream_methods = NULL;
+  gboolean use_fallback;
+  GabbleBytestreamMultiple *multiple;
   gchar *peer_resource = NULL;
 
   if (lm_message_get_sub_type (msg) != LM_MESSAGE_SUB_TYPE_SET)
@@ -551,7 +578,7 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
    * it or send an error reply */
 
   if (!streaminit_parse_request (msg, si, &profile, &from, &stream_id,
-        &stream_init_id, &mime_type, &stream_methods))
+        &stream_init_id, &mime_type, &stream_methods, &use_fallback))
     {
       _gabble_connection_send_iq_error (priv->conn, msg,
           XMPP_ERROR_BAD_REQUEST, "failed to parse SI request");
@@ -575,6 +602,13 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
       gabble_decode_jid (from, NULL, NULL, &peer_resource);
     }
 
+  if (use_fallback)
+    multiple = gabble_bytestream_factory_create_multiple (self, peer_handle,
+        stream_id, stream_init_id, peer_resource,
+        GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
+  else
+    multiple = NULL;
+
   /* check stream method */
   for (l = stream_methods; l != NULL; l = l->next)
     {
@@ -586,7 +620,6 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
             gabble_bytestream_factory_create_ibb (self, peer_handle,
               stream_id, stream_init_id, peer_resource,
               GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
-          break;
         }
       else if (!tp_strdiff (l->data, NS_BYTESTREAMS))
         {
@@ -594,10 +627,22 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
             gabble_bytestream_factory_create_socks5 (self, peer_handle,
               stream_id, stream_init_id, peer_resource,
               GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
-          break;
         }
+      else
+        {
+          continue;
+        }
+
+      if (multiple != NULL)
+        gabble_bytestream_multiple_add_bytestream (multiple, bytestream);
+      else
+        break;
     }
 
+  /* FIXME check if there are bytestream methods in the multiple one */
+  if (multiple != NULL)
+    bytestream = GABBLE_BYTESTREAM_IFACE (multiple);
+
   if (bytestream == NULL)
     {
       DEBUG ("SI request doesn't contain any supported stream methods.");
@@ -1249,6 +1294,35 @@ gabble_bytestream_factory_create_socks5 (GabbleBytestreamFactory *self,
   return socks5;
 }
 
+GabbleBytestreamMultiple *
+gabble_bytestream_factory_create_multiple (GabbleBytestreamFactory *self,
+                                           TpHandle peer_handle,
+                                           const gchar *stream_id,
+                                           const gchar *stream_init_id,
+                                           const gchar *peer_resource,
+                                           GabbleBytestreamState state)
+{
+  GabbleBytestreamFactoryPrivate *priv;
+  GabbleBytestreamMultiple *multiple;
+
+  g_return_val_if_fail (GABBLE_IS_BYTESTREAM_FACTORY (self), NULL);
+  priv = GABBLE_BYTESTREAM_FACTORY_GET_PRIVATE (self);
+
+  multiple = g_object_new (GABBLE_TYPE_BYTESTREAM_MULTIPLE,
+      "connection", priv->conn,
+      "peer-handle", peer_handle,
+      "stream-id", stream_id,
+      "state", state,
+      "stream-init-id", stream_init_id,
+      "peer-resource", peer_resource,
+      NULL);
+
+  g_signal_connect (multiple, "state-changed",
+      G_CALLBACK (bytestream_state_changed_cb), self);
+
+  return multiple;
+}
+
 struct _streaminit_reply_cb_data
 {
   gchar *stream_id;
@@ -1268,6 +1342,7 @@ streaminit_reply_cb (GabbleConnection *conn,
   struct _streaminit_reply_cb_data *data =
     (struct _streaminit_reply_cb_data*) user_data;
   GabbleBytestreamIface *bytestream = NULL;
+  GSList *bytestream_list = NULL;
   gchar *peer_resource = NULL;
   LmMessageNode *si, *feature, *x, *field, *value;
   const gchar *from, *stream_method;
@@ -1332,45 +1407,64 @@ streaminit_reply_cb (GabbleConnection *conn,
         /* some future field, ignore it */
         continue;
 
-      value = lm_message_node_get_child (field, "value");
-      if (value == NULL)
+      for (value = field->children; value; value = value->next)
         {
-          NODE_DEBUG (reply_msg->node, "SI reply's stream-method field "
-              "doesn't contain stream-method value");
-          goto END;
+          stream_method = lm_message_node_get_value (value);
+
+          if (!tp_strdiff (stream_method, NS_IBB))
+            {
+              /* Remote user has accepted the stream */
+              bytestream = GABBLE_BYTESTREAM_IFACE (
+                  gabble_bytestream_factory_create_ibb (self, peer_handle,
+                  data->stream_id, NULL, peer_resource,
+                  GABBLE_BYTESTREAM_STATE_INITIATING));
+            }
+          else if (!tp_strdiff (stream_method, NS_BYTESTREAMS))
+            {
+              /* Remote user has accepted the stream */
+              bytestream = GABBLE_BYTESTREAM_IFACE (
+                  gabble_bytestream_factory_create_socks5 (self, peer_handle,
+                  data->stream_id, NULL, peer_resource,
+                  GABBLE_BYTESTREAM_STATE_INITIATING));
+            }
+          else
+            {
+              DEBUG ("Remote user chose an unsupported stream method");
+              goto END;
+            }
+
+          bytestream_list = g_slist_prepend (bytestream_list, bytestream);
         }
 
-      stream_method = lm_message_node_get_value (value);
+      break;
+    }
 
-      if (!tp_strdiff (stream_method, NS_IBB))
-        {
-          /* Remote user has accepted the stream */
-          bytestream = GABBLE_BYTESTREAM_IFACE (
-              gabble_bytestream_factory_create_ibb (self, peer_handle,
-              data->stream_id, NULL, peer_resource,
-              GABBLE_BYTESTREAM_STATE_INITIATING));
-        }
-      else if (!tp_strdiff (stream_method, NS_BYTESTREAMS))
-        {
-          /* Remote user has accepted the stream */
-          bytestream = GABBLE_BYTESTREAM_IFACE (
-              gabble_bytestream_factory_create_socks5 (self, peer_handle,
-              data->stream_id, NULL, peer_resource,
-              GABBLE_BYTESTREAM_STATE_INITIATING));
-        }
-      else
+  if (bytestream_list == NULL)
+    goto END;
+
+  /* Create a multiple bytestream if needed */
+  if (g_slist_length (bytestream_list) > 1)
+    {
+      GSList *l;
+
+      bytestream = GABBLE_BYTESTREAM_IFACE (
+          gabble_bytestream_factory_create_multiple (self, peer_handle,
+          data->stream_id, NULL, peer_resource,
+          GABBLE_BYTESTREAM_STATE_INITIATING));
+
+      l = bytestream_list = g_slist_reverse (bytestream_list);
+
+      while (l != NULL)
         {
-          DEBUG ("Remote user chose an unsupported stream method");
-          goto END;
-        }
+          gabble_bytestream_multiple_add_bytestream (
+              GABBLE_BYTESTREAM_MULTIPLE (bytestream), l->data);
 
-      /* no need to parse the rest of the fields, we've found the one we
-       * wanted */
-      break;
+          l = g_slist_next (l);
+        }
     }
 
-  if (bytestream == NULL)
-    goto END;
+  /* If there is only one bytestream then we already have
+     bytestream == bytestream_list->data */
 
   DEBUG ("stream %s accepted", data->stream_id);
 
@@ -1402,6 +1496,7 @@ END:
 
   g_free (data->stream_id);
   g_slice_free (struct _streaminit_reply_cb_data, data);
+  g_slist_free (bytestream_list);
 
   return LM_HANDLER_RESULT_REMOVE_MESSAGE;
 }
diff --git a/src/bytestream-factory.h b/src/bytestream-factory.h
index 1fe01bf..4523ef4 100644
--- a/src/bytestream-factory.h
+++ b/src/bytestream-factory.h
@@ -28,6 +28,7 @@
 #include "bytestream-iface.h"
 #include "bytestream-ibb.h"
 #include "bytestream-muc.h"
+#include "bytestream-multiple.h"
 #include "bytestream-socks5.h"
 #include "connection.h"
 
@@ -86,6 +87,11 @@ GabbleBytestreamSocks5 *gabble_bytestream_factory_create_socks5 (
     const gchar *stream_init_id, const gchar *peer_resource,
     GabbleBytestreamState state);
 
+GabbleBytestreamMultiple *gabble_bytestream_factory_create_multiple (
+    GabbleBytestreamFactory *self, TpHandle peer_handle,
+    const gchar *stream_id, const gchar *stream_init_id,
+    const gchar *peer_resource, GabbleBytestreamState state);
+
 LmMessage *gabble_bytestream_factory_make_stream_init_iq (
     const gchar *full_jid, const gchar *stream_id, const gchar *profile);
 
diff --git a/src/bytestream-ibb.c b/src/bytestream-ibb.c
index 7ae1b48..0cbbcfd 100644
--- a/src/bytestream-ibb.c
+++ b/src/bytestream-ibb.c
@@ -52,6 +52,7 @@ enum
 {
   DATA_RECEIVED,
   STATE_CHANGED,
+  CONNECTION_ERROR,
   LAST_SIGNAL
 };
 
@@ -69,6 +70,7 @@ enum
   PROP_PEER_RESOURCE,
   PROP_STATE,
   PROP_PROTOCOL,
+  PROP_CLOSE_ON_CONNECTION_ERROR,
   PROP_BLOCK_SIZE,
   LAST_PROPERTY
 };
@@ -82,6 +84,7 @@ struct _GabbleBytestreamIBBPrivate
   gchar *peer_resource;
   GabbleBytestreamState state;
   gchar *peer_jid;
+  gboolean close_on_connection_error;
   guint block_size;
 
   guint16 seq;
@@ -98,6 +101,8 @@ gabble_bytestream_ibb_init (GabbleBytestreamIBB *self)
       GABBLE_TYPE_BYTESTREAM_IBB, GabbleBytestreamIBBPrivate);
 
   self->priv = priv;
+
+  priv->close_on_connection_error = TRUE;
 }
 
 static void
@@ -175,6 +180,9 @@ gabble_bytestream_ibb_get_property (GObject *object,
       case PROP_PROTOCOL:
         g_value_set_string (value, NS_IBB);
         break;
+      case PROP_CLOSE_ON_CONNECTION_ERROR:
+        g_value_set_boolean (value, priv->close_on_connection_error);
+        break;
       case PROP_BLOCK_SIZE:
         g_value_set_uint (value, priv->block_size);
         break;
@@ -220,6 +228,9 @@ gabble_bytestream_ibb_set_property (GObject *object,
               g_signal_emit (object, signals[STATE_CHANGED], 0, priv->state);
             }
         break;
+      case PROP_CLOSE_ON_CONNECTION_ERROR:
+        priv->close_on_connection_error = g_value_get_boolean (value);
+        break;
       case PROP_BLOCK_SIZE:
         priv->block_size = g_value_get_uint (value);
         break;
@@ -294,6 +305,9 @@ gabble_bytestream_ibb_class_init (
        "state");
    g_object_class_override_property (object_class, PROP_PROTOCOL,
        "protocol");
+   g_object_class_override_property (object_class,
+       PROP_CLOSE_ON_CONNECTION_ERROR, "close-on-connection-error");
+
 
   param_spec = g_param_spec_string (
       "peer-resource",
@@ -339,6 +353,15 @@ gabble_bytestream_ibb_class_init (
                   NULL, NULL,
                   gabble_marshal_VOID__UINT,
                   G_TYPE_NONE, 1, G_TYPE_UINT);
+
+  signals[CONNECTION_ERROR] =
+    g_signal_new ("connection-error",
+                  G_OBJECT_CLASS_TYPE (gabble_bytestream_ibb_class),
+                  G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+                  0,
+                  NULL, NULL,
+                  gabble_marshal_VOID__VOID,
+                  G_TYPE_NONE, 0);
 }
 
 /*
diff --git a/src/bytestream-iface.c b/src/bytestream-iface.c
index 72e0381..c3c8aac 100644
--- a/src/bytestream-iface.c
+++ b/src/bytestream-iface.c
@@ -132,6 +132,14 @@ gabble_bytestream_iface_base_init (gpointer klass)
           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
       g_object_interface_install_property (klass, param_spec);
 
+      param_spec = g_param_spec_boolean (
+          "close-on-connection-error",
+          "Close on connection error",
+          "Wether to send a close stanza if there was a connection error",
+          TRUE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+      g_object_interface_install_property (klass, param_spec);
+
       initialized = TRUE;
     }
 }
diff --git a/src/bytestream-multiple.c b/src/bytestream-multiple.c
new file mode 100644
index 0000000..844af6f
--- /dev/null
+++ b/src/bytestream-multiple.c
@@ -0,0 +1,604 @@
+/*
+ * bytestream-multiple.c - Source for GabbleBytestreamMultiple
+ * Copyright (C) 2007-2008 Collabora Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include "config.h"
+#include "bytestream-multiple.h"
+
+#include <dbus/dbus-glib.h>
+#include <dbus/dbus-glib-lowlevel.h>
+#include <loudmouth/loudmouth.h>
+#include <telepathy-glib/interfaces.h>
+
+#define DEBUG_FLAG GABBLE_DEBUG_BYTESTREAM
+
+#include "base64.h"
+#include "bytestream-factory.h"
+#include "bytestream-iface.h"
+#include "connection.h"
+#include "debug.h"
+#include "disco.h"
+#include "gabble-signals-marshal.h"
+#include "namespaces.h"
+#include "util.h"
+
+static void
+bytestream_iface_init (gpointer g_iface, gpointer iface_data);
+
+G_DEFINE_TYPE_WITH_CODE (GabbleBytestreamMultiple, gabble_bytestream_multiple,
+    G_TYPE_OBJECT,
+    G_IMPLEMENT_INTERFACE (GABBLE_TYPE_BYTESTREAM_IFACE,
+      bytestream_iface_init));
+
+/* signals */
+enum
+{
+  DATA_RECEIVED,
+  STATE_CHANGED,
+  CONNECTION_ERROR,
+  LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = {0};
+
+/* properties */
+enum
+{
+  PROP_CONNECTION = 1,
+  PROP_PEER_HANDLE,
+  PROP_PEER_HANDLE_TYPE,
+  PROP_STREAM_ID,
+  PROP_STREAM_INIT_ID,
+  PROP_PEER_JID,
+  PROP_PEER_RESOURCE,
+  PROP_STATE,
+  PROP_PROTOCOL,
+  PROP_CLOSE_ON_CONNECTION_ERROR,
+  LAST_PROPERTY
+};
+
+struct _GabbleBytestreamMultiplePrivate
+{
+  GabbleConnection *conn;
+  TpHandle peer_handle;
+  gchar *stream_id;
+  gchar *stream_init_id;
+  gchar *peer_resource;
+  GabbleBytestreamState state;
+  gchar *peer_jid;
+  gboolean close_on_connection_error;
+
+  GList *bytestreams;
+
+  gboolean dispose_has_run;
+};
+
+#define GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE(obj) ((obj)->priv)
+
+static void gabble_bytestream_multiple_close (GabbleBytestreamIface *iface,
+    GError *error);
+
+static void
+gabble_bytestream_multiple_init (GabbleBytestreamMultiple *self)
+{
+  GabbleBytestreamMultiplePrivate *priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
+      GABBLE_TYPE_BYTESTREAM_MULTIPLE, GabbleBytestreamMultiplePrivate);
+
+  self->priv = priv;
+
+  priv->close_on_connection_error = TRUE;
+}
+
+static void
+gabble_bytestream_multiple_dispose (GObject *object)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  TpHandleRepoIface *contact_repo = tp_base_connection_get_handles (
+      (TpBaseConnection *) priv->conn, TP_HANDLE_TYPE_CONTACT);
+
+  if (priv->dispose_has_run)
+    return;
+
+  priv->dispose_has_run = TRUE;
+
+  tp_handle_unref (contact_repo, priv->peer_handle);
+
+  if (priv->state != GABBLE_BYTESTREAM_STATE_CLOSED)
+    {
+      gabble_bytestream_iface_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
+    }
+
+  G_OBJECT_CLASS (gabble_bytestream_multiple_parent_class)->dispose (object);
+}
+
+static void
+gabble_bytestream_multiple_finalize (GObject *object)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+  g_free (priv->stream_id);
+  g_free (priv->stream_init_id);
+  g_free (priv->peer_resource);
+  g_free (priv->peer_jid);
+
+  G_OBJECT_CLASS (gabble_bytestream_multiple_parent_class)->finalize (object);
+}
+
+static void
+gabble_bytestream_multiple_get_property (GObject *object,
+                                         guint property_id,
+                                         GValue *value,
+                                         GParamSpec *pspec)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+  switch (property_id)
+    {
+      case PROP_CONNECTION:
+        g_value_set_object (value, priv->conn);
+        break;
+      case PROP_PEER_HANDLE:
+        g_value_set_uint (value, priv->peer_handle);
+        break;
+      case PROP_PEER_HANDLE_TYPE:
+        g_value_set_uint (value, TP_HANDLE_TYPE_CONTACT);
+        break;
+      case PROP_STREAM_ID:
+        g_value_set_string (value, priv->stream_id);
+        break;
+      case PROP_STREAM_INIT_ID:
+        g_value_set_string (value, priv->stream_init_id);
+        break;
+      case PROP_PEER_RESOURCE:
+        g_value_set_string (value, priv->peer_resource);
+        break;
+      case PROP_PEER_JID:
+        g_value_set_string (value, priv->peer_jid);
+        break;
+      case PROP_STATE:
+        g_value_set_uint (value, priv->state);
+        break;
+      case PROP_PROTOCOL:
+        g_value_set_string (value, NS_BYTESTREAMS);
+        break;
+      case PROP_CLOSE_ON_CONNECTION_ERROR:
+        g_value_set_boolean (value, priv->close_on_connection_error);
+        break;
+      default:
+        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+        break;
+    }
+}
+
+static void
+gabble_bytestream_multiple_set_property (GObject *object,
+                                         guint property_id,
+                                         const GValue *value,
+                                         GParamSpec *pspec)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+  switch (property_id)
+    {
+      case PROP_CONNECTION:
+        priv->conn = g_value_get_object (value);
+        break;
+      case PROP_PEER_HANDLE:
+        priv->peer_handle = g_value_get_uint (value);
+        break;
+      case PROP_STREAM_ID:
+        g_free (priv->stream_id);
+        priv->stream_id = g_value_dup_string (value);
+        break;
+      case PROP_STREAM_INIT_ID:
+        g_free (priv->stream_init_id);
+        priv->stream_init_id = g_value_dup_string (value);
+        break;
+      case PROP_PEER_RESOURCE:
+        g_free (priv->peer_resource);
+        priv->peer_resource = g_value_dup_string (value);
+        break;
+      case PROP_STATE:
+        if (priv->state != g_value_get_uint (value))
+          {
+            priv->state = g_value_get_uint (value);
+            g_signal_emit (object, signals[STATE_CHANGED], 0, priv->state);
+          }
+        break;
+      case PROP_CLOSE_ON_CONNECTION_ERROR:
+        priv->close_on_connection_error = g_value_get_boolean (value);
+        break;
+      default:
+        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+        break;
+    }
+}
+
+static GObject *
+gabble_bytestream_multiple_constructor (GType type,
+                                        guint n_props,
+                                        GObjectConstructParam *props)
+{
+  GObject *obj;
+  GabbleBytestreamMultiplePrivate *priv;
+  TpHandleRepoIface *contact_repo;
+  const gchar *jid;
+
+  obj = G_OBJECT_CLASS (gabble_bytestream_multiple_parent_class)->
+           constructor (type, n_props, props);
+
+  priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (GABBLE_BYTESTREAM_MULTIPLE (obj));
+
+  g_assert (priv->conn != NULL);
+  g_assert (priv->peer_handle != 0);
+  g_assert (priv->stream_id != NULL);
+
+  contact_repo = tp_base_connection_get_handles (
+      (TpBaseConnection *) priv->conn, TP_HANDLE_TYPE_CONTACT);
+
+  tp_handle_ref (contact_repo, priv->peer_handle);
+
+  jid = tp_handle_inspect (contact_repo, priv->peer_handle);
+
+  if (priv->peer_resource != NULL)
+    priv->peer_jid = g_strdup_printf ("%s/%s", jid, priv->peer_resource);
+  else
+    priv->peer_jid = g_strdup (jid);
+
+  return obj;
+}
+
+static void
+gabble_bytestream_multiple_class_init (
+    GabbleBytestreamMultipleClass *gabble_bytestream_multiple_class)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (gabble_bytestream_multiple_class);
+  GParamSpec *param_spec;
+
+  g_type_class_add_private (gabble_bytestream_multiple_class,
+      sizeof (GabbleBytestreamMultiplePrivate));
+
+  object_class->dispose = gabble_bytestream_multiple_dispose;
+  object_class->finalize = gabble_bytestream_multiple_finalize;
+
+  object_class->get_property = gabble_bytestream_multiple_get_property;
+  object_class->set_property = gabble_bytestream_multiple_set_property;
+  object_class->constructor = gabble_bytestream_multiple_constructor;
+
+   g_object_class_override_property (object_class, PROP_CONNECTION,
+      "connection");
+   g_object_class_override_property (object_class, PROP_PEER_HANDLE,
+       "peer-handle");
+   g_object_class_override_property (object_class, PROP_PEER_HANDLE_TYPE,
+       "peer-handle-type");
+   g_object_class_override_property (object_class, PROP_STREAM_ID,
+       "stream-id");
+   g_object_class_override_property (object_class, PROP_PEER_JID,
+       "peer-jid");
+   g_object_class_override_property (object_class, PROP_STATE,
+       "state");
+   g_object_class_override_property (object_class, PROP_PROTOCOL,
+       "protocol");
+   g_object_class_override_property (object_class,
+        PROP_CLOSE_ON_CONNECTION_ERROR, "close-on-connection-error");
+
+  param_spec = g_param_spec_string (
+      "peer-resource",
+      "Peer resource",
+      "the resource used by the remote peer during the SI, if any",
+      NULL,
+      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+  g_object_class_install_property (object_class, PROP_PEER_RESOURCE,
+      param_spec);
+
+  param_spec = g_param_spec_string (
+      "stream-init-id",
+      "stream init ID",
+      "the iq ID of the SI request, if any",
+      NULL,
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+  g_object_class_install_property (object_class, PROP_STREAM_INIT_ID,
+      param_spec);
+
+  signals[DATA_RECEIVED] =
+    g_signal_new ("data-received",
+                  G_OBJECT_CLASS_TYPE (gabble_bytestream_multiple_class),
+                  G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+                  0,
+                  NULL, NULL,
+                  g_cclosure_marshal_VOID__UINT_POINTER,
+                  G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_POINTER);
+
+  signals[STATE_CHANGED] =
+    g_signal_new ("state-changed",
+                  G_OBJECT_CLASS_TYPE (gabble_bytestream_multiple_class),
+                  G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+                  0,
+                  NULL, NULL,
+                  gabble_marshal_VOID__UINT,
+                  G_TYPE_NONE, 1, G_TYPE_UINT);
+
+  signals[CONNECTION_ERROR] =
+    g_signal_new ("connection-error",
+                  G_OBJECT_CLASS_TYPE (gabble_bytestream_multiple_class),
+                  G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+                  0,
+                  NULL, NULL,
+                  gabble_marshal_VOID__VOID,
+                  G_TYPE_NONE, 0);
+}
+
+/*
+ * gabble_bytestream_multiple_send
+ *
+ * Implements gabble_bytestream_iface_send on GabbleBytestreamIface
+ */
+static gboolean
+gabble_bytestream_multiple_send (GabbleBytestreamIface *iface,
+                                 guint len,
+                                 const gchar *str)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  GabbleBytestreamIface *bytestream;
+
+  if (priv->state != GABBLE_BYTESTREAM_STATE_OPEN)
+    {
+      DEBUG ("can't send data through a not open bytestream (state: %d)",
+          priv->state);
+      return FALSE;
+    }
+
+  g_assert (priv->bytestreams);
+  g_assert (priv->bytestreams->data);
+
+  bytestream = priv->bytestreams->data;
+  return gabble_bytestream_iface_send (bytestream, len, str);
+}
+
+/*
+ * gabble_bytestream_multiple_accept
+ *
+ * Implements gabble_bytestream_iface_accept on GabbleBytestreamIface
+ */
+static void
+gabble_bytestream_multiple_accept (GabbleBytestreamIface *iface,
+                                   GabbleBytestreamAugmentSiAcceptReply func,
+                                   gpointer user_data)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  LmMessage *msg;
+  LmMessageNode *si;
+
+  if (priv->state != GABBLE_BYTESTREAM_STATE_LOCAL_PENDING)
+    {
+      /* The stream was previously or automatically accepted */
+      return;
+    }
+
+  msg = gabble_bytestream_factory_make_accept_iq (priv->peer_jid,
+      priv->stream_init_id, NS_BYTESTREAMS);
+  si = lm_message_node_get_child_with_namespace (msg->node, "si", NS_SI);
+  g_assert (si != NULL);
+
+  if (func != NULL)
+    {
+      /* let the caller add his profile specific data */
+      func (si, user_data);
+    }
+
+  if (_gabble_connection_send (priv->conn, msg, NULL))
+    {
+      DEBUG ("stream %s with %s is now accepted", priv->stream_id,
+          priv->peer_jid);
+      g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_ACCEPTED, NULL);
+    }
+
+  lm_message_unref (msg);
+}
+
+static void
+gabble_bytestream_multiple_decline (GabbleBytestreamMultiple *self,
+                                    GError *error)
+{
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  LmMessage *msg;
+
+  g_return_if_fail (priv->state == GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
+
+  msg = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
+      '@', "type", "error",
+      '@', "id", priv->stream_init_id,
+      NULL);
+
+  if (error != NULL && error->domain == GABBLE_XMPP_ERROR)
+    {
+      gabble_xmpp_error_to_node (error->code, msg->node, error->message);
+    }
+  else
+    {
+      gabble_xmpp_error_to_node (XMPP_ERROR_FORBIDDEN, msg->node,
+          "Offer Declined");
+    }
+
+  _gabble_connection_send (priv->conn, msg, NULL);
+
+  lm_message_unref (msg);
+
+  g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
+}
+
+/*
+ * gabble_bytestream_multiple_close
+ *
+ * Implements gabble_bytestream_iface_close on GabbleBytestreamIface.
+ * A still-pending SI offer is declined, any other non-closed state is
+ * switched to CLOSED, and an already closed bytestream is left untouched.
+ */
+static void
+gabble_bytestream_multiple_close (GabbleBytestreamIface *iface,
+                                GError *error)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+  GabbleBytestreamMultiplePrivate *priv =
+      GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+  switch (priv->state)
+    {
+      case GABBLE_BYTESTREAM_STATE_CLOSED:
+        break;
+      case GABBLE_BYTESTREAM_STATE_LOCAL_PENDING:
+        /* Stream was created using SI so we decline the request */
+        gabble_bytestream_multiple_decline (self, error);
+        break;
+      default:
+        /* FIXME: send a close stanza and close the sub-bytestreams */
+        g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
+        break;
+    }
+}
+
+/*
+ * gabble_bytestream_multiple_initiate
+ *
+ * Implements gabble_bytestream_iface_initiate on GabbleBytestreamIface.
+ * Only the first registered sub-bytestream is initiated here; returns FALSE
+ * when not in the INITIATING state or when no sub-bytestream was added.
+ */
+static gboolean
+gabble_bytestream_multiple_initiate (GabbleBytestreamIface *iface)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+  GabbleBytestreamMultiplePrivate *priv =
+      GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+  if (priv->state == GABBLE_BYTESTREAM_STATE_INITIATING)
+    {
+      GList *first = priv->bytestreams;
+      /* Without any registered stream method there is nothing to start */
+      if (first == NULL)
+        return FALSE;
+      return gabble_bytestream_iface_initiate (first->data);
+    }
+
+  DEBUG ("bytestream is not is the initiating state (state %d)",
+      priv->state);
+  return FALSE;
+}
+
+/* "connection-error" handler: forget the failed sub-bytestream, release the
+ * reference taken when it was added, then initiate the next method if any. */
+static void
+bytestream_connection_error_cb (GabbleBytestreamIface *failed,
+                                gpointer user_data)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (user_data);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  GabbleBytestreamIface *fallback;
+
+  /* Not (or no longer) in our list: we hold no reference to release */
+  if (g_list_find (priv->bytestreams, failed) == NULL)
+    return;
+  priv->bytestreams = g_list_remove (priv->bytestreams, failed);
+  /* With other methods left, keep the failed one from sending a close stanza */
+  if (priv->bytestreams != NULL)
+    g_object_set (failed, "close-on-connection-error", FALSE, NULL);
+  g_object_unref (failed);   /* also on the last-method path */
+  if (priv->bytestreams == NULL)
+    return;
+
+  fallback = priv->bytestreams->data;
+  DEBUG ("Trying alternative streaming method");
+  g_object_set (fallback, "state", GABBLE_BYTESTREAM_STATE_INITIATING, NULL);
+  gabble_bytestream_iface_initiate (fallback);
+}
+
+static void
+bytestream_data_received_cb (GabbleBytestreamIface *bytestream,
+                             TpHandle sender,
+                             GString *str,
+                             gpointer user_data)
+{
+  /* Re-emit the sub-bytestream's data through our own DATA_RECEIVED signal */
+  GabbleBytestreamMultiple *multiple = GABBLE_BYTESTREAM_MULTIPLE (user_data);
+
+  g_signal_emit (G_OBJECT (multiple), signals[DATA_RECEIVED], 0, sender, str);
+}
+
+static void
+bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
+                             GabbleBytestreamState state,
+                             gpointer user_data)
+{
+  GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (user_data);
+  GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  GList *streams = priv->bytestreams;
+  gboolean has_several = (streams != NULL && streams->next != NULL);
+
+  /* Mirror the sub-bytestream's state, except that a CLOSED is dropped when
+   * at most one sub-bytestream is registered.
+   * NOTE(review): presumably that is the method which just failed - confirm */
+  if (state != GABBLE_BYTESTREAM_STATE_CLOSED || has_several)
+    g_object_set (self, "state", state, NULL);
+}
+
+/*
+ * gabble_bytestream_multiple_add_bytestream
+ *
+ * Add an alternative stream method. A reference on @bytestream is taken, it
+ * is appended after the methods added so far, and its "connection-error",
+ * "data-received" and "state-changed" signals are hooked up to @self.
+ */
+void
+gabble_bytestream_multiple_add_bytestream (GabbleBytestreamMultiple *self,
+                                           GabbleBytestreamIface *bytestream)
+{
+  GabbleBytestreamMultiplePrivate *priv;
+
+  g_return_if_fail (GABBLE_IS_BYTESTREAM_MULTIPLE (self));
+  g_return_if_fail (GABBLE_IS_BYTESTREAM_IFACE (bytestream));
+
+  DEBUG ("Add bytestream");
+  priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+  priv->bytestreams = g_list_append (priv->bytestreams,
+      g_object_ref (bytestream));
+
+  g_signal_connect (bytestream, "state-changed",
+      G_CALLBACK (bytestream_state_changed_cb), self);
+  g_signal_connect (bytestream, "data-received",
+      G_CALLBACK (bytestream_data_received_cb), self);
+  g_signal_connect (bytestream, "connection-error",
+      G_CALLBACK (bytestream_connection_error_cb), self);
+}
+
+static void
+bytestream_iface_init (gpointer g_iface,
+                       gpointer iface_data)
+{
+  GabbleBytestreamIfaceClass *vtable = g_iface;
+  /* Plug this class' implementations into the GabbleBytestreamIface vtable */
+  vtable->accept = gabble_bytestream_multiple_accept;
+  vtable->close = gabble_bytestream_multiple_close;
+  vtable->initiate = gabble_bytestream_multiple_initiate;
+  vtable->send = gabble_bytestream_multiple_send;
+}
diff --git a/src/bytestream-multiple.h b/src/bytestream-multiple.h
new file mode 100644
index 0000000..3334a19
--- /dev/null
+++ b/src/bytestream-multiple.h
@@ -0,0 +1,74 @@
+/*
+ * bytestream-multiple.h - Header for GabbleBytestreamMultiple
+ * Copyright (C) 2007-2008 Collabora Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#ifndef __GABBLE_BYTESTREAM_MULTIPLE_H__
+#define __GABBLE_BYTESTREAM_MULTIPLE_H__
+
+#include <stdlib.h>
+
+#include <glib-object.h>
+#include <loudmouth/loudmouth.h>
+
+#include <telepathy-glib/base-connection.h>
+
+#include "bytestream-iface.h"
+#include "error.h"
+
+G_BEGIN_DECLS
+
+typedef struct _GabbleBytestreamMultiple GabbleBytestreamMultiple;
+typedef struct _GabbleBytestreamMultipleClass GabbleBytestreamMultipleClass;
+typedef struct _GabbleBytestreamMultiplePrivate GabbleBytestreamMultiplePrivate;
+
+struct _GabbleBytestreamMultipleClass {
+  GObjectClass parent_class;  /* no class members beyond the GObject base */
+};
+
+struct _GabbleBytestreamMultiple {
+  GObject parent;  /* base instance */
+
+  GabbleBytestreamMultiplePrivate *priv;  /* private state, opaque in this header */
+};
+
+GType gabble_bytestream_multiple_get_type (void);
+
+/* TYPE MACROS: GObject type id, cast and type-check helpers for this class */
+#define GABBLE_TYPE_BYTESTREAM_MULTIPLE \
+  (gabble_bytestream_multiple_get_type ())
+#define GABBLE_BYTESTREAM_MULTIPLE(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), GABBLE_TYPE_BYTESTREAM_MULTIPLE,\
+                              GabbleBytestreamMultiple))
+#define GABBLE_BYTESTREAM_MULTIPLE_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), GABBLE_TYPE_BYTESTREAM_MULTIPLE,\
+                           GabbleBytestreamMultipleClass))
+#define GABBLE_IS_BYTESTREAM_MULTIPLE(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), GABBLE_TYPE_BYTESTREAM_MULTIPLE))
+#define GABBLE_IS_BYTESTREAM_MULTIPLE_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), GABBLE_TYPE_BYTESTREAM_MULTIPLE))
+#define GABBLE_BYTESTREAM_MULTIPLE_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj), GABBLE_TYPE_BYTESTREAM_MULTIPLE,\
+                              GabbleBytestreamMultipleClass))
+
+void
+gabble_bytestream_multiple_add_bytestream (GabbleBytestreamMultiple *self,
+                                           GabbleBytestreamIface *bytestream);
+
+G_END_DECLS
+
+#endif /* #ifndef __GABBLE_BYTESTREAM_MULTIPLE_H__ */
diff --git a/src/bytestream-socks5.c b/src/bytestream-socks5.c
index f570ccc..ad661a7 100644
--- a/src/bytestream-socks5.c
+++ b/src/bytestream-socks5.c
@@ -67,6 +67,7 @@ enum
 {
   DATA_RECEIVED,
   STATE_CHANGED,
+  CONNECTION_ERROR,
   LAST_SIGNAL
 };
 
@@ -84,6 +85,7 @@ enum
   PROP_PEER_RESOURCE,
   PROP_STATE,
   PROP_PROTOCOL,
+  PROP_CLOSE_ON_CONNECTION_ERROR,
   LAST_PROPERTY
 };
 
@@ -154,6 +156,7 @@ struct _GabbleBytestreamSocks5Private
   gchar *peer_resource;
   GabbleBytestreamState bytestream_state;
   gchar *peer_jid;
+  gboolean close_on_connection_error;
 
   GSList *streamhosts;
   LmMessage *msg_for_acknowledge_connection;
@@ -192,6 +195,8 @@ gabble_bytestream_socks5_init (GabbleBytestreamSocks5 *self)
       GABBLE_TYPE_BYTESTREAM_SOCKS5, GabbleBytestreamSocks5Private);
 
   self->priv = priv;
+
+  priv->close_on_connection_error = TRUE;
 }
 
 static void
@@ -272,6 +277,9 @@ gabble_bytestream_socks5_get_property (GObject *object,
       case PROP_PROTOCOL:
         g_value_set_string (value, NS_BYTESTREAMS);
         break;
+      case PROP_CLOSE_ON_CONNECTION_ERROR:
+        g_value_set_boolean (value, priv->close_on_connection_error);
+        break;
       default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
         break;
@@ -314,6 +322,9 @@ gabble_bytestream_socks5_set_property (GObject *object,
               g_signal_emit (object, signals[STATE_CHANGED], 0, priv->bytestream_state);
             }
         break;
+      case PROP_CLOSE_ON_CONNECTION_ERROR:
+        priv->close_on_connection_error = g_value_get_boolean (value);
+        break;
       default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
         break;
@@ -385,6 +396,8 @@ gabble_bytestream_socks5_class_init (
        "state");
    g_object_class_override_property (object_class, PROP_PROTOCOL,
        "protocol");
+   g_object_class_override_property (object_class,
+       PROP_CLOSE_ON_CONNECTION_ERROR, "close-on-connection-error");
 
   param_spec = g_param_spec_string (
       "peer-resource",
@@ -421,6 +434,15 @@ gabble_bytestream_socks5_class_init (
                   NULL, NULL,
                   gabble_marshal_VOID__UINT,
                   G_TYPE_NONE, 1, G_TYPE_UINT);
+
+  signals[CONNECTION_ERROR] =
+    g_signal_new ("connection-error",
+                  G_OBJECT_CLASS_TYPE (gabble_bytestream_socks5_class),
+                  G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+                  0,
+                  NULL, NULL,
+                  gabble_marshal_VOID__VOID,
+                  G_TYPE_NONE, 0);
 }
 
 static void
@@ -515,14 +537,21 @@ socks5_error (GabbleBytestreamSocks5 *self)
           DEBUG ("connection to streamhost failed, trying the next one");
 
           socks5_connect (self);
+
+          g_object_unref (self);
           return;
         }
 
       /* ... but there are no more streamhosts */
       DEBUG ("no more streamhosts to try");
-      _gabble_connection_send_iq_error (priv->conn,
-          priv->msg_for_acknowledge_connection, XMPP_ERROR_ITEM_NOT_FOUND,
-          "impossible to connect to any streamhost");
+      g_signal_emit (self, signals[CONNECTION_ERROR], 0);
+
+      if (priv->close_on_connection_error)
+        {
+          _gabble_connection_send_iq_error (priv->conn,
+              priv->msg_for_acknowledge_connection, XMPP_ERROR_ITEM_NOT_FOUND,
+              "impossible to connect to any streamhost");
+        }
 
       lm_message_unref (priv->msg_for_acknowledge_connection);
       priv->msg_for_acknowledge_connection = NULL;
@@ -531,8 +560,6 @@ socks5_error (GabbleBytestreamSocks5 *self)
   DEBUG ("error, closing the connection\n");
 
   gabble_bytestream_socks5_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
-
-  return;
 }
 
 static gboolean 
@@ -1097,25 +1124,28 @@ gabble_bytestream_socks5_close (GabbleBytestreamIface *iface,
     }
   else
     {
-      LmMessage *msg;
-
       DEBUG ("send Socks5 close stanza");
 
       socks5_close_channel (self);
 
-      msg = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
-          '@', "type", "set",
-          '(', "close", "",
-            '@', "xmlns", NS_BYTESTREAMS,
-            '@', "sid", priv->stream_id,
-          ')', NULL);
+      if (priv->close_on_connection_error)
+        {
+          LmMessage *msg;
 
-      /* We don't really care about the answer as the bytestream
-       * is closed anyway. */
-      _gabble_connection_send_with_reply (priv->conn, msg,
-          NULL, NULL, NULL, NULL);
+          msg = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
+              '@', "type", "set",
+              '(', "close", "",
+                '@', "xmlns", NS_BYTESTREAMS,
+                '@', "sid", priv->stream_id,
+              ')', NULL);
 
-      lm_message_unref (msg);
+          /* We don't really care about the answer as the bytestream
+           * is closed anyway. */
+          _gabble_connection_send_with_reply (priv->conn, msg,
+              NULL, NULL, NULL, NULL);
+
+          lm_message_unref (msg);
+        }
 
       g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
     }
@@ -1139,6 +1169,7 @@ socks5_init_reply_cb (GabbleConnection *conn,
   else
     {
       DEBUG ("error during Socks5 initiation");
+      g_signal_emit (self, signals[CONNECTION_ERROR], 0);
       g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
     }
 
diff --git a/src/tube-stream.c b/src/tube-stream.c
index e5ee71f..80fa662 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -269,6 +269,14 @@ extra_bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
       guint source_id;
       DEBUG ("extra bytestream open");
 
+      /* The bytestream can go back from open to initiating in case of a
+       * multiple bytestream */
+      if (g_hash_table_lookup (priv->io_channel_to_watcher_source_id,
+            channel))
+        {
+          return;
+        }
+
       g_signal_connect (bytestream, "data-received",
           G_CALLBACK (data_received_cb), self);
 
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 8874eac..86e5164 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -40,6 +40,7 @@ TWISTED_TESTS = \
 	tubes/test-muc-accept-stream-tube-ibb.py \
 	tubes/test-muc-offer-dbus-tube.py \
 	tubes/test-muc-offer-stream-tube-ibb.py \
+	tubes/test-si-fallback.py \
 	tubes/test-si-ibb-tubes.py \
 	tubes/test-si-socks5-tubes.py \
 	tubes/ensure-si-tube.py \
diff --git a/tests/twisted/tubes/test-si-fallback.py b/tests/twisted/tubes/test-si-fallback.py
new file mode 100644
index 0000000..b15160e
--- /dev/null
+++ b/tests/twisted/tubes/test-si-fallback.py
@@ -0,0 +1,332 @@
+"""Test stream initiation fallback."""
+
+import base64
+import errno
+import os
+
+import dbus
+from dbus.connection import Connection
+from dbus.lowlevel import SignalMessage
+
+from servicetest import call_async, EventPattern, tp_name_prefix, watch_tube_signals, EventProtocolClientFactory
+from gabbletest import exec_test, acknowledge_iq
+
+from twisted.words.xish import domish, xpath
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor
+from twisted.words.protocols.jabber.client import IQ
+
+from gabbleconfig import HAVE_DBUS_TUBES
+
+NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
+NS_SI = 'http://jabber.org/protocol/si'
+NS_FEATURE_NEG = 'http://jabber.org/protocol/feature-neg'
+NS_IBB = 'http://jabber.org/protocol/ibb'
+NS_X_DATA = 'jabber:x:data'
+NS_BYTESTREAMS = 'http://jabber.org/protocol/bytestreams'
+
+class Echo(Protocol):  # Twisted protocol that writes back whatever it receives
+    def dataReceived(self, data):
+        self.transport.write(data)
+
+def set_up_echo():
+    try:  # clear a stale socket file; any error but "no such file" is fatal
+        os.remove(os.getcwd() + '/stream')
+    except OSError, exc:
+        if exc.errno != errno.ENOENT:
+            raise
+    echo_factory = Factory()
+    echo_factory.protocol = Echo
+    reactor.listenUNIX(os.getcwd() + '/stream', echo_factory)
+
+def test(q, bus, conn, stream):
+    """Check SOCKS5-to-IBB fallback when Bob initiates the SI and when we do."""
+    set_up_echo()
+
+    conn.Connect()
+
+    _, vcard_event, roster_event = q.expect_many(
+        EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+            query_name='vCard'),
+        EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+    acknowledge_iq(stream, vcard_event.stanza)
+
+    roster = roster_event.stanza  # answer the roster query with Bob in it
+    roster['type'] = 'result'
+    item = roster_event.query.addElement('item')
+    item['jid'] = 'bob@localhost'
+    item['subscription'] = 'both'
+    stream.send(roster)
+
+    presence = domish.Element(('jabber:client', 'presence'))
+    presence['from'] = 'bob@localhost/Bob'
+    presence['to'] = 'test@localhost/Resource'
+    c = presence.addElement('c')
+    c['xmlns'] = 'http://jabber.org/protocol/caps'
+    c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+    c['ver'] = '1.2.3'
+    stream.send(presence)
+
+    event = q.expect('stream-iq', iq_type='get',
+        query_ns='http://jabber.org/protocol/disco#info',
+        to='bob@localhost/Bob')
+    result = event.stanza  # reuse the disco request as the reply stanza
+    result['type'] = 'result'
+    assert event.query['node'] == \
+        'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+    feature = event.query.addElement('feature')
+    feature['var'] = NS_TUBES
+    stream.send(result)
+
+    requestotron = dbus.Interface(conn,
+            'org.freedesktop.Telepathy.Connection.Interface.Requests')
+    bob_handle = conn.RequestHandles(1, ['bob@localhost'])[0]
+
+    # Part 1: offer a stream tube to Bob; Bob then initiates the SI
+    call_async(q, requestotron, 'CreateChannel',
+            {'org.freedesktop.Telepathy.Channel.ChannelType':
+                'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
+             'org.freedesktop.Telepathy.Channel.TargetHandleType':
+                1,
+             'org.freedesktop.Telepathy.Channel.TargetHandle':
+                bob_handle,
+             'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT.Service':
+                'echo',
+             'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT.Parameters':
+                dbus.Dictionary({'foo': 'bar'}, signature='sv'),
+            })
+    ret, _, _ = q.expect_many(
+        EventPattern('dbus-return', method='CreateChannel'),
+        EventPattern('dbus-signal', signal='NewChannel'),
+        EventPattern('dbus-signal', signal='NewChannels'),
+        )
+
+    chan_path = ret.value[0]
+    channels = filter(lambda x:
+      x[1] == "org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT" and
+      x[0] == chan_path,
+      conn.ListChannels())
+    tube_chan = bus.get_object(conn.bus_name, channels[0][0])
+    tube_iface = dbus.Interface(tube_chan,
+        tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+
+    path = os.getcwd() + '/stream'  # the socket set_up_echo() listens on
+    call_async(q, tube_iface, 'OfferStreamTube',
+        0, dbus.ByteArray(path), 0, "")
+
+    event = q.expect('stream-message')
+    message = event.stanza
+    tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % NS_TUBES,
+        message)
+    assert tube_nodes is not None
+    assert len(tube_nodes) == 1
+    tube = tube_nodes[0]
+
+    assert tube['service'] == 'echo'
+    assert tube['type'] == 'stream'
+    assert not tube.hasAttribute('initiator')
+    stream_tube_id = long(tube['id'])
+
+    # The CM is the server, so fake a client wanting to talk to it; the SI
+    iq = IQ(stream, 'set')  # request offers SOCKS5 first and IBB second
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    si = iq.addElement((NS_SI, 'si'))
+    si['id'] = 'alpha'
+    si['profile'] = NS_TUBES
+    feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+    x = feature.addElement((NS_X_DATA, 'x'))
+    x['type'] = 'form'
+    field = x.addElement((None, 'field'))
+    field['var'] = 'stream-method'
+    field['type'] = 'list-multiple'
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_BYTESTREAMS)
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_IBB)
+
+    stream_node = si.addElement((NS_TUBES, 'stream'))
+    stream_node['tube'] = str(stream_tube_id)
+    stream.send(iq)
+
+    si_reply_event, _ = q.expect_many(
+            EventPattern('stream-iq', iq_type='result'),
+            EventPattern('dbus-signal', signal='TubeChannelStateChanged',
+                args=[2])) # 2 == OPEN
+    iq = si_reply_event.stanza
+    si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+        iq)[0]
+    value = xpath.queryForNodes('/si/feature/x/field/value', si)
+    assert len(value) == 1  # the reply must name exactly one method: SOCKS5
+    proto = value[0]
+    assert str(proto) == NS_BYTESTREAMS
+    tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+    assert len(tube) == 1
+
+    q.expect('dbus-signal', signal='StreamTubeNewConnection',
+        args=[bob_handle])
+
+    # Send the non-working streamhost
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    query = iq.addElement((NS_BYTESTREAMS, 'query'))
+    query['sid'] = 'alpha'
+    query['mode'] = 'tcp'
+    streamhost = query.addElement('streamhost')
+    streamhost['jid'] = 'bob@localhost/Bob'
+    streamhost['host'] = 'invalid.invalid'
+    streamhost['port'] = '1234'
+    stream.send(iq)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza  # the fallback: an IBB <open/> reusing the SI stream id
+    open = xpath.queryForNodes('/iq/open', iq)[0]
+    assert open.uri == NS_IBB
+    assert open['sid'] == 'alpha'
+
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test@localhost/Resource'
+
+    stream.send(result)
+
+    # have the fake client send us some data
+    message = domish.Element(('jabber:client', 'message'))
+    message['to'] = 'test@localhost/Resource'
+    message['from'] = 'bob@localhost/Bob'
+    data_node = message.addElement((NS_IBB, 'data'))
+    data_node['sid'] = 'alpha'
+    data_node['seq'] = '0'
+    data_node.addContent(base64.b64encode('hello world'))
+    stream.send(message)
+
+    event = q.expect('stream-message', to='bob@localhost/Bob')
+    message = event.stanza  # the echo service's answer, sent back over IBB
+
+    data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % NS_IBB,
+        message)
+    assert data_nodes is not None
+    assert len(data_nodes) == 1
+    ibb_data = data_nodes[0]
+    assert ibb_data['sid'] == 'alpha'
+    binary = base64.b64decode(str(ibb_data))
+    assert binary == 'hello world'
+
+    # Part 2: accepting a tube offered by Bob; this time we initiate the SI
+    message = domish.Element(('jabber:client', 'message'))
+    message['to'] = 'test@localhost/Resource'
+    message['from'] = 'bob@localhost/Bob'
+    message['id'] = 'msg-id'
+    tube = message.addElement((NS_TUBES, 'tube'))
+    tube['type'] = 'stream'
+    tube['id'] = '42'
+    tube['service'] = 'foo-service'
+    parameters = tube.addElement((None, 'parameters'))
+    parameter = parameters.addElement((None, 'parameter'))
+    parameter['type'] = 'str'
+    parameter['name'] = 'foo'
+    parameter.addContent('bar')
+
+    stream.send (message)
+
+    event = q.expect('dbus-signal', signal='NewTube')
+    id = event.args[0]
+    initiator = event.args[1]
+    type = event.args[2]
+    service = event.args[3]
+    parameters = event.args[4]
+    state = event.args[5]
+
+    assert id == 42
+    initiator_jid = conn.InspectHandles(1, [initiator])[0]
+    assert initiator_jid == 'bob@localhost'
+    assert type == 1 # Stream tube
+    assert service == 'foo-service'
+    assert parameters == {'foo': 'bar'}
+    assert state == 0 # local pending
+
+    # accept the tube
+    channels = filter(lambda x:
+      x[1] == "org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT" and
+      '42' in x[0],
+      conn.ListChannels())
+    assert len(channels) == 1
+
+    tube_chan = bus.get_object(conn.bus_name, channels[0][0])
+    tube_iface = dbus.Interface(tube_chan,
+        tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+
+    call_async(q, tube_iface, 'AcceptStreamTube', 0, 0, '')
+
+    event = q.expect('dbus-return', method='AcceptStreamTube')
+    path2 = event.value[0]
+    path2 = ''.join([chr(c) for c in path2])
+
+    factory = EventProtocolClientFactory(q)
+    reactor.connectUNIX(path2, factory)  # a local client connects to the tube
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza  # the SI request sent to Bob
+    si_nodes = xpath.queryForNodes('/iq/si', iq)
+    assert si_nodes is not None
+    assert len(si_nodes) == 1
+    si = si_nodes[0]
+    assert si['profile'] == NS_TUBES
+    dbus_stream_id = si['id']
+
+    feature = xpath.queryForNodes('/si/feature', si)[0]
+    x = xpath.queryForNodes('/feature/x', feature)[0]
+    assert x['type'] == 'form'
+    field = xpath.queryForNodes('/x/field', x)[0]
+    assert field['var'] == 'stream-method'
+    assert field['type'] == 'list-multiple'
+    value = xpath.queryForNodes('/field/option/value', field)[0]
+    assert str(value) == NS_BYTESTREAMS
+    value = xpath.queryForNodes('/field/option/value', field)[1]
+    assert str(value) == NS_IBB
+
+    result = IQ(stream, 'result')  # Bob accepts both offered methods
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test@localhost/Resource'
+    res_si = result.addElement((NS_SI, 'si'))
+    res_feature = res_si.addElement((NS_FEATURE_NEG, 'feature'))
+    res_x = res_feature.addElement((NS_X_DATA, 'x'))
+    res_x['type'] = 'submit'
+    res_field = res_x.addElement((None, 'field'))
+    res_field['var'] = 'stream-method'
+    res_value = res_field.addElement((None, 'value'))
+    res_value.addContent(NS_BYTESTREAMS)
+    res_value = res_field.addElement((None, 'value'))
+    res_value.addContent(NS_IBB)
+
+    stream.send(result)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza  # SOCKS5 is tried first: a query carrying a streamhost
+    query = xpath.queryForNodes('/iq/query', iq)[0]
+    assert query.uri == NS_BYTESTREAMS
+    sid = query['sid']
+    streamhost = xpath.queryForNodes('/iq/query/streamhost', iq)[0]
+    assert streamhost
+
+    iq = IQ(stream, 'set')  # Bob opens IBB on the same sid instead
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    open = iq.addElement((NS_IBB, 'open'))
+    open['sid'] = sid
+    open['block-size'] = '4096'
+    stream.send(iq)
+
+    q.expect('stream-iq', iq_type='result')
+
+    return
+
+if __name__ == '__main__':
+    exec_test(test)
-- 
1.5.6.5




More information about the Telepathy-commits mailing list