[Telepathy-commits] [telepathy-salut/master] gibber-bytestream-direct: Implement gibber_bytestream_direct_accept_socket()
Alban Crequy
alban.crequy at collabora.co.uk
Tue Nov 25 03:59:17 PST 2008
20080729103850-a41c0-2b8d67c07d54a6e918812daa7ff07fbae5509aa3.gz
---
lib/gibber/gibber-bytestream-direct.c | 209 ++++++++++++++++++++++++++++++++-
lib/gibber/gibber-bytestream-direct.h | 9 ++
2 files changed, 217 insertions(+), 1 deletions(-)
diff --git a/lib/gibber/gibber-bytestream-direct.c b/lib/gibber/gibber-bytestream-direct.c
index b3ce852..648cae4 100644
--- a/lib/gibber/gibber-bytestream-direct.c
+++ b/lib/gibber/gibber-bytestream-direct.c
@@ -26,6 +26,9 @@
#include <glib.h>
#include "gibber-xmpp-connection.h"
+#include "gibber-linklocal-transport.h"
+#include "gibber-util.h"
+#include "gibber-xmpp-error.h"
#define DEBUG_FLAG DEBUG_BYTESTREAM
#include "gibber-debug.h"
@@ -45,6 +48,7 @@ enum
{
DATA_RECEIVED,
STATE_CHANGED,
+ WRITE_BLOCKED,
LAST_SIGNAL
};
@@ -72,9 +76,21 @@ struct _GibberBytestreamDirectPrivate
gchar *stream_init_id;
GibberBytestreamState state;
+ gchar *host;
+
+ /* Are we the recipient of this bytestream?
+ * If not we are the sender */
+ gboolean recipient;
+ GibberTransport *transport;
+ gboolean write_blocked;
+ gboolean read_blocked;
+
guint16 seq;
guint16 last_seq_recv;
+ GibberBytestreamDirectCheckAddrFunc check_addr_func;
+ gpointer check_addr_func_data;
+
gboolean dispose_has_run;
};
@@ -275,6 +291,196 @@ gibber_bytestream_direct_class_init (
NULL, NULL,
g_cclosure_marshal_VOID__UINT,
G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ signals[WRITE_BLOCKED] =
+ g_signal_new ("write-blocked",
+ G_OBJECT_CLASS_TYPE (gibber_bytestream_direct_class),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+ 0,
+ NULL, NULL,
+ g_cclosure_marshal_VOID__BOOLEAN,
+ G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
+}
+
+void
+gibber_bytestream_direct_set_check_addr_func (
+ GibberBytestreamDirect *self,
+ GibberBytestreamDirectCheckAddrFunc func,
+ gpointer user_data)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ priv->check_addr_func = func;
+ priv->check_addr_func_data = user_data;
+}
+
+static void
+transport_handler (GibberTransport *transport,
+ GibberBuffer *data,
+ gpointer user_data)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (user_data);
+ GString *buffer;
+
+ buffer = g_string_new_len ((const gchar *) data->data, data->length);
+
+ g_signal_emit (G_OBJECT (self), signals[DATA_RECEIVED], 0, NULL, buffer);
+
+ g_string_free (buffer, TRUE);
+}
+
+static void
+transport_connected_cb (GibberTransport *transport,
+ GibberBytestreamDirect *self)
+{
+ DEBUG ("transport connected. Bytestream is now open");
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_OPEN,
+ NULL);
+}
+
+static void
+transport_disconnected_cb (GibberTransport *transport,
+ GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSED)
+ return;
+
+ DEBUG ("transport disconnected. close the bytestream");
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_ACCEPTED)
+ {
+ /* Connection to host failed */
+ GError e = { GIBBER_XMPP_ERROR, XMPP_ERROR_ITEM_NOT_FOUND,
+ "connection failed" };
+
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), &e);
+ }
+ else
+ {
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), NULL);
+ }
+}
+
+static void
+change_write_blocked_state (GibberBytestreamDirect *self,
+ gboolean blocked)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->write_blocked == blocked)
+ return;
+
+ priv->write_blocked = blocked;
+ g_signal_emit (self, signals[WRITE_BLOCKED], 0, blocked);
+}
+
+static void
+bytestream_closed (GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->transport != NULL)
+ {
+ g_signal_handlers_disconnect_matched (priv->transport,
+ G_SIGNAL_MATCH_DATA, 0, 0, NULL, NULL, self);
+ gibber_transport_disconnect (priv->transport);
+ g_object_unref (priv->transport);
+ priv->transport = NULL;
+ }
+
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_CLOSED, NULL);
+}
+static void
+transport_buffer_empty_cb (GibberTransport *transport,
+ GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSING)
+ {
+ DEBUG ("buffer is now empty. Bytestream can be closed");
+ bytestream_closed (self);
+ }
+
+ else if (priv->write_blocked)
+ {
+ DEBUG ("buffer is empty, unblock write to the bytestream");
+ change_write_blocked_state (self, FALSE);
+ }
+}
+
+static void
+set_transport (GibberBytestreamDirect *self,
+ GibberTransport *transport)
+{
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ g_assert (priv->transport == NULL);
+
+ priv->transport = transport;
+ gibber_transport_set_handler (transport, transport_handler, self);
+
+ g_signal_connect (transport, "connected",
+ G_CALLBACK (transport_connected_cb), self);
+ g_signal_connect (transport, "disconnected",
+ G_CALLBACK (transport_disconnected_cb), self);
+ g_signal_connect (priv->transport, "buffer-empty",
+ G_CALLBACK (transport_buffer_empty_cb), self);
+}
+
+gboolean
+gibber_bytestream_direct_accept_socket (GibberBytestreamIface *bytestream,
+ int listen_fd)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberBytestreamDirectPrivate *priv;
+ GibberLLTransport *ll_transport;
+ struct sockaddr_storage addr;
+ int fd, ret;
+ char host[NI_MAXHOST];
+ char port[NI_MAXSERV];
+ socklen_t addrlen = sizeof (struct sockaddr_storage);
+
+
+ priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state != GIBBER_BYTESTREAM_STATE_INITIATING)
+ {
+ DEBUG ("bytestream is not is the initiating state (state %d)",
+ priv->state);
+ return FALSE;
+ }
+
+ fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
+ gibber_normalize_address (&addr);
+
+ ret = getnameinfo ((struct sockaddr *) &addr, addrlen,
+ host, NI_MAXHOST, port, NI_MAXSERV,
+ NI_NUMERICHOST | NI_NUMERICSERV);
+
+ if (priv->check_addr_func != NULL && !priv->check_addr_func (self, &addr,
+ addrlen, priv->check_addr_func_data))
+ {
+ DEBUG ("connection from %s refused by the bytestream user", host);
+ return FALSE;
+ }
+
+ if (ret == 0)
+ DEBUG("New connection from %s port %s", host, port);
+ else
+ DEBUG("New connection..");
+
+ ll_transport = gibber_ll_transport_new ();
+ set_transport (self, GIBBER_TRANSPORT (ll_transport));
+ gibber_ll_transport_open_fd (ll_transport, fd);
+
+ return TRUE;
}
/*
@@ -319,11 +525,12 @@ gibber_bytestream_direct_close (GibberBytestreamIface *bytestream,
/*
* gibber_bytestream_direct_initiate
+ * connect to the remote end
*
* Implements gibber_bytestream_iface_initiate on GibberBytestreamIface
*/
static gboolean
-gibber_bytestream_direct_initiate (GibberBytestreamIface *bytestream)
+gibber_bytestream_direct_initiate (GibberBytestreamIface *self)
{
DEBUG ("not implemented");
return FALSE;
diff --git a/lib/gibber/gibber-bytestream-direct.h b/lib/gibber/gibber-bytestream-direct.h
index aede82e..8295a53 100644
--- a/lib/gibber/gibber-bytestream-direct.h
+++ b/lib/gibber/gibber-bytestream-direct.h
@@ -21,6 +21,7 @@
#define __GIBBER_BYTESTREAM_DIRECT_H__
#include <glib-object.h>
+#include <netdb.h>
#include "gibber-bytestream-iface.h"
G_BEGIN_DECLS
@@ -28,6 +29,10 @@ G_BEGIN_DECLS
typedef struct _GibberBytestreamDirect GibberBytestreamDirect;
typedef struct _GibberBytestreamDirectClass GibberBytestreamDirectClass;
+typedef gboolean (* GibberBytestreamDirectCheckAddrFunc) (
+ GibberBytestreamDirect *bytestream, struct sockaddr_storage *addr,
+ socklen_t addrlen, gpointer user_data);
+
struct _GibberBytestreamDirectClass {
GObjectClass parent_class;
};
@@ -57,6 +62,10 @@ GType gibber_bytestream_direct_get_type (void);
(G_TYPE_INSTANCE_GET_CLASS ((obj), GIBBER_TYPE_BYTESTREAM_DIRECT,\
GibberBytestreamDirectClass))
+void gibber_bytestream_direct_set_check_addr_func (
+ GibberBytestreamDirect *bytestream,
+ GibberBytestreamDirectCheckAddrFunc func, gpointer user_data);
+
G_END_DECLS
#endif /* #ifndef __GIBBER_BYTESTREAM_DIRECT_H__ */
--
1.5.6.5
More information about the Telepathy-commits
mailing list