[telepathy-gabble/master] implement gabble_bytestream_ibb_block_reading

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Apr 3 09:25:47 PDT 2009


---
 src/bytestream-ibb.c |   78 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 78 insertions(+), 0 deletions(-)

diff --git a/src/bytestream-ibb.c b/src/bytestream-ibb.c
index 860fa62..1d20faf 100644
--- a/src/bytestream-ibb.c
+++ b/src/bytestream-ibb.c
@@ -62,6 +62,8 @@ enum
   LAST_PROPERTY
 };
 
+#define BUFFER_MAX_SIZE (512 * 1024)
+
 struct _GabbleBytestreamIBBPrivate
 {
   GabbleConnection *conn;
@@ -75,6 +77,10 @@ struct _GabbleBytestreamIBBPrivate
 
   guint16 seq;
   guint16 last_seq_recv;
+  /* We can't stop receiving IBB data, so if the user wants to block the
+   * bytestream we buffer the data until they unblock it. */
+  gboolean read_blocked;
+  GString *buffer;
   gboolean dispose_has_run;
 };
 
@@ -87,6 +93,8 @@ gabble_bytestream_ibb_init (GabbleBytestreamIBB *self)
       GABBLE_TYPE_BYTESTREAM_IBB, GabbleBytestreamIBBPrivate);
 
   self->priv = priv;
+
+  priv->buffer = NULL;
 }
 
 static void
@@ -123,6 +131,9 @@ gabble_bytestream_ibb_finalize (GObject *object)
   g_free (priv->peer_resource);
   g_free (priv->peer_jid);
 
+  if (priv->buffer != NULL)
+    g_string_free (priv->buffer, TRUE);
+
   G_OBJECT_CLASS (gabble_bytestream_ibb_parent_class)->finalize (object);
 }
 
@@ -452,6 +463,44 @@ gabble_bytestream_ibb_receive (GabbleBytestreamIBB *self,
       return;
     }
 
+  if (priv->read_blocked)
+    {
+      gsize current_buffer_len = 0;
+
+      DEBUG ("Bytestream is blocked. Buffering data");
+      if (priv->buffer != NULL)
+        current_buffer_len = priv->buffer->len;
+
+      if (current_buffer_len + str->len > BUFFER_MAX_SIZE)
+        {
+          DEBUG ("Buffer is full. Closing the bytestream");
+
+          if (is_iq)
+            _gabble_connection_send_iq_error (priv->conn, msg,
+                XMPP_ERROR_NOT_ACCEPTABLE, "buffer is full");
+
+          gabble_bytestream_iface_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
+          g_string_free (str, TRUE);
+          return;
+        }
+
+      if (priv->buffer == NULL)
+        {
+          priv->buffer = str;
+        }
+      else
+        {
+          g_string_append_len (priv->buffer, str->str, str->len);
+          g_string_free (str, TRUE);
+        }
+
+      /* FIXME: we shouldn't ack until we consume data */
+      if (is_iq)
+        _gabble_connection_acknowledge_set_iq (priv->conn, msg);
+
+      return;
+    }
+
   g_signal_emit_by_name (G_OBJECT (self), "data-received", sender, str);
   g_string_free (str, TRUE);
 
@@ -648,6 +697,34 @@ gabble_bytestream_ibb_initiate (GabbleBytestreamIface *iface)
 }
 
 static void
+gabble_bytestream_ibb_block_reading (GabbleBytestreamIface *iface,
+                                     gboolean block)
+{
+  GabbleBytestreamIBB *self = GABBLE_BYTESTREAM_IBB (iface);
+  GabbleBytestreamIBBPrivate *priv =
+      GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
+  /* Set or clear read_blocked; a no-op if already in the requested state. */
+  if (priv->read_blocked == block)
+    return;
+
+  priv->read_blocked = block;
+
+  DEBUG ("%s the transport bytestream", block ? "block": "unblock");
+  /* Data buffered while blocked is delivered once reading is unblocked. */
+  if (priv->buffer != NULL && !block)
+    {
+      DEBUG ("Bytestream unblocked, flushing the buffer");
+
+      g_signal_emit_by_name (G_OBJECT (self), "data-received",
+          priv->peer_handle, priv->buffer);
+      /* The buffer has been emitted; free it and clear the pointer. */
+      g_string_free (priv->buffer, TRUE);
+      priv->buffer = NULL;
+    }
+}
+
+
+static void
 bytestream_iface_init (gpointer g_iface,
                        gpointer iface_data)
 {
@@ -657,4 +734,5 @@ bytestream_iface_init (gpointer g_iface,
   klass->send = gabble_bytestream_ibb_send;
   klass->close = gabble_bytestream_ibb_close;
   klass->accept = gabble_bytestream_ibb_accept;
+  klass->block_reading = gabble_bytestream_ibb_block_reading;
 }
-- 
1.5.6.5




More information about the telepathy-commits mailing list