[Telepathy-commits] [telepathy-salut/master] SalutTubeStream: use GibberListener to listen connections from the local application

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:39 PST 2008


---
 src/tube-stream.c |  297 ++++++++++++++--------------------------------------
 1 files changed, 80 insertions(+), 217 deletions(-)

diff --git a/src/tube-stream.c b/src/tube-stream.c
index d104d3b..7d06c1f 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -41,6 +41,7 @@
 #include <gibber/gibber-transport.h>
 #include <gibber/gibber-fd-transport.h>
 #include <gibber/gibber-iq-helper.h>
+#include <gibber/gibber-listener.h>
 
 #define DEBUG_FLAG DEBUG_TUBES
 
@@ -132,9 +133,6 @@ struct _SalutTubeStreamPrivate
   GHashTable *transport_to_bytestream;
   /* (GibberBytestreamIface *) -> (GibberTransport *) */
   GHashTable *bytestream_to_transport;
-  /* (GibberBytestreamIface *) -> int */
-  GHashTable *bytestream_to_fd;
-
 
   TpHandle initiator;
   gchar *service;
@@ -145,8 +143,11 @@ struct _SalutTubeStreamPrivate
   GValue *address;
   TpSocketAccessControl access_control;
   GValue *access_control_param;
+
+  GibberListener *listener;
   GIOChannel *listen_io_channel;
   guint listen_io_channel_source_id;
+
   gboolean closed;
 
   /* we need to send an iq stanza to close the tube on 1-1 tube */
@@ -245,7 +246,6 @@ remove_transport (SalutTubeStream *self,
   g_hash_table_remove (priv->transport_to_bytestream, transport);
 
   g_hash_table_remove (priv->bytestream_to_transport, bytestream);
-  g_hash_table_remove (priv->bytestream_to_fd, bytestream);
 }
 
 static void
@@ -328,8 +328,7 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
 
   if (state == GIBBER_BYTESTREAM_STATE_OPEN)
     {
-      int fd;
-      GibberLLTransport *ll_transport;
+      GibberTransport *transport;
 
       DEBUG ("extra bytestream open");
 
@@ -338,13 +337,11 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
       g_signal_connect (bytestream, "write-blocked",
           G_CALLBACK (bytestream_write_blocked_cb), self);
 
-      fd = GPOINTER_TO_INT (g_hash_table_lookup (priv->bytestream_to_fd,
-            bytestream));
-      g_assert (fd != 0);
+      transport = g_hash_table_lookup (priv->bytestream_to_transport,
+            bytestream);
+      g_assert (transport != NULL);
 
-      ll_transport = gibber_ll_transport_new ();
-      gibber_ll_transport_open_fd (ll_transport, fd);
-      add_transport (self, GIBBER_TRANSPORT (ll_transport), bytestream);
+      add_transport (self, transport, bytestream);
     }
   else if (state == GIBBER_BYTESTREAM_STATE_CLOSED)
     {
@@ -371,8 +368,8 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
 struct _extra_bytestream_negotiate_cb_data
 {
   SalutTubeStream *self;
-  /* file descriptor of the connection from the local application */
-  gint fd;
+  /* transport from the local application */
+  GibberTransport *transport;
 };
 
 static void
@@ -388,14 +385,14 @@ extra_bytestream_negotiate_cb (GibberBytestreamIface *bytestream,
     {
       DEBUG ("initiator refused new bytestream");
 
-      close (data->fd);
+      g_object_unref (data->transport);
       return;
     }
 
   DEBUG ("extra bytestream accepted");
 
-  g_hash_table_insert (priv->bytestream_to_fd, g_object_ref (bytestream),
-      GUINT_TO_POINTER (data->fd));
+  g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+      data->transport);
 
   g_signal_connect (bytestream, "state-changed",
       G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -426,7 +423,7 @@ generate_stream_id (SalutTubeStream *self)
 
 static gboolean
 start_stream_initiation (SalutTubeStream *self,
-                         gint fd,
+                         GibberTransport *transport,
                          GError **error)
 {
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
@@ -471,7 +468,7 @@ start_stream_initiation (SalutTubeStream *self,
 
   data = g_slice_new (struct _extra_bytestream_negotiate_cb_data);
   data->self = self;
-  data->fd = fd;
+  data->transport = g_object_ref (transport);
 
   g_object_get (priv->conn,
       "si-bytestream-manager", &si_bytestream_mgr,
@@ -486,6 +483,7 @@ start_stream_initiation (SalutTubeStream *self,
       result = FALSE;
       g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
           "can't find contact with handle %d", priv->initiator);
+      g_object_unref (transport);
     }
   else
     {
@@ -513,7 +511,7 @@ start_stream_initiation (SalutTubeStream *self,
 /* start a new stream in a tube from the recipient side */
 static gboolean
 start_stream_direct (SalutTubeStream *self,
-                     gint fd,
+                     GibberTransport *transport,
                      GError **error)
 {
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
@@ -522,7 +520,6 @@ start_stream_direct (SalutTubeStream *self,
   struct _extra_bytestream_negotiate_cb_data *data;
   SalutContact *contact;
   SalutContactManager *contact_mgr;
-  SalutDirectBytestreamManager *direct_bytestream_mgr;
   GibberBytestreamIface *bytestream;
 
   g_assert (priv->handle_type == TP_HANDLE_TYPE_CONTACT);
@@ -534,13 +531,11 @@ start_stream_direct (SalutTubeStream *self,
 
   data = g_slice_new (struct _extra_bytestream_negotiate_cb_data);
   data->self = self;
-  data->fd = fd;
+  data->transport = g_object_ref (transport);
 
   g_object_get (priv->conn,
-      "direct-bytestream-manager", &direct_bytestream_mgr,
       "contact-manager", &contact_mgr,
       NULL);
-  g_assert (direct_bytestream_mgr != NULL);
   g_assert (contact_mgr != NULL);
 
   contact = salut_contact_manager_get_contact (contact_mgr, priv->initiator);
@@ -549,31 +544,20 @@ start_stream_direct (SalutTubeStream *self,
       g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
           "can't find contact with handle %d", priv->initiator);
 
-      g_object_unref (direct_bytestream_mgr);
-      g_object_unref (contact_mgr);
-
-      return FALSE;
-    }
-
-  bytestream = salut_direct_bytestream_manager_new_stream (
-      direct_bytestream_mgr, contact, priv->port);
-
-  if (bytestream == NULL)
-    {
-      DEBUG ("initiator refused new bytestream");
-      close (fd);
-
-      g_object_unref (contact);
-      g_object_unref (direct_bytestream_mgr);
+      g_object_unref (transport);
       g_object_unref (contact_mgr);
 
       return FALSE;
     }
 
-  DEBUG ("extra bytestream accepted");
+  bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
+      "addresses", salut_contact_get_addresses (contact),
+      "state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
+      "peer-id", contact->name,
+      "port", priv->port,
+      NULL);
 
-  g_hash_table_insert (priv->bytestream_to_fd,
-      g_object_ref (bytestream), GUINT_TO_POINTER (fd));
+  g_assert (bytestream != NULL);
 
   g_signal_connect (bytestream, "state-changed",
       G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -583,73 +567,30 @@ start_stream_direct (SalutTubeStream *self,
     {
       /* Initiation failed. */
       gibber_bytestream_iface_close (bytestream, NULL);
-      close (fd);
 
+      g_object_unref (transport);
       g_object_unref (contact);
-      g_object_unref (direct_bytestream_mgr);
       g_object_unref (contact_mgr);
 
       return FALSE;
     }
 
   g_object_unref (contact);
-  g_object_unref (direct_bytestream_mgr);
   g_object_unref (contact_mgr);
 
   return TRUE;
 }
 
 /* callback for listening connections from the local application */
-gboolean
-listen_cb (GIOChannel *source,
-           GIOCondition condition,
-           gpointer data)
+static void
+new_connection_cb (GibberListener *listener,
+                   GibberTransport *transport,
+                   struct sockaddr_storage *addr,
+                   guint size,
+                   gpointer user_data)
 {
-  SalutTubeStream *self = SALUT_TUBE_STREAM (data);
+  SalutTubeStream *self = SALUT_TUBE_STREAM (user_data);
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
-  int fd, listen_fd;
-  SockAddr addr;
-  socklen_t addrlen;
-  int flags;
-
-  listen_fd = g_io_channel_unix_get_fd (source);
-
-  if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
-    {
-      addrlen = sizeof (addr.un);
-    }
-  else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
-    {
-      addrlen = sizeof (addr.ipv4);
-    }
-  else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
-    {
-      addrlen = sizeof (addr.ipv6);
-    }
-  else
-    {
-      g_assert_not_reached ();
-    }
-
-  fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
-  if (fd == -1)
-    {
-      DEBUG ("Error accepting socket: %s", g_strerror (errno));
-      return TRUE;
-    }
-
-  DEBUG ("connection from client");
-
-  /* Set socket non blocking */
-  flags = fcntl (fd, F_GETFL, 0);
-  if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1)
-    {
-      DEBUG ("Can't set socket non blocking: %s", g_strerror (errno));
-      close (fd);
-      return TRUE;
-    }
-
-  DEBUG ("request new bytestream");
 
   /* Streams in MUC tubes are established with stream initiation (XEP-0095).
    * We use SalutSiBytestreamManager.
@@ -659,22 +600,18 @@ listen_cb (GIOChannel *source,
    */
   if (priv->handle_type == TP_HANDLE_TYPE_CONTACT)
     {
-      if (!start_stream_direct (self, fd, NULL))
+      if (!start_stream_direct (self, transport, NULL))
         {
           DEBUG ("closing new client connection");
-          close (fd);
         }
     }
   else
     {
-      if (!start_stream_initiation (self, fd, NULL))
+      if (!start_stream_initiation (self, transport, NULL))
         {
           DEBUG ("closing new client connection");
-          close (fd);
         }
     }
-
-  return TRUE;
 }
 
 static gboolean
@@ -682,6 +619,7 @@ new_connection_to_socket (SalutTubeStream *self,
                           GibberBytestreamIface *bytestream)
 {
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+  GibberFdTransport *transport;
   int fd;
   SockAddr addr;
   socklen_t len;
@@ -782,8 +720,11 @@ new_connection_to_socket (SalutTubeStream *self,
     }
   DEBUG ("Connected to socket");
 
-  g_hash_table_insert (priv->bytestream_to_fd, g_object_ref (bytestream),
-      GUINT_TO_POINTER (fd));
+  transport = g_object_new (GIBBER_TYPE_FD_TRANSPORT, NULL);
+  gibber_fd_transport_set_fd (transport, fd);
+
+  g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+      g_object_ref (transport));
 
   g_signal_connect (bytestream, "state-changed",
       G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -796,7 +737,6 @@ tube_stream_open (SalutTubeStream *self,
                   GError **error)
 {
   SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
-  int fd;
 
   DEBUG ("called");
 
@@ -808,141 +748,84 @@ tube_stream_open (SalutTubeStream *self,
   /* We didn't create this tube so it doesn't have
    * a socket associated with it. Let's create one */
   g_assert (priv->address == NULL);
+  g_assert (priv->listener == NULL);
+  priv->listener = gibber_listener_new ();
+
+  g_signal_connect (priv->listener, "new-connection",
+      G_CALLBACK (new_connection_cb), self);
 
   if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
     {
       GArray *array;
-      struct sockaddr_un addr;
       gchar suffix[8];
-
-      fd = socket (PF_UNIX, SOCK_STREAM, 0);
-      if (fd == -1)
-        {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error creating socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      memset (&addr, 0, sizeof (addr));
-      addr.sun_family = PF_UNIX;
+      gchar *path;
+      int ret;
 
       generate_ascii_string (8, suffix);
-      snprintf (addr.sun_path, sizeof (addr.sun_path) - 1,
-        "/tmp/stream-salut-%.8s", suffix);
+      path = g_strdup_printf ("/tmp/stream-salut-%.8s", suffix);
 
-      DEBUG ("create socket: %s", addr.sun_path);
+      DEBUG ("create socket: %s", path);
 
-      array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (
-            addr.sun_path));
-      g_array_insert_vals (array, 0, addr.sun_path, strlen (addr.sun_path));
+      array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (path));
+      g_array_insert_vals (array, 0, path, strlen (path));
 
       priv->address = tp_g_value_slice_new (DBUS_TYPE_G_UCHAR_ARRAY);
       g_value_set_boxed (priv->address, array);
 
       g_array_free (array, TRUE);
+      g_free (path);
 
-      if (bind (fd, (struct sockaddr *) &addr, sizeof (addr)) == -1)
+      ret = gibber_listener_listen_socket (priv->listener, path, FALSE,
+          error);
+      if (ret != TRUE)
         {
-          DEBUG ("Error binding socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error binding socket: %s", g_strerror (errno));
+          g_assert (error != NULL && *error != NULL);
+          DEBUG ("Error listening on socket %s: %s", path, (*error)->message);
           return FALSE;
         }
     }
   else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
     {
-      struct sockaddr_in addr;
-      socklen_t len;
-
-      addr.sin_family = AF_INET;
-      addr.sin_port = 0;         /* == ntohs (0) */
-      addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
-
-      len = sizeof (addr);
+      int port;
 
-      fd = socket (PF_INET, SOCK_STREAM, 0);
-      if (fd == -1)
+      port = gibber_listener_listen_tcp_loopback_af (priv->listener, 0,
+          GIBBER_AF_IPV4, error);
+      if (port <= 0)
         {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error creating socket: %s", g_strerror (errno));
+          g_assert (error != NULL && *error != NULL);
+          DEBUG ("Error listening on socket: %s", (*error)->message);
           return FALSE;
         }
 
-      if (bind (fd, (struct sockaddr *) &addr, len) == -1)
-        {
-          DEBUG ("Error binding socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error binding socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
-        {
-          DEBUG ("getsockname failed: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "getsockname failed: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      DEBUG ("create socket %s:%u", "127.0.0.1", ntohs (addr.sin_port));
-
       priv->address = tp_g_value_slice_new (SOCKET_ADDRESS_IPV4_TYPE);
       g_value_take_boxed (priv->address,
           dbus_g_type_specialized_construct (SOCKET_ADDRESS_IPV4_TYPE));
 
       dbus_g_type_struct_set (priv->address,
           0, "127.0.0.1",
-          1, ntohs (addr.sin_port),
+          1, port,
           G_MAXUINT);
     }
   else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
     {
-      struct sockaddr_in6 addr;
-      socklen_t len;
-      struct in6_addr loopback_addr = IN6ADDR_LOOPBACK_INIT;
-
-      addr.sin6_family = AF_INET6;
-      addr.sin6_port = 0;         /* == ntohs (0) */
-      addr.sin6_addr = loopback_addr;
-
-      len = sizeof (addr);
+      int port;
 
-      fd = socket (PF_INET6, SOCK_STREAM, 0);
-      if (fd == -1)
+      port = gibber_listener_listen_tcp_loopback_af (priv->listener, 0,
+          GIBBER_AF_IPV6, error);
+      if (port <= 0)
         {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error creating socket: %s", g_strerror (errno));
+          g_assert (error != NULL && *error != NULL);
+          DEBUG ("Error listening on socket: %s", (*error)->message);
           return FALSE;
         }
 
-      if (bind (fd, (struct sockaddr *) &addr, len) == -1)
-        {
-          DEBUG ("Error binding socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error binding socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
-        {
-          DEBUG ("getsockname failed: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "getsockname failed: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      DEBUG ("create socket %s:%u", "::1", ntohs (addr.sin6_port));
-
       priv->address = tp_g_value_slice_new (SOCKET_ADDRESS_IPV6_TYPE);
       g_value_take_boxed (priv->address,
           dbus_g_type_specialized_construct (SOCKET_ADDRESS_IPV6_TYPE));
 
       dbus_g_type_struct_set (priv->address,
           0, "::1",
-          1, ntohs (addr.sin6_port),
+          1, port,
           G_MAXUINT);
     }
   else
@@ -950,22 +833,6 @@ tube_stream_open (SalutTubeStream *self,
       g_assert_not_reached ();
     }
 
-  if (listen (fd, 5) == -1)
-    {
-      DEBUG ("Error listening socket: %s", g_strerror (errno));
-      g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-          "Error listening socket: %s", g_strerror (errno));
-      return FALSE;
-    }
-
-  priv->listen_io_channel = g_io_channel_unix_new (fd);
-  g_io_channel_set_encoding (priv->listen_io_channel, NULL, NULL);
-  g_io_channel_set_buffered (priv->listen_io_channel, FALSE);
-  g_io_channel_set_close_on_unref (priv->listen_io_channel, TRUE);
-
-  priv->listen_io_channel_source_id = g_io_add_watch (priv->listen_io_channel,
-      G_IO_IN, listen_cb, self);
-
   return TRUE;
 }
 
@@ -985,9 +852,6 @@ salut_tube_stream_init (SalutTubeStream *self)
       g_direct_equal, (GDestroyNotify) g_object_unref,
       (GDestroyNotify) g_object_unref);
 
-  priv->bytestream_to_fd = g_hash_table_new_full (g_direct_hash,
-      g_direct_equal, (GDestroyNotify) g_object_unref, NULL);
-
   priv->listen_io_channel = NULL;
   priv->listen_io_channel_source_id = 0;
   priv->address_type = TP_SOCKET_ADDRESS_TYPE_UNIX;
@@ -1023,7 +887,6 @@ close_each_extra_bytestream (gpointer key,
   gibber_transport_disconnect (transport);
 
   g_hash_table_remove (priv->bytestream_to_transport, bytestream);
-  g_hash_table_remove (priv->bytestream_to_fd, bytestream);
 
   return TRUE;
 }
@@ -1072,12 +935,6 @@ salut_tube_stream_dispose (GObject *object)
       priv->bytestream_to_transport = NULL;
     }
 
-  if (priv->bytestream_to_fd != NULL)
-    {
-      g_hash_table_destroy (priv->bytestream_to_fd);
-      priv->bytestream_to_fd = NULL;
-    }
-
   tp_handle_unref (contact_repo, priv->initiator);
 
   if (priv->listen_io_channel_source_id != 0)
@@ -1093,6 +950,12 @@ salut_tube_stream_dispose (GObject *object)
       priv->listen_io_channel = NULL;
     }
 
+  if (priv->listener != NULL)
+    {
+      g_object_unref (priv->listener);
+      priv->listener = NULL;
+    }
+
   if (priv->iq_helper != NULL)
     {
       g_object_unref (priv->iq_helper);
-- 
1.5.6.5




More information about the Telepathy-commits mailing list