[Spice-devel] [PATCH spice-gtk 1/5] Add GIOStream-based pipe

Marc-André Lureau marcandre.lureau at gmail.com
Mon Feb 23 05:22:42 PST 2015


On Mon, Feb 23, 2015 at 1:28 PM, Christophe Fergeau <cfergeau at redhat.com> wrote:
> On Sat, Feb 21, 2015 at 01:40:12AM +0100, Marc-André Lureau wrote:
>> This allows to create a pipe between 2 GIOStream, the input side read
>> from the peer output side, and vice-versa.
>>
>> In the following patches, this will avoid the socket communication
>> to exchange with the embedded webdav server.
>> ---
>>  configure.ac      |   4 +-
>>  gtk/Makefile.am   |   7 +
>>  gtk/giopipe.c     | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  gtk/giopipe.h     |  29 ++++
>>  tests/Makefile.am |   5 +
>>  tests/pipe.c      | 313 ++++++++++++++++++++++++++++++++++++
>>  6 files changed, 829 insertions(+), 1 deletion(-)
>>  create mode 100644 gtk/giopipe.c
>>  create mode 100644 gtk/giopipe.h
>>  create mode 100644 tests/pipe.c
>>
>> diff --git a/configure.ac b/configure.ac
>> index 4e88dec..d98e502 100644
>> --- a/configure.ac
>> +++ b/configure.ac
>> @@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
>>  if test "x$enable_webdav" = "xno"; then
>>    have_phodav="no"
>>  else
>> -  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no])
>> +  PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])
>
> This glib requirement comes from the use of GSimpleIOStream, could be
> worth explicitly mentioning it in the commit log.

I don't see much need for this detail; there might be other APIs from
newer glib in use as well, but OK.

>
>>    AC_SUBST(PHODAV_CFLAGS)
>>    AC_SUBST(PHODAV_LIBS)
>>
>> @@ -289,6 +289,8 @@ fi
>>  AS_IF([test "x$have_phodav" = "xyes"],
>>         AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav]))
>>
>> +AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"])
>> +
>>  AC_ARG_WITH([audio],
>>    AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]),
>>    [],
>> diff --git a/gtk/Makefile.am b/gtk/Makefile.am
>> index 7728fec..ab50c79 100644
>> --- a/gtk/Makefile.am
>> +++ b/gtk/Makefile.am
>> @@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES +=   \
>>       $(NULL)
>>  endif
>>
>> +if WITH_PHODAV
>> +libspice_client_glib_2_0_la_SOURCES +=       \
>> +     giopipe.c                       \
>> +     giopipe.h                       \
>
> Has this GioPipe been proposed upstream?

No, it's far from being acceptable upstream, since it only handles
async operations. Furthermore, I think upstream is reluctant to add
more GIO stream code for some reason (I think it is too much of a
burden already).

>
>> +     $(NULL)
>> +endif
>> +
>>  if WITH_UCONTEXT
>>  libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c
>>  endif
>> diff --git a/gtk/giopipe.c b/gtk/giopipe.c
>> new file mode 100644
>> index 0000000..45007c4
>> --- /dev/null
>> +++ b/gtk/giopipe.c
>> @@ -0,0 +1,472 @@
>> +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
>> +/*
>> +  Copyright (C) 2015 Red Hat, Inc.
>> +
>> +  This library is free software; you can redistribute it and/or
>> +  modify it under the terms of the GNU Lesser General Public
>> +  License as published by the Free Software Foundation; either
>> +  version 2.1 of the License, or (at your option) any later version.
>> +
>> +  This library is distributed in the hope that it will be useful,
>> +  but WITHOUT ANY WARRANTY; without even the implied warranty of
>> +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>> +  Lesser General Public License for more details.
>> +
>> +  You should have received a copy of the GNU Lesser General Public
>> +  License along with this library; if not, see <http://www.gnu.org/licenses/>.
>> +*/
>> +
>> +#include <string.h>
>> +#include <errno.h>
>> +
>> +#include "giopipe.h"
>> +
>> +#define TYPE_PIPE_INPUT_STREAM         (pipe_input_stream_get_type ())
>> +#define PIPE_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
>> +#define PIPE_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
>> +#define IS_PIPE_INPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
>> +#define IS_PIPE_INPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
>> +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
>> +
>> +typedef struct _PipeInputStreamClass                              PipeInputStreamClass;
>> +typedef struct _PipeInputStream                                   PipeInputStream;
>> +typedef struct _PipeOutputStream                                  PipeOutputStream;
>> +
>> +struct _PipeInputStream
>> +{
>> +    GInputStream parent_instance;
>> +
>> +    PipeOutputStream *peer;
>> +    gssize read;
>> +
>> +    /* GIOstream:closed is protected against pending operations, so we
>> +     * use an additional close flag to cancel those when the peer is
>> +     * closing.
>> +     */
>> +    gboolean closed;
>> +    GSource *source;
>> +};
>> +
>> +struct _PipeInputStreamClass
>> +{
>> +    GInputStreamClass parent_class;
>> +};
>> +
>> +#define TYPE_PIPE_OUTPUT_STREAM         (pipe_output_stream_get_type ())
>> +#define PIPE_OUTPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
>> +#define PIPE_OUTPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
>> +#define IS_PIPE_OUTPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
>> +#define IS_PIPE_OUTPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
>> +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
>> +
>> +typedef struct _PipeOutputStreamClass                             PipeOutputStreamClass;
>> +
>> +struct _PipeOutputStream
>> +{
>> +    GOutputStream parent_instance;
>> +
>> +    PipeInputStream *peer;
>> +    const gchar *buffer;
>> +    gsize count;
>> +    gboolean closed;
>> +    GSource *source;
>> +};
>> +
>> +struct _PipeOutputStreamClass
>> +{
>> +    GOutputStreamClass parent_class;
>> +};
>> +
>> +struct _SpicePipeStreamPrivate {
>> +    PipeInputStream *input_stream;
>> +    PipeOutputStream *output_stream;
>> +};
>
> This struct does not seem to be used.

Right, it's a leftover from a previous version.

>
>> +
>> +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
>> +static void pipe_input_stream_check_source (PipeInputStream *self);
>> +static void pipe_output_stream_check_source (PipeOutputStream *self);
>> +
>> +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
>> +                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
>> +                                                pipe_input_stream_pollable_iface_init))
>> +
>> +static gssize
>> +pipe_input_stream_read (GInputStream  *stream,
>> +                        void          *buffer,
>> +                        gsize          count,
>> +                        GCancellable  *cancellable,
>> +                        GError       **error)
>> +{
>> +    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
>> +
>> +    g_return_val_if_fail(count > 0, -1);
>> +
>> +    if (g_input_stream_is_closed (stream) || self->closed) {
>
> It's not clear why you need this 'closed' variable, it seems to always
> be set together with calls to g_xxxx_stream_close(). Is that in case
> some of the cleanup done in g_xxxx_stream_close() fails?

Because closing from the peer must immediately close the other side
(they share memory), but the other side might have pending operations,
so g_xxxx_stream_close() will fail.

>
>> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
>> +                             "Stream is already closed");
>> +        return -1;
>> +    }
>> +
>> +    if (!self->peer->buffer) {
>> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
>> +                             g_strerror(EAGAIN));
>> +        return -1;
>> +    }
>> +
>> +    g_return_val_if_fail(self->peer->buffer, -1);
>
> You just checked that self->peer->buffer is not NULL and returned an
> error if it is.

right, removed

>
>> +
>> +    count = MIN(self->peer->count, count);
>> +    memcpy(buffer, self->peer->buffer, count);
>> +    self->read = count;
>> +    self->peer->buffer = NULL;
>> +
>> +    //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
>> +    pipe_output_stream_check_source(self->peer);
>> +
>> +    return count;
>> +}
>> +
>> +static void
>> +pipe_input_stream_check_source (PipeInputStream *self)
>> +{
>> +    if (self->source && !g_source_is_destroyed(self->source) &&
>> +        g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
>> +        g_source_set_ready_time(self->source, 0);
>
> g_source_set_ready_time API doc says "This API is only intended to be
> used by implementations of GSource. Do not call this API on a GSource
> that you did not create."

Well, I used a "GConditionSource" in the previous series, similar to the
condition source code in gio-coroutine.c. I proposed it upstream
(because I needed private fields for GPollable to work!), but I was
told to use this API instead by Ryan, who also wrote that
documentation! I guess he should modify it.

>> +}
>> +
>> +static gboolean
>> +pipe_input_stream_close (GInputStream  *stream,
>> +                         GCancellable   *cancellable,
>> +                         GError        **error)
>> +{
>> +    PipeInputStream *self;
>> +
>> +    self = PIPE_INPUT_STREAM(stream);
>> +
>> +    if (self->peer) {
>> +        /* ignore any pending errors */
>> +        self->peer->closed = TRUE;
>> +        g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
>> +        pipe_output_stream_check_source(self->peer);
>> +    }
>
> Why not set self->priv->closed as well?

Because that field is for the peer closing. Perhaps I can rename it
peer_closed? (peer->peer_closed...)

>
>> +
>> +    return TRUE;
>> +}
>> +
>> +static void
>> +pipe_input_stream_close_async (GInputStream       *stream,
>> +                               int                  io_priority,
>> +                               GCancellable        *cancellable,
>> +                               GAsyncReadyCallback  callback,
>> +                               gpointer             data)
>> +{
>> +    GTask *task;
>> +
>> +    task = g_task_new (stream, cancellable, callback, data);
>> +
>> +    /* will always return TRUE */
>> +    pipe_input_stream_close (stream, cancellable, NULL);
>> +
>> +    g_task_return_boolean (task, TRUE);
>> +    g_object_unref (task);
>> +}
>
> g_input_stream_close_async() API doc says "The asyncronous methods have
> a default fallback that uses threads to implement asynchronicity, so
> they are optional for inheriting classes. However, if you override one
> you must override all." I guess we should do without that one as this is
> the only async operation you overrode.

Yes, I am not sure what that means in practice. There are also default
_async fallbacks for other methods; might this be an outdated comment?

>
>> +
>> +static gboolean
>> +pipe_input_stream_close_finish (GInputStream  *stream,
>> +                                GAsyncResult   *result,
>> +                                GError        **error)
>> +{
>> +    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
>> +
>> +    return g_task_propagate_boolean (G_TASK (result), error);
>> +}
>> +
>> +static void
>> +pipe_input_stream_init (PipeInputStream *self)
>> +{
>> +    self->read = -1;
>> +}
>> +
>> +static void
>> +pipe_input_stream_dispose(GObject *object)
>> +{
>> +    PipeInputStream *self;
>> +
>> +    self = PIPE_INPUT_STREAM(object);
>> +
>> +    if (self->peer) {
>> +        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
>> +        self->peer = NULL;
>> +    }
>> +
>
> No cleanup of the 'source' private member?

Clean-up added

>
>> +    G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
>> +}
>> +
>> +static void
>> +pipe_input_stream_class_init (PipeInputStreamClass *klass)
>> +{
>> +    GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
>> +    GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
>> +
>> +    istream_class->read_fn  = pipe_input_stream_read;
>> +    istream_class->close_fn = pipe_input_stream_close;
>> +    istream_class->close_async  = pipe_input_stream_close_async;
>> +    istream_class->close_finish = pipe_input_stream_close_finish;
>> +
>> +    gobject_class->dispose = pipe_input_stream_dispose;
>> +}
>> +
>> +static gboolean
>> +pipe_input_stream_is_readable (GPollableInputStream *stream)
>> +{
>> +    PipeInputStream *self = PIPE_INPUT_STREAM (stream);
>> +    gboolean readable;
>> +
>> +    readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed;
>> +    //g_debug("readable %p %d", self->peer, readable);
>> +
>> +    return readable;
>> +}
>> +
>> +static GSource *
>> +pipe_input_stream_create_source (GPollableInputStream *stream,
>> +                                 GCancellable         *cancellable)
>> +{
>> +    PipeInputStream *self = PIPE_INPUT_STREAM(stream);
>> +    GSource *pollable_source;
>> +
>> +    g_return_val_if_fail (self->source == NULL ||
>> +                          g_source_is_destroyed (self->source), NULL);
>> +
>> +    if (self->source && g_source_is_destroyed (self->source))
>> +        g_source_unref (self->source);
>> +
>> +    pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
>> +    self->source = g_source_ref (pollable_source);
>> +
>> +    return pollable_source;
>> +}
>> +
>> +static void
>> +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
>> +{
>> +    iface->is_readable   = pipe_input_stream_is_readable;
>> +    iface->create_source = pipe_input_stream_create_source;
>> +}
>> +
>> +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
>> +
>> +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
>> +                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
>> +                                                pipe_output_stream_pollable_iface_init))
>> +
>> +static gssize
>> +pipe_output_stream_write (GOutputStream  *stream,
>> +                          const void     *buffer,
>> +                          gsize           count,
>> +                          GCancellable   *cancellable,
>> +                          GError        **error)
>> +{
>> +    PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
>> +    PipeInputStream *peer = self->peer;
>> +
>> +    //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
>> +    if (g_output_stream_is_closed (stream) || self->closed) {
>> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
>> +                             "Stream is already closed");
>> +        return -1;
>> +    }
>> +
>> +    /* this abuses pollable stream, writing sync would likely lead to
>> +       crashes, since the buffer pointer would become invalid, a
>> +       generic solution would need a copy..
>> +    */
>> +    g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
>> +    self->buffer = buffer;
>> +    self->count = count;
>> +
>> +    pipe_input_stream_check_source(self->peer);
>
> This call will trigger a sync call to pipe_input_stream_read() if I
> followed everything properly? A comment mentioning that might make the
> code easier to follow.

It's not sync; it will "schedule peer source". As said in the comment,
this abuses the pollable stream, since it keeps a pointer to the buffer
and assumes the function will be resumed with the same data (there are
precondition checks for that).

>
>> +
>> +    if (peer->read < 0) {
>> +        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
>> +                             g_strerror (EAGAIN));
>> +        return -1;
>> +    }
>> +
>> +    g_assert(peer->read <= self->count);
>> +    count = peer->read;
>
>
> My understanding is that here you don't want to assume that self->count
> == peer->read...
>> +
>> +    self->buffer = NULL;
>> +    self->count = 0;
>
> ... but here you make the assumption that everything was consumed by the
> input stream

It's just clearing the internal state of the input data; the actual
count of bytes written is returned to the writer.


>> +    peer->read = -1;
>> +
>> +    return count;
>> +}
>> +
>> +static void
>> +pipe_output_stream_init (PipeOutputStream *stream)
>> +{
>> +}
>> +
>> +static void
>> +pipe_output_stream_dispose(GObject *object)
>> +{
>> +    PipeOutputStream *self;
>> +
>> +    self = PIPE_OUTPUT_STREAM(object);
>> +
>> +    if (self->peer) {
>> +        g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
>> +        self->peer = NULL;
>> +    }
>> +
>
> Same question about the 'source' private member.
>
> Same comment about g_source_set_ready_time.
>
> Same comment as before about async methods
>
>

same answers.



-- 
Marc-André Lureau


More information about the Spice-devel mailing list