[Telepathy-commits] [telepathy-gabble/master] use Gibber transports and listenner
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Mon Dec 15 07:17:41 PST 2008
---
src/tube-stream.c | 678 ++++++++++++++++++++++-------------------------------
1 files changed, 282 insertions(+), 396 deletions(-)
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 4bcdf41..c6915c0 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -44,6 +44,12 @@
#define DEBUG_FLAG GABBLE_DEBUG_TUBES
+#include <gibber/gibber-fd-transport.h>
+#include <gibber/gibber-listener.h>
+#include <gibber/gibber-tcp-transport.h>
+#include <gibber/gibber-transport.h>
+#include <gibber/gibber-unix-transport.h>
+
#include "bytestream-factory.h"
#include "bytestream-iface.h"
#include "connection.h"
@@ -142,9 +148,28 @@ struct _GabbleTubeStreamPrivate
TpHandleType handle_type;
TpHandle self_handle;
guint id;
- GHashTable *fd_to_bytestreams;
- GHashTable *bytestream_to_io_channel;
- GHashTable *io_channel_to_watcher_source_id;
+
+ /* Bytestreams for tubes. One tube can have several bytestreams. The
+ * mapping between the tube bytestream and the transport to the local
+ * application is stored in the transport_to_bytestream and
+ * bytestream_to_transport fields. This is used both on initiator-side and
+ * on recipient-side. */
+
+ /* (GabbleBytestreamIface *) -> (GibberTransport *)
+ *
+ * The (b->t) is inserted as soon as they are created. On initiator side,
+ * we receive an incoming bytestream, create a transport and insert (b->t).
+ * On recipient side, we receive an incoming transport, create a bytestream
+ * and insert (b->t).
+ */
+ GHashTable *bytestream_to_transport;
+
+ /* (GibberTransport *) -> (GabbleBytestreamIface *)
+ *
+ * The (t->b) is inserted when the bytestream is open.
+ */
+ GHashTable *transport_to_bytestream;
+
TpHandle initiator;
gchar *service;
GHashTable *parameters;
@@ -154,8 +179,10 @@ struct _GabbleTubeStreamPrivate
GValue *address;
TpSocketAccessControl access_control;
GValue *access_control_param;
- GIOChannel *listen_io_channel;
- guint listen_io_channel_source_id;
+
+ /* listen for connections from local applications */
+ GibberListener *local_listener;
+
gboolean closed;
gboolean dispose_has_run;
@@ -181,72 +208,135 @@ generate_ascii_string (guint len,
buf[i] = chars[g_random_int_range (0, 64)];
}
-static gboolean
-data_to_read_on_socket_cb (GIOChannel *source,
- GIOCondition condition,
- gpointer data)
+static void
+transport_handler (GibberTransport *transport,
+ GibberBuffer *data,
+ gpointer user_data)
{
- GabbleTubeStream *self = GABBLE_TUBE_STREAM (data);
+ GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
GabbleBytestreamIface *bytestream;
- int fd;
- gchar buffer[4096];
- gsize num_read;
- GIOStatus status;
- GError *error = NULL;
- gboolean result = TRUE;
- if (! (condition & G_IO_IN))
- return TRUE;
+ bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
+ if (bytestream == NULL)
+ {
+ DEBUG ("no open bytestream associated with this transport");
+ return;
+ }
- fd = g_io_channel_unix_get_fd (source);
+ DEBUG ("read %" G_GSIZE_FORMAT " bytes from socket", data->length);
- bytestream = g_hash_table_lookup (priv->fd_to_bytestreams,
- GINT_TO_POINTER (fd));
+ gabble_bytestream_iface_send (bytestream, data->length,
+ (const gchar *) data->data);
+}
+
+static void
+transport_disconnected_cb (GibberTransport *transport,
+ GabbleTubeStream *self)
+{
+ GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+ GabbleBytestreamIface *bytestream;
+
+ bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
if (bytestream == NULL)
- {
- DEBUG ("no bytestream associated with this socket");
+ return;
- g_hash_table_remove (priv->io_channel_to_watcher_source_id, source);
- return FALSE;
- }
+ DEBUG ("transport disconnected. close the extra bytestream");
- memset (&buffer, 0, sizeof (buffer));
+ gabble_bytestream_iface_close (bytestream, NULL);
+}
- status = g_io_channel_read_chars (source, buffer, 4096, &num_read, &error);
- if (status == G_IO_STATUS_NORMAL)
- {
- DEBUG ("read %" G_GSIZE_FORMAT " bytes from socket", num_read);
+static void
+remove_transport (GabbleTubeStream *self,
+ GabbleBytestreamIface *bytestream,
+ GibberTransport *transport)
+{
+ GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- gabble_bytestream_iface_send (bytestream, num_read, buffer);
- result = TRUE;
- }
- else if (status == G_IO_STATUS_EOF)
- {
- DEBUG ("error reading from socket: EOF");
+ DEBUG ("disconnect and remove transport");
+ g_signal_handlers_disconnect_matched (transport, G_SIGNAL_MATCH_DATA,
+ 0, 0, NULL, NULL, self);
- gabble_bytestream_iface_close (bytestream, NULL);
- result = FALSE;
- }
- else if (status == G_IO_STATUS_AGAIN)
+ gibber_transport_disconnect (transport);
+
+ /* the transport may not be in transport_to_bytestream if the bytestream was
+ * not fully open */
+ g_hash_table_remove (priv->transport_to_bytestream, transport);
+
+ g_hash_table_remove (priv->bytestream_to_transport, bytestream);
+}
+
+static void
+transport_buffer_empty_cb (GibberTransport *transport,
+ GabbleTubeStream *self)
+{
+ GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+ GabbleBytestreamIface *bytestream;
+ GabbleBytestreamState state;
+
+ bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
+ g_assert (bytestream != NULL);
+ g_object_get (bytestream, "state", &state, NULL);
+
+ if (state == GABBLE_BYTESTREAM_STATE_CLOSED)
{
- DEBUG ("error reading from socket: resource temporarily unavailable");
+ DEBUG ("buffer is now empty. Transport can be removed");
+ remove_transport (self, bytestream, transport);
+ return;
+ }
+
+ /* Buffer is empty so we can unblock the buffer if it was blocked */
+ DEBUG ("tube buffer is empty. Unblock the bytestream");
+ //gabble_bytestream_iface_block_read (bytestream, FALSE);
+ //TODO
+}
+
+static void
+add_transport (GabbleTubeStream *self,
+ GibberTransport *transport,
+ GabbleBytestreamIface *bytestream)
+{
+ GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+
+ gibber_transport_set_handler (transport, transport_handler, self);
+
+ g_hash_table_insert (priv->transport_to_bytestream,
+ g_object_ref (transport), g_object_ref (bytestream));
- result = TRUE;
+ g_signal_connect (transport, "disconnected",
+ G_CALLBACK (transport_disconnected_cb), self);
+ g_signal_connect (transport, "buffer-empty",
+ G_CALLBACK (transport_buffer_empty_cb), self);
+
+ /* We can transfer transport's data; unblock it. */
+ gibber_transport_block_receiving (transport, FALSE);
+}
+
+#if 0
+static void
+bytestream_write_blocked_cb (GabbleBytestreamIface *bytestream,
+ gboolean blocked,
+ GabbleTubeStream *self)
+{
+ GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
+ GibberTransport *transport;
+
+ transport = g_hash_table_lookup (priv->bytestream_to_transport,
+ bytestream);
+ g_assert (transport != NULL);
+
+ if (blocked)
+ {
+ DEBUG ("bytestream blocked, stop to read data from the tube socket");
}
else
{
- DEBUG ("error reading from socket: %s", error ? error->message : "");
-
- gabble_bytestream_iface_close (bytestream, NULL);
- result = FALSE;
+ DEBUG ("bytestream unblocked, restart to read data from the tube socket");
}
- if (error != NULL)
- g_error_free (error);
-
- return TRUE;
+ gibber_transport_block_receiving (transport, blocked);
}
+#endif
static void
extra_bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
@@ -255,47 +345,56 @@ extra_bytestream_state_changed_cb (GabbleBytestreamIface *bytestream,
{
GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- GIOChannel *channel;
- channel = g_hash_table_lookup (priv->bytestream_to_io_channel,
- bytestream);
- if (channel == NULL)
- {
- DEBUG ("no IO channel associated with the bytestream");
- return;
- }
+ DEBUG ("Called.");
if (state == GABBLE_BYTESTREAM_STATE_OPEN)
{
- guint source_id;
+ GibberTransport *transport;
+
DEBUG ("extra bytestream open");
g_signal_connect (bytestream, "data-received",
G_CALLBACK (data_received_cb), self);
+ /*
+ * TODO
+ g_signal_connect (bytestream, "write-blocked",
+ G_CALLBACK (bytestream_write_blocked_cb), self);
+ */
+
+ transport = g_hash_table_lookup (priv->bytestream_to_transport,
+ bytestream);
+ g_assert (transport != NULL);
- source_id = g_io_add_watch (channel, G_IO_IN, data_to_read_on_socket_cb,
- self);
- g_hash_table_insert (priv->io_channel_to_watcher_source_id,
- g_io_channel_ref (channel), GUINT_TO_POINTER (source_id));
+ add_transport (self, transport, bytestream);
}
else if (state == GABBLE_BYTESTREAM_STATE_CLOSED)
{
- int fd;
+ GibberTransport *transport;
DEBUG ("extra bytestream closed");
-
- fd = g_io_channel_unix_get_fd (channel);
-
- g_hash_table_remove (priv->fd_to_bytestreams, GINT_TO_POINTER (fd));
- g_hash_table_remove (priv->bytestream_to_io_channel, bytestream);
- g_hash_table_remove (priv->io_channel_to_watcher_source_id, channel);
+ transport = g_hash_table_lookup (priv->bytestream_to_transport,
+ bytestream);
+ if (transport != NULL)
+ {
+ if (gibber_transport_buffer_is_empty (transport))
+ {
+ DEBUG ("Buffer is empty, we can remove the transport");
+ remove_transport (self, bytestream, transport);
+ }
+ else
+ {
+ DEBUG ("Wait buffer is empty before disconnect the transport");
+ }
+ }
}
}
struct _extra_bytestream_negotiate_cb_data
{
GabbleTubeStream *self;
- gint fd;
+ /* transport from the local application */
+ GibberTransport *transport;
};
static void
@@ -308,27 +407,19 @@ extra_bytestream_negotiate_cb (GabbleBytestreamIface *bytestream,
(struct _extra_bytestream_negotiate_cb_data *) user_data;
GabbleTubeStream *self = data->self;
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- GIOChannel *channel;
if (bytestream == NULL)
{
DEBUG ("initiator refused new bytestream");
- close (data->fd);
+ g_object_unref (data->transport);
return;
}
DEBUG ("extra bytestream accepted");
- channel = g_io_channel_unix_new (data->fd);
- g_io_channel_set_encoding (channel, NULL, NULL);
- g_io_channel_set_buffered (channel, FALSE);
- g_io_channel_set_close_on_unref (channel, TRUE);
-
- g_hash_table_insert (priv->fd_to_bytestreams, GINT_TO_POINTER (data->fd),
- g_object_ref (bytestream));
- g_hash_table_insert (priv->bytestream_to_io_channel,
- g_object_ref (bytestream), channel);
+ g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+ data->transport);
g_signal_connect (bytestream, "state-changed",
G_CALLBACK (extra_bytestream_state_changed_cb), self);
@@ -338,7 +429,7 @@ extra_bytestream_negotiate_cb (GabbleBytestreamIface *bytestream,
static gboolean
start_stream_initiation (GabbleTubeStream *self,
- gint fd,
+ GibberTransport *transport,
GError **error)
{
GabbleTubeStreamPrivate *priv;
@@ -421,7 +512,7 @@ start_stream_initiation (GabbleTubeStream *self,
data = g_slice_new (struct _extra_bytestream_negotiate_cb_data);
data->self = self;
- data->fd = fd;
+ data->transport = g_object_ref (transport);
result = gabble_bytestream_factory_negotiate_stream (
priv->conn->bytestream_factory,
@@ -439,64 +530,27 @@ start_stream_initiation (GabbleTubeStream *self,
return result;
}
-static gboolean
-listen_cb (GIOChannel *source,
- GIOCondition condition,
- gpointer data)
+/* callback for listening connections from the local application */
+static void
+local_new_connection_cb (GibberListener *listener,
+ GibberTransport *transport,
+ struct sockaddr_storage *addr,
+ guint size,
+ gpointer user_data)
{
- GabbleTubeStream *self = GABBLE_TUBE_STREAM (data);
- GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- int fd, listen_fd;
- SockAddr addr;
- socklen_t addrlen;
- int flags;
-
- listen_fd = g_io_channel_unix_get_fd (source);
-
- if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
- {
- addrlen = sizeof (addr.un);
- }
- else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
- {
- addrlen = sizeof (addr.ipv4);
- }
- else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
- {
- addrlen = sizeof (addr.ipv6);
- }
- else
- {
- g_return_val_if_reached (TRUE);
- }
-
- fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
- if (fd == -1)
- {
- DEBUG ("Error accepting socket: %s", g_strerror (errno));
- return TRUE;
- }
-
- DEBUG ("connection from client");
-
- /* Set socket non blocking */
- flags = fcntl (fd, F_GETFL, 0);
- if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1)
- {
- DEBUG ("Can't set socket non blocking: %s", g_strerror (errno));
- close (fd);
- return TRUE;
- }
+ GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
- DEBUG ("request new bytestream");
+ /* Block the transport while there is no open bytestream to transfer
+ * its data. */
+ gibber_transport_block_receiving (transport, TRUE);
- if (!start_stream_initiation (self, fd, NULL))
+ /* Streams in stream tubes are established with stream initiation (XEP-0095).
+ * We use SalutSiBytestreamManager.
+ */
+ if (!start_stream_initiation (self, transport, NULL))
{
DEBUG ("closing new client connection");
- close (fd);
}
-
- return TRUE;
}
static gboolean
@@ -504,118 +558,58 @@ new_connection_to_socket (GabbleTubeStream *self,
GabbleBytestreamIface *bytestream)
{
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- int fd;
- GIOChannel *channel;
- SockAddr addr;
- socklen_t len;
+ GibberTransport *transport;
- g_assert (priv->initiator == priv->self_handle);
+ DEBUG ("Called.");
- memset (&addr, 0, sizeof (addr));
+ g_assert (priv->initiator == priv->self_handle);
if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
{
GArray *array;
array = g_value_get_boxed (priv->address);
-
- fd = socket (PF_UNIX, SOCK_STREAM, 0);
- if (fd == -1)
- {
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- addr.un.sun_family = PF_UNIX;
- strncpy (addr.un.sun_path, array->data, sizeof (addr.un.sun_path) - 1);
- addr.un.sun_path[sizeof (addr.un.sun_path) - 1] = '\0';
- len = sizeof (addr.un);
-
DEBUG ("Will try to connect to socket: %s", (const gchar *) array->data);
+
+ transport = GIBBER_TRANSPORT (gibber_unix_transport_new ());
+ gibber_unix_transport_connect (GIBBER_UNIX_TRANSPORT (transport),
+ array->data, NULL);
}
else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4 ||
priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
{
gchar *ip;
+ gchar *port_str;
guint port;
- struct addrinfo req, *result = NULL;
- int ret;
-
- if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
- fd = socket (PF_INET, SOCK_STREAM, 0);
- else
- fd = socket (PF_INET6, SOCK_STREAM, 0);
-
- if (fd == -1)
- {
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- return FALSE;
- }
dbus_g_type_struct_get (priv->address,
0, &ip,
1, &port,
G_MAXUINT);
+ port_str = g_strdup_printf ("%d", port);
- memset (&req, 0, sizeof (req));
- req.ai_flags = AI_NUMERICHOST;
- req.ai_socktype = SOCK_STREAM;
- req.ai_protocol = IPPROTO_TCP;
-
- if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
- req.ai_family = AF_INET;
- else
- req.ai_family = AF_INET6;
-
- ret = getaddrinfo (ip, NULL, &req, &result);
- if (ret != 0)
- {
- DEBUG ("getaddrinfo failed: %s", gai_strerror (ret));
- g_free (ip);
- return FALSE;
- }
-
- DEBUG ("Will try to connect to %s:%u", ip, port);
+ transport = GIBBER_TRANSPORT (gibber_tcp_transport_new ());
+ gibber_tcp_transport_connect (GIBBER_TCP_TRANSPORT (transport), ip,
+ port_str);
- if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
- {
- memcpy (&addr, result->ai_addr, sizeof (addr.ipv4));
- addr.ipv4.sin_port = ntohs (port);
- len = sizeof (addr.ipv4);
- }
- else
- {
- memcpy (&addr, result->ai_addr, sizeof (addr.ipv6));
- addr.ipv6.sin6_port = ntohs (port);
- len = sizeof (addr.ipv6);
- }
+ /* TODO: use priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4 */
g_free (ip);
- freeaddrinfo (result);
+ g_free (port_str);
}
else
{
- g_return_val_if_reached (FALSE);
+ g_assert_not_reached ();
}
- if (connect (fd, (struct sockaddr *) &addr, len) == -1)
- {
- DEBUG ("Error connecting socket: %s", g_strerror (errno));
- return FALSE;
- }
- DEBUG ("Connected to socket");
-
- channel = g_io_channel_unix_new (fd);
- g_io_channel_set_encoding (channel, NULL, NULL);
- g_io_channel_set_buffered (channel, FALSE);
- g_io_channel_set_close_on_unref (channel, TRUE);
+ /* Block the transport while there is no open bytestream to transfer
+ * its data. */
+ gibber_transport_block_receiving (transport, TRUE);
- g_hash_table_insert (priv->fd_to_bytestreams, GINT_TO_POINTER (fd),
- g_object_ref (bytestream));
- g_hash_table_insert (priv->bytestream_to_io_channel,
- g_object_ref (bytestream), channel);
+ g_hash_table_insert (priv->bytestream_to_transport, g_object_ref (bytestream),
+ g_object_ref (transport));
g_signal_connect (bytestream, "state-changed",
- G_CALLBACK (extra_bytestream_state_changed_cb), self);
+ G_CALLBACK (extra_bytestream_state_changed_cb), self);
return TRUE;
}
@@ -625,7 +619,6 @@ tube_stream_open (GabbleTubeStream *self,
GError **error)
{
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- int fd;
DEBUG ("called");
@@ -637,86 +630,55 @@ tube_stream_open (GabbleTubeStream *self,
/* We didn't create this tube so it doesn't have
* a socket associated with it. Let's create one */
g_assert (priv->address == NULL);
+ g_assert (priv->local_listener == NULL);
+ priv->local_listener = gibber_listener_new ();
+ g_signal_connect (priv->local_listener, "new-connection",
+ G_CALLBACK (local_new_connection_cb), self);
if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_UNIX)
{
GArray *array;
- struct sockaddr_un addr;
gchar suffix[8];
-
- fd = socket (PF_UNIX, SOCK_STREAM, 0);
- if (fd == -1)
- {
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error creating socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- memset (&addr, 0, sizeof (addr));
- addr.sun_family = PF_UNIX;
+ gchar *path;
+ int ret;
generate_ascii_string (8, suffix);
- snprintf (addr.sun_path, sizeof (addr.sun_path) - 1,
- "/tmp/stream-gabble-%.8s", suffix);
+ path = g_strdup_printf ("/tmp/stream-salut-%.8s", suffix);
- DEBUG ("create socket: %s", addr.sun_path);
+ DEBUG ("create socket: %s", path);
- array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (
- addr.sun_path));
- g_array_insert_vals (array, 0, addr.sun_path, strlen (addr.sun_path));
+ array = g_array_sized_new (TRUE, FALSE, sizeof (gchar), strlen (path));
+ g_array_insert_vals (array, 0, path, strlen (path));
priv->address = tp_g_value_slice_new (DBUS_TYPE_G_UCHAR_ARRAY);
g_value_set_boxed (priv->address, array);
g_array_free (array, TRUE);
- if (bind (fd, (struct sockaddr *) &addr, sizeof (addr)) == -1)
+ ret = gibber_listener_listen_socket (priv->local_listener, path, FALSE,
+ error);
+ if (ret != TRUE)
{
- DEBUG ("Error binding socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error binding socket: %s", g_strerror (errno));
+ g_assert (error != NULL && *error != NULL);
+ DEBUG ("Error listening on socket %s: %s", path, (*error)->message);
+ g_free (path);
return FALSE;
}
+ g_free (path);
}
else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV4)
{
- struct sockaddr_in addr;
- socklen_t len;
-
- addr.sin_family = AF_INET;
- addr.sin_port = 0; /* == ntohs (0) */
- addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
-
- len = sizeof (addr);
-
- fd = socket (PF_INET, SOCK_STREAM, 0);
- if (fd == -1)
- {
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error creating socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- if (bind (fd, (struct sockaddr *) &addr, len) == -1)
- {
- DEBUG ("Error binding socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error binding socket: %s", g_strerror (errno));
- return FALSE;
- }
+ int ret;
- if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
+ ret = gibber_listener_listen_tcp_loopback_af (priv->local_listener, 0,
+ GIBBER_AF_IPV4, error);
+ if (!ret)
{
- DEBUG ("getsockname failed: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "getsockname failed: %s", g_strerror (errno));
+ g_assert (error != NULL && *error != NULL);
+ DEBUG ("Error listening on socket: %s", (*error)->message);
return FALSE;
}
- DEBUG ("create socket %s:%u", "127.0.0.1", ntohs (addr.sin_port));
-
priv->address = tp_g_value_slice_new (
TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV4);
g_value_take_boxed (priv->address,
@@ -725,97 +687,42 @@ tube_stream_open (GabbleTubeStream *self,
dbus_g_type_struct_set (priv->address,
0, "127.0.0.1",
- 1, ntohs (addr.sin_port),
+ 1, gibber_listener_get_port (priv->local_listener),
G_MAXUINT);
}
else if (priv->address_type == TP_SOCKET_ADDRESS_TYPE_IPV6)
{
- struct sockaddr_in6 addr;
- socklen_t len;
- struct in6_addr loopback_addr = IN6ADDR_LOOPBACK_INIT;
-
- addr.sin6_family = AF_INET6;
- addr.sin6_port = 0; /* == ntohs (0) */
- addr.sin6_addr = loopback_addr;
-
- len = sizeof (addr);
-
- fd = socket (PF_INET6, SOCK_STREAM, 0);
- if (fd == -1)
- {
- DEBUG ("Error creating socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error creating socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- if (bind (fd, (struct sockaddr *) &addr, len) == -1)
- {
- DEBUG ("Error binding socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error binding socket: %s", g_strerror (errno));
- return FALSE;
- }
+ int ret;
- if (getsockname (fd, (struct sockaddr *) &addr, &len) == -1)
+ ret = gibber_listener_listen_tcp_loopback_af (priv->local_listener, 0,
+ GIBBER_AF_IPV6, error);
+ if (!ret)
{
- DEBUG ("getsockname failed: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "getsockname failed: %s", g_strerror (errno));
+ g_assert (error != NULL && *error != NULL);
+ DEBUG ("Error listening on socket: %s", (*error)->message);
return FALSE;
}
- DEBUG ("create socket %s:%u", "::1", ntohs (addr.sin6_port));
-
priv->address = tp_g_value_slice_new (
TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV6);
g_value_take_boxed (priv->address,
dbus_g_type_specialized_construct (
- TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV6));
+ TP_STRUCT_TYPE_SOCKET_ADDRESS_IPV6));
dbus_g_type_struct_set (priv->address,
0, "::1",
- 1, ntohs (addr.sin6_port),
+ 1, gibber_listener_get_port (priv->local_listener),
G_MAXUINT);
}
else
{
- g_return_val_if_reached (FALSE);
+ g_assert_not_reached ();
}
- if (listen (fd, 5) == -1)
- {
- DEBUG ("Error listening socket: %s", g_strerror (errno));
- g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
- "Error listening socket: %s", g_strerror (errno));
- return FALSE;
- }
-
- priv->listen_io_channel = g_io_channel_unix_new (fd);
- g_io_channel_set_encoding (priv->listen_io_channel, NULL, NULL);
- g_io_channel_set_buffered (priv->listen_io_channel, FALSE);
- g_io_channel_set_close_on_unref (priv->listen_io_channel, TRUE);
-
- priv->listen_io_channel_source_id = g_io_add_watch (priv->listen_io_channel,
- G_IO_IN, listen_cb, self);
-
return TRUE;
}
static void
-remove_watcher_source_id (gpointer data)
-{
- guint source_id = GPOINTER_TO_UINT (data);
- GSource *source;
-
- source = g_main_context_find_source_by_id (NULL, source_id);
- if (source != NULL)
- {
- g_source_destroy (source);
- }
-}
-
-static void
gabble_tube_stream_init (GabbleTubeStream *self)
{
GabbleTubeStreamPrivate *priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
@@ -823,19 +730,14 @@ gabble_tube_stream_init (GabbleTubeStream *self)
self->priv = priv;
- priv->fd_to_bytestreams = g_hash_table_new_full (g_direct_hash,
- g_direct_equal, NULL, (GDestroyNotify) g_object_unref);
-
- priv->bytestream_to_io_channel = g_hash_table_new_full (g_direct_hash,
+ priv->transport_to_bytestream = g_hash_table_new_full (g_direct_hash,
g_direct_equal, (GDestroyNotify) g_object_unref,
- (GDestroyNotify) g_io_channel_unref);
+ (GDestroyNotify) g_object_unref);
- priv->io_channel_to_watcher_source_id = g_hash_table_new_full (g_direct_hash,
- g_direct_equal, (GDestroyNotify) g_io_channel_unref,
- (GDestroyNotify) remove_watcher_source_id);
+ priv->bytestream_to_transport = g_hash_table_new_full (g_direct_hash,
+ g_direct_equal, (GDestroyNotify) g_object_unref,
+ (GDestroyNotify) g_object_unref);
- priv->listen_io_channel = NULL;
- priv->listen_io_channel_source_id = 0;
priv->address_type = TP_SOCKET_ADDRESS_TYPE_UNIX;
priv->address = NULL;
priv->access_control = TP_SOCKET_ACCESS_CONTROL_LOCALHOST;
@@ -852,22 +754,21 @@ close_each_extra_bytestream (gpointer key,
{
GabbleTubeStream *self = GABBLE_TUBE_STREAM (user_data);
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (self);
- GabbleBytestreamIface *bytestream = (GabbleBytestreamIface *) value;
- GIOChannel *channel;
+ GibberTransport *transport = (GibberTransport *) value;
+ GabbleBytestreamIface *bytestream = (GabbleBytestreamIface *) key;
/* We are iterating over priv->fd_to_bytestreams so we can't modify it.
* Disconnect signals so extra_bytestream_state_changed_cb won't be
* called */
g_signal_handlers_disconnect_matched (bytestream, G_SIGNAL_MATCH_DATA,
0, 0, NULL, NULL, self);
+ g_signal_handlers_disconnect_matched (transport, G_SIGNAL_MATCH_DATA,
+ 0, 0, NULL, NULL, self);
gabble_bytestream_iface_close (bytestream, NULL);
+ gibber_transport_disconnect (transport);
- channel = g_hash_table_lookup (priv->bytestream_to_io_channel, bytestream);
- g_assert (channel != NULL);
-
- g_hash_table_remove (priv->bytestream_to_io_channel, bytestream);
- g_hash_table_remove (priv->io_channel_to_watcher_source_id, channel);
+ g_hash_table_remove (priv->transport_to_bytestream, transport);
return TRUE;
}
@@ -904,37 +805,24 @@ gabble_tube_stream_dispose (GObject *object)
g_string_free (path, TRUE);
}
- if (priv->fd_to_bytestreams != NULL)
+ if (priv->transport_to_bytestream != NULL)
{
- g_hash_table_destroy (priv->fd_to_bytestreams);
- priv->fd_to_bytestreams = NULL;
+ g_hash_table_destroy (priv->transport_to_bytestream);
+ priv->transport_to_bytestream = NULL;
}
- if (priv->bytestream_to_io_channel != NULL)
+ if (priv->bytestream_to_transport != NULL)
{
- g_hash_table_destroy (priv->bytestream_to_io_channel);
- priv->bytestream_to_io_channel = NULL;
- }
-
- if (priv->io_channel_to_watcher_source_id != NULL)
- {
- g_hash_table_destroy (priv->io_channel_to_watcher_source_id);
- priv->io_channel_to_watcher_source_id = NULL;
+ g_hash_table_destroy (priv->bytestream_to_transport);
+ priv->bytestream_to_transport = NULL;
}
tp_handle_unref (contact_repo, priv->initiator);
- if (priv->listen_io_channel_source_id != 0)
- {
- g_source_destroy (g_main_context_find_source_by_id (NULL,
- priv->listen_io_channel_source_id));
- priv->listen_io_channel_source_id = 0;
- }
-
- if (priv->listen_io_channel)
+ if (priv->local_listener != NULL)
{
- g_io_channel_unref (priv->listen_io_channel);
- priv->listen_io_channel = NULL;
+ g_object_unref (priv->local_listener);
+ priv->local_listener = NULL;
}
priv->dispose_has_run = TRUE;
@@ -1452,34 +1340,32 @@ data_received_cb (GabbleBytestreamIface *bytestream,
{
GabbleTubeStream *tube = GABBLE_TUBE_STREAM (user_data);
GabbleTubeStreamPrivate *priv = GABBLE_TUBE_STREAM_GET_PRIVATE (tube);
- gsize written;
- GIOChannel *channel;
- GIOStatus status;
+ GibberTransport *transport;
GError *error = NULL;
DEBUG ("received %" G_GSIZE_FORMAT " bytes from bytestream", data->len);
- channel = g_hash_table_lookup (priv->bytestream_to_io_channel, bytestream);
- if (channel == NULL)
+ transport = g_hash_table_lookup (priv->bytestream_to_transport, bytestream);
+ if (transport == NULL)
{
- DEBUG ("no IO channel associated with the bytestream");
+ DEBUG ("no transport associated with the bytestream");
return;
}
- status = g_io_channel_write_chars (channel, data->str, data->len,
- &written, &error);
- if (status == G_IO_STATUS_NORMAL)
- {
- DEBUG ("%" G_GSIZE_FORMAT " bytes written to the socket", written);
- }
- else
+ if (!gibber_transport_send (transport, (const guint8 *) data->str, data->len,
+ &error))
+ {
+ DEBUG ("sending failed: %s", error->message);
+ g_error_free (error);
+ }
+
+ if (!gibber_transport_buffer_is_empty (transport))
{
- DEBUG ("error writing to socket: %s",
- error ? error->message : "");
+ /* We don't want to send more data while the buffer isn't empty */
+ DEBUG ("tube buffer isn't empty. Block the bytestream");
+ //gabble_bytestream_iface_block_read (bytestream, TRUE);
+ //TODO
}
-
- if (error != NULL)
- g_error_free (error);
}
GabbleTubeStream *
@@ -1577,7 +1463,7 @@ gabble_tube_stream_close (GabbleTubeIface *tube, gboolean closed_remotely)
return;
priv->closed = TRUE;
- g_hash_table_foreach_remove (priv->fd_to_bytestreams,
+ g_hash_table_foreach_remove (priv->bytestream_to_transport,
close_each_extra_bytestream, self);
if (!closed_remotely && priv->handle_type == TP_HANDLE_TYPE_CONTACT)
--
1.5.6.5
More information about the Telepathy-commits
mailing list