[Spice-devel] [PATCH spice-gtk 1/5] Add GIOStream-based pipe
Christophe Fergeau
cfergeau at redhat.com
Mon Feb 23 04:28:33 PST 2015
On Sat, Feb 21, 2015 at 01:40:12AM +0100, Marc-André Lureau wrote:
> This allows to create a pipe between 2 GIOStream, the input side read
> from the peer output side, and vice-versa.
>
> In the following patches, this will avoid the socket communication
> to exchange with the embedded webdav server.
> ---
> configure.ac | 4 +-
> gtk/Makefile.am | 7 +
> gtk/giopipe.c | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
> gtk/giopipe.h | 29 ++++
> tests/Makefile.am | 5 +
> tests/pipe.c | 313 ++++++++++++++++++++++++++++++++++++
> 6 files changed, 829 insertions(+), 1 deletion(-)
> create mode 100644 gtk/giopipe.c
> create mode 100644 gtk/giopipe.h
> create mode 100644 tests/pipe.c
>
> diff --git a/configure.ac b/configure.ac
> index 4e88dec..d98e502 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav],
> if test "x$enable_webdav" = "xno"; then
> have_phodav="no"
> else
> - PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no])
> + PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no])
This glib requirement comes from the use of GSimpleIOStream; it could be
worth explicitly mentioning that in the commit log.
> AC_SUBST(PHODAV_CFLAGS)
> AC_SUBST(PHODAV_LIBS)
>
> @@ -289,6 +289,8 @@ fi
> AS_IF([test "x$have_phodav" = "xyes"],
> AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav]))
>
> +AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"])
> +
> AC_ARG_WITH([audio],
> AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]),
> [],
> diff --git a/gtk/Makefile.am b/gtk/Makefile.am
> index 7728fec..ab50c79 100644
> --- a/gtk/Makefile.am
> +++ b/gtk/Makefile.am
> @@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES += \
> $(NULL)
> endif
>
> +if WITH_PHODAV
> +libspice_client_glib_2_0_la_SOURCES += \
> + giopipe.c \
> + giopipe.h \
Has this GioPipe been proposed upstream?
> + $(NULL)
> +endif
> +
> if WITH_UCONTEXT
> libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c
> endif
> diff --git a/gtk/giopipe.c b/gtk/giopipe.c
> new file mode 100644
> index 0000000..45007c4
> --- /dev/null
> +++ b/gtk/giopipe.c
> @@ -0,0 +1,472 @@
> +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
> +/*
> + Copyright (C) 2015 Red Hat, Inc.
> +
> + This library is free software; you can redistribute it and/or
> + modify it under the terms of the GNU Lesser General Public
> + License as published by the Free Software Foundation; either
> + version 2.1 of the License, or (at your option) any later version.
> +
> + This library is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + Lesser General Public License for more details.
> +
> + You should have received a copy of the GNU Lesser General Public
> + License along with this library; if not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#include <string.h>
> +#include <errno.h>
> +
> +#include "giopipe.h"
> +
> +#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ())
> +#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
> +#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
> +#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
> +#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
> +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
> +
> +typedef struct _PipeInputStreamClass PipeInputStreamClass;
> +typedef struct _PipeInputStream PipeInputStream;
> +typedef struct _PipeOutputStream PipeOutputStream;
> +
> +struct _PipeInputStream
> +{
> + GInputStream parent_instance;
> +
> + PipeOutputStream *peer;
> + gssize read;
> +
> + /* GIOstream:closed is protected against pending operations, so we
> + * use an additional close flag to cancel those when the peer is
> + * closing.
> + */
> + gboolean closed;
> + GSource *source;
> +};
> +
> +struct _PipeInputStreamClass
> +{
> + GInputStreamClass parent_class;
> +};
> +
> +#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ())
> +#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
> +#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
> +#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
> +#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
> +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
> +
> +typedef struct _PipeOutputStreamClass PipeOutputStreamClass;
> +
> +struct _PipeOutputStream
> +{
> + GOutputStream parent_instance;
> +
> + PipeInputStream *peer;
> + const gchar *buffer;
> + gsize count;
> + gboolean closed;
> + GSource *source;
> +};
> +
> +struct _PipeOutputStreamClass
> +{
> + GOutputStreamClass parent_class;
> +};
> +
> +struct _SpicePipeStreamPrivate {
> + PipeInputStream *input_stream;
> + PipeOutputStream *output_stream;
> +};
This struct does not seem to be used.
> +
> +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
> +static void pipe_input_stream_check_source (PipeInputStream *self);
> +static void pipe_output_stream_check_source (PipeOutputStream *self);
> +
> +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
> + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
> + pipe_input_stream_pollable_iface_init))
> +
> +static gssize
> +pipe_input_stream_read (GInputStream *stream,
> + void *buffer,
> + gsize count,
> + GCancellable *cancellable,
> + GError **error)
> +{
> + PipeInputStream *self = PIPE_INPUT_STREAM (stream);
> +
> + g_return_val_if_fail(count > 0, -1);
> +
> + if (g_input_stream_is_closed (stream) || self->closed) {
It's not clear why you need this 'closed' variable: it seems to always
be set together with calls to g_xxxx_stream_close(). Is that in case
some of the cleanup done in g_xxxx_stream_close() fails?
> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
> + "Stream is already closed");
> + return -1;
> + }
> +
> + if (!self->peer->buffer) {
> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
> + g_strerror(EAGAIN));
> + return -1;
> + }
> +
> + g_return_val_if_fail(self->peer->buffer, -1);
You just checked that self->peer->buffer is not NULL and returned an
error if it is.
> +
> + count = MIN(self->peer->count, count);
> + memcpy(buffer, self->peer->buffer, count);
> + self->read = count;
> + self->peer->buffer = NULL;
> +
> + //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
> + pipe_output_stream_check_source(self->peer);
> +
> + return count;
> +}
> +
> +static void
> +pipe_input_stream_check_source (PipeInputStream *self)
> +{
> + if (self->source && !g_source_is_destroyed(self->source) &&
> + g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
> + g_source_set_ready_time(self->source, 0);
g_source_set_ready_time API doc says "This API is only intended to be
used by implementations of GSource. Do not call this API on a GSource
that you did not create."
> +}
> +
> +static gboolean
> +pipe_input_stream_close (GInputStream *stream,
> + GCancellable *cancellable,
> + GError **error)
> +{
> + PipeInputStream *self;
> +
> + self = PIPE_INPUT_STREAM(stream);
> +
> + if (self->peer) {
> + /* ignore any pending errors */
> + self->peer->closed = TRUE;
> + g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
> + pipe_output_stream_check_source(self->peer);
> + }
Why not set self->closed as well?
> +
> + return TRUE;
> +}
> +
> +static void
> +pipe_input_stream_close_async (GInputStream *stream,
> + int io_priority,
> + GCancellable *cancellable,
> + GAsyncReadyCallback callback,
> + gpointer data)
> +{
> + GTask *task;
> +
> + task = g_task_new (stream, cancellable, callback, data);
> +
> + /* will always return TRUE */
> + pipe_input_stream_close (stream, cancellable, NULL);
> +
> + g_task_return_boolean (task, TRUE);
> + g_object_unref (task);
> +}
g_input_stream_close_async() API doc says "The asynchronous methods have
a default fallback that uses threads to implement asynchronicity, so
they are optional for inheriting classes. However, if you override one
you must override all." I guess we should do without that one, as this is
the only async operation you overrode.
> +
> +static gboolean
> +pipe_input_stream_close_finish (GInputStream *stream,
> + GAsyncResult *result,
> + GError **error)
> +{
> + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
> +
> + return g_task_propagate_boolean (G_TASK (result), error);
> +}
> +
> +static void
> +pipe_input_stream_init (PipeInputStream *self)
> +{
> + self->read = -1;
> +}
> +
> +static void
> +pipe_input_stream_dispose(GObject *object)
> +{
> + PipeInputStream *self;
> +
> + self = PIPE_INPUT_STREAM(object);
> +
> + if (self->peer) {
> + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
> + self->peer = NULL;
> + }
> +
No cleanup of the 'source' private member?
> + G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
> +}
> +
> +static void
> +pipe_input_stream_class_init (PipeInputStreamClass *klass)
> +{
> + GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
> + GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
> +
> + istream_class->read_fn = pipe_input_stream_read;
> + istream_class->close_fn = pipe_input_stream_close;
> + istream_class->close_async = pipe_input_stream_close_async;
> + istream_class->close_finish = pipe_input_stream_close_finish;
> +
> + gobject_class->dispose = pipe_input_stream_dispose;
> +}
> +
> +static gboolean
> +pipe_input_stream_is_readable (GPollableInputStream *stream)
> +{
> + PipeInputStream *self = PIPE_INPUT_STREAM (stream);
> + gboolean readable;
> +
> + readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed;
> + //g_debug("readable %p %d", self->peer, readable);
> +
> + return readable;
> +}
> +
> +static GSource *
> +pipe_input_stream_create_source (GPollableInputStream *stream,
> + GCancellable *cancellable)
> +{
> + PipeInputStream *self = PIPE_INPUT_STREAM(stream);
> + GSource *pollable_source;
> +
> + g_return_val_if_fail (self->source == NULL ||
> + g_source_is_destroyed (self->source), NULL);
> +
> + if (self->source && g_source_is_destroyed (self->source))
> + g_source_unref (self->source);
> +
> + pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
> + self->source = g_source_ref (pollable_source);
> +
> + return pollable_source;
> +}
> +
> +static void
> +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
> +{
> + iface->is_readable = pipe_input_stream_is_readable;
> + iface->create_source = pipe_input_stream_create_source;
> +}
> +
> +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
> +
> +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
> + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
> + pipe_output_stream_pollable_iface_init))
> +
> +static gssize
> +pipe_output_stream_write (GOutputStream *stream,
> + const void *buffer,
> + gsize count,
> + GCancellable *cancellable,
> + GError **error)
> +{
> + PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
> + PipeInputStream *peer = self->peer;
> +
> + //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
> + if (g_output_stream_is_closed (stream) || self->closed) {
> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
> + "Stream is already closed");
> + return -1;
> + }
> +
> + /* this abuses pollable stream, writing sync would likely lead to
> + crashes, since the buffer pointer would become invalid, a
> + generic solution would need a copy..
> + */
> + g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
> + self->buffer = buffer;
> + self->count = count;
> +
> + pipe_input_stream_check_source(self->peer);
If I followed everything properly, this call will trigger a sync call to
pipe_input_stream_read()? A comment mentioning that might make the
code easier to follow.
> +
> + if (peer->read < 0) {
> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
> + g_strerror (EAGAIN));
> + return -1;
> + }
> +
> + g_assert(peer->read <= self->count);
> + count = peer->read;
My understanding is that here you don't want to assume that self->count
== peer->read...
> +
> + self->buffer = NULL;
> + self->count = 0;
... but here you make the assumption that everything was consumed by the
input stream
> + peer->read = -1;
> +
> + return count;
> +}
> +
> +static void
> +pipe_output_stream_init (PipeOutputStream *stream)
> +{
> +}
> +
> +static void
> +pipe_output_stream_dispose(GObject *object)
> +{
> + PipeOutputStream *self;
> +
> + self = PIPE_OUTPUT_STREAM(object);
> +
> + if (self->peer) {
> + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
> + self->peer = NULL;
> + }
> +
Same question about the 'source' private member.
> + G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
> +}
> +
> +static void
> +pipe_output_stream_check_source (PipeOutputStream *self)
> +{
> + if (self->source && !g_source_is_destroyed(self->source) &&
> + g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
> + g_source_set_ready_time(self->source, 0);
Same comment about g_source_set_ready_time.
> +}
> +
> +static gboolean
> +pipe_output_stream_close (GOutputStream *stream,
> + GCancellable *cancellable,
> + GError **error)
> +{
> + PipeOutputStream *self;
> +
> + self = PIPE_OUTPUT_STREAM(stream);
> +
> + if (self->peer) {
> + /* ignore any pending errors */
> + self->peer->closed = TRUE;
> + g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL);
> + pipe_input_stream_check_source(self->peer);
> + }
> +
> + return TRUE;
> +}
> +
> +static void
> +pipe_output_stream_close_async (GOutputStream *stream,
> + int io_priority,
> + GCancellable *cancellable,
> + GAsyncReadyCallback callback,
> + gpointer data)
> +{
> + GTask *task;
> +
> + task = g_task_new (stream, cancellable, callback, data);
> +
> + /* will always return TRUE */
> + pipe_output_stream_close (stream, cancellable, NULL);
> +
> + g_task_return_boolean (task, TRUE);
> + g_object_unref (task);
> +}
Same comment as before about async methods
Christophe
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 819 bytes
Desc: not available
URL: <http://lists.freedesktop.org/archives/spice-devel/attachments/20150223/24b11c05/attachment.sig>
More information about the Spice-devel
mailing list