[Spice-commits] 5 commits - configure.ac gtk/channel-webdav.c gtk/giopipe.c gtk/giopipe.h gtk/Makefile.am gtk/spice-session.c spice-common tests/Makefile.am tests/pipe.c

Marc-André Lureau elmarco at kemper.freedesktop.org
Tue Mar 3 05:54:40 PST 2015


 configure.ac         |    4 
 gtk/Makefile.am      |    7 
 gtk/channel-webdav.c |  246 ++++++--------------------
 gtk/giopipe.c        |  476 +++++++++++++++++++++++++++++++++++++++++++++++++++
 gtk/giopipe.h        |   29 +++
 gtk/spice-session.c  |   72 +++++--
 spice-common         |    2 
 tests/Makefile.am    |    6 
 tests/pipe.c         |  313 +++++++++++++++++++++++++++++++++
 9 files changed, 944 insertions(+), 211 deletions(-)

New commits:
commit 62d7ae80b692cb699ae9c9f2acee7ef6e9a91ecf
Author: Marc-André Lureau <marcandre.lureau at redhat.com>
Date:   Sun Feb 15 14:07:39 2015 +0100

    session: bind path and read-only to webdav server
    
    Keep the server properties in sync with the session properties.

diff --git a/gtk/spice-session.c b/gtk/spice-session.c
index 08bf80f..516184d 100644
--- a/gtk/spice-session.c
+++ b/gtk/spice-session.c
@@ -2662,9 +2662,18 @@ PhodavServer* spice_session_get_webdav_server(SpiceSession *session)
 
     g_mutex_lock(&mutex);
 
-    if (priv->webdav == NULL)
-        priv->webdav = phodav_server_new(shared_dir);
+    if (priv->webdav)
+        goto end;
+
+    priv->webdav = phodav_server_new(shared_dir);
+    g_object_bind_property(session,  "share-dir-ro",
+                           priv->webdav, "read-only",
+                           G_BINDING_SYNC_CREATE|G_BINDING_BIDIRECTIONAL);
+    g_object_bind_property(session,  "shared-dir",
+                           priv->webdav, "root",
+                           G_BINDING_SYNC_CREATE|G_BINDING_BIDIRECTIONAL);
 
+end:
     g_mutex_unlock(&mutex);
 #endif
 
commit 9de2fd2716fce5058316555202ad4fe23a285dae
Author: Marc-André Lureau <marcandre.lureau at redhat.com>
Date:   Sun Feb 15 14:06:28 2015 +0100

    session: add share-dir-ro property
    
    Add a property to specify if share folder access is read-only.

diff --git a/gtk/spice-session.c b/gtk/spice-session.c
index ebddaea..08bf80f 100644
--- a/gtk/spice-session.c
+++ b/gtk/spice-session.c
@@ -61,6 +61,7 @@ struct _SpiceSessionPrivate {
     gboolean          read_only;
     SpiceURI          *proxy;
     gchar             *shared_dir;
+    gboolean          share_dir_ro;
 
     /* whether to enable audio */
     gboolean          audio;
@@ -199,6 +200,7 @@ enum {
     PROP_PROXY,
     PROP_SECURE_CHANNELS,
     PROP_SHARED_DIR,
+    PROP_SHARE_DIR_RO,
     PROP_USERNAME,
     PROP_UNIX_PATH,
 };
@@ -653,6 +655,9 @@ static void spice_session_get_property(GObject    *gobject,
     case PROP_SHARED_DIR:
         g_value_set_string(value, spice_session_get_shared_dir(session));
         break;
+    case PROP_SHARE_DIR_RO:
+        g_value_set_boolean(value, s->share_dir_ro);
+        break;
     default:
 	G_OBJECT_WARN_INVALID_PROPERTY_ID(gobject, prop_id, pspec);
 	break;
@@ -786,6 +791,9 @@ static void spice_session_set_property(GObject      *gobject,
     case PROP_SHARED_DIR:
         spice_session_set_shared_dir(session, g_value_get_string(value));
         break;
+    case PROP_SHARE_DIR_RO:
+        s->share_dir_ro = g_value_get_boolean(value);
+        break;
     default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID(gobject, prop_id, pspec);
         break;
@@ -1378,6 +1386,23 @@ static void spice_session_class_init(SpiceSessionClass *klass)
                              G_PARAM_CONSTRUCT |
                              G_PARAM_STATIC_STRINGS));
 
+    /**
+     * SpiceSession:share-dir-ro:
+     *
+     * Whether to share the directory read-only.
+     *
+     * Since: 0.28
+     **/
+    g_object_class_install_property
+        (gobject_class, PROP_SHARE_DIR_RO,
+         g_param_spec_boolean("share-dir-ro",
+                              "Share directory read-only",
+                              "Share directory read-only",
+                              FALSE,
+                              G_PARAM_READWRITE |
+                              G_PARAM_CONSTRUCT |
+                              G_PARAM_STATIC_STRINGS));
+
     g_type_class_add_private(klass, sizeof(SpiceSessionPrivate));
 }
 
commit f8af1f9aeb85e0ccbe60b656a9d926e8d1e5ba5e
Author: Marc-André Lureau <marcandre.lureau at redhat.com>
Date:   Thu Feb 12 14:02:40 2015 +0100

    webdav: use a pipe to connect to server
    
    Instead of listening on TCP sockets, and proxying connections there,
    make the webdav server accept new connections from streams. The streams
    are user-space GIOStream pipes: one side is connected to the Spice webdav
    channel muxer/demuxer, the other side is a SoupSocket client.
    
    This way the server is not exposed through any local public access,
    and it avoids the need for server threads or for proxying the
    connections through system sockets.

diff --git a/gtk/channel-webdav.c b/gtk/channel-webdav.c
index 8cf53cf..95c0521 100644
--- a/gtk/channel-webdav.c
+++ b/gtk/channel-webdav.c
@@ -24,6 +24,7 @@
 #include "spice-marshal.h"
 #include "glib-compat.h"
 #include "vmcstream.h"
+#include "giopipe.h"
 
 /**
  * SECTION:channel-webdav
@@ -185,8 +186,7 @@ typedef struct Client
 {
     guint refs;
     SpiceWebdavChannel *self;
-    GIOStream *conn;
-    OutputQueue *output;
+    GIOStream *pipe;
     gint64 id;
     GCancellable *cancellable;
 
@@ -204,9 +204,8 @@ client_unref(Client *client)
         return;
 
     g_free(client->mux.buf);
-    output_queue_free(client->output);
 
-    g_object_unref(client->conn);
+    g_object_unref(client->pipe);
     g_object_unref(client->cancellable);
 
     g_free(client);
@@ -289,7 +288,7 @@ static void client_start_read(SpiceWebdavChannel *self, Client *client)
 {
     GInputStream *input;
 
-    input = g_io_stream_get_input_stream(G_IO_STREAM(client->conn));
+    input = g_io_stream_get_input_stream(G_IO_STREAM(client->pipe));
     g_input_stream_read_async(input, client->mux.buf, MAX_MUX_SIZE,
                               G_PRIORITY_DEFAULT, client->cancellable, server_reply_cb,
                               client_ref(client));
@@ -297,43 +296,71 @@ static void client_start_read(SpiceWebdavChannel *self, Client *client)
 
 static void start_demux(SpiceWebdavChannel *self);
 
-static void pushed_client_cb(OutputQueue *q, gpointer user_data)
+static void demux_to_client_finish(SpiceWebdavChannel *self,
+                                   Client *client, gssize size)
 {
-    Client *client = user_data;
-    SpiceWebdavChannel *self = client->self;
     SpiceWebdavChannelPrivate *c = self->priv;
 
+    if (size <= 0) {
+        remove_client(self, client);
+    }
+
     c->demuxing = FALSE;
     start_demux(self);
 }
 
+static void demux_to_client_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    Client *client = user_data;
+    SpiceWebdavChannelPrivate *c = client->self->priv;
+    GError *error = NULL;
+    gssize size;
+
+    size = g_output_stream_write_finish(G_OUTPUT_STREAM(source), result, &error);
+
+    if (error) {
+        CHANNEL_DEBUG(client->self, "write failed: %s", error->message);
+        g_clear_error(&error);
+    }
+
+    g_warn_if_fail(size == c->demux.size);
+    demux_to_client_finish(client->self, client, size);
+}
+
 static void demux_to_client(SpiceWebdavChannel *self,
-                           Client *client)
+                            Client *client)
 {
     SpiceWebdavChannelPrivate *c = self->priv;
     gssize size = c->demux.size;
 
     CHANNEL_DEBUG(self, "pushing %"G_GSSIZE_FORMAT" to client %p", size, client);
 
-    if (size != 0) {
-        output_queue_push(client->output, (guint8 *)c->demux.buf, size,
-                          (GFunc)pushed_client_cb, client);
+    if (size > 0) {
+        g_output_stream_write_async(g_io_stream_get_output_stream(client->pipe),
+                                    c->demux.buf, size, G_PRIORITY_DEFAULT,
+                                    c->cancellable, demux_to_client_cb, client);
+        return;
     } else {
-        remove_client(self, client);
-        c->demuxing = FALSE;
-        start_demux(self);
+        demux_to_client_finish(self, client, size);
     }
 }
 
 static void start_client(SpiceWebdavChannel *self)
 {
+#ifdef USE_PHODAV
     SpiceWebdavChannelPrivate *c = self->priv;
-    GOutputStream *output;
     Client *client;
+    GIOStream *peer = NULL;
+    SpiceSession *session;
+    SoupServer *server;
+    GSocketAddress *addr;
+    GError *error = NULL;
+
+    session = spice_channel_get_session(SPICE_CHANNEL(self));
+    server = phodav_server_get_soup_server(spice_session_get_webdav_server(session));
 
     CHANNEL_DEBUG(self, "starting client %" G_GINT64_FORMAT, c->demux.client);
 
-    /* FIXME: connect to server */
     client = g_new0(Client, 1);
     client->refs = 1;
     client->id = c->demux.client;
@@ -341,15 +368,29 @@ static void start_client(SpiceWebdavChannel *self)
     client->mux.id = GINT64_TO_LE(client->id);
     client->mux.buf = g_malloc0(MAX_MUX_SIZE);
     client->cancellable = g_cancellable_new();
+    spice_make_pipe(&client->pipe, &peer);
 
-    output = g_buffered_output_stream_new(g_io_stream_get_output_stream(G_IO_STREAM(client->conn)));
-    client->output = output_queue_new(output);
-    g_object_unref(output);
+    addr = g_inet_socket_address_new_from_string ("127.0.0.1", 0);
+    if (!soup_server_accept_iostream(server, peer, addr, addr, &error))
+        goto fail;
 
-    client_start_read(self, client);
     g_hash_table_insert(c->clients, &client->id, client);
 
+    client_start_read(self, client);
     demux_to_client(self, client);
+
+    g_clear_object(&addr);
+    return;
+
+fail:
+    if (error)
+        CHANNEL_DEBUG(self, "failed to start client: %s", error->message);
+
+    g_clear_object(&addr);
+    g_clear_object(&peer);
+    g_clear_error(&error);
+    client_unref(client);
+#endif
 }
 
 static void data_read_cb(GObject *source_object,
commit 85ed26847ce631bb21b89bee90d250979b8178bd
Author: Marc-André Lureau <marcandre.lureau at redhat.com>
Date:   Sat Feb 7 02:01:34 2015 +0100

    Use libphodav-2 (breaks webdav server temporarily)
    
    This change breaks the webdav server, since libphodav-2 no longer
    sets up a TCP service running in a thread. It's up to the client
    to decide how best to accept and handle new connections.
    
    This commit removes all the hacks related to proxying the incoming
    connections to a TCP socket, protected with a magic sequence.
    
    The following commit will use GIOStream pipes to handle each client
    connection.

diff --git a/configure.ac b/configure.ac
index d98e502..263f38b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
 if test "x$enable_webdav" = "xno"; then
   have_phodav="no"
 else
-  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])
+  PKG_CHECK_MODULES(PHODAV, [libphodav-2.0 glib-2.0 >= 2.43.90 libsoup-2.4 >= 2.49.1], [have_phodav=yes], [have_phodav=no])
   AC_SUBST(PHODAV_CFLAGS)
   AC_SUBST(PHODAV_LIBS)
 
diff --git a/gtk/channel-webdav.c b/gtk/channel-webdav.c
index 75b027b..8cf53cf 100644
--- a/gtk/channel-webdav.c
+++ b/gtk/channel-webdav.c
@@ -25,8 +25,6 @@
 #include "glib-compat.h"
 #include "vmcstream.h"
 
-static PhodavServer* phodav_server_get(SpiceSession *session, gint *port);
-
 /**
  * SECTION:channel-webdav
  * @short_description: exports a directory
@@ -187,7 +185,7 @@ typedef struct Client
 {
     guint refs;
     SpiceWebdavChannel *self;
-    GSocketConnection *conn;
+    GIOStream *conn;
     OutputQueue *output;
     gint64 id;
     GCancellable *cancellable;
@@ -327,102 +325,31 @@ static void demux_to_client(SpiceWebdavChannel *self,
     }
 }
 
-static void magic_written(GObject *source_object,
-                          GAsyncResult *res,
-                          gpointer user_data)
-{
-    Client *client = user_data;
-    SpiceWebdavChannel *self = client->self;
-    SpiceWebdavChannelPrivate *c = self->priv;
-    gssize bytes_written;
-    GError *err = NULL;
-
-    bytes_written = g_output_stream_write_finish(G_OUTPUT_STREAM(source_object),
-                                                 res, &err);
-
-    if (err || bytes_written != WEBDAV_MAGIC_SIZE)
-        goto error;
-
-    client_start_read(self, client);
-    g_hash_table_insert(c->clients, &client->id, client);
-
-    demux_to_client(self, client);
-
-    return;
-
-error:
-    if (err) {
-        g_critical("socket creation failed %s", err->message);
-        g_clear_error(&err);
-    }
-    if (client) {
-        client_unref(client);
-    }
-}
-
-static void client_connected(GObject *source_object,
-                             GAsyncResult *res,
-                             gpointer user_data)
+static void start_client(SpiceWebdavChannel *self)
 {
-    SpiceWebdavChannel *self = user_data;
     SpiceWebdavChannelPrivate *c = self->priv;
-    GSocketClient *sclient = G_SOCKET_CLIENT(source_object);
-    GError *err = NULL;
-    GSocketConnection *conn;
-    SpiceSession *session;
-    Client *client = NULL;
     GOutputStream *output;
+    Client *client;
 
-    session = spice_channel_get_session(SPICE_CHANNEL(self));
-
-    conn = g_socket_client_connect_to_host_finish(sclient, res, &err);
-    g_object_unref(sclient);
-    if (err)
-        goto error;
+    CHANNEL_DEBUG(self, "starting client %" G_GINT64_FORMAT, c->demux.client);
 
+    /* FIXME: connect to server */
     client = g_new0(Client, 1);
     client->refs = 1;
     client->id = c->demux.client;
     client->self = self;
-    client->conn = conn;
     client->mux.id = GINT64_TO_LE(client->id);
     client->mux.buf = g_malloc0(MAX_MUX_SIZE);
     client->cancellable = g_cancellable_new();
 
-    output = g_buffered_output_stream_new(g_io_stream_get_output_stream(G_IO_STREAM(conn)));
+    output = g_buffered_output_stream_new(g_io_stream_get_output_stream(G_IO_STREAM(client->conn)));
     client->output = output_queue_new(output);
     g_object_unref(output);
 
-    g_output_stream_write_async(g_io_stream_get_output_stream(G_IO_STREAM(conn)),
-                                spice_session_get_webdav_magic(session), WEBDAV_MAGIC_SIZE,
-                                G_PRIORITY_DEFAULT, c->cancellable,
-                                magic_written, client);
-    return;
-
-error:
-    if (err) {
-        g_critical("socket creation failed %s", err->message);
-        g_clear_error(&err);
-    }
-    if (client) {
-        client_unref(client);
-    }
-}
-
-static void start_client(SpiceWebdavChannel *self)
-{
-    SpiceWebdavChannelPrivate *c = self->priv;
-    GSocketClient *sclient;
-    gint davport = -1;
-    SpiceSession *session;
-
-    session = spice_channel_get_session(SPICE_CHANNEL(self));
-    phodav_server_get(session, &davport);
-    CHANNEL_DEBUG(self, "starting client %" G_GINT64_FORMAT, c->demux.client);
+    client_start_read(self, client);
+    g_hash_table_insert(c->clients, &client->id, client);
 
-    sclient = g_socket_client_new();
-    g_socket_client_connect_to_host_async(sclient, "localhost", davport,
-                                          c->cancellable, client_connected, self);
+    demux_to_client(self, client);
 }
 
 static void data_read_cb(GObject *source_object,
@@ -639,99 +566,3 @@ static void spice_webdav_handle_msg(SpiceChannel *channel, SpiceMsgIn *msg)
     else
         g_return_if_reached();
 }
-
-
-
-#ifdef USE_PHODAV
-static void new_connection(SoupSocket *sock,
-                           SoupSocket *new,
-                           gpointer    user_data)
-{
-    SpiceSession *session = user_data;
-    SoupAddress *addr;
-    GSocketAddress *gaddr;
-    GInetAddress *iaddr;
-    guint port;
-    guint8 magic[WEBDAV_MAGIC_SIZE];
-    gsize nread;
-    gboolean success = FALSE;
-    SoupSocketIOStatus status;
-
-    /* note: this is sync calls, since webdav server is in a seperate thread */
-    addr = soup_socket_get_remote_address(new);
-    gaddr = soup_address_get_gsockaddr(addr);
-    iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(gaddr));
-    port = g_inet_socket_address_get_port(G_INET_SOCKET_ADDRESS(gaddr));
-
-    SPICE_DEBUG("port %d %p", port, iaddr);
-    if (!g_inet_address_get_is_loopback(iaddr)) {
-        g_warn_if_reached();
-        goto end;
-    }
-
-    g_object_set(new, "non-blocking", FALSE, NULL);
-    status = soup_socket_read(new, magic, sizeof(magic), &nread, NULL, NULL);
-    if (status != SOUP_SOCKET_OK) {
-        g_warning("bad initial socket read: %d", status);
-        goto end;
-    }
-    g_object_set(new, "non-blocking", TRUE, NULL);
-
-    /* check we got the right magic */
-    if (memcmp(spice_session_get_webdav_magic(session), magic, sizeof(magic))) {
-        g_warn_if_reached();
-        goto end;
-    }
-
-    success = TRUE;
-
-end:
-    if (!success) {
-        g_warn_if_reached();
-        soup_socket_disconnect(new);
-        g_signal_stop_emission_by_name(sock, "new_connection");
-    }
-    g_object_unref(gaddr);
-}
-
-G_GNUC_INTERNAL
-PhodavServer* channel_webdav_server_new(SpiceSession *session)
-{
-    PhodavServer *dav;
-    SoupServer *server;
-    SoupSocket *listener;
-    const char *shared_dir;
-
-    shared_dir = spice_session_get_shared_dir(session);
-    if (shared_dir == NULL) {
-        g_debug("No shared dir set, not creating webdav channel");
-        return NULL;
-    }
-
-    dav = phodav_server_new(0, shared_dir);
-
-    server = phodav_server_get_soup_server(dav);
-    listener = soup_server_get_listener(server);
-    spice_g_signal_connect_object(listener, "new_connection",
-                                  G_CALLBACK(new_connection), session,
-                                  0);
-
-    return dav;
-}
-#endif /* USE_PHODAV */
-
-static PhodavServer* phodav_server_get(SpiceSession *session, gint *port)
-{
-    g_return_val_if_fail(SPICE_IS_SESSION(session), NULL);
-
-#ifdef USE_PHODAV
-    PhodavServer *server = spice_session_get_webdav_server(session);
-
-    if (port)
-        *port = phodav_server_get_port(server);
-
-    return server;
-#else
-    g_return_val_if_reached(NULL);
-#endif
-}
diff --git a/gtk/spice-session.c b/gtk/spice-session.c
index 607224b..ebddaea 100644
--- a/gtk/spice-session.c
+++ b/gtk/spice-session.c
@@ -122,7 +122,6 @@ struct _SpiceSessionPrivate {
     SpiceUsbDeviceManager *usb_manager;
     SpicePlaybackChannel *playback_channel;
     PhodavServer      *webdav;
-    guint8             webdav_magic[WEBDAV_MAGIC_SIZE];
 };
 
 
@@ -2620,36 +2619,31 @@ gboolean spice_session_get_smartcard_enabled(SpiceSession *session)
 }
 
 G_GNUC_INTERNAL
-const guint8* spice_session_get_webdav_magic(SpiceSession *session)
-{
-    g_return_val_if_fail(SPICE_IS_SESSION(session), NULL);
-
-    return session->priv->webdav_magic;
-}
-
-G_GNUC_INTERNAL
 PhodavServer* spice_session_get_webdav_server(SpiceSession *session)
 {
+    SpiceSessionPrivate *priv;
+
     g_return_val_if_fail(SPICE_IS_SESSION(session), NULL);
+    priv = session->priv;
 
 #ifdef USE_PHODAV
-    static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
-    int i;
+    static GMutex mutex;
 
-    g_static_mutex_lock(&mutex);
-    if (!session->priv->webdav) {
-        for (i = 0; i < sizeof(session->priv->webdav_magic); i++)
-            session->priv->webdav_magic[i] = g_random_int_range(0, 255);
-
-        session->priv->webdav = channel_webdav_server_new(session);
-        if (session->priv->webdav != NULL) {
-            phodav_server_run(session->priv->webdav);
-        }
+    const gchar *shared_dir = spice_session_get_shared_dir(session);
+    if (shared_dir == NULL) {
+        g_debug("No shared dir set, not creating webdav server");
+        return NULL;
     }
-    g_static_mutex_unlock(&mutex);
+
+    g_mutex_lock(&mutex);
+
+    if (priv->webdav == NULL)
+        priv->webdav = phodav_server_new(shared_dir);
+
+    g_mutex_unlock(&mutex);
 #endif
 
-    return session->priv->webdav;
+    return priv->webdav;
 }
 
 /**
diff --git a/spice-common b/spice-common
index 3aad79d..36e2c50 160000
--- a/spice-common
+++ b/spice-common
@@ -1 +1 @@
-Subproject commit 3aad79d9c64e85bc2b474df427ccbedbf6840591
+Subproject commit 36e2c50f35e5ee15fc5053ab17e10d19810f3fac
commit 4e9c936dc245f8dd08417bd46aef826bad66ed6f
Author: Marc-André Lureau <marcandre.lureau at redhat.com>
Date:   Thu Feb 12 04:09:33 2015 +0100

    Add GIOStream-based pipe
    
    This code creates a pipe between 2 GIOStreams: the input side reads from
    the peer's output side, and vice-versa.
    
    In the following patches, this will avoid the socket communication
    to exchange with the embedded webdav server.
    
    glib-2.0 >= 2.43.90 is required because of the GSimpleIOStream dependency.

diff --git a/configure.ac b/configure.ac
index 4e88dec..d98e502 100644
--- a/configure.ac
+++ b/configure.ac
@@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
 if test "x$enable_webdav" = "xno"; then
   have_phodav="no"
 else
-  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no])
+  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])
   AC_SUBST(PHODAV_CFLAGS)
   AC_SUBST(PHODAV_LIBS)
 
@@ -289,6 +289,8 @@ fi
 AS_IF([test "x$have_phodav" = "xyes"],
        AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav]))
 
+AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"])
+
 AC_ARG_WITH([audio],
   AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]),
   [],
diff --git a/gtk/Makefile.am b/gtk/Makefile.am
index 7728fec..ab50c79 100644
--- a/gtk/Makefile.am
+++ b/gtk/Makefile.am
@@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES +=	\
 	$(NULL)
 endif
 
+if WITH_PHODAV
+libspice_client_glib_2_0_la_SOURCES +=	\
+	giopipe.c			\
+	giopipe.h			\
+	$(NULL)
+endif
+
 if WITH_UCONTEXT
 libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c
 endif
diff --git a/gtk/giopipe.c b/gtk/giopipe.c
new file mode 100644
index 0000000..440cae9
--- /dev/null
+++ b/gtk/giopipe.c
@@ -0,0 +1,476 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+  Copyright (C) 2015 Red Hat, Inc.
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string.h>
+#include <errno.h>
+
+#include "giopipe.h"
+
+#define TYPE_PIPE_INPUT_STREAM         (pipe_input_stream_get_type ())
+#define PIPE_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
+#define PIPE_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+#define IS_PIPE_INPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
+#define IS_PIPE_INPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
+#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+
+typedef struct _PipeInputStreamClass                              PipeInputStreamClass;
+typedef struct _PipeInputStream                                   PipeInputStream;
+typedef struct _PipeOutputStream                                  PipeOutputStream;
+
+struct _PipeInputStream
+{
+    GInputStream parent_instance;
+
+    PipeOutputStream *peer;
+    gssize read;
+
+    /* GIOstream:closed is protected against pending operations, so we
+     * use an additional close flag to cancel those when the peer is
+     * closing.
+     */
+    gboolean peer_closed;
+    GSource *source;
+};
+
+struct _PipeInputStreamClass
+{
+    GInputStreamClass parent_class;
+};
+
+#define TYPE_PIPE_OUTPUT_STREAM         (pipe_output_stream_get_type ())
+#define PIPE_OUTPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
+#define PIPE_OUTPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+#define IS_PIPE_OUTPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
+#define IS_PIPE_OUTPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
+#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+
+typedef struct _PipeOutputStreamClass                             PipeOutputStreamClass;
+
+struct _PipeOutputStream
+{
+    GOutputStream parent_instance;
+
+    PipeInputStream *peer;
+    const gchar *buffer;
+    gsize count;
+    gboolean peer_closed;
+    GSource *source;
+};
+
+struct _PipeOutputStreamClass
+{
+    GOutputStreamClass parent_class;
+};
+
+static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+static void pipe_input_stream_check_source (PipeInputStream *self);
+static void pipe_output_stream_check_source (PipeOutputStream *self);
+
+G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                                pipe_input_stream_pollable_iface_init))
+
+static gssize
+pipe_input_stream_read (GInputStream  *stream,
+                        void          *buffer,
+                        gsize          count,
+                        GCancellable  *cancellable,
+                        GError       **error)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+
+    g_return_val_if_fail(count > 0, -1);
+
+    if (g_input_stream_is_closed (stream) || self->peer_closed) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                             "Stream is already closed");
+        return -1;
+    }
+
+    if (!self->peer->buffer) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                             g_strerror(EAGAIN));
+        return -1;
+    }
+
+    count = MIN(self->peer->count, count);
+    memcpy(buffer, self->peer->buffer, count);
+    self->read = count;
+    self->peer->buffer = NULL;
+
+    //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
+    /* schedule peer source */
+    pipe_output_stream_check_source(self->peer);
+
+    return count;
+}
+
+static void
+pipe_input_stream_check_source (PipeInputStream *self)
+{
+    if (self->source && !g_source_is_destroyed(self->source) &&
+        g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
+        g_source_set_ready_time(self->source, 0);
+}
+
+static gboolean
+pipe_input_stream_close (GInputStream  *stream,
+                         GCancellable   *cancellable,
+                         GError        **error)
+{
+    PipeInputStream *self;
+
+    self = PIPE_INPUT_STREAM(stream);
+
+    if (self->peer) {
+        /* ignore any pending errors */
+        self->peer->peer_closed = TRUE;
+        g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
+        pipe_output_stream_check_source(self->peer);
+    }
+
+    return TRUE;
+}
+
+static void
+pipe_input_stream_close_async (GInputStream       *stream,
+                               int                  io_priority,
+                               GCancellable        *cancellable,
+                               GAsyncReadyCallback  callback,
+                               gpointer             data)
+{
+    GTask *task;
+
+    task = g_task_new (stream, cancellable, callback, data);
+
+    /* will always return TRUE */
+    pipe_input_stream_close (stream, cancellable, NULL);
+
+    g_task_return_boolean (task, TRUE);
+    g_object_unref (task);
+}
+
+static gboolean
+pipe_input_stream_close_finish (GInputStream  *stream,
+                                GAsyncResult   *result,
+                                GError        **error)
+{
+    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+    return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+static void
+pipe_input_stream_init (PipeInputStream *self)
+{
+    self->read = -1;
+}
+
+static void
+pipe_input_stream_dispose(GObject *object)
+{
+    PipeInputStream *self;
+
+    self = PIPE_INPUT_STREAM(object);
+
+    if (self->peer) {
+        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+        self->peer = NULL;
+    }
+
+    if (self->source) {
+        g_source_unref(self->source);
+        self->source = NULL;
+    }
+
+    G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
+}
+
+static void
+pipe_input_stream_class_init (PipeInputStreamClass *klass)
+{
+    GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+    GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
+
+    istream_class->read_fn  = pipe_input_stream_read;
+    istream_class->close_fn = pipe_input_stream_close;
+    istream_class->close_async  = pipe_input_stream_close_async;
+    istream_class->close_finish = pipe_input_stream_close_finish;
+
+    gobject_class->dispose = pipe_input_stream_dispose;
+}
+
+static gboolean
+pipe_input_stream_is_readable (GPollableInputStream *stream)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+    gboolean readable;
+
+    readable = (self->peer && self->peer->buffer && self->read == -1) || self->peer_closed;
+    //g_debug("readable %p %d", self->peer, readable);
+
+    return readable;
+}
+
+static GSource *
+pipe_input_stream_create_source (GPollableInputStream *stream,
+                                 GCancellable         *cancellable)
+{
+    PipeInputStream *self = PIPE_INPUT_STREAM(stream);
+    GSource *pollable_source;
+
+    g_return_val_if_fail (self->source == NULL ||
+                          g_source_is_destroyed (self->source), NULL);
+
+    if (self->source && g_source_is_destroyed (self->source))
+        g_source_unref (self->source);
+
+    pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
+    self->source = g_source_ref (pollable_source);
+
+    return pollable_source;
+}
+
+static void
+pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+    iface->is_readable   = pipe_input_stream_is_readable;
+    iface->create_source = pipe_input_stream_create_source;
+}
+
+static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+                                                pipe_output_stream_pollable_iface_init))
+
+/*
+ * GOutputStream write implementation (installed as write_fn).
+ *
+ * No data is copied here: the caller's buffer and its size are recorded in
+ * self->buffer / self->count and the reading end is poked through
+ * pipe_input_stream_check_source(). While peer->read is negative the call
+ * fails with G_IO_ERROR_WOULD_BLOCK and the caller is expected to retry
+ * with the very same buffer. Once peer->read is non-negative (presumably
+ * filled in by the reading side after it consumed data), that value is
+ * returned and the pending-write state is reset.
+ *
+ * Returns: the value found in peer->read, or -1 with @error set to
+ * G_IO_ERROR_CLOSED (this stream or its peer is closed or gone) or
+ * G_IO_ERROR_WOULD_BLOCK.
+ */
+static gssize
+pipe_output_stream_write (GOutputStream  *stream,
+                          const void     *buffer,
+                          gsize           count,
+                          GCancellable   *cancellable,
+                          GError        **error)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    PipeInputStream *peer = self->peer;
+
+    //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
+    /* self->peer is a weak pointer (see make_gio_pipe): it reads as NULL
+       once the reading end has been finalized. Report that like a closed
+       stream instead of dereferencing it below. */
+    if (g_output_stream_is_closed (stream) || self->peer_closed || peer == NULL) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                             "Stream is already closed");
+        return -1;
+    }
+
+    /* this abuses pollable stream, writing sync would likely lead to
+       crashes, since the buffer pointer would become invalid, a
+       generic solution would need a copy..
+    */
+    g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
+    self->buffer = buffer;
+    self->count = count;
+
+    /* let the reading end know there is something to consume */
+    pipe_input_stream_check_source(peer);
+
+    if (peer->read < 0) {
+        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                             g_strerror (EAGAIN));
+        return -1;
+    }
+
+    g_assert(peer->read <= self->count);
+    count = peer->read;
+
+    /* the pending write is complete: forget the borrowed buffer */
+    self->buffer = NULL;
+    self->count = 0;
+    peer->read = -1;
+
+    return count;
+}
+
+/* Instance initializer: intentionally empty, no field is set up here. */
+static void
+pipe_output_stream_init (PipeOutputStream *stream)
+{
+}
+
+/*
+ * GObject dispose implementation: unregisters the weak pointer kept on the
+ * peer input stream, drops the reference held on the pollable source and
+ * chains up to the parent class.
+ */
+static void
+pipe_output_stream_dispose(GObject *object)
+{
+    PipeOutputStream *out = PIPE_OUTPUT_STREAM(object);
+
+    if (out->peer != NULL) {
+        gpointer *weak_location = (gpointer*)&out->peer;
+
+        g_object_remove_weak_pointer(G_OBJECT(out->peer), weak_location);
+        out->peer = NULL;
+    }
+
+    if (out->source != NULL) {
+        g_source_unref(out->source);
+        out->source = NULL;
+    }
+
+    G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
+}
+
+/*
+ * Make the stream's pollable source ready right away (ready time 0) when
+ * such a source exists, is not destroyed and the stream reports itself as
+ * writable. Does nothing otherwise.
+ */
+static void
+pipe_output_stream_check_source (PipeOutputStream *self)
+{
+    if (self->source == NULL)
+        return;
+
+    if (g_source_is_destroyed(self->source))
+        return;
+
+    if (!g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
+        return;
+
+    g_source_set_ready_time(self->source, 0);
+}
+
+/*
+ * GOutputStream close implementation.
+ *
+ * When the peer input stream is still around, it is flagged as having a
+ * closed peer, closed in turn (errors from that close are discarded) and
+ * its source is checked. Always returns TRUE; @error is never set.
+ */
+static gboolean
+pipe_output_stream_close (GOutputStream  *stream,
+                          GCancellable   *cancellable,
+                          GError        **error)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+
+    if (self->peer == NULL)
+        return TRUE;
+
+    self->peer->peer_closed = TRUE;
+    /* ignore any pending errors */
+    g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL);
+    pipe_input_stream_check_source(self->peer);
+
+    return TRUE;
+}
+
+/*
+ * GOutputStream close_async implementation: performs the synchronous
+ * close immediately and completes a GTask with TRUE.
+ */
+static void
+pipe_output_stream_close_async (GOutputStream       *stream,
+                                int                  io_priority,
+                                GCancellable        *cancellable,
+                                GAsyncReadyCallback  callback,
+                                gpointer             data)
+{
+    GTask *close_task = g_task_new (stream, cancellable, callback, data);
+
+    /* the synchronous close always returns TRUE, nothing to check */
+    pipe_output_stream_close (stream, cancellable, NULL);
+    g_task_return_boolean (close_task, TRUE);
+
+    g_object_unref (close_task);
+}
+
+/*
+ * GOutputStream close_finish implementation: validates that @result is a
+ * GTask belonging to @stream and propagates its boolean result.
+ */
+static gboolean
+pipe_output_stream_close_finish (GOutputStream  *stream,
+                                 GAsyncResult   *result,
+                                 GError        **error)
+{
+    GTask *close_task;
+
+    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+    close_task = G_TASK (result);
+
+    return g_task_propagate_boolean (close_task, error);
+}
+
+
+/* Class initializer: installs the GObject and GOutputStream vfuncs. */
+static void
+pipe_output_stream_class_init (PipeOutputStreamClass *klass)
+{
+    GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
+    GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+    object_class->dispose = pipe_output_stream_dispose;
+
+    stream_class->write_fn = pipe_output_stream_write;
+    stream_class->close_fn = pipe_output_stream_close;
+    stream_class->close_async = pipe_output_stream_close_async;
+    stream_class->close_finish = pipe_output_stream_close_finish;
+}
+
+/*
+ * GPollableOutputStream is_writable implementation.
+ *
+ * The stream is writable when no write is pending (self->buffer is NULL)
+ * or when the peer reports a non-negative read count for the pending
+ * write.
+ */
+static gboolean
+pipe_output_stream_is_writable (GPollableOutputStream *stream)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    gboolean writable;
+
+    if (self->buffer == NULL) {
+        /* no write pending */
+        writable = TRUE;
+    } else if (self->peer == NULL) {
+        /* self->peer is a weak pointer (see make_gio_pipe) and reads as
+           NULL once the reading end is finalized: report writable so a
+           waiting writer gets woken up, rather than dereferencing NULL */
+        writable = TRUE;
+    } else {
+        writable = self->peer->read >= 0;
+    }
+    //g_debug("writable %p %d", self, writable);
+
+    return writable;
+}
+
+/*
+ * GPollableOutputStream create_source implementation.
+ *
+ * Creates a pollable source for the stream and remembers it (with its own
+ * reference) in self->source. The precondition check refuses the call while
+ * a previously created source has not been destroyed yet.
+ */
+static GSource *
+pipe_output_stream_create_source (GPollableOutputStream *stream,
+                                  GCancellable          *cancellable)
+{
+    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+    GSource *new_source;
+
+    g_return_val_if_fail (self->source == NULL ||
+                          g_source_is_destroyed (self->source), NULL);
+
+    /* release our reference on a stale, already destroyed source */
+    if (self->source != NULL && g_source_is_destroyed (self->source)) {
+        g_source_unref (self->source);
+    }
+
+    new_source = g_pollable_source_new_full (self, NULL, cancellable);
+    g_source_ref (new_source);
+    self->source = new_source;
+
+    return new_source;
+}
+
+/* Hook the pipe implementation into the GPollableOutputStream interface. */
+static void
+pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+    iface->create_source = pipe_output_stream_create_source;
+    iface->is_writable = pipe_output_stream_is_writable;
+}
+
+/*
+ * Create one unidirectional pipe: a PipeInputStream / PipeOutputStream
+ * pair referring to each other through weak pointers (each ->peer field is
+ * registered as a weak pointer on the other object).
+ *
+ * @input and @output must be non-NULL and point to NULL; on return they
+ * hold the two new objects.
+ */
+G_GNUC_INTERNAL void
+make_gio_pipe(GInputStream **input, GOutputStream **output)
+{
+    PipeInputStream *reader;
+    PipeOutputStream *writer;
+
+    g_return_if_fail(input != NULL && *input == NULL);
+    g_return_if_fail(output != NULL && *output == NULL);
+
+    reader = g_object_new(TYPE_PIPE_INPUT_STREAM, NULL);
+    writer = g_object_new(TYPE_PIPE_OUTPUT_STREAM, NULL);
+
+    /* writer -> reader link */
+    writer->peer = reader;
+    g_object_add_weak_pointer(G_OBJECT(reader), (gpointer*)&writer->peer);
+
+    /* reader -> writer link */
+    reader->peer = writer;
+    g_object_add_weak_pointer(G_OBJECT(writer), (gpointer*)&reader->peer);
+
+    *input = G_INPUT_STREAM(reader);
+    *output = G_OUTPUT_STREAM(writer);
+}
+
+/*
+ * Create two connected GIOStreams out of two unidirectional pipes: the
+ * output side of *p1 is paired with the input side of *p2 and vice versa.
+ *
+ * @p1 and @p2 must be non-NULL and point to NULL. On return each holds a
+ * new GIOStream owned by the caller.
+ */
+G_GNUC_INTERNAL void
+spice_make_pipe(GIOStream **p1, GIOStream **p2)
+{
+    GInputStream *in1 = NULL, *in2 = NULL;
+    GOutputStream *out1 = NULL, *out2 = NULL;
+
+    g_return_if_fail(p1 != NULL);
+    g_return_if_fail(p2 != NULL);
+    g_return_if_fail(*p1 == NULL);
+    g_return_if_fail(*p2 == NULL);
+
+    make_gio_pipe(&in1, &out2);
+    make_gio_pipe(&in2, &out1);
+
+    *p1 = g_simple_io_stream_new(in1, out1);
+    *p2 = g_simple_io_stream_new(in2, out2);
+
+    /* the GSimpleIOStreams hold their own references on the streams they
+       wrap: drop the creation references, otherwise the four streams are
+       never finalized */
+    g_object_unref(in1);
+    g_object_unref(out1);
+    g_object_unref(in2);
+    g_object_unref(out2);
+}
diff --git a/gtk/giopipe.h b/gtk/giopipe.h
new file mode 100644
index 0000000..46c2c9c
--- /dev/null
+++ b/gtk/giopipe.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+  Copyright (C) 2015 Red Hat, Inc.
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+#ifndef __SPICE_GIO_PIPE_H__
+#define __SPICE_GIO_PIPE_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+/*
+ * Create a pair of connected in-process GIOStreams: what is written to the
+ * output stream of one is read from the input stream of the other.
+ * @p1 and @p2 must be non-NULL and point to NULL on entry; on return each
+ * holds a new GIOStream.
+ */
+void spice_make_pipe(GIOStream **p1, GIOStream **p2);
+
+G_END_DECLS
+
+#endif /* __SPICE_GIO_PIPE_H__ */
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 57b06db..10a7722 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -6,6 +6,10 @@ noinst_PROGRAMS =				\
 	session					\
 	$(NULL)
 
+if WITH_PHODAV
+noinst_PROGRAMS += pipe
+endif
+
 TESTS = $(noinst_PROGRAMS)
 
 AM_CPPFLAGS =					\
@@ -22,5 +26,7 @@ LDADD =							\
 util_SOURCES = util.c
 coroutine_SOURCES = coroutine.c
 session_SOURCES = session.c
+pipe_SOURCES = pipe.c
+
 
 -include $(top_srcdir)/git.mk
diff --git a/tests/pipe.c b/tests/pipe.c
new file mode 100644
index 0000000..841cb77
--- /dev/null
+++ b/tests/pipe.c
@@ -0,0 +1,313 @@
+#include <glib.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <locale.h>
+
+#include "giopipe.h"
+
+/* Per-test state, built by fixture_set_up() and released by
+   fixture_tear_down(). */
+typedef struct _Fixture {
+    /* the two connected ends returned by spice_make_pipe() */
+    GIOStream *p1;
+    GIOStream *p2;
+
+    /* input/output streams looked up from p1 and p2; they are not
+       unreferenced separately in tear down */
+    GInputStream *ip1;
+    GOutputStream *op1;
+    GInputStream *ip2;
+    GOutputStream *op2;
+
+    /* pre-filled with 0x42 + index in set up, used as read destination */
+    gchar buf[16];
+
+    GMainLoop *loop;           /* main loop the async tests run */
+    GCancellable *cancellable; /* passed to every I/O call */
+    guint timeout;             /* id of the timeout source running stop_loop() */
+} Fixture;
+
+/*
+ * Timeout callback installed by fixture_set_up(): quits the main loop
+ * passed as @data, then fails the test through g_assert_not_reached(),
+ * since a test is expected to finish before the timeout fires.
+ */
+static gboolean
+stop_loop (gpointer data)
+{
+    g_main_loop_quit ((GMainLoop *) data);
+    g_assert_not_reached();
+
+    return G_SOURCE_REMOVE;
+}
+
+/*
+ * Test set up: creates the pipe pair, looks up and type-checks its four
+ * streams, fills the buffer with a known pattern, and creates the
+ * cancellable, the main loop and a 1000 ms timeout running stop_loop().
+ */
+static void
+fixture_set_up(Fixture *fixture,
+               gconstpointer user_data)
+{
+    /* unsigned index: it is compared against sizeof() below */
+    gsize i;
+
+    spice_make_pipe(&fixture->p1, &fixture->p2);
+    g_assert_true(G_IS_IO_STREAM(fixture->p1));
+    g_assert_true(G_IS_IO_STREAM(fixture->p2));
+
+    fixture->op1 = g_io_stream_get_output_stream(fixture->p1);
+    g_assert_true(G_IS_OUTPUT_STREAM(fixture->op1));
+    fixture->ip1 = g_io_stream_get_input_stream(fixture->p1);
+    g_assert_true(G_IS_INPUT_STREAM(fixture->ip1));
+    fixture->op2 = g_io_stream_get_output_stream(fixture->p2);
+    g_assert_true(G_IS_OUTPUT_STREAM(fixture->op2));
+    fixture->ip2 = g_io_stream_get_input_stream(fixture->p2);
+    g_assert_true(G_IS_INPUT_STREAM(fixture->ip2));
+
+    for (i = 0; i < sizeof(fixture->buf); i++) {
+        fixture->buf[i] = (gchar)(0x42 + i);
+    }
+
+    fixture->cancellable = g_cancellable_new();
+    fixture->loop = g_main_loop_new (NULL, FALSE);
+    fixture->timeout = g_timeout_add (1000, stop_loop, fixture->loop);
+}
+
+/*
+ * Test tear down: removes the timeout source and releases the pipe ends,
+ * the cancellable and the main loop created by fixture_set_up().
+ */
+static void
+fixture_tear_down(Fixture *fixture,
+                  gconstpointer user_data)
+{
+    /* the timeout has not fired, otherwise stop_loop() would have aborted */
+    g_source_remove(fixture->timeout);
+
+    g_clear_object(&fixture->p1);
+    g_clear_object(&fixture->p2);
+    g_clear_object(&fixture->cancellable);
+
+    g_main_loop_unref(fixture->loop);
+}
+
+/*
+ * A synchronous read when no data is available must fail with
+ * G_IO_ERROR_WOULD_BLOCK (and therefore return -1).
+ */
+static void
+test_pipe_readblock(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = g_input_stream_read(f->ip2, f->buf, 1,
+                               f->cancellable, &error);
+
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+    /* a failed read reports -1 along with the error */
+    g_assert_cmpint(size, ==, -1);
+
+    g_clear_error(&error);
+}
+
+/*
+ * A synchronous write when nobody has read yet must fail with
+ * G_IO_ERROR_WOULD_BLOCK (and therefore return -1).
+ */
+static void
+test_pipe_writeblock(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = g_output_stream_write(f->op1, "", 1,
+                                 f->cancellable, &error);
+
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+    /* a failed write reports -1 along with the error */
+    g_assert_cmpint(size, ==, -1);
+
+    g_clear_error(&error);
+}
+
+/*
+ * Completion callback for async writes: expects the write to succeed with
+ * at least one byte written, then quits the main loop given as @user_data.
+ */
+static void
+write_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GOutputStream *ostream = G_OUTPUT_STREAM(source);
+    GError *error = NULL;
+    gssize written;
+
+    written = g_output_stream_write_finish(ostream, result, &error);
+
+    g_assert_no_error(error);
+    g_assert_cmpint(written, >, 0);
+    g_clear_error(&error);
+
+    g_main_loop_quit ((GMainLoop *) user_data);
+}
+
+/*
+ * Completion callback for async reads: expects the read to succeed and to
+ * return exactly the byte count packed into @user_data with
+ * GINT_TO_POINTER(). Does not quit the main loop.
+ */
+static void
+read_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GInputStream *istream = G_INPUT_STREAM(source);
+    GError *error = NULL;
+    gssize expected = GPOINTER_TO_INT(user_data);
+    gssize nbytes;
+
+    nbytes = g_input_stream_read_finish(istream, result, &error);
+
+    g_assert_cmpint(nbytes, ==, expected);
+    g_assert_no_error(error);
+    g_clear_error(&error);
+}
+
+/*
+ * Start a one byte async write followed by a one byte async read and run
+ * the main loop until the write completes; do the whole exchange twice.
+ */
+static void
+test_pipe_writeread(Fixture *f, gconstpointer user_data)
+{
+    gint round;
+
+    for (round = 0; round < 2; round++) {
+        g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT,
+                                    f->cancellable, write_cb, f->loop);
+        g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                                  f->cancellable, read_cb, GINT_TO_POINTER(1));
+
+        g_main_loop_run (f->loop);
+    }
+}
+
+/*
+ * Same exchange as the write/read test but the async read is started
+ * before the async write. The loop runs until write_cb quits it.
+ */
+static void
+test_pipe_readwrite(Fixture *f, gconstpointer user_data)
+{
+    const gsize nbytes = 1;
+
+    g_input_stream_read_async(f->ip2, f->buf, nbytes, G_PRIORITY_DEFAULT,
+                              f->cancellable, read_cb, GINT_TO_POINTER(1));
+    g_output_stream_write_async(f->op1, "", nbytes, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+
+    g_main_loop_run (f->loop);
+}
+
+/*
+ * Completion callback for async reads that must return exactly 8 bytes
+ * without error. @user_data is not a main loop here (callers pass
+ * GINT_TO_POINTER(8)) and is not used; the loop is quit by write_cb.
+ */
+static void
+read8_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_cmpint(nbytes, ==, 8);
+    g_assert_no_error(error);
+    g_clear_error(&error);
+}
+
+/*
+ * Write 16 bytes while the reader only asks for 8: the read must return 8
+ * bytes, and a following read must block.
+ */
+static void
+test_pipe_write16read8(Fixture *f, gconstpointer user_data)
+{
+    const gsize write_len = 16;
+    const gsize read_len = 8;
+
+    g_output_stream_write_async(f->op1, "0123456789abcdef", write_len,
+                                G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(f->ip2, f->buf, read_len,
+                              G_PRIORITY_DEFAULT,
+                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+
+    g_main_loop_run (f->loop);
+
+    /* nothing is left for a second read: it has to report would-block */
+    test_pipe_readblock(f, user_data);
+}
+
+/*
+ * Write 8 bytes while the reader asks for 16: the read must return the 8
+ * available bytes, and a following write must block.
+ */
+static void
+test_pipe_write8read16(Fixture *f, gconstpointer user_data)
+{
+    g_output_stream_write_async(f->op1, "01234567", 8, G_PRIORITY_DEFAULT,
+                                f->cancellable, write_cb, f->loop);
+    g_input_stream_read_async(f->ip2, f->buf, 16, G_PRIORITY_DEFAULT,
+                              f->cancellable, read8_cb, GINT_TO_POINTER(8));
+
+    g_main_loop_run (f->loop);
+
+    /* check next write would block */
+    test_pipe_writeblock(f, user_data);
+}
+
+/*
+ * Completion callback for an async read interrupted by the writing end
+ * being closed: the read must fail with G_IO_ERROR_CLOSED (returning -1).
+ * Quits the main loop given as @user_data.
+ */
+static void
+readclose_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+    GMainLoop *loop = user_data;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CLOSED);
+    /* a failed read reports -1 along with the error */
+    g_assert_cmpint(nbytes, ==, -1);
+    g_clear_error(&error);
+
+    g_main_loop_quit (loop);
+}
+
+/*
+ * Start an async read on one end, then close the whole GIOStream of the
+ * other end: the pending read must complete with G_IO_ERROR_CLOSED (checked
+ * in readclose_cb).
+ */
+static void
+test_pipe_readclosestream(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readclose_cb, f->loop);
+    g_io_stream_close(f->p1, f->cancellable, &error);
+    /* do not let a close failure go unnoticed (and unfreed) */
+    g_assert_no_error(error);
+
+    g_main_loop_run (f->loop);
+}
+
+/*
+ * Start an async read on one end, then close the output stream of the
+ * other end: the pending read must complete with G_IO_ERROR_CLOSED (checked
+ * in readclose_cb).
+ */
+static void
+test_pipe_readclose(Fixture *f, gconstpointer user_data)
+{
+    GError *error = NULL;
+
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readclose_cb, f->loop);
+    g_output_stream_close(f->op1, f->cancellable, &error);
+    /* do not let a close failure go unnoticed (and unfreed) */
+    g_assert_no_error(error);
+
+    g_main_loop_run (f->loop);
+}
+
+/*
+ * Completion callback for an async read that got cancelled: the read must
+ * fail with G_IO_ERROR_CANCELLED (returning -1). Quits the main loop given
+ * as @user_data.
+ */
+static void
+readcancel_cb(GObject *source, GAsyncResult *result, gpointer user_data)
+{
+    GError *error = NULL;
+    gssize nbytes;
+    GMainLoop *loop = user_data;
+
+    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);
+
+    g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
+    g_assert_cmpint(nbytes, ==, -1);
+    g_clear_error(&error);
+
+    g_main_loop_quit (loop);
+}
+
+/*
+ * Start an async read, then cancel the cancellable it was given: the
+ * pending read must complete with G_IO_ERROR_CANCELLED. (This used to close
+ * the writing end instead, which only duplicated the readclose test.)
+ */
+static void
+test_pipe_readcancel(Fixture *f, gconstpointer user_data)
+{
+    g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT,
+                              f->cancellable, readcancel_cb, f->loop);
+    g_cancellable_cancel(f->cancellable);
+
+    g_main_loop_run (f->loop);
+}
+
+/*
+ * Test program entry point: registers every pipe test with the shared
+ * fixture set up / tear down functions and runs them.
+ */
+int main(int argc, char* argv[])
+{
+    /* registered in this order */
+    static const struct {
+        const gchar *path;
+        void (*func) (Fixture *f, gconstpointer user_data);
+    } tests[] = {
+        { "/pipe/readblock",       test_pipe_readblock },
+        { "/pipe/writeblock",      test_pipe_writeblock },
+        { "/pipe/writeread",       test_pipe_writeread },
+        { "/pipe/readwrite",       test_pipe_readwrite },
+        { "/pipe/write16read8",    test_pipe_write16read8 },
+        { "/pipe/write8read16",    test_pipe_write8read16 },
+        { "/pipe/readclosestream", test_pipe_readclosestream },
+        { "/pipe/readclose",       test_pipe_readclose },
+        { "/pipe/readcancel",      test_pipe_readcancel },
+    };
+    gsize i;
+
+    setlocale(LC_ALL, "");
+
+    g_test_init(&argc, &argv, NULL);
+
+    for (i = 0; i < G_N_ELEMENTS(tests); i++) {
+        g_test_add(tests[i].path, Fixture, NULL,
+                   fixture_set_up, tests[i].func,
+                   fixture_tear_down);
+    }
+
+    return g_test_run();
+}


More information about the Spice-commits mailing list