[Telepathy-commits] [telepathy-salut/master] SalutDirectBytestreamManager: clean up API, stop listening on the socket when the tube is closed

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:20 PST 2008


20080801142026-a41c0-a27a76056b40ddaeb1e50284fd5bb2cc36263869.gz
---
 src/salut-direct-bytestream-manager.c |  122 +++++++++++++++++++++++----------
 src/salut-direct-bytestream-manager.h |   19 +++---
 src/salut-tubes-channel.c             |   12 +++-
 src/tube-stream.c                     |   14 ++++
 4 files changed, 119 insertions(+), 48 deletions(-)

diff --git a/src/salut-direct-bytestream-manager.c b/src/salut-direct-bytestream-manager.c
index 35fde01..4aeaeb7 100644
--- a/src/salut-direct-bytestream-manager.c
+++ b/src/salut-direct-bytestream-manager.c
@@ -63,11 +63,29 @@ struct _SalutDirectBytestreamManagerPrivate
 
   /* guint id -> guint listener_watch
    * When used by stream tubes, the id is the tube_id */
-  GHashTable *listener_watchs;
+  GHashTable *listeners;
 
   gboolean dispose_has_run;
 };
 
+struct _listener_io_in_cb_data  /* user data of the listener_io_in_cb watch */
+{
+  SalutDirectBytestreamManager *mgr;  /* manager that registered the watch */
+  SalutContact *contact;  /* its name is the "peer-id" of new bytestreams */
+  SalutDirectBytestreamManagerNewConnectionFunc cb;  /* new bytestream hook */
+  gpointer user_data;  /* passed to cb along with the bytestream */
+};
+
+typedef struct _SalutDirectBytestreamManagerListener
+    SalutDirectBytestreamManagerListener;
+struct _SalutDirectBytestreamManagerListener  /* one per listening socket */
+{
+  gpointer id;  /* key of this listener in priv->listeners */
+  GIOChannel *listen_io_channel;  /* wraps the listening socket fd */
+  guint listen_io_channel_source_id;  /* G_IO_IN watch id, 0 if none */
+  struct _listener_io_in_cb_data *data;  /* user data given to the watch */
+};
+
 #define SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE(obj) \
     ((SalutDirectBytestreamManagerPrivate *) ((SalutDirectBytestreamManager *)obj)->priv)
 
@@ -189,7 +207,7 @@ salut_direct_bytestream_manager_constructor (GType type,
   g_assert (priv->xmpp_connection_manager != NULL);
   g_assert (priv->host_name_fqdn != NULL);
 
-  priv->listener_watchs = g_hash_table_new (NULL, NULL);
+  priv->listeners = g_hash_table_new (NULL, NULL);
 
   return obj;
 }
@@ -251,13 +269,6 @@ salut_direct_bytestream_manager_new (SalutConnection *conn,
       NULL);
 }
 
-struct _listener_io_in_cb_data
-{
-  SalutDirectBytestreamManager *mgr;
-  SalutTubeIface *tube;
-  SalutContact *contact;
-};
-
 /* callback when receiving a connection from the remote CM */
 static gboolean
 listener_io_in_cb (GIOChannel *source,
@@ -271,8 +282,6 @@ listener_io_in_cb (GIOChannel *source,
 
   listen_fd = g_io_channel_unix_get_fd (source);
 
-  DEBUG ("Called. listen_fd=%d", listen_fd);
-
   priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (data->mgr);
 
   bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
@@ -281,28 +290,42 @@ listener_io_in_cb (GIOChannel *source,
       "peer-id", data->contact->name,
       NULL);
 
-  salut_tube_iface_add_bytestream (data->tube, bytestream);
+  /* call this before accepting the socket, to give the callback a chance to
+   * connect to the bytestream's "new-connection" signal. */
+  data->cb (bytestream, data->user_data);
+
   gibber_bytestream_direct_accept_socket (bytestream, listen_fd);
 
   return FALSE;
 }
 
 
-/* the id is an opaque id used by the user of the SalutDirectBytestreamManager
- * object. When used by stream tubes, the id is the tube.
+/**
+ * salut_direct_bytestream_manager_listen:
+ * @self: the direct bytestream manager
+ * @contact: the contact allowed to connect
+ * @new_connection_cb: callback when a new connection comes
+ * @id: opaque pointer given to the callback
  *
- * return: port
+ * Listen on a random TCP port for incoming connections. Only connections from
+ * the contact given as parameter will be accepted. A GibberBytestreamDirect
+ * object is created for each incoming connection. The callback is called
+ * each time a new GibberBytestreamDirect object is created, before it actually
+ * accepts the connection, to give the caller the opportunity to connect to a
+ * bytestream signal.
+ *
+ * Returns: TCP port number
  */
-static int
-start_listen_for_connection (SalutDirectBytestreamManager *self,
-                             SalutContact *contact,
-                             SalutTubeIface *tube)
+int
+salut_direct_bytestream_manager_listen (SalutDirectBytestreamManager *self,
+    SalutContact *contact,
+    SalutDirectBytestreamManagerNewConnectionFunc new_connection_cb,
+    gpointer id)
 {
   SalutDirectBytestreamManagerPrivate *priv;
   priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
   struct _listener_io_in_cb_data *data;
-  GIOChannel *listener;
-  guint *listener_watch;
+  SalutDirectBytestreamManagerListener *listener;
   int port;
   int fd = -1, ret, yes = 1;
   struct addrinfo req, *ans = NULL;
@@ -381,19 +404,22 @@ start_listen_for_connection (SalutDirectBytestreamManager *self,
 
   DEBUG ("listen on %s:%d", priv->host_name_fqdn, port);
 
-  data = g_slice_new (struct _listener_io_in_cb_data);
+  data = g_slice_new0 (struct _listener_io_in_cb_data);
   data->mgr = self;
-  data->tube = tube;
   data->contact = contact,
+  data->cb = new_connection_cb;
+  data->user_data = id;
+
+  listener = g_slice_new0 (SalutDirectBytestreamManagerListener);
+  listener->id = id;
+  listener->data = data;
 
-  listener = g_io_channel_unix_new (fd);
-  g_io_channel_set_close_on_unref (listener, TRUE);
-  listener_watch = g_malloc (sizeof (*listener_watch));
-  *listener_watch = g_io_add_watch (listener, G_IO_IN,
-      listener_io_in_cb, data);
+  listener->listen_io_channel = g_io_channel_unix_new (fd);
+  g_io_channel_set_close_on_unref (listener->listen_io_channel, TRUE);
+  listener->listen_io_channel_source_id = g_io_add_watch
+      (listener->listen_io_channel, G_IO_IN, listener_io_in_cb, data);
 
-  /* add tube->listener_watch in priv->listener_watchs */
-  g_hash_table_insert (priv->listener_watchs, tube, listener_watch);
+  g_hash_table_insert (priv->listeners, id, listener);
 
   freeaddrinfo (ans);
   return port;
@@ -407,18 +433,40 @@ error:
   return -1;
 }
 
-int
-salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
-                                   SalutContact *contact,
-                                   GibberXmppConnection *connection,
-                                   SalutTubeIface *tube)
+/**
+ * salut_direct_bytestream_manager_stop_listen:
+ * @self: the direct bytestream manager
+ * @id: opaque pointer that was given to salut_direct_bytestream_manager_listen
+ *
+ * Stop listening on the TCP port opened for @id: the listener is unregistered,
+ * its watch removed and its socket closed. @id must be a registered listener.
+ */
+void salut_direct_bytestream_manager_stop_listen (
+    SalutDirectBytestreamManager *self, gpointer id)
 {
   SalutDirectBytestreamManagerPrivate *priv;
+  SalutDirectBytestreamManagerListener *listener;
+
   priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
 
-  DEBUG ("salut_direct_new_listening_stream: Called.");
+  listener = g_hash_table_lookup (priv->listeners, id);
+  g_assert (listener != NULL);
 
-  return start_listen_for_connection (self, contact, tube);
+  if (listener->listen_io_channel_source_id != 0)
+    {
+      GSource *source = g_main_context_find_source_by_id (NULL,
+          listener->listen_io_channel_source_id);
+      /* NULL once listener_io_in_cb has returned FALSE (watch removed) */
+      if (source != NULL)
+        g_source_destroy (source);
+      listener->listen_io_channel_source_id = 0;
+    }
+
+  /* forget the listener and drop our channel reference to close the socket */
+  g_hash_table_remove (priv->listeners, id);
+  g_io_channel_unref (listener->listen_io_channel);
+  g_slice_free (struct _listener_io_in_cb_data, listener->data);
+  g_slice_free (SalutDirectBytestreamManagerListener, listener);
 }
 
 GibberBytestreamIface *
diff --git a/src/salut-direct-bytestream-manager.h b/src/salut-direct-bytestream-manager.h
index f406101..87f0e84 100644
--- a/src/salut-direct-bytestream-manager.h
+++ b/src/salut-direct-bytestream-manager.h
@@ -33,6 +33,9 @@ G_BEGIN_DECLS
 typedef struct _SalutDirectBytestreamManager SalutDirectBytestreamManager;
 typedef struct _SalutDirectBytestreamManagerClass SalutDirectBytestreamManagerClass;
 
+typedef void (* SalutDirectBytestreamManagerNewConnectionFunc) (
+    GibberBytestreamIface *bytestream, gpointer user_data);
+
 struct _SalutDirectBytestreamManagerClass {
     GObjectClass parent_class;
 };
@@ -67,15 +70,13 @@ SalutDirectBytestreamManager *
 salut_direct_bytestream_manager_new (SalutConnection *connection,
     const gchar *host_name_fqdn);
 
-/* To be used on the CM-initiator side, to receive connections from the remote
- * CM
- *
- * return: port
- * */
-int
-salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
-    SalutContact *contact, GibberXmppConnection *connection,
-    SalutTubeIface *tube);
+int salut_direct_bytestream_manager_listen (SalutDirectBytestreamManager *self,
+    SalutContact *contact,
+    SalutDirectBytestreamManagerNewConnectionFunc new_connection_cb,
+    gpointer id);
+
+void salut_direct_bytestream_manager_stop_listen (
+    SalutDirectBytestreamManager *self, gpointer id);
 
 /* To be used on the CM-receptor side, to make a new connection */
 GibberBytestreamIface *
diff --git a/src/salut-tubes-channel.c b/src/salut-tubes-channel.c
index 36f9e40..203f053 100644
--- a/src/salut-tubes-channel.c
+++ b/src/salut-tubes-channel.c
@@ -1056,6 +1056,7 @@ tube_closed_cb (SalutTubeIface *tube,
     {
       DEBUG ("Can't find tube having this id: %d", tube_id);
     }
+
   DEBUG ("tube %d removed", tube_id);
 
   /* Emit the DBusNamesChanged signal */
@@ -1760,6 +1761,13 @@ iq_reply_cb (GibberIqHelper *helper,
   DEBUG ("tube offered successfully");
 }
 
+/* SalutDirectBytestreamManagerNewConnectionFunc: @user_data is the tube
+ * that gets the new bytestream */
+static void
+_new_connection_cb (GibberBytestreamIface *bytestream, gpointer user_data)
+{
+  salut_tube_iface_add_bytestream ((SalutTubeIface *) user_data, bytestream);
+}
 
 static void
 _send_channel_iq_tube (gpointer key,
@@ -1811,8 +1819,8 @@ _send_channel_iq_tube (gpointer key,
           NULL);
       g_assert (direct_bytestream_mgr != NULL);
 
-      port = salut_direct_new_listening_stream (direct_bytestream_mgr,
-          priv->contact, priv->xmpp_connection, tube);
+      port = salut_direct_bytestream_manager_listen (direct_bytestream_mgr,
+          priv->contact, _new_connection_cb, tube);
       g_object_unref (direct_bytestream_mgr);
 
 
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 891de77..9f47bcb 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -1621,6 +1621,20 @@ salut_tube_stream_close (SalutTubeIface *tube)
       g_free (tube_id_str);
 
       g_object_unref (stanza);
+
+      if (priv->initiator == priv->self_handle)
+        {
+          SalutDirectBytestreamManager *direct_bytestream_mgr;
+          g_assert (priv->conn != NULL);
+          g_object_get (priv->conn,
+              "direct-bytestream-manager", &direct_bytestream_mgr,
+              NULL);
+          g_assert (direct_bytestream_mgr != NULL);
+
+          salut_direct_bytestream_manager_stop_listen (direct_bytestream_mgr, tube);
+          g_object_unref (direct_bytestream_mgr);
+        }
+
     }
 
   g_signal_emit (G_OBJECT (self), signals[CLOSED], 0);
-- 
1.5.6.5




More information about the Telepathy-commits mailing list