[Telepathy-commits] [telepathy-salut/master] 1-1 tubes: use direct tcp connections, and connect to the right port
Alban Crequy
alban.crequy at collabora.co.uk
Tue Nov 25 03:59:19 PST 2008
20080731142137-a41c0-efce423d1593431bd156242fa9216afd72c860f2.gz
---
lib/gibber/gibber-bytestream-direct.c | 23 +++++++-
src/salut-direct-bytestream-manager.c | 100 +++++++++++++-------------------
src/salut-direct-bytestream-manager.h | 13 +++-
src/salut-tubes-channel.c | 33 ++++++++---
src/salut-tubes-channel.h | 2 +-
src/salut-tubes-manager.c | 38 +++++++++++-
src/tube-stream.c | 75 +++++++++++++++++++++++-
src/tube-stream.h | 2 +-
8 files changed, 203 insertions(+), 83 deletions(-)
diff --git a/lib/gibber/gibber-bytestream-direct.c b/lib/gibber/gibber-bytestream-direct.c
index e36bd5d..52b7522 100644
--- a/lib/gibber/gibber-bytestream-direct.c
+++ b/lib/gibber/gibber-bytestream-direct.c
@@ -610,7 +610,8 @@ gibber_bytestream_direct_accept (GibberBytestreamIface *bytestream,
GibberBytestreamAugmentSiAcceptReply func,
gpointer user_data)
{
- DEBUG ("not implemented");
+ /* nothing to do */
+ DEBUG ("Called.");
}
/*
@@ -622,7 +623,25 @@ static void
gibber_bytestream_direct_close (GibberBytestreamIface *bytestream,
GError *error)
{
- DEBUG ("not implemented");
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSED)
+ /* bytestream already closed, do nothing */
+ return;
+
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_CLOSING, NULL);
+ if (priv->transport != NULL &&
+ !gibber_transport_buffer_is_empty (priv->transport))
+ {
+ DEBUG ("Wait transport buffer is empty before close the bytestream");
+ }
+ else
+ {
+ DEBUG ("Transport buffer is empty, we can close the bytestream");
+ bytestream_closed (self);
+ }
}
/*
diff --git a/src/salut-direct-bytestream-manager.c b/src/salut-direct-bytestream-manager.c
index e12779f..35fde01 100644
--- a/src/salut-direct-bytestream-manager.c
+++ b/src/salut-direct-bytestream-manager.c
@@ -35,6 +35,7 @@
#include "salut-im-manager.h"
#include "salut-muc-manager.h"
#include "salut-tubes-manager.h"
+#include "tube-iface.h"
#define DEBUG_FLAG DEBUG_DIRECT_BYTESTREAM_MGR
#include "debug.h"
@@ -226,7 +227,7 @@ salut_direct_bytestream_manager_class_init (
param_spec = g_param_spec_string (
"host-name-fqdn",
"host name FQDN",
- "The FQDN host name that will be used by OOB bytestreams",
+ "The FQDN host name that will be used by direct bytestreams",
NULL,
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_READWRITE |
@@ -250,27 +251,12 @@ salut_direct_bytestream_manager_new (SalutConnection *conn,
NULL);
}
-/* transport between the 2 CM, called on the initiator's side */
-static void
-set_transport (SalutDirectBytestreamManager *self,
- GibberTransport *transport)
+struct _listener_io_in_cb_data
{
- //SalutDirectBytestreamManagerPrivate *priv =
- // SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
-
- /* the transport_handler is called when something is received in the
- * transport */
- //gibber_transport_set_handler (transport, transport_handler, self);
-
- /*
- g_signal_connect (transport, "connected",
- G_CALLBACK (transport_connected_cb), self);
- g_signal_connect (transport, "disconnected",
- G_CALLBACK (transport_disconnected_cb), self);
- g_signal_connect (priv->transport, "buffer-empty",
- G_CALLBACK (transport_buffer_empty_cb), self);
- */
-}
+ SalutDirectBytestreamManager *mgr;
+ SalutTubeIface *tube;
+ SalutContact *contact;
+};
/* callback when receiving a connection from the remote CM */
static gboolean
@@ -278,33 +264,25 @@ listener_io_in_cb (GIOChannel *source,
GIOCondition condition,
gpointer user_data)
{
- SalutDirectBytestreamManager *self = SALUT_DIRECT_BYTESTREAM_MANAGER
- (user_data);
- int listen_fd, fd, ret;
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
- struct sockaddr_storage addr;
- socklen_t addrlen = sizeof (struct sockaddr_storage);
- GibberLLTransport *ll_transport;
+ struct _listener_io_in_cb_data *data = user_data;
+ SalutDirectBytestreamManagerPrivate *priv;
+ GibberBytestreamIface *bytestream;
+ int listen_fd;
listen_fd = g_io_channel_unix_get_fd (source);
- fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
- gibber_normalize_address (&addr);
- ret = getnameinfo ((struct sockaddr *) &addr, addrlen,
- host, NI_MAXHOST, port, NI_MAXSERV,
- NI_NUMERICHOST | NI_NUMERICSERV);
+ DEBUG ("Called. listen_fd=%d", listen_fd);
- /* check_addr_func */
+ priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (data->mgr);
- if (ret == 0)
- DEBUG("New connection from %s port %s", host, port);
- else
- DEBUG("New connection..");
+ bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
+ "state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
+ "self-id", priv->connection->name,
+ "peer-id", data->contact->name,
+ NULL);
- ll_transport = gibber_ll_transport_new ();
- set_transport (self, GIBBER_TRANSPORT (ll_transport));
- gibber_ll_transport_open_fd (ll_transport, fd);
+ salut_tube_iface_add_bytestream (data->tube, bytestream);
+ gibber_bytestream_direct_accept_socket (bytestream, listen_fd);
return FALSE;
}
@@ -317,10 +295,12 @@ listener_io_in_cb (GIOChannel *source,
*/
static int
start_listen_for_connection (SalutDirectBytestreamManager *self,
- gpointer id)
+ SalutContact *contact,
+ SalutTubeIface *tube)
{
SalutDirectBytestreamManagerPrivate *priv;
priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+ struct _listener_io_in_cb_data *data;
GIOChannel *listener;
guint *listener_watch;
int port;
@@ -401,14 +381,19 @@ start_listen_for_connection (SalutDirectBytestreamManager *self,
DEBUG ("listen on %s:%d", priv->host_name_fqdn, port);
+ data = g_slice_new (struct _listener_io_in_cb_data);
+ data->mgr = self;
+ data->tube = tube;
+ data->contact = contact,
+
listener = g_io_channel_unix_new (fd);
g_io_channel_set_close_on_unref (listener, TRUE);
listener_watch = g_malloc (sizeof (*listener_watch));
*listener_watch = g_io_add_watch (listener, G_IO_IN,
- listener_io_in_cb, self);
+ listener_io_in_cb, data);
- /* add id->listener_watch in priv->listener_watchs */
- g_hash_table_insert (priv->listener_watchs, id, listener_watch);
+ /* add tube->listener_watch in priv->listener_watchs */
+ g_hash_table_insert (priv->listener_watchs, tube, listener_watch);
freeaddrinfo (ans);
return port;
@@ -422,23 +407,24 @@ error:
return -1;
}
-void
+int
salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
SalutContact *contact,
- GibberXmppConnection *connection)
+ GibberXmppConnection *connection,
+ SalutTubeIface *tube)
{
SalutDirectBytestreamManagerPrivate *priv;
priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
DEBUG ("salut_direct_new_listening_stream: Called.");
- start_listen_for_connection (self, NULL);
-
+ return start_listen_for_connection (self, contact, tube);
}
GibberBytestreamIface *
salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
- SalutContact *contact)
+ SalutContact *contact,
+ int portnum)
{
GibberBytestreamIface *bytestream;
SalutDirectBytestreamManagerPrivate *priv;
@@ -447,18 +433,14 @@ salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
"state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
+ "self-id", priv->connection->name,
+ "peer-id", contact->name,
+ "host", "127.0.0.1", /* FIXME! */
+ "port", portnum,
NULL);
g_assert (bytestream != NULL);
- /* Let's start the initiation of the stream */
- if (!gibber_bytestream_iface_initiate (bytestream))
- {
- /* Initiation failed. */
- gibber_bytestream_iface_close (bytestream, NULL);
- bytestream = NULL;
- }
-
return bytestream;
}
diff --git a/src/salut-direct-bytestream-manager.h b/src/salut-direct-bytestream-manager.h
index ccce17e..f406101 100644
--- a/src/salut-direct-bytestream-manager.h
+++ b/src/salut-direct-bytestream-manager.h
@@ -23,6 +23,7 @@
#include <glib-object.h>
#include "salut-xmpp-connection-manager.h"
#include "salut-contact.h"
+#include "tube-iface.h"
#include <gibber/gibber-linklocal-transport.h>
#include <gibber/gibber-bytestream-iface.h>
@@ -67,14 +68,18 @@ salut_direct_bytestream_manager_new (SalutConnection *connection,
const gchar *host_name_fqdn);
/* To be used on the CM-initiator side, to receive connections from the remote
- * CM */
-void
+ * CM
+ *
+ * return: port
+ * */
+int
salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
- SalutContact *contact, GibberXmppConnection *connection);
+ SalutContact *contact, GibberXmppConnection *connection,
+ SalutTubeIface *tube);
/* To be used on the CM-receptor side, to make a new connection */
GibberBytestreamIface *
salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
- SalutContact *contact);
+ SalutContact *contact, int portnum);
#endif /* #ifndef __SALUT_DIRECT_BYTESTREAM_MANAGER_H__*/
diff --git a/src/salut-tubes-channel.c b/src/salut-tubes-channel.c
index e5645b7..36f9e40 100644
--- a/src/salut-tubes-channel.c
+++ b/src/salut-tubes-channel.c
@@ -164,7 +164,7 @@ static gboolean extract_tube_information (SalutTubesChannel *self,
const gchar **service, GHashTable **parameters, guint *tube_id);
static SalutTubeIface * create_new_tube (SalutTubesChannel *self,
TpTubeType type, TpHandle initiator, const gchar *service,
- GHashTable *parameters, guint tube_id);
+ GHashTable *parameters, guint tube_id, guint portnum);
static void
salut_tubes_channel_init (SalutTubesChannel *self)
@@ -791,7 +791,7 @@ tubes_muc_message_received (SalutTubesChannel *self,
}
tube = create_new_tube (self, type, initiator_handle,
- service, parameters, tube_id);
+ service, parameters, tube_id, 0);
/* the tube has reffed its initiator, no need to keep a ref */
tp_handle_unref (contact_repo, initiator_handle);
@@ -868,7 +868,8 @@ tubes_message_received (SalutTubesChannel *self,
TpTubeType tube_type,
TpHandle initiator_handle,
GHashTable *parameters,
- guint tube_id)
+ guint tube_id,
+ guint portnum)
{
SalutTubesChannelPrivate *priv = SALUT_TUBES_CHANNEL_GET_PRIVATE (self);
@@ -879,7 +880,7 @@ tubes_message_received (SalutTubesChannel *self,
if (tube == NULL)
{
tube = create_new_tube (self, tube_type, initiator_handle, service,
- parameters, tube_id);
+ parameters, tube_id, portnum);
}
}
@@ -1046,6 +1047,7 @@ tube_closed_cb (SalutTubeIface *tube,
SalutTubesChannelPrivate *priv = SALUT_TUBES_CHANNEL_GET_PRIVATE (self);
guint tube_id;
+ DEBUG ("Called. closed=%d", priv->closed);
if (priv->closed)
return;
@@ -1083,7 +1085,8 @@ create_new_tube (SalutTubesChannel *self,
TpHandle initiator,
const gchar *service,
GHashTable *parameters,
- guint tube_id)
+ guint tube_id,
+ guint portnum)
{
SalutTubesChannelPrivate *priv = SALUT_TUBES_CHANNEL_GET_PRIVATE (self);
SalutTubeIface *tube;
@@ -1103,7 +1106,8 @@ create_new_tube (SalutTubesChannel *self,
case TP_TUBE_TYPE_STREAM:
tube = SALUT_TUBE_IFACE (salut_tube_stream_new (priv->conn,
priv->xmpp_connection_manager, priv->handle, priv->handle_type,
- priv->self_handle, initiator, service, parameters, tube_id));
+ priv->self_handle, initiator, service, parameters, tube_id,
+ portnum));
break;
default:
g_assert_not_reached ();
@@ -1459,7 +1463,7 @@ salut_tubes_channel_offer_d_bus_tube (TpSvcChannelTypeTubes *iface,
tube_id = generate_tube_id ();
tube = create_new_tube (self, TP_TUBE_TYPE_DBUS, priv->self_handle,
- service, parameters_copied, tube_id);
+ service, parameters_copied, tube_id, 0);
tp_svc_channel_type_tubes_return_from_offer_d_bus_tube (context, tube_id);
}
@@ -1795,6 +1799,8 @@ _send_channel_iq_tube (gpointer key,
TpHandleRepoIface *contact_repo;
gchar *tube_id_str = g_strdup_printf ("%d", tube_id);
+ int port;
+ gchar *port_str;
/* listen for future connections from the remote CM before sending the
* iq */
@@ -1805,8 +1811,8 @@ _send_channel_iq_tube (gpointer key,
NULL);
g_assert (direct_bytestream_mgr != NULL);
- salut_direct_new_listening_stream (direct_bytestream_mgr, priv->contact,
- priv->xmpp_connection);
+ port = salut_direct_new_listening_stream (direct_bytestream_mgr,
+ priv->contact, priv->xmpp_connection, tube);
g_object_unref (direct_bytestream_mgr);
@@ -1829,6 +1835,8 @@ _send_channel_iq_tube (gpointer key,
g_assert_not_reached ();
}
+ port_str = g_strdup_printf ("%d", port);
+
stanza = gibber_xmpp_stanza_build (GIBBER_STANZA_TYPE_IQ,
GIBBER_STANZA_SUB_TYPE_SET,
jid_from, jid_to,
@@ -1837,6 +1845,10 @@ _send_channel_iq_tube (gpointer key,
GIBBER_NODE_ATTRIBUTE, "type", tube_type_str,
GIBBER_NODE_ATTRIBUTE, "service", service,
GIBBER_NODE_ATTRIBUTE, "id", tube_id_str,
+ GIBBER_NODE, "transport",
+ GIBBER_NODE_ATTRIBUTE, "ip", "127.0.0.1", /* FIXME */
+ GIBBER_NODE_ATTRIBUTE, "port", port_str,
+ GIBBER_NODE_END,
GIBBER_NODE_END,
GIBBER_STANZA_END);
@@ -1859,6 +1871,7 @@ _send_channel_iq_tube (gpointer key,
g_object_unref (stanza);
g_free (tube_id_str);
+ g_free (port_str);
}
g_free (service);
@@ -1936,7 +1949,7 @@ salut_tubes_channel_offer_stream_tube (TpSvcChannelTypeTubes *iface,
tube_id = generate_tube_id ();
tube = create_new_tube (self, TP_TUBE_TYPE_STREAM, priv->self_handle,
- service, parameters_copied, tube_id);
+ service, parameters_copied, tube_id, 0);
g_object_set (tube,
"address-type", address_type,
diff --git a/src/salut-tubes-channel.h b/src/salut-tubes-channel.h
index 4941f9f..6d11f13 100644
--- a/src/salut-tubes-channel.h
+++ b/src/salut-tubes-channel.h
@@ -73,7 +73,7 @@ void tubes_muc_message_received (SalutTubesChannel *channel,
void tubes_message_received (SalutTubesChannel *self,
const gchar *service, TpTubeType tube_type, TpHandle initiator_handle,
- GHashTable *parameters, guint tube_id);
+ GHashTable *parameters, guint tube_id, guint portnum);
void tubes_message_close_received (SalutTubesChannel *self,
TpHandle initiator_handle, guint tube_id);
diff --git a/src/salut-tubes-manager.c b/src/salut-tubes-manager.c
index 0905505..d9a6e9c 100644
--- a/src/salut-tubes-manager.c
+++ b/src/salut-tubes-manager.c
@@ -132,7 +132,8 @@ extract_tube_information (TpHandleRepoIface *contact_repo,
TpHandle *initiator_handle,
const gchar **service,
GHashTable **parameters,
- guint *tube_id)
+ guint *tube_id,
+ guint *portnum)
{
GibberXmppNode *iq;
GibberXmppNode *tube_node, *close_node, *node;
@@ -219,6 +220,36 @@ extract_tube_information (TpHandleRepoIface *contact_repo,
"parameter");
}
+ if (!_close && portnum != NULL)
+ {
+ GibberXmppNode *node;
+ const gchar *str;
+ gchar *endptr;
+ long int tmp;
+
+ node = gibber_xmpp_node_get_child (tube_node, "transport");
+ if (node == NULL)
+ {
+ DEBUG ("no transport to connect to in the tube request");
+ return FALSE;
+ }
+
+ str = gibber_xmpp_node_get_attribute (node, "port");
+ if (str == NULL)
+ {
+ DEBUG ("no port to connect to in the tube request");
+ return FALSE;
+ }
+
+ tmp = strtol (str, &endptr, 10);
+ if (!endptr || *endptr)
+ {
+ DEBUG ("port is not numeric: %s", str);
+ return FALSE;
+ }
+ *portnum = (int) tmp;
+ }
+
if (tube_id != NULL)
{
const gchar *str;
@@ -263,6 +294,7 @@ iq_tube_request_cb (SalutXmppConnectionManager *xcm,
TpHandle initiator_handle;
GHashTable *parameters = NULL;
guint tube_id;
+ guint portnum = 0;
gboolean close;
SalutTubesChannel *chan;
@@ -271,7 +303,7 @@ iq_tube_request_cb (SalutXmppConnectionManager *xcm,
* it or send an error reply */
if (!extract_tube_information (contact_repo, stanza, &close, &tube_type,
- &initiator_handle, &service, &parameters, &tube_id))
+ &initiator_handle, &service, &parameters, &tube_id, &portnum))
{
GibberXmppStanza *reply;
@@ -304,7 +336,7 @@ iq_tube_request_cb (SalutXmppConnectionManager *xcm,
}
tubes_message_received (chan, service, tube_type, initiator_handle,
- parameters, tube_id);
+ parameters, tube_id, portnum);
}
reply = gibber_iq_helper_new_result_reply (stanza);
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 7d838c1..891de77 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -36,6 +36,8 @@
#include <gibber/gibber-xmpp-stanza.h>
#include <gibber/gibber-namespaces.h>
#include <gibber/gibber-bytestream-iface.h>
+#include <gibber/gibber-bytestream-oob.h>
+#include <gibber/gibber-bytestream-direct.h>
#include <gibber/gibber-transport.h>
#include <gibber/gibber-fd-transport.h>
#include <gibber/gibber-iq-helper.h>
@@ -106,6 +108,7 @@ enum
PROP_ACCESS_CONTROL,
PROP_ACCESS_CONTROL_PARAM,
PROP_XMPP_CONNECTION_MANAGER,
+ PROP_PORT,
LAST_PROPERTY
};
@@ -117,8 +120,13 @@ struct _SalutTubeStreamPrivate
TpHandleType handle_type;
TpHandle self_handle;
guint id;
+ guint port;
- /* Bytestreams for MUC (using stream initiation) */
+ /* Bytestreams for MUC tubes (using stream initiation) or 1-1 tubes (using
+ * direct TCP connections). One tube can have several bytestreams. The
+ * mapping between the tube bytestream and the transport to the local
+ * application is stored in the transport_to_bytestream and
+ * bytestream_to_transport fields. */
/* (GibberTransport *) -> (GibberBytestreamIface *) */
GHashTable *transport_to_bytestream;
@@ -264,6 +272,8 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
SalutTubeStream *self = SALUT_TUBE_STREAM (user_data);
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+ DEBUG ("Called.");
+
if (state == GIBBER_BYTESTREAM_STATE_OPEN)
{
int fd;
@@ -454,6 +464,7 @@ start_stream_initiation (SalutTubeStream *self,
return result;
}
+/* start a new stream in a tube from the recipient side */
static gboolean
start_stream_direct (SalutTubeStream *self,
gint fd,
@@ -470,6 +481,8 @@ start_stream_direct (SalutTubeStream *self,
g_assert (priv->handle_type == TP_HANDLE_TYPE_CONTACT);
+ DEBUG ("Called.");
+
contact_repo = tp_base_connection_get_handles (
(TpBaseConnection*) priv->conn, TP_HANDLE_TYPE_CONTACT);
@@ -498,23 +511,35 @@ start_stream_direct (SalutTubeStream *self,
GibberBytestreamIface *bytestream;
bytestream = salut_direct_bytestream_manager_new_stream (
direct_bytestream_mgr,
- contact);
+ contact, priv->port);
if (bytestream == NULL)
{
DEBUG ("initiator refused new bytestream");
+ result = FALSE;
close (fd);
}
else
{
DEBUG ("extra bytestream accepted");
+ result = TRUE;
g_hash_table_insert (priv->bytestream_to_fd,
g_object_ref (bytestream), GUINT_TO_POINTER (fd));
g_signal_connect (bytestream, "state-changed",
G_CALLBACK (extra_bytestream_state_changed_cb), self);
+
+ /* Let's start the initiation of the stream */
+ if (!gibber_bytestream_iface_initiate (bytestream))
+ {
+ /* Initiation failed. */
+ gibber_bytestream_iface_close (bytestream, NULL);
+ result = FALSE;
+ close (fd);
+ }
+
}
g_object_unref (contact);
@@ -613,6 +638,8 @@ new_connection_to_socket (SalutTubeStream *self,
SockAddr addr;
socklen_t len;
+ DEBUG ("Called.");
+
g_assert (priv->initiator == priv->self_handle);
memset (&addr, 0, sizeof (addr));
@@ -1111,6 +1138,9 @@ salut_tube_stream_get_property (GObject *object,
case PROP_XMPP_CONNECTION_MANAGER:
g_value_set_object (value, priv->xmpp_connection_manager);
break;
+ case PROP_PORT:
+ g_value_set_uint (value, priv->port);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -1182,6 +1212,9 @@ salut_tube_stream_set_property (GObject *object,
priv->xmpp_connection_manager = g_value_get_object (value);
g_object_ref (priv->xmpp_connection_manager);
break;
+ case PROP_PORT:
+ priv->port = g_value_get_uint (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -1348,6 +1381,20 @@ salut_tube_stream_class_init (SalutTubeStreamClass *salut_tube_stream_class)
g_object_class_install_property (object_class, PROP_XMPP_CONNECTION_MANAGER,
param_spec);
+ param_spec = g_param_spec_uint (
+ "port",
+ "port on the initiator's CM",
+ "New stream in this tube will connect to the initiator's CM on this port"
+ " in case of 1-1 tube",
+ 0,
+ G_MAXUINT32,
+ 0,
+ G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_READWRITE |
+ G_PARAM_STATIC_NICK |
+ G_PARAM_STATIC_BLURB);
+ g_object_class_install_property (object_class, PROP_PORT, param_spec);
+
signals[OPENED] =
g_signal_new ("opened",
G_OBJECT_CLASS_TYPE (salut_tube_stream_class),
@@ -1402,6 +1449,24 @@ data_received_cb (GibberBytestreamIface *bytestream,
DEBUG ("sending failed: %s", error->message);
g_error_free (error);
}
+
+ if (!gibber_transport_buffer_is_empty (transport))
+ {
+ /* We don't want to send more data while the buffer isn't empty */
+ /* FIXME: Should we move this as bytestream-iface method? */
+ if (GIBBER_IS_BYTESTREAM_OOB (bytestream))
+ {
+ DEBUG ("tube buffer isn't empty. Block the bytestream");
+ gibber_bytestream_oob_block_read (
+ GIBBER_BYTESTREAM_OOB (bytestream), TRUE);
+ }
+ else if (GIBBER_IS_BYTESTREAM_DIRECT (bytestream))
+ {
+ DEBUG ("tube buffer isn't empty. Block the bytestream");
+ gibber_bytestream_direct_block_read (
+ GIBBER_BYTESTREAM_DIRECT (bytestream), TRUE);
+ }
+ }
}
SalutTubeStream *
@@ -1413,7 +1478,8 @@ salut_tube_stream_new (SalutConnection *conn,
TpHandle initiator,
const gchar *service,
GHashTable *parameters,
- guint id)
+ guint id,
+ guint portnum)
{
return g_object_new (SALUT_TYPE_TUBE_STREAM,
"connection", conn,
@@ -1425,6 +1491,7 @@ salut_tube_stream_new (SalutConnection *conn,
"service", service,
"parameters", parameters,
"id", id,
+ "port", portnum,
NULL);
}
@@ -1578,6 +1645,8 @@ salut_tube_stream_add_bytestream (SalutTubeIface *tube,
SalutTubeStream *self = SALUT_TUBE_STREAM (tube);
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+ DEBUG ("Called.");
+
if (priv->initiator != priv->self_handle)
{
DEBUG ("I'm not the initiator of this tube, can't accept "
diff --git a/src/tube-stream.h b/src/tube-stream.h
index 3472065..70d5567 100644
--- a/src/tube-stream.h
+++ b/src/tube-stream.h
@@ -63,7 +63,7 @@ GType salut_tube_stream_get_type (void);
SalutTubeStream *salut_tube_stream_new (SalutConnection *conn,
SalutXmppConnectionManager *xmpp_connection_manager, TpHandle handle,
TpHandleType handle_type, TpHandle self_handle, TpHandle initiator,
- const gchar *service, GHashTable *parameters, guint id);
+ const gchar *service, GHashTable *parameters, guint id, guint portnum);
gboolean salut_tube_stream_check_params (TpSocketAddressType address_type,
const GValue *address, TpSocketAccessControl access_control,
--
1.5.6.5
More information about the Telepathy-commits
mailing list