[Spice-devel] [PATCH spice-gtk 1/5] Add GIOStream-based pipe
Marc-André Lureau
marcandre.lureau at gmail.com
Mon Feb 23 05:22:42 PST 2015
On Mon, Feb 23, 2015 at 1:28 PM, Christophe Fergeau <cfergeau at redhat.com> wrote:
> On Sat, Feb 21, 2015 at 01:40:12AM +0100, Marc-André Lureau wrote:
>> This allows to create a pipe between 2 GIOStream, the input side read
>> from the peer output side, and vice-versa.
>>
>> In the following patches, this will avoid the socket communication
>> to exchange with the embedded webdav server.
>> ---
>> configure.ac | 4 +-
>> gtk/Makefile.am | 7 +
>> gtk/giopipe.c | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>> gtk/giopipe.h | 29 ++++
>> tests/Makefile.am | 5 +
>> tests/pipe.c | 313 ++++++++++++++++++++++++++++++++++++
>> 6 files changed, 829 insertions(+), 1 deletion(-)
>> create mode 100644 gtk/giopipe.c
>> create mode 100644 gtk/giopipe.h
>> create mode 100644 tests/pipe.c
>>
>> diff --git a/configure.ac b/configure.ac
>> index 4e88dec..d98e502 100644
>> --- a/configure.ac
>> +++ b/configure.ac
>> @@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
>> if test "x$enable_webdav" = "xno"; then
>> have_phodav="no"
>> else
>> - PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no])
>> + PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])
>
> This glib requirement comes from the use of GSimpleIOStream, could be
> worth explicitly mentioning it in the commit log.
I don't see much need for this detail, there might be other API from
newer glib used, but ok.
>
>> AC_SUBST(PHODAV_CFLAGS)
>> AC_SUBST(PHODAV_LIBS)
>>
>> @@ -289,6 +289,8 @@ fi
>> AS_IF([test "x$have_phodav" = "xyes"],
>> AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav]))
>>
>> +AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"])
>> +
>> AC_ARG_WITH([audio],
>> AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]),
>> [],
>> diff --git a/gtk/Makefile.am b/gtk/Makefile.am
>> index 7728fec..ab50c79 100644
>> --- a/gtk/Makefile.am
>> +++ b/gtk/Makefile.am
>> @@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES += \
>> $(NULL)
>> endif
>>
>> +if WITH_PHODAV
>> +libspice_client_glib_2_0_la_SOURCES += \
>> + giopipe.c \
>> + giopipe.h \
>
> Has this GioPipe been proposed upstream?
No, it's far from being acceptable by upstream since it's only
handling async. Furthermore, I think upstream is reluctant to adding
more GIO stream code for some reason (I think too much burden
already).
>
>> + $(NULL)
>> +endif
>> +
>> if WITH_UCONTEXT
>> libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c
>> endif
>> diff --git a/gtk/giopipe.c b/gtk/giopipe.c
>> new file mode 100644
>> index 0000000..45007c4
>> --- /dev/null
>> +++ b/gtk/giopipe.c
>> @@ -0,0 +1,472 @@
>> +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
>> +/*
>> + Copyright (C) 2015 Red Hat, Inc.
>> +
>> + This library is free software; you can redistribute it and/or
>> + modify it under the terms of the GNU Lesser General Public
>> + License as published by the Free Software Foundation; either
>> + version 2.1 of the License, or (at your option) any later version.
>> +
>> + This library is distributed in the hope that it will be useful,
>> + but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
>> + Lesser General Public License for more details.
>> +
>> + You should have received a copy of the GNU Lesser General Public
>> + License along with this library; if not, see <http://www.gnu.org/licenses/>.
>> +*/
>> +
>> +#include <string.h>
>> +#include <errno.h>
>> +
>> +#include "giopipe.h"
>> +
>> +#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ())
>> +#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
>> +#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
>> +#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
>> +#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
>> +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
>> +
>> +typedef struct _PipeInputStreamClass PipeInputStreamClass;
>> +typedef struct _PipeInputStream PipeInputStream;
>> +typedef struct _PipeOutputStream PipeOutputStream;
>> +
>> +struct _PipeInputStream
>> +{
>> + GInputStream parent_instance;
>> +
>> + PipeOutputStream *peer;
>> + gssize read;
>> +
>> + /* GIOstream:closed is protected against pending operations, so we
>> + * use an additional close flag to cancel those when the peer is
>> + * closing.
>> + */
>> + gboolean closed;
>> + GSource *source;
>> +};
>> +
>> +struct _PipeInputStreamClass
>> +{
>> + GInputStreamClass parent_class;
>> +};
>> +
>> +#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ())
>> +#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
>> +#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
>> +#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
>> +#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
>> +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
>> +
>> +typedef struct _PipeOutputStreamClass PipeOutputStreamClass;
>> +
>> +struct _PipeOutputStream
>> +{
>> + GOutputStream parent_instance;
>> +
>> + PipeInputStream *peer;
>> + const gchar *buffer;
>> + gsize count;
>> + gboolean closed;
>> + GSource *source;
>> +};
>> +
>> +struct _PipeOutputStreamClass
>> +{
>> + GOutputStreamClass parent_class;
>> +};
>> +
>> +struct _SpicePipeStreamPrivate {
>> + PipeInputStream *input_stream;
>> + PipeOutputStream *output_stream;
>> +};
>
> This struct does not seem to be used.
right, it's a left over from previous version
>
>> +
>> +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
>> +static void pipe_input_stream_check_source (PipeInputStream *self);
>> +static void pipe_output_stream_check_source (PipeOutputStream *self);
>> +
>> +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
>> + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
>> + pipe_input_stream_pollable_iface_init))
>> +
>> +static gssize
>> +pipe_input_stream_read (GInputStream *stream,
>> + void *buffer,
>> + gsize count,
>> + GCancellable *cancellable,
>> + GError **error)
>> +{
>> + PipeInputStream *self = PIPE_INPUT_STREAM (stream);
>> +
>> + g_return_val_if_fail(count > 0, -1);
>> +
>> + if (g_input_stream_is_closed (stream) || self->closed) {
>
> It's not clear why you need this 'closed' variable, it seems to always
> be set together with calls to g_xxxx_stream_close(). Is that in case
> some of the cleanup done in g_xxxx_stream_close() fails?
because closing from peer must immediately close the other side (they
share memory), but the other side might have pending operations, so
g_xxxx_stream_close() will fail.
>
>> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
>> + "Stream is already closed");
>> + return -1;
>> + }
>> +
>> + if (!self->peer->buffer) {
>> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
>> + g_strerror(EAGAIN));
>> + return -1;
>> + }
>> +
>> + g_return_val_if_fail(self->peer->buffer, -1);
>
> You just checked that self->peer->buffer is not NULL and returned an
> error if it is.
right, removed
>
>> +
>> + count = MIN(self->peer->count, count);
>> + memcpy(buffer, self->peer->buffer, count);
>> + self->read = count;
>> + self->peer->buffer = NULL;
>> +
>> + //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
>> + pipe_output_stream_check_source(self->peer);
>> +
>> + return count;
>> +}
>> +
>> +static void
>> +pipe_input_stream_check_source (PipeInputStream *self)
>> +{
>> + if (self->source && !g_source_is_destroyed(self->source) &&
>> + g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
>> + g_source_set_ready_time(self->source, 0);
>
> g_source_set_ready_time API doc says "This API is only intended to be
> used by implementations of GSource. Do not call this API on a GSource
> that you did not create."
Well, I used a "GConditionSource" in previous series, similar to the
condition source code in gio-coroutine.c. I proposed it upstream
(because I needed private fields for GPollable to work!), but I was
told to use this API instead by Ryan, who also wrote that
documentation! I guess he should modify it.
>> +}
>> +
>> +static gboolean
>> +pipe_input_stream_close (GInputStream *stream,
>> + GCancellable *cancellable,
>> + GError **error)
>> +{
>> + PipeInputStream *self;
>> +
>> + self = PIPE_INPUT_STREAM(stream);
>> +
>> + if (self->peer) {
>> + /* ignore any pending errors */
>> + self->peer->closed = TRUE;
>> + g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
>> + pipe_output_stream_check_source(self->peer);
>> + }
>
> Why not set self->priv->closed as well?
Because that field is for peer closing. I can rename it peer_closed
perhaps? (peer->peer_closed...)
>
>> +
>> + return TRUE;
>> +}
>> +
>> +static void
>> +pipe_input_stream_close_async (GInputStream *stream,
>> + int io_priority,
>> + GCancellable *cancellable,
>> + GAsyncReadyCallback callback,
>> + gpointer data)
>> +{
>> + GTask *task;
>> +
>> + task = g_task_new (stream, cancellable, callback, data);
>> +
>> + /* will always return TRUE */
>> + pipe_input_stream_close (stream, cancellable, NULL);
>> +
>> + g_task_return_boolean (task, TRUE);
>> + g_object_unref (task);
>> +}
>
> g_input_stream_close_async() API doc says "The asyncronous methods have
> a default fallback that uses threads to implement asynchronicity, so
> they are optional for inheriting classes. However, if you override one
> you must override all." I guess we should do without that one as this is
> the only async operation you overrode.
Yes, I am not sure what that means in practice. There are also default
_async fallback for other methods, this might be outdated comment?
>
>> +
>> +static gboolean
>> +pipe_input_stream_close_finish (GInputStream *stream,
>> + GAsyncResult *result,
>> + GError **error)
>> +{
>> + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
>> +
>> + return g_task_propagate_boolean (G_TASK (result), error);
>> +}
>> +
>> +static void
>> +pipe_input_stream_init (PipeInputStream *self)
>> +{
>> + self->read = -1;
>> +}
>> +
>> +static void
>> +pipe_input_stream_dispose(GObject *object)
>> +{
>> + PipeInputStream *self;
>> +
>> + self = PIPE_INPUT_STREAM(object);
>> +
>> + if (self->peer) {
>> + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
>> + self->peer = NULL;
>> + }
>> +
>
> No cleanup of the 'source' private member?
Clean-up added
>
>> + G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
>> +}
>> +
>> +static void
>> +pipe_input_stream_class_init (PipeInputStreamClass *klass)
>> +{
>> + GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
>> + GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
>> +
>> + istream_class->read_fn = pipe_input_stream_read;
>> + istream_class->close_fn = pipe_input_stream_close;
>> + istream_class->close_async = pipe_input_stream_close_async;
>> + istream_class->close_finish = pipe_input_stream_close_finish;
>> +
>> + gobject_class->dispose = pipe_input_stream_dispose;
>> +}
>> +
>> +static gboolean
>> +pipe_input_stream_is_readable (GPollableInputStream *stream)
>> +{
>> + PipeInputStream *self = PIPE_INPUT_STREAM (stream);
>> + gboolean readable;
>> +
>> + readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed;
>> + //g_debug("readable %p %d", self->peer, readable);
>> +
>> + return readable;
>> +}
>> +
>> +static GSource *
>> +pipe_input_stream_create_source (GPollableInputStream *stream,
>> + GCancellable *cancellable)
>> +{
>> + PipeInputStream *self = PIPE_INPUT_STREAM(stream);
>> + GSource *pollable_source;
>> +
>> + g_return_val_if_fail (self->source == NULL ||
>> + g_source_is_destroyed (self->source), NULL);
>> +
>> + if (self->source && g_source_is_destroyed (self->source))
>> + g_source_unref (self->source);
>> +
>> + pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
>> + self->source = g_source_ref (pollable_source);
>> +
>> + return pollable_source;
>> +}
>> +
>> +static void
>> +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
>> +{
>> + iface->is_readable = pipe_input_stream_is_readable;
>> + iface->create_source = pipe_input_stream_create_source;
>> +}
>> +
>> +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
>> +
>> +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
>> + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
>> + pipe_output_stream_pollable_iface_init))
>> +
>> +static gssize
>> +pipe_output_stream_write (GOutputStream *stream,
>> + const void *buffer,
>> + gsize count,
>> + GCancellable *cancellable,
>> + GError **error)
>> +{
>> + PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
>> + PipeInputStream *peer = self->peer;
>> +
>> + //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
>> + if (g_output_stream_is_closed (stream) || self->closed) {
>> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
>> + "Stream is already closed");
>> + return -1;
>> + }
>> +
>> + /* this abuses pollable stream, writing sync would likely lead to
>> + crashes, since the buffer pointer would become invalid, a
>> + generic solution would need a copy..
>> + */
>> + g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
>> + self->buffer = buffer;
>> + self->count = count;
>> +
>> + pipe_input_stream_check_source(self->peer);
>
> This call will trigger a sync call to pipe_input_stream_read() if I
> followed everything properly? A comment mentioning that might make the
> code easier to follow.
It's not sync, it will "schedule peer source". As said in the comment,
this abuses pollable stream, since it keeps a pointer to the buffer
and assumes the function will be resumed with the same data (there are
preconditions checks for that)
>
>> +
>> + if (peer->read < 0) {
>> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
>> + g_strerror (EAGAIN));
>> + return -1;
>> + }
>> +
>> + g_assert(peer->read <= self->count);
>> + count = peer->read;
>
>
> My understanding is that here you don't want to assume that self->count
> == peer->read...
>> +
>> + self->buffer = NULL;
>> + self->count = 0;
>
> ... but here you make the assumption that everything was consumed by the
> input stream
it's just clearing internal state of the input data, the actual bytes
written count is returned to writer.
>> + peer->read = -1;
>> +
>> + return count;
>> +}
>> +
>> +static void
>> +pipe_output_stream_init (PipeOutputStream *stream)
>> +{
>> +}
>> +
>> +static void
>> +pipe_output_stream_dispose(GObject *object)
>> +{
>> + PipeOutputStream *self;
>> +
>> + self = PIPE_OUTPUT_STREAM(object);
>> +
>> + if (self->peer) {
>> + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
>> + self->peer = NULL;
>> + }
>> +
>
> Same question about the 'source' private member.
>
> Same comment about g_source_set_ready_time.
>
> Same comment as before about async methods
>
>
same answers.
--
Marc-André Lureau
More information about the Spice-devel
mailing list