[Telepathy-commits] [telepathy-gabble/master] IBB: implement flow control using a window of acked stanzas

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Mon Mar 9 10:51:37 PDT 2009


---
 src/bytestream-ibb.c |  147 ++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 124 insertions(+), 23 deletions(-)

diff --git a/src/bytestream-ibb.c b/src/bytestream-ibb.c
index bd51716..4f6898f 100644
--- a/src/bytestream-ibb.c
+++ b/src/bytestream-ibb.c
@@ -85,6 +85,9 @@ struct _GabbleBytestreamIBBPrivate
    * we buffer them until he unblocks it. */
   gboolean read_blocked;
   GString *read_buffer;
+  /* (LmMessage *) -> TRUE */
+  GHashTable *sent_stanzas_not_acked;
+  GString *write_buffer;
   gboolean dispose_has_run;
 };
 
@@ -99,6 +102,9 @@ gabble_bytestream_ibb_init (GabbleBytestreamIBB *self)
   self->priv = priv;
 
   priv->read_buffer = NULL;
+  priv->sent_stanzas_not_acked = g_hash_table_new (g_direct_hash,
+      g_direct_equal);
+  priv->write_buffer = NULL;
 }
 
 static void
@@ -138,6 +144,11 @@ gabble_bytestream_ibb_finalize (GObject *object)
   if (priv->read_buffer != NULL)
     g_string_free (priv->read_buffer, TRUE);
 
+  if (priv->write_buffer != NULL)
+    g_string_free (priv->write_buffer, TRUE);
+
+  g_hash_table_destroy (priv->sent_stanzas_not_acked);
+
   G_OBJECT_CLASS (gabble_bytestream_ibb_parent_class)->finalize (object);
 }
 
@@ -328,34 +339,78 @@ gabble_bytestream_ibb_class_init (
 }
 
 static gboolean
+send_data (GabbleBytestreamIBB *self, const gchar *str, guint len,
+    gboolean *result);
+
+static LmHandlerResult
+iq_acked_cb (GabbleConnection *conn,
+             LmMessage *sent_msg,
+             LmMessage *reply_msg,
+             GObject *obj,
+             gpointer user_data)
+{
+  GabbleBytestreamIBB *self = GABBLE_BYTESTREAM_IBB (obj);
+  GabbleBytestreamIBBPrivate *priv = GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
+
+  g_hash_table_remove (priv->sent_stanzas_not_acked, sent_msg);
+
+  if (priv->write_buffer != NULL)
+    {
+      guint sent;
+
+      DEBUG ("A stanza has been acked. Try to flush the buffer");
+
+      sent = send_data (self, priv->write_buffer->str, priv->write_buffer->len,
+          NULL);
+      if (sent == priv->write_buffer->len)
+        {
+          DEBUG ("buffer has been flushed");
+          g_string_free (priv->write_buffer, TRUE);
+          priv->write_buffer = NULL;
+        }
+      else
+        {
+          g_string_erase (priv->write_buffer, 0, sent);
+
+          DEBUG ("buffer has not been completely flushed; %d bytes left",
+              priv->write_buffer->len);
+        }
+    }
+
+  return LM_HANDLER_RESULT_REMOVE_MESSAGE;
+}
+
+static gboolean
 send_data (GabbleBytestreamIBB *self,
            const gchar *str,
-           guint len)
+           guint len,
+           gboolean *result)
 {
   GabbleBytestreamIBBPrivate *priv = GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
-  LmMessage *iq;
   guint sent, stanza_count;
-  LmMessageNode *data;
-
-  iq = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
-      '@', "type", "set",
-      '(', "data", "",
-        '*', &data,
-        '@', "xmlns", NS_IBB,
-        '@', "sid", priv->stream_id,
-      ')', NULL);
 
   sent = 0;
   stanza_count = 0;
   while (sent < len)
     {
+      LmMessage *iq;
       guint send_now, remaining;
       gchar *seq, *encoded;
       GError *error = NULL;
       gboolean ret;
+      guint nb_stanzas_waiting;
 
       remaining = (len - sent);
 
+      nb_stanzas_waiting = g_hash_table_size (priv->sent_stanzas_not_acked);
+      if (nb_stanzas_waiting >= WINDOW_SIZE)
+        {
+          DEBUG ("Window is full (%u). Stop sending stanzas",
+              nb_stanzas_waiting);
+          break;
+        }
+
+      /* We can send stanzas */
       if (remaining > priv->block_size)
         {
           /* We can't send all the remaining data in one stanza */
@@ -368,37 +423,52 @@ send_data (GabbleBytestreamIBB *self,
         }
 
       encoded = base64_encode (send_now, str + sent, FALSE);
-      lm_message_node_set_value (data, encoded);
-
       seq = g_strdup_printf ("%u", priv->seq++);
-      lm_message_node_set_attribute (data, "seq", seq);
 
-      DEBUG ("send %d bytes", send_now);
-      ret = _gabble_connection_send (priv->conn, iq, &error);
+      iq = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
+          '@', "type", "set",
+          '(', "data", encoded,
+            '@', "xmlns", NS_IBB,
+            '@', "sid", priv->stream_id,
+            '@', "seq", seq,
+          ')', NULL);
+
+      ret = _gabble_connection_send_with_reply (priv->conn, iq, iq_acked_cb,
+          G_OBJECT (self), NULL, &error);
 
       g_free (encoded);
       g_free (seq);
+      lm_message_unref (iq);
 
       if (!ret)
         {
           DEBUG ("error sending IBB stanza: %s. Close the bytestream",
               error->message);
           g_error_free (error);
-          lm_message_unref (iq);
 
           gabble_bytestream_iface_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
-          return FALSE;
+
+          if (result != NULL)
+            *result = FALSE;
+
+          return sent;
         }
 
+      g_hash_table_insert (priv->sent_stanzas_not_acked, iq,
+          GUINT_TO_POINTER (TRUE));
+
+      DEBUG ("send %d bytes (window size: %u)", send_now,
+          nb_stanzas_waiting + 1);
+
       sent += send_now;
       stanza_count++;
     }
 
-  DEBUG ("finished to send %d bytes (%d stanzas needed)", len, stanza_count);
+  DEBUG ("sent %d bytes (%d stanzas needed)", sent, stanza_count);
 
-  lm_message_unref (iq);
-
-  return TRUE;
+  if (result != NULL)
+    *result = TRUE;
+  return sent;
 }
 
 /*
@@ -413,6 +483,8 @@ gabble_bytestream_ibb_send (GabbleBytestreamIface *iface,
 {
   GabbleBytestreamIBB *self = GABBLE_BYTESTREAM_IBB (iface);
   GabbleBytestreamIBBPrivate *priv = GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
+  gboolean result;
+  guint sent;
 
   if (priv->state != GABBLE_BYTESTREAM_STATE_OPEN)
     {
@@ -421,7 +493,36 @@ gabble_bytestream_ibb_send (GabbleBytestreamIface *iface,
       return FALSE;
     }
 
-  return send_data (self, str, len);;
+  if (priv->write_buffer != NULL)
+    {
+      DEBUG ("Write buffer is not empty. Buffering data");
+
+      g_string_append_len (priv->write_buffer, str, len);
+      return TRUE;
+    }
+
+  sent = send_data (self, str, len, &result);
+  if (sent < len)
+    {
+      guint remaining;
+
+      DEBUG ("Some data have not been sent. Buffer them");
+
+      remaining = (len - sent);
+
+      if (priv->write_buffer == NULL)
+        {
+          priv->write_buffer = g_string_new_len (str + sent, remaining);
+        }
+      else
+        {
+          g_string_append_len (priv->write_buffer, str + sent, remaining);
+        }
+
+      DEBUG ("write buffer size: %u", priv->write_buffer->len);
+    }
+
+  return result;
 }
 
 void
-- 
1.5.6.5




More information about the telepathy-commits mailing list