[Telepathy-commits] [telepathy-salut/master] SalutDirectBytestreamManager: clean up API, stop listening on the socket when the tube is closed
Alban Crequy
alban.crequy at collabora.co.uk
Tue Nov 25 03:59:20 PST 2008
20080801142026-a41c0-a27a76056b40ddaeb1e50284fd5bb2cc36263869.gz
---
src/salut-direct-bytestream-manager.c | 122 +++++++++++++++++++++++----------
src/salut-direct-bytestream-manager.h | 19 +++---
src/salut-tubes-channel.c | 12 +++-
src/tube-stream.c | 14 ++++
4 files changed, 119 insertions(+), 48 deletions(-)
diff --git a/src/salut-direct-bytestream-manager.c b/src/salut-direct-bytestream-manager.c
index 35fde01..4aeaeb7 100644
--- a/src/salut-direct-bytestream-manager.c
+++ b/src/salut-direct-bytestream-manager.c
@@ -63,11 +63,29 @@ struct _SalutDirectBytestreamManagerPrivate
/* guint id -> guint listener_watch
* When used by stream tubes, the id is the tube_id */
- GHashTable *listener_watchs;
+ GHashTable *listeners;
gboolean dispose_has_run;
};
+struct _listener_io_in_cb_data
+{
+ SalutDirectBytestreamManager *mgr;
+ SalutContact *contact;
+ SalutDirectBytestreamManagerNewConnectionFunc cb;
+ gpointer user_data;
+};
+
+typedef struct _SalutDirectBytestreamManagerListener
+ SalutDirectBytestreamManagerListener;
+struct _SalutDirectBytestreamManagerListener
+{
+ gpointer id;
+ GIOChannel *listen_io_channel;
+ guint listen_io_channel_source_id;
+ struct _listener_io_in_cb_data *data;
+};
+
#define SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE(obj) \
((SalutDirectBytestreamManagerPrivate *) ((SalutDirectBytestreamManager *)obj)->priv)
@@ -189,7 +207,7 @@ salut_direct_bytestream_manager_constructor (GType type,
g_assert (priv->xmpp_connection_manager != NULL);
g_assert (priv->host_name_fqdn != NULL);
- priv->listener_watchs = g_hash_table_new (NULL, NULL);
+ priv->listeners = g_hash_table_new (NULL, NULL);
return obj;
}
@@ -251,13 +269,6 @@ salut_direct_bytestream_manager_new (SalutConnection *conn,
NULL);
}
-struct _listener_io_in_cb_data
-{
- SalutDirectBytestreamManager *mgr;
- SalutTubeIface *tube;
- SalutContact *contact;
-};
-
/* callback when receiving a connection from the remote CM */
static gboolean
listener_io_in_cb (GIOChannel *source,
@@ -271,8 +282,6 @@ listener_io_in_cb (GIOChannel *source,
listen_fd = g_io_channel_unix_get_fd (source);
- DEBUG ("Called. listen_fd=%d", listen_fd);
-
priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (data->mgr);
bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
@@ -281,28 +290,42 @@ listener_io_in_cb (GIOChannel *source,
"peer-id", data->contact->name,
NULL);
- salut_tube_iface_add_bytestream (data->tube, bytestream);
+ /* call this before accepting the socket, in order to let the opportunity to
+ * register a callback for the "new-connection" bytestream's signal. */
+ data->cb (bytestream, data->user_data);
+
gibber_bytestream_direct_accept_socket (bytestream, listen_fd);
return FALSE;
}
-/* the id is an opaque id used by the user of the SalutDirectBytestreamManager
- * object. When used by stream tubes, the id is the tube.
+/**
+ * salut_direct_bytestream_manager_listen:
+ * @self: the direct bytestream manager
+ * @contact: the contact allowed to connect
+ * @new_connection_cb: callback when a new connection comes
+ * @id: opaquer pointer given to the callback
*
- * return: port
+ * Listen on a random TCP port for incoming connections. Only connections from
+ * the contact given as parameter will be accepted. A GibberBystreamDirect
+ * object is created for each successful connection. A callback is called
+ * each time a new GibberBystreamDirect object is created before it actually
+ * connects to let the opportunity to the caller to connect on a bytestream
+ * signal.
+ *
+ * Returns: TCP port number
*/
-static int
-start_listen_for_connection (SalutDirectBytestreamManager *self,
- SalutContact *contact,
- SalutTubeIface *tube)
+int
+salut_direct_bytestream_manager_listen (SalutDirectBytestreamManager *self,
+ SalutContact *contact,
+ SalutDirectBytestreamManagerNewConnectionFunc new_connection_cb,
+ gpointer id)
{
SalutDirectBytestreamManagerPrivate *priv;
priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
struct _listener_io_in_cb_data *data;
- GIOChannel *listener;
- guint *listener_watch;
+ SalutDirectBytestreamManagerListener *listener;
int port;
int fd = -1, ret, yes = 1;
struct addrinfo req, *ans = NULL;
@@ -381,19 +404,22 @@ start_listen_for_connection (SalutDirectBytestreamManager *self,
DEBUG ("listen on %s:%d", priv->host_name_fqdn, port);
- data = g_slice_new (struct _listener_io_in_cb_data);
+ data = g_slice_new0 (struct _listener_io_in_cb_data);
data->mgr = self;
- data->tube = tube;
data->contact = contact,
+ data->cb = new_connection_cb;
+ data->user_data = id;
+
+ listener = g_slice_new0 (SalutDirectBytestreamManagerListener);
+ listener->id = id;
+ listener->data = data;
- listener = g_io_channel_unix_new (fd);
- g_io_channel_set_close_on_unref (listener, TRUE);
- listener_watch = g_malloc (sizeof (*listener_watch));
- *listener_watch = g_io_add_watch (listener, G_IO_IN,
- listener_io_in_cb, data);
+ listener->listen_io_channel = g_io_channel_unix_new (fd);
+ g_io_channel_set_close_on_unref (listener->listen_io_channel, TRUE);
+ listener->listen_io_channel_source_id = g_io_add_watch
+ (listener->listen_io_channel, G_IO_IN, listener_io_in_cb, data);
- /* add tube->listener_watch in priv->listener_watchs */
- g_hash_table_insert (priv->listener_watchs, tube, listener_watch);
+ g_hash_table_insert (priv->listeners, id, listener);
freeaddrinfo (ans);
return port;
@@ -407,18 +433,40 @@ error:
return -1;
}
-int
-salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
- SalutContact *contact,
- GibberXmppConnection *connection,
- SalutTubeIface *tube)
+/**
+ * salut_direct_bytestream_manager_stop_listen:
+ * @self: the direct bytestream manager
+ * @id: opaquer pointer
+ *
+ * Stop to listen on a TCP port after a call to
+ * salut_direct_bytestream_manager_listen.
+ */
+void salut_direct_bytestream_manager_stop_listen (
+ SalutDirectBytestreamManager *self, gpointer id)
{
SalutDirectBytestreamManagerPrivate *priv;
+ SalutDirectBytestreamManagerListener *listener;
+
priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
- DEBUG ("salut_direct_new_listening_stream: Called.");
+ listener = g_hash_table_lookup (priv->listeners, id);
+ g_assert (listener != NULL);
- return start_listen_for_connection (self, contact, tube);
+ if (listener->listen_io_channel_source_id != 0)
+ {
+ GSource* source = g_main_context_find_source_by_id (NULL,
+ listener->listen_io_channel_source_id);
+
+ /* FIXME: why source is already destroyed here!? */
+ if (source != NULL)
+ {
+ g_source_destroy (source);
+ }
+ listener->listen_io_channel_source_id = 0;
+ }
+
+ g_slice_free (struct _listener_io_in_cb_data, listener->data);
+ g_slice_free (SalutDirectBytestreamManagerListener, listener);
}
GibberBytestreamIface *
diff --git a/src/salut-direct-bytestream-manager.h b/src/salut-direct-bytestream-manager.h
index f406101..87f0e84 100644
--- a/src/salut-direct-bytestream-manager.h
+++ b/src/salut-direct-bytestream-manager.h
@@ -33,6 +33,9 @@ G_BEGIN_DECLS
typedef struct _SalutDirectBytestreamManager SalutDirectBytestreamManager;
typedef struct _SalutDirectBytestreamManagerClass SalutDirectBytestreamManagerClass;
+typedef void (* SalutDirectBytestreamManagerNewConnectionFunc) (
+ GibberBytestreamIface *bytestream, gpointer user_data);
+
struct _SalutDirectBytestreamManagerClass {
GObjectClass parent_class;
};
@@ -67,15 +70,13 @@ SalutDirectBytestreamManager *
salut_direct_bytestream_manager_new (SalutConnection *connection,
const gchar *host_name_fqdn);
-/* To be used on the CM-initiator side, to receive connections from the remote
- * CM
- *
- * return: port
- * */
-int
-salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
- SalutContact *contact, GibberXmppConnection *connection,
- SalutTubeIface *tube);
+int salut_direct_bytestream_manager_listen (SalutDirectBytestreamManager *self,
+ SalutContact *contact,
+ SalutDirectBytestreamManagerNewConnectionFunc new_connection_cb,
+ gpointer id);
+
+void salut_direct_bytestream_manager_stop_listen (
+ SalutDirectBytestreamManager *self, gpointer id);
/* To be used on the CM-receptor side, to make a new connection */
GibberBytestreamIface *
diff --git a/src/salut-tubes-channel.c b/src/salut-tubes-channel.c
index 36f9e40..203f053 100644
--- a/src/salut-tubes-channel.c
+++ b/src/salut-tubes-channel.c
@@ -1056,6 +1056,7 @@ tube_closed_cb (SalutTubeIface *tube,
{
DEBUG ("Can't find tube having this id: %d", tube_id);
}
+
DEBUG ("tube %d removed", tube_id);
/* Emit the DBusNamesChanged signal */
@@ -1760,6 +1761,13 @@ iq_reply_cb (GibberIqHelper *helper,
DEBUG ("tube offered successfully");
}
+static void
+_new_connection_cb (GibberBytestreamIface *bytestream, gpointer user_data)
+{
+ SalutTubeIface *tube = user_data;
+
+ salut_tube_iface_add_bytestream (tube, bytestream);
+}
static void
_send_channel_iq_tube (gpointer key,
@@ -1811,8 +1819,8 @@ _send_channel_iq_tube (gpointer key,
NULL);
g_assert (direct_bytestream_mgr != NULL);
- port = salut_direct_new_listening_stream (direct_bytestream_mgr,
- priv->contact, priv->xmpp_connection, tube);
+ port = salut_direct_bytestream_manager_listen (direct_bytestream_mgr,
+ priv->contact, _new_connection_cb, tube);
g_object_unref (direct_bytestream_mgr);
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 891de77..9f47bcb 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -1621,6 +1621,20 @@ salut_tube_stream_close (SalutTubeIface *tube)
g_free (tube_id_str);
g_object_unref (stanza);
+
+ if (priv->initiator == priv->self_handle)
+ {
+ SalutDirectBytestreamManager *direct_bytestream_mgr;
+ g_assert (priv->conn != NULL);
+ g_object_get (priv->conn,
+ "direct-bytestream-manager", &direct_bytestream_mgr,
+ NULL);
+ g_assert (direct_bytestream_mgr != NULL);
+
+ salut_direct_bytestream_manager_stop_listen (direct_bytestream_mgr, tube);
+ g_object_unref (direct_bytestream_mgr);
+ }
+
}
g_signal_emit (G_OBJECT (self), signals[CLOSED], 0);
--
1.5.6.5
More information about the Telepathy-commits
mailing list