[Telepathy-commits] [telepathy-gabble/master] use Gibber transports and listener

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Mon Dec 15 07:17:41 PST 2008


---
 src/tube-stream.c |  678 ++++++++++++++++++++++-------------------------------
 1 files changed, 282 insertions(+), 396 deletions(-)

diff --git a/src/tube-stream.c b/src/tube-stream.c
index 4bcdf41..c6915c0 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -44,6 +44,12 @@
 
 #define DEBUG_FLAG GABBLE_DEBUG_TUBES
 
+#include <gibber/gibber-fd-transport.h>
+#include <gibber/gibber-listener.h>
+#include <gibber/gibber-tcp-transport.h>
+#include <gibber/gibber-transport.h>
+#include <gibber/gibber-unix-transport.h>
+
 #include "bytestream-factory.h"
 #include "bytestream-iface.h"
 #include "connection.h"
@@ -142,9 +148,28 @@ struct _GabbleTubeStreamPrivate
   TpHandleType handle_type;
   TpHandle self_handle;
   guint id;
-  GHashTable *fd_to_bytestreams;
-  GHashTable *bytestream_to_io_channel;
-  GHashTable *io_channel_to_watcher_source_id;
+
+  /* Bytestreams for tubes. One tube can have several bytestreams. The
+   * mapping between the tube bytestream and the transport to the local
+   * application is stored in the transport_to_bytestream and
+   * bytestream_to_transport fields. This is used both on initiator-side and
+   * on recipient-side. */
+
+  /* (GabbleBytestreamIface *) -> (GibberTransport *)
+   *
+   * The (b->t) is inserted as soon as they are created. On initiator side,
+   * we receive an incoming bytestream, create a transport and insert (b->t).
+   * On recipient side, we receive an incoming transport, create a bytestream
+   * and insert (b->t).
+   */
+  GHashTable *bytestream_to_transport;
+
+  /* (GibberTransport *) -> (GabbleBytestreamIface *)
+   *
+   * The (t->b) is inserted when the bytestream is open.
+   */
+  GHashTable *transport_to_bytestream;
+
   TpHandle initiator;
   gchar *service;
   GHashTable *parameters;
@@ -154,8 +179,10 @@ struct _GabbleTubeStreamPrivate
   GValue *address;
   TpSocketAccessControl access_control;
   GValue *access_control_param;
-  GIOChannel *listen_io_channel;
-  guint listen_io_channel_source_id;
+
+  /* listen for connections from local applications */
+  GibberListener *local_listener;
+
   gboolean closed;
 
   gboolean dispose_has_run;
@@ -181,72 +208,135 @@ generate_ascii_string (guint len,
     buf[i] = chars[g_random_int_range (0, 64)];
 }
 
-static gboolean
-data_to_read_on_socket_cb (GIOChannel *source,
-                           GIOCondition condition,
-                           gpointer data)
+static void
+transport_handler (GibberTransport *transport,
+                   GibberBuffer *data,
+                   gpointer user_data)
 {
-  GabbleTubeStream *self = GABBLE_TUBE_STREAM (data);
+  GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
   GabbleBytestreamIface *bytestream;
-  int fd;
-  gchar buffer[4096];
-  gsize num_read;
-  GIOStatus status;
-  GError *error = NULL;
-  gboolean result = TRUE;
 
-  if (! (condition & G_IO_IN))
-    return TRUE;
+  bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
+  if (bytestream == NULL)
+    {
+      DEBUG ("no open bytestream associated with this transport");
+      return;
+    }
 
-  fd = g_io_channel_unix_get_fd (source);
+  DEBUG ("read %" G_GSIZE_FORMAT " bytes from socket", data->length);
 
-  bytestream = g_hash_table_lookup (priv->fd_to_bytestreams,
-      GINT_TO_POINTER (fd));
+  gabble_bytestream_iface_send (bytestream, data->length,
+      (const gchar *) data->data);
+}
+
+static void
+transport_disconnected_cb (GibberTransport *transport,
+                           GabbleTubeStream *self)
+{
+  GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+  GabbleBytestreamIface *bytestream;
+
+  bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
   if (bytestream == NULL)
-    {
-      DEBUG ("no bytestream associated with this socket");
+    return;
 
-      g_hash_table_remove (priv->io_channel_to_watcher_source_id, source);
-      return FALSE;
-    }
+  DEBUG ("transport disconnected. close the extra bytestream");
 
-  memset (&buffer, 0, sizeof (buffer));
+  gabble_bytestream_iface_close (bytestream, NULL);
+}
 
-  status = g_io_channel_read_chars (source, buffer, 4096, &num_read, &error);
-  if (status == G_IO_STATUS_NORMAL)
-    {
-      DEBUG ("read %" G_GSIZE_FORMAT " bytes from socket", num_read);
+static void
+remove_transport (GabbleTubeStream *self,
+                  GabbleBytestreamIface *bytestream,
+                  GibberTransport *transport)
+{
+  GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
 
-      gabble_bytestream_iface_send (bytestream, num_read, buffer);
-      result = TRUE;
-    }
-  else if (status == G_IO_STATUS_EOF)
-    {
-      DEBUG ("error reading from socket: EOF");
+  DEBUG ("disconnect and remove transport");
+  g_signal_handlers_disconnect_matched (transport, G_SIGNAL_MATCH_DATA,
+      0, 0, NULL, NULL, self);
 
-      gabble_bytestream_iface_close (bytestream, NULL);
-      result = FALSE;
-    }
-  else if (status == G_IO_STATUS_AGAIN)
+  gibber_transport_disconnect (transport);
+
+  /* the transport may not be in transport_to_bytestream if the bytestream was
+   * not fully open */
+  g_hash_table_remove (priv->transport_to_bytestream, transport);
+
+  g_hash_table_remove (priv->bytestream_to_transport, bytestream);
+}
+
+static void
+transport_buffer_empty_cb (GibberTransport *transport,
+                           GabbleTubeStream *self)
+{
+  GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+  GabbleBytestreamIface *bytestream;
+  GabbleBytestreamState state;
+
+  bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
+  g_assert (bytestream != NULL);
+  g_object_get (bytestream, "state", &state, NULL);
+
+  if (state == GABBLE_BYTESTREAM_STATE_CLOSED)
     {
-      DEBUG ("error reading from socket: resource temporarily unavailable");
+      DEBUG ("buffer is now empty. Transport can be removed");
+      remove_transport (self, bytestream, transport);
+      return;
+    }
+
+  /* Buffer is empty so we can unblock the bytestream if it was blocked */
+  DEBUG ("tube buffer is empty. Unblock the bytestream");
+  //gabble_bytestream_iface_block_read (bytestream, FALSE);
+  //TODO
+}
+
+static void
+add_transport (GabbleTubeStream *self,
+               GibberTransport *transport,
+               GabbleBytestreamIface *bytestream)
+{
+  GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+
+  gibber_transport_set_handler (transport, transport_handler, self);
+
+  g_hash_table_insert (priv->transport_to_bytestream,
+      g_object_ref (transport), g_object_ref (bytestream));
 
-      result = TRUE;
+  g_signal_connect (transport, "disconnected",
+      G_CALLBACK (transport_disconnected_cb), self);
+  g_signal_connect (transport, "buffer-empty",
+      G_CALLBACK (transport_buffer_empty_cb), self);
+
+  /* We can transfer transport's data; unblock it. */
+  gibber_transport_block_receiving (transport, FALSE);
+}
+
+#if 0
+static void
+bytestream_write_blocked_cb (GabbleBytestreamIface *bytestream,
+                             gboolean blocked,
+                             GabbleTubeStream *self)
+{
+  GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+  GibberTransport *transport;
+
+  transport = g_hash_table_lookup (priv->bytestream_to_transport,
+      bytestream);
+  g_assert (transport != NULL);
+
+  if (blocked)
+    {
+      DEBUG ("bytestream blocked, stop to read data from the tube socket");
     }
   else
     {
-      DEBUG ("error reading from socket: %s", error ? error->message : "");
-
-      gabble_bytestream_iface_close (bytestream, NULL);
-      result = FALSE;
+      DEBUG ("bytestream unblocked, restart to read data from the tube socket");
     }
 
-  if (error != NULL)
-    g_error_free (error);
-
-  return TRUE;
+  gibber_transport_block_receiving (transport, blocked);
 }
+#endif
 
 static void
 extra_bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
@@ -255,47 +345,56 @@ extra_bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
 {
   GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
-  GIOChannel *channel;
 
-  channel = g_hash_table_lookup (priv->bytestream_to_io_channel,
-      bytestream);
-  if (channel == NULL)
-    {
-      DEBUG ("no IO channel associated with the bytestream");
-      return;
-    }
+  DEBUG ("Called.");
 
   if (state == GABBLE_BYTESTREAM_STATE_OPEN)
     {
-      guint source_id;
+      GibberTransport *transport;
+
       DEBUG ("extra bytestream open");
 
       g_signal_connect (bytestream, "data-received",
           G_CALLBACK (data_received_cb), self);
+      /*
+       * TODO
+      g_signal_connect (bytestream, "write-blocked",
+          G_CALLBACK (bytestream_write_blocked_cb), self);
+          */
+
+      transport = g_hash_table_lookup (priv->bytestream_to_transport,
+            bytestream);
+      g_assert (transport != NULL);
 
-      source_id = g_io_add_watch (channel, G_IO_IN, data_to_read_on_socket_cb,
-          self);
-      g_hash_table_insert (priv->io_channel_to_watcher_source_id,
-          g_io_channel_ref (channel), GUINT_TO_POINTER (source_id));
+      add_transport (self, transport, bytestream);
     }
   else if (state == GABBLE_BYTESTREAM_STATE_CLOSED)
     {
-      int fd;
+      GibberTransport *transport;
 
       DEBUG ("extra bytestream closed");
-
-      fd = g_io_channel_unix_get_fd (channel);
-
-      g_hash_table_remove (priv->fd_to_bytestreams, GINT_TO_POINTER (fd));
-      g_hash_table_remove (priv->bytestream_to_io_channel, bytestream);
-      g_hash_table_remove (priv->io_channel_to_watcher_source_id, channel);
+      transport = g_hash_table_lookup (priv->bytestream_to_transport,
+          bytestream);
+      if (transport != NULL)
+        {
+          if (gibber_transport_buffer_is_empty (transport))
+            {
+              DEBUG ("Buffer is empty, we can remove the transport");
+              remove_transport (self, bytestream, transport);
+            }
+          else
+            {
+              DEBUG ("Wait buffer is empty before disconnect the transport");
+            }
+        }
     }
 }
 
 struct _extra_bytestream_negotiate_cb_data
 {
   GabbleTubeStream *self;
-  gint fd;
+  /* transport from the local application */
+  GibberTransport *transport;
 };
 
 static void
@@ -308,27 +407,19 @@ extra_bytestream_negotiate_cb (GabbleBytestreamIface *bytestream,
     (struct _extra_bytestream_negotiate_cb_data *) user_data;
   GabbleTubeStream *self = data->self;
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
-  GIOChannel *channel;
 
   if (bytestream == NULL)
     {
       DEBUG ("initiator refused new bytestream");
 
-      close (data->fd);
+      g_object_unref (data->transport);
       return;
     }
 
   DEBUG ("extra bytestream accepted");
 
-  channel = g_io_channel_unix_new (data->fd);
-  g_io_channel_set_encoding (channel, NULL, NULL);
-  g_io_channel_set_buffered (channel, FALSE);
-  g_io_channel_set_close_on_unref (channel, TRUE);
-
-  g_hash_table_insert (priv->fd_to_bytestreams, GINT_TO_POINTER (data->fd),
-      g_object_ref (bytestream));
-  g_hash_table_insert (priv->bytestream_to_io_channel,
-      g_object_ref (bytestream), channel);
+  g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+      data->transport);
 
   g_signal_connect (bytestream, "state-changed",
                 G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -338,7 +429,7 @@ extra_bytestream_negotiate_cb (GabbleBytestreamIface *bytestream,
 
 static gboolean
 start_stream_initiation (GabbleTubeStream *self,
-                         gint fd,
+                         GibberTransport *transport,
                          GError **error)
 {
   GabbleTubeStreamPrivate *priv;
@@ -421,7 +512,7 @@ start_stream_initiation (GabbleTubeStream *self,
 
   data = g_slice_new (struct _extra_bytestream_negotiate_cb_data);
   data->self = self;
-  data->fd = fd;
+  data->transport = g_object_ref (transport);
 
   result = gabble_bytestream_factory_negotiate_stream (
     priv->conn->bytestream_factory,
@@ -439,64 +530,27 @@ start_stream_initiation (GabbleTubeStream *self,
   return result;
 }
 
-static gboolean
-listen_cb (GIOChannel *source,
-           GIOCondition condition,
-           gpointer data)
+/* Callback invoked for each new connection from the local application */
+static void
+local_new_connection_cb (GibberListener *listener,
+                         GibberTransport *transport,
+                         struct sockaddr_storage *addr,
+                         guint size,
+                         gpointer user_data)
 {
-  GabbleTubeStream *self = GABBLE_TUBE_STREAM (data);
-  GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
-  int fd, listen_fd;
-  SockAddr addr;
-  socklen_t addrlen;
-  int flags;
-
-  listen_fd = g_io_channel_unix_get_fd (source);
-
-  if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
-    {
-      addrlen = sizeof (addr.un);
-    }
-  else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
-    {
-      addrlen = sizeof (addr.ipv4);
-    }
-  else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
-    {
-      addrlen = sizeof (addr.ipv6);
-    }
-  else
-    {
-      g_return_val_if_reached (TRUE);
-    }
-
-  fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
-  if (fd == -1)
-    {
-      DEBUG ("Error accepting socket: %s", g_strerror (errno));
-      return TRUE;
-    }
-
-  DEBUG ("connection from client");
-
-  /* Set socket non blocking */
-  flags = fcntl (fd, F_GETFL, 0);
-  if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1)
-    {
-      DEBUG ("Can't set socket non blocking: %s", g_strerror (errno));
-      close (fd);
-      return TRUE;
-    }
+  GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
 
-  DEBUG ("request new bytestream");
+  /* Block the transport while there is no open bytestream to transfer
+   * its data. */
+  gibber_transport_block_receiving (transport, TRUE);
 
-  if (!start_stream_initiation (self, fd, NULL))
+  /* Streams in stream tubes are established with stream initiation (XEP-0095).
+   * We use the connection's bytestream factory to negotiate them.
+   */
+  if (!start_stream_initiation (self, transport, NULL))
     {
       DEBUG ("closing new client connection");
-      close (fd);
     }
-
-  return TRUE;
 }
 
 static gboolean
@@ -504,118 +558,58 @@ new_connection_to_socket (GabbleTubeStream *self,
                           GabbleBytestreamIface *bytestream)
 {
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
-  int fd;
-  GIOChannel *channel;
-  SockAddr addr;
-  socklen_t len;
+  GibberTransport *transport;
 
-  g_assert (priv->initiator == priv->self_handle);
+  DEBUG ("Called.");
 
-  memset (&addr, 0, sizeof (addr));
+  g_assert (priv->initiator == priv->self_handle);
 
   if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
     {
       GArray *array;
       array = g_value_get_boxed (priv->address);
-
-      fd = socket (PF_UNIX, SOCK_STREAM, 0);
-      if (fd == -1)
-        {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      addr.un.sun_family = PF_UNIX;
-      strncpy (addr.un.sun_path, array->data, sizeof (addr.un.sun_path) - 1);
-      addr.un.sun_path[sizeof (addr.un.sun_path) - 1] = '\0';
-      len = sizeof (addr.un);
-
       DEBUG ("Will try to connect to socket: %s", (const gchar *) array->data);
+
+      transport = GIBBER_TRANSPORT (gibber_unix_transport_new ());
+      gibber_unix_transport_connect (GIBBER_UNIX_TRANSPORT (transport),
+          array->data, NULL);
     }
   else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4 ||
       priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
     {
       gchar *ip;
+      gchar *port_str;
       guint port;
-      struct addrinfo req, *result = NULL;
-      int ret;
-
-      if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
-        fd = socket (PF_INET, SOCK_STREAM, 0);
-      else
-        fd = socket (PF_INET6, SOCK_STREAM, 0);
-
-      if (fd == -1)
-        {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          return FALSE;
-        }
 
       dbus_g_type_struct_get (priv->address,
           0, &ip,
           1, &port,
           G_MAXUINT);
+      port_str = g_strdup_printf ("%d", port);
 
-      memset (&req, 0, sizeof (req));
-      req.ai_flags = AI_NUMERICHOST;
-      req.ai_socktype = SOCK_STREAM;
-      req.ai_protocol = IPPROTO_TCP;
-
-      if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
-        req.ai_family = AF_INET;
-      else
-        req.ai_family = AF_INET6;
-
-      ret = getaddrinfo (ip, NULL, &req, &result);
-      if (ret != 0)
-        {
-          DEBUG ("getaddrinfo failed: %s",  gai_strerror (ret));
-          g_free (ip);
-          return FALSE;
-        }
-
-      DEBUG ("Will try to connect to %s:%u", ip, port);
+      transport = GIBBER_TRANSPORT (gibber_tcp_transport_new ());
+      gibber_tcp_transport_connect (GIBBER_TCP_TRANSPORT (transport), ip,
+          port_str);
 
-      if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
-        {
-          memcpy (&addr, result->ai_addr, sizeof (addr.ipv4));
-          addr.ipv4.sin_port = ntohs (port);
-          len = sizeof (addr.ipv4);
-        }
-      else
-        {
-          memcpy (&addr, result->ai_addr, sizeof (addr.ipv6));
-          addr.ipv6.sin6_port = ntohs (port);
-          len = sizeof (addr.ipv6);
-        }
+      /* TODO: use priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4 */
 
       g_free (ip);
-      freeaddrinfo (result);
+      g_free (port_str);
     }
   else
     {
-      g_return_val_if_reached (FALSE);
+      g_assert_not_reached ();
     }
 
-  if (connect (fd, (struct sockaddr *) &addr, len) == -1)
-    {
-      DEBUG ("Error connecting socket: %s", g_strerror (errno));
-      return FALSE;
-    }
-  DEBUG ("Connected to socket");
-
-  channel = g_io_channel_unix_new (fd);
-  g_io_channel_set_encoding (channel, NULL, NULL);
-  g_io_channel_set_buffered (channel, FALSE);
-  g_io_channel_set_close_on_unref (channel, TRUE);
+  /* Block the transport while there is no open bytestream to transfer
+   * its data. */
+  gibber_transport_block_receiving (transport, TRUE);
 
-  g_hash_table_insert (priv->fd_to_bytestreams, GINT_TO_POINTER (fd),
-      g_object_ref (bytestream));
-  g_hash_table_insert (priv->bytestream_to_io_channel,
-      g_object_ref (bytestream), channel);
+  g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+      g_object_ref (transport));
 
   g_signal_connect (bytestream, "state-changed",
-                G_CALLBACK (extra_bytestream_state_changed_cb), self);
+      G_CALLBACK (extra_bytestream_state_changed_cb), self);
 
   return TRUE;
 }
@@ -625,7 +619,6 @@ tube_stream_open (GabbleTubeStream *self,
                   GError **error)
 {
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
-  int fd;
 
   DEBUG ("called");
 
@@ -637,86 +630,55 @@ tube_stream_open (GabbleTubeStream *self,
   /* We didn't create this tube so it doesn't have
    * a socket associated with it. Let's create one */
   g_assert (priv->address == NULL);
+  g_assert (priv->local_listener == NULL);
+  priv->local_listener = gibber_listener_new ();
 
+  g_signal_connect (priv->local_listener, "new-connection",
+      G_CALLBACK (local_new_connection_cb), self);
   if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
     {
       GArray *array;
-      struct sockaddr_un addr;
       gchar suffix[8];
-
-      fd = socket (PF_UNIX, SOCK_STREAM, 0);
-      if (fd == -1)
-        {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error creating socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      memset (&addr, 0, sizeof (addr));
-      addr.sun_family = PF_UNIX;
+      gchar *path;
+      int ret;
 
       generate_ascii_string (8, suffix);
-      snprintf (addr.sun_path, sizeof (addr.sun_path) - 1,
-        "/tmp/stream-gabble-%.8s", suffix);
+      path = g_strdup_printf ("/tmp/stream-salut-%.8s", suffix);
 
-      DEBUG ("create socket: %s", addr.sun_path);
+      DEBUG ("create socket: %s", path);
 
-      array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (
-            addr.sun_path));
-      g_array_insert_vals (array, 0, addr.sun_path, strlen (addr.sun_path));
+      array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (path));
+      g_array_insert_vals (array, 0, path, strlen (path));
 
       priv->address = tp_g_value_slice_new (DBUS_TYPE_G_UCHAR_ARRAY);
       g_value_set_boxed (priv->address, array);
 
       g_array_free (array, TRUE);
 
-      if (bind (fd, (struct sockaddr *) &addr, sizeof (addr)) == -1)
+      ret = gibber_listener_listen_socket (priv->local_listener, path, FALSE,
+          error);
+      if (ret != TRUE)
         {
-          DEBUG ("Error binding socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error binding socket: %s", g_strerror (errno));
+          g_assert (error != NULL && *error != NULL);
+          DEBUG ("Error listening on socket %s: %s", path, (*error)->message);
+          g_free (path);
           return FALSE;
         }
+      g_free (path);
     }
   else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
     {
-      struct sockaddr_in addr;
-      socklen_t len;
-
-      addr.sin_family = AF_INET;
-      addr.sin_port = 0;         /* == ntohs (0) */
-      addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
-
-      len = sizeof (addr);
-
-      fd = socket (PF_INET, SOCK_STREAM, 0);
-      if (fd == -1)
-        {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error creating socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      if (bind (fd, (struct sockaddr *) &addr, len) == -1)
-        {
-          DEBUG ("Error binding socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error binding socket: %s", g_strerror (errno));
-          return FALSE;
-        }
+      int ret;
 
-      if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
+      ret = gibber_listener_listen_tcp_loopback_af (priv->local_listener, 0,
+          GIBBER_AF_IPV4, error);
+      if (!ret)
         {
-          DEBUG ("getsockname failed: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "getsockname failed: %s", g_strerror (errno));
+          g_assert (error != NULL && *error != NULL);
+          DEBUG ("Error listening on socket: %s", (*error)->message);
           return FALSE;
         }
 
-      DEBUG ("create socket %s:%u", "127.0.0.1", ntohs (addr.sin_port));
-
       priv->address = tp_g_value_slice_new (
           TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV4);
       g_value_take_boxed (priv->address,
@@ -725,97 +687,42 @@ tube_stream_open (GabbleTubeStream *self,
 
       dbus_g_type_struct_set (priv->address,
           0, "127.0.0.1",
-          1, ntohs (addr.sin_port),
+          1, gibber_listener_get_port (priv->local_listener),
           G_MAXUINT);
     }
   else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
     {
-      struct sockaddr_in6 addr;
-      socklen_t len;
-      struct in6_addr loopback_addr = IN6ADDR_LOOPBACK_INIT;
-
-      addr.sin6_family = AF_INET6;
-      addr.sin6_port = 0;         /* == ntohs (0) */
-      addr.sin6_addr = loopback_addr;
-
-      len = sizeof (addr);
-
-      fd = socket (PF_INET6, SOCK_STREAM, 0);
-      if (fd == -1)
-        {
-          DEBUG ("Error creating socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error creating socket: %s", g_strerror (errno));
-          return FALSE;
-        }
-
-      if (bind (fd, (struct sockaddr *) &addr, len) == -1)
-        {
-          DEBUG ("Error binding socket: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "Error binding socket: %s", g_strerror (errno));
-          return FALSE;
-        }
+      int ret;
 
-      if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
+      ret = gibber_listener_listen_tcp_loopback_af (priv->local_listener, 0,
+          GIBBER_AF_IPV6, error);
+      if (!ret)
         {
-          DEBUG ("getsockname failed: %s", g_strerror (errno));
-          g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-              "getsockname failed: %s", g_strerror (errno));
+          g_assert (error != NULL && *error != NULL);
+          DEBUG ("Error listening on socket: %s", (*error)->message);
           return FALSE;
         }
 
-      DEBUG ("create socket %s:%u", "::1", ntohs (addr.sin6_port));
-
       priv->address = tp_g_value_slice_new (
           TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV6);
       g_value_take_boxed (priv->address,
           dbus_g_type_specialized_construct (
-              TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV6));
+            TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV6));
 
       dbus_g_type_struct_set (priv->address,
           0, "::1",
-          1, ntohs (addr.sin6_port),
+          1, gibber_listener_get_port (priv->local_listener),
           G_MAXUINT);
     }
   else
     {
-      g_return_val_if_reached (FALSE);
+      g_assert_not_reached ();
     }
 
-  if (listen (fd, 5) == -1)
-    {
-      DEBUG ("Error listening socket: %s", g_strerror (errno));
-      g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
-          "Error listening socket: %s", g_strerror (errno));
-      return FALSE;
-    }
-
-  priv->listen_io_channel = g_io_channel_unix_new (fd);
-  g_io_channel_set_encoding (priv->listen_io_channel, NULL, NULL);
-  g_io_channel_set_buffered (priv->listen_io_channel, FALSE);
-  g_io_channel_set_close_on_unref (priv->listen_io_channel, TRUE);
-
-  priv->listen_io_channel_source_id = g_io_add_watch (priv->listen_io_channel,
-      G_IO_IN, listen_cb, self);
-
   return TRUE;
 }
 
 static void
-remove_watcher_source_id (gpointer data)
-{
-  guint source_id = GPOINTER_TO_UINT (data);
-  GSource *source;
-
-  source = g_main_context_find_source_by_id (NULL, source_id);
-  if (source != NULL)
-    {
-      g_source_destroy (source);
-    }
-}
-
-static void
 gabble_tube_stream_init (GabbleTubeStream *self)
 {
   GabbleTubeStreamPrivate *priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
@@ -823,19 +730,14 @@ gabble_tube_stream_init (GabbleTubeStream *self)
 
   self->priv = priv;
 
-  priv->fd_to_bytestreams = g_hash_table_new_full (g_direct_hash,
-      g_direct_equal, NULL, (GDestroyNotify) g_object_unref);
-
-  priv->bytestream_to_io_channel = g_hash_table_new_full (g_direct_hash,
+  priv->transport_to_bytestream = g_hash_table_new_full (g_direct_hash,
       g_direct_equal, (GDestroyNotify) g_object_unref,
-      (GDestroyNotify) g_io_channel_unref);
+      (GDestroyNotify) g_object_unref);
 
-  priv->io_channel_to_watcher_source_id = g_hash_table_new_full (g_direct_hash,
-      g_direct_equal, (GDestroyNotify) g_io_channel_unref,
-      (GDestroyNotify) remove_watcher_source_id);
+  priv->bytestream_to_transport = g_hash_table_new_full (g_direct_hash,
+      g_direct_equal, (GDestroyNotify) g_object_unref,
+      (GDestroyNotify) g_object_unref);
 
-  priv->listen_io_channel = NULL;
-  priv->listen_io_channel_source_id = 0;
   priv->address_type = TP_SOCKET_ADDRESS_TYPE_UNIX;
   priv->address = NULL;
   priv->access_control = TP_SOCKET_ACCESS_CONTROL_LOCALHOST;
@@ -852,22 +754,21 @@ close_each_extra_bytestream (gpointer key,
 {
   GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
-  GabbleBytestreamIface *bytestream = (GabbleBytestreamIface *) value;
-  GIOChannel *channel;
+  GibberTransport *transport = (GibberTransport *) value;
+  GabbleBytestreamIface *bytestream = (GabbleBytestreamIface *) key;
 
   /* We are iterating over priv->fd_to_bytestreams so we can't modify it.
    * Disconnect signals so extra_bytestream_state_changed_cb won't be
    * called */
   g_signal_handlers_disconnect_matched (bytestream, G_SIGNAL_MATCH_DATA,
       0, 0, NULL, NULL, self);
+ g_signal_handlers_disconnect_matched (transport, G_SIGNAL_MATCH_DATA,
+      0, 0, NULL, NULL, self);
 
   gabble_bytestream_iface_close (bytestream, NULL);
+  gibber_transport_disconnect (transport);
 
-  channel = g_hash_table_lookup (priv->bytestream_to_io_channel, bytestream);
-  g_assert (channel != NULL);
-
-  g_hash_table_remove (priv->bytestream_to_io_channel, bytestream);
-  g_hash_table_remove (priv->io_channel_to_watcher_source_id, channel);
+  g_hash_table_remove (priv->transport_to_bytestream, transport);
 
   return TRUE;
 }
@@ -904,37 +805,24 @@ gabble_tube_stream_dispose (GObject *object)
       g_string_free (path, TRUE);
     }
 
-  if (priv->fd_to_bytestreams != NULL)
+  if (priv->transport_to_bytestream != NULL)
     {
-      g_hash_table_destroy (priv->fd_to_bytestreams);
-      priv->fd_to_bytestreams = NULL;
+      g_hash_table_destroy (priv->transport_to_bytestream);
+      priv->transport_to_bytestream = NULL;
     }
 
-  if (priv->bytestream_to_io_channel != NULL)
+  if (priv->bytestream_to_transport != NULL)
     {
-      g_hash_table_destroy (priv->bytestream_to_io_channel);
-      priv->bytestream_to_io_channel = NULL;
-    }
-
-  if (priv->io_channel_to_watcher_source_id != NULL)
-    {
-      g_hash_table_destroy (priv->io_channel_to_watcher_source_id);
-      priv->io_channel_to_watcher_source_id = NULL;
+      g_hash_table_destroy (priv->bytestream_to_transport);
+      priv->bytestream_to_transport = NULL;
     }
 
   tp_handle_unref (contact_repo, priv->initiator);
 
-  if (priv->listen_io_channel_source_id != 0)
-    {
-      g_source_destroy (g_main_context_find_source_by_id (NULL,
-            priv->listen_io_channel_source_id));
-      priv->listen_io_channel_source_id = 0;
-    }
-
-  if (priv->listen_io_channel)
+  if (priv->local_listener != NULL)
     {
-      g_io_channel_unref (priv->listen_io_channel);
-      priv->listen_io_channel = NULL;
+      g_object_unref (priv->local_listener);
+      priv->local_listener = NULL;
     }
 
   priv->dispose_has_run = TRUE;
@@ -1452,34 +1340,32 @@ data_received_cb (GabbleBytestreamIface *bytestream,
 {
   GabbleTubeStream *tube = GABBLE_TUBE_STREAM (user_data);
   GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (tube);
-  gsize written;
-  GIOChannel *channel;
-  GIOStatus status;
+  GibberTransport *transport;
   GError *error = NULL;
 
   DEBUG ("received %" G_GSIZE_FORMAT " bytes from bytestream", data->len);
 
-  channel = g_hash_table_lookup (priv->bytestream_to_io_channel, bytestream);
-  if (channel == NULL)
+  transport = g_hash_table_lookup (priv->bytestream_to_transport, bytestream);
+  if (transport == NULL)
     {
-      DEBUG ("no IO channel associated with the bytestream");
+      DEBUG ("no transport associated with the bytestream");
       return;
     }
 
-  status = g_io_channel_write_chars (channel, data->str, data->len,
-      &written, &error);
-  if (status == G_IO_STATUS_NORMAL)
-    {
-      DEBUG ("%" G_GSIZE_FORMAT " bytes written to the socket", written);
-    }
-  else
+  if (!gibber_transport_send (transport, (const guint8 *) data->str, data->len,
+      &error))
+  {
+    DEBUG ("sending failed: %s", error->message);
+    g_error_free (error);
+  }
+
+  if (!gibber_transport_buffer_is_empty (transport))
     {
-      DEBUG ("error writing to socket: %s",
-          error ? error->message : "");
+      /* We don't want to send more data while the buffer isn't empty */
+      DEBUG ("tube buffer isn't empty. Block the bytestream");
+      //gabble_bytestream_iface_block_read (bytestream, TRUE);
+      //TODO
     }
-
-  if (error != NULL)
-    g_error_free (error);
 }
 
 GabbleTubeStream *
@@ -1577,7 +1463,7 @@ gabble_tube_stream_close (GabbleTubeIface *tube, gboolean closed_remotely)
     return;
   priv->closed = TRUE;
 
-  g_hash_table_foreach_remove (priv->fd_to_bytestreams,
+  g_hash_table_foreach_remove (priv->bytestream_to_transport,
       close_each_extra_bytestream, self);
 
   if (!closed_remotely && priv->handle_type == TP_HANDLE_TYPE_CONTACT)
-- 
1.5.6.5




More information about the Telepathy-commits mailing list