[Telepathy-commits] [telepathy-salut/master] tube-stream.c: Block reading on the application's socket when the tube's buffer is not empty

Alban Crequy alban.crequy at collabora.co.uk
Tue Nov 25 03:59:19 PST 2008


20080805104343-a41c0-e542e054a89212ec86e2eb37ae54765437b807ff.gz
---
 src/tube-stream.c |   77 ++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/src/tube-stream.c b/src/tube-stream.c
index 9f47bcb..365cdf7 100644
--- a/src/tube-stream.c
+++ b/src/tube-stream.c
@@ -260,8 +260,79 @@ static void
 transport_buffer_empty_cb (GibberTransport *transport,
                            SalutTubeStream *self)
 {
-  DEBUG ("buffer is now empty. Transport can be removed");
-  remove_transport (self, transport);
+  SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+  GibberBytestreamIface *bytestream;
+  GibberBytestreamState state;
+
+  bytestream = g_hash_table_lookup (priv->transport_to_bytestream, transport);
+  g_assert (bytestream != NULL);
+  g_object_get (bytestream, "state", &state, NULL);
+
+  if (state == GIBBER_BYTESTREAM_STATE_CLOSED)
+    {
+      DEBUG ("buffer is now empty. Transport can be removed");
+      remove_transport (self, transport);
+      return;
+    }
+
+  /* Buffer is empty so we can unblock the buffer if it was blocked */
+  /* FIXME: Should we move this as bytestream-iface method? */
+  if (GIBBER_IS_BYTESTREAM_OOB (bytestream))
+      {
+        DEBUG ("tube buffer isn't empty. Block the bytestream");
+        gibber_bytestream_oob_block_read (
+          GIBBER_BYTESTREAM_OOB (bytestream), FALSE);
+      }
+  else if (GIBBER_IS_BYTESTREAM_DIRECT (bytestream))
+      {
+        DEBUG ("tube buffer isn't empty. Block the bytestream");
+        gibber_bytestream_direct_block_read (
+          GIBBER_BYTESTREAM_DIRECT (bytestream), FALSE);
+      }
+}
+
+static void
+add_transport (SalutTubeStream *self,
+               GibberTransport *transport,
+               GibberBytestreamIface *bytestream)
+{
+  SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+
+  gibber_transport_set_handler (transport, transport_handler, self);
+
+  g_hash_table_insert (priv->transport_to_bytestream,
+      g_object_ref (transport), g_object_ref (bytestream));
+  g_hash_table_insert (priv->bytestream_to_transport,
+      g_object_ref (bytestream), g_object_ref (transport));
+
+  g_signal_connect (transport, "disconnected",
+      G_CALLBACK (transport_disconnected_cb), self);
+  g_signal_connect (transport, "buffer-empty",
+      G_CALLBACK (transport_buffer_empty_cb), self);
+}
+
+static void
+bytestream_write_blocked_cb (GibberBytestreamIface *bytestream,
+                             gboolean blocked,
+                             SalutTubeStream *self)
+{
+  SalutTubeStreamPrivate *priv = SALUT_TUBE_STREAM_GET_PRIVATE (self);
+  GibberTransport *transport;
+
+  transport = g_hash_table_lookup (priv->bytestream_to_transport,
+      bytestream);
+  g_assert (transport != NULL);
+
+  if (blocked)
+    {
+      DEBUG ("bytestream blocked, stop to read data from the tube socket");
+    }
+  else
+    {
+      DEBUG ("bytestream unblocked, restart to read data from the tube socket");
+    }
+
+  gibber_transport_block_receiving (transport, blocked);
 }
 
 static void
@@ -1452,7 +1523,7 @@ data_received_cb (GibberBytestreamIface *bytestream,
 
   if (!gibber_transport_buffer_is_empty (transport))
     {
-      /* We >don't want to send more data while the buffer isn't empty */
+      /* We don't want to send more data while the buffer isn't empty */
       /* FIXME: Should we move this as bytestream-iface method? */
       if (GIBBER_IS_BYTESTREAM_OOB (bytestream))
           {
-- 
1.5.6.5




More information about the Telepathy-commits mailing list