[Telepathy-commits] [telepathy-gabble/master] Implement SI fallback
Marco Barisione
marco at barisione.org
Tue Jan 6 08:41:30 PST 2009
---
src/Makefile.am | 2 +
src/bytestream-factory.c | 173 +++++++--
src/bytestream-factory.h | 6 +
src/bytestream-ibb.c | 23 ++
src/bytestream-iface.c | 8 +
src/bytestream-multiple.c | 604 +++++++++++++++++++++++++++++++
src/bytestream-multiple.h | 74 ++++
src/bytestream-socks5.c | 67 +++-
src/tube-stream.c | 8 +
tests/twisted/Makefile.am | 1 +
tests/twisted/tubes/test-si-fallback.py | 332 +++++++++++++++++
11 files changed, 1241 insertions(+), 57 deletions(-)
create mode 100644 src/bytestream-multiple.c
create mode 100644 src/bytestream-multiple.h
create mode 100644 tests/twisted/tubes/test-si-fallback.py
diff --git a/src/Makefile.am b/src/Makefile.am
index 8fef055..501dacc 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -28,6 +28,8 @@ libgabble_convenience_la_SOURCES = \
bytestream-iface.c \
bytestream-muc.h \
bytestream-muc.c \
+ bytestream-multiple.h \
+ bytestream-multiple.c \
bytestream-socks5.h \
bytestream-socks5.c \
capabilities.h \
diff --git a/src/bytestream-factory.c b/src/bytestream-factory.c
index 65fcfd0..1713d1a 100644
--- a/src/bytestream-factory.c
+++ b/src/bytestream-factory.c
@@ -33,6 +33,7 @@
#include "bytestream-ibb.h"
#include "bytestream-iface.h"
#include "bytestream-muc.h"
+#include "bytestream-multiple.h"
#include "bytestream-socks5.h"
#include "connection.h"
#include "debug.h"
@@ -124,6 +125,11 @@ struct _GabbleBytestreamFactoryPrivate
* BytestreamIdentifier -> GabbleBytestreamMuc */
GHashTable *muc_bytestreams;
+ /* SI-initiated bytestreams - real data sent through another bytestream.
+ *
+ * BytestreamIdentifier -> GabbleBytestreamMultiple */
+ GHashTable *multiple_bytestreams;
+
gboolean dispose_has_run;
};
@@ -161,6 +167,9 @@ gabble_bytestream_factory_init (GabbleBytestreamFactory *self)
priv->socks5_bytestreams = g_hash_table_new_full (bytestream_id_hash,
bytestream_id_equal, bytestream_id_free, g_object_unref);
+
+ priv->multiple_bytestreams = g_hash_table_new_full (bytestream_id_hash,
+ bytestream_id_equal, bytestream_id_free, g_object_unref);
}
static GObject *
@@ -239,6 +248,9 @@ gabble_bytestream_factory_dispose (GObject *object)
g_hash_table_destroy (priv->socks5_bytestreams);
priv->socks5_bytestreams = NULL;
+ g_hash_table_destroy (priv->multiple_bytestreams);
+ priv->multiple_bytestreams = NULL;
+
if (G_OBJECT_CLASS (gabble_bytestream_factory_parent_class)->dispose)
G_OBJECT_CLASS (gabble_bytestream_factory_parent_class)->dispose (object);
}
@@ -319,7 +331,7 @@ remove_bytestream (GabbleBytestreamFactory *self,
(self);
BytestreamIdentifier bsid = { NULL, NULL };
guint handle_type;
- GHashTable *table;
+ GHashTable *table = NULL;
g_object_get (bytestream,
"stream-id", &(bsid.stream),
@@ -337,6 +349,8 @@ remove_bytestream (GabbleBytestreamFactory *self,
table = priv->ibb_bytestreams;
else if (GABBLE_IS_BYTESTREAM_SOCKS5 (bytestream))
table = priv->socks5_bytestreams;
+ else if (GABBLE_IS_BYTESTREAM_MULTIPLE (bytestream))
+ table = priv->multiple_bytestreams;
}
if (table == NULL)
@@ -366,10 +380,12 @@ streaminit_parse_request (LmMessage *message,
const gchar **stream_id,
const gchar **stream_init_id,
const gchar **mime_type,
- GSList **stream_methods)
+ GSList **stream_methods,
+ gboolean *use_fallback)
{
LmMessageNode *iq = message->node;
LmMessageNode *feature, *x, *field, *stream_method;
+ const gchar *list_type;
*stream_init_id = lm_message_node_get_attribute (iq, "id");
@@ -423,8 +439,16 @@ streaminit_parse_request (LmMessage *message,
/* some future field, ignore it */
continue;
- if (tp_strdiff (lm_message_node_get_attribute (field, "type"),
- "list-single"))
+ list_type = lm_message_node_get_attribute (field, "type");
+ if (!tp_strdiff (list_type, "list-single"))
+ {
+ *use_fallback = FALSE;
+ }
+ else if (!tp_strdiff (list_type, "list-multiple"))
+ {
+ *use_fallback = TRUE;
+ }
+ else
{
NODE_DEBUG (message->node, "SI request's stream-method field was "
"not of type list-single");
@@ -486,6 +510,7 @@ gabble_bytestream_factory_make_stream_init_iq (const gchar *full_jid,
const gchar *stream_id,
const gchar *profile)
{
+ /* FIXME list-multiple is not supported by old versions of gabble */
return lm_message_build (full_jid, LM_MESSAGE_TYPE_IQ,
'@', "type", "set",
'(', "si", "",
@@ -499,7 +524,7 @@ gabble_bytestream_factory_make_stream_init_iq (const gchar *full_jid,
'@', "type", "form",
'(', "field", "",
'@', "var", "stream-method",
- '@', "type", "list-single",
+ '@', "type", "list-multiple",
'(', "option", "",
'(', "value", NS_BYTESTREAMS,
')',
@@ -538,6 +563,8 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
GSList *l;
const gchar *profile, *from, *stream_id, *stream_init_id, *mime_type;
GSList *stream_methods = NULL;
+ gboolean use_fallback;
+ GabbleBytestreamMultiple *multiple;
gchar *peer_resource = NULL;
if (lm_message_get_sub_type (msg) != LM_MESSAGE_SUB_TYPE_SET)
@@ -551,7 +578,7 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
* it or send an error reply */
if (!streaminit_parse_request (msg, si, &profile, &from, &stream_id,
- &stream_init_id, &mime_type, &stream_methods))
+ &stream_init_id, &mime_type, &stream_methods, &use_fallback))
{
_gabble_connection_send_iq_error (priv->conn, msg,
XMPP_ERROR_BAD_REQUEST, "failed to parse SI request");
@@ -575,6 +602,13 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
gabble_decode_jid (from, NULL, NULL, &peer_resource);
}
+ if (use_fallback)
+ multiple = gabble_bytestream_factory_create_multiple (self, peer_handle,
+ stream_id, stream_init_id, peer_resource,
+ GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
+ else
+ multiple = NULL;
+
/* check stream method */
for (l = stream_methods; l != NULL; l = l->next)
{
@@ -586,7 +620,6 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
gabble_bytestream_factory_create_ibb (self, peer_handle,
stream_id, stream_init_id, peer_resource,
GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
- break;
}
else if (!tp_strdiff (l->data, NS_BYTESTREAMS))
{
@@ -594,10 +627,22 @@ bytestream_factory_iq_si_cb (LmMessageHandler *handler,
gabble_bytestream_factory_create_socks5 (self, peer_handle,
stream_id, stream_init_id, peer_resource,
GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
- break;
}
+ else
+ {
+ continue;
+ }
+
+ if (multiple != NULL)
+ gabble_bytestream_multiple_add_bytestream (multiple, bytestream);
+ else
+ break;
}
+ /* FIXME check if there are bytestream methods in the multiple one */
+ if (multiple != NULL)
+ bytestream = GABBLE_BYTESTREAM_IFACE (multiple);
+
if (bytestream == NULL)
{
DEBUG ("SI request doesn't contain any supported stream methods.");
@@ -1249,6 +1294,35 @@ gabble_bytestream_factory_create_socks5 (GabbleBytestreamFactory *self,
return socks5;
}
+GabbleBytestreamMultiple *
+gabble_bytestream_factory_create_multiple (GabbleBytestreamFactory *self,
+ TpHandle peer_handle,
+ const gchar *stream_id,
+ const gchar *stream_init_id,
+ const gchar *peer_resource,
+ GabbleBytestreamState state)
+{
+ GabbleBytestreamFactoryPrivate *priv;
+ GabbleBytestreamMultiple *multiple;
+
+ g_return_val_if_fail (GABBLE_IS_BYTESTREAM_FACTORY (self), NULL);
+ priv = GABBLE_BYTESTREAM_FACTORY_GET_PRIVATE (self);
+
+ multiple = g_object_new (GABBLE_TYPE_BYTESTREAM_MULTIPLE,
+ "connection", priv->conn,
+ "peer-handle", peer_handle,
+ "stream-id", stream_id,
+ "state", state,
+ "stream-init-id", stream_init_id,
+ "peer-resource", peer_resource,
+ NULL);
+
+ g_signal_connect (multiple, "state-changed",
+ G_CALLBACK (bytestream_state_changed_cb), self);
+
+ return multiple;
+}
+
struct _streaminit_reply_cb_data
{
gchar *stream_id;
@@ -1268,6 +1342,7 @@ streaminit_reply_cb (GabbleConnection *conn,
struct _streaminit_reply_cb_data *data =
(struct _streaminit_reply_cb_data*) user_data;
GabbleBytestreamIface *bytestream = NULL;
+ GSList *bytestream_list = NULL;
gchar *peer_resource = NULL;
LmMessageNode *si, *feature, *x, *field, *value;
const gchar *from, *stream_method;
@@ -1332,45 +1407,64 @@ streaminit_reply_cb (GabbleConnection *conn,
/* some future field, ignore it */
continue;
- value = lm_message_node_get_child (field, "value");
- if (value == NULL)
+ for (value = field->children; value; value = value->next)
{
- NODE_DEBUG (reply_msg->node, "SI reply's stream-method field "
- "doesn't contain stream-method value");
- goto END;
+ stream_method = lm_message_node_get_value (value);
+
+ if (!tp_strdiff (stream_method, NS_IBB))
+ {
+ /* Remote user has accepted the stream */
+ bytestream = GABBLE_BYTESTREAM_IFACE (
+ gabble_bytestream_factory_create_ibb (self, peer_handle,
+ data->stream_id, NULL, peer_resource,
+ GABBLE_BYTESTREAM_STATE_INITIATING));
+ }
+ else if (!tp_strdiff (stream_method, NS_BYTESTREAMS))
+ {
+ /* Remote user has accepted the stream */
+ bytestream = GABBLE_BYTESTREAM_IFACE (
+ gabble_bytestream_factory_create_socks5 (self, peer_handle,
+ data->stream_id, NULL, peer_resource,
+ GABBLE_BYTESTREAM_STATE_INITIATING));
+ }
+ else
+ {
+ DEBUG ("Remote user chose an unsupported stream method");
+ goto END;
+ }
+
+ bytestream_list = g_slist_prepend (bytestream_list, bytestream);
}
- stream_method = lm_message_node_get_value (value);
+ break;
+ }
- if (!tp_strdiff (stream_method, NS_IBB))
- {
- /* Remote user has accepted the stream */
- bytestream = GABBLE_BYTESTREAM_IFACE (
- gabble_bytestream_factory_create_ibb (self, peer_handle,
- data->stream_id, NULL, peer_resource,
- GABBLE_BYTESTREAM_STATE_INITIATING));
- }
- else if (!tp_strdiff (stream_method, NS_BYTESTREAMS))
- {
- /* Remote user has accepted the stream */
- bytestream = GABBLE_BYTESTREAM_IFACE (
- gabble_bytestream_factory_create_socks5 (self, peer_handle,
- data->stream_id, NULL, peer_resource,
- GABBLE_BYTESTREAM_STATE_INITIATING));
- }
- else
+ if (bytestream_list == NULL)
+ goto END;
+
+ /* Create a multiple bytestream if needed */
+ if (g_slist_length (bytestream_list) > 1)
+ {
+ GSList *l;
+
+ bytestream = GABBLE_BYTESTREAM_IFACE (
+ gabble_bytestream_factory_create_multiple (self, peer_handle,
+ data->stream_id, NULL, peer_resource,
+ GABBLE_BYTESTREAM_STATE_INITIATING));
+
+ l = bytestream_list = g_slist_reverse (bytestream_list);
+
+ while (l != NULL)
{
- DEBUG ("Remote user chose an unsupported stream method");
- goto END;
- }
+ gabble_bytestream_multiple_add_bytestream (
+ GABBLE_BYTESTREAM_MULTIPLE (bytestream), l->data);
- /* no need to parse the rest of the fields, we've found the one we
- * wanted */
- break;
+ l = g_slist_next (l);
+ }
}
- if (bytestream == NULL)
- goto END;
+ /* If there is only one bytestream then we already have
+ bytestream == bytestream_list->data */
DEBUG ("stream %s accepted", data->stream_id);
@@ -1402,6 +1496,7 @@ END:
g_free (data->stream_id);
g_slice_free (struct _streaminit_reply_cb_data, data);
+ g_slist_free (bytestream_list);
return LM_HANDLER_RESULT_REMOVE_MESSAGE;
}
diff --git a/src/bytestream-factory.h b/src/bytestream-factory.h
index 1fe01bf..4523ef4 100644
--- a/src/bytestream-factory.h
+++ b/src/bytestream-factory.h
@@ -28,6 +28,7 @@
#include "bytestream-iface.h"
#include "bytestream-ibb.h"
#include "bytestream-muc.h"
+#include "bytestream-multiple.h"
#include "bytestream-socks5.h"
#include "connection.h"
@@ -86,6 +87,11 @@ GabbleBytestreamSocks5 *gabble_bytestream_factory_create_socks5 (
const gchar *stream_init_id, const gchar *peer_resource,
GabbleBytestreamState state);
+GabbleBytestreamMultiple *gabble_bytestream_factory_create_multiple (
+ GabbleBytestreamFactory *self, TpHandle peer_handle,
+ const gchar *stream_id, const gchar *stream_init_id,
+ const gchar *peer_resource, GabbleBytestreamState state);
+
LmMessage *gabble_bytestream_factory_make_stream_init_iq (
const gchar *full_jid, const gchar *stream_id, const gchar *profile);
diff --git a/src/bytestream-ibb.c b/src/bytestream-ibb.c
index 7ae1b48..0cbbcfd 100644
--- a/src/bytestream-ibb.c
+++ b/src/bytestream-ibb.c
@@ -52,6 +52,7 @@ enum
{
DATA_RECEIVED,
STATE_CHANGED,
+ CONNECTION_ERROR,
LAST_SIGNAL
};
@@ -69,6 +70,7 @@ enum
PROP_PEER_RESOURCE,
PROP_STATE,
PROP_PROTOCOL,
+ PROP_CLOSE_ON_CONNECTION_ERROR,
PROP_BLOCK_SIZE,
LAST_PROPERTY
};
@@ -82,6 +84,7 @@ struct _GabbleBytestreamIBBPrivate
gchar *peer_resource;
GabbleBytestreamState state;
gchar *peer_jid;
+ gboolean close_on_connection_error;
guint block_size;
guint16 seq;
@@ -98,6 +101,8 @@ gabble_bytestream_ibb_init (GabbleBytestreamIBB *self)
GABBLE_TYPE_BYTESTREAM_IBB, GabbleBytestreamIBBPrivate);
self->priv = priv;
+
+ priv->close_on_connection_error = TRUE;
}
static void
@@ -175,6 +180,9 @@ gabble_bytestream_ibb_get_property (GObject *object,
case PROP_PROTOCOL:
g_value_set_string (value, NS_IBB);
break;
+ case PROP_CLOSE_ON_CONNECTION_ERROR:
+ g_value_set_boolean (value, priv->close_on_connection_error);
+ break;
case PROP_BLOCK_SIZE:
g_value_set_uint (value, priv->block_size);
break;
@@ -220,6 +228,9 @@ gabble_bytestream_ibb_set_property (GObject *object,
g_signal_emit (object, signals[STATE_CHANGED], 0, priv->state);
}
break;
+ case PROP_CLOSE_ON_CONNECTION_ERROR:
+ priv->close_on_connection_error = g_value_get_boolean (value);
+ break;
case PROP_BLOCK_SIZE:
priv->block_size = g_value_get_uint (value);
break;
@@ -294,6 +305,9 @@ gabble_bytestream_ibb_class_init (
"state");
g_object_class_override_property (object_class, PROP_PROTOCOL,
"protocol");
+ g_object_class_override_property (object_class,
+ PROP_CLOSE_ON_CONNECTION_ERROR, "close-on-connection-error");
+
param_spec = g_param_spec_string (
"peer-resource",
@@ -339,6 +353,15 @@ gabble_bytestream_ibb_class_init (
NULL, NULL,
gabble_marshal_VOID__UINT,
G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ signals[CONNECTION_ERROR] =
+ g_signal_new ("connection-error",
+ G_OBJECT_CLASS_TYPE (gabble_bytestream_ibb_class),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+ 0,
+ NULL, NULL,
+ gabble_marshal_VOID__VOID,
+ G_TYPE_NONE, 0);
}
/*
diff --git a/src/bytestream-iface.c b/src/bytestream-iface.c
index 72e0381..c3c8aac 100644
--- a/src/bytestream-iface.c
+++ b/src/bytestream-iface.c
@@ -132,6 +132,14 @@ gabble_bytestream_iface_base_init (gpointer klass)
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
g_object_interface_install_property (klass, param_spec);
+ param_spec = g_param_spec_boolean (
+ "close-on-connection-error",
+ "Close on connection error",
+ "Wether to send a close stanza if there was a connection error",
+ TRUE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+ g_object_interface_install_property (klass, param_spec);
+
initialized = TRUE;
}
}
diff --git a/src/bytestream-multiple.c b/src/bytestream-multiple.c
new file mode 100644
index 0000000..844af6f
--- /dev/null
+++ b/src/bytestream-multiple.c
@@ -0,0 +1,604 @@
+/*
+ * bytestream-multiple.c - Source for GabbleBytestreamMultiple
+ * Copyright (C) 2007-2008 Collabora Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "config.h"
+#include "bytestream-multiple.h"
+
+#include <dbus/dbus-glib.h>
+#include <dbus/dbus-glib-lowlevel.h>
+#include <loudmouth/loudmouth.h>
+#include <telepathy-glib/interfaces.h>
+
+#define DEBUG_FLAG GABBLE_DEBUG_BYTESTREAM
+
+#include "base64.h"
+#include "bytestream-factory.h"
+#include "bytestream-iface.h"
+#include "connection.h"
+#include "debug.h"
+#include "disco.h"
+#include "gabble-signals-marshal.h"
+#include "namespaces.h"
+#include "util.h"
+
+static void
+bytestream_iface_init (gpointer g_iface, gpointer iface_data);
+
+G_DEFINE_TYPE_WITH_CODE (GabbleBytestreamMultiple, gabble_bytestream_multiple,
+ G_TYPE_OBJECT,
+ G_IMPLEMENT_INTERFACE (GABBLE_TYPE_BYTESTREAM_IFACE,
+ bytestream_iface_init));
+
+/* signals */
+enum
+{
+ DATA_RECEIVED,
+ STATE_CHANGED,
+ CONNECTION_ERROR,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = {0};
+
+/* properties */
+enum
+{
+ PROP_CONNECTION = 1,
+ PROP_PEER_HANDLE,
+ PROP_PEER_HANDLE_TYPE,
+ PROP_STREAM_ID,
+ PROP_STREAM_INIT_ID,
+ PROP_PEER_JID,
+ PROP_PEER_RESOURCE,
+ PROP_STATE,
+ PROP_PROTOCOL,
+ PROP_CLOSE_ON_CONNECTION_ERROR,
+ LAST_PROPERTY
+};
+
+struct _GabbleBytestreamMultiplePrivate
+{
+ GabbleConnection *conn;
+ TpHandle peer_handle;
+ gchar *stream_id;
+ gchar *stream_init_id;
+ gchar *peer_resource;
+ GabbleBytestreamState state;
+ gchar *peer_jid;
+ gboolean close_on_connection_error;
+
+ GList *bytestreams;
+
+ gboolean dispose_has_run;
+};
+
+#define GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE(obj) ((obj)->priv)
+
+static void gabble_bytestream_multiple_close (GabbleBytestreamIface *iface,
+ GError *error);
+
+static void
+gabble_bytestream_multiple_init (GabbleBytestreamMultiple *self)
+{
+ GabbleBytestreamMultiplePrivate *priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
+ GABBLE_TYPE_BYTESTREAM_MULTIPLE, GabbleBytestreamMultiplePrivate);
+
+ self->priv = priv;
+
+ priv->close_on_connection_error = TRUE;
+}
+
+static void
+gabble_bytestream_multiple_dispose (GObject *object)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+ TpHandleRepoIface *contact_repo = tp_base_connection_get_handles (
+ (TpBaseConnection *) priv->conn, TP_HANDLE_TYPE_CONTACT);
+
+ if (priv->dispose_has_run)
+ return;
+
+ priv->dispose_has_run = TRUE;
+
+ tp_handle_unref (contact_repo, priv->peer_handle);
+
+ if (priv->state != GABBLE_BYTESTREAM_STATE_CLOSED)
+ {
+ gabble_bytestream_iface_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
+ }
+
+ G_OBJECT_CLASS (gabble_bytestream_multiple_parent_class)->dispose (object);
+}
+
+static void
+gabble_bytestream_multiple_finalize (GObject *object)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+ g_free (priv->stream_id);
+ g_free (priv->stream_init_id);
+ g_free (priv->peer_resource);
+ g_free (priv->peer_jid);
+
+ G_OBJECT_CLASS (gabble_bytestream_multiple_parent_class)->finalize (object);
+}
+
+static void
+gabble_bytestream_multiple_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+ switch (property_id)
+ {
+ case PROP_CONNECTION:
+ g_value_set_object (value, priv->conn);
+ break;
+ case PROP_PEER_HANDLE:
+ g_value_set_uint (value, priv->peer_handle);
+ break;
+ case PROP_PEER_HANDLE_TYPE:
+ g_value_set_uint (value, TP_HANDLE_TYPE_CONTACT);
+ break;
+ case PROP_STREAM_ID:
+ g_value_set_string (value, priv->stream_id);
+ break;
+ case PROP_STREAM_INIT_ID:
+ g_value_set_string (value, priv->stream_init_id);
+ break;
+ case PROP_PEER_RESOURCE:
+ g_value_set_string (value, priv->peer_resource);
+ break;
+ case PROP_PEER_JID:
+ g_value_set_string (value, priv->peer_jid);
+ break;
+ case PROP_STATE:
+ g_value_set_uint (value, priv->state);
+ break;
+ case PROP_PROTOCOL:
+ g_value_set_string (value, NS_BYTESTREAMS);
+ break;
+ case PROP_CLOSE_ON_CONNECTION_ERROR:
+ g_value_set_boolean (value, priv->close_on_connection_error);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+gabble_bytestream_multiple_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (object);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+ switch (property_id)
+ {
+ case PROP_CONNECTION:
+ priv->conn = g_value_get_object (value);
+ break;
+ case PROP_PEER_HANDLE:
+ priv->peer_handle = g_value_get_uint (value);
+ break;
+ case PROP_STREAM_ID:
+ g_free (priv->stream_id);
+ priv->stream_id = g_value_dup_string (value);
+ break;
+ case PROP_STREAM_INIT_ID:
+ g_free (priv->stream_init_id);
+ priv->stream_init_id = g_value_dup_string (value);
+ break;
+ case PROP_PEER_RESOURCE:
+ g_free (priv->peer_resource);
+ priv->peer_resource = g_value_dup_string (value);
+ break;
+ case PROP_STATE:
+ if (priv->state != g_value_get_uint (value))
+ {
+ priv->state = g_value_get_uint (value);
+ g_signal_emit (object, signals[STATE_CHANGED], 0, priv->state);
+ }
+ break;
+ case PROP_CLOSE_ON_CONNECTION_ERROR:
+ priv->close_on_connection_error = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static GObject *
+gabble_bytestream_multiple_constructor (GType type,
+ guint n_props,
+ GObjectConstructParam *props)
+{
+ GObject *obj;
+ GabbleBytestreamMultiplePrivate *priv;
+ TpHandleRepoIface *contact_repo;
+ const gchar *jid;
+
+ obj = G_OBJECT_CLASS (gabble_bytestream_multiple_parent_class)->
+ constructor (type, n_props, props);
+
+ priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (GABBLE_BYTESTREAM_MULTIPLE (obj));
+
+ g_assert (priv->conn != NULL);
+ g_assert (priv->peer_handle != 0);
+ g_assert (priv->stream_id != NULL);
+
+ contact_repo = tp_base_connection_get_handles (
+ (TpBaseConnection *) priv->conn, TP_HANDLE_TYPE_CONTACT);
+
+ tp_handle_ref (contact_repo, priv->peer_handle);
+
+ jid = tp_handle_inspect (contact_repo, priv->peer_handle);
+
+ if (priv->peer_resource != NULL)
+ priv->peer_jid = g_strdup_printf ("%s/%s", jid, priv->peer_resource);
+ else
+ priv->peer_jid = g_strdup (jid);
+
+ return obj;
+}
+
+static void
+gabble_bytestream_multiple_class_init (
+ GabbleBytestreamMultipleClass *gabble_bytestream_multiple_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (gabble_bytestream_multiple_class);
+ GParamSpec *param_spec;
+
+ g_type_class_add_private (gabble_bytestream_multiple_class,
+ sizeof (GabbleBytestreamMultiplePrivate));
+
+ object_class->dispose = gabble_bytestream_multiple_dispose;
+ object_class->finalize = gabble_bytestream_multiple_finalize;
+
+ object_class->get_property = gabble_bytestream_multiple_get_property;
+ object_class->set_property = gabble_bytestream_multiple_set_property;
+ object_class->constructor = gabble_bytestream_multiple_constructor;
+
+ g_object_class_override_property (object_class, PROP_CONNECTION,
+ "connection");
+ g_object_class_override_property (object_class, PROP_PEER_HANDLE,
+ "peer-handle");
+ g_object_class_override_property (object_class, PROP_PEER_HANDLE_TYPE,
+ "peer-handle-type");
+ g_object_class_override_property (object_class, PROP_STREAM_ID,
+ "stream-id");
+ g_object_class_override_property (object_class, PROP_PEER_JID,
+ "peer-jid");
+ g_object_class_override_property (object_class, PROP_STATE,
+ "state");
+ g_object_class_override_property (object_class, PROP_PROTOCOL,
+ "protocol");
+ g_object_class_override_property (object_class,
+ PROP_CLOSE_ON_CONNECTION_ERROR, "close-on-connection-error");
+
+ param_spec = g_param_spec_string (
+ "peer-resource",
+ "Peer resource",
+ "the resource used by the remote peer during the SI, if any",
+ NULL,
+ G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+ g_object_class_install_property (object_class, PROP_PEER_RESOURCE,
+ param_spec);
+
+ param_spec = g_param_spec_string (
+ "stream-init-id",
+ "stream init ID",
+ "the iq ID of the SI request, if any",
+ NULL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+ g_object_class_install_property (object_class, PROP_STREAM_INIT_ID,
+ param_spec);
+
+ signals[DATA_RECEIVED] =
+ g_signal_new ("data-received",
+ G_OBJECT_CLASS_TYPE (gabble_bytestream_multiple_class),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+ 0,
+ NULL, NULL,
+ g_cclosure_marshal_VOID__UINT_POINTER,
+ G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_POINTER);
+
+ signals[STATE_CHANGED] =
+ g_signal_new ("state-changed",
+ G_OBJECT_CLASS_TYPE (gabble_bytestream_multiple_class),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+ 0,
+ NULL, NULL,
+ gabble_marshal_VOID__UINT,
+ G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ signals[CONNECTION_ERROR] =
+ g_signal_new ("connection-error",
+ G_OBJECT_CLASS_TYPE (gabble_bytestream_multiple_class),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+ 0,
+ NULL, NULL,
+ gabble_marshal_VOID__VOID,
+ G_TYPE_NONE, 0);
+}
+
+/*
+ * gabble_bytestream_multiple_send
+ *
+ * Implements gabble_bytestream_iface_send on GabbleBytestreamIface
+ */
+static gboolean
+gabble_bytestream_multiple_send (GabbleBytestreamIface *iface,
+ guint len,
+ const gchar *str)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+ GabbleBytestreamIface *bytestream;
+
+ if (priv->state != GABBLE_BYTESTREAM_STATE_OPEN)
+ {
+ DEBUG ("can't send data through a not open bytestream (state: %d)",
+ priv->state);
+ return FALSE;
+ }
+
+ g_assert (priv->bytestreams);
+ g_assert (priv->bytestreams->data);
+
+ bytestream = priv->bytestreams->data;
+ return gabble_bytestream_iface_send (bytestream, len, str);
+}
+
+/*
+ * gabble_bytestream_multiple_accept
+ *
+ * Implements gabble_bytestream_iface_accept on GabbleBytestreamIface
+ */
+static void
+gabble_bytestream_multiple_accept (GabbleBytestreamIface *iface,
+ GabbleBytestreamAugmentSiAcceptReply func,
+ gpointer user_data)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+ LmMessage *msg;
+ LmMessageNode *si;
+
+ if (priv->state != GABBLE_BYTESTREAM_STATE_LOCAL_PENDING)
+ {
+      /* The stream was previously or automatically accepted */
+ return;
+ }
+
+ msg = gabble_bytestream_factory_make_accept_iq (priv->peer_jid,
+ priv->stream_init_id, NS_BYTESTREAMS);
+ si = lm_message_node_get_child_with_namespace (msg->node, "si", NS_SI);
+ g_assert (si != NULL);
+
+ if (func != NULL)
+ {
+ /* let the caller add his profile specific data */
+ func (si, user_data);
+ }
+
+ if (_gabble_connection_send (priv->conn, msg, NULL))
+ {
+ DEBUG ("stream %s with %s is now accepted", priv->stream_id,
+ priv->peer_jid);
+ g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_ACCEPTED, NULL);
+ }
+
+ lm_message_unref (msg);
+}
+
+static void
+gabble_bytestream_multiple_decline (GabbleBytestreamMultiple *self,
+ GError *error)
+{
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+ LmMessage *msg;
+
+ g_return_if_fail (priv->state == GABBLE_BYTESTREAM_STATE_LOCAL_PENDING);
+
+ msg = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
+ '@', "type", "error",
+ '@', "id", priv->stream_init_id,
+ NULL);
+
+ if (error != NULL && error->domain == GABBLE_XMPP_ERROR)
+ {
+ gabble_xmpp_error_to_node (error->code, msg->node, error->message);
+ }
+ else
+ {
+ gabble_xmpp_error_to_node (XMPP_ERROR_FORBIDDEN, msg->node,
+ "Offer Declined");
+ }
+
+ _gabble_connection_send (priv->conn, msg, NULL);
+
+ lm_message_unref (msg);
+
+ g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
+}
+
+/*
+ * gabble_bytestream_multiple_close
+ *
+ * Implements gabble_bytestream_iface_close on GabbleBytestreamIface
+ */
+static void
+gabble_bytestream_multiple_close (GabbleBytestreamIface *iface,
+ GError *error)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+ if (priv->state == GABBLE_BYTESTREAM_STATE_CLOSED)
+ /* bytestream already closed, do nothing */
+ return;
+
+ if (priv->state == GABBLE_BYTESTREAM_STATE_LOCAL_PENDING)
+ {
+ /* Stream was created using SI so we decline the request */
+ gabble_bytestream_multiple_decline (self, error);
+ }
+ else
+ {
+ /* FIXME: send a close stanza */
+      /* FIXME: close the sub-bytestreams */
+
+ g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
+ }
+}
+
+/*
+ * gabble_bytestream_multiple_initiate
+ *
+ * Implements gabble_bytestream_iface_initiate on GabbleBytestreamIface
+ */
+static gboolean
+gabble_bytestream_multiple_initiate (GabbleBytestreamIface *iface)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (iface);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+ GabbleBytestreamIface *bytestream;
+
+ if (priv->state != GABBLE_BYTESTREAM_STATE_INITIATING)
+ {
+ DEBUG ("bytestream is not is the initiating state (state %d)",
+ priv->state);
+ return FALSE;
+ }
+
+ if (priv->bytestreams == NULL)
+ return FALSE;
+
+ /* Initiate the first available bytestream */
+ bytestream = priv->bytestreams->data;
+
+ return gabble_bytestream_iface_initiate (bytestream);
+}
+
+static void
+bytestream_connection_error_cb (GabbleBytestreamIface *failed,
+ gpointer user_data)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (user_data);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+ GabbleBytestreamIface *fallback;
+
+ priv->bytestreams = g_list_remove (priv->bytestreams, failed);
+
+ if (!priv->bytestreams)
+ return;
+
+  /* If we have other methods to try, prevent the failed bytestream from
+     sending a close stanza */
+ g_object_set (failed, "close-on-connection-error", FALSE, NULL);
+
+ g_object_unref (failed);
+
+ fallback = priv->bytestreams->data;
+
+ DEBUG ("Trying alternative streaming method");
+
+ g_object_set (fallback, "state", GABBLE_BYTESTREAM_STATE_INITIATING, NULL);
+ gabble_bytestream_iface_initiate (fallback);
+}
+
+static void
+bytestream_data_received_cb (GabbleBytestreamIface *bytestream,
+ TpHandle sender,
+ GString *str,
+ gpointer user_data)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (user_data);
+
+ /* Just forward the data */
+ g_signal_emit (G_OBJECT (self), signals[DATA_RECEIVED], 0, sender, str);
+}
+
+static void
+bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
+ GabbleBytestreamState state,
+ gpointer user_data)
+{
+ GabbleBytestreamMultiple *self = GABBLE_BYTESTREAM_MULTIPLE (user_data);
+ GabbleBytestreamMultiplePrivate *priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+ if (state == GABBLE_BYTESTREAM_STATE_CLOSED &&
+ g_list_length (priv->bytestreams) <= 1)
+ {
+ return;
+ }
+
+ g_object_set (self, "state", state, NULL);
+}
+
+/*
+ * gabble_bytestream_multiple_add_bytestream
+ *
+ * Add an alternative stream method.
+ */
+void
+gabble_bytestream_multiple_add_bytestream (GabbleBytestreamMultiple *self,
+ GabbleBytestreamIface *bytestream)
+{
+ GabbleBytestreamMultiplePrivate *priv;
+
+ g_return_if_fail (GABBLE_IS_BYTESTREAM_MULTIPLE (self));
+ g_return_if_fail (GABBLE_IS_BYTESTREAM_IFACE (bytestream));
+
+ priv = GABBLE_BYTESTREAM_MULTIPLE_GET_PRIVATE (self);
+
+ DEBUG ("Add bytestream");
+
+ g_object_ref (bytestream);
+ priv->bytestreams = g_list_append (priv->bytestreams, bytestream);
+
+ g_signal_connect (bytestream, "connection-error",
+ G_CALLBACK (bytestream_connection_error_cb), self);
+ g_signal_connect (bytestream, "data-received",
+ G_CALLBACK (bytestream_data_received_cb), self);
+ g_signal_connect (bytestream, "state-changed",
+ G_CALLBACK (bytestream_state_changed_cb), self);
+}
+
+static void
+bytestream_iface_init (gpointer g_iface,
+ gpointer iface_data)
+{
+ GabbleBytestreamIfaceClass *klass = (GabbleBytestreamIfaceClass *) g_iface;
+
+ klass->initiate = gabble_bytestream_multiple_initiate;
+ klass->send = gabble_bytestream_multiple_send;
+ klass->close = gabble_bytestream_multiple_close;
+ klass->accept = gabble_bytestream_multiple_accept;
+}
diff --git a/src/bytestream-multiple.h b/src/bytestream-multiple.h
new file mode 100644
index 0000000..3334a19
--- /dev/null
+++ b/src/bytestream-multiple.h
@@ -0,0 +1,74 @@
+/*
+ * bytestream-multiple.h - Header for GabbleBytestreamMultiple
+ * Copyright (C) 2007-2008 Collabora Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef __GABBLE_BYTESTREAM_MULTIPLE_H__
+#define __GABBLE_BYTESTREAM_MULTIPLE_H__
+
+#include <stdlib.h>
+
+#include <glib-object.h>
+#include <loudmouth/loudmouth.h>
+
+#include <telepathy-glib/base-connection.h>
+
+#include "bytestream-iface.h"
+#include "error.h"
+
+G_BEGIN_DECLS
+
+/* Forward typedefs for the instance, class and private structures */
+typedef struct _GabbleBytestreamMultiple GabbleBytestreamMultiple;
+typedef struct _GabbleBytestreamMultipleClass GabbleBytestreamMultipleClass;
+typedef struct _GabbleBytestreamMultiplePrivate GabbleBytestreamMultiplePrivate;
+
+/* Class structure: adds nothing on top of GObjectClass */
+struct _GabbleBytestreamMultipleClass {
+ GObjectClass parent_class;
+};
+
+/* Instance structure: a GObject plus a pointer to the private data, whose
+ * layout is not exposed by this header */
+struct _GabbleBytestreamMultiple {
+ GObject parent;
+
+ GabbleBytestreamMultiplePrivate *priv;
+};
+
+/* GType accessor; GABBLE_TYPE_BYTESTREAM_MULTIPLE below expands to a call
+ * to this function */
+GType gabble_bytestream_multiple_get_type (void);
+
+/* TYPE MACROS */
+/* GType of GabbleBytestreamMultiple */
+#define GABBLE_TYPE_BYTESTREAM_MULTIPLE \
+ (gabble_bytestream_multiple_get_type ())
+/* Checked cast of an instance pointer to GabbleBytestreamMultiple */
+#define GABBLE_BYTESTREAM_MULTIPLE(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj), GABBLE_TYPE_BYTESTREAM_MULTIPLE,\
+ GabbleBytestreamMultiple))
+/* Checked cast of a class pointer to GabbleBytestreamMultipleClass */
+#define GABBLE_BYTESTREAM_MULTIPLE_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass), GABBLE_TYPE_BYTESTREAM_MULTIPLE,\
+ GabbleBytestreamMultipleClass))
+/* Instance type check */
+#define GABBLE_IS_BYTESTREAM_MULTIPLE(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj), GABBLE_TYPE_BYTESTREAM_MULTIPLE))
+/* Class type check */
+#define GABBLE_IS_BYTESTREAM_MULTIPLE_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass), GABBLE_TYPE_BYTESTREAM_MULTIPLE))
+/* Class structure of an instance */
+#define GABBLE_BYTESTREAM_MULTIPLE_GET_CLASS(obj) \
+ (G_TYPE_INSTANCE_GET_CLASS ((obj), GABBLE_TYPE_BYTESTREAM_MULTIPLE,\
+ GabbleBytestreamMultipleClass))
+
+/* Add @bytestream as an alternative stream method of @self. The
+ * implementation in bytestream-multiple.c takes a reference on @bytestream,
+ * appends it to the list of bytestreams and connects to its
+ * "connection-error", "data-received" and "state-changed" signals. */
+void
+gabble_bytestream_multiple_add_bytestream (GabbleBytestreamMultiple *self,
+ GabbleBytestreamIface *bytestream);
+
+G_END_DECLS
+
+#endif /* #ifndef __GABBLE_BYTESTREAM_MULTIPLE_H__ */
diff --git a/src/bytestream-socks5.c b/src/bytestream-socks5.c
index f570ccc..ad661a7 100644
--- a/src/bytestream-socks5.c
+++ b/src/bytestream-socks5.c
@@ -67,6 +67,7 @@ enum
{
DATA_RECEIVED,
STATE_CHANGED,
+ CONNECTION_ERROR,
LAST_SIGNAL
};
@@ -84,6 +85,7 @@ enum
PROP_PEER_RESOURCE,
PROP_STATE,
PROP_PROTOCOL,
+ PROP_CLOSE_ON_CONNECTION_ERROR,
LAST_PROPERTY
};
@@ -154,6 +156,7 @@ struct _GabbleBytestreamSocks5Private
gchar *peer_resource;
GabbleBytestreamState bytestream_state;
gchar *peer_jid;
+ gboolean close_on_connection_error;
GSList *streamhosts;
LmMessage *msg_for_acknowledge_connection;
@@ -192,6 +195,8 @@ gabble_bytestream_socks5_init (GabbleBytestreamSocks5 *self)
GABBLE_TYPE_BYTESTREAM_SOCKS5, GabbleBytestreamSocks5Private);
self->priv = priv;
+
+ priv->close_on_connection_error = TRUE;
}
static void
@@ -272,6 +277,9 @@ gabble_bytestream_socks5_get_property (GObject *object,
case PROP_PROTOCOL:
g_value_set_string (value, NS_BYTESTREAMS);
break;
+ case PROP_CLOSE_ON_CONNECTION_ERROR:
+ g_value_set_boolean (value, priv->close_on_connection_error);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -314,6 +322,9 @@ gabble_bytestream_socks5_set_property (GObject *object,
g_signal_emit (object, signals[STATE_CHANGED], 0, priv->bytestream_state);
}
break;
+ case PROP_CLOSE_ON_CONNECTION_ERROR:
+ priv->close_on_connection_error = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -385,6 +396,8 @@ gabble_bytestream_socks5_class_init (
"state");
g_object_class_override_property (object_class, PROP_PROTOCOL,
"protocol");
+ g_object_class_override_property (object_class,
+ PROP_CLOSE_ON_CONNECTION_ERROR, "close-on-connection-error");
param_spec = g_param_spec_string (
"peer-resource",
@@ -421,6 +434,15 @@ gabble_bytestream_socks5_class_init (
NULL, NULL,
gabble_marshal_VOID__UINT,
G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ signals[CONNECTION_ERROR] =
+ g_signal_new ("connection-error",
+ G_OBJECT_CLASS_TYPE (gabble_bytestream_socks5_class),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+ 0,
+ NULL, NULL,
+ gabble_marshal_VOID__VOID,
+ G_TYPE_NONE, 0);
}
static void
@@ -515,14 +537,21 @@ socks5_error (GabbleBytestreamSocks5 *self)
DEBUG ("connection to streamhost failed, trying the next one");
socks5_connect (self);
+
+ g_object_unref (self);
return;
}
/* ... but there are no more streamhosts */
DEBUG ("no more streamhosts to try");
- _gabble_connection_send_iq_error (priv->conn,
- priv->msg_for_acknowledge_connection, XMPP_ERROR_ITEM_NOT_FOUND,
- "impossible to connect to any streamhost");
+ g_signal_emit (self, signals[CONNECTION_ERROR], 0);
+
+ if (priv->close_on_connection_error)
+ {
+ _gabble_connection_send_iq_error (priv->conn,
+ priv->msg_for_acknowledge_connection, XMPP_ERROR_ITEM_NOT_FOUND,
+ "impossible to connect to any streamhost");
+ }
lm_message_unref (priv->msg_for_acknowledge_connection);
priv->msg_for_acknowledge_connection = NULL;
@@ -531,8 +560,6 @@ socks5_error (GabbleBytestreamSocks5 *self)
DEBUG ("error, closing the connection\n");
gabble_bytestream_socks5_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
-
- return;
}
static gboolean
@@ -1097,25 +1124,28 @@ gabble_bytestream_socks5_close (GabbleBytestreamIface *iface,
}
else
{
- LmMessage *msg;
-
DEBUG ("send Socks5 close stanza");
socks5_close_channel (self);
- msg = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
- '@', "type", "set",
- '(', "close", "",
- '@', "xmlns", NS_BYTESTREAMS,
- '@', "sid", priv->stream_id,
- ')', NULL);
+ if (priv->close_on_connection_error)
+ {
+ LmMessage *msg;
- /* We don't really care about the answer as the bytestream
- * is closed anyway. */
- _gabble_connection_send_with_reply (priv->conn, msg,
- NULL, NULL, NULL, NULL);
+ msg = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
+ '@', "type", "set",
+ '(', "close", "",
+ '@', "xmlns", NS_BYTESTREAMS,
+ '@', "sid", priv->stream_id,
+ ')', NULL);
- lm_message_unref (msg);
+ /* We don't really care about the answer as the bytestream
+ * is closed anyway. */
+ _gabble_connection_send_with_reply (priv->conn, msg,
+ NULL, NULL, NULL, NULL);
+
+ lm_message_unref (msg);
+ }
g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
}
@@ -1139,6 +1169,7 @@ socks5_init_reply_cb (GabbleConnection *conn,
else
{
DEBUG ("error during Socks5 initiation");
+ g_signal_emit (self, signals[CONNECTION_ERROR], 0);
g_object_set (self, "state", GABBLE_BYTESTREAM_STATE_CLOSED, NULL);
}
diff --git a/src/tube-stream.c b/src/tube-stream.c
index e5ee71f..80fa662 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -269,6 +269,14 @@ extra_bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
guint source_id;
DEBUG ("extra bytestream open");
+ /* The bytestream can go back from open to initiating in case of a
+ * multiple bytestream */
+ if (g_hash_table_lookup (priv->io_channel_to_watcher_source_id,
+ channel))
+ {
+ return;
+ }
+
g_signal_connect (bytestream, "data-received",
G_CALLBACK (data_received_cb), self);
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 8874eac..86e5164 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -40,6 +40,7 @@ TWISTED_TESTS = \
tubes/test-muc-accept-stream-tube-ibb.py \
tubes/test-muc-offer-dbus-tube.py \
tubes/test-muc-offer-stream-tube-ibb.py \
+ tubes/test-si-fallback.py \
tubes/test-si-ibb-tubes.py \
tubes/test-si-socks5-tubes.py \
tubes/ensure-si-tube.py \
diff --git a/tests/twisted/tubes/test-si-fallback.py b/tests/twisted/tubes/test-si-fallback.py
new file mode 100644
index 0000000..b15160e
--- /dev/null
+++ b/tests/twisted/tubes/test-si-fallback.py
@@ -0,0 +1,332 @@
+"""Test stream initiation fallback."""
+
+import base64
+import errno
+import os
+
+import dbus
+from dbus.connection import Connection
+from dbus.lowlevel import SignalMessage
+
+from servicetest import call_async, EventPattern, tp_name_prefix, watch_tube_signals, EventProtocolClientFactory
+from gabbletest import exec_test, acknowledge_iq
+
+from twisted.words.xish import domish, xpath
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor
+from twisted.words.protocols.jabber.client import IQ
+
+from gabbleconfig import HAVE_DBUS_TUBES
+
+NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
+NS_SI = 'http://jabber.org/protocol/si'
+NS_FEATURE_NEG = 'http://jabber.org/protocol/feature-neg'
+NS_IBB = 'http://jabber.org/protocol/ibb'
+NS_X_DATA = 'jabber:x:data'
+NS_BYTESTREAMS = 'http://jabber.org/protocol/bytestreams'
+
+class Echo(Protocol):
+    """Twisted protocol writing back every chunk of data it receives."""
+
+    def dataReceived(self, data):
+        transport = self.transport
+        transport.write(data)
+
+def set_up_echo():
+    """Serve the Echo protocol on the UNIX socket './stream'.
+
+    A file left at that path is removed first; a missing file (ENOENT) is
+    not an error, any other OSError is re-raised.
+    """
+    socket_path = os.getcwd() + '/stream'
+
+    try:
+        os.remove(socket_path)
+    except OSError, e:
+        if e.errno != errno.ENOENT:
+            raise
+
+    echo_factory = Factory()
+    echo_factory.protocol = Echo
+    reactor.listenUNIX(socket_path, echo_factory)
+
+def test(q, bus, conn, stream):
+    """Check stream initiation fallback from SOCKS5 to IBB.
+
+    First half: we offer a stream tube; the fake peer (bob) starts a stream
+    initiation proposing SOCKS5 bytestreams and IBB, then sends a streamhost
+    that cannot be reached. The CM is expected to open an IBB bytestream
+    with the same sid, and data sent over it must come back from the echo
+    service.
+
+    Second half: bob offers a tube which we accept; the CM must propose both
+    stream methods, send its SOCKS5 streamhosts, and then acknowledge the
+    IBB <open/> sent by bob instead.
+
+    q, bus, conn and stream are provided by exec_test().
+    """
+    set_up_echo()
+
+    conn.Connect()
+
+    _, vcard_event, roster_event = q.expect_many(
+        EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+            query_name='vCard'),
+        EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+    acknowledge_iq(stream, vcard_event.stanza)
+
+    roster = roster_event.stanza
+    roster['type'] = 'result'
+    item = roster_event.query.addElement('item')
+    item['jid'] = 'bob@localhost'
+    item['subscription'] = 'both'
+    stream.send(roster)
+
+    presence = domish.Element(('jabber:client', 'presence'))
+    presence['from'] = 'bob@localhost/Bob'
+    presence['to'] = 'test@localhost/Resource'
+    c = presence.addElement('c')
+    c['xmlns'] = 'http://jabber.org/protocol/caps'
+    c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+    c['ver'] = '1.2.3'
+    stream.send(presence)
+
+    event = q.expect('stream-iq', iq_type='get',
+        query_ns='http://jabber.org/protocol/disco#info',
+        to='bob@localhost/Bob')
+    result = event.stanza
+    result['type'] = 'result'
+    assert event.query['node'] == \
+        'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+    feature = event.query.addElement('feature')
+    feature['var'] = NS_TUBES
+    stream.send(result)
+
+    requestotron = dbus.Interface(conn,
+        'org.freedesktop.Telepathy.Connection.Interface.Requests')
+    bob_handle = conn.RequestHandles(1, ['bob@localhost'])[0]
+
+    # Offer a tube
+    call_async(q, requestotron, 'CreateChannel',
+        {'org.freedesktop.Telepathy.Channel.ChannelType':
+            'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
+         'org.freedesktop.Telepathy.Channel.TargetHandleType':
+            1,
+         'org.freedesktop.Telepathy.Channel.TargetHandle':
+            bob_handle,
+         'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT.Service':
+            'echo',
+         'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT.Parameters':
+            dbus.Dictionary({'foo': 'bar'}, signature='sv'),
+        })
+    ret, _, _ = q.expect_many(
+        EventPattern('dbus-return', method='CreateChannel'),
+        EventPattern('dbus-signal', signal='NewChannel'),
+        EventPattern('dbus-signal', signal='NewChannels'),
+        )
+
+    chan_path = ret.value[0]
+    channels = filter(lambda x:
+        x[1] == "org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT" and
+        x[0] == chan_path,
+        conn.ListChannels())
+    tube_chan = bus.get_object(conn.bus_name, channels[0][0])
+    tube_iface = dbus.Interface(tube_chan,
+        tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+
+    path = os.getcwd() + '/stream'
+    call_async(q, tube_iface, 'OfferStreamTube',
+        0, dbus.ByteArray(path), 0, "")
+
+    event = q.expect('stream-message')
+    message = event.stanza
+    tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % NS_TUBES,
+        message)
+    assert tube_nodes is not None
+    assert len(tube_nodes) == 1
+    tube = tube_nodes[0]
+
+    assert tube['service'] == 'echo'
+    assert tube['type'] == 'stream'
+    assert not tube.hasAttribute('initiator')
+    stream_tube_id = long(tube['id'])
+
+    # The CM is the server, so fake a client wanting to talk to it
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    si = iq.addElement((NS_SI, 'si'))
+    si['id'] = 'alpha'
+    si['profile'] = NS_TUBES
+    feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+    x = feature.addElement((NS_X_DATA, 'x'))
+    x['type'] = 'form'
+    field = x.addElement((None, 'field'))
+    field['var'] = 'stream-method'
+    field['type'] = 'list-multiple'
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_BYTESTREAMS)
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_IBB)
+
+    stream_node = si.addElement((NS_TUBES, 'stream'))
+    stream_node['tube'] = str(stream_tube_id)
+    stream.send(iq)
+
+    si_reply_event, _ = q.expect_many(
+        EventPattern('stream-iq', iq_type='result'),
+        EventPattern('dbus-signal', signal='TubeChannelStateChanged',
+            args=[2])) # 2 == OPEN
+    iq = si_reply_event.stanza
+    si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+        iq)[0]
+    value = xpath.queryForNodes('/si/feature/x/field/value', si)
+    assert len(value) == 1
+    proto = value[0]
+    assert str(proto) == NS_BYTESTREAMS
+    tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+    assert len(tube) == 1
+
+    q.expect('dbus-signal', signal='StreamTubeNewConnection',
+        args=[bob_handle])
+
+    # Send the non-working streamhost
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    query = iq.addElement((NS_BYTESTREAMS, 'query'))
+    query['sid'] = 'alpha'
+    query['mode'] = 'tcp'
+    streamhost = query.addElement('streamhost')
+    streamhost['jid'] = 'bob@localhost/Bob'
+    streamhost['host'] = 'invalid.invalid'
+    streamhost['port'] = '1234'
+    stream.send(iq)
+
+    # SOCKS5 cannot work, so the CM has to fall back to IBB with the same sid
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza
+    open = xpath.queryForNodes('/iq/open', iq)[0]
+    assert open.uri == NS_IBB
+    assert open['sid'] == 'alpha'
+
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test@localhost/Resource'
+
+    stream.send(result)
+
+    # have the fake client send us some data
+    message = domish.Element(('jabber:client', 'message'))
+    message['to'] = 'test@localhost/Resource'
+    message['from'] = 'bob@localhost/Bob'
+    data_node = message.addElement((NS_IBB, 'data'))
+    data_node['sid'] = 'alpha'
+    data_node['seq'] = '0'
+    data_node.addContent(base64.b64encode('hello world'))
+    stream.send(message)
+
+    event = q.expect('stream-message', to='bob@localhost/Bob')
+    message = event.stanza
+
+    data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % NS_IBB,
+        message)
+    assert data_nodes is not None
+    assert len(data_nodes) == 1
+    ibb_data = data_nodes[0]
+    assert ibb_data['sid'] == 'alpha'
+    binary = base64.b64decode(str(ibb_data))
+    assert binary == 'hello world'
+
+
+    # Accepting a tube
+    message = domish.Element(('jabber:client', 'message'))
+    message['to'] = 'test@localhost/Resource'
+    message['from'] = 'bob@localhost/Bob'
+    message['id'] = 'msg-id'
+    tube = message.addElement((NS_TUBES, 'tube'))
+    tube['type'] = 'stream'
+    tube['id'] = '42'
+    tube['service'] = 'foo-service'
+    parameters = tube.addElement((None, 'parameters'))
+    parameter = parameters.addElement((None, 'parameter'))
+    parameter['type'] = 'str'
+    parameter['name'] = 'foo'
+    parameter.addContent('bar')
+
+    stream.send(message)
+
+    event = q.expect('dbus-signal', signal='NewTube')
+    id = event.args[0]
+    initiator = event.args[1]
+    type = event.args[2]
+    service = event.args[3]
+    parameters = event.args[4]
+    state = event.args[5]
+
+    assert id == 42
+    initiator_jid = conn.InspectHandles(1, [initiator])[0]
+    assert initiator_jid == 'bob@localhost'
+    assert type == 1 # Stream tube
+    assert service == 'foo-service'
+    assert parameters == {'foo': 'bar'}
+    assert state == 0 # local pending
+
+    # accept the tube
+    channels = filter(lambda x:
+        x[1] == "org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT" and
+        '42' in x[0],
+        conn.ListChannels())
+    assert len(channels) == 1
+
+    tube_chan = bus.get_object(conn.bus_name, channels[0][0])
+    tube_iface = dbus.Interface(tube_chan,
+        tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+
+    call_async(q, tube_iface, 'AcceptStreamTube', 0, 0, '')
+
+    event = q.expect('dbus-return', method='AcceptStreamTube')
+    path2 = event.value[0]
+    path2 = ''.join([chr(c) for c in path2])
+
+    factory = EventProtocolClientFactory(q)
+    reactor.connectUNIX(path2, factory)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza
+    si_nodes = xpath.queryForNodes('/iq/si', iq)
+    assert si_nodes is not None
+    assert len(si_nodes) == 1
+    si = si_nodes[0]
+    assert si['profile'] == NS_TUBES
+    dbus_stream_id = si['id']
+
+    feature = xpath.queryForNodes('/si/feature', si)[0]
+    x = xpath.queryForNodes('/feature/x', feature)[0]
+    assert x['type'] == 'form'
+    field = xpath.queryForNodes('/x/field', x)[0]
+    assert field['var'] == 'stream-method'
+    assert field['type'] == 'list-multiple'
+    value = xpath.queryForNodes('/field/option/value', field)[0]
+    assert str(value) == NS_BYTESTREAMS
+    value = xpath.queryForNodes('/field/option/value', field)[1]
+    assert str(value) == NS_IBB
+
+    # Reply to the SI request selecting both stream methods
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test@localhost/Resource'
+    res_si = result.addElement((NS_SI, 'si'))
+    res_feature = res_si.addElement((NS_FEATURE_NEG, 'feature'))
+    res_x = res_feature.addElement((NS_X_DATA, 'x'))
+    res_x['type'] = 'submit'
+    res_field = res_x.addElement((None, 'field'))
+    res_field['var'] = 'stream-method'
+    res_value = res_field.addElement((None, 'value'))
+    res_value.addContent(NS_BYTESTREAMS)
+    res_value = res_field.addElement((None, 'value'))
+    res_value.addContent(NS_IBB)
+
+    stream.send(result)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza
+    query = xpath.queryForNodes('/iq/query', iq)[0]
+    assert query.uri == NS_BYTESTREAMS
+    sid = query['sid']
+    streamhost = xpath.queryForNodes('/iq/query/streamhost', iq)[0]
+    assert streamhost
+
+    # Do not answer the SOCKS5 offer: open an IBB bytestream with the same
+    # sid instead and expect it to be acknowledged
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    open = iq.addElement((NS_IBB, 'open'))
+    open['sid'] = sid
+    open['block-size'] = '4096'
+    stream.send(iq)
+
+    q.expect('stream-iq', iq_type='result')
+
+if __name__ == '__main__':
+ exec_test(test)
--
1.5.6.5
More information about the Telepathy-commits
mailing list