[telepathy-gabble/master] implement gabble_bytestream_ibb_block_reading
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Apr 3 09:25:47 PDT 2009
---
src/bytestream-ibb.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 78 insertions(+), 0 deletions(-)
diff --git a/src/bytestream-ibb.c b/src/bytestream-ibb.c
index 860fa62..1d20faf 100644
--- a/src/bytestream-ibb.c
+++ b/src/bytestream-ibb.c
@@ -62,6 +62,8 @@ enum
LAST_PROPERTY
};
+#define BUFFER_MAX_SIZE (512 * 1024)
+
struct _GabbleBytestreamIBBPrivate
{
GabbleConnection *conn;
@@ -75,6 +77,10 @@ struct _GabbleBytestreamIBBPrivate
guint16 seq;
guint16 last_seq_recv;
+ /* We can't stop receiving IBB data, so if the user wants to block the
+ * bytestream we buffer the data until they unblock it. */
+ gboolean read_blocked;
+ GString *buffer;
gboolean dispose_has_run;
};
@@ -87,6 +93,8 @@ gabble_bytestream_ibb_init (GabbleBytestreamIBB *self)
GABBLE_TYPE_BYTESTREAM_IBB, GabbleBytestreamIBBPrivate);
self->priv = priv;
+
+ priv->buffer = NULL;
}
static void
@@ -123,6 +131,9 @@ gabble_bytestream_ibb_finalize (GObject *object)
g_free (priv->peer_resource);
g_free (priv->peer_jid);
+ if (priv->buffer != NULL)
+ g_string_free (priv->buffer, TRUE);
+
G_OBJECT_CLASS (gabble_bytestream_ibb_parent_class)->finalize (object);
}
@@ -452,6 +463,44 @@ gabble_bytestream_ibb_receive (GabbleBytestreamIBB *self,
return;
}
+ if (priv->read_blocked)
+ {
+ gsize current_buffer_len = 0;
+
+ DEBUG ("Bytestream is blocked. Buffering data");
+ if (priv->buffer != NULL)
+ current_buffer_len = priv->buffer->len;
+
+ if (current_buffer_len + str->len > BUFFER_MAX_SIZE)
+ {
+ DEBUG ("Buffer is full. Closing the bytestream");
+
+ if (is_iq)
+ _gabble_connection_send_iq_error (priv->conn, msg,
+ XMPP_ERROR_NOT_ACCEPTABLE, "buffer is full");
+
+ gabble_bytestream_iface_close (GABBLE_BYTESTREAM_IFACE (self), NULL);
+ g_string_free (str, TRUE);
+ return;
+ }
+
+ if (priv->buffer == NULL)
+ {
+ priv->buffer = str;
+ }
+ else
+ {
+ g_string_append_len (priv->buffer, str->str, str->len);
+ g_string_free (str, TRUE);
+ }
+
+ /* FIXME: we shouldn't ack until we consume data */
+ if (is_iq)
+ _gabble_connection_acknowledge_set_iq (priv->conn, msg);
+
+ return;
+ }
+
g_signal_emit_by_name (G_OBJECT (self), "data-received", sender, str);
g_string_free (str, TRUE);
@@ -648,6 +697,34 @@ gabble_bytestream_ibb_initiate (GabbleBytestreamIface *iface)
}
+/*
+ * gabble_bytestream_ibb_block_reading:
+ * @iface: the bytestream, which must be a GabbleBytestreamIBB
+ * @block: TRUE to stop delivering incoming data, FALSE to resume
+ *
+ * Implements the block_reading method of GabbleBytestreamIface for IBB.
+ *
+ * While reading is blocked, incoming data is accumulated in priv->buffer
+ * instead of being delivered. When reading is unblocked and such a buffer
+ * exists, it is delivered in a single "data-received" emission and then
+ * freed. Calling this with the state the bytestream is already in is a
+ * no-op.
+ */
static void
+gabble_bytestream_ibb_block_reading (GabbleBytestreamIface *iface,
+    gboolean block)
+{
+  GabbleBytestreamIBB *self = GABBLE_BYTESTREAM_IBB (iface);
+  GabbleBytestreamIBBPrivate *priv =
+      GABBLE_BYTESTREAM_IBB_GET_PRIVATE (self);
+  GString *pending;
+
+  if (priv->read_blocked == block)
+    return;
+
+  priv->read_blocked = block;
+
+  DEBUG ("%s the transport bytestream", block ? "block" : "unblock");
+
+  if (block || priv->buffer == NULL)
+    return;
+
+  DEBUG ("Bytestream unblocked, flushing the buffer");
+
+  /* Detach the buffer from the private struct *before* emitting the signal.
+   * A "data-received" handler may re-enter this object (for instance by
+   * blocking and unblocking again, or by causing more data to be buffered).
+   * If priv->buffer still pointed at the data being delivered, re-entrancy
+   * could emit it a second time, or newly buffered data could be appended
+   * to it and then freed below without ever being delivered. */
+  pending = priv->buffer;
+  priv->buffer = NULL;
+
+  g_signal_emit_by_name (G_OBJECT (self), "data-received",
+      priv->peer_handle, pending);
+
+  g_string_free (pending, TRUE);
+}
+
+
+static void
bytestream_iface_init (gpointer g_iface,
gpointer iface_data)
{
@@ -657,4 +734,5 @@ bytestream_iface_init (gpointer g_iface,
klass->send = gabble_bytestream_ibb_send;
klass->close = gabble_bytestream_ibb_close;
klass->accept = gabble_bytestream_ibb_accept;
+ klass->block_reading = gabble_bytestream_ibb_block_reading;
}
--
1.5.6.5
More information about the telepathy-commits
mailing list