[Telepathy-commits] [telepathy-gabble/master] IBB: implement flow control using a window of acked stanzas
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Mon Mar 9 10:51:37 PDT 2009
---
src/bytestream-ibb.c | 147 ++++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 124 insertions(+), 23 deletions(-)
diff --git a/src/bytestream-ibb.c b/src/bytestream-ibb.c
index bd51716..4f6898f 100644
--- a/src/bytestream-ibb.c
+++ b/src/bytestream-ibb.c
@@ -85,6 +85,9 @@ struct _GabbleBytestreamIBBPrivate
* we buffer them until he unblocks it. */
gboolean read_blocked;
GString *read_buffer;
+ /* (LmMessage *) -> TRUE */
+ GHashTable *sent_stanzas_not_acked;
+ GString *write_buffer;
gboolean dispose_has_run;
};
@@ -99,6 +102,9 @@ gabble_bytestream_ibb_init (GabbleBytestreamIBB *self)
self->priv = priv;
priv->read_buffer = NULL;
+ priv->sent_stanzas_not_acked = g_hash_table_new (g_direct_hash,
+ g_direct_equal);
+ priv->write_buffer = NULL;
}
static void
@@ -138,6 +144,11 @@ gabble_bytestream_ibb_finalize (GObject *object)
if (priv->read_buffer != NULL)
g_string_free (priv->read_buffer, TRUE);
+ if (priv->write_buffer != NULL)
+ g_string_free (priv->write_buffer, TRUE);
+
+ g_hash_table_destroy (priv->sent_stanzas_not_acked);
+
G_OBJECT_CLASS (gabble_bytestream_ibb_parent_class)->finalize (object);
}
@@ -328,34 +339,78 @@ gabble_bytestream_ibb_class_init (
}
static gboolean
+send_data (GabbleBytestreamIBB *self, const gchar *str, guint len,
+ gboolean *result); /* returns the number of bytes sent (declared gboolean); success flag is stored in *result when non-NULL */
+
+static LmHandlerResult
+iq_acked_cb (GabbleConnection *conn,
+ LmMessage *sent_msg, /* the data IQ being answered; looked up in sent_stanzas_not_acked */
+ LmMessage *reply_msg, /* NOTE(review): never inspected, so any reply is counted as an ack */
+ GObject *obj, /* the GabbleBytestreamIBB that sent the stanza */
+ gpointer user_data)
+{
+ GabbleBytestreamIBB *self = GABBLE_BYTESTREAM_IBB (obj);
+ GabbleBytestreamIBBPrivate *priv = GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
+ /* Reply callback for one sent IBB data stanza: release its slot in the window. */
+ g_hash_table_remove (priv->sent_stanzas_not_acked, sent_msg);
+ /* A slot is free again, so retry the data that was buffered while the window was full. */
+ if (priv->write_buffer != NULL)
+ {
+ guint sent;
+
+ DEBUG ("A stanza has been acked. Try to flush the buffer");
+ /* send_data reports how many bytes it sent; it stops early if the window fills up. */
+ sent = send_data (self, priv->write_buffer->str, priv->write_buffer->len,
+ NULL);
+ if (sent == priv->write_buffer->len)
+ {
+ DEBUG ("buffer has been flushed");
+ g_string_free (priv->write_buffer, TRUE);
+ priv->write_buffer = NULL;
+ }
+ else
+ {
+ g_string_erase (priv->write_buffer, 0, sent);
+ /* GString.len is a gsize: print it with G_GSIZE_FORMAT, not %d. */
+ DEBUG ("buffer has not been completely flushed; %" G_GSIZE_FORMAT " bytes left",
+ priv->write_buffer->len);
+ }
+ }
+ /* Keep the reply from being passed on to other handlers. */
+ return LM_HANDLER_RESULT_REMOVE_MESSAGE;
+}
+
+static gboolean
send_data (GabbleBytestreamIBB *self,
const gchar *str,
- guint len)
+ guint len,
+ gboolean *result)
{
GabbleBytestreamIBBPrivate *priv = GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
- LmMessage *iq;
guint sent, stanza_count;
- LmMessageNode *data;
-
- iq = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
- '@', "type", "set",
- '(', "data", "",
- '*', &data,
- '@', "xmlns", NS_IBB,
- '@', "sid", priv->stream_id,
- ')', NULL);
sent = 0;
stanza_count = 0;
while (sent < len)
{
+ LmMessage *iq;
guint send_now, remaining;
gchar *seq, *encoded;
GError *error = NULL;
gboolean ret;
+ guint nb_stanzas_waiting;
remaining = (len - sent);
+ nb_stanzas_waiting = g_hash_table_size (priv->sent_stanzas_not_acked);
+ if (nb_stanzas_waiting >= WINDOW_SIZE)
+ {
+ DEBUG ("Window is full (%u). Stop sending stanzas",
+ nb_stanzas_waiting);
+ break;
+ }
+
+ /* We can send stanzas */
if (remaining > priv->block_size)
{
/* We can't send all the remaining data in one stanza */
@@ -368,37 +423,52 @@ send_data (GabbleBytestreamIBB *self,
}
encoded = base64_encode (send_now, str + sent, FALSE);
- lm_message_node_set_value (data, encoded);
-
seq = g_strdup_printf ("%u", priv->seq++);
- lm_message_node_set_attribute (data, "seq", seq);
- DEBUG ("send %d bytes", send_now);
- ret = _gabble_connection_send (priv->conn, iq, &error);
+ iq = lm_message_build (priv->peer_jid, LM_MESSAGE_TYPE_IQ,
+ '@', "type", "set",
+ '(', "data", encoded,
+ '@', "xmlns", NS_IBB,
+ '@', "sid", priv->stream_id,
+ '@', "seq", seq,
+ ')', NULL);
+
+ ret = _gabble_connection_send_with_reply (priv->conn, iq, iq_acked_cb,
+ G_OBJECT (self), NULL, &error);
g_free (encoded);
g_free (seq);
+ lm_message_unref (iq);
if (!ret)
{
DEBUG ("error sending IBB stanza: %s. Close the bytestream",
error->message);
g_error_free (error);
- lm_message_unref (iq);
gabble_bytestream_iface_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
- return FALSE;
+
+ if (result != NULL)
+ *result = FALSE;
+
+ return sent;
}
+ g_hash_table_insert (priv->sent_stanzas_not_acked, iq,
+ GUINT_TO_POINTER (TRUE));
+
+ DEBUG ("send %d bytes (window size: %u)", send_now,
+ nb_stanzas_waiting + 1);
+
sent += send_now;
stanza_count++;
}
- DEBUG ("finished to send %d bytes (%d stanzas needed)", len, stanza_count);
+ DEBUG ("sent %d bytes (%d stanzas needed)", sent, stanza_count);
- lm_message_unref (iq);
-
- return TRUE;
+ if (result != NULL)
+ *result = TRUE;
+ return sent;
}
/*
@@ -413,6 +483,8 @@ gabble_bytestream_ibb_send (GabbleBytestreamIface *iface,
{
GabbleBytestreamIBB *self = GABBLE_BYTESTREAM_IBB (iface);
GabbleBytestreamIBBPrivate *priv = GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
+ gboolean result;
+ guint sent;
if (priv->state != GABBLE_BYTESTREAM_STATE_OPEN)
{
@@ -421,7 +493,36 @@ gabble_bytestream_ibb_send (GabbleBytestreamIface *iface,
return FALSE;
}
- return send_data (self, str, len);;
+ if (priv->write_buffer != NULL)
+ {
+ DEBUG ("Write buffer is not empty. Buffering data");
+
+ g_string_append_len (priv->write_buffer, str, len);
+ return TRUE;
+ }
+
+ sent = send_data (self, str, len, &result);
+ if (sent < len)
+ {
+ guint remaining;
+
+ DEBUG ("Some data have not been sent. Buffer them");
+
+ remaining = (len - sent);
+
+ if (priv->write_buffer == NULL)
+ {
+ priv->write_buffer = g_string_new_len (str + sent, remaining);
+ }
+ else
+ {
+ g_string_append_len (priv->write_buffer, str + sent, remaining);
+ }
+
+ DEBUG ("write buffer size: %u", priv->write_buffer->len);
+ }
+
+ return result;
}
void
--
1.5.6.5
More information about the telepathy-commits
mailing list