[Spice-devel] [PATCH phodav 11/13] spice: move OutputQueue to file
Marc-André Lureau
marcandre.lureau at gmail.com
Thu May 23 13:42:19 UTC 2019
Hi
On Thu, May 23, 2019 at 10:37 AM Jakub Janků <jjanku at redhat.com> wrote:
>
> OutputQueue is a self-contained unit and as such can be put in
> a separate file to make the spice-webdavd.c less cluttered.
>
> Also, as the current implementation defines output_queue_{ref, unref},
> turn OutputQueue into a GObject which can handle these for us.
>
> Signed-off-by: Jakub Janků <jjanku at redhat.com>
ack in principle, minor coding style issues
The phodav source code tries to follow glib code style. Can you indent
accordingly?
> ---
> spice/meson.build | 8 ++-
> spice/output-queue.c | 164 ++++++++++++++++++++++++++++++++++++++++++
> spice/output-queue.h | 38 ++++++++++
> spice/spice-webdavd.c | 162 ++---------------------------------------
> 4 files changed, 214 insertions(+), 158 deletions(-)
> create mode 100644 spice/output-queue.c
> create mode 100644 spice/output-queue.h
>
> diff --git a/spice/meson.build b/spice/meson.build
> index 6db22cc..06d20e6 100644
> --- a/spice/meson.build
> +++ b/spice/meson.build
> @@ -4,9 +4,15 @@ if host_machine.system() == 'windows'
> win32_deps += compiler.find_library('mpr')
> endif
>
> +sources = [
> + 'spice-webdavd.c',
> + 'output-queue.c',
> + 'output-queue.h'
> +]
> +
> executable(
> 'spice-webdavd',
> - [ 'spice-webdavd.c' ],
> + sources,
> install_dir : sbindir,
> include_directories : incdir,
> dependencies : win32_deps + avahi_deps + deps,
> diff --git a/spice/output-queue.c b/spice/output-queue.c
> new file mode 100644
> index 0000000..6991493
> --- /dev/null
> +++ b/spice/output-queue.c
> @@ -0,0 +1,164 @@
> +/*
> + * Copyright (C) 2019 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <config.h>
> +
> +#include "output-queue.h"
> +
> +typedef struct _OutputQueueElem
> +{
> + OutputQueue *queue;
> + const guint8 *buf;
> + gsize size;
> + PushedCb cb;
> + gpointer user_data;
> +} OutputQueueElem;
> +
> +struct _OutputQueue
> +{
> + GObject parent_instance;
> + GOutputStream *output;
> + gboolean flushing;
> + guint idle_id;
> + GQueue *queue;
> + GCancellable *cancel;
> +};
> +
> +G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT);
> +
> +static void output_queue_init(OutputQueue *self)
> +{
> + self->queue = g_queue_new ();
> +}
> +
> +static void output_queue_finalize(GObject *obj)
> +{
> + OutputQueue *self = OUTPUT_QUEUE(obj);
> +
> + g_warn_if_fail (g_queue_get_length (self->queue) == 0);
> + g_warn_if_fail (!self->flushing);
> + g_warn_if_fail (!self->idle_id);
> +
> + g_queue_free_full (self->queue, g_free);
> + g_object_unref (self->output);
> + g_object_unref (self->cancel);
> +
> + G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj);
> +}
> +
> +static void output_queue_class_init(OutputQueueClass *klass)
> +{
> + GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
> + gobject_class->finalize = output_queue_finalize;
> +}
> +
> +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
> +{
> + OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL);
> + self->output = g_object_ref (output);
> + self->cancel = g_object_ref (cancel);
> + return self;
> +}
> +
> +static gboolean output_queue_idle (gpointer user_data);
> +
> +static void
> +output_queue_flush_cb (GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + GError *error = NULL;
> + OutputQueueElem *e = user_data;
> + OutputQueue *q = e->queue;
> +
> + g_debug ("flushed");
> + q->flushing = FALSE;
> + g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
> + res, &error);
> + if (error)
> + g_warning ("error: %s", error->message);
> +
> + g_clear_error (&error);
> +
> + if (!q->idle_id)
> + q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
> +
> + g_free (e);
> + g_object_unref (q);
> +}
> +
> +static gboolean
> +output_queue_idle (gpointer user_data)
> +{
> + OutputQueue *q = user_data;
> + OutputQueueElem *e = NULL;
> + GError *error = NULL;
> +
> + if (q->flushing)
> + {
> + g_debug ("already flushing");
> + goto end;
> + }
> +
> + e = g_queue_pop_head (q->queue);
> + if (!e)
> + {
> + g_debug ("No more data to flush");
> + goto end;
> + }
> +
> + g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
> + g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
> + if (e->cb)
> + e->cb (q, e->user_data, error);
> +
> + if (error)
> + goto end;
> +
> + q->flushing = TRUE;
> + g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
> +
> + q->idle_id = 0;
> + return FALSE;
> +
> +end:
> + g_clear_error (&error);
> + q->idle_id = 0;
> + g_free (e);
> + g_object_unref (q);
> +
> + return FALSE;
> +}
> +
> +void
> +output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> + PushedCb pushed_cb, gpointer user_data)
> +{
> + OutputQueueElem *e;
> +
> + g_return_if_fail (q != NULL);
> +
> + e = g_new (OutputQueueElem, 1);
> + e->buf = buf;
> + e->size = size;
> + e->cb = pushed_cb;
> + e->user_data = user_data;
> + e->queue = q;
> + g_queue_push_tail (q->queue, e);
> +
> + if (!q->idle_id && !q->flushing)
> + q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
> +}
> diff --git a/spice/output-queue.h b/spice/output-queue.h
> new file mode 100644
> index 0000000..ab8f6eb
> --- /dev/null
> +++ b/spice/output-queue.h
> @@ -0,0 +1,38 @@
> +/*
> + * Copyright (C) 2019 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef __OUTPUT_QUEUE_H
> +#define __OUTPUT_QUEUE_H
> +
> +#include <gio/gio.h>
> +#include <glib-object.h>
> +
> +G_BEGIN_DECLS
> +
> +#define OUTPUT_TYPE_QUEUE output_queue_get_type()
> +G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject);
> +
> +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel);
> +
> +typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
> +
> +void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> + PushedCb pushed_cb, gpointer user_data);
> +
> +G_END_DECLS
> +
> +#endif
> diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
> index f2c7f07..84ab770 100644
> --- a/spice/spice-webdavd.c
> +++ b/spice/spice-webdavd.c
> @@ -39,25 +39,7 @@
> #include <avahi-gobject/ga-entry-group.h>
> #endif
>
> -typedef struct _OutputQueue
> -{
> - guint refs;
> - GOutputStream *output;
> - gboolean flushing;
> - guint idle_id;
> - GQueue *queue;
> -} OutputQueue;
> -
> -typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
> -
> -typedef struct _OutputQueueElem
> -{
> - OutputQueue *queue;
> - const guint8 *buf;
> - gsize size;
> - PushedCb cb;
> - gpointer user_data;
> -} OutputQueueElem;
> +#include "output-queue.h"
>
> typedef struct _ServiceData
> {
> @@ -69,139 +51,6 @@ typedef struct _ServiceData
>
> static GCancellable *cancel;
>
> -static OutputQueue*
> -output_queue_new (GOutputStream *output)
> -{
> - OutputQueue *queue = g_new0 (OutputQueue, 1);
> -
> - queue->output = g_object_ref (output);
> - queue->queue = g_queue_new ();
> - queue->refs = 1;
> -
> - return queue;
> -}
> -
> -static
> -void
> -output_queue_free (OutputQueue *queue)
> -{
> - g_warn_if_fail (g_queue_get_length (queue->queue) == 0);
> - g_warn_if_fail (!queue->flushing);
> - g_warn_if_fail (!queue->idle_id);
> -
> - g_queue_free_full (queue->queue, g_free);
> - g_clear_object (&queue->output);
> - g_free (queue);
> -}
> -
> -static OutputQueue*
> -output_queue_ref (OutputQueue *q)
> -{
> - q->refs++;
> - return q;
> -}
> -
> -static void
> -output_queue_unref (OutputQueue *q)
> -{
> - g_return_if_fail (q != NULL);
> -
> - q->refs--;
> - if (q->refs == 0)
> - output_queue_free (q);
> -}
> -
> -static gboolean output_queue_idle (gpointer user_data);
> -
> -static void
> -output_queue_flush_cb (GObject *source_object,
> - GAsyncResult *res,
> - gpointer user_data)
> -{
> - GError *error = NULL;
> - OutputQueueElem *e = user_data;
> - OutputQueue *q = e->queue;
> -
> - g_debug ("flushed");
> - q->flushing = FALSE;
> - g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
> - res, &error);
> - if (error)
> - g_warning ("error: %s", error->message);
> -
> - g_clear_error (&error);
> -
> - if (!q->idle_id)
> - q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
> -
> - g_free (e);
> - output_queue_unref (q);
> -}
> -
> -static gboolean
> -output_queue_idle (gpointer user_data)
> -{
> - OutputQueue *q = user_data;
> - OutputQueueElem *e = NULL;
> - GError *error = NULL;
> -
> - if (q->flushing)
> - {
> - g_debug ("already flushing");
> - goto end;
> - }
> -
> - e = g_queue_pop_head (q->queue);
> - if (!e)
> - {
> - g_debug ("No more data to flush");
> - goto end;
> - }
> -
> - g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
> - g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error);
> - if (e->cb)
> - e->cb (q, e->user_data, error);
> -
> - if (error)
> - goto end;
> -
> - q->flushing = TRUE;
> - g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e);
> -
> - q->idle_id = 0;
> - return FALSE;
> -
> -end:
> - g_clear_error (&error);
> - q->idle_id = 0;
> - g_free (e);
> - output_queue_unref (q);
> -
> - return FALSE;
> -}
> -
> -static void
> -output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> - PushedCb pushed_cb, gpointer user_data)
> -{
> - OutputQueueElem *e;
> -
> - g_return_if_fail (q != NULL);
> -
> - e = g_new (OutputQueueElem, 1);
> - e->buf = buf;
> - e->size = size;
> - e->cb = pushed_cb;
> - e->user_data = user_data;
> - e->queue = q;
> - g_queue_push_tail (q->queue, e);
> -
> - if (!q->idle_id && !q->flushing)
> - q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
> -}
> -
> -
> static struct _DemuxData
> {
> gint64 client;
> @@ -264,7 +113,7 @@ add_client (GSocketConnection *client_connection)
> client->client_connection = g_object_ref (client_connection);
> // TODO: check if usage of this idiom is portable, or if we need to check collisions
> client->id = GPOINTER_TO_INT (client_connection);
> - client->queue = output_queue_new (bostream);
> + client->queue = output_queue_new (bostream, cancel);
> g_object_unref (bostream);
>
> g_hash_table_insert (clients, &client->id, client);
> @@ -280,7 +129,7 @@ client_free (Client *c)
>
> g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
> g_object_unref (c->client_connection);
> - output_queue_unref (c->queue);
> + g_object_unref (c->queue);
> g_free (c);
> }
>
> @@ -732,7 +581,7 @@ open_mux_path (const char *path)
> mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE));
> #endif
>
> - mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream));
> + mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel);
> }
>
> #ifdef G_OS_WIN32
> @@ -1002,12 +851,11 @@ run_service (ServiceData *service_data)
> g_clear_object (&mux_istream);
> g_clear_object (&mux_ostream);
>
> - output_queue_unref (mux_queue);
> + g_clear_object (&mux_queue);
> g_hash_table_unref (clients);
>
> g_socket_service_stop (socket_service);
>
> - mux_queue = NULL;
> g_clear_object (&cancel);
>
> #ifdef G_OS_WIN32
> --
> 2.21.0
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel
--
Marc-André Lureau
More information about the Spice-devel
mailing list