[Spice-devel] [PATCH phodav 11/13] spice: move OutputQueue to file
Jakub Janků
jjanku at redhat.com
Thu May 23 08:37:23 UTC 2019
OutputQueue is a self-contained unit and as such can be moved into
a separate file to make spice-webdavd.c less cluttered.
Also, since the current implementation hand-rolls output_queue_{ref, unref},
turn OutputQueue into a GObject, which handles reference counting for us.
Signed-off-by: Jakub Janků <jjanku at redhat.com>
---
spice/meson.build | 8 ++-
spice/output-queue.c | 164 ++++++++++++++++++++++++++++++++++++++++++
spice/output-queue.h | 38 ++++++++++
spice/spice-webdavd.c | 162 ++---------------------------------------
4 files changed, 214 insertions(+), 158 deletions(-)
create mode 100644 spice/output-queue.c
create mode 100644 spice/output-queue.h
diff --git a/spice/meson.build b/spice/meson.build
index 6db22cc..06d20e6 100644
--- a/spice/meson.build
+++ b/spice/meson.build
@@ -4,9 +4,15 @@ if host_machine.system() == 'windows'
win32_deps += compiler.find_library('mpr')
endif
+sources = [
+ 'spice-webdavd.c',
+ 'output-queue.c',
+ 'output-queue.h'
+]
+
executable(
'spice-webdavd',
- [ 'spice-webdavd.c' ],
+ sources,
install_dir : sbindir,
include_directories : incdir,
dependencies : win32_deps + avahi_deps + deps,
diff --git a/spice/output-queue.c b/spice/output-queue.c
new file mode 100644
index 0000000..6991493
--- /dev/null
+++ b/spice/output-queue.c
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+#include <config.h>
+
+#include "output-queue.h"
+
+typedef struct _OutputQueueElem /* one pending write request stored in an OutputQueue */
+{
+ OutputQueue *queue; /* queue this element was pushed to (set in output_queue_push) */
+ const guint8 *buf; /* data to write; only the pointer is stored, the bytes are not copied */
+ gsize size; /* number of bytes at buf to write */
+ PushedCb cb; /* optional callback invoked after the write attempt; may be NULL */
+ gpointer user_data; /* opaque pointer handed back to cb */
+} OutputQueueElem;
+
+struct _OutputQueue /* GObject pairing a GOutputStream with a FIFO of pending writes */
+{
+ GObject parent_instance;
+ GOutputStream *output; /* destination stream; referenced in output_queue_new, released in finalize */
+ gboolean flushing; /* TRUE while a g_output_stream_flush_async() started by output_queue_idle is outstanding */
+ guint idle_id; /* id returned by g_idle_add() for output_queue_idle; 0 when no idle source is scheduled */
+ GQueue *queue; /* pending OutputQueueElem items: pushed at the tail, popped from the head */
+ GCancellable *cancel; /* passed to the write and flush calls; referenced and released like output */
+};
+
+G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT); /* registers the type and defines output_queue_parent_class used in finalize */
+
+static void output_queue_init(OutputQueue *self) /* instance init: creates the empty queue of pending elements */
+{
+ self->queue = g_queue_new ();
+}
+
+static void output_queue_finalize(GObject *obj) /* GObject finalize: frees the element queue and drops the stream/cancellable references */
+{
+ OutputQueue *self = OUTPUT_QUEUE(obj);
+
+ g_warn_if_fail (g_queue_get_length (self->queue) == 0); /* these three checks only warn; teardown continues regardless */
+ g_warn_if_fail (!self->flushing);
+ g_warn_if_fail (!self->idle_id);
+
+ g_queue_free_full (self->queue, g_free); /* frees any leftover elements together with the queue itself */
+ g_object_unref (self->output); /* NOTE(review): output and cancel are only assigned in output_queue_new; presumably never NULL here - confirm */
+ g_object_unref (self->cancel);
+
+ G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj); /* chain up to the parent class */
+}
+
+static void output_queue_class_init(OutputQueueClass *klass) /* class init: installs output_queue_finalize as the finalize handler */
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+ gobject_class->finalize = output_queue_finalize;
+}
+
+OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel) /* creates a queue that writes to output; takes its own reference on both arguments */
+{
+ OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL);
+ self->output = g_object_ref (output);
+ self->cancel = g_object_ref (cancel); /* NOTE(review): g_object_ref() expects a valid object; confirm callers never pass a NULL cancel */
+ return self;
+}
+
+static gboolean output_queue_idle (gpointer user_data); /* forward declaration: output_queue_flush_cb below re-schedules it */
+
+static void
+output_queue_flush_cb (GObject *source_object, /* completion callback for the async flush started in output_queue_idle */
+ GAsyncResult *res,
+ gpointer user_data) /* the OutputQueueElem that output_queue_idle passed to the flush */
+{
+ GError *error = NULL;
+ OutputQueueElem *e = user_data;
+ OutputQueue *q = e->queue;
+
+ g_debug ("flushed");
+ q->flushing = FALSE;
+ g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+ res, &error);
+ if (error)
+ g_warning ("error: %s", error->message); /* a flush failure is only logged; processing continues below */
+
+ g_clear_error (&error);
+
+ if (!q->idle_id)
+ q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q)); /* the new idle source gets its own reference on the queue */
+
+ g_free (e);
+ g_object_unref (q); /* drops the reference output_queue_idle left held across the async flush */
+}
+
+static gboolean
+output_queue_idle (gpointer user_data) /* idle handler: writes the head element, then starts an async flush; always returns FALSE */
+{
+ OutputQueue *q = user_data; /* carries the reference taken when the idle source was added */
+ OutputQueueElem *e = NULL;
+ GError *error = NULL;
+
+ if (q->flushing)
+ {
+ g_debug ("already flushing");
+ goto end;
+ }
+
+ e = g_queue_pop_head (q->queue);
+ if (!e)
+ {
+ g_debug ("No more data to flush");
+ goto end;
+ }
+
+ g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
+ g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error); /* synchronous write; failure is reported through error */
+ if (e->cb)
+ e->cb (q, e->user_data, error); /* error is cleared at the end label, so cb must not keep the pointer */
+
+ if (error)
+ goto end; /* element is dropped, no flush is started and no new idle is scheduled on this path */
+
+ q->flushing = TRUE;
+ g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e); /* e and the queue reference are handed over to output_queue_flush_cb */
+
+ q->idle_id = 0;
+ return FALSE;
+
+end:
+ g_clear_error (&error);
+ q->idle_id = 0;
+ g_free (e); /* e is still NULL when nothing was popped; g_free accepts NULL */
+ g_object_unref (q); /* releases the reference held by this idle source */
+
+ return FALSE;
+}
+
+void
+output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, /* appends a write request and schedules processing if none is pending */
+ PushedCb pushed_cb, gpointer user_data)
+{
+ OutputQueueElem *e;
+
+ g_return_if_fail (q != NULL);
+
+ e = g_new (OutputQueueElem, 1);
+ e->buf = buf; /* pointer is stored, not copied: it is read later by output_queue_idle */
+ e->size = size;
+ e->cb = pushed_cb;
+ e->user_data = user_data;
+ e->queue = q;
+ g_queue_push_tail (q->queue, e);
+
+ if (!q->idle_id && !q->flushing) /* otherwise an idle source is already pending or output_queue_flush_cb will schedule one */
+ q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+}
diff --git a/spice/output-queue.h b/spice/output-queue.h
new file mode 100644
index 0000000..ab8f6eb
--- /dev/null
+++ b/spice/output-queue.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __OUTPUT_QUEUE_H
+#define __OUTPUT_QUEUE_H
+
+#include <gio/gio.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define OUTPUT_TYPE_QUEUE output_queue_get_type()
+G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject); /* declares the OutputQueue final type and the OUTPUT_QUEUE() cast macro */
+
+OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel); /* creates a queue for output; cancel is passed to its write and flush calls */
+
+typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error); /* invoked after the write attempt for a pushed buffer, with the write error if one was set */
+
+void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, /* queues size bytes at buf for writing; pushed_cb may be NULL */
+ PushedCb pushed_cb, gpointer user_data);
+
+G_END_DECLS
+
+#endif
diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
index f2c7f07..84ab770 100644
--- a/spice/spice-webdavd.c
+++ b/spice/spice-webdavd.c
@@ -39,25 +39,7 @@
#include <avahi-gobject/ga-entry-group.h>
#endif
-typedef struct _OutputQueue
-{
- guint refs;
- GOutputStream *output;
- gboolean flushing;
- guint idle_id;
- GQueue *queue;
-} OutputQueue;
-
-typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
-
-typedef struct _OutputQueueElem
-{
- OutputQueue *queue;
- const guint8 *buf;
- gsize size;
- PushedCb cb;
- gpointer user_data;
-} OutputQueueElem;
+#include "output-queue.h"
typedef struct _ServiceData
{
@@ -69,139 +51,6 @@ typedef struct _ServiceData
static GCancellable *cancel;
-static OutputQueue*
-output_queue_new (GOutputStream *output)
-{
- OutputQueue *queue = g_new0 (OutputQueue, 1);
-
- queue->output = g_object_ref (output);
- queue->queue = g_queue_new ();
- queue->refs = 1;
-
- return queue;
-}
-
-static
-void
-output_queue_free (OutputQueue *queue)
-{
- g_warn_if_fail (g_queue_get_length (queue->queue) == 0);
- g_warn_if_fail (!queue->flushing);
- g_warn_if_fail (!queue->idle_id);
-
- g_queue_free_full (queue->queue, g_free);
- g_clear_object (&queue->output);
- g_free (queue);
-}
-
-static OutputQueue*
-output_queue_ref (OutputQueue *q)
-{
- q->refs++;
- return q;
-}
-
-static void
-output_queue_unref (OutputQueue *q)
-{
- g_return_if_fail (q != NULL);
-
- q->refs--;
- if (q->refs == 0)
- output_queue_free (q);
-}
-
-static gboolean output_queue_idle (gpointer user_data);
-
-static void
-output_queue_flush_cb (GObject *source_object,
- GAsyncResult *res,
- gpointer user_data)
-{
- GError *error = NULL;
- OutputQueueElem *e = user_data;
- OutputQueue *q = e->queue;
-
- g_debug ("flushed");
- q->flushing = FALSE;
- g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
- res, &error);
- if (error)
- g_warning ("error: %s", error->message);
-
- g_clear_error (&error);
-
- if (!q->idle_id)
- q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
-
- g_free (e);
- output_queue_unref (q);
-}
-
-static gboolean
-output_queue_idle (gpointer user_data)
-{
- OutputQueue *q = user_data;
- OutputQueueElem *e = NULL;
- GError *error = NULL;
-
- if (q->flushing)
- {
- g_debug ("already flushing");
- goto end;
- }
-
- e = g_queue_pop_head (q->queue);
- if (!e)
- {
- g_debug ("No more data to flush");
- goto end;
- }
-
- g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
- g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error);
- if (e->cb)
- e->cb (q, e->user_data, error);
-
- if (error)
- goto end;
-
- q->flushing = TRUE;
- g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e);
-
- q->idle_id = 0;
- return FALSE;
-
-end:
- g_clear_error (&error);
- q->idle_id = 0;
- g_free (e);
- output_queue_unref (q);
-
- return FALSE;
-}
-
-static void
-output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
- PushedCb pushed_cb, gpointer user_data)
-{
- OutputQueueElem *e;
-
- g_return_if_fail (q != NULL);
-
- e = g_new (OutputQueueElem, 1);
- e->buf = buf;
- e->size = size;
- e->cb = pushed_cb;
- e->user_data = user_data;
- e->queue = q;
- g_queue_push_tail (q->queue, e);
-
- if (!q->idle_id && !q->flushing)
- q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
-}
-
-
static struct _DemuxData
{
gint64 client;
@@ -264,7 +113,7 @@ add_client (GSocketConnection *client_connection)
client->client_connection = g_object_ref (client_connection);
// TODO: check if usage of this idiom is portable, or if we need to check collisions
client->id = GPOINTER_TO_INT (client_connection);
- client->queue = output_queue_new (bostream);
+ client->queue = output_queue_new (bostream, cancel);
g_object_unref (bostream);
g_hash_table_insert (clients, &client->id, client);
@@ -280,7 +129,7 @@ client_free (Client *c)
g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
g_object_unref (c->client_connection);
- output_queue_unref (c->queue);
+ g_object_unref (c->queue);
g_free (c);
}
@@ -732,7 +581,7 @@ open_mux_path (const char *path)
mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE));
#endif
- mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream));
+ mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel);
}
#ifdef G_OS_WIN32
@@ -1002,12 +851,11 @@ run_service (ServiceData *service_data)
g_clear_object (&mux_istream);
g_clear_object (&mux_ostream);
- output_queue_unref (mux_queue);
+ g_clear_object (&mux_queue);
g_hash_table_unref (clients);
g_socket_service_stop (socket_service);
- mux_queue = NULL;
g_clear_object (&cancel);
#ifdef G_OS_WIN32
--
2.21.0
More information about the Spice-devel
mailing list