[Spice-devel] [PATCH spice-gtk 1/3] Add GIOStream-based pipe

Marc-André Lureau marcandre.lureau at redhat.com
Thu Feb 12 05:12:39 PST 2015


This allows creating a pipe between two GIOStreams: the input side of each
stream reads from the output side of its peer, and vice versa.

In the following patches, this will replace the socket currently used
to communicate with the embedded WebDAV server.
---
 gtk/Makefile.am   |   2 +
 gtk/giopipe.c     | 569 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 gtk/giopipe.h     |  53 +++++
 tests/Makefile.am |   2 +
 tests/pipe.c      | 313 ++++++++++++++++++++++++++++++
 5 files changed, 939 insertions(+)
 create mode 100644 gtk/giopipe.c
 create mode 100644 gtk/giopipe.h
 create mode 100644 tests/pipe.c

diff --git a/gtk/Makefile.am b/gtk/Makefile.am
index 7728fec..678dd26 100644
--- a/gtk/Makefile.am
+++ b/gtk/Makefile.am
@@ -280,6 +280,8 @@ libspice_client_glib_2_0_la_SOURCES =			\
 	$(USB_ACL_HELPER_SRCS)				\
 	vmcstream.c					\
 	vmcstream.h					\
+	giopipe.c					\
+	giopipe.h					\
 	wocky-http-proxy.c				\
 	wocky-http-proxy.h				\
 							\
diff --git a/gtk/giopipe.c b/gtk/giopipe.c
new file mode 100644
index 0000000..c62ba9f
--- /dev/null
+++ b/gtk/giopipe.c
@@ -0,0 +1,569 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+  Copyright (C) 2015 Red Hat, Inc.
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string.h>
+#include <errno.h>
+
+#include "giopipe.h"
+
+#define TYPE_PIPE_INPUT_STREAM         (pipe_input_stream_get_type ())
+#define PIPE_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
+#define PIPE_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+#define IS_PIPE_INPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
+#define IS_PIPE_INPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
+#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+
+typedef struct _PipeInputStreamClass                              PipeInputStreamClass;
+typedef struct _PipeInputStream                                   PipeInputStream;
+typedef struct _PipeOutputStream                                  PipeOutputStream;
+
+struct _PipeInputStream
+{
+    GInputStream parent_instance;
+
+    PipeOutputStream *peer; /* writer feeding this reader; registered as a weak pointer */
+    gssize read;            /* bytes taken by the last read, -1 once the writer consumed it */
+
+    /* GIOStream:closed is protected against pending operations, so we
+     * use an additional close flag to cancel those when the peer is
+     * closing.
+     */
+    gboolean closed;
+};
+
+struct _PipeInputStreamClass
+{
+    GInputStreamClass parent_class;
+};
+
+#define TYPE_PIPE_OUTPUT_STREAM         (pipe_output_stream_get_type ())
+#define PIPE_OUTPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
+#define PIPE_OUTPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+#define IS_PIPE_OUTPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
+#define IS_PIPE_OUTPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
+#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+
+typedef struct _PipeOutputStreamClass                             PipeOutputStreamClass;
+
+struct _PipeOutputStream
+{
+    GOutputStream parent_instance;
+
+    PipeInputStream *peer; /* reader draining this writer; registered as a weak pointer */
+    const gchar *buffer;   /* caller's pending write data (borrowed, not copied), or NULL */
+    gsize count;           /* size in bytes of the pending write */
+    gboolean closed;       /* set when the peer input stream closes; further writes fail */
+};
+
+struct _PipeOutputStreamClass
+{
+    GOutputStreamClass parent_class;
+};
+
+struct _SpicePipeStreamPrivate {
+    PipeInputStream *input_stream;   /* created in spice_pipe_stream_init, released in finalize */
+    PipeOutputStream *output_stream; /* created in spice_pipe_stream_init, released in finalize */
+};
+
+typedef struct _ConditionSource
+{
+    GSource src;           /* first member: instances are cast to and from GSource */
+
+    GSourceFunc condition; /* predicate evaluated by the prepare and check hooks */
+    gpointer data;         /* argument handed to the predicate */
+} ConditionSource;
+
+/*
+ * GSourceFuncs.prepare hook, run before the main loop polls. Requests no
+ * poll timeout (-1) and reports whether the condition already holds.
+ */
+static gboolean
+condition_prepare(GSource *source, int *timeout)
+{
+    ConditionSource *cond = (ConditionSource *)source;
+    GSourceFunc is_ready = cond->condition;
+
+    *timeout = -1;
+    return is_ready(cond->data);
+}
+
+/*
+ * GSourceFuncs.check hook, run after the main loop has polled. Reports
+ * whether the watched condition now holds, i.e. the source must dispatch.
+ */
+static gboolean
+condition_check(GSource *source)
+{
+    ConditionSource *cond = (ConditionSource *)source;
+    gboolean ready = cond->condition(cond->data);
+    return ready;
+}
+
+/* GSourceFuncs.dispatch hook: run the user callback and return its result.
+ * Without a callback, emit a warning and return FALSE. */
+static gboolean
+condition_dispatch(GSource *source G_GNUC_UNUSED,
+                   GSourceFunc callback, gpointer user_data)
+{
+    if (callback != NULL)
+        return callback(user_data);
+
+    g_warning ("Condition source dispatched without callback\n"
+               "You must call g_source_set_callback().");
+    return FALSE;
+}
+
+/* GSourceFuncs.closure_callback: invoke the GClosure passed as @data with
+ * no arguments and hand back its boolean return value. */
+static gboolean
+condition_closure_callback (gpointer data)
+{
+    GValue ret = G_VALUE_INIT;
+    gboolean keep;
+
+    g_value_init (&ret, G_TYPE_BOOLEAN);
+    g_closure_invoke ((GClosure *)data, &ret, 0, NULL, NULL);
+
+    keep = g_value_get_boolean (&ret);
+    g_value_unset (&ret);
+
+    return keep;
+}
+
+static GSourceFuncs conditionFuncs = { /* ConditionSource vtable; file-local, not exported */
+    .prepare = condition_prepare,
+    .check = condition_check,
+    .dispatch = condition_dispatch,
+    .closure_callback = condition_closure_callback
+};
+
+/* Create an idle-priority GSource that is ready whenever @condition(@data)
+ * returns TRUE. The caller owns the returned reference. */
+static GSource *
+condition_source_new (GSourceFunc condition, gpointer data)
+{
+    GSource *base = g_source_new (&conditionFuncs, sizeof (ConditionSource));
+    ConditionSource *cond = (ConditionSource *)base;
+
+    cond->condition = condition;
+    cond->data = data;
+
+    g_source_set_priority (base, G_PRIORITY_DEFAULT_IDLE);
+
+    return base;
+}
+
+static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                                pipe_input_stream_pollable_iface_init))
+
+/* Non-blocking read: copy up to @count bytes out of the peer's pending
+ * write buffer. Fails with G_IO_ERROR_CLOSED when the pipe is closed or the
+ * peer is gone, and with G_IO_ERROR_WOULD_BLOCK when nothing is pending. */
+static gssize
+pipe_input_stream_read (GInputStream  *stream,
+                        void          *buffer,
+                        gsize          count,
+                        GCancellable  *cancellable,
+                        GError       **error)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+
+    g_return_val_if_fail(count > 0, -1);
+
+    /* peer is tracked through a weak pointer, so it may have become NULL */
+    if (g_input_stream_is_closed (stream) || self->closed || !self->peer) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                             "Stream is already closed");
+        return -1;
+    }
+
+    if (!self->peer->buffer) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                             g_strerror(EAGAIN));
+        return -1;
+    }
+
+    count = MIN(self->peer->count, count);
+    memcpy(buffer, self->peer->buffer, count);
+    self->read = count; /* picked up by the writer's next write call */
+    self->peer->buffer = NULL;
+    g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
+    return count;
+}
+
+/* GInputStream.close_fn: flag the peer output stream as closed and close
+ * it as well. Always returns TRUE. */
+static gboolean
+pipe_input_stream_close (GInputStream  *stream,
+                         GCancellable   *cancellable,
+                         GError        **error)
+{
+    PipeOutputStream *writer = PIPE_INPUT_STREAM(stream)->peer;
+
+    if (writer == NULL)
+        return TRUE;
+
+    writer->closed = TRUE;
+    /* ignore any pending errors */
+    g_output_stream_close(G_OUTPUT_STREAM(writer), cancellable, NULL);
+    return TRUE;
+}
+
+/* GInputStream.close_async: perform the synchronous close right away and
+ * complete the task with TRUE. */
+static void
+pipe_input_stream_close_async (GInputStream       *stream,
+                               int                  io_priority,
+                               GCancellable        *cancellable,
+                               GAsyncReadyCallback  callback,
+                               gpointer             data)
+{
+    GTask *close_task = g_task_new (stream, cancellable, callback, data);
+
+    /* will always return TRUE */
+    (void) pipe_input_stream_close (stream, cancellable, NULL);
+
+    g_task_return_boolean (close_task, TRUE);
+    g_object_unref (close_task);
+}
+
+/* GInputStream.close_finish: hand back the boolean stored in the close task. */
+static gboolean
+pipe_input_stream_close_finish (GInputStream  *stream,
+                                GAsyncResult   *result,
+                                GError        **error)
+{
+    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+    return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+static void
+pipe_input_stream_init (PipeInputStream *self)
+{
+    self->read = -1; /* -1: no completed read is waiting to be picked up by the writer */
+}
+
+/* GObject.dispose: unregister the weak pointer tracking the peer writer,
+ * then chain up to the parent class. */
+static void
+pipe_input_stream_dispose(GObject *object)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM(object);
+
+    if (self->peer != NULL) {
+        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+        self->peer = NULL;
+    }
+
+    G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
+}
+
+/* Install the GObject and GInputStream virtual methods of PipeInputStream. */
+static void
+pipe_input_stream_class_init (PipeInputStreamClass *klass)
+{
+    GInputStreamClass *in_class = G_INPUT_STREAM_CLASS (klass);
+
+    G_OBJECT_CLASS (klass)->dispose = pipe_input_stream_dispose;
+
+    in_class->read_fn      = pipe_input_stream_read;
+    in_class->close_fn     = pipe_input_stream_close;
+    in_class->close_async  = pipe_input_stream_close_async;
+    in_class->close_finish = pipe_input_stream_close_finish;
+}
+
+/* Readable when the peer holds a pending buffer and no earlier read is
+ * still unacknowledged (read == -1), or when the stream was flagged closed. */
+static gboolean
+pipe_input_stream_is_readable (GPollableInputStream *stream)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+    gboolean readable = self->closed ||
+        (self->peer && self->peer->buffer && self->read == -1);
+    g_debug("readable %p %d", self->peer, readable);
+    return readable;
+}
+
+/* GPollableInputStream.create_source: build a condition source polling
+ * is_readable and wrap it in a pollable source created with @cancellable. */
+static GSource *
+pipe_input_stream_create_source (GPollableInputStream *stream,
+                                 GCancellable         *cancellable)
+{
+    GSource *cond, *result;
+
+    cond = condition_source_new ((GSourceFunc)pipe_input_stream_is_readable, stream);
+    g_source_set_name(cond, "pipe read condition");
+    result = g_pollable_source_new_full (stream, cond, cancellable);
+    g_source_unref (cond); /* drop the reference taken at creation */
+    return result;
+}
+
+static void
+pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+    iface->is_readable   = pipe_input_stream_is_readable;   /* readiness predicate */
+    iface->create_source = pipe_input_stream_create_source; /* GSource polling that predicate */
+}
+
+static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+                                                pipe_output_stream_pollable_iface_init))
+
+/* Non-blocking write. A call made before the peer has read only publishes
+ * @buffer and fails with G_IO_ERROR_WOULD_BLOCK; once the peer has read, the
+ * next call returns the number of bytes it took. Fails with
+ * G_IO_ERROR_CLOSED when the pipe is closed or the peer is gone. */
+static gssize
+pipe_output_stream_write (GOutputStream  *stream,
+                          const void     *buffer,
+                          gsize           count,
+                          GCancellable   *cancellable,
+                          GError        **error)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    PipeInputStream *peer = self->peer;
+
+    g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
+    /* peer is tracked through a weak pointer, so it may have become NULL */
+    if (g_output_stream_is_closed (stream) || self->closed || peer == NULL) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                             "Stream is already closed");
+        return -1;
+    }
+
+    /* this abuses pollable stream, writing sync would likely lead to
+       crashes, since the buffer pointer would become invalid, a
+       generic solution would need a copy.. */
+    g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
+    self->buffer = buffer;
+    self->count = count;
+    if (peer->read < 0) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                             g_strerror (EAGAIN));
+        return -1;
+    }
+    g_assert(peer->read >= 0 && (gsize)peer->read <= self->count);
+    count = peer->read;
+    self->buffer = NULL;
+    self->count = 0;
+    peer->read = -1; /* acknowledged: the reader may take the next buffer */
+    return count;
+}
+
+static void
+pipe_output_stream_init (PipeOutputStream *stream)
+{ /* intentionally empty: no field is given an explicit initial value here */
+}
+
+/* GObject.dispose: unregister the weak pointer tracking the peer reader,
+ * then chain up to the parent class. */
+static void
+pipe_output_stream_dispose(GObject *object)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(object);
+
+    if (self->peer != NULL) {
+        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+        self->peer = NULL;
+    }
+
+    G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
+}
+
+/* GOutputStream.close_fn: flag the peer input stream as closed and close
+ * it as well. Always returns TRUE. */
+static gboolean
+pipe_output_stream_close (GOutputStream  *stream,
+                          GCancellable   *cancellable,
+                          GError        **error)
+{
+    PipeInputStream *reader = PIPE_OUTPUT_STREAM(stream)->peer;
+
+    if (reader == NULL)
+        return TRUE;
+
+    reader->closed = TRUE;
+    /* ignore any pending errors */
+    g_input_stream_close(G_INPUT_STREAM(reader), cancellable, NULL);
+    return TRUE;
+}
+
+/* GOutputStream.close_async: perform the synchronous close right away and
+ * complete the task with TRUE. */
+static void
+pipe_output_stream_close_async (GOutputStream       *stream,
+                                int                  io_priority,
+                                GCancellable        *cancellable,
+                                GAsyncReadyCallback  callback,
+                                gpointer             data)
+{
+    GTask *close_task = g_task_new (stream, cancellable, callback, data);
+
+    /* will always return TRUE */
+    (void) pipe_output_stream_close (stream, cancellable, NULL);
+
+    g_task_return_boolean (close_task, TRUE);
+    g_object_unref (close_task);
+}
+
+/* GOutputStream.close_finish: hand back the boolean stored in the close task. */
+static gboolean
+pipe_output_stream_close_finish (GOutputStream  *stream,
+                                 GAsyncResult   *result,
+                                 GError        **error)
+{
+    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+    return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+
+/* Install the GObject and GOutputStream virtual methods of PipeOutputStream. */
+static void
+pipe_output_stream_class_init (PipeOutputStreamClass *klass)
+{
+    GOutputStreamClass *out_class = G_OUTPUT_STREAM_CLASS (klass);
+
+    G_OBJECT_CLASS (klass)->dispose = pipe_output_stream_dispose;
+
+    out_class->write_fn     = pipe_output_stream_write;
+    out_class->close_fn     = pipe_output_stream_close;
+    out_class->close_async  = pipe_output_stream_close_async;
+    out_class->close_finish = pipe_output_stream_close_finish;
+}
+
+/* Writable when no write is pending or the peer has read the pending one;
+ * also when closed or the peer is gone, so a waiting writer is woken up. */
+static gboolean
+pipe_output_stream_is_writable (GPollableOutputStream *stream)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    gboolean writable = self->buffer == NULL || self->closed ||
+        self->peer == NULL || self->peer->read >= 0;
+    g_debug("writable %p %d", self, writable);
+    return writable;
+}
+
+/* GPollableOutputStream.create_source: build a condition source polling
+ * is_writable and wrap it in a pollable source created with @cancellable. */
+static GSource *
+pipe_output_stream_create_source (GPollableOutputStream *self,
+                                  GCancellable          *cancellable)
+{
+    GSource *cond, *result;
+    cond = condition_source_new ((GSourceFunc)pipe_output_stream_is_writable, self);
+    g_source_set_name(cond, "pipe write condition");
+    result = g_pollable_source_new_full (self, cond, cancellable);
+    g_source_unref (cond);
+    return result;
+}
+
+static void
+pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+    iface->is_writable = pipe_output_stream_is_writable;     /* readiness predicate */
+    iface->create_source = pipe_output_stream_create_source; /* GSource polling that predicate */
+}
+
+G_DEFINE_TYPE_WITH_PRIVATE(SpicePipeStream, spice_pipe_stream, G_TYPE_IO_STREAM)
+
+/* GIOStream.get_input_stream: return the read end held in the private data. */
+static GInputStream *
+get_input_stream (GIOStream *io_stream)
+{
+    SpicePipeStreamPrivate *priv = ((SpicePipeStream *) io_stream)->priv;
+    return (GInputStream *)priv->input_stream;
+}
+
+/* GIOStream.get_output_stream: return the write end held in the private data. */
+static GOutputStream *
+get_output_stream (GIOStream *io_stream)
+{
+    SpicePipeStreamPrivate *priv = ((SpicePipeStream *) io_stream)->priv;
+    return (GOutputStream *)priv->output_stream;
+}
+
+static void
+spice_pipe_stream_init(SpicePipeStream *pipe)
+{
+    pipe->priv = spice_pipe_stream_get_instance_private (pipe);
+    /* both ends are created unpaired; spice_pipe_set_peer() links them */
+    pipe->priv->input_stream = g_object_new(TYPE_PIPE_INPUT_STREAM, NULL);
+    pipe->priv->output_stream = g_object_new(TYPE_PIPE_OUTPUT_STREAM, NULL);
+}
+
+/* Point @pipe's two ends at @peer's opposite ends, tracked as weak pointers. */
+static void
+spice_pipe_set_peer(SpicePipeStream *pipe, SpicePipeStream *peer)
+{
+    SpicePipeStreamPrivate *mine = pipe->priv, *theirs = peer->priv;
+    mine->output_stream->peer = theirs->input_stream;
+    g_object_add_weak_pointer(G_OBJECT(theirs->input_stream), (gpointer*)&mine->output_stream->peer);
+    mine->input_stream->peer = theirs->output_stream;
+    g_object_add_weak_pointer(G_OBJECT(theirs->output_stream), (gpointer*)&mine->input_stream->peer);
+}
+
+/* GObject.finalize: drop the references held on both ends, then chain up
+ * to the parent class. */
+static void
+spice_pipe_stream_finalize(GObject *object)
+{
+    SpicePipeStreamPrivate *priv = SPICE_PIPE_STREAM(object)->priv;
+
+    g_clear_object(&priv->input_stream);
+    g_clear_object(&priv->output_stream);
+
+    G_OBJECT_CLASS(spice_pipe_stream_parent_class)->finalize (object);
+}
+
+/* Install the GObject and GIOStream virtual methods of SpicePipeStream. */
+static void
+spice_pipe_stream_class_init(SpicePipeStreamClass *klass)
+{
+    GIOStreamClass *stream_class = G_IO_STREAM_CLASS (klass);
+
+    G_OBJECT_CLASS (klass)->finalize = spice_pipe_stream_finalize;
+
+    stream_class->get_input_stream = get_input_stream;
+    stream_class->get_output_stream = get_output_stream;
+}
+
+/* Create two SpicePipeStream objects set as each other's peer and store
+ * them in @p1 and @p2, which must both be non-NULL and point to NULL. */
+G_GNUC_INTERNAL void
+spice_make_pipe(GIOStream **p1, GIOStream **p2)
+{
+    SpicePipeStream *first, *second;
+
+    g_return_if_fail(p1 != NULL);
+    g_return_if_fail(p2 != NULL);
+    g_return_if_fail(*p1 == NULL);
+    g_return_if_fail(*p2 == NULL);
+    first = g_object_new(SPICE_TYPE_PIPE_STREAM, NULL);
+    second = g_object_new(SPICE_TYPE_PIPE_STREAM, NULL);
+    spice_pipe_set_peer(first, second);
+    spice_pipe_set_peer(second, first);
+    *p1 = G_IO_STREAM(first);
+    *p2 = G_IO_STREAM(second);
+}
diff --git a/gtk/giopipe.h b/gtk/giopipe.h
new file mode 100644
index 0000000..e8ce296
--- /dev/null
+++ b/gtk/giopipe.h
@@ -0,0 +1,53 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+  Copyright (C) 2015 Red Hat, Inc.
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+#ifndef __SPICE_GIO_PIPE_H__
+#define __SPICE_GIO_PIPE_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define SPICE_TYPE_PIPE_STREAM         (spice_pipe_stream_get_type ())
+#define SPICE_PIPE_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_PIPE_STREAM, SpicePipeStream))
+#define SPICE_PIPE_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_PIPE_STREAM, SpicePipeStreamClass))
+#define SPICE_IS_PIPE_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_PIPE_STREAM))
+#define SPICE_IS_PIPE_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_PIPE_STREAM))
+#define SPICE_PIPE_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_PIPE_STREAM, SpicePipeStreamClass))
+
+typedef struct _SpicePipeStreamPrivate                            SpicePipeStreamPrivate;
+typedef struct _SpicePipeStreamClass                              SpicePipeStreamClass;
+typedef struct _SpicePipeStream                                   SpicePipeStream;
+
+struct _SpicePipeStream
+{
+    GIOStream parent_instance;
+
+    /*< private >*/
+    SpicePipeStreamPrivate *priv;
+};
+
+struct _SpicePipeStreamClass
+{
+    GIOStreamClass parent_class;
+};
+
+void spice_make_pipe(GIOStream **p1, GIOStream **p2); /* out: two pipe streams set as each other's peer */
+
+G_END_DECLS
+
+#endif /* __SPICE_GIO_PIPE_H__ */
diff --git a/tests/Makefile.am b/tests/Makefile.am
index b236b12..5e57aa6 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -3,6 +3,7 @@ NULL =
 noinst_PROGRAMS =				\
 	coroutine				\
 	util					\
+	pipe					\
 	$(NULL)
 
 TESTS = $(noinst_PROGRAMS)
@@ -20,5 +21,6 @@ LDADD =							\
 
 util_SOURCES = util.c
 coroutine_SOURCES = coroutine.c
+pipe_SOURCES = pipe.c
 
 -include $(top_srcdir)/git.mk
diff --git a/tests/pipe.c b/tests/pipe.c
new file mode 100644
index 0000000..841cb77
--- /dev/null
+++ b/tests/pipe.c
@@ -0,0 +1,313 @@
+#include <glib.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <locale.h>
+
+#include "giopipe.h"
+
+typedef struct _Fixture {
+    GIOStream *p1; /* the two pipe ends created by spice_make_pipe() */
+    GIOStream *p2;
+
+    GInputStream *ip1;  /* input and output streams of p1 ... */
+    GOutputStream *op1;
+    GInputStream *ip2;  /* ... and of p2; tear-down does not unref these four */
+    GOutputStream *op2;
+
+    gchar buf[16]; /* read buffer, pre-filled with a byte pattern in set-up */
+
+    GMainLoop *loop;           /* loop run by the asynchronous tests */
+    GCancellable *cancellable; /* passed to every stream operation */
+    guint timeout;             /* id of the 1000 ms stop_loop timeout source */
+} Fixture;
+
+static gboolean
+stop_loop (gpointer data)
+{
+    GMainLoop *loop = data;
+    /* timeout callback: quits the loop, then fails the test via the assertion */
+    g_main_loop_quit (loop);
+    g_assert_not_reached();
+
+    return G_SOURCE_REMOVE;
+}
+
+/* Per-test set-up: create a connected pipe pair, cache its four streams,
+ * fill buf with a known pattern and arm a 1000 ms timeout on a new loop. */
+static void
+fixture_set_up(Fixture *fixture,
+               gconstpointer user_data)
+{
+    gsize i; /* unsigned, to match the sizeof loop bound */
+
+    spice_make_pipe(&fixture->p1, &fixture->p2);
+    g_assert_true(G_IS_IO_STREAM(fixture->p1));
+    g_assert_true(G_IS_IO_STREAM(fixture->p2));
+
+    fixture->op1 = g_io_stream_get_output_stream(fixture->p1);
+    g_assert_true(G_IS_OUTPUT_STREAM(fixture->op1));
+    fixture->ip1 = g_io_stream_get_input_stream(fixture->p1);
+    g_assert_true(G_IS_INPUT_STREAM(fixture->ip1));
+    fixture->op2 = g_io_stream_get_output_stream(fixture->p2);
+    g_assert_true(G_IS_OUTPUT_STREAM(fixture->op2));
+    fixture->ip2 = g_io_stream_get_input_stream(fixture->p2);
+    g_assert_true(G_IS_INPUT_STREAM(fixture->ip2));
+    for (i = 0; i < sizeof(fixture->buf); i++) {
+        fixture->buf[i] = 0x42 + i;
+    }
+    fixture->cancellable = g_cancellable_new();
+    fixture->loop = g_main_loop_new (NULL, FALSE);
+    fixture->timeout = g_timeout_add (1000, stop_loop, fixture->loop);
+}
+
+static void
+fixture_tear_down(Fixture *fixture,
+                  gconstpointer user_data)
+{
+    g_clear_object(&fixture->p1);
+    g_clear_object(&fixture->p2);
+    /* release the helpers created by fixture_set_up */
+    g_clear_object(&fixture->cancellable);
+    g_source_remove(fixture->timeout);
+    g_main_loop_unref(fixture->loop);
+}
+
+/* A synchronous read with no pending write must fail with WOULD_BLOCK. */
+static void
+test_pipe_readblock(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = g_input_stream_read(f->ip2, f->buf, 1,
+                               f->cancellable, &error);
+    g_assert_cmpint(size, ==, -1); /* failure is reported as -1 */
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+    g_clear_error(&error);
+}
+
+/* A synchronous write that nobody has read yet must fail with WOULD_BLOCK. */
+static void
+test_pipe_writeblock(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = g_output_stream_write(f->op1, "", 1,
+                                 f->cancellable, &error);
+    g_assert_cmpint(size, ==, -1); /* failure is reported as -1 */
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+    g_clear_error(&error);
+}
+
+/* Completion of an async write: it must succeed with at least one byte
+ * written; then the main loop passed as user_data is stopped. */
+static void
+write_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes = g_output_stream_write_finish(G_OUTPUT_STREAM(source),
+                                                 result, &error);
+
+    g_assert_no_error(error);
+    g_assert_cmpint(nbytes, >, 0);
+    g_clear_error(&error);
+
+    g_main_loop_quit ((GMainLoop *)user_data);
+}
+
+/* Completion of an async read: the byte count must equal the expected
+ * count carried in user_data, and no error may be set. */
+static void
+read_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    gssize expected = GPOINTER_TO_INT(user_data);
+    GError *error = NULL;
+    gssize nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+    g_assert_cmpint(nbytes, ==, expected);
+    g_assert_no_error(error);
+    g_clear_error(&error);
+}
+
+static void
+test_pipe_writeread(Fixture *f, gconstpointer user_data)
+{
+    g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_cb, GINT_TO_POINTER(1));
+
+    g_main_loop_run (f->loop);
+    /* second write-then-read round on the same pipe */
+    g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_cb, GINT_TO_POINTER(1));
+
+    g_main_loop_run (f->loop);
+}
+
+static void
+test_pipe_readwrite(Fixture *f, gconstpointer user_data)
+{
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_cb, GINT_TO_POINTER(1));
+    g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    /* the read is queued before the write here; write_cb quits the loop */
+    g_main_loop_run (f->loop);
+}
+
+/* Completion of an async read that must transfer exactly the byte count
+ * carried in user_data (8 for the current callers), without error. */
+static void
+read8_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes, expected = GPOINTER_TO_INT(user_data);
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+    g_assert_cmpint(nbytes, ==, expected);
+    g_assert_no_error(error);
+    g_clear_error(&error);
+}
+
+static void
+test_pipe_write16read8(Fixture *f, gconstpointer user_data)
+{
+    g_output_stream_write_async(f->op1, "0123456789abcdef", 16, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(f->ip2, f->buf, 8, G_PRIORITY_DEFAULT,
+                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+
+    g_main_loop_run (f->loop);
+
+    /* the read took the pending buffer, so the next read must block */
+    test_pipe_readblock(f, user_data);
+}
+
+static void
+test_pipe_write8read16(Fixture *f, gconstpointer user_data)
+{
+    g_output_stream_write_async(f->op1, "01234567", 8, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(f->ip2, f->buf, 16, G_PRIORITY_DEFAULT,
+                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+
+    g_main_loop_run (f->loop);
+
+    /* check next write would block */
+    test_pipe_writeblock(f, user_data);
+}
+
+/* Completion of a read whose pipe got closed: it must fail (-1) with
+ * G_IO_ERROR_CLOSED; then the main loop passed as user_data is stopped. */
+static void
+readclose_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_cmpint(nbytes, ==, -1);
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CLOSED);
+    g_clear_error(&error);
+
+    g_main_loop_quit ((GMainLoop *)user_data);
+}
+
+/* Closing the peer GIOStream must complete a pending read with CLOSED. */
+static void
+test_pipe_readclosestream(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readclose_cb, f->loop);
+    g_io_stream_close(f->p1, f->cancellable, &error);
+    g_assert_no_error(error); /* the close itself is expected to succeed */
+    g_main_loop_run (f->loop);
+}
+
+/* Closing the peer output stream must complete a pending read with CLOSED. */
+static void
+test_pipe_readclose(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readclose_cb, f->loop);
+    g_output_stream_close(f->op1, f->cancellable, &error);
+    g_assert_no_error(error); /* the close itself is expected to succeed */
+    g_main_loop_run (f->loop);
+}
+
+/* Completion of a cancelled read: it must fail (-1) with
+ * G_IO_ERROR_CANCELLED; then the main loop in user_data is stopped. */
+static void
+readcancel_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+    GMainLoop *loop = user_data;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+    g_assert_cmpint(nbytes, ==, -1);
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
+    g_clear_error(&error);
+
+    g_main_loop_quit (loop);
+}
+
+/* Cancelling the GCancellable must complete a pending read with CANCELLED. */
+static void
+test_pipe_readcancel(Fixture *f, gconstpointer user_data)
+{
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readcancel_cb, f->loop);
+    g_cancellable_cancel(f->cancellable);
+    g_main_loop_run (f->loop);
+}
+
+int main(int argc, char* argv[])
+{
+    setlocale(LC_ALL, "");
+
+    g_test_init(&argc, &argv, NULL);
+    /* every case below is registered with the Fixture set-up and tear-down */
+    g_test_add("/pipe/readblock", Fixture, NULL,
+               fixture_set_up, test_pipe_readblock,
+               fixture_tear_down);
+
+    g_test_add("/pipe/writeblock", Fixture, NULL,
+               fixture_set_up, test_pipe_writeblock,
+               fixture_tear_down);
+
+    g_test_add("/pipe/writeread", Fixture, NULL,
+               fixture_set_up, test_pipe_writeread,
+               fixture_tear_down);
+
+    g_test_add("/pipe/readwrite", Fixture, NULL,
+               fixture_set_up, test_pipe_readwrite,
+               fixture_tear_down);
+
+    g_test_add("/pipe/write16read8", Fixture, NULL,
+               fixture_set_up, test_pipe_write16read8,
+               fixture_tear_down);
+
+    g_test_add("/pipe/write8read16", Fixture, NULL,
+               fixture_set_up, test_pipe_write8read16,
+               fixture_tear_down);
+
+    g_test_add("/pipe/readclosestream", Fixture, NULL,
+               fixture_set_up, test_pipe_readclosestream,
+               fixture_tear_down);
+
+    g_test_add("/pipe/readclose", Fixture, NULL,
+               fixture_set_up, test_pipe_readclose,
+               fixture_tear_down);
+
+    g_test_add("/pipe/readcancel", Fixture, NULL,
+               fixture_set_up, test_pipe_readcancel,
+               fixture_tear_down);
+
+    return g_test_run();
+}
-- 
2.1.0



More information about the Spice-devel mailing list