[Spice-commits] 6 commits - gtk/channel-webdav.c gtk/giopipe.c tests/pipe.c
Victor Toso de Carvalho
victortoso at kemper.freedesktop.org
Fri Jun 5 00:29:08 PDT 2015
gtk/channel-webdav.c | 25 ++---
gtk/giopipe.c | 63 ++++++-------
tests/pipe.c | 244 +++++++++++++++++++++++++++++++++++++++++++++++----
3 files changed, 275 insertions(+), 57 deletions(-)
New commits:
commit 70cd7ee2107a47bcd44887501fd61bf45de03dfb
Author: Victor Toso <victortoso at redhat.com>
Date: Tue Jun 2 12:33:11 2015 +0200
tests: add test to check for zombie GSources
Using g_pollable_input_stream_create_source to generate several dummy
GSources in order to check if giopipe sets all of them to be dispatched.
This test checks for zombie GSources during a write_all/read_chunk
operation.
diff --git a/tests/pipe.c b/tests/pipe.c
index a0ddde4..01629da 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -21,6 +21,8 @@ typedef struct _Fixture {
guint16 read_size;
guint16 total_read;
+ GList *sources;
+
GMainLoop *loop;
GCancellable *cancellable;
guint timeout;
@@ -60,6 +62,7 @@ fixture_set_up(Fixture *fixture,
fixture->buf[i] = 0x42 + i;
}
+ fixture->sources = NULL;
fixture->cancellable = g_cancellable_new();
fixture->loop = g_main_loop_new (NULL, FALSE);
fixture->timeout = g_timeout_add (1000, stop_loop, fixture->loop);
@@ -72,6 +75,9 @@ fixture_tear_down(Fixture *fixture,
g_clear_object(&fixture->p1);
g_clear_object(&fixture->p2);
+ if (fixture->sources)
+ g_list_free_full(fixture->sources, (GDestroyNotify) g_source_unref);
+
g_clear_pointer(&fixture->data, g_free);
g_clear_object(&fixture->cancellable);
g_source_remove(fixture->timeout);
@@ -365,6 +371,102 @@ test_pipe_concurrent_write(Fixture *f, gconstpointer user_data)
g_main_loop_run (f->loop);
}
+static void
+write_all_cb_zombie_check(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ Fixture *f = user_data;
+ GError *error = NULL;
+ gsize nbytes;
+ GList *it;
+
+ g_output_stream_write_all_finish(G_OUTPUT_STREAM(source), result, &nbytes, &error);
+ g_assert_no_error(error);
+ g_assert_cmpint(nbytes, ==, f->data_len);
+ g_clear_error(&error);
+
+ for (it = f->sources; it != NULL; it = it->next) {
+ GSource *s = it->data;
+ g_assert_true (g_source_is_destroyed (s));
+ }
+
+ g_main_loop_quit (f->loop);
+}
+
+static gboolean
+source_cb (gpointer user_data)
+{
+ return G_SOURCE_REMOVE;
+}
+
+#define NUM_OF_DUMMY_GSOURCE 1000
+
+static void
+read_chunk_cb_and_do_zombie(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ Fixture *f = user_data;
+ GError *error = NULL;
+ gssize nbytes;
+ gboolean data_match, try_zombie;
+ gint i;
+
+ nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+ g_assert_no_error(error);
+ g_assert_cmpint(nbytes, >, 0);
+ data_match = (g_ascii_strncasecmp(f->data + f->total_read, f->buf, nbytes) == 0);
+ g_assert_true(data_match);
+
+ /* Simulate more Pollable GSources created to read from Pipe; This should
+ * not fail but giopipe does not allow concurrent read/write which means
+ * that only the *last* GSource created will be the one that does the actual
+ * read; The other GSources that are still active should be dispatched.
+ * (In this test, only the real GSource created in g_input_stream_read_async
+ * will read the data) */
+
+ /* Create GSources in all iterations besides the last one, simply because
+ * it is convenient! The execution of the last iteration should give enough
+ * time for all dummy GSources to be detached. */
+ f->total_read += nbytes;
+ try_zombie = (f->total_read + f->read_size < f->data_len);
+
+ if (try_zombie) {
+ for (i = 0; i < NUM_OF_DUMMY_GSOURCE/2; i++) {
+ GSource *s = g_pollable_input_stream_create_source(G_POLLABLE_INPUT_STREAM(f->ip2), NULL);
+ g_source_set_callback(s, source_cb, NULL, NULL);
+ g_source_attach(s, NULL);
+ f->sources = g_list_prepend(f->sources, s);
+ }
+ }
+
+ if (f->total_read != f->data_len)
+ g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+ f->cancellable, read_chunk_cb_and_do_zombie, f);
+
+ if (try_zombie) {
+ for (i = 0; i < NUM_OF_DUMMY_GSOURCE/2; i++) {
+ GSource *s = g_pollable_input_stream_create_source(G_POLLABLE_INPUT_STREAM(f->ip2), NULL);
+ g_source_set_callback(s, source_cb, NULL, NULL);
+ g_source_attach(s, NULL);
+ f->sources = g_list_prepend(f->sources, s);
+ }
+ }
+}
+
+static void
+test_pipe_zombie_sources(Fixture *f, gconstpointer user_data)
+{
+ gint i;
+ f->data_len = 64;
+ f->data = get_test_data(f->data_len);
+ f->read_size = 16;
+ f->total_read = 0;
+
+ g_output_stream_write_all_async(f->op1, f->data, f->data_len, G_PRIORITY_DEFAULT,
+ f->cancellable, write_all_cb_zombie_check, f);
+ g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+ f->cancellable, read_chunk_cb_and_do_zombie, f);
+ g_main_loop_run (f->loop);
+}
+
int main(int argc, char* argv[])
{
setlocale(LC_ALL, "");
@@ -403,6 +505,10 @@ int main(int argc, char* argv[])
fixture_set_up, test_pipe_concurrent_write,
fixture_tear_down);
+ g_test_add("/pipe/zombie-sources", Fixture, NULL,
+ fixture_set_up, test_pipe_zombie_sources,
+ fixture_tear_down);
+
g_test_add("/pipe/readclosestream", Fixture, NULL,
fixture_set_up, test_pipe_readclosestream,
fixture_tear_down);
commit b3234ab353d5dfed6a0ba2e2ccf9733aff7e053b
Author: Victor Toso <victortoso at redhat.com>
Date: Tue May 26 11:45:15 2015 +0200
tests: add test for concurrent write to pipe
Concurrent write is not supported and should fail. The GIO error is
G_IO_ERROR_PENDING
diff --git a/tests/pipe.c b/tests/pipe.c
index e4757b6..a0ddde4 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -324,6 +324,47 @@ test_pipe_write_all_64_read_chunks_16(Fixture *f, gconstpointer user_data)
g_main_loop_run (f->loop);
}
+static void
+read_chunk_cb_and_try_write(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ Fixture *f = user_data;
+ GError *error = NULL;
+ gssize nbytes;
+ gboolean data_match;
+
+ nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+ g_assert_no_error(error);
+ g_assert_cmpint(nbytes, >, 0);
+ data_match = (g_ascii_strncasecmp(f->data + f->total_read, f->buf, nbytes) == 0);
+ g_assert_true(data_match);
+
+ f->total_read += nbytes;
+ if (f->total_read != f->data_len) {
+ /* try write before reading another chunk */
+ g_output_stream_write(f->op1, "", 1, f->cancellable, &error);
+ g_assert_error(error, G_IO_ERROR, G_IO_ERROR_PENDING);
+ g_clear_error(&error);
+
+ g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+ f->cancellable, read_chunk_cb_and_try_write, f);
+ }
+}
+
+static void
+test_pipe_concurrent_write(Fixture *f, gconstpointer user_data)
+{
+ f->data_len = 64;
+ f->data = get_test_data(f->data_len);
+ f->read_size = 16;
+ f->total_read = 0;
+
+ g_output_stream_write_all_async(f->op1, f->data, f->data_len, G_PRIORITY_DEFAULT,
+ f->cancellable, write_all_cb, f);
+ g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+ f->cancellable, read_chunk_cb_and_try_write, f);
+ g_main_loop_run (f->loop);
+}
+
int main(int argc, char* argv[])
{
setlocale(LC_ALL, "");
@@ -358,6 +399,10 @@ int main(int argc, char* argv[])
fixture_set_up, test_pipe_write_all_64_read_chunks_16,
fixture_tear_down);
+ g_test_add("/pipe/concurrent-write", Fixture, NULL,
+ fixture_set_up, test_pipe_concurrent_write,
+ fixture_tear_down);
+
g_test_add("/pipe/readclosestream", Fixture, NULL,
fixture_set_up, test_pipe_readclosestream,
fixture_tear_down);
commit da01ae66221e88be3f40055e90d6a7ce59c35fae
Author: Victor Toso <victortoso at redhat.com>
Date: Tue May 19 15:12:59 2015 +0200
tests: pipe using _write_all_async function
diff --git a/tests/pipe.c b/tests/pipe.c
index ec11b6f..e4757b6 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -16,6 +16,10 @@ typedef struct _Fixture {
GOutputStream *op2;
gchar buf[16];
+ gchar *data;
+ guint16 data_len;
+ guint16 read_size;
+ guint16 total_read;
GMainLoop *loop;
GCancellable *cancellable;
@@ -68,6 +72,7 @@ fixture_tear_down(Fixture *fixture,
g_clear_object(&fixture->p1);
g_clear_object(&fixture->p2);
+ g_clear_pointer(&fixture->data, g_free);
g_clear_object(&fixture->cancellable);
g_source_remove(fixture->timeout);
g_main_loop_unref(fixture->loop);
@@ -253,6 +258,72 @@ test_pipe_readcancel(Fixture *f, gconstpointer user_data)
g_main_loop_run (f->loop);
}
+static gchar *
+get_test_data(gint n)
+{
+ GString *s = g_string_sized_new(n);
+ const gchar *data = "01234567abcdefgh";
+ gint i, q;
+
+ q = n / 16;
+ for (i = 0; i < q; i++)
+ s = g_string_append(s, data);
+
+ s = g_string_append_len(s, data, (n % 16));
+ return g_string_free(s, FALSE);
+}
+
+static void
+write_all_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ Fixture *f = user_data;
+ GError *error = NULL;
+ gsize nbytes;
+
+ g_output_stream_write_all_finish(G_OUTPUT_STREAM(source), result, &nbytes, &error);
+ g_assert_no_error(error);
+ g_assert_cmpint(nbytes, ==, f->data_len);
+ g_clear_error(&error);
+
+ g_main_loop_quit (f->loop);
+}
+
+static void
+read_chunk_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ Fixture *f = user_data;
+ GError *error = NULL;
+ gssize nbytes;
+ gboolean data_match;
+
+ nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+ g_assert_no_error(error);
+ g_assert_cmpint(nbytes, >, 0);
+ data_match = (g_ascii_strncasecmp(f->data + f->total_read, f->buf, nbytes) == 0);
+ g_assert_true(data_match);
+
+ f->total_read += nbytes;
+ if (f->total_read != f->data_len) {
+ g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+ f->cancellable, read_chunk_cb, f);
+ }
+}
+
+static void
+test_pipe_write_all_64_read_chunks_16(Fixture *f, gconstpointer user_data)
+{
+ f->data_len = 64;
+ f->data = get_test_data(f->data_len);
+ f->read_size = 16;
+ f->total_read = 0;
+
+ g_output_stream_write_all_async(f->op1, f->data, f->data_len, G_PRIORITY_DEFAULT,
+ f->cancellable, write_all_cb, f);
+ g_input_stream_read_async(f->ip2, f->buf, f->read_size, G_PRIORITY_DEFAULT,
+ f->cancellable, read_chunk_cb, f);
+ g_main_loop_run (f->loop);
+}
+
int main(int argc, char* argv[])
{
setlocale(LC_ALL, "");
@@ -283,6 +354,10 @@ int main(int argc, char* argv[])
fixture_set_up, test_pipe_write8read16,
fixture_tear_down);
+ g_test_add("/pipe/write-all64-read-chunks16", Fixture, NULL,
+ fixture_set_up, test_pipe_write_all_64_read_chunks_16,
+ fixture_tear_down);
+
g_test_add("/pipe/readclosestream", Fixture, NULL,
fixture_set_up, test_pipe_readclosestream,
fixture_tear_down);
commit e5d2047812412bc7d5e11cfb63a4d8b55551a408
Author: Victor Toso <victortoso at redhat.com>
Date: Mon May 25 17:02:53 2015 +0200
tests: remove read8_cb to use generic read_cb
Both functions are basically the same, so let's keep the generic one.
diff --git a/tests/pipe.c b/tests/pipe.c
index 841cb77..ec11b6f 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -160,26 +160,12 @@ test_pipe_readwrite(Fixture *f, gconstpointer user_data)
}
static void
-read8_cb(GObject *source, GAsyncResult *result, gpointer user_data)
-{
- GError *error = NULL;
- gssize nbytes;
- GMainLoop *loop = user_data;
-
- nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
-
- g_assert_cmpint(nbytes, ==, 8);
- g_assert_no_error(error);
- g_clear_error(&error);
-}
-
-static void
test_pipe_write16read8(Fixture *f, gconstpointer user_data)
{
g_output_stream_write_async(f->op1, "0123456789abcdef", 16, G_PRIORITY_DEFAULT,
f->cancellable, write_cb, f->loop);
g_input_stream_read_async(f->ip2, f->buf, 8, G_PRIORITY_DEFAULT,
- f->cancellable, read8_cb, GINT_TO_POINTER(8));
+ f->cancellable, read_cb, GINT_TO_POINTER(8));
g_main_loop_run (f->loop);
@@ -193,7 +179,7 @@ test_pipe_write8read16(Fixture *f, gconstpointer user_data)
g_output_stream_write_async(f->op1, "01234567", 8, G_PRIORITY_DEFAULT,
f->cancellable, write_cb, f->loop);
g_input_stream_read_async(f->ip2, f->buf, 16, G_PRIORITY_DEFAULT,
- f->cancellable, read8_cb, GINT_TO_POINTER(8));
+ f->cancellable, read_cb, GINT_TO_POINTER(8));
g_main_loop_run (f->loop);
commit 33d4016228798108250b0ef9173f2c4c719a5065
Author: Victor Toso <victortoso at redhat.com>
Date: Tue May 19 15:59:00 2015 +0200
webdav: write all buffer to client webdav
Client's webdav can request less data (8192) than the amount sent by
guest's webdav. Use g_output_stream_write_all_async in order to avoid
losing data.
diff --git a/gtk/channel-webdav.c b/gtk/channel-webdav.c
index 1d3862e..bde728e 100644
--- a/gtk/channel-webdav.c
+++ b/gtk/channel-webdav.c
@@ -298,11 +298,11 @@ static void client_start_read(SpiceWebdavChannel *self, Client *client)
static void start_demux(SpiceWebdavChannel *self);
static void demux_to_client_finish(SpiceWebdavChannel *self,
- Client *client, gssize size)
+ Client *client, gboolean fail)
{
SpiceWebdavChannelPrivate *c = self->priv;
- if (size <= 0) {
+ if (fail) {
remove_client(self, client);
}
@@ -315,34 +315,37 @@ static void demux_to_client_cb(GObject *source, GAsyncResult *result, gpointer u
Client *client = user_data;
SpiceWebdavChannelPrivate *c = client->self->priv;
GError *error = NULL;
- gssize size;
+ gboolean fail;
+ gsize size;
- size = g_output_stream_write_finish(G_OUTPUT_STREAM(source), result, &error);
+ g_output_stream_write_all_finish(G_OUTPUT_STREAM(source), result, &size, &error);
if (error) {
CHANNEL_DEBUG(client->self, "write failed: %s", error->message);
g_clear_error(&error);
}
+ fail = (size != c->demux.size);
g_warn_if_fail(size == c->demux.size);
- demux_to_client_finish(client->self, client, size);
+ demux_to_client_finish(client->self, client, fail);
}
static void demux_to_client(SpiceWebdavChannel *self,
Client *client)
{
SpiceWebdavChannelPrivate *c = self->priv;
- gssize size = c->demux.size;
+ gsize size = c->demux.size;
- CHANNEL_DEBUG(self, "pushing %"G_GSSIZE_FORMAT" to client %p", size, client);
+ CHANNEL_DEBUG(self, "pushing %"G_GSIZE_FORMAT" to client %p", size, client);
if (size > 0) {
- g_output_stream_write_async(g_io_stream_get_output_stream(client->pipe),
- c->demux.buf, size, G_PRIORITY_DEFAULT,
- c->cancellable, demux_to_client_cb, client);
+ g_output_stream_write_all_async(g_io_stream_get_output_stream(client->pipe),
+ c->demux.buf, size, G_PRIORITY_DEFAULT,
+ c->cancellable, demux_to_client_cb, client);
return;
} else {
- demux_to_client_finish(self, client, size);
+ /* Nothing to write */
+ demux_to_client_finish(self, client, FALSE);
}
}
commit 5cf5f631cc5f74ce8d1d462d06a80cb71b3121ed
Author: Victor Toso <victortoso at redhat.com>
Date: Fri May 15 17:46:27 2015 +0200
giopipe: don't fail on create_source
PipeInputStream and PipeOutputStream should not fail when creating
GPollableStream source as this currently does not work with default
write_all and read_all functions;
In order to avoid creating zombie GSource in create_source of both
PipeInputStream and PipeOutputStream, we track all created GSources and
set them to be dispatched when data is available to read/write. It is
worth mentioning that concurrent write/read is not possible with the current
giopipe and only the last created GSource will read the data as it is
dispatched first.
diff --git a/gtk/giopipe.c b/gtk/giopipe.c
index 50edb5b..d91c4d9 100644
--- a/gtk/giopipe.c
+++ b/gtk/giopipe.c
@@ -44,7 +44,7 @@ struct _PipeInputStream
* closing.
*/
gboolean peer_closed;
- GSource *source;
+ GList *sources;
};
struct _PipeInputStreamClass
@@ -69,7 +69,7 @@ struct _PipeOutputStream
const gchar *buffer;
gsize count;
gboolean peer_closed;
- GSource *source;
+ GList *sources;
};
struct _PipeOutputStreamClass
@@ -120,12 +120,32 @@ pipe_input_stream_read (GInputStream *stream,
return count;
}
+static GList *
+set_all_sources_ready (GList *sources)
+{
+ GList *it = sources;
+ while (it != NULL) {
+ GSource *s = it->data;
+ GList *next = it->next;
+
+ if (s == NULL || g_source_is_destroyed(s)) {
+ /* remove */
+ sources = g_list_delete_link(sources, it);
+ g_source_unref(s);
+ } else {
+ /* dispatch */
+ g_source_set_ready_time(s, 0);
+ }
+ it = next;
+ }
+ return sources;
+}
+
static void
pipe_input_stream_check_source (PipeInputStream *self)
{
- if (self->source && !g_source_is_destroyed(self->source) &&
- g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
- g_source_set_ready_time(self->source, 0);
+ if (g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
+ self->sources = set_all_sources_ready(self->sources);
}
static gboolean
@@ -193,10 +213,8 @@ pipe_input_stream_dispose(GObject *object)
self->peer = NULL;
}
- if (self->source) {
- g_source_unref(self->source);
- self->source = NULL;
- }
+ g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
+ self->sources = NULL;
G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
}
@@ -234,14 +252,8 @@ pipe_input_stream_create_source (GPollableInputStream *stream,
PipeInputStream *self = PIPE_INPUT_STREAM(stream);
GSource *pollable_source;
- g_return_val_if_fail (self->source == NULL ||
- g_source_is_destroyed (self->source), NULL);
-
- if (self->source && g_source_is_destroyed (self->source))
- g_source_unref (self->source);
-
pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
- self->source = g_source_ref (pollable_source);
+ self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
return pollable_source;
}
@@ -319,10 +331,8 @@ pipe_output_stream_dispose(GObject *object)
self->peer = NULL;
}
- if (self->source) {
- g_source_unref(self->source);
- self->source = NULL;
- }
+ g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
+ self->sources = NULL;
G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
}
@@ -330,9 +340,8 @@ pipe_output_stream_dispose(GObject *object)
static void
pipe_output_stream_check_source (PipeOutputStream *self)
{
- if (self->source && !g_source_is_destroyed(self->source) &&
- g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
- g_source_set_ready_time(self->source, 0);
+ if (g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
+ self->sources = set_all_sources_ready(self->sources);
}
static gboolean
@@ -416,14 +425,8 @@ pipe_output_stream_create_source (GPollableOutputStream *stream,
PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
GSource *pollable_source;
- g_return_val_if_fail (self->source == NULL ||
- g_source_is_destroyed (self->source), NULL);
-
- if (self->source && g_source_is_destroyed (self->source))
- g_source_unref (self->source);
-
pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
- self->source = g_source_ref (pollable_source);
+ self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
return pollable_source;
}
More information about the Spice-commits
mailing list