[Telepathy-commits] [telepathy-salut/master] SalutTubeStream: use GibberListener to listen for connections from the local application
Alban Crequy
alban.crequy at collabora.co.uk
Tue Nov 25 03:59:39 PST 2008
---
src/tube-stream.c | 297 ++++++++++++++--------------------------------------
1 files changed, 80 insertions(+), 217 deletions(-)
diff --git a/src/tube-stream.c b/src/tube-stream.c
index d104d3b..7d06c1f 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -41,6 +41,7 @@
#include <gibber/gibber-transport.h>
#include <gibber/gibber-fd-transport.h>
#include <gibber/gibber-iq-helper.h>
+#include <gibber/gibber-listener.h>
#define DEBUG_FLAG DEBUG_TUBES
@@ -132,9 +133,6 @@ struct _SalutTubeStreamPrivate
GHashTable *transport_to_bytestream;
/* (GibberBytestreamIface *) -> (GibberTransport *) */
GHashTable *bytestream_to_transport;
- /* (GibberBytestreamIface *) -> int */
- GHashTable *bytestream_to_fd;
-
TpHandle initiator;
gchar *service;
@@ -145,8 +143,11 @@ struct _SalutTubeStreamPrivate
GValue *address;
TpSocketAccessControl access_control;
GValue *access_control_param;
+
+ GibberListener *listener;
GIOChannel *listen_io_channel;
guint listen_io_channel_source_id;
+
gboolean closed;
/* we need to send an iq stanza to close the tube on 1-1 tube */
@@ -245,7 +246,6 @@ remove_transport (SalutTubeStream *self,
g_hash_table_remove (priv->transport_to_bytestream, transport);
g_hash_table_remove (priv->bytestream_to_transport, bytestream);
- g_hash_table_remove (priv->bytestream_to_fd, bytestream);
}
static void
@@ -328,8 +328,7 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
if (state == GIBBER_BYTESTREAM_STATE_OPEN)
{
- int fd;
- GibberLLTransport *ll_transport;
+ GibberTransport *transport;
DEBUG ("extra bytestream open");
@@ -338,13 +337,11 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
g_signal_connect (bytestream, "write-blocked",
G_CALLBACK (bytestream_write_blocked_cb), self);
- fd = GPOINTER_TO_INT (g_hash_table_lookup (priv->bytestream_to_fd,
- bytestream));
- g_assert (fd != 0);
+ transport = g_hash_table_lookup (priv->bytestream_to_transport,
+ bytestream);
+ g_assert (transport != NULL);
- ll_transport = gibber_ll_transport_new ();
- gibber_ll_transport_open_fd (ll_transport, fd);
- add_transport (self, GIBBER_TRANSPORT (ll_transport), bytestream);
+ add_transport (self, transport, bytestream);
}
else if (state == GIBBER_BYTESTREAM_STATE_CLOSED)
{
@@ -371,8 +368,8 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
struct _extra_bytestream_negotiate_cb_data
{
SalutTubeStream *self;
- /* file descriptor of the connection from the local application */
- gint fd;
+ /* transport from the local application */
+ GibberTransport *transport;
};
static void
@@ -388,14 +385,14 @@ extra_bytestream_negotiate_cb (GibberBytestreamIface *bytestream,
{
DEBUG ("initiator refused new bytestream");
- close (data->fd);
+ g_object_unref (data->transport);
return;
}
DEBUG ("extra bytestream accepted");
- g_hash_table_insert (priv->bytestream_to_fd, g_object_ref (bytestream),
- GUINT_TO_POINTER (data->fd));
+ g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+ data->transport);
g_signal_connect (bytestream, "state-changed",
G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -426,7 +423,7 @@ generate_stream_id (SalutTubeStream *self)
static gboolean
start_stream_initiation (SalutTubeStream *self,
- gint fd,
+ GibberTransport *transport,
GError **error)
{
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
@@ -471,7 +468,7 @@ start_stream_initiation (SalutTubeStream *self,
data = g_slice_new (struct _extra_bytestream_negotiate_cb_data);
data->self = self;
- data->fd = fd;
+ data->transport = g_object_ref (transport);
g_object_get (priv->conn,
"si-bytestream-manager", &si_bytestream_mgr,
@@ -486,6 +483,7 @@ start_stream_initiation (SalutTubeStream *self,
result = FALSE;
g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
"can't find contact with handle %d", priv->initiator);
+ g_object_unref (transport);
}
else
{
@@ -513,7 +511,7 @@ start_stream_initiation (SalutTubeStream *self,
/* start a new stream in a tube from the recipient side */
static gboolean
start_stream_direct (SalutTubeStream *self,
- gint fd,
+ GibberTransport *transport,
GError **error)
{
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
@@ -522,7 +520,6 @@ start_stream_direct (SalutTubeStream *self,
struct _extra_bytestream_negotiate_cb_data *data;
SalutContact *contact;
SalutContactManager *contact_mgr;
- SalutDirectBytestreamManager *direct_bytestream_mgr;
GibberBytestreamIface *bytestream;
g_assert (priv->handle_type == TP_HANDLE_TYPE_CONTACT);
@@ -534,13 +531,11 @@ start_stream_direct (SalutTubeStream *self,
data = g_slice_new (struct _extra_bytestream_negotiate_cb_data);
data->self = self;
- data->fd = fd;
+ data->transport = g_object_ref (transport);
g_object_get (priv->conn,
- "direct-bytestream-manager", &direct_bytestream_mgr,
"contact-manager", &contact_mgr,
NULL);
- g_assert (direct_bytestream_mgr != NULL);
g_assert (contact_mgr != NULL);
contact = salut_contact_manager_get_contact (contact_mgr, priv->initiator);
@@ -549,31 +544,20 @@ start_stream_direct (SalutTubeStream *self,
g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
"can't find contact with handle %d", priv->initiator);
- g_object_unref (direct_bytestream_mgr);
- g_object_unref (contact_mgr);
-
- return FALSE;
- }
-
- bytestream = salut_direct_bytestream_manager_new_stream (
- direct_bytestream_mgr, contact, priv->port);
-
- if (bytestream == NULL)
- {
- DEBUG ("initiator refused new bytestream");
- close (fd);
-
- g_object_unref (contact);
- g_object_unref (direct_bytestream_mgr);
+ g_object_unref (transport);
g_object_unref (contact_mgr);
return FALSE;
}
- DEBUG ("extra bytestream accepted");
+ bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
+ "addresses", salut_contact_get_addresses (contact),
+ "state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
+ "peer-id", contact->name,
+ "port", priv->port,
+ NULL);
- g_hash_table_insert (priv->bytestream_to_fd,
- g_object_ref (bytestream), GUINT_TO_POINTER (fd));
+ g_assert (bytestream != NULL);
g_signal_connect (bytestream, "state-changed",
G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -583,73 +567,30 @@ start_stream_direct (SalutTubeStream *self,
{
/* Initiation failed. */
gibber_bytestream_iface_close (bytestream, NULL);
- close (fd);
+ g_object_unref (transport);
g_object_unref (contact);
- g_object_unref (direct_bytestream_mgr);
g_object_unref (contact_mgr);
return FALSE;
}
g_object_unref (contact);
- g_object_unref (direct_bytestream_mgr);
g_object_unref (contact_mgr);
return TRUE;
}
/* callback for listening connections from the local application */
-gboolean
-listen_cb (GIOChannel *source,
- GIOCondition condition,
- gpointer data)
+static void
+new_connection_cb (GibberListener *listener,
+ GibberTransport *transport,
+ struct sockaddr_storage *addr,
+ guint size,
+ gpointer user_data)
{
- SalutTubeStream *self = SALUT_TUBE_STREAM (data);
+ SalutTubeStream *self = SALUT_TUBE_STREAM (user_data);
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
- int fd, listen_fd;
- SockAddr addr;
- socklen_t addrlen;
- int flags;
-
- listen_fd = g_io_channel_unix_get_fd (source);
-
- if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
- {
- addrlen = sizeof (addr.un);
- }
- else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
- {
- addrlen = sizeof (addr.ipv4);
- }
- else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
- {
- addrlen = sizeof (addr.ipv6);
- }
- else
- {
- g_assert_not_reached ();
- }
-
- fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
- if (fd == -1)
- {
- DEBUG ("Error accepting socket: %s", g_strerror (errno));
- return TRUE;
- }
-
- DEBUG ("connection from client");
-
- /* Set socket non blocking */
- flags = fcntl (fd, F_GETFL, 0);
- if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1)
- {
- DEBUG ("Can't set socket non blocking: %s", g_strerror (errno));
- close (fd);
- return TRUE;
- }
-
- DEBUG ("request new bytestream");
/* Streams in MUC tubes are established with stream initiation (XEP-0095).
* We use SalutSiBytestreamManager.
@@ -659,22 +600,18 @@ listen_cb (GIOChannel *source,
*/
if (priv->handle_type == TP_HANDLE_TYPE_CONTACT)
{
- if (!start_stream_direct (self, fd, NULL))
+ if (!start_stream_direct (self, transport, NULL))
{
DEBUG ("closing new client connection");
- close (fd);
}
}
else
{
- if (!start_stream_initiation (self, fd, NULL))
+ if (!start_stream_initiation (self, transport, NULL))
{
DEBUG ("closing new client connection");
- close (fd);
}
}
-
- return TRUE;
}
static gboolean
@@ -682,6 +619,7 @@ new_connection_to_socket (SalutTubeStream *self,
GibberBytestreamIface *bytestream)
{
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+ GibberFdTransport *transport;
int fd;
SockAddr addr;
socklen_t len;
@@ -782,8 +720,11 @@ new_connection_to_socket (SalutTubeStream *self,
}
DEBUG ("Connected to socket");
- g_hash_table_insert (priv->bytestream_to_fd, g_object_ref (bytestream),
- GUINT_TO_POINTER (fd));
+ transport = g_object_new (GIBBER_TYPE_FD_TRANSPORT, NULL);
+ gibber_fd_transport_set_fd (transport, fd);
+
+ g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+ g_object_ref (transport));
g_signal_connect (bytestream, "state-changed",
G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -796,7 +737,6 @@ tube_stream_open (SalutTubeStream *self,
GError **error)
{
SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
- int fd;
DEBUG ("called");
@@ -808,141 +748,84 @@ tube_stream_open (SalutTubeStream *self,
/* We didn't create this tube so it doesn't have
* a socket associated with it. Let's create one */
g_assert (priv->address == NULL);
+ g_assert (priv->listener == NULL);
+ priv->listener = gibber_listener_new ();
+
+ g_signal_connect (priv->listener, "new-connection",
+ G_CALLBACK (new_connection_cb), self);
if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
{
GArray *array;
- struct sockaddr_un addr;
gchar suffix[8];
-
- fd = socket (PF_UNIX, SOCK_STREAM, 0);
- if (fd == -1)
- {
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error creating socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- memset (&addr, 0, sizeof (addr));
- addr.sun_family = PF_UNIX;
+ gchar *path;
+ int ret;
generate_ascii_string (8, suffix);
- snprintf (addr.sun_path, sizeof (addr.sun_path) - 1,
- "/tmp/stream-salut-%.8s", suffix);
+ path = g_strdup_printf ("/tmp/stream-salut-%.8s", suffix);
- DEBUG ("create socket: %s", addr.sun_path);
+ DEBUG ("create socket: %s", path);
- array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (
- addr.sun_path));
- g_array_insert_vals (array, 0, addr.sun_path, strlen (addr.sun_path));
+ array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (path));
+ g_array_insert_vals (array, 0, path, strlen (path));
priv->address = tp_g_value_slice_new (DBUS_TYPE_G_UCHAR_ARRAY);
g_value_set_boxed (priv->address, array);
g_array_free (array, TRUE);
+ g_free (path);
- if (bind (fd, (struct sockaddr *) &addr, sizeof (addr)) == -1)
+ ret = gibber_listener_listen_socket (priv->listener, path, FALSE,
+ error);
+ if (ret != TRUE)
{
- DEBUG ("Error binding socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error binding socket: %s", g_strerror (errno));
+ g_assert (error != NULL && *error != NULL);
+ DEBUG ("Error listening on socket %s: %s", path, (*error)->message);
return FALSE;
}
}
else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
{
- struct sockaddr_in addr;
- socklen_t len;
-
- addr.sin_family = AF_INET;
- addr.sin_port = 0; /* == ntohs (0) */
- addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
-
- len = sizeof (addr);
+ int port;
- fd = socket (PF_INET, SOCK_STREAM, 0);
- if (fd == -1)
+ port = gibber_listener_listen_tcp_loopback_af (priv->listener, 0,
+ GIBBER_AF_IPV4, error);
+ if (port <= 0)
{
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error creating socket: %s", g_strerror (errno));
+ g_assert (error != NULL && *error != NULL);
+ DEBUG ("Error listening on socket: %s", (*error)->message);
return FALSE;
}
- if (bind (fd, (struct sockaddr *) &addr, len) == -1)
- {
- DEBUG ("Error binding socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error binding socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
- {
- DEBUG ("getsockname failed: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "getsockname failed: %s", g_strerror (errno));
- return FALSE;
- }
-
- DEBUG ("create socket %s:%u", "127.0.0.1", ntohs (addr.sin_port));
-
priv->address = tp_g_value_slice_new (SOCKET_ADDRESS_IPV4_TYPE);
g_value_take_boxed (priv->address,
dbus_g_type_specialized_construct (SOCKET_ADDRESS_IPV4_TYPE));
dbus_g_type_struct_set (priv->address,
0, "127.0.0.1",
- 1, ntohs (addr.sin_port),
+ 1, port,
G_MAXUINT);
}
else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
{
- struct sockaddr_in6 addr;
- socklen_t len;
- struct in6_addr loopback_addr = IN6ADDR_LOOPBACK_INIT;
-
- addr.sin6_family = AF_INET6;
- addr.sin6_port = 0; /* == ntohs (0) */
- addr.sin6_addr = loopback_addr;
-
- len = sizeof (addr);
+ int port;
- fd = socket (PF_INET6, SOCK_STREAM, 0);
- if (fd == -1)
+ port = gibber_listener_listen_tcp_loopback_af (priv->listener, 0,
+ GIBBER_AF_IPV6, error);
+ if (port <= 0)
{
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error creating socket: %s", g_strerror (errno));
+ g_assert (error != NULL && *error != NULL);
+ DEBUG ("Error listening on socket: %s", (*error)->message);
return FALSE;
}
- if (bind (fd, (struct sockaddr *) &addr, len) == -1)
- {
- DEBUG ("Error binding socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error binding socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
- {
- DEBUG ("getsockname failed: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "getsockname failed: %s", g_strerror (errno));
- return FALSE;
- }
-
- DEBUG ("create socket %s:%u", "::1", ntohs (addr.sin6_port));
-
priv->address = tp_g_value_slice_new (SOCKET_ADDRESS_IPV6_TYPE);
g_value_take_boxed (priv->address,
dbus_g_type_specialized_construct (SOCKET_ADDRESS_IPV6_TYPE));
dbus_g_type_struct_set (priv->address,
0, "::1",
- 1, ntohs (addr.sin6_port),
+ 1, port,
G_MAXUINT);
}
else
@@ -950,22 +833,6 @@ tube_stream_open (SalutTubeStream *self,
g_assert_not_reached ();
}
- if (listen (fd, 5) == -1)
- {
- DEBUG ("Error listening socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error listening socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- priv->listen_io_channel = g_io_channel_unix_new (fd);
- g_io_channel_set_encoding (priv->listen_io_channel, NULL, NULL);
- g_io_channel_set_buffered (priv->listen_io_channel, FALSE);
- g_io_channel_set_close_on_unref (priv->listen_io_channel, TRUE);
-
- priv->listen_io_channel_source_id = g_io_add_watch (priv->listen_io_channel,
- G_IO_IN, listen_cb, self);
-
return TRUE;
}
@@ -985,9 +852,6 @@ salut_tube_stream_init (SalutTubeStream *self)
g_direct_equal, (GDestroyNotify) g_object_unref,
(GDestroyNotify) g_object_unref);
- priv->bytestream_to_fd = g_hash_table_new_full (g_direct_hash,
- g_direct_equal, (GDestroyNotify) g_object_unref, NULL);
-
priv->listen_io_channel = NULL;
priv->listen_io_channel_source_id = 0;
priv->address_type = TP_SOCKET_ADDRESS_TYPE_UNIX;
@@ -1023,7 +887,6 @@ close_each_extra_bytestream (gpointer key,
gibber_transport_disconnect (transport);
g_hash_table_remove (priv->bytestream_to_transport, bytestream);
- g_hash_table_remove (priv->bytestream_to_fd, bytestream);
return TRUE;
}
@@ -1072,12 +935,6 @@ salut_tube_stream_dispose (GObject *object)
priv->bytestream_to_transport = NULL;
}
- if (priv->bytestream_to_fd != NULL)
- {
- g_hash_table_destroy (priv->bytestream_to_fd);
- priv->bytestream_to_fd = NULL;
- }
-
tp_handle_unref (contact_repo, priv->initiator);
if (priv->listen_io_channel_source_id != 0)
@@ -1093,6 +950,12 @@ salut_tube_stream_dispose (GObject *object)
priv->listen_io_channel = NULL;
}
+ if (priv->listener != NULL)
+ {
+ g_object_unref (priv->listener);
+ priv->listener = NULL;
+ }
+
if (priv->iq_helper != NULL)
{
g_object_unref (priv->iq_helper);
--
1.5.6.5
More information about the Telepathy-commits mailing list