[Spice-commits] 6 commits - gtk/channel-webdav.c gtk/giopipe.c tests/pipe.c

Victor Toso de Carvalho victortoso at kemper.freedesktop.org
Fri Jun 5 00:29:08 PDT 2015


 gtk/channel-webdav.c |   25 ++---
 gtk/giopipe.c        |   63 ++++++-------
 tests/pipe.c         |  244 +++++++++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 275 insertions(+), 57 deletions(-)

New commits:
commit 70cd7ee2107a47bcd44887501fd61bf45de03dfb
Author: Victor Toso <victortoso at redhat.com>
Date:   Tue Jun 2 12:33:11 2015 +0200

    tests: add test to check for zombie GSources
    
    Using g_pollable_input_stream_create_source to generate several dummy
    GSources in order to check if giopipe sets all of them to be dispatched.
    
    This test checks for zombie GSources during a write_all/read_chunk
    operation.

diff --git a/tests/pipe.c b/tests/pipe.c
index a0ddde4..01629da 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -21,6 +21,8 @@ typedef struct _Fixture {
     guint16 read_size;
     guint16 total_read;
 
+    GList *sources;
+
     GMainLoop *loop;
     GCancellable *cancellable;
     guint timeout;
@@ -60,6 +62,7 @@ fixture_set_up(Fixture *fixture,
         fixture->buf[i] = 0x42 + i;
     }
 
+    fixture->sources = NULL;
     fixture->cancellable = g_cancellable_new();
     fixture->loop = g_main_loop_new (NULL, FALSE);
     fixture->timeout = g_timeout_add (1000, stop_loop, fixture->loop);
@@ -72,6 +75,9 @@ fixture_tear_down(Fixture *fixture,
     g_clear_object(&fixture->p1);
     g_clear_object(&fixture->p2);
 
+    if (fixture->sources)
+        g_list_free_full(fixture->sources, (GDestroyNotify) g_source_unref);
+
     g_clear_pointer(&fixture->data, g_free);
     g_clear_object(&fixture->cancellable);
     g_source_remove(fixture->timeout);
@@ -365,6 +371,102 @@ test_pipe_concurrent_write(Fixture *f, gconstpointer user_data)
     g_main_loop_run (f->loop);
 }
 
+static void
+write_all_cb_zombie_check(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    Fixture *f = user_data;
+    GError *error = NULL;
+    gsize nbytes;
+    GList *it;
+
+    g_output_stream_write_all_finish(G_OUTPUT_STREAM(source), result, &nbytes, &error);
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, ==, f->data_len);
+    g_clear_error(&error);
+
+    for (it = f->sources; it != NULL; it = it->next) {
+        GSource *s = it->data;
+        g_assert_true (g_source_is_destroyed (s));
+    }
+
+    g_main_loop_quit (f->loop);
+}
+
+static gboolean
+source_cb (gpointer user_data)
+{
+    return G_SOURCE_REMOVE;
+}
+
+#define NUM_OF_DUMMY_GSOURCE 1000
+
+static void
+read_chunk_cb_and_do_zombie(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    Fixture *f = user_data;
+    GError *error = NULL;
+    gssize nbytes;
+    gboolean data_match, try_zombie;
+    gint i;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, >, 0);
+    data_match = (g_ascii_strncasecmp(f->data + f->total_read, f->buf, nbytes) == 0);
+    g_assert_true(data_match);
+
+    /* Simulate more Pollable GSources created to read from Pipe; This should
+     * not fail but giopipe does not allow concurrent read/write which means
+     * that only the *last* GSource created will be the one that does the actual
+     * read; The other GSources that are still active should be dispatched.
+     * (In this test, only the real GSource created in g_input_stream_read_async
+     * will read the data) */
+
+    /* Create GSources in all iterations besides the last one, simply because
+     * it is convenient! The execution of the last interaction should give enough
+     * time for for all dummy GSources being detached. */
+    f->total_read += nbytes;
+    try_zombie = (f->total_read + f->read_size < f->data_len);
+
+    if (try_zombie) {
+        for (i = 0; i < NUM_OF_DUMMY_GSOURCE/2; i++) {
+            GSource *s = g_pollable_input_stream_create_source(G_POLLABLE_INPUT_STREAM(f->ip2), NULL);
+            g_source_set_callback(s, source_cb, NULL, NULL);
+            g_source_attach(s, NULL);
+            f->sources = g_list_prepend(f->sources, s);
+        }
+    }
+
+    if (f->total_read != f->data_len)
+        g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+                                  f->cancellable, read_chunk_cb_and_do_zombie, f);
+
+    if (try_zombie) {
+        for (i = 0; i < NUM_OF_DUMMY_GSOURCE/2; i++) {
+            GSource *s = g_pollable_input_stream_create_source(G_POLLABLE_INPUT_STREAM(f->ip2), NULL);
+            g_source_set_callback(s, source_cb, NULL, NULL);
+            g_source_attach(s, NULL);
+            f->sources = g_list_prepend(f->sources, s);
+        }
+    }
+}
+
+static void
+test_pipe_zombie_sources(Fixture *f, gconstpointer user_data)
+{
+    gint i;
+    f->data_len = 64;
+    f->data = get_test_data(f->data_len);
+    f->read_size = 16;
+    f->total_read = 0;
+
+    g_output_stream_write_all_async(f->op1, f->data, f->data_len, G_PRIORITY_DEFAULT,
+                                    f->cancellable, write_all_cb_zombie_check, f);
+    g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_chunk_cb_and_do_zombie, f);
+    g_main_loop_run (f->loop);
+}
+
 int main(int argc, char* argv[])
 {
     setlocale(LC_ALL, "");
@@ -403,6 +505,10 @@ int main(int argc, char* argv[])
                fixture_set_up, test_pipe_concurrent_write,
                fixture_tear_down);
 
+    g_test_add("/pipe/zombie-sources", Fixture, NULL,
+               fixture_set_up, test_pipe_zombie_sources,
+               fixture_tear_down);
+
     g_test_add("/pipe/readclosestream", Fixture, NULL,
                fixture_set_up, test_pipe_readclosestream,
                fixture_tear_down);
commit b3234ab353d5dfed6a0ba2e2ccf9733aff7e053b
Author: Victor Toso <victortoso at redhat.com>
Date:   Tue May 26 11:45:15 2015 +0200

    tests: add test to concurrent write to pipe
    
    Concurrent write is not supported and should fail. The GIO error is
    G_IO_ERROR_PENDING

diff --git a/tests/pipe.c b/tests/pipe.c
index e4757b6..a0ddde4 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -324,6 +324,47 @@ test_pipe_write_all_64_read_chunks_16(Fixture *f, gconstpointer user_data)
     g_main_loop_run (f->loop);
 }
 
+static void
+read_chunk_cb_and_try_write(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    Fixture *f = user_data;
+    GError *error = NULL;
+    gssize nbytes;
+    gboolean data_match;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, >, 0);
+    data_match = (g_ascii_strncasecmp(f->data + f->total_read, f->buf, nbytes) == 0);
+    g_assert_true(data_match);
+
+    f->total_read += nbytes;
+    if (f->total_read != f->data_len) {
+        /* try write before reading another chunk */
+        g_output_stream_write(f->op1, "", 1, f->cancellable, &error);
+        g_assert_error(error, G_IO_ERROR, G_IO_ERROR_PENDING);
+        g_clear_error(&error);
+
+        g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+                                  f->cancellable, read_chunk_cb_and_try_write, f);
+    }
+}
+
+static void
+test_pipe_concurrent_write(Fixture *f, gconstpointer user_data)
+{
+    f->data_len = 64;
+    f->data = get_test_data(f->data_len);
+    f->read_size = 16;
+    f->total_read = 0;
+
+    g_output_stream_write_all_async(f->op1, f->data, f->data_len, G_PRIORITY_DEFAULT,
+                                    f->cancellable, write_all_cb, f);
+    g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_chunk_cb_and_try_write, f);
+    g_main_loop_run (f->loop);
+}
+
 int main(int argc, char* argv[])
 {
     setlocale(LC_ALL, "");
@@ -358,6 +399,10 @@ int main(int argc, char* argv[])
                fixture_set_up, test_pipe_write_all_64_read_chunks_16,
                fixture_tear_down);
 
+    g_test_add("/pipe/concurrent-write", Fixture, NULL,
+               fixture_set_up, test_pipe_concurrent_write,
+               fixture_tear_down);
+
     g_test_add("/pipe/readclosestream", Fixture, NULL,
                fixture_set_up, test_pipe_readclosestream,
                fixture_tear_down);
commit da01ae66221e88be3f40055e90d6a7ce59c35fae
Author: Victor Toso <victortoso at redhat.com>
Date:   Tue May 19 15:12:59 2015 +0200

    tests: pipe using _write_all_async function

diff --git a/tests/pipe.c b/tests/pipe.c
index ec11b6f..e4757b6 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -16,6 +16,10 @@ typedef struct _Fixture {
     GOutputStream *op2;
 
     gchar buf[16];
+    gchar *data;
+    guint16 data_len;
+    guint16 read_size;
+    guint16 total_read;
 
     GMainLoop *loop;
     GCancellable *cancellable;
@@ -68,6 +72,7 @@ fixture_tear_down(Fixture *fixture,
     g_clear_object(&fixture->p1);
     g_clear_object(&fixture->p2);
 
+    g_clear_pointer(&fixture->data, g_free);
     g_clear_object(&fixture->cancellable);
     g_source_remove(fixture->timeout);
     g_main_loop_unref(fixture->loop);
@@ -253,6 +258,72 @@ test_pipe_readcancel(Fixture *f, gconstpointer user_data)
     g_main_loop_run (f->loop);
 }
 
+static gchar *
+get_test_data(gint n)
+{
+    GString *s = g_string_sized_new(n);
+    const gchar *data = "01234567abcdefgh";
+    gint i, q;
+
+    q = n / 16;
+    for (i = 0; i < q; i++)
+        s = g_string_append(s, data);
+
+    s = g_string_append_len(s, data, (n % 16));
+    return g_string_free(s, FALSE);
+}
+
+static void
+write_all_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    Fixture *f = user_data;
+    GError *error = NULL;
+    gsize nbytes;
+
+    g_output_stream_write_all_finish(G_OUTPUT_STREAM(source), result, &nbytes, &error);
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, ==, f->data_len);
+    g_clear_error(&error);
+
+    g_main_loop_quit (f->loop);
+}
+
+static void
+read_chunk_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    Fixture *f = user_data;
+    GError *error = NULL;
+    gssize nbytes;
+    gboolean data_match;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, >, 0);
+    data_match = (g_ascii_strncasecmp(f->data + f->total_read, f->buf, nbytes) == 0);
+    g_assert_true(data_match);
+
+    f->total_read += nbytes;
+    if (f->total_read != f->data_len) {
+        g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+                                  f->cancellable, read_chunk_cb, f);
+    }
+}
+
+static void
+test_pipe_write_all_64_read_chunks_16(Fixture *f, gconstpointer user_data)
+{
+    f->data_len = 64;
+    f->data = get_test_data(f->data_len);
+    f->read_size = 16;
+    f->total_read = 0;
+
+    g_output_stream_write_all_async(f->op1, f->data, f->data_len, G_PRIORITY_DEFAULT,
+                                    f->cancellable, write_all_cb, f);
+    g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_chunk_cb, f);
+    g_main_loop_run (f->loop);
+}
+
 int main(int argc, char* argv[])
 {
     setlocale(LC_ALL, "");
@@ -283,6 +354,10 @@ int main(int argc, char* argv[])
                fixture_set_up, test_pipe_write8read16,
                fixture_tear_down);
 
+    g_test_add("/pipe/write-all64-read-chunks16", Fixture, NULL,
+               fixture_set_up, test_pipe_write_all_64_read_chunks_16,
+               fixture_tear_down);
+
     g_test_add("/pipe/readclosestream", Fixture, NULL,
                fixture_set_up, test_pipe_readclosestream,
                fixture_tear_down);
commit e5d2047812412bc7d5e11cfb63a4d8b55551a408
Author: Victor Toso <victortoso at redhat.com>
Date:   Mon May 25 17:02:53 2015 +0200

    tests: remove read8_cb to use generic read_cb
    
    Both functions are basically the same so let's keep the generic one.

diff --git a/tests/pipe.c b/tests/pipe.c
index 841cb77..ec11b6f 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -160,26 +160,12 @@ test_pipe_readwrite(Fixture *f, gconstpointer user_data)
 }
 
 static void
-read8_cb(GObject *source, GAsyncResult *result, gpointer user_data)
-{
-    GError *error = NULL;
-    gssize nbytes;
-    GMainLoop *loop = user_data;
-
-    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
-
-    g_assert_cmpint(nbytes, ==, 8);
-    g_assert_no_error(error);
-    g_clear_error(&error);
-}
-
-static void
 test_pipe_write16read8(Fixture *f, gconstpointer user_data)
 {
     g_output_stream_write_async(f->op1, "0123456789abcdef", 16, G_PRIORITY_DEFAULT,
                                 f->cancellable, write_cb, f->loop);
     g_input_stream_read_async(f->ip2, f->buf, 8, G_PRIORITY_DEFAULT,
-                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+                              f->cancellable, read_cb, GINT_TO_POINTER(8));
 
     g_main_loop_run (f->loop);
 
@@ -193,7 +179,7 @@ test_pipe_write8read16(Fixture *f, gconstpointer user_data)
     g_output_stream_write_async(f->op1, "01234567", 8, G_PRIORITY_DEFAULT,
                                 f->cancellable, write_cb, f->loop);
     g_input_stream_read_async(f->ip2, f->buf, 16, G_PRIORITY_DEFAULT,
-                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+                              f->cancellable, read_cb, GINT_TO_POINTER(8));
 
     g_main_loop_run (f->loop);
 
commit 33d4016228798108250b0ef9173f2c4c719a5065
Author: Victor Toso <victortoso at redhat.com>
Date:   Tue May 19 15:59:00 2015 +0200

    webdav: write all buffer to client webdav
    
    Client's webdav can request less data (8192) than the amount sent by
    guest's webdav. Using g_output_stream_write_all_async in order to avoid
    losing data.

diff --git a/gtk/channel-webdav.c b/gtk/channel-webdav.c
index 1d3862e..bde728e 100644
--- a/gtk/channel-webdav.c
+++ b/gtk/channel-webdav.c
@@ -298,11 +298,11 @@ static void client_start_read(SpiceWebdavChannel *self, Client *client)
 static void start_demux(SpiceWebdavChannel *self);
 
 static void demux_to_client_finish(SpiceWebdavChannel *self,
-                                   Client *client, gssize size)
+                                   Client *client, gboolean fail)
 {
     SpiceWebdavChannelPrivate *c = self->priv;
 
-    if (size <= 0) {
+    if (fail) {
         remove_client(self, client);
     }
 
@@ -315,34 +315,37 @@ static void demux_to_client_cb(GObject *source, GAsyncResult *result, gpointer u
     Client *client = user_data;
     SpiceWebdavChannelPrivate *c = client->self->priv;
     GError *error = NULL;
-    gssize size;
+    gboolean fail;
+    gsize size;
 
-    size = g_output_stream_write_finish(G_OUTPUT_STREAM(source), result, &error);
+    g_output_stream_write_all_finish(G_OUTPUT_STREAM(source), result, &size, &error);
 
     if (error) {
         CHANNEL_DEBUG(client->self, "write failed: %s", error->message);
         g_clear_error(&error);
     }
 
+    fail = (size != c->demux.size);
     g_warn_if_fail(size == c->demux.size);
-    demux_to_client_finish(client->self, client, size);
+    demux_to_client_finish(client->self, client, fail);
 }
 
 static void demux_to_client(SpiceWebdavChannel *self,
                             Client *client)
 {
     SpiceWebdavChannelPrivate *c = self->priv;
-    gssize size = c->demux.size;
+    gsize size = c->demux.size;
 
-    CHANNEL_DEBUG(self, "pushing %"G_GSSIZE_FORMAT" to client %p", size, client);
+    CHANNEL_DEBUG(self, "pushing %"G_GSIZE_FORMAT" to client %p", size, client);
 
     if (size > 0) {
-        g_output_stream_write_async(g_io_stream_get_output_stream(client->pipe),
-                                    c->demux.buf, size, G_PRIORITY_DEFAULT,
-                                    c->cancellable, demux_to_client_cb, client);
+        g_output_stream_write_all_async(g_io_stream_get_output_stream(client->pipe),
+                                        c->demux.buf, size, G_PRIORITY_DEFAULT,
+                                        c->cancellable, demux_to_client_cb, client);
         return;
     } else {
-        demux_to_client_finish(self, client, size);
+        /* Nothing to write */
+        demux_to_client_finish(self, client, FALSE);
     }
 }
 
commit 5cf5f631cc5f74ce8d1d462d06a80cb71b3121ed
Author: Victor Toso <victortoso at redhat.com>
Date:   Fri May 15 17:46:27 2015 +0200

    giopipe: don't fail on create_source
    
    PipeInputStream and PipeOutputStream should not fail when creating
    GPollableStream source as this currently does not work with default
    write_all and read_all functions;
    
    In order to avoid creating zombie GSource in create_source of both
    PipeInputStream and PipeOutputStream, we track all created GSources and
    set them to be dispatched when data is available to read/write. It is
    worth mentioning that concurrent write/read is not possible with current
    giopipe and only the last created GSource will read the data as it is
    dispatched first.

diff --git a/gtk/giopipe.c b/gtk/giopipe.c
index 50edb5b..d91c4d9 100644
--- a/gtk/giopipe.c
+++ b/gtk/giopipe.c
@@ -44,7 +44,7 @@ struct _PipeInputStream
      * closing.
      */
     gboolean peer_closed;
-    GSource *source;
+    GList *sources;
 };
 
 struct _PipeInputStreamClass
@@ -69,7 +69,7 @@ struct _PipeOutputStream
     const gchar *buffer;
     gsize count;
     gboolean peer_closed;
-    GSource *source;
+    GList *sources;
 };
 
 struct _PipeOutputStreamClass
@@ -120,12 +120,32 @@ pipe_input_stream_read (GInputStream  *stream,
     return count;
 }
 
+static GList *
+set_all_sources_ready (GList *sources)
+{
+    GList *it = sources;
+    while (it != NULL) {
+        GSource *s = it->data;
+        GList *next = it->next;
+
+        if (s == NULL || g_source_is_destroyed(s)) {
+            /* remove */
+            sources = g_list_delete_link(sources, it);
+            g_source_unref(s);
+        } else {
+            /* dispatch */
+            g_source_set_ready_time(s, 0);
+        }
+        it = next;
+    }
+    return sources;
+}
+
 static void
 pipe_input_stream_check_source (PipeInputStream *self)
 {
-    if (self->source && !g_source_is_destroyed(self->source) &&
-        g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
-        g_source_set_ready_time(self->source, 0);
+    if (g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
+        self->sources = set_all_sources_ready(self->sources);
 }
 
 static gboolean
@@ -193,10 +213,8 @@ pipe_input_stream_dispose(GObject *object)
         self->peer = NULL;
     }
 
-    if (self->source) {
-        g_source_unref(self->source);
-        self->source = NULL;
-    }
+    g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
+    self->sources = NULL;
 
     G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
 }
@@ -234,14 +252,8 @@ pipe_input_stream_create_source (GPollableInputStream *stream,
     PipeInputStream *self = PIPE_INPUT_STREAM(stream);
     GSource *pollable_source;
 
-    g_return_val_if_fail (self->source == NULL ||
-                          g_source_is_destroyed (self->source), NULL);
-
-    if (self->source && g_source_is_destroyed (self->source))
-        g_source_unref (self->source);
-
     pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
-    self->source = g_source_ref (pollable_source);
+    self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
 
     return pollable_source;
 }
@@ -319,10 +331,8 @@ pipe_output_stream_dispose(GObject *object)
         self->peer = NULL;
     }
 
-    if (self->source) {
-        g_source_unref(self->source);
-        self->source = NULL;
-    }
+    g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
+    self->sources = NULL;
 
     G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
 }
@@ -330,9 +340,8 @@ pipe_output_stream_dispose(GObject *object)
 static void
 pipe_output_stream_check_source (PipeOutputStream *self)
 {
-    if (self->source && !g_source_is_destroyed(self->source) &&
-        g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
-        g_source_set_ready_time(self->source, 0);
+    if (g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
+        self->sources = set_all_sources_ready(self->sources);
 }
 
 static gboolean
@@ -416,14 +425,8 @@ pipe_output_stream_create_source (GPollableOutputStream *stream,
     PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
     GSource *pollable_source;
 
-    g_return_val_if_fail (self->source == NULL ||
-                          g_source_is_destroyed (self->source), NULL);
-
-    if (self->source && g_source_is_destroyed (self->source))
-        g_source_unref (self->source);
-
     pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
-    self->source = g_source_ref (pollable_source);
+    self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
 
     return pollable_source;
 }


More information about the Spice-commits mailing list