[Spice-devel] [PATCH spice-gtk] Add GIOStream-based pipe
Marc-André Lureau
marcandre.lureau at redhat.com
Mon Feb 23 06:32:21 PST 2015
This code creates a pipe between two GIOStreams: the input side of each
stream reads from the output side of its peer, and vice versa.
In the following patches, this will avoid having to use socket
communication to exchange data with the embedded webdav server.
This requires glib-2.0 >= 2.43.90 because of the GSimpleIOStream dependency.
---
Addresses Christophe's review findings:
- remove dead struct
- remove useless checks
- clean up source in dispose
configure.ac | 4 +-
gtk/Makefile.am | 7 +
gtk/giopipe.c | 476 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
gtk/giopipe.h | 29 ++++
tests/Makefile.am | 5 +
tests/pipe.c | 313 +++++++++++++++++++++++++++++++++++
6 files changed, 833 insertions(+), 1 deletion(-)
create mode 100644 gtk/giopipe.c
create mode 100644 gtk/giopipe.h
create mode 100644 tests/pipe.c
diff --git a/configure.ac b/configure.ac
index 4e88dec..d98e502 100644
--- a/configure.ac
+++ b/configure.ac
@@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
if test "x$enable_webdav" = "xno"; then
have_phodav="no"
else
- PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no])
+ PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])
AC_SUBST(PHODAV_CFLAGS)
AC_SUBST(PHODAV_LIBS)
@@ -289,6 +289,8 @@ fi
AS_IF([test "x$have_phodav" = "xyes"],
AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav]))
+AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"])
+
AC_ARG_WITH([audio],
AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]),
[],
diff --git a/gtk/Makefile.am b/gtk/Makefile.am
index 7728fec..ab50c79 100644
--- a/gtk/Makefile.am
+++ b/gtk/Makefile.am
@@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES += \
$(NULL)
endif
+if WITH_PHODAV
+libspice_client_glib_2_0_la_SOURCES += \
+ giopipe.c \
+ giopipe.h \
+ $(NULL)
+endif
+
if WITH_UCONTEXT
libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c
endif
diff --git a/gtk/giopipe.c b/gtk/giopipe.c
new file mode 100644
index 0000000..d0b6ef2
--- /dev/null
+++ b/gtk/giopipe.c
@@ -0,0 +1,476 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ Copyright (C) 2015 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string.h>
+#include <errno.h>
+
+#include "giopipe.h"
+
+#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ())
+#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
+#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
+#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
+#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+
+typedef struct _PipeInputStreamClass PipeInputStreamClass;
+typedef struct _PipeInputStream PipeInputStream;
+typedef struct _PipeOutputStream PipeOutputStream;
+
+struct _PipeInputStream /* read end of one pipe direction */
+{
+ GInputStream parent_instance;
+
+ PipeOutputStream *peer; /* write end feeding this stream; registered as a weak pointer in make_gio_pipe() */
+ gssize read; /* byte count of the last read not yet reported to the writer, -1 when there is none */
+
+ /* GIOStream:closed is protected against pending operations, so we
+ * use an additional close flag to cancel those when the peer is
+ * closing.
+ */
+ gboolean closed;
+ GSource *source; /* extra reference on the source last returned by create_source, used to mark it ready */
+};
+
+struct _PipeInputStreamClass /* class structure of PipeInputStream; declares no members beyond the parent class */
+{
+ GInputStreamClass parent_class;
+};
+
+#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ())
+#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
+#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
+#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
+#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+
+typedef struct _PipeOutputStreamClass PipeOutputStreamClass;
+
+struct _PipeOutputStream /* write end of one pipe direction */
+{
+ GOutputStream parent_instance;
+
+ PipeInputStream *peer; /* read end consuming this stream; registered as a weak pointer in make_gio_pipe() */
+ const gchar *buffer; /* caller's data for the write in progress (borrowed, not copied), NULL when none */
+ gsize count; /* number of bytes available at buffer */
+ gboolean closed; /* set by the peer when it closes; checked by write to fail with G_IO_ERROR_CLOSED */
+ GSource *source; /* extra reference on the source last returned by create_source, used to mark it ready */
+};
+
+struct _PipeOutputStreamClass /* class structure of PipeOutputStream; declares no members beyond the parent class */
+{
+ GOutputStreamClass parent_class;
+};
+
+static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+static void pipe_input_stream_check_source (PipeInputStream *self);
+static void pipe_output_stream_check_source (PipeOutputStream *self);
+
+G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ pipe_input_stream_pollable_iface_init))
+
+/*
+ * GInputStream read implementation.
+ *
+ * Copies up to @count bytes from the buffer currently offered by the peer
+ * output stream, records the amount in self->read so the writer can report
+ * it, and wakes the writer's source.
+ *
+ * Returns the number of bytes copied, or -1 with @error set to
+ * G_IO_ERROR_CLOSED (stream closed, peer closed or peer gone) or
+ * G_IO_ERROR_WOULD_BLOCK (the peer has no write in progress).
+ */
+static gssize
+pipe_input_stream_read (GInputStream *stream,
+                        void *buffer,
+                        gsize count,
+                        GCancellable *cancellable,
+                        GError **error)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+    PipeOutputStream *peer = self->peer;
+
+    g_return_val_if_fail(count > 0, -1);
+
+    /* peer is a weak pointer: it is reset to NULL once the write end has
+     * been finalized. Nothing can be written anymore, so report that
+     * like a closed stream instead of dereferencing NULL. */
+    if (g_input_stream_is_closed (stream) || self->closed || peer == NULL) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                             "Stream is already closed");
+        return -1;
+    }
+
+    if (!peer->buffer) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                             g_strerror(EAGAIN));
+        return -1;
+    }
+
+    count = MIN(peer->count, count);
+    memcpy(buffer, peer->buffer, count);
+    self->read = count;
+    /* the offered buffer is consumed, even if only partially */
+    peer->buffer = NULL;
+
+    /* schedule peer source */
+    pipe_output_stream_check_source(peer);
+
+    return count;
+}
+
+/* Mark the stream's live poll source as ready when the stream is readable. */
+static void
+pipe_input_stream_check_source (PipeInputStream *self)
+{
+    if (self->source == NULL)
+        return;
+
+    if (g_source_is_destroyed(self->source))
+        return;
+
+    if (!g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
+        return;
+
+    g_source_set_ready_time(self->source, 0);
+}
+
+/*
+ * GInputStream close implementation: flags the peer output stream as
+ * closed, closes it and wakes its source. Always returns TRUE.
+ */
+static gboolean
+pipe_input_stream_close (GInputStream *stream,
+                         GCancellable *cancellable,
+                         GError **error)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM(stream);
+
+    if (self->peer == NULL)
+        return TRUE;
+
+    /* the flag reaches the peer even when the close below is refused
+     * because of a pending operation; that error is ignored */
+    self->peer->closed = TRUE;
+    g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
+    pipe_output_stream_check_source(self->peer);
+
+    return TRUE;
+}
+
+/* Asynchronous close: performs the synchronous close right away and
+ * completes the task with TRUE. */
+static void
+pipe_input_stream_close_async (GInputStream *stream,
+                               int io_priority,
+                               GCancellable *cancellable,
+                               GAsyncReadyCallback callback,
+                               gpointer data)
+{
+    GTask *task = g_task_new (stream, cancellable, callback, data);
+
+    /* the result is not checked: the close function always returns TRUE */
+    pipe_input_stream_close (stream, cancellable, NULL);
+    g_task_return_boolean (task, TRUE);
+
+    g_object_unref (task);
+}
+
+/* Collects the result of pipe_input_stream_close_async(). */
+static gboolean
+pipe_input_stream_close_finish (GInputStream *stream,
+                                GAsyncResult *result,
+                                GError **error)
+{
+    GTask *task;
+
+    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+    task = G_TASK (result);
+    return g_task_propagate_boolean (task, error);
+}
+
+static void
+pipe_input_stream_init (PipeInputStream *self)
+{
+ self->read = -1; /* -1: no completed read is waiting to be reported to the writer */
+}
+
+/* GObject dispose: unregisters the weak pointer to the peer, drops the
+ * reference on the poll source, then chains up to the parent class. */
+static void
+pipe_input_stream_dispose(GObject *object)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM(object);
+
+    if (self->peer != NULL) {
+        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+        self->peer = NULL;
+    }
+
+    if (self->source != NULL) {
+        GSource *source = self->source;
+
+        g_source_unref(source);
+        self->source = NULL;
+    }
+
+    G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
+}
+
+/* Installs the GObject and GInputStream virtual methods. */
+static void
+pipe_input_stream_class_init (PipeInputStreamClass *klass)
+{
+    GInputStreamClass *stream_vtable = G_INPUT_STREAM_CLASS (klass);
+
+    G_OBJECT_CLASS (klass)->dispose = pipe_input_stream_dispose;
+
+    stream_vtable->read_fn = pipe_input_stream_read;
+    stream_vtable->close_fn = pipe_input_stream_close;
+    stream_vtable->close_async = pipe_input_stream_close_async;
+    stream_vtable->close_finish = pipe_input_stream_close_finish;
+}
+
+/*
+ * GPollableInputStream is_readable implementation.
+ *
+ * The stream is readable once it has been flagged closed by its peer, or
+ * when the peer offers a buffer and no earlier read is still waiting to be
+ * reported to the writer.
+ */
+static gboolean
+pipe_input_stream_is_readable (GPollableInputStream *stream)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+
+    if (self->peer && self->peer->buffer && self->read == -1)
+        return TRUE;
+
+    return self->closed ? TRUE : FALSE;
+}
+
+/*
+ * GPollableInputStream create_source implementation.
+ *
+ * Only one live source per stream is supported. The stream keeps its own
+ * reference on the returned source so that it can mark it ready later.
+ */
+static GSource *
+pipe_input_stream_create_source (GPollableInputStream *stream,
+                                 GCancellable *cancellable)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM(stream);
+    GSource *previous;
+
+    g_return_val_if_fail (self->source == NULL ||
+                          g_source_is_destroyed (self->source), NULL);
+
+    /* release the reference kept on a previous, now destroyed, source */
+    previous = self->source;
+    if (previous && g_source_is_destroyed (previous))
+        g_source_unref (previous);
+
+    self->source = g_pollable_source_new_full (self, NULL, cancellable);
+
+    /* one reference stays in self->source, the other goes to the caller */
+    return g_source_ref (self->source);
+}
+
+/* Fills in the GPollableInputStream interface implemented by the stream. */
+static void
+pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+    iface->create_source = pipe_input_stream_create_source;
+    iface->is_readable = pipe_input_stream_is_readable;
+}
+
+static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ pipe_output_stream_pollable_iface_init))
+
+/*
+ * GOutputStream write implementation.
+ *
+ * The first call for a given @buffer only publishes the buffer to the peer
+ * input stream and fails with G_IO_ERROR_WOULD_BLOCK. Once the peer has
+ * read from it, calling again with the same @buffer returns the number of
+ * bytes the peer consumed.
+ *
+ * Returns the number of bytes consumed by the reader, or -1 with @error set
+ * to G_IO_ERROR_CLOSED (stream closed, peer closed or peer gone) or
+ * G_IO_ERROR_WOULD_BLOCK (nothing has been read yet).
+ */
+static gssize
+pipe_output_stream_write (GOutputStream *stream,
+                          const void *buffer,
+                          gsize count,
+                          GCancellable *cancellable,
+                          GError **error)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    PipeInputStream *peer = self->peer;
+
+    /* peer is a weak pointer: it is reset to NULL once the read end has
+     * been finalized. Nobody can read anymore, so report that like a
+     * closed stream instead of dereferencing NULL below. */
+    if (g_output_stream_is_closed (stream) || self->closed || peer == NULL) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                             "Stream is already closed");
+        return -1;
+    }
+
+    /* this abuses pollable stream, writing sync would likely lead to
+       crashes, since the buffer pointer would become invalid, a
+       generic solution would need a copy..
+    */
+    g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
+    self->buffer = buffer;
+    self->count = count;
+
+    /* wake the reader now that data is on offer */
+    pipe_input_stream_check_source(peer);
+
+    if (peer->read < 0) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                             g_strerror (EAGAIN));
+        return -1;
+    }
+
+    /* peer->read is known to be non-negative here, the cast is safe */
+    g_assert((gsize) peer->read <= self->count);
+    count = peer->read;
+
+    /* the write is complete: reset the state for the next one */
+    self->buffer = NULL;
+    self->count = 0;
+    peer->read = -1;
+
+    return count;
+}
+
+static void
+pipe_output_stream_init (PipeOutputStream *stream)
+{ /* intentionally empty: no field is given an explicit initial value here */
+}
+
+/* GObject dispose: unregisters the weak pointer to the peer, drops the
+ * reference on the poll source, then chains up to the parent class. */
+static void
+pipe_output_stream_dispose(GObject *object)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(object);
+
+    if (self->peer != NULL) {
+        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+        self->peer = NULL;
+    }
+
+    if (self->source != NULL) {
+        GSource *source = self->source;
+
+        g_source_unref(source);
+        self->source = NULL;
+    }
+
+    G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
+}
+
+/* Mark the stream's live poll source as ready when the stream is writable. */
+static void
+pipe_output_stream_check_source (PipeOutputStream *self)
+{
+    if (self->source == NULL)
+        return;
+
+    if (g_source_is_destroyed(self->source))
+        return;
+
+    if (!g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
+        return;
+
+    g_source_set_ready_time(self->source, 0);
+}
+
+/*
+ * GOutputStream close implementation: flags the peer input stream as
+ * closed, closes it and wakes its source. Always returns TRUE.
+ */
+static gboolean
+pipe_output_stream_close (GOutputStream *stream,
+                          GCancellable *cancellable,
+                          GError **error)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+
+    if (self->peer == NULL)
+        return TRUE;
+
+    /* the flag reaches the peer even when the close below is refused
+     * because of a pending operation; that error is ignored */
+    self->peer->closed = TRUE;
+    g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL);
+    pipe_input_stream_check_source(self->peer);
+
+    return TRUE;
+}
+
+/* Asynchronous close: performs the synchronous close right away and
+ * completes the task with TRUE. */
+static void
+pipe_output_stream_close_async (GOutputStream *stream,
+                                int io_priority,
+                                GCancellable *cancellable,
+                                GAsyncReadyCallback callback,
+                                gpointer data)
+{
+    GTask *task = g_task_new (stream, cancellable, callback, data);
+
+    /* the result is not checked: the close function always returns TRUE */
+    pipe_output_stream_close (stream, cancellable, NULL);
+    g_task_return_boolean (task, TRUE);
+
+    g_object_unref (task);
+}
+
+/* Collects the result of pipe_output_stream_close_async(). */
+static gboolean
+pipe_output_stream_close_finish (GOutputStream *stream,
+                                 GAsyncResult *result,
+                                 GError **error)
+{
+    GTask *task;
+
+    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+    task = G_TASK (result);
+    return g_task_propagate_boolean (task, error);
+}
+
+
+/* Installs the GObject and GOutputStream virtual methods. */
+static void
+pipe_output_stream_class_init (PipeOutputStreamClass *klass)
+{
+    GOutputStreamClass *stream_vtable = G_OUTPUT_STREAM_CLASS (klass);
+
+    G_OBJECT_CLASS (klass)->dispose = pipe_output_stream_dispose;
+
+    stream_vtable->write_fn = pipe_output_stream_write;
+    stream_vtable->close_fn = pipe_output_stream_close;
+    stream_vtable->close_async = pipe_output_stream_close_async;
+    stream_vtable->close_finish = pipe_output_stream_close_finish;
+}
+
+/*
+ * GPollableOutputStream is_writable implementation.
+ *
+ * The stream is writable when no write is in progress, or when the reader
+ * has consumed data that the writer has not collected yet. It is also
+ * reported writable once the peer has flagged it closed, mirroring
+ * is_readable on the input side: a pending write is then dispatched and
+ * fails with G_IO_ERROR_CLOSED instead of waiting forever.
+ */
+static gboolean
+pipe_output_stream_is_writable (GPollableOutputStream *stream)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+
+    if (self->closed)
+        return TRUE;
+
+    /* peer is a weak pointer that is reset to NULL when the read end is
+     * finalized: it must not be dereferenced in that case */
+    if (self->peer == NULL)
+        return TRUE;
+
+    return self->buffer == NULL || self->peer->read >= 0;
+}
+
+/*
+ * GPollableOutputStream create_source implementation.
+ *
+ * Only one live source per stream is supported. The stream keeps its own
+ * reference on the returned source so that it can mark it ready later.
+ */
+static GSource *
+pipe_output_stream_create_source (GPollableOutputStream *stream,
+                                  GCancellable *cancellable)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    GSource *previous;
+
+    g_return_val_if_fail (self->source == NULL ||
+                          g_source_is_destroyed (self->source), NULL);
+
+    /* release the reference kept on a previous, now destroyed, source */
+    previous = self->source;
+    if (previous && g_source_is_destroyed (previous))
+        g_source_unref (previous);
+
+    self->source = g_pollable_source_new_full (self, NULL, cancellable);
+
+    /* one reference stays in self->source, the other goes to the caller */
+    return g_source_ref (self->source);
+}
+
+/* Fills in the GPollableOutputStream interface implemented by the stream. */
+static void
+pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+    iface->create_source = pipe_output_stream_create_source;
+    iface->is_writable = pipe_output_stream_is_writable;
+}
+
+/*
+ * Creates one pipe direction: a PipeOutputStream whose data is read back
+ * from a PipeInputStream. Each end refers to the other through a weak
+ * pointer, so neither keeps its peer alive.
+ *
+ * @input and @output must point to NULL pointers; on return they hold the
+ * two new streams.
+ */
+G_GNUC_INTERNAL void
+make_gio_pipe(GInputStream **input, GOutputStream **output)
+{
+    PipeInputStream *reader;
+    PipeOutputStream *writer;
+
+    g_return_if_fail(input != NULL && *input == NULL);
+    g_return_if_fail(output != NULL && *output == NULL);
+
+    reader = g_object_new(TYPE_PIPE_INPUT_STREAM, NULL);
+    writer = g_object_new(TYPE_PIPE_OUTPUT_STREAM, NULL);
+
+    /* writer -> reader link, reset when the reader goes away */
+    writer->peer = reader;
+    g_object_add_weak_pointer(G_OBJECT(reader), (gpointer*)&writer->peer);
+
+    /* reader -> writer link, reset when the writer goes away */
+    reader->peer = writer;
+    g_object_add_weak_pointer(G_OBJECT(writer), (gpointer*)&reader->peer);
+
+    *input = G_INPUT_STREAM(reader);
+    *output = G_OUTPUT_STREAM(writer);
+}
+
+/*
+ * Creates two connected GIOStreams: whatever is written to the output side
+ * of one can be read from the input side of the other, in both directions.
+ *
+ * @p1 and @p2 must point to NULL pointers. On return each holds a new
+ * GIOStream owned by the caller, to be released with g_object_unref().
+ */
+G_GNUC_INTERNAL void
+spice_make_pipe(GIOStream **p1, GIOStream **p2)
+{
+    GInputStream *in1 = NULL, *in2 = NULL;
+    GOutputStream *out1 = NULL, *out2 = NULL;
+
+    g_return_if_fail(p1 != NULL);
+    g_return_if_fail(p2 != NULL);
+    g_return_if_fail(*p1 == NULL);
+    g_return_if_fail(*p2 == NULL);
+
+    /* cross the two directions: p1 reads what p2 writes and vice versa */
+    make_gio_pipe(&in1, &out2);
+    make_gio_pipe(&in2, &out1);
+
+    *p1 = g_simple_io_stream_new(in1, out1);
+    *p2 = g_simple_io_stream_new(in2, out2);
+
+    /* GSimpleIOStream takes its own reference on the streams it wraps:
+     * drop the creation references, otherwise the four streams would
+     * never be freed */
+    g_object_unref(in1);
+    g_object_unref(in2);
+    g_object_unref(out1);
+    g_object_unref(out2);
+}
diff --git a/gtk/giopipe.h b/gtk/giopipe.h
new file mode 100644
index 0000000..46c2c9c
--- /dev/null
+++ b/gtk/giopipe.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ Copyright (C) 2015 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+#ifndef __SPICE_GIO_PIPE_H__
+#define __SPICE_GIO_PIPE_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+void spice_make_pipe(GIOStream **p1, GIOStream **p2);
+
+G_END_DECLS
+
+#endif /* __SPICE_GIO_PIPE_H__ */
diff --git a/tests/Makefile.am b/tests/Makefile.am
index b236b12..34cfd5b 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,6 +5,10 @@ noinst_PROGRAMS = \
util \
$(NULL)
+if WITH_PHODAV
+noinst_PROGRAMS += pipe
+endif
+
TESTS = $(noinst_PROGRAMS)
AM_CPPFLAGS = \
@@ -20,5 +24,6 @@ LDADD = \
util_SOURCES = util.c
coroutine_SOURCES = coroutine.c
+pipe_SOURCES = pipe.c
-include $(top_srcdir)/git.mk
diff --git a/tests/pipe.c b/tests/pipe.c
new file mode 100644
index 0000000..841cb77
--- /dev/null
+++ b/tests/pipe.c
@@ -0,0 +1,313 @@
+#include <glib.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <locale.h>
+
+#include "giopipe.h"
+
+typedef struct _Fixture { /* per-test state, built by fixture_set_up() */
+ GIOStream *p1; /* first end returned by spice_make_pipe() */
+ GIOStream *p2; /* second end returned by spice_make_pipe() */
+
+ GInputStream *ip1; /* input stream of p1 */
+ GOutputStream *op1; /* output stream of p1 */
+ GInputStream *ip2; /* input stream of p2 */
+ GOutputStream *op2; /* output stream of p2 */
+
+ gchar buf[16]; /* read buffer, pre-filled with 0x42 + index */
+
+ GMainLoop *loop; /* main loop run by the asynchronous tests */
+ GCancellable *cancellable; /* passed to every stream operation */
+ guint timeout; /* id of the 1000 ms watchdog timeout calling stop_loop() */
+} Fixture;
+
+/* Watchdog timeout callback: quits the loop passed as @data, then fails
+ * the test because reaching this point means it did not finish in time. */
+static gboolean
+stop_loop (gpointer data)
+{
+    g_main_loop_quit ((GMainLoop *) data);
+    g_assert_not_reached();
+
+    return G_SOURCE_REMOVE;
+}
+
+/*
+ * Test set-up: creates the pipe, caches its four streams, fills the read
+ * buffer with a known pattern and arms a one second watchdog timeout.
+ */
+static void
+fixture_set_up(Fixture *fixture,
+               gconstpointer user_data)
+{
+    /* unsigned index: it is compared against a sizeof() result */
+    gsize i;
+
+    spice_make_pipe(&fixture->p1, &fixture->p2);
+    g_assert_true(G_IS_IO_STREAM(fixture->p1));
+    g_assert_true(G_IS_IO_STREAM(fixture->p2));
+
+    fixture->op1 = g_io_stream_get_output_stream(fixture->p1);
+    g_assert_true(G_IS_OUTPUT_STREAM(fixture->op1));
+    fixture->ip1 = g_io_stream_get_input_stream(fixture->p1);
+    g_assert_true(G_IS_INPUT_STREAM(fixture->ip1));
+    fixture->op2 = g_io_stream_get_output_stream(fixture->p2);
+    g_assert_true(G_IS_OUTPUT_STREAM(fixture->op2));
+    fixture->ip2 = g_io_stream_get_input_stream(fixture->p2);
+    g_assert_true(G_IS_INPUT_STREAM(fixture->ip2));
+
+    for (i = 0; i < sizeof(fixture->buf); i++) {
+        fixture->buf[i] = 0x42 + i;
+    }
+
+    fixture->cancellable = g_cancellable_new();
+    fixture->loop = g_main_loop_new (NULL, FALSE);
+    fixture->timeout = g_timeout_add (1000, stop_loop, fixture->loop);
+}
+
+/* Test tear-down: releases both pipe ends, the cancellable, the watchdog
+ * timeout and the main loop. */
+static void
+fixture_tear_down(Fixture *fixture,
+                  gconstpointer user_data)
+{
+    /* the individual streams are owned by p1/p2 and go away with them */
+    g_clear_object(&fixture->p1);
+    g_clear_object(&fixture->p2);
+    g_clear_object(&fixture->cancellable);
+
+    g_source_remove(fixture->timeout);
+    g_main_loop_unref(fixture->loop);
+}
+
+/* A read with no write in progress on the peer must fail with
+ * G_IO_ERROR_WOULD_BLOCK and return -1. */
+static void
+test_pipe_readblock(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = g_input_stream_read(f->ip2, f->buf, 1,
+                               f->cancellable, &error);
+
+    g_assert_cmpint(size, ==, -1);
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+
+    g_clear_error(&error);
+}
+
+/* A write that no reader has consumed yet must fail with
+ * G_IO_ERROR_WOULD_BLOCK and return -1. */
+static void
+test_pipe_writeblock(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = g_output_stream_write(f->op1, "", 1,
+                                 f->cancellable, &error);
+
+    g_assert_cmpint(size, ==, -1);
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+
+    g_clear_error(&error);
+}
+
+/* Write completion callback: the write must succeed with at least one
+ * byte; quits the main loop passed as @user_data. */
+static void
+write_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GOutputStream *stream = G_OUTPUT_STREAM(source);
+    GError *error = NULL;
+    gssize nbytes = g_output_stream_write_finish(stream, result, &error);
+
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, >, 0);
+    g_clear_error(&error);
+
+    g_main_loop_quit ((GMainLoop *) user_data);
+}
+
+/* Read completion callback: the read must succeed and return the byte
+ * count packed into @user_data with GINT_TO_POINTER(). */
+static void
+read_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GInputStream *stream = G_INPUT_STREAM(source);
+    GError *error = NULL;
+    gssize expected = GPOINTER_TO_INT(user_data);
+    gssize nbytes = g_input_stream_read_finish(stream, result, &error);
+
+    g_assert_cmpint(nbytes, ==, expected);
+    g_assert_no_error(error);
+    g_clear_error(&error);
+}
+
+/* Starts a one byte write followed by a one byte read, twice in a row,
+ * running the main loop until each write completes. */
+static void
+test_pipe_writeread(Fixture *f, gconstpointer user_data)
+{
+    int round;
+
+    for (round = 0; round < 2; round++) {
+        g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT,
+                                    f->cancellable, write_cb, f->loop);
+        g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                                  f->cancellable, read_cb, GINT_TO_POINTER(1));
+
+        g_main_loop_run (f->loop);
+    }
+}
+
+/* Starts the one byte read before the matching write, then runs the main
+ * loop until the write completes. */
+static void
+test_pipe_readwrite(Fixture *f, gconstpointer user_data)
+{
+    GInputStream *reader = f->ip2;
+    GOutputStream *writer = f->op1;
+
+    g_input_stream_read_async(reader, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_cb, GINT_TO_POINTER(1));
+    g_output_stream_write_async(writer, "", 1, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+
+    g_main_loop_run (f->loop);
+}
+
+/*
+ * Read completion callback: the read must succeed with exactly 8 bytes.
+ * @user_data is not used; callers pass an integer packed with
+ * GINT_TO_POINTER(), not a main loop, so it must not be treated as one.
+ */
+static void
+read8_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_cmpint(nbytes, ==, 8);
+    g_assert_no_error(error);
+    g_clear_error(&error);
+}
+
+/* Offers 16 bytes to a reader asking for 8 only, then checks that a
+ * further read blocks. */
+static void
+test_pipe_write16read8(Fixture *f, gconstpointer user_data)
+{
+    GOutputStream *writer = f->op1;
+    GInputStream *reader = f->ip2;
+
+    g_output_stream_write_async(writer, "0123456789abcdef", 16, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(reader, f->buf, 8, G_PRIORITY_DEFAULT,
+                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+
+    g_main_loop_run (f->loop);
+
+    /* check next read would block */
+    test_pipe_readblock(f, user_data);
+}
+
+static void
+test_pipe_write8read16(Fixture *f, gconstpointer user_data)
+{
+ g_output_stream_write_async(f->op1, "01234567", 8, G_PRIORITY_DEFAULT,
+ f->cancellable, write_cb, f->loop);
+ g_input_stream_read_async(f->ip2, f->buf, 16, G_PRIORITY_DEFAULT,
+ f->cancellable, read8_cb, GINT_TO_POINTER(8));
+
+ g_main_loop_run (f->loop);
+
+ /* check next write would block */
+ test_pipe_writeblock(f, user_data);
+}
+
+/* Read completion callback for the close tests: the read must fail with
+ * G_IO_ERROR_CLOSED and return -1; quits the main loop passed as
+ * @user_data. */
+static void
+readclose_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+    GMainLoop *loop = user_data;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_cmpint(nbytes, ==, -1);
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CLOSED);
+    g_clear_error(&error);
+
+    g_main_loop_quit (loop);
+}
+
+/* Closing the whole peer GIOStream must make a pending read fail with
+ * G_IO_ERROR_CLOSED. */
+static void
+test_pipe_readclosestream(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gboolean closed;
+
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readclose_cb, f->loop);
+    closed = g_io_stream_close(f->p1, f->cancellable, &error);
+
+    /* the close itself must succeed; also avoids leaking a set error */
+    g_assert_no_error(error);
+    g_assert_true(closed);
+
+    g_main_loop_run (f->loop);
+}
+
+/* Closing the peer output stream must make a pending read fail with
+ * G_IO_ERROR_CLOSED. */
+static void
+test_pipe_readclose(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gboolean closed;
+
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readclose_cb, f->loop);
+    closed = g_output_stream_close(f->op1, f->cancellable, &error);
+
+    /* the close itself must succeed; also avoids leaking a set error */
+    g_assert_no_error(error);
+    g_assert_true(closed);
+
+    g_main_loop_run (f->loop);
+}
+
+/* Read completion callback for the cancellation test: the read must fail
+ * with G_IO_ERROR_CANCELLED and return -1; quits the main loop passed as
+ * @user_data. */
+static void
+readcancel_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+    GMainLoop *loop = user_data;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_cmpint(nbytes, ==, -1);
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
+    g_clear_error(&error);
+
+    g_main_loop_quit (loop);
+}
+
+/* Cancelling the cancellable of a pending read must complete that read
+ * with G_IO_ERROR_CANCELLED. Closing the peer is covered by
+ * test_pipe_readclose, so this test triggers the cancellable instead. */
+static void
+test_pipe_readcancel(Fixture *f, gconstpointer user_data)
+{
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readcancel_cb, f->loop);
+    g_cancellable_cancel(f->cancellable);
+
+    g_main_loop_run (f->loop);
+}
+
+/* Registers every pipe test with the shared fixture and runs them. */
+int main(int argc, char* argv[])
+{
+    /* test path -> test function, in registration order */
+    static const struct {
+        const char *path;
+        void (*run) (Fixture *f, gconstpointer user_data);
+    } cases[] = {
+        { "/pipe/readblock", test_pipe_readblock },
+        { "/pipe/writeblock", test_pipe_writeblock },
+        { "/pipe/writeread", test_pipe_writeread },
+        { "/pipe/readwrite", test_pipe_readwrite },
+        { "/pipe/write16read8", test_pipe_write16read8 },
+        { "/pipe/write8read16", test_pipe_write8read16 },
+        { "/pipe/readclosestream", test_pipe_readclosestream },
+        { "/pipe/readclose", test_pipe_readclose },
+        { "/pipe/readcancel", test_pipe_readcancel },
+    };
+    gsize i;
+
+    setlocale(LC_ALL, "");
+
+    g_test_init(&argc, &argv, NULL);
+
+    for (i = 0; i < G_N_ELEMENTS(cases); i++) {
+        g_test_add(cases[i].path, Fixture, NULL,
+                   fixture_set_up, cases[i].run,
+                   fixture_tear_down);
+    }
+
+    return g_test_run();
+}
--
2.1.0
More information about the Spice-devel
mailing list