[Telepathy-commits] [telepathy-salut/master] GibberBystreamDirect: implement _block_read, _initiate and _accept_socket

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:17 PST 2008


20080730163119-a41c0-772067c8e1391f208621de50222c1517b697db91.gz
---
 lib/gibber/gibber-bytestream-direct.c |   84 +++++++++++++++++++++++++++++++--
 lib/gibber/gibber-bytestream-direct.h |    7 +++
 2 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/lib/gibber/gibber-bytestream-direct.c b/lib/gibber/gibber-bytestream-direct.c
index 595fd91..e36bd5d 100644
--- a/lib/gibber/gibber-bytestream-direct.c
+++ b/lib/gibber/gibber-bytestream-direct.c
@@ -22,6 +22,9 @@
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
 
 #include <glib.h>
 
@@ -210,9 +213,11 @@ gibber_bytestream_direct_set_property (GObject *object,
         priv->stream_init_id = g_value_dup_string (value);
         break;
       case PROP_STATE:
+        DEBUG ("Set PROP_STATE");
         if (priv->state != g_value_get_uint (value))
             {
               priv->state = g_value_get_uint (value);
+              DEBUG ("Emit STATE_CHANGED");
               g_signal_emit (object, signals[STATE_CHANGED], 0, priv->state);
             }
         break;
@@ -367,6 +372,8 @@ transport_handler (GibberTransport *transport,
   GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (user_data);
   GString *buffer;
 
+  DEBUG ("Called. GibberBytestreamDirect emit DATA_RECEIVED.");
+
   buffer = g_string_new_len ((const gchar *) data->data, data->length);
 
   g_signal_emit (G_OBJECT (self), signals[DATA_RECEIVED], 0, NULL, buffer);
@@ -494,7 +501,7 @@ gibber_bytestream_direct_accept_socket (GibberBytestreamIface *bytestream,
 
   priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
 
-  if (priv->state != GIBBER_BYTESTREAM_STATE_INITIATING)
+  if (priv->state != GIBBER_BYTESTREAM_STATE_LOCAL_PENDING)
     {
       DEBUG ("bytestream is not is the initiating state (state %d)",
           priv->state);
@@ -527,6 +534,22 @@ gibber_bytestream_direct_accept_socket (GibberBytestreamIface *bytestream,
   return TRUE;
 }
 
+void
+gibber_bytestream_direct_block_read (GibberBytestreamDirect *self,
+                                     gboolean block)
+{
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  if (priv->read_blocked == block)
+    return;
+
+  priv->read_blocked = block;
+
+  DEBUG ("%s the transport bytestream", block ? "block": "unblock");
+  gibber_transport_block_receiving (priv->transport, block);
+}
+
 /*
  * gibber_bytestream_direct_send
  *
@@ -537,7 +560,42 @@ gibber_bytestream_direct_send (GibberBytestreamIface *bytestream,
                             guint len,
                             const gchar *str)
 {
-  DEBUG ("not implemented");
+  GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+  GError *error = NULL;
+
+  if (priv->state != GIBBER_BYTESTREAM_STATE_OPEN)
+    {
+      DEBUG ("can't send data through a not open bytestream (state: %d)",
+          priv->state);
+      return FALSE;
+    }
+
+  if (priv->write_blocked)
+    {
+      DEBUG ("can't send data for now, bytestream is blocked");
+      return FALSE;
+    }
+
+  DEBUG ("send %u bytes through bytestream", len);
+  if (!gibber_transport_send (priv->transport, (const guint8 *) str, len,
+        &error))
+    {
+      DEBUG ("sending failed: %s", error->message);
+      g_error_free (error);
+
+      gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), NULL);
+      return FALSE;
+    }
+
+  if (!gibber_transport_buffer_is_empty (priv->transport))
+    {
+      /* We don't want to send more data while the buffer isn't empty */
+      DEBUG ("buffer isn't empty. Block write to the bytestream");
+      change_write_blocked_state (self, TRUE);
+    }
+
   return TRUE;
 }
 
@@ -574,10 +632,26 @@ gibber_bytestream_direct_close (GibberBytestreamIface *bytestream,
  * Implements gibber_bytestream_iface_initiate on GibberBytestreamIface
  */
 static gboolean
-gibber_bytestream_direct_initiate (GibberBytestreamIface *self)
+gibber_bytestream_direct_initiate (GibberBytestreamIface *bytestream)
 {
-  DEBUG ("not implemented");
-  return FALSE;
+  GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+  GibberLLTransport *ll_transport;
+  struct sockaddr_in server;
+  GibberBytestreamDirectPrivate *priv =
+      GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+  DEBUG ("Called.");
+
+  server.sin_addr.s_addr = inet_addr ("127.0.0.1");
+  server.sin_family = AF_INET;
+  server.sin_port = g_htons ((guint16) priv->portnum);
+
+  ll_transport = gibber_ll_transport_new ();
+  set_transport (self, GIBBER_TRANSPORT (ll_transport));
+  gibber_ll_transport_open_sockaddr (ll_transport,
+      (struct sockaddr_storage *) &server, NULL);
+
+  return TRUE;
 }
 
 static void
diff --git a/lib/gibber/gibber-bytestream-direct.h b/lib/gibber/gibber-bytestream-direct.h
index 8295a53..0076536 100644
--- a/lib/gibber/gibber-bytestream-direct.h
+++ b/lib/gibber/gibber-bytestream-direct.h
@@ -66,6 +66,13 @@ void gibber_bytestream_direct_set_check_addr_func (
     GibberBytestreamDirect *bytestream,
     GibberBytestreamDirectCheckAddrFunc func, gpointer user_data);
 
+gboolean
+gibber_bytestream_direct_accept_socket (GibberBytestreamIface *bytestream,
+                                        int listen_fd);
+void
+gibber_bytestream_direct_block_read (GibberBytestreamDirect *self,
+                                     gboolean block);
+
 G_END_DECLS
 
 #endif /* #ifndef __GIBBER_BYTESTREAM_DIRECT_H__ */
-- 
1.5.6.5




More information about the Telepathy-commits mailing list