[Spice-devel] [PATCH phodav 11/13] spice: move OutputQueue to file

Marc-André Lureau marcandre.lureau at gmail.com
Thu May 23 13:42:19 UTC 2019


Hi

On Thu, May 23, 2019 at 10:37 AM Jakub Janků <jjanku at redhat.com> wrote:
>
> OutputQueue is a self-contained unit and as such can be put in
> a separate file to make the spice-webdavd.c less cluttered.
>
> Also, as the current implementation defines output_queue_{ref, unref},
> turn OutputQueue into a GObject which can handle these for us.
>
> Signed-off-by: Jakub Janků <jjanku at redhat.com>

Ack in principle; there are only minor coding style issues.

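The phodav source code tries to follow the GLib coding style (2-space
indentation and a space before the opening parenthesis of a call or
definition). The new GObject boilerplate in output-queue.c
(output_queue_init, output_queue_finalize and output_queue_class_init)
uses 4-space indentation and omits that space. Can you re-indent it
accordingly?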

> ---
>  spice/meson.build     |   8 ++-
>  spice/output-queue.c  | 164 ++++++++++++++++++++++++++++++++++++++++++
>  spice/output-queue.h  |  38 ++++++++++
>  spice/spice-webdavd.c | 162 ++---------------------------------------
>  4 files changed, 214 insertions(+), 158 deletions(-)
>  create mode 100644 spice/output-queue.c
>  create mode 100644 spice/output-queue.h
>
> diff --git a/spice/meson.build b/spice/meson.build
> index 6db22cc..06d20e6 100644
> --- a/spice/meson.build
> +++ b/spice/meson.build
> @@ -4,9 +4,15 @@ if host_machine.system() == 'windows'
>    win32_deps += compiler.find_library('mpr')
>  endif
>
> +sources = [
> +  'spice-webdavd.c',
> +  'output-queue.c',
> +  'output-queue.h'
> +]
> +
>  executable(
>    'spice-webdavd',
> -  [ 'spice-webdavd.c' ],
> +  sources,
>    install_dir : sbindir,
>    include_directories : incdir,
>    dependencies : win32_deps + avahi_deps + deps,
> diff --git a/spice/output-queue.c b/spice/output-queue.c
> new file mode 100644
> index 0000000..6991493
> --- /dev/null
> +++ b/spice/output-queue.c
> @@ -0,0 +1,164 @@
> +/*
> + * Copyright (C) 2019 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <config.h>
> +
> +#include "output-queue.h"
> +
> +typedef struct _OutputQueueElem
> +{
> +  OutputQueue  *queue;
> +  const guint8 *buf;
> +  gsize         size;
> +  PushedCb      cb;
> +  gpointer      user_data;
> +} OutputQueueElem;
> +
> +struct _OutputQueue
> +{
> +  GObject        parent_instance;
> +  GOutputStream *output;
> +  gboolean       flushing;
> +  guint          idle_id;
> +  GQueue        *queue;
> +  GCancellable  *cancel;
> +};
> +
> +G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT);
> +
> +static void output_queue_init(OutputQueue *self)
> +{
> +    self->queue = g_queue_new ();
> +}
> +
> +static void output_queue_finalize(GObject *obj)
> +{
> +    OutputQueue *self = OUTPUT_QUEUE(obj);
> +
> +    g_warn_if_fail (g_queue_get_length (self->queue) == 0);
> +    g_warn_if_fail (!self->flushing);
> +    g_warn_if_fail (!self->idle_id);
> +
> +    g_queue_free_full (self->queue, g_free);
> +    g_object_unref (self->output);
> +    g_object_unref (self->cancel);
> +
> +    G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj);
> +}
> +
> +static void output_queue_class_init(OutputQueueClass *klass)
> +{
> +    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
> +    gobject_class->finalize = output_queue_finalize;
> +}
> +
> +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
> +{
> +  OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL);
> +  self->output = g_object_ref (output);
> +  self->cancel = g_object_ref (cancel);
> +  return self;
> +}
> +
> +static gboolean output_queue_idle (gpointer user_data);
> +
> +static void
> +output_queue_flush_cb (GObject      *source_object,
> +                       GAsyncResult *res,
> +                       gpointer      user_data)
> +{
> +  GError *error = NULL;
> +  OutputQueueElem *e = user_data;
> +  OutputQueue *q = e->queue;
> +
> +  g_debug ("flushed");
> +  q->flushing = FALSE;
> +  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
> +                                res, &error);
> +  if (error)
> +    g_warning ("error: %s", error->message);
> +
> +  g_clear_error (&error);
> +
> +  if (!q->idle_id)
> +    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
> +
> +  g_free (e);
> +  g_object_unref (q);
> +}
> +
> +static gboolean
> +output_queue_idle (gpointer user_data)
> +{
> +  OutputQueue *q = user_data;
> +  OutputQueueElem *e = NULL;
> +  GError *error = NULL;
> +
> +  if (q->flushing)
> +    {
> +      g_debug ("already flushing");
> +      goto end;
> +    }
> +
> +  e = g_queue_pop_head (q->queue);
> +  if (!e)
> +    {
> +      g_debug ("No more data to flush");
> +      goto end;
> +    }
> +
> +  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
> +  g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
> +  if (e->cb)
> +    e->cb (q, e->user_data, error);
> +
> +  if (error)
> +      goto end;
> +
> +  q->flushing = TRUE;
> +  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
> +
> +  q->idle_id = 0;
> +  return FALSE;
> +
> +end:
> +  g_clear_error (&error);
> +  q->idle_id = 0;
> +  g_free (e);
> +  g_object_unref (q);
> +
> +  return FALSE;
> +}
> +
> +void
> +output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> +                   PushedCb pushed_cb, gpointer user_data)
> +{
> +  OutputQueueElem *e;
> +
> +  g_return_if_fail (q != NULL);
> +
> +  e = g_new (OutputQueueElem, 1);
> +  e->buf = buf;
> +  e->size = size;
> +  e->cb = pushed_cb;
> +  e->user_data = user_data;
> +  e->queue = q;
> +  g_queue_push_tail (q->queue, e);
> +
> +  if (!q->idle_id && !q->flushing)
> +    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
> +}
> diff --git a/spice/output-queue.h b/spice/output-queue.h
> new file mode 100644
> index 0000000..ab8f6eb
> --- /dev/null
> +++ b/spice/output-queue.h
> @@ -0,0 +1,38 @@
> +/*
> + * Copyright (C) 2019 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef __OUTPUT_QUEUE_H
> +#define __OUTPUT_QUEUE_H
> +
> +#include <gio/gio.h>
> +#include <glib-object.h>
> +
> +G_BEGIN_DECLS
> +
> +#define OUTPUT_TYPE_QUEUE output_queue_get_type()
> +G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject);
> +
> +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel);
> +
> +typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
> +
> +void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> +                        PushedCb pushed_cb, gpointer user_data);
> +
> +G_END_DECLS
> +
> +#endif
> diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
> index f2c7f07..84ab770 100644
> --- a/spice/spice-webdavd.c
> +++ b/spice/spice-webdavd.c
> @@ -39,25 +39,7 @@
>  #include <avahi-gobject/ga-entry-group.h>
>  #endif
>
> -typedef struct _OutputQueue
> -{
> -  guint          refs;
> -  GOutputStream *output;
> -  gboolean       flushing;
> -  guint          idle_id;
> -  GQueue        *queue;
> -} OutputQueue;
> -
> -typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
> -
> -typedef struct _OutputQueueElem
> -{
> -  OutputQueue  *queue;
> -  const guint8 *buf;
> -  gsize         size;
> -  PushedCb      cb;
> -  gpointer      user_data;
> -} OutputQueueElem;
> +#include "output-queue.h"
>
>  typedef struct _ServiceData
>  {
> @@ -69,139 +51,6 @@ typedef struct _ServiceData
>
>  static GCancellable *cancel;
>
> -static OutputQueue*
> -output_queue_new (GOutputStream *output)
> -{
> -  OutputQueue *queue = g_new0 (OutputQueue, 1);
> -
> -  queue->output = g_object_ref (output);
> -  queue->queue = g_queue_new ();
> -  queue->refs = 1;
> -
> -  return queue;
> -}
> -
> -static
> -void
> -output_queue_free (OutputQueue *queue)
> -{
> -  g_warn_if_fail (g_queue_get_length (queue->queue) == 0);
> -  g_warn_if_fail (!queue->flushing);
> -  g_warn_if_fail (!queue->idle_id);
> -
> -  g_queue_free_full (queue->queue, g_free);
> -  g_clear_object (&queue->output);
> -  g_free (queue);
> -}
> -
> -static OutputQueue*
> -output_queue_ref (OutputQueue *q)
> -{
> -  q->refs++;
> -  return q;
> -}
> -
> -static void
> -output_queue_unref (OutputQueue *q)
> -{
> -  g_return_if_fail (q != NULL);
> -
> -  q->refs--;
> -  if (q->refs == 0)
> -    output_queue_free (q);
> -}
> -
> -static gboolean output_queue_idle (gpointer user_data);
> -
> -static void
> -output_queue_flush_cb (GObject      *source_object,
> -                       GAsyncResult *res,
> -                       gpointer      user_data)
> -{
> -  GError *error = NULL;
> -  OutputQueueElem *e = user_data;
> -  OutputQueue *q = e->queue;
> -
> -  g_debug ("flushed");
> -  q->flushing = FALSE;
> -  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
> -                                res, &error);
> -  if (error)
> -    g_warning ("error: %s", error->message);
> -
> -  g_clear_error (&error);
> -
> -  if (!q->idle_id)
> -    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
> -
> -  g_free (e);
> -  output_queue_unref (q);
> -}
> -
> -static gboolean
> -output_queue_idle (gpointer user_data)
> -{
> -  OutputQueue *q = user_data;
> -  OutputQueueElem *e = NULL;
> -  GError *error = NULL;
> -
> -  if (q->flushing)
> -    {
> -      g_debug ("already flushing");
> -      goto end;
> -    }
> -
> -  e = g_queue_pop_head (q->queue);
> -  if (!e)
> -    {
> -      g_debug ("No more data to flush");
> -      goto end;
> -    }
> -
> -  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
> -  g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error);
> -  if (e->cb)
> -    e->cb (q, e->user_data, error);
> -
> -  if (error)
> -      goto end;
> -
> -  q->flushing = TRUE;
> -  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e);
> -
> -  q->idle_id = 0;
> -  return FALSE;
> -
> -end:
> -  g_clear_error (&error);
> -  q->idle_id = 0;
> -  g_free (e);
> -  output_queue_unref (q);
> -
> -  return FALSE;
> -}
> -
> -static void
> -output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> -                   PushedCb pushed_cb, gpointer user_data)
> -{
> -  OutputQueueElem *e;
> -
> -  g_return_if_fail (q != NULL);
> -
> -  e = g_new (OutputQueueElem, 1);
> -  e->buf = buf;
> -  e->size = size;
> -  e->cb = pushed_cb;
> -  e->user_data = user_data;
> -  e->queue = q;
> -  g_queue_push_tail (q->queue, e);
> -
> -  if (!q->idle_id && !q->flushing)
> -    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
> -}
> -
> -
>  static struct _DemuxData
>  {
>    gint64  client;
> @@ -264,7 +113,7 @@ add_client (GSocketConnection *client_connection)
>    client->client_connection = g_object_ref (client_connection);
>    // TODO: check if usage of this idiom is portable, or if we need to check collisions
>    client->id = GPOINTER_TO_INT (client_connection);
> -  client->queue = output_queue_new (bostream);
> +  client->queue = output_queue_new (bostream, cancel);
>    g_object_unref (bostream);
>
>    g_hash_table_insert (clients, &client->id, client);
> @@ -280,7 +129,7 @@ client_free (Client *c)
>
>    g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
>    g_object_unref (c->client_connection);
> -  output_queue_unref (c->queue);
> +  g_object_unref (c->queue);
>    g_free (c);
>  }
>
> @@ -732,7 +581,7 @@ open_mux_path (const char *path)
>    mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE));
>  #endif
>
> -  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream));
> +  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel);
>  }
>
>  #ifdef G_OS_WIN32
> @@ -1002,12 +851,11 @@ run_service (ServiceData *service_data)
>    g_clear_object (&mux_istream);
>    g_clear_object (&mux_ostream);
>
> -  output_queue_unref (mux_queue);
> +  g_clear_object (&mux_queue);
>    g_hash_table_unref (clients);
>
>    g_socket_service_stop (socket_service);
>
> -  mux_queue = NULL;
>    g_clear_object (&cancel);
>
>  #ifdef G_OS_WIN32
> --
> 2.21.0
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel



-- 
Marc-André Lureau


More information about the Spice-devel mailing list