[Telepathy-commits] [telepathy-salut/master] DirectBytestreamManager listens for connections (initiator side). Prototype to make new streams (recipient side)

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:20 PST 2008


20080729104508-a41c0-b69e618eb934e92af6a03c988d9f8f589e5f0842.gz
---
 src/salut-direct-bytestream-manager.c |  220 ++++++++++++++++++++++++++++++++-
 src/salut-direct-bytestream-manager.h |    6 +
 src/salut-tubes-channel.c             |   16 +++-
 src/tube-stream.c                     |   92 +++++++++++++-
 4 files changed, 325 insertions(+), 9 deletions(-)

diff --git a/src/salut-direct-bytestream-manager.c b/src/salut-direct-bytestream-manager.c
index 0ecf2fa..e8551a0 100644
--- a/src/salut-direct-bytestream-manager.c
+++ b/src/salut-direct-bytestream-manager.c
@@ -22,12 +22,15 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <unistd.h>
+#include <errno.h>
 
 #include <gibber/gibber-xmpp-stanza.h>
 #include <gibber/gibber-namespaces.h>
 #include <gibber/gibber-xmpp-error.h>
 #include <gibber/gibber-iq-helper.h>
 #include <gibber/gibber-bytestream-direct.h>
+#include <gibber/gibber-util.h>
 
 #include "salut-im-manager.h"
 #include "salut-muc-manager.h"
@@ -57,6 +60,17 @@ struct _SalutDirectBytestreamManagerPrivate
   SalutXmppConnectionManager *xmpp_connection_manager;
   gchar *host_name_fqdn;
 
+  /* (GibberTransport *) -> (GibberBytestreamIface *) */
+  GHashTable *transport_to_bytestream;
+  /* (GibberBytestreamIface *) -> (GibberTransport *) */
+  GHashTable *bytestream_to_transport;
+  /* (GibberBytestreamIface *) -> int */
+  GHashTable *bytestream_to_fd;
+
+  /* guint id -> guint listener_watch
+   * When used by stream tubes, the id is the tube_id */
+  GHashTable *listener_watchs;
+
   gboolean dispose_has_run;
 };
 
@@ -181,6 +195,8 @@ salut_direct_bytestream_manager_constructor (GType type,
   g_assert (priv->xmpp_connection_manager != NULL);
   g_assert (priv->host_name_fqdn != NULL);
 
+  priv->listener_watchs = g_hash_table_new (NULL, NULL);
+
   return obj;
 }
 
@@ -241,6 +257,178 @@ salut_direct_bytestream_manager_new (SalutConnection *conn,
       NULL);
 }
 
+/* Hook for wiring up the transport between the two CMs; per the original
+ * note it is called on the initiator's side.  Currently a no-op: every
+ * statement below is still commented out, so neither argument is used. */
+static void
+set_transport (SalutDirectBytestreamManager *self,
+               GibberTransport *transport)
+{
+  /* TODO: install the handler for data received on the transport:
+   *   gibber_transport_set_handler (transport, transport_handler, self);
+   *
+   * TODO: follow the state of the transport:
+   *   g_signal_connect (transport, "connected",
+   *       G_CALLBACK (transport_connected_cb), self);
+   *   g_signal_connect (transport, "disconnected",
+   *       G_CALLBACK (transport_disconnected_cb), self);
+   *   g_signal_connect (priv->transport, "buffer-empty",
+   *       G_CALLBACK (transport_buffer_empty_cb), self);
+   *
+   * NOTE(review): the handler and callbacks named above are not defined in
+   * the code shown here, and priv is not declared in this function. */
+}
+
+/* GIOFunc for the listening socket: accepts the connection coming from the
+ * remote CM and wraps the new fd in a GibberLLTransport.  Always returns
+ * FALSE, so GLib removes the watch after the first call. */
+static gboolean
+listener_io_in_cb (GIOChannel *source,
+                   GIOCondition condition,
+                   gpointer user_data)
+{
+  SalutDirectBytestreamManager *self = SALUT_DIRECT_BYTESTREAM_MANAGER
+      (user_data);
+  int fd;
+  char host[NI_MAXHOST], port[NI_MAXSERV];
+  struct sockaddr_storage addr;
+  socklen_t addrlen = sizeof (addr);
+  GibberLLTransport *ll_transport;
+
+  fd = accept (g_io_channel_unix_get_fd (source),
+      (struct sockaddr *) &addr, &addrlen);
+  if (fd == -1)
+    {
+      DEBUG ("accept failed: %s", g_strerror (errno));
+      return FALSE;
+    }
+  gibber_normalize_address (&addr);
+  if (getnameinfo ((struct sockaddr *) &addr, addrlen, host, NI_MAXHOST,
+        port, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV) == 0)
+    DEBUG("New connection from %s port %s", host, port);
+  else
+    DEBUG("New connection..");
+
+  /* NOTE(review): nothing visible here keeps a reference to ll_transport */
+  ll_transport = gibber_ll_transport_new ();
+  set_transport (self, GIBBER_TRANSPORT (ll_transport));
+  gibber_ll_transport_open_fd (ll_transport, fd);
+  return FALSE;
+}
+
+
+/* Opens a TCP socket on an ephemeral port, listens on it and watches it with
+ * listener_io_in_cb.  Only the first getaddrinfo () result is tried.
+ *
+ * id is an opaque key chosen by the user of the SalutDirectBytestreamManager
+ * (per the original note, the tube id for stream tubes); the GLib watch id is
+ * stored under it in priv->listener_watchs.
+ *
+ * return: the port being listened on, or -1 on failure
+ */
+static int
+start_listen_for_connection (SalutDirectBytestreamManager *self,
+                             gpointer id)
+{
+  SalutDirectBytestreamManagerPrivate *priv =
+      SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+  GIOChannel *listener;
+  guint listener_watch;
+  int port;
+  int fd = -1, ret, yes = 1;
+  struct addrinfo req, *ans = NULL;
+  struct sockaddr_storage addr;
+  socklen_t len = sizeof (addr);
+  #define BACKLOG 1
+
+  memset (&req, 0, sizeof (req));
+  req.ai_flags = AI_PASSIVE;
+  req.ai_family = AF_UNSPEC;
+  req.ai_socktype = SOCK_STREAM;
+  req.ai_protocol = IPPROTO_TCP;
+
+  /* service "0" lets the kernel pick a free port at bind () time */
+  ret = getaddrinfo (NULL, "0", &req, &ans);
+  if (ret != 0)
+    {
+      DEBUG ("getaddrinfo failed: %s", gai_strerror (ret));
+      goto error;
+    }
+
+  fd = socket (ans->ai_family, ans->ai_socktype, ans->ai_protocol);
+  if (fd == -1)
+    {
+      DEBUG ("socket failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  ret = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof (yes));
+  if (ret == -1)
+    {
+      DEBUG ("setsockopt failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  ret = bind (fd, ans->ai_addr, ans->ai_addrlen);
+  if (ret < 0)
+    {
+      DEBUG ("bind failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
+    {
+      DEBUG ("getsockname failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  switch (addr.ss_family)
+    {
+      case AF_INET:
+        port = ntohs (((struct sockaddr_in *) &addr)->sin_port);
+        break;
+      case AF_INET6:
+        port = ntohs (((struct sockaddr_in6 *) &addr)->sin6_port);
+        break;
+      default:
+        DEBUG ("unexpected address family %d", (int) addr.ss_family);
+        goto error;
+    }
+
+  ret = listen (fd, BACKLOG);
+  if (ret == -1)
+    {
+      DEBUG ("listen failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  DEBUG ("listen on %s:%d", priv->host_name_fqdn, port);
+
+  listener = g_io_channel_unix_new (fd);
+  g_io_channel_set_close_on_unref (listener, TRUE);
+  listener_watch = g_io_add_watch (listener, G_IO_IN,
+      listener_io_in_cb, self);
+  /* the watch holds its own reference on the channel; dropping ours lets
+   * the socket be closed once the watch is removed */
+  g_io_channel_unref (listener);
+
+  /* id -> watch id, stored by value so that replacing an entry leaks
+   * nothing (the table has no value destroy function) */
+  g_hash_table_insert (priv->listener_watchs, id,
+      GUINT_TO_POINTER (listener_watch));
+
+  freeaddrinfo (ans);
+  return port;
+
+error:
+  /* 0 is a valid file descriptor, only -1 means "no socket" */
+  if (fd != -1)
+    close (fd);
+  if (ans != NULL)
+    freeaddrinfo (ans);
+  return -1;
+}
+
 void
 salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
                                    SalutContact *contact,
@@ -249,11 +437,35 @@ salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
   SalutDirectBytestreamManagerPrivate *priv;
   priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
 
-  g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
-      "xmpp-connection", connection,
+  DEBUG ("salut_direct_new_listening_stream: Called.");
+
+  start_listen_for_connection (self, NULL);
+
+}
+
+GibberBytestreamIface *
+salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
+                                            SalutContact *contact)
+{
+  GibberBytestreamIface *bytestream;
+  SalutDirectBytestreamManagerPrivate *priv;
+
+  priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+
+  bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
       "state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
-      "self-id", priv->connection->name,
-      "peer-id", contact->name,
       NULL);
+
+  g_assert (bytestream != NULL);
+
+  /* Let's start the initiation of the stream */
+  if (!gibber_bytestream_iface_initiate (bytestream))
+    {
+      /* Initiation failed. */
+      gibber_bytestream_iface_close (bytestream, NULL);
+      bytestream = NULL;
+    }
+
+  return bytestream;
 }
 
diff --git a/src/salut-direct-bytestream-manager.h b/src/salut-direct-bytestream-manager.h
index 69ad1d5..ccce17e 100644
--- a/src/salut-direct-bytestream-manager.h
+++ b/src/salut-direct-bytestream-manager.h
@@ -66,9 +66,15 @@ SalutDirectBytestreamManager *
 salut_direct_bytestream_manager_new (SalutConnection *connection,
     const gchar *host_name_fqdn);
 
+/* To be used on the CM-initiator side, to receive connections from the remote
+ * CM */
 void
 salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
     SalutContact *contact, GibberXmppConnection *connection);
 
+/* To be used on the CM-recipient side, to make a new connection */
+GibberBytestreamIface *
+salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
+    SalutContact *contact);
 
 #endif /* #ifndef __SALUT_DIRECT_BYTESTREAM_MANAGER_H__*/
diff --git a/src/salut-tubes-channel.c b/src/salut-tubes-channel.c
index afcece8..e5645b7 100644
--- a/src/salut-tubes-channel.c
+++ b/src/salut-tubes-channel.c
@@ -46,6 +46,7 @@
 #include "salut-contact.h"
 #include "salut-muc-channel.h"
 #include "salut-xmpp-connection-manager.h"
+#include "salut-direct-bytestream-manager.h"
 #include "tube-iface.h"
 #include "tube-dbus.h"
 #include "tube-stream.h"
@@ -1795,6 +1796,20 @@ _send_channel_iq_tube (gpointer key,
 
       gchar *tube_id_str = g_strdup_printf ("%d", tube_id);
 
+      /* listen for future connections from the remote CM before sending the
+       * iq */
+      SalutDirectBytestreamManager *direct_bytestream_mgr;
+      g_assert (priv->conn != NULL);
+      g_object_get (priv->conn,
+          "direct-bytestream-manager", &direct_bytestream_mgr,
+          NULL);
+      g_assert (direct_bytestream_mgr != NULL);
+
+      salut_direct_new_listening_stream (direct_bytestream_mgr, priv->contact,
+          priv->xmpp_connection);
+      g_object_unref (direct_bytestream_mgr);
+
+
       contact_repo = tp_base_connection_get_handles (
          (TpBaseConnection*) priv->conn, TP_HANDLE_TYPE_CONTACT);
 
@@ -1865,7 +1880,6 @@ _send_channel_iq_tubes (SalutTubesChannel *self)
     }
 
   g_hash_table_foreach (priv->tubes, _send_channel_iq_tube, self);
-
 }
 
 
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 171dde8..7d838c1 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -48,6 +48,7 @@
 #include "salut-connection.h"
 #include "tube-iface.h"
 #include "salut-si-bytestream-manager.h"
+#include "salut-direct-bytestream-manager.h"
 #include "salut-contact-manager.h"
 #include "salut-xmpp-connection-manager.h"
 
@@ -90,7 +91,7 @@ static guint signals[LAST_SIGNAL] = {0};
 enum
 {
   PROP_CONNECTION = 1,
-  PROP_CHANNEL,
+  //PROP_CHANNEL,
   PROP_HANDLE,
   PROP_HANDLE_TYPE,
   PROP_SELF_HANDLE,
@@ -116,12 +117,17 @@ struct _SalutTubeStreamPrivate
   TpHandleType handle_type;
   TpHandle self_handle;
   guint id;
+
+  /* Bytestreams for MUC (using stream initiation) */
+
   /* (GibberTransport *) -> (GibberBytestreamIface *) */
   GHashTable *transport_to_bytestream;
   /* (GibberBytestreamIface *) -> (GibberTransport *) */
   GHashTable *bytestream_to_transport;
   /* (GibberBytestreamIface *) -> int */
   GHashTable *bytestream_to_fd;
+
+
   TpHandle initiator;
   gchar *service;
   GHashTable *parameters;
@@ -303,6 +309,7 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
 struct _extra_bytestream_negotiate_cb_data
 {
   SalutTubeStream *self;
+  /* file descriptor of the connection from the local application */
   gint fd;
 };
 
@@ -447,6 +454,79 @@ start_stream_initiation (SalutTubeStream *self,
   return result;
 }
 
+/* Asks the SalutDirectBytestreamManager for a new bytestream to the tube
+ * initiator, on behalf of a connection made by the local application.
+ * fd: socket of that local connection.  It is recorded in
+ *     priv->bytestream_to_fd on success and left open on failure, because
+ *     the caller closes it when FALSE is returned.
+ * error: optional return location for a TP_ERRORS error (may be NULL)
+ *
+ * return: TRUE if a bytestream was obtained, FALSE (error set) otherwise
+ */
+static gboolean
+start_stream_direct (SalutTubeStream *self,
+                     gint fd,
+                     GError **error)
+{
+  SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+  gboolean result = FALSE;
+  SalutContact *contact;
+  SalutContactManager *contact_mgr;
+  SalutDirectBytestreamManager *direct_bytestream_mgr;
+
+  g_assert (priv->handle_type == TP_HANDLE_TYPE_CONTACT);
+
+  g_object_get (priv->conn,
+      "direct-bytestream-manager", &direct_bytestream_mgr,
+      "contact-manager", &contact_mgr,
+      NULL);
+  g_assert (direct_bytestream_mgr != NULL);
+  g_assert (contact_mgr != NULL);
+
+  contact = salut_contact_manager_get_contact (contact_mgr, priv->initiator);
+  if (contact == NULL)
+    {
+      g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
+          "can't find contact with handle %u", priv->initiator);
+    }
+  else
+    {
+      GibberBytestreamIface *bytestream;
+
+      bytestream = salut_direct_bytestream_manager_new_stream (
+          direct_bytestream_mgr,
+          contact);
+
+      if (bytestream == NULL)
+        {
+          DEBUG ("initiator refused new bytestream");
+          /* fd is not closed here: the caller closes it when we return
+           * FALSE, closing it twice could hit an unrelated descriptor */
+          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
+              "can't create a direct bytestream to handle %u",
+              priv->initiator);
+        }
+      else
+        {
+          DEBUG ("extra bytestream accepted");
+
+          g_hash_table_insert (priv->bytestream_to_fd,
+              g_object_ref (bytestream), GUINT_TO_POINTER (fd));
+
+          g_signal_connect (bytestream, "state-changed",
+              G_CALLBACK (extra_bytestream_state_changed_cb), self);
+          result = TRUE;
+        }
+
+      g_object_unref (contact);
+    }
+
+  g_object_unref (direct_bytestream_mgr);
+  g_object_unref (contact_mgr);
+  return result;
+}
+
+/* callback for listening connections from the local application */
 gboolean
 listen_cb (GIOChannel *source,
            GIOCondition condition,
@@ -506,9 +586,11 @@ listen_cb (GIOChannel *source,
    */
   if (priv->handle_type == TP_HANDLE_TYPE_CONTACT)
     {
-      /* TODO: To be implemented */
-      DEBUG ("SalutDirectBytestreamManager to be implemented");
-      close (fd);
+      if (!start_stream_direct (self, fd, NULL))
+        {
+          DEBUG ("closing new client connection");
+          close (fd);
+        }
     }
   else
     {
@@ -1239,6 +1321,7 @@ salut_tube_stream_class_init (SalutTubeStreamClass *salut_tube_stream_class)
   g_object_class_install_property (object_class, PROP_ACCESS_CONTROL_PARAM,
       param_spec);
 
+  /*
   param_spec = g_param_spec_object (
       "channel",
       "SalutTubesChannel object",
@@ -1250,6 +1333,7 @@ salut_tube_stream_class_init (SalutTubeStreamClass *salut_tube_stream_class)
       G_PARAM_STATIC_BLURB);
   g_object_class_install_property (object_class, PROP_CHANNEL,
       param_spec);
+   */
 
   param_spec = g_param_spec_object (
       "xmpp-connection-manager",
-- 
1.5.6.5




More information about the Telepathy-commits mailing list