[Telepathy-commits] [telepathy-salut/master] gibber-bytestream-direct: Implement gibber_bytestream_direct_accept_socket()

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:17 PST 2008


20080729103850-a41c0-2b8d67c07d54a6e918812daa7ff07fbae5509aa3.gz
---
 lib/gibber/gibber-bytestream-direct.c |  209 ++++++++++++++++++++++++++++++++-
 lib/gibber/gibber-bytestream-direct.h |    9 ++
 2 files changed, 217 insertions(+), 1 deletions(-)

diff --git a/lib/gibber/gibber-bytestream-direct.c b/lib/gibber/gibber-bytestream-direct.c
index b3ce852..648cae4 100644
--- a/lib/gibber/gibber-bytestream-direct.c
+++ b/lib/gibber/gibber-bytestream-direct.c
@@ -26,6 +26,11 @@
 #include <glib.h>
+#include <errno.h>
+#include <unistd.h>
 
 #include "gibber-xmpp-connection.h"
+#include "gibber-linklocal-transport.h"
+#include "gibber-util.h"
+#include "gibber-xmpp-error.h"
 
 #define DEBUG_FLAG DEBUG_BYTESTREAM
 #include "gibber-debug.h"
@@ -45,6 +48,7 @@ enum
 {
   DATA_RECEIVED,
   STATE_CHANGED,
+  WRITE_BLOCKED,
   LAST_SIGNAL
 };
 
@@ -72,9 +76,21 @@ struct _GibberBytestreamDirectPrivate
   gchar *stream_init_id;
   GibberBytestreamState state;
 
+  gchar *host;
+
+  /* Are we the recipient of this bytestream?
+   * If not we are the sender */
+  gboolean recipient;
+  GibberTransport *transport;
+  gboolean write_blocked;
+  gboolean read_blocked;
+
   guint16 seq;
   guint16 last_seq_recv;
 
+  GibberBytestreamDirectCheckAddrFunc check_addr_func;
+  gpointer check_addr_func_data;
+
   gboolean dispose_has_run;
 };
 
@@ -275,6 +291,196 @@ gibber_bytestream_direct_class_init (
                   NULL, NULL,
                   g_cclosure_marshal_VOID__UINT,
                   G_TYPE_NONE, 1, G_TYPE_UINT);
+
+  signals[WRITE_BLOCKED] =
+    g_signal_new ("write-blocked",
+                  G_OBJECT_CLASS_TYPE (gibber_bytestream_direct_class),
+                  G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
+                  0,
+                  NULL, NULL,
+                  g_cclosure_marshal_VOID__BOOLEAN,
+                  G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
+}
+
+/*
+ * gibber_bytestream_direct_set_check_addr_func
+ *
+ * Remember the callback, and the user data handed back to it, that
+ * gibber_bytestream_direct_accept_socket calls to decide whether the address
+ * of a connecting peer is acceptable. With a NULL func no check is made.
+ */
+void
+gibber_bytestream_direct_set_check_addr_func (
+    GibberBytestreamDirect *self,
+    GibberBytestreamDirectCheckAddrFunc func,
+    gpointer user_data)
+{
+  GibberBytestreamDirectPrivate *fields;
+
+  fields = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  fields->check_addr_func_data = user_data;
+  fields->check_addr_func = func;
+}
+
+/*
+ * transport_handler
+ *
+ * Data handler installed on the transport by set_transport: copies the
+ * received bytes into a temporary GString, emits DATA_RECEIVED with it and
+ * frees the copy once the emission has returned.
+ */
+static void
+transport_handler (GibberTransport *transport,
+                   GibberBuffer *data,
+                   gpointer user_data)
+{
+  GibberBytestreamDirect *stream = GIBBER_BYTESTREAM_DIRECT (user_data);
+  const gchar *bytes = (const gchar *) data->data;
+  GString *chunk = g_string_new_len (bytes, data->length);
+
+  /* Signal arguments after the (empty) detail: NULL, then the payload */
+  g_signal_emit (G_OBJECT (stream), signals[DATA_RECEIVED], 0, NULL, chunk);
+  g_string_free (chunk, TRUE);
+}
+
+/*
+ * transport_connected_cb
+ *
+ * "connected" signal handler of the transport: switches the bytestream's
+ * "state" property to GIBBER_BYTESTREAM_STATE_OPEN.
+ */
+static void
+transport_connected_cb (GibberTransport *transport,
+                        GibberBytestreamDirect *self)
+{
+  DEBUG ("transport connected. Bytestream is now open");
+
+  g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_OPEN, NULL);
+}
+
+/*
+ * transport_disconnected_cb
+ *
+ * "disconnected" signal handler of the transport. Does nothing when the
+ * bytestream is already CLOSED; otherwise closes it, passing an
+ * item-not-found XMPP error if the state was still ACCEPTED and no error in
+ * every other state.
+ */
+static void
+transport_disconnected_cb (GibberTransport *transport,
+                           GibberBytestreamDirect *self)
+{
+  GibberBytestreamDirectPrivate *priv;
+
+  priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+  if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSED)
+    return;
+
+  DEBUG ("transport disconnected. close the bytestream");
+
+  if (priv->state != GIBBER_BYTESTREAM_STATE_ACCEPTED)
+    {
+      /* No error to report */
+      gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), NULL);
+    }
+  else
+    {
+      /* Connection to host failed */
+      GError e = { GIBBER_XMPP_ERROR, XMPP_ERROR_ITEM_NOT_FOUND,
+          "connection failed" };
+
+      gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), &e);
+    }
+}
+
+/*
+ * change_write_blocked_state
+ *
+ * Record the new write-blocked flag and emit WRITE_BLOCKED with it, but only
+ * when it differs from the flag currently stored.
+ */
+static void
+change_write_blocked_state (GibberBytestreamDirect *self,
+                            gboolean blocked)
+{
+  GibberBytestreamDirectPrivate *priv;
+
+  priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  if (priv->write_blocked != blocked)
+    {
+      priv->write_blocked = blocked;
+      g_signal_emit (self, signals[WRITE_BLOCKED], 0, blocked);
+    }
+}
+
+/*
+ * bytestream_closed
+ *
+ * Drop the transport, if there is one (disconnect every signal handler that
+ * was connected with this bytestream as user data, disconnect the transport
+ * and release our reference), then set the "state" property to
+ * GIBBER_BYTESTREAM_STATE_CLOSED.
+ */
+static void
+bytestream_closed (GibberBytestreamDirect *self)
+{
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+  GibberTransport *old = priv->transport;
+
+  if (old != NULL)
+    {
+      /* Handlers go first; presumably so that the "disconnected" emitted by
+       * the call below does not reach transport_disconnected_cb */
+      g_signal_handlers_disconnect_matched (old, G_SIGNAL_MATCH_DATA,
+          0, 0, NULL, NULL, self);
+      gibber_transport_disconnect (old);
+      g_object_unref (old);
+      priv->transport = NULL;
+    }
+
+  g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_CLOSED, NULL);
+}
+
+/*
+ * transport_buffer_empty_cb
+ *
+ * "buffer-empty" signal handler of the transport. In the CLOSING state the
+ * bytestream is closed now; in any other state writes are unblocked if they
+ * were blocked.
+ */
+static void
+transport_buffer_empty_cb (GibberTransport *transport,
+                           GibberBytestreamDirect *self)
+{
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSING)
+    {
+      DEBUG ("buffer is now empty. Bytestream can be closed");
+      bytestream_closed (self);
+      return;
+    }
+
+  if (!priv->write_blocked)
+    return;
+
+  DEBUG ("buffer is empty, unblock write to the bytestream");
+  change_write_blocked_state (self, FALSE);
+}
+
+/*
+ * set_transport
+ *
+ * Make transport the bytestream's transport (there must not be one yet):
+ * install transport_handler as its data handler and connect to its
+ * "connected", "disconnected" and "buffer-empty" signals. The pointer is
+ * stored as is; no additional reference is taken here.
+ */
+static void
+set_transport (GibberBytestreamDirect *self,
+               GibberTransport *transport)
+{
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  g_assert (priv->transport == NULL);
+  priv->transport = transport;
+
+  gibber_transport_set_handler (priv->transport, transport_handler, self);
+
+  g_signal_connect (priv->transport, "connected",
+      G_CALLBACK (transport_connected_cb), self);
+  g_signal_connect (priv->transport, "disconnected",
+      G_CALLBACK (transport_disconnected_cb), self);
+  g_signal_connect (priv->transport, "buffer-empty",
+      G_CALLBACK (transport_buffer_empty_cb), self);
+}
+
+/*
+ * gibber_bytestream_direct_accept_socket
+ *
+ * Accept one pending connection on listen_fd and use the new socket as the
+ * transport of this bytestream.
+ *
+ * The bytestream must be in the GIBBER_BYTESTREAM_STATE_INITIATING state.
+ * If a check function was registered with
+ * gibber_bytestream_direct_set_check_addr_func it is asked whether the
+ * address of the peer is acceptable.
+ *
+ * Returns: TRUE if the connection was handed over to a new transport, FALSE
+ * if the state is wrong, accept() failed or the peer was refused. In the
+ * last case the accepted socket is closed before returning.
+ */
+gboolean
+gibber_bytestream_direct_accept_socket (GibberBytestreamIface *bytestream,
+                                        int listen_fd)
+{
+  GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+  GibberBytestreamDirectPrivate *priv;
+  GibberLLTransport *ll_transport;
+  struct sockaddr_storage addr;
+  int fd, ret;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+  socklen_t addrlen = sizeof (struct sockaddr_storage);
+
+  priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  if (priv->state != GIBBER_BYTESTREAM_STATE_INITIATING)
+    {
+      DEBUG ("bytestream is not is the initiating state (state %d)",
+          priv->state);
+      return FALSE;
+    }
+
+  fd = accept (listen_fd, (struct sockaddr *) &addr, &addrlen);
+  if (fd < 0)
+    {
+      /* addr was not filled in and there is no socket to hand over */
+      DEBUG ("accept() failed: %s", g_strerror (errno));
+      return FALSE;
+    }
+
+  gibber_normalize_address (&addr);
+
+  ret = getnameinfo ((struct sockaddr *) &addr, addrlen,
+      host, sizeof (host), port, sizeof (port),
+      NI_NUMERICHOST | NI_NUMERICSERV);
+  if (ret != 0)
+    {
+      /* host is printed below even when the lookup failed, so make sure it
+       * holds a valid string */
+      g_strlcpy (host, "(unknown address)", sizeof (host));
+    }
+
+  if (priv->check_addr_func != NULL && !priv->check_addr_func (self, &addr,
+        addrlen, priv->check_addr_func_data))
+    {
+      DEBUG ("connection from %s refused by the bytestream user", host);
+      /* No transport took over the socket: close it ourselves */
+      close (fd);
+      return FALSE;
+    }
+
+  if (ret == 0)
+    DEBUG ("New connection from %s port %s", host, port);
+  else
+    DEBUG ("New connection..");
+
+  ll_transport = gibber_ll_transport_new ();
+  set_transport (self, GIBBER_TRANSPORT (ll_transport));
+  gibber_ll_transport_open_fd (ll_transport, fd);
+
+  return TRUE;
 }
 
 /*
@@ -319,11 +525,12 @@ gibber_bytestream_direct_close (GibberBytestreamIface *bytestream,
 
 /*
  * gibber_bytestream_direct_initiate
+ * connect to the remote end
  *
  * Implements gibber_bytestream_iface_initiate on GibberBytestreamIface
  */
 static gboolean
-gibber_bytestream_direct_initiate (GibberBytestreamIface *bytestream)
+gibber_bytestream_direct_initiate (GibberBytestreamIface *self)
 {
   DEBUG ("not implemented");
   return FALSE;
diff --git a/lib/gibber/gibber-bytestream-direct.h b/lib/gibber/gibber-bytestream-direct.h
index aede82e..8295a53 100644
--- a/lib/gibber/gibber-bytestream-direct.h
+++ b/lib/gibber/gibber-bytestream-direct.h
@@ -21,6 +21,7 @@
 #define __GIBBER_BYTESTREAM_DIRECT_H__
 
 #include <glib-object.h>
+#include <netdb.h>
 #include "gibber-bytestream-iface.h"
 
 G_BEGIN_DECLS
@@ -28,6 +29,10 @@ G_BEGIN_DECLS
 typedef struct _GibberBytestreamDirect GibberBytestreamDirect;
 typedef struct _GibberBytestreamDirectClass GibberBytestreamDirectClass;
 
+/* Callback that gibber_bytestream_direct_accept_socket calls with the
+ * address (addr, addrlen) of a peer that has just connected. user_data is
+ * the pointer that was given to
+ * gibber_bytestream_direct_set_check_addr_func. Returning FALSE makes
+ * accept_socket refuse the connection. */
+typedef gboolean (* GibberBytestreamDirectCheckAddrFunc) (
+    GibberBytestreamDirect *bytestream, struct sockaddr_storage *addr,
+    socklen_t addrlen, gpointer user_data);
+
 struct _GibberBytestreamDirectClass {
   GObjectClass parent_class;
 };
@@ -57,6 +62,10 @@ GType gibber_bytestream_direct_get_type (void);
   (G_TYPE_INSTANCE_GET_CLASS ((obj), GIBBER_TYPE_BYTESTREAM_DIRECT,\
                               GibberBytestreamDirectClass))
 
+/* Set the function (and the user_data passed back to it) that is asked
+ * whether the address of a connecting peer is acceptable. With a NULL func
+ * no check is performed. */
+void gibber_bytestream_direct_set_check_addr_func (
+    GibberBytestreamDirect *bytestream,
+    GibberBytestreamDirectCheckAddrFunc func, gpointer user_data);
+
 G_END_DECLS
 
 #endif /* #ifndef __GIBBER_BYTESTREAM_DIRECT_H__ */
-- 
1.5.6.5




More information about the Telepathy-commits mailing list