[Telepathy-commits] [telepathy-salut/master] 1-1 tubes: use direct tcp connections, and connect to the right port

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:19 PST 2008


20080731142137-a41c0-efce423d1593431bd156242fa9216afd72c860f2.gz
---
 lib/gibber/gibber-bytestream-direct.c |   23 +++++++-
 src/salut-direct-bytestream-manager.c |  100 +++++++++++++-------------------
 src/salut-direct-bytestream-manager.h |   13 +++-
 src/salut-tubes-channel.c             |   33 ++++++++---
 src/salut-tubes-channel.h             |    2 +-
 src/salut-tubes-manager.c             |   38 +++++++++++-
 src/tube-stream.c                     |   75 +++++++++++++++++++++++-
 src/tube-stream.h                     |    2 +-
 8 files changed, 203 insertions(+), 83 deletions(-)

diff --git a/lib/gibber/gibber-bytestream-direct.c b/lib/gibber/gibber-bytestream-direct.c
index e36bd5d..52b7522 100644
--- a/lib/gibber/gibber-bytestream-direct.c
+++ b/lib/gibber/gibber-bytestream-direct.c
@@ -610,7 +610,8 @@ gibber_bytestream_direct_accept (GibberBytestreamIface *bytestream,
                               GibberBytestreamAugmentSiAcceptReply func,
                               gpointer user_data)
 {
-  DEBUG ("not implemented");
+  /* nothing to do */
+  DEBUG ("Called.");
 }
 
 /*
@@ -622,7 +623,25 @@ static void
 gibber_bytestream_direct_close (GibberBytestreamIface *bytestream,
                              GError *error)
 {
-  DEBUG ("not implemented");
+  GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSED)
+     /* bytestream already closed, do nothing */
+     return;
+
+  g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_CLOSING, NULL);
+  if (priv->transport != NULL &&
+      !gibber_transport_buffer_is_empty (priv->transport))
+    {
+      DEBUG ("Wait transport buffer is empty before close the bytestream");
+    }
+  else
+    {
+      DEBUG ("Transport buffer is empty, we can close the bytestream");
+      bytestream_closed (self);
+    }
 }
 
 /*
diff --git a/src/salut-direct-bytestream-manager.c b/src/salut-direct-bytestream-manager.c
index e12779f..35fde01 100644
--- a/src/salut-direct-bytestream-manager.c
+++ b/src/salut-direct-bytestream-manager.c
@@ -35,6 +35,7 @@
 #include "salut-im-manager.h"
 #include "salut-muc-manager.h"
 #include "salut-tubes-manager.h"
+#include "tube-iface.h"
 
 #define DEBUG_FLAG DEBUG_DIRECT_BYTESTREAM_MGR
 #include "debug.h"
@@ -226,7 +227,7 @@ salut_direct_bytestream_manager_class_init (
   param_spec = g_param_spec_string (
       "host-name-fqdn",
       "host name FQDN",
-      "The FQDN host name that will be used by OOB bytestreams",
+      "The FQDN host name that will be used by direct bytestreams",
       NULL,
       G_PARAM_CONSTRUCT_ONLY |
       G_PARAM_READWRITE |
@@ -250,27 +251,12 @@ salut_direct_bytestream_manager_new (SalutConnection *conn,
       NULL);
 }
 
-/* transport between the 2 CM, called on the initiator's side */
-static void
-set_transport (SalutDirectBytestreamManager *self,
-               GibberTransport *transport)
+struct _listener_io_in_cb_data
 {
-  //SalutDirectBytestreamManagerPrivate *priv =
-  //    SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
-
-  /* the transport_handler is called when something is received in the
-   * transport */
-  //gibber_transport_set_handler (transport, transport_handler, self);
-
-  /*
-  g_signal_connect (transport, "connected",
-      G_CALLBACK (transport_connected_cb), self);
-  g_signal_connect (transport, "disconnected",
-      G_CALLBACK (transport_disconnected_cb), self);
-  g_signal_connect (priv->transport, "buffer-empty",
-      G_CALLBACK (transport_buffer_empty_cb), self);
-      */
-}
+  SalutDirectBytestreamManager *mgr;
+  SalutTubeIface *tube;
+  SalutContact *contact;
+};
 
 /* callback when receiving a connection from the remote CM */
 static gboolean
@@ -278,33 +264,25 @@ listener_io_in_cb (GIOChannel *source,
                    GIOCondition condition,
                    gpointer user_data)
 {
-  SalutDirectBytestreamManager *self = SALUT_DIRECT_BYTESTREAM_MANAGER
-      (user_data);
-  int listen_fd, fd, ret;
-  char host[NI_MAXHOST];
-  char port[NI_MAXSERV];
-  struct sockaddr_storage addr;
-  socklen_t addrlen = sizeof (struct sockaddr_storage);
-  GibberLLTransport *ll_transport;
+  struct _listener_io_in_cb_data *data = user_data;
+  SalutDirectBytestreamManagerPrivate *priv;
+  GibberBytestreamIface *bytestream;
+  int listen_fd;
 
   listen_fd = g_io_channel_unix_get_fd (source);
-  fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
-  gibber_normalize_address (&addr);
 
-  ret = getnameinfo ((struct sockaddr *) &addr, addrlen,
-      host, NI_MAXHOST, port, NI_MAXSERV,
-      NI_NUMERICHOST | NI_NUMERICSERV);
+  DEBUG ("Called. listen_fd=%d", listen_fd);
 
-  /* check_addr_func */
+  priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (data->mgr);
 
-  if (ret == 0)
-    DEBUG("New connection from %s port %s", host, port);
-  else
-    DEBUG("New connection..");
+  bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
+      "state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
+      "self-id", priv->connection->name,
+      "peer-id", data->contact->name,
+      NULL);
 
-  ll_transport = gibber_ll_transport_new ();
-  set_transport (self, GIBBER_TRANSPORT (ll_transport));
-  gibber_ll_transport_open_fd (ll_transport, fd);
+  salut_tube_iface_add_bytestream (data->tube, bytestream);
+  gibber_bytestream_direct_accept_socket (bytestream, listen_fd);
 
   return FALSE;
 }
@@ -317,10 +295,12 @@ listener_io_in_cb (GIOChannel *source,
  */
 static int
 start_listen_for_connection (SalutDirectBytestreamManager *self,
-                             gpointer id)
+                             SalutContact *contact,
+                             SalutTubeIface *tube)
 {
   SalutDirectBytestreamManagerPrivate *priv;
   priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+  struct _listener_io_in_cb_data *data;
   GIOChannel *listener;
   guint *listener_watch;
   int port;
@@ -401,14 +381,19 @@ start_listen_for_connection (SalutDirectBytestreamManager *self,
 
   DEBUG ("listen on %s:%d", priv->host_name_fqdn, port);
 
+  data = g_slice_new (struct _listener_io_in_cb_data);
+  data->mgr = self;
+  data->tube = tube;
+  data->contact = contact,
+
   listener = g_io_channel_unix_new (fd);
   g_io_channel_set_close_on_unref (listener, TRUE);
   listener_watch = g_malloc (sizeof (*listener_watch));
   *listener_watch = g_io_add_watch (listener, G_IO_IN,
-      listener_io_in_cb, self);
+      listener_io_in_cb, data);
 
-  /* add id->listener_watch in priv->listener_watchs */
-  g_hash_table_insert (priv->listener_watchs, id, listener_watch);
+  /* remember the listener watch for this tube in priv->listener_watchs */
+  g_hash_table_insert (priv->listener_watchs, tube, listener_watch);
 
   freeaddrinfo (ans);
   return port;
@@ -422,23 +407,24 @@ error:
   return -1;
 }
 
-void
+int
 salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
                                    SalutContact *contact,
-                                   GibberXmppConnection *connection)
+                                   GibberXmppConnection *connection,
+                                   SalutTubeIface *tube)
 {
   SalutDirectBytestreamManagerPrivate *priv;
   priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
 
   DEBUG ("salut_direct_new_listening_stream: Called.");
 
-  start_listen_for_connection (self, NULL);
-
+  return start_listen_for_connection (self, contact, tube);
 }
 
 GibberBytestreamIface *
 salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
-                                            SalutContact *contact)
+                                            SalutContact *contact,
+                                            int portnum)
 {
   GibberBytestreamIface *bytestream;
   SalutDirectBytestreamManagerPrivate *priv;
@@ -447,18 +433,14 @@ salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
 
   bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
       "state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
+      "self-id", priv->connection->name,
+      "peer-id", contact->name,
+      "host", "127.0.0.1", /* FIXME! */
+      "port", portnum,
       NULL);
 
   g_assert (bytestream != NULL);
 
-  /* Let's start the initiation of the stream */
-  if (!gibber_bytestream_iface_initiate (bytestream))
-    {
-      /* Initiation failed. */
-      gibber_bytestream_iface_close (bytestream, NULL);
-      bytestream = NULL;
-    }
-
   return bytestream;
 }
 
diff --git a/src/salut-direct-bytestream-manager.h b/src/salut-direct-bytestream-manager.h
index ccce17e..f406101 100644
--- a/src/salut-direct-bytestream-manager.h
+++ b/src/salut-direct-bytestream-manager.h
@@ -23,6 +23,7 @@
 #include <glib-object.h>
 #include "salut-xmpp-connection-manager.h"
 #include "salut-contact.h"
+#include "tube-iface.h"
 
 #include <gibber/gibber-linklocal-transport.h>
 #include <gibber/gibber-bytestream-iface.h>
@@ -67,14 +68,18 @@ salut_direct_bytestream_manager_new (SalutConnection *connection,
     const gchar *host_name_fqdn);
 
 /* To be used on the CM-initiator side, to receive connections from the remote
- * CM */
-void
+ * CM
+ *
+ * return: the port being listened on, or -1 on failure
+ * */
+int
 salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
-    SalutContact *contact, GibberXmppConnection *connection);
+    SalutContact *contact, GibberXmppConnection *connection,
+    SalutTubeIface *tube);
 
 /* To be used on the CM-receptor side, to make a new connection */
 GibberBytestreamIface *
 salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
-    SalutContact *contact);
+    SalutContact *contact, int portnum);
 
 #endif /* #ifndef __SALUT_DIRECT_BYTESTREAM_MANAGER_H__*/
diff --git a/src/salut-tubes-channel.c b/src/salut-tubes-channel.c
index e5645b7..36f9e40 100644
--- a/src/salut-tubes-channel.c
+++ b/src/salut-tubes-channel.c
@@ -164,7 +164,7 @@ static gboolean extract_tube_information (SalutTubesChannel *self,
     const gchar **service, GHashTable **parameters, guint *tube_id);
 static SalutTubeIface * create_new_tube (SalutTubesChannel *self,
     TpTubeType type, TpHandle initiator, const gchar *service,
-    GHashTable *parameters, guint tube_id);
+    GHashTable *parameters, guint tube_id, guint portnum);
 
 static void
 salut_tubes_channel_init (SalutTubesChannel *self)
@@ -791,7 +791,7 @@ tubes_muc_message_received (SalutTubesChannel *self,
                 }
 
               tube = create_new_tube (self, type, initiator_handle,
-                  service, parameters, tube_id);
+                  service, parameters, tube_id, 0);
 
               /* the tube has reffed its initiator, no need to keep a ref */
               tp_handle_unref (contact_repo, initiator_handle);
@@ -868,7 +868,8 @@ tubes_message_received (SalutTubesChannel *self,
                         TpTubeType tube_type,
                         TpHandle initiator_handle,
                         GHashTable *parameters,
-                        guint tube_id)
+                        guint tube_id,
+                        guint portnum)
 {
   SalutTubesChannelPrivate *priv = SALUT_TUBES_CHANNEL_GET_PRIVATE (self);
 
@@ -879,7 +880,7 @@ tubes_message_received (SalutTubesChannel *self,
   if (tube == NULL)
     {
       tube = create_new_tube (self, tube_type, initiator_handle, service,
-        parameters, tube_id);
+        parameters, tube_id, portnum);
     }
 }
 
@@ -1046,6 +1047,7 @@ tube_closed_cb (SalutTubeIface *tube,
   SalutTubesChannelPrivate *priv = SALUT_TUBES_CHANNEL_GET_PRIVATE (self);
   guint tube_id;
 
+  DEBUG ("Called. closed=%d", priv->closed);
   if (priv->closed)
     return;
 
@@ -1083,7 +1085,8 @@ create_new_tube (SalutTubesChannel *self,
                  TpHandle initiator,
                  const gchar *service,
                  GHashTable *parameters,
-                 guint tube_id)
+                 guint tube_id,
+                 guint portnum)
 {
   SalutTubesChannelPrivate *priv = SALUT_TUBES_CHANNEL_GET_PRIVATE (self);
   SalutTubeIface *tube;
@@ -1103,7 +1106,8 @@ create_new_tube (SalutTubesChannel *self,
     case TP_TUBE_TYPE_STREAM:
       tube = SALUT_TUBE_IFACE (salut_tube_stream_new (priv->conn,
           priv->xmpp_connection_manager, priv->handle, priv->handle_type,
-          priv->self_handle, initiator, service, parameters, tube_id));
+          priv->self_handle, initiator, service, parameters, tube_id,
+          portnum));
       break;
     default:
       g_assert_not_reached ();
@@ -1459,7 +1463,7 @@ salut_tubes_channel_offer_d_bus_tube (TpSvcChannelTypeTubes *iface,
   tube_id = generate_tube_id ();
 
   tube = create_new_tube (self, TP_TUBE_TYPE_DBUS, priv->self_handle,
-      service, parameters_copied, tube_id);
+      service, parameters_copied, tube_id, 0);
 
   tp_svc_channel_type_tubes_return_from_offer_d_bus_tube (context, tube_id);
 }
@@ -1795,6 +1799,8 @@ _send_channel_iq_tube (gpointer key,
       TpHandleRepoIface *contact_repo;
 
       gchar *tube_id_str = g_strdup_printf ("%d", tube_id);
+      int port;
+      gchar *port_str;
 
       /* listen for future connections from the remote CM before sending the
        * iq */
@@ -1805,8 +1811,8 @@ _send_channel_iq_tube (gpointer key,
           NULL);
       g_assert (direct_bytestream_mgr != NULL);
 
-      salut_direct_new_listening_stream (direct_bytestream_mgr, priv->contact,
-          priv->xmpp_connection);
+      port = salut_direct_new_listening_stream (direct_bytestream_mgr,
+          priv->contact, priv->xmpp_connection, tube);
       g_object_unref (direct_bytestream_mgr);
 
 
@@ -1829,6 +1835,8 @@ _send_channel_iq_tube (gpointer key,
             g_assert_not_reached ();
         }
 
+      port_str = g_strdup_printf ("%d", port);
+
       stanza = gibber_xmpp_stanza_build (GIBBER_STANZA_TYPE_IQ,
           GIBBER_STANZA_SUB_TYPE_SET,
           jid_from, jid_to,
@@ -1837,6 +1845,10 @@ _send_channel_iq_tube (gpointer key,
             GIBBER_NODE_ATTRIBUTE, "type", tube_type_str,
             GIBBER_NODE_ATTRIBUTE, "service", service,
             GIBBER_NODE_ATTRIBUTE, "id", tube_id_str,
+            GIBBER_NODE, "transport",
+              GIBBER_NODE_ATTRIBUTE, "ip", "127.0.0.1", /* FIXME */
+              GIBBER_NODE_ATTRIBUTE, "port", port_str,
+            GIBBER_NODE_END,
           GIBBER_NODE_END,
           GIBBER_STANZA_END);
 
@@ -1859,6 +1871,7 @@ _send_channel_iq_tube (gpointer key,
 
       g_object_unref (stanza);
       g_free (tube_id_str);
+      g_free (port_str);
     }
 
   g_free (service);
@@ -1936,7 +1949,7 @@ salut_tubes_channel_offer_stream_tube (TpSvcChannelTypeTubes *iface,
   tube_id = generate_tube_id ();
 
   tube = create_new_tube (self, TP_TUBE_TYPE_STREAM, priv->self_handle,
-      service, parameters_copied, tube_id);
+      service, parameters_copied, tube_id, 0);
 
   g_object_set (tube,
       "address-type", address_type,
diff --git a/src/salut-tubes-channel.h b/src/salut-tubes-channel.h
index 4941f9f..6d11f13 100644
--- a/src/salut-tubes-channel.h
+++ b/src/salut-tubes-channel.h
@@ -73,7 +73,7 @@ void tubes_muc_message_received (SalutTubesChannel *channel,
 
 void tubes_message_received (SalutTubesChannel *self,
     const gchar *service, TpTubeType tube_type, TpHandle initiator_handle,
-    GHashTable *parameters, guint tube_id);
+    GHashTable *parameters, guint tube_id, guint portnum);
 
 void tubes_message_close_received (SalutTubesChannel *self,
     TpHandle initiator_handle, guint tube_id);
diff --git a/src/salut-tubes-manager.c b/src/salut-tubes-manager.c
index 0905505..d9a6e9c 100644
--- a/src/salut-tubes-manager.c
+++ b/src/salut-tubes-manager.c
@@ -132,7 +132,8 @@ extract_tube_information (TpHandleRepoIface *contact_repo,
                           TpHandle *initiator_handle,
                           const gchar **service,
                           GHashTable **parameters,
-                          guint *tube_id)
+                          guint *tube_id,
+                          guint *portnum)
 {
   GibberXmppNode *iq;
   GibberXmppNode *tube_node, *close_node, *node;
@@ -219,6 +220,36 @@ extract_tube_information (TpHandleRepoIface *contact_repo,
           "parameter");
     }
 
+  if (!_close && portnum != NULL)
+    {
+      GibberXmppNode *node;
+      const gchar *str;
+      gchar *endptr;
+      long int tmp;
+
+      node = gibber_xmpp_node_get_child (tube_node, "transport");
+      if (node == NULL)
+        {
+          DEBUG ("no transport to connect to in the tube request");
+          return FALSE;
+        }
+
+      str = gibber_xmpp_node_get_attribute (node, "port");
+      if (str == NULL)
+        {
+          DEBUG ("no port to connect to in the tube request");
+          return FALSE;
+        }
+
+      tmp = strtol (str, &endptr, 10);
+      if (!endptr || *endptr)
+        {
+          DEBUG ("port is not numeric: %s", str);
+          return FALSE;
+        }
+      *portnum = (int) tmp;
+    }
+
   if (tube_id != NULL)
     {
       const gchar *str;
@@ -263,6 +294,7 @@ iq_tube_request_cb (SalutXmppConnectionManager *xcm,
   TpHandle initiator_handle;
   GHashTable *parameters = NULL;
   guint tube_id;
+  guint portnum = 0;
   gboolean close;
 
   SalutTubesChannel *chan;
@@ -271,7 +303,7 @@ iq_tube_request_cb (SalutXmppConnectionManager *xcm,
    * it or send an error reply */
 
   if (!extract_tube_information (contact_repo, stanza, &close, &tube_type,
-          &initiator_handle, &service, &parameters, &tube_id))
+          &initiator_handle, &service, &parameters, &tube_id, &portnum))
     {
       GibberXmppStanza *reply;
 
@@ -304,7 +336,7 @@ iq_tube_request_cb (SalutXmppConnectionManager *xcm,
       }
 
     tubes_message_received (chan, service, tube_type, initiator_handle,
-        parameters, tube_id);
+        parameters, tube_id, portnum);
   }
 
   reply = gibber_iq_helper_new_result_reply (stanza);
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 7d838c1..891de77 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -36,6 +36,8 @@
 #include <gibber/gibber-xmpp-stanza.h>
 #include <gibber/gibber-namespaces.h>
 #include <gibber/gibber-bytestream-iface.h>
+#include <gibber/gibber-bytestream-oob.h>
+#include <gibber/gibber-bytestream-direct.h>
 #include <gibber/gibber-transport.h>
 #include <gibber/gibber-fd-transport.h>
 #include <gibber/gibber-iq-helper.h>
@@ -106,6 +108,7 @@ enum
   PROP_ACCESS_CONTROL,
   PROP_ACCESS_CONTROL_PARAM,
   PROP_XMPP_CONNECTION_MANAGER,
+  PROP_PORT,
   LAST_PROPERTY
 };
 
@@ -117,8 +120,13 @@ struct _SalutTubeStreamPrivate
   TpHandleType handle_type;
   TpHandle self_handle;
   guint id;
+  guint port;
 
-  /* Bytestreams for MUC (using stream initiation) */
+  /* Bytestreams for MUC tubes (using stream initiation) or 1-1 tubes (using
+   * direct TCP connections). One tube can have several bytestreams. The
+   * mapping between the tube bytestream and the transport to the local
+   * application is stored in the transport_to_bytestream and
+   * bytestream_to_transport fields. */
 
   /* (GibberTransport *) -> (GibberBytestreamIface *) */
   GHashTable *transport_to_bytestream;
@@ -264,6 +272,8 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
   SalutTubeStream *self = SALUT_TUBE_STREAM (user_data);
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
 
+  DEBUG ("Called.");
+
   if (state == GIBBER_BYTESTREAM_STATE_OPEN)
     {
       int fd;
@@ -454,6 +464,7 @@ start_stream_initiation (SalutTubeStream *self,
   return result;
 }
 
+/* start a new stream in a tube from the recipient side */
 static gboolean
 start_stream_direct (SalutTubeStream *self,
                      gint fd,
@@ -470,6 +481,8 @@ start_stream_direct (SalutTubeStream *self,
 
   g_assert (priv->handle_type == TP_HANDLE_TYPE_CONTACT);
 
+  DEBUG ("Called.");
+
   contact_repo = tp_base_connection_get_handles (
      (TpBaseConnection*) priv->conn, TP_HANDLE_TYPE_CONTACT);
 
@@ -498,23 +511,35 @@ start_stream_direct (SalutTubeStream *self,
       GibberBytestreamIface *bytestream;
       bytestream = salut_direct_bytestream_manager_new_stream (
           direct_bytestream_mgr,
-          contact);
+          contact, priv->port);
 
       if (bytestream == NULL)
         {
           DEBUG ("initiator refused new bytestream");
+          result = FALSE;
 
           close (fd);
         }
       else
         {
           DEBUG ("extra bytestream accepted");
+          result = TRUE;
 
           g_hash_table_insert (priv->bytestream_to_fd,
               g_object_ref (bytestream), GUINT_TO_POINTER (fd));
 
           g_signal_connect (bytestream, "state-changed",
               G_CALLBACK (extra_bytestream_state_changed_cb), self);
+
+          /* Let's start the initiation of the stream */
+          if (!gibber_bytestream_iface_initiate (bytestream))
+            {
+              /* Initiation failed. */
+              gibber_bytestream_iface_close (bytestream, NULL);
+              result = FALSE;
+              close (fd);
+            }
+
         }
 
       g_object_unref (contact);
@@ -613,6 +638,8 @@ new_connection_to_socket (SalutTubeStream *self,
   SockAddr addr;
   socklen_t len;
 
+  DEBUG ("Called.");
+
   g_assert (priv->initiator == priv->self_handle);
 
   memset (&addr, 0, sizeof (addr));
@@ -1111,6 +1138,9 @@ salut_tube_stream_get_property (GObject *object,
       case PROP_XMPP_CONNECTION_MANAGER:
         g_value_set_object (value, priv->xmpp_connection_manager);
         break;
+      case PROP_PORT:
+        g_value_set_uint (value, priv->port);
+        break;
       default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
         break;
@@ -1182,6 +1212,9 @@ salut_tube_stream_set_property (GObject *object,
         priv->xmpp_connection_manager = g_value_get_object (value);
         g_object_ref (priv->xmpp_connection_manager);
         break;
+      case PROP_PORT:
+        priv->port = g_value_get_uint (value);
+        break;
       default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
         break;
@@ -1348,6 +1381,20 @@ salut_tube_stream_class_init (SalutTubeStreamClass *salut_tube_stream_class)
   g_object_class_install_property (object_class, PROP_XMPP_CONNECTION_MANAGER,
       param_spec);
 
+  param_spec = g_param_spec_uint (
+      "port",
+      "port on the initiator's CM",
+      "New stream in this tube will connect to the initiator's CM on this port"
+      " in case of 1-1 tube",
+      0,
+      G_MAXUINT32,
+      0,
+      G_PARAM_CONSTRUCT_ONLY |
+      G_PARAM_READWRITE |
+      G_PARAM_STATIC_NICK |
+      G_PARAM_STATIC_BLURB);
+  g_object_class_install_property (object_class, PROP_PORT, param_spec);
+
   signals[OPENED] =
     g_signal_new ("opened",
                   G_OBJECT_CLASS_TYPE (salut_tube_stream_class),
@@ -1402,6 +1449,24 @@ data_received_cb (GibberBytestreamIface *bytestream,
     DEBUG ("sending failed: %s", error->message);
     g_error_free (error);
   }
+
+  if (!gibber_transport_buffer_is_empty (transport))
+    {
+      /* We don't want to send more data while the buffer isn't empty */
+      /* FIXME: Should we move this into a bytestream-iface method? */
+      if (GIBBER_IS_BYTESTREAM_OOB (bytestream))
+          {
+            DEBUG ("tube buffer isn't empty. Block the bytestream");
+            gibber_bytestream_oob_block_read (
+              GIBBER_BYTESTREAM_OOB (bytestream), TRUE);
+          }
+      else if (GIBBER_IS_BYTESTREAM_DIRECT (bytestream))
+          {
+            DEBUG ("tube buffer isn't empty. Block the bytestream");
+            gibber_bytestream_direct_block_read (
+              GIBBER_BYTESTREAM_DIRECT (bytestream), TRUE);
+          }
+    }
 }
 
 SalutTubeStream *
@@ -1413,7 +1478,8 @@ salut_tube_stream_new (SalutConnection *conn,
                         TpHandle initiator,
                         const gchar *service,
                         GHashTable *parameters,
-                        guint id)
+                        guint id,
+                        guint portnum)
 {
   return g_object_new (SALUT_TYPE_TUBE_STREAM,
       "connection", conn,
@@ -1425,6 +1491,7 @@ salut_tube_stream_new (SalutConnection *conn,
       "service", service,
       "parameters", parameters,
       "id", id,
+      "port", portnum,
       NULL);
 }
 
@@ -1578,6 +1645,8 @@ salut_tube_stream_add_bytestream (SalutTubeIface *tube,
   SalutTubeStream *self = SALUT_TUBE_STREAM (tube);
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
 
+  DEBUG ("Called.");
+
   if (priv->initiator != priv->self_handle)
     {
       DEBUG ("I'm not the initiator of this tube, can't accept "
diff --git a/src/tube-stream.h b/src/tube-stream.h
index 3472065..70d5567 100644
--- a/src/tube-stream.h
+++ b/src/tube-stream.h
@@ -63,7 +63,7 @@ GType salut_tube_stream_get_type (void);
 SalutTubeStream *salut_tube_stream_new (SalutConnection *conn,
     SalutXmppConnectionManager *xmpp_connection_manager, TpHandle handle,
     TpHandleType handle_type, TpHandle self_handle, TpHandle initiator,
-    const gchar *service, GHashTable *parameters, guint id);
+    const gchar *service, GHashTable *parameters, guint id, guint portnum);
 
 gboolean salut_tube_stream_check_params (TpSocketAddressType address_type,
     const GValue *address, TpSocketAccessControl access_control,
-- 
1.5.6.5




More information about the Telepathy-commits mailing list