[Spice-devel] [PATCH spice-gtk 1/5] Add GIOStream-based pipe

Christophe Fergeau cfergeau at redhat.com
Mon Feb 23 04:28:33 PST 2015


On Sat, Feb 21, 2015 at 01:40:12AM +0100, Marc-André Lureau wrote:
> This allows to create a pipe between 2 GIOStream, the input side read
> from the peer output side, and vice-versa.
> 
> In the following patches, this will avoid the socket communication
> to exchange with the embedded webdav server.
> ---
>  configure.ac      |   4 +-
>  gtk/Makefile.am   |   7 +
>  gtk/giopipe.c     | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  gtk/giopipe.h     |  29 ++++
>  tests/Makefile.am |   5 +
>  tests/pipe.c      | 313 ++++++++++++++++++++++++++++++++++++
>  6 files changed, 829 insertions(+), 1 deletion(-)
>  create mode 100644 gtk/giopipe.c
>  create mode 100644 gtk/giopipe.h
>  create mode 100644 tests/pipe.c
> 
> diff --git a/configure.ac b/configure.ac
> index 4e88dec..d98e502 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
>  if test "x$enable_webdav" = "xno"; then
>    have_phodav="no"
>  else
> -  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no])
> +  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])

This glib requirement comes from the use of GSimpleIOStream; it could be
worth mentioning that explicitly in the commit log.

>    AC_SUBST(PHODAV_CFLAGS)
>    AC_SUBST(PHODAV_LIBS)
>  
> @@ -289,6 +289,8 @@ fi
>  AS_IF([test "x$have_phodav" = "xyes"],
>         AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav]))
>  
> +AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"])
> +
>  AC_ARG_WITH([audio],
>    AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]),
>    [],
> diff --git a/gtk/Makefile.am b/gtk/Makefile.am
> index 7728fec..ab50c79 100644
> --- a/gtk/Makefile.am
> +++ b/gtk/Makefile.am
> @@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES +=	\
>  	$(NULL)
>  endif
>  
> +if WITH_PHODAV
> +libspice_client_glib_2_0_la_SOURCES +=	\
> +	giopipe.c			\
> +	giopipe.h			\

Has this GioPipe been proposed upstream?

> +	$(NULL)
> +endif
> +
>  if WITH_UCONTEXT
>  libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c
>  endif
> diff --git a/gtk/giopipe.c b/gtk/giopipe.c
> new file mode 100644
> index 0000000..45007c4
> --- /dev/null
> +++ b/gtk/giopipe.c
> @@ -0,0 +1,472 @@
> +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
> +/*
> +  Copyright (C) 2015 Red Hat, Inc.
> +
> +  This library is free software; you can redistribute it and/or
> +  modify it under the terms of the GNU Lesser General Public
> +  License as published by the Free Software Foundation; either
> +  version 2.1 of the License, or (at your option) any later version.
> +
> +  This library is distributed in the hope that it will be useful,
> +  but WITHOUT ANY WARRANTY; without even the implied warranty of
> +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +  Lesser General Public License for more details.
> +
> +  You should have received a copy of the GNU Lesser General Public
> +  License along with this library; if not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#include <string.h>
> +#include <errno.h>
> +
> +#include "giopipe.h"
> +
> +#define TYPE_PIPE_INPUT_STREAM         (pipe_input_stream_get_type ())
> +#define PIPE_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
> +#define PIPE_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
> +#define IS_PIPE_INPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
> +#define IS_PIPE_INPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
> +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
> +
> +typedef struct _PipeInputStreamClass                              PipeInputStreamClass;
> +typedef struct _PipeInputStream                                   PipeInputStream;
> +typedef struct _PipeOutputStream                                  PipeOutputStream;
> +
> +struct _PipeInputStream
> +{
> +    GInputStream parent_instance;
> +
> +    PipeOutputStream *peer;
> +    gssize read;
> +
> +    /* GIOstream:closed is protected against pending operations, so we
> +     * use an additional close flag to cancel those when the peer is
> +     * closing.
> +     */
> +    gboolean closed;
> +    GSource *source;
> +};
> +
> +struct _PipeInputStreamClass
> +{
> +    GInputStreamClass parent_class;
> +};
> +
> +#define TYPE_PIPE_OUTPUT_STREAM         (pipe_output_stream_get_type ())
> +#define PIPE_OUTPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
> +#define PIPE_OUTPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
> +#define IS_PIPE_OUTPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
> +#define IS_PIPE_OUTPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
> +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
> +
> +typedef struct _PipeOutputStreamClass                             PipeOutputStreamClass;
> +
> +struct _PipeOutputStream
> +{
> +    GOutputStream parent_instance;
> +
> +    PipeInputStream *peer;
> +    const gchar *buffer;
> +    gsize count;
> +    gboolean closed;
> +    GSource *source;
> +};
> +
> +struct _PipeOutputStreamClass
> +{
> +    GOutputStreamClass parent_class;
> +};
> +
> +struct _SpicePipeStreamPrivate {
> +    PipeInputStream *input_stream;
> +    PipeOutputStream *output_stream;
> +};

This struct does not seem to be used.

> +
> +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
> +static void pipe_input_stream_check_source (PipeInputStream *self);
> +static void pipe_output_stream_check_source (PipeOutputStream *self);
> +
> +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
> +                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
> +                                                pipe_input_stream_pollable_iface_init))
> +
> +static gssize
> +pipe_input_stream_read (GInputStream  *stream,
> +                        void          *buffer,
> +                        gsize          count,
> +                        GCancellable  *cancellable,
> +                        GError       **error)
> +{
> +    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
> +
> +    g_return_val_if_fail(count > 0, -1);
> +
> +    if (g_input_stream_is_closed (stream) || self->closed) {

It's not clear why you need this 'closed' variable: it always seems to
be set together with a call to g_xxxx_stream_close(). Is that in case
some of the cleanup done in g_xxxx_stream_close() fails?

> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
> +                             "Stream is already closed");
> +        return -1;
> +    }
> +
> +    if (!self->peer->buffer) {
> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
> +                             g_strerror(EAGAIN));
> +        return -1;
> +    }
> +
> +    g_return_val_if_fail(self->peer->buffer, -1);

You just checked whether self->peer->buffer is NULL and returned an
error if it is, so this g_return_val_if_fail() can never trigger.

> +
> +    count = MIN(self->peer->count, count);
> +    memcpy(buffer, self->peer->buffer, count);
> +    self->read = count;
> +    self->peer->buffer = NULL;
> +
> +    //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
> +    pipe_output_stream_check_source(self->peer);
> +
> +    return count;
> +}
> +
> +static void
> +pipe_input_stream_check_source (PipeInputStream *self)
> +{
> +    if (self->source && !g_source_is_destroyed(self->source) &&
> +        g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
> +        g_source_set_ready_time(self->source, 0);

g_source_set_ready_time API doc says "This API is only intended to be
used by implementations of GSource. Do not call this API on a GSource
that you did not create."

> +}
> +
> +static gboolean
> +pipe_input_stream_close (GInputStream  *stream,
> +                         GCancellable   *cancellable,
> +                         GError        **error)
> +{
> +    PipeInputStream *self;
> +
> +    self = PIPE_INPUT_STREAM(stream);
> +
> +    if (self->peer) {
> +        /* ignore any pending errors */
> +        self->peer->closed = TRUE;
> +        g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
> +        pipe_output_stream_check_source(self->peer);
> +    }

Why not set self->closed as well?

> +
> +    return TRUE;
> +}
> +
> +static void
> +pipe_input_stream_close_async (GInputStream       *stream,
> +                               int                  io_priority,
> +                               GCancellable        *cancellable,
> +                               GAsyncReadyCallback  callback,
> +                               gpointer             data)
> +{
> +    GTask *task;
> +
> +    task = g_task_new (stream, cancellable, callback, data);
> +
> +    /* will always return TRUE */
> +    pipe_input_stream_close (stream, cancellable, NULL);
> +
> +    g_task_return_boolean (task, TRUE);
> +    g_object_unref (task);
> +}

g_input_stream_close_async() API doc says "The asynchronous methods have
a default fallback that uses threads to implement asynchronicity, so
they are optional for inheriting classes. However, if you override one
you must override all." I guess we should do without this override, as
close is the only async operation you overrode.

> +
> +static gboolean
> +pipe_input_stream_close_finish (GInputStream  *stream,
> +                                GAsyncResult   *result,
> +                                GError        **error)
> +{
> +    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
> +
> +    return g_task_propagate_boolean (G_TASK (result), error);
> +}
> +
> +static void
> +pipe_input_stream_init (PipeInputStream *self)
> +{
> +    self->read = -1;
> +}
> +
> +static void
> +pipe_input_stream_dispose(GObject *object)
> +{
> +    PipeInputStream *self;
> +
> +    self = PIPE_INPUT_STREAM(object);
> +
> +    if (self->peer) {
> +        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
> +        self->peer = NULL;
> +    }
> +

No cleanup of the 'source' private member?

> +    G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
> +}
> +
> +static void
> +pipe_input_stream_class_init (PipeInputStreamClass *klass)
> +{
> +    GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
> +    GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
> +
> +    istream_class->read_fn  = pipe_input_stream_read;
> +    istream_class->close_fn = pipe_input_stream_close;
> +    istream_class->close_async  = pipe_input_stream_close_async;
> +    istream_class->close_finish = pipe_input_stream_close_finish;
> +
> +    gobject_class->dispose = pipe_input_stream_dispose;
> +}
> +
> +static gboolean
> +pipe_input_stream_is_readable (GPollableInputStream *stream)
> +{
> +    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
> +    gboolean readable;
> +
> +    readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed;
> +    //g_debug("readable %p %d", self->peer, readable);
> +
> +    return readable;
> +}
> +
> +static GSource *
> +pipe_input_stream_create_source (GPollableInputStream *stream,
> +                                 GCancellable         *cancellable)
> +{
> +    PipeInputStream *self = PIPE_INPUT_STREAM(stream);
> +    GSource *pollable_source;
> +
> +    g_return_val_if_fail (self->source == NULL ||
> +                          g_source_is_destroyed (self->source), NULL);
> +
> +    if (self->source && g_source_is_destroyed (self->source))
> +        g_source_unref (self->source);
> +
> +    pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
> +    self->source = g_source_ref (pollable_source);
> +
> +    return pollable_source;
> +}
> +
> +static void
> +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
> +{
> +    iface->is_readable   = pipe_input_stream_is_readable;
> +    iface->create_source = pipe_input_stream_create_source;
> +}
> +
> +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
> +
> +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
> +                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
> +                                                pipe_output_stream_pollable_iface_init))
> +
> +static gssize
> +pipe_output_stream_write (GOutputStream  *stream,
> +                          const void     *buffer,
> +                          gsize           count,
> +                          GCancellable   *cancellable,
> +                          GError        **error)
> +{
> +    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
> +    PipeInputStream *peer = self->peer;
> +
> +    //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
> +    if (g_output_stream_is_closed (stream) || self->closed) {
> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
> +                             "Stream is already closed");
> +        return -1;
> +    }
> +
> +    /* this abuses pollable stream, writing sync would likely lead to
> +       crashes, since the buffer pointer would become invalid, a
> +       generic solution would need a copy..
> +    */
> +    g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
> +    self->buffer = buffer;
> +    self->count = count;
> +
> +    pipe_input_stream_check_source(self->peer);

This call will trigger a sync call to pipe_input_stream_read() if I
followed everything properly? A comment mentioning that might make the
code easier to follow.

> +
> +    if (peer->read < 0) {
> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
> +                             g_strerror (EAGAIN));
> +        return -1;
> +    }
> +
> +    g_assert(peer->read <= self->count);
> +    count = peer->read;


My understanding is that here you don't want to assume that self->count
== peer->read...
> +
> +    self->buffer = NULL;
> +    self->count = 0;

... but here you make the assumption that everything was consumed by the
input stream

> +    peer->read = -1;
> +
> +    return count;
> +}
> +
> +static void
> +pipe_output_stream_init (PipeOutputStream *stream)
> +{
> +}
> +
> +static void
> +pipe_output_stream_dispose(GObject *object)
> +{
> +    PipeOutputStream *self;
> +
> +    self = PIPE_OUTPUT_STREAM(object);
> +
> +    if (self->peer) {
> +        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
> +        self->peer = NULL;
> +    }
> +

Same question about the 'source' private member.

> +    G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
> +}
> +
> +static void
> +pipe_output_stream_check_source (PipeOutputStream *self)
> +{
> +    if (self->source && !g_source_is_destroyed(self->source) &&
> +        g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
> +        g_source_set_ready_time(self->source, 0);


Same comment about g_source_set_ready_time.

> +}
> +
> +static gboolean
> +pipe_output_stream_close (GOutputStream  *stream,
> +                          GCancellable   *cancellable,
> +                          GError        **error)
> +{
> +    PipeOutputStream *self;
> +
> +    self = PIPE_OUTPUT_STREAM(stream);
> +
> +    if (self->peer) {
> +        /* ignore any pending errors */
> +        self->peer->closed = TRUE;
> +        g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL);
> +        pipe_input_stream_check_source(self->peer);
> +    }
> +
> +    return TRUE;
> +}
> +
> +static void
> +pipe_output_stream_close_async (GOutputStream       *stream,
> +                                int                  io_priority,
> +                                GCancellable        *cancellable,
> +                                GAsyncReadyCallback  callback,
> +                                gpointer             data)
> +{
> +    GTask *task;
> +
> +    task = g_task_new (stream, cancellable, callback, data);
> +
> +    /* will always return TRUE */
> +    pipe_output_stream_close (stream, cancellable, NULL);
> +
> +    g_task_return_boolean (task, TRUE);
> +    g_object_unref (task);
> +}

Same comment as before about async methods


Christophe
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 819 bytes
Desc: not available
URL: <http://lists.freedesktop.org/archives/spice-devel/attachments/20150223/24b11c05/attachment.sig>


More information about the Spice-devel mailing list