[Telepathy-commits] [telepathy-salut/master] DirectBytestreamManager listens for connections (initiator side). Prototype to make new streams (recipient side)
Alban Crequy
alban.crequy at collabora.co.uk
Tue Nov 25 03:59:20 PST 2008
20080729104508-a41c0-b69e618eb934e92af6a03c988d9f8f589e5f0842.gz
---
src/salut-direct-bytestream-manager.c | 220 ++++++++++++++++++++++++++++++++-
src/salut-direct-bytestream-manager.h | 6 +
src/salut-tubes-channel.c | 16 +++-
src/tube-stream.c | 92 +++++++++++++-
4 files changed, 325 insertions(+), 9 deletions(-)
diff --git a/src/salut-direct-bytestream-manager.c b/src/salut-direct-bytestream-manager.c
index 0ecf2fa..e8551a0 100644
--- a/src/salut-direct-bytestream-manager.c
+++ b/src/salut-direct-bytestream-manager.c
@@ -22,12 +22,15 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <unistd.h>
+#include <errno.h>
#include <gibber/gibber-xmpp-stanza.h>
#include <gibber/gibber-namespaces.h>
#include <gibber/gibber-xmpp-error.h>
#include <gibber/gibber-iq-helper.h>
#include <gibber/gibber-bytestream-direct.h>
+#include <gibber/gibber-util.h>
#include "salut-im-manager.h"
#include "salut-muc-manager.h"
@@ -57,6 +60,17 @@ struct _SalutDirectBytestreamManagerPrivate
SalutXmppConnectionManager *xmpp_connection_manager;
gchar *host_name_fqdn;
+ /* (GibberTransport *) -> (GibberBytestreamIface *) */
+ GHashTable *transport_to_bytestream;
+ /* (GibberBytestreamIface *) -> (GibberTransport *) */
+ GHashTable *bytestream_to_transport;
+ /* (GibberBytestreamIface *) -> int */
+ GHashTable *bytestream_to_fd;
+
+ /* guint id -> guint listener_watch
+ * When used by stream tubes, the id is the tube_id */
+ GHashTable *listener_watchs;
+
gboolean dispose_has_run;
};
@@ -181,6 +195,8 @@ salut_direct_bytestream_manager_constructor (GType type,
g_assert (priv->xmpp_connection_manager != NULL);
g_assert (priv->host_name_fqdn != NULL);
+ priv->listener_watchs = g_hash_table_new (NULL, NULL);
+
return obj;
}
@@ -241,6 +257,178 @@ salut_direct_bytestream_manager_new (SalutConnection *conn,
NULL);
}
+/* Hook for the transport between the 2 CM, called on the initiator's side
+ * for every connection accepted by listener_io_in_cb().
+ *
+ * This is currently a placeholder: the whole body is commented out, so
+ * calling it has no effect. The disabled code sketches what is meant to be
+ * wired up later: a receive handler on the transport and handlers for its
+ * "connected", "disconnected" and "buffer-empty" signals. */
+static void
+set_transport (SalutDirectBytestreamManager *self,
+ GibberTransport *transport)
+{
+ //SalutDirectBytestreamManagerPrivate *priv =
+ // SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+
+ /* the transport_handler is called when something is received in the
+ * transport */
+ //gibber_transport_set_handler (transport, transport_handler, self);
+
+ /* Disabled signal hookup.
+ * NOTE(review): the last call uses priv->transport whereas the others use
+ * the transport argument -- confirm which one is intended before enabling.
+ g_signal_connect (transport, "connected",
+ G_CALLBACK (transport_connected_cb), self);
+ g_signal_connect (transport, "disconnected",
+ G_CALLBACK (transport_disconnected_cb), self);
+ g_signal_connect (priv->transport, "buffer-empty",
+ G_CALLBACK (transport_buffer_empty_cb), self);
+ */
+}
+
+/* GIOFunc called when the listening socket set up by
+ * start_listen_for_connection() becomes readable, i.e. when receiving a
+ * connection from the remote CM.
+ *
+ * Accepts the pending connection and hands the new socket to a freshly
+ * created GibberLLTransport. If accept() fails, the failure is logged and
+ * no transport is created.
+ *
+ * Always returns FALSE, so GLib removes the watch after the first event:
+ * a listener handles at most one incoming connection.
+ */
+static gboolean
+listener_io_in_cb (GIOChannel *source,
+    GIOCondition condition,
+    gpointer user_data)
+{
+  SalutDirectBytestreamManager *self = SALUT_DIRECT_BYTESTREAM_MANAGER
+      (user_data);
+  int listen_fd, fd, ret;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+  struct sockaddr_storage addr;
+  socklen_t addrlen = sizeof (struct sockaddr_storage);
+  GibberLLTransport *ll_transport;
+
+  listen_fd = g_io_channel_unix_get_fd (source);
+  fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
+  if (fd == -1)
+    {
+      /* addr is not filled in and there is no socket to wrap: bail out
+       * instead of resolving garbage and opening a transport on fd -1 */
+      DEBUG ("accept failed: %s", g_strerror (errno));
+      return FALSE;
+    }
+
+  gibber_normalize_address (&addr);
+
+  /* numeric host/port only: this is for the debug message, no DNS lookup */
+  ret = getnameinfo ((struct sockaddr *) &addr, addrlen,
+      host, NI_MAXHOST, port, NI_MAXSERV,
+      NI_NUMERICHOST | NI_NUMERICSERV);
+
+  /* check_addr_func */
+
+  if (ret == 0)
+    DEBUG("New connection from %s port %s", host, port);
+  else
+    DEBUG("New connection..");
+
+  /* NOTE(review): set_transport() is currently a stub, so nothing visible
+   * here keeps track of ll_transport once this function returns. */
+  ll_transport = gibber_ll_transport_new ();
+  set_transport (self, GIBBER_TRANSPORT (ll_transport));
+  gibber_ll_transport_open_fd (ll_transport, fd);
+
+  return FALSE;
+}
+
+
+/* Create a TCP socket bound to an ephemeral port on the wildcard address,
+ * start listening on it and watch it with listener_io_in_cb() for a
+ * connection from the remote CM.
+ *
+ * The id is an opaque id used by the user of the
+ * SalutDirectBytestreamManager object; the GLib watch id is stored under
+ * that key in priv->listener_watchs. When used by stream tubes, the id is
+ * the tube id.
+ *
+ * return: the port being listened on, or -1 on error (the socket is closed
+ * in that case).
+ */
+static int
+start_listen_for_connection (SalutDirectBytestreamManager *self,
+    gpointer id)
+{
+  SalutDirectBytestreamManagerPrivate *priv =
+      SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+  GIOChannel *listener;
+  guint *listener_watch;
+  int port;
+  int fd = -1, ret, yes = 1;
+  struct addrinfo req, *ans = NULL;
+  struct sockaddr *addr;
+  struct sockaddr_in addr4;
+  struct sockaddr_in6 addr6;
+  socklen_t len;
+  #define BACKLOG 1
+
+  /* service "0" + AI_PASSIVE: wildcard address, port chosen by the kernel */
+  memset (&req, 0, sizeof (req));
+  req.ai_flags = AI_PASSIVE;
+  req.ai_family = AF_UNSPEC;
+  req.ai_socktype = SOCK_STREAM;
+  req.ai_protocol = IPPROTO_TCP;
+
+  ret = getaddrinfo (NULL, "0", &req, &ans);
+  if (ret != 0)
+    {
+      DEBUG ("getaddrinfo failed: %s", gai_strerror (ret));
+      goto error;
+    }
+
+  /* only the first address returned by getaddrinfo() is tried */
+  fd = socket (ans->ai_family, ans->ai_socktype, ans->ai_protocol);
+  if (fd == -1)
+    {
+      DEBUG ("socket failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  ret = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof (int));
+  if (ret == -1)
+    {
+      DEBUG ("setsockopt failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  ret = bind (fd, ans->ai_addr, ans->ai_addrlen);
+  if (ret < 0)
+    {
+      DEBUG ("bind failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  /* ask the kernel which port it picked for us */
+  if (ans->ai_family == AF_INET)
+    {
+      len = sizeof (struct sockaddr_in);
+      addr = (struct sockaddr *) &addr4;
+    }
+  else
+    {
+      len = sizeof (struct sockaddr_in6);
+      addr = (struct sockaddr *) &addr6;
+    }
+
+  if (getsockname (fd, addr, &len) == -1)
+    {
+      DEBUG ("getsockname failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  if (ans->ai_family == AF_INET)
+    {
+      port = ntohs (addr4.sin_port);
+    }
+  else
+    {
+      port = ntohs (addr6.sin6_port);
+    }
+
+  ret = listen (fd, BACKLOG);
+  if (ret == -1)
+    {
+      DEBUG ("listen failed: %s", g_strerror (errno));
+      goto error;
+    }
+
+  DEBUG ("listen on %s:%d", priv->host_name_fqdn, port);
+
+  listener = g_io_channel_unix_new (fd);
+  g_io_channel_set_close_on_unref (listener, TRUE);
+  listener_watch = g_malloc (sizeof (*listener_watch));
+  *listener_watch = g_io_add_watch (listener, G_IO_IN,
+      listener_io_in_cb, self);
+  /* The watch holds its own reference on the channel. Drop ours so that the
+   * channel, and with close-on-unref the socket, is released together with
+   * the watch instead of staying open forever. */
+  g_io_channel_unref (listener);
+
+  /* add id->listener_watch in priv->listener_watchs
+   * NOTE(review): the table is created without a value destroy function,
+   * so inserting twice under the same id drops the previous guint without
+   * freeing it or removing its watch -- confirm ids are unique. */
+  g_hash_table_insert (priv->listener_watchs, id, listener_watch);
+
+  freeaddrinfo (ans);
+  return port;
+
+error:
+  /* 0 is a valid descriptor: only -1 means "no socket was created" */
+  if (fd != -1)
+    close (fd);
+
+  if (ans != NULL)
+    freeaddrinfo (ans);
+  return -1;
+}
+
void
salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
SalutContact *contact,
@@ -249,11 +437,35 @@ salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
SalutDirectBytestreamManagerPrivate *priv;
priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
- g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
- "xmpp-connection", connection,
+ DEBUG ("salut_direct_new_listening_stream: Called.");
+
+ start_listen_for_connection (self, NULL);
+
+}
+
+/* Create a new direct bytestream in the LOCAL_PENDING state and initiate
+ * it. To be used on the CM-receptor side, to make a new connection.
+ *
+ * contact is not used yet by this prototype.
+ *
+ * Returns the new bytestream (the caller owns the reference obtained from
+ * g_object_new()), or NULL if the initiation failed.
+ */
+GibberBytestreamIface *
+salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
+    SalutContact *contact)
+{
+  GibberBytestreamIface *bytestream;
+  SalutDirectBytestreamManagerPrivate *priv;
+
+  priv = SALUT_DIRECT_BYTESTREAM_MANAGER_GET_PRIVATE (self);
+
+  bytestream = g_object_new (GIBBER_TYPE_BYTESTREAM_DIRECT,
"state", GIBBER_BYTESTREAM_STATE_LOCAL_PENDING,
- "self-id", priv->connection->name,
- "peer-id", contact->name,
NULL);
+
+  g_assert (bytestream != NULL);
+
+  /* Let's start the initiation of the stream */
+  if (!gibber_bytestream_iface_initiate (bytestream))
+    {
+      /* Initiation failed: close the stream and release our reference,
+       * otherwise the object is leaked once the pointer is reset. */
+      gibber_bytestream_iface_close (bytestream, NULL);
+      g_object_unref (bytestream);
+      bytestream = NULL;
+    }
+
+  return bytestream;
}
diff --git a/src/salut-direct-bytestream-manager.h b/src/salut-direct-bytestream-manager.h
index 69ad1d5..ccce17e 100644
--- a/src/salut-direct-bytestream-manager.h
+++ b/src/salut-direct-bytestream-manager.h
@@ -66,9 +66,15 @@ SalutDirectBytestreamManager *
salut_direct_bytestream_manager_new (SalutConnection *connection,
const gchar *host_name_fqdn);
+/* To be used on the CM-initiator side, to receive connections from the remote
+ * CM */
void
salut_direct_new_listening_stream (SalutDirectBytestreamManager *self,
SalutContact *contact, GibberXmppConnection *connection);
+/* To be used on the CM-receptor side, to make a new connection */
+GibberBytestreamIface *
+salut_direct_bytestream_manager_new_stream (SalutDirectBytestreamManager *self,
+ SalutContact *contact);
#endif /* #ifndef __SALUT_DIRECT_BYTESTREAM_MANAGER_H__*/
diff --git a/src/salut-tubes-channel.c b/src/salut-tubes-channel.c
index afcece8..e5645b7 100644
--- a/src/salut-tubes-channel.c
+++ b/src/salut-tubes-channel.c
@@ -46,6 +46,7 @@
#include "salut-contact.h"
#include "salut-muc-channel.h"
#include "salut-xmpp-connection-manager.h"
+#include "salut-direct-bytestream-manager.h"
#include "tube-iface.h"
#include "tube-dbus.h"
#include "tube-stream.h"
@@ -1795,6 +1796,20 @@ _send_channel_iq_tube (gpointer key,
gchar *tube_id_str = g_strdup_printf ("%d", tube_id);
+ /* listen for future connections from the remote CM before sending the
+ * iq */
+ SalutDirectBytestreamManager *direct_bytestream_mgr;
+ g_assert (priv->conn != NULL);
+ g_object_get (priv->conn,
+ "direct-bytestream-manager", &direct_bytestream_mgr,
+ NULL);
+ g_assert (direct_bytestream_mgr != NULL);
+
+ salut_direct_new_listening_stream (direct_bytestream_mgr, priv->contact,
+ priv->xmpp_connection);
+ g_object_unref (direct_bytestream_mgr);
+
+
contact_repo = tp_base_connection_get_handles (
(TpBaseConnection*) priv->conn, TP_HANDLE_TYPE_CONTACT);
@@ -1865,7 +1880,6 @@ _send_channel_iq_tubes (SalutTubesChannel *self)
}
g_hash_table_foreach (priv->tubes, _send_channel_iq_tube, self);
-
}
diff --git a/src/tube-stream.c b/src/tube-stream.c
index 171dde8..7d838c1 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -48,6 +48,7 @@
#include "salut-connection.h"
#include "tube-iface.h"
#include "salut-si-bytestream-manager.h"
+#include "salut-direct-bytestream-manager.h"
#include "salut-contact-manager.h"
#include "salut-xmpp-connection-manager.h"
@@ -90,7 +91,7 @@ static guint signals[LAST_SIGNAL] = {0};
enum
{
PROP_CONNECTION = 1,
- PROP_CHANNEL,
+ //PROP_CHANNEL,
PROP_HANDLE,
PROP_HANDLE_TYPE,
PROP_SELF_HANDLE,
@@ -116,12 +117,17 @@ struct _SalutTubeStreamPrivate
TpHandleType handle_type;
TpHandle self_handle;
guint id;
+
+ /* Bytestreams for MUC (using stream initiation) */
+
/* (GibberTransport *) -> (GibberBytestreamIface *) */
GHashTable *transport_to_bytestream;
/* (GibberBytestreamIface *) -> (GibberTransport *) */
GHashTable *bytestream_to_transport;
/* (GibberBytestreamIface *) -> int */
GHashTable *bytestream_to_fd;
+
+
TpHandle initiator;
gchar *service;
GHashTable *parameters;
@@ -303,6 +309,7 @@ extra_bytestream_state_changed_cb (GibberBytestreamIface *bytestream,
struct _extra_bytestream_negotiate_cb_data
{
SalutTubeStream *self;
+ /* file descriptor of the connection from the local application */
gint fd;
};
@@ -447,6 +454,79 @@ start_stream_initiation (SalutTubeStream *self,
return result;
}
+/* Open a direct bytestream to the initiator of the tube for the connection
+ * fd coming from the local application.
+ *
+ * On success the bytestream is recorded in priv->bytestream_to_fd together
+ * with fd, extra_bytestream_state_changed_cb() is connected to its
+ * "state-changed" signal, and TRUE is returned.
+ *
+ * On failure FALSE is returned and error is set. fd is NOT closed here:
+ * the caller keeps ownership of it and is expected to close it.
+ */
+static gboolean
+start_stream_direct (SalutTubeStream *self,
+    gint fd,
+    GError **error)
+{
+  SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+  gboolean result = FALSE;
+  SalutContact *contact;
+  SalutContactManager *contact_mgr;
+  SalutDirectBytestreamManager *direct_bytestream_mgr;
+  GibberBytestreamIface *bytestream;
+
+  g_assert (priv->handle_type == TP_HANDLE_TYPE_CONTACT);
+
+  g_object_get (priv->conn,
+      "direct-bytestream-manager", &direct_bytestream_mgr,
+      "contact-manager", &contact_mgr,
+      NULL);
+  g_assert (direct_bytestream_mgr != NULL);
+  g_assert (contact_mgr != NULL);
+
+  contact = salut_contact_manager_get_contact (contact_mgr, priv->initiator);
+  if (contact == NULL)
+    {
+      g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
+          "can't find contact with handle %u", priv->initiator);
+      goto out;
+    }
+
+  bytestream = salut_direct_bytestream_manager_new_stream (
+      direct_bytestream_mgr,
+      contact);
+  g_object_unref (contact);
+
+  if (bytestream == NULL)
+    {
+      DEBUG ("initiator refused new bytestream");
+
+      /* report the failure and leave fd to the caller, which closes it
+       * when we return FALSE; closing it here too would close it twice */
+      g_set_error (error, TP_ERRORS, TP_ERROR_NETWORK_ERROR,
+          "can't create a direct bytestream");
+      goto out;
+    }
+
+  DEBUG ("extra bytestream accepted");
+
+  g_hash_table_insert (priv->bytestream_to_fd,
+      g_object_ref (bytestream), GUINT_TO_POINTER (fd));
+
+  g_signal_connect (bytestream, "state-changed",
+      G_CALLBACK (extra_bytestream_state_changed_cb), self);
+
+  result = TRUE;
+
+out:
+  g_object_unref (direct_bytestream_mgr);
+  g_object_unref (contact_mgr);
+
+  return result;
+}
+
+/* callback for listening connections from the local application */
gboolean
listen_cb (GIOChannel *source,
GIOCondition condition,
@@ -506,9 +586,11 @@ listen_cb (GIOChannel *source,
*/
if (priv->handle_type == TP_HANDLE_TYPE_CONTACT)
{
- /* TODO: To be implemented */
- DEBUG ("SalutDirectBytestreamManager to be implemented");
- close (fd);
+ if (!start_stream_direct (self, fd, NULL))
+ {
+ DEBUG ("closing new client connection");
+ close (fd);
+ }
}
else
{
@@ -1239,6 +1321,7 @@ salut_tube_stream_class_init (SalutTubeStreamClass *salut_tube_stream_class)
g_object_class_install_property (object_class, PROP_ACCESS_CONTROL_PARAM,
param_spec);
+ /*
param_spec = g_param_spec_object (
"channel",
"SalutTubesChannel object",
@@ -1250,6 +1333,7 @@ salut_tube_stream_class_init (SalutTubeStreamClass *salut_tube_stream_class)
G_PARAM_STATIC_BLURB);
g_object_class_install_property (object_class, PROP_CHANNEL,
param_spec);
+ */
param_spec = g_param_spec_object (
"xmpp-connection-manager",
--
1.5.6.5
More information about the Telepathy-commits
mailing list