[Spice-devel] [WIP vdagent] Add spice-webdavd

Marc-André Lureau marcandre.lureau at gmail.com
Sun Jan 12 09:41:08 PST 2014


This daemon publishes a local port to proxy a webdav server served over
/dev/virtio-ports/org.spice-space.webdav.0 (See spice-common protocol
documentation for details of this channel)

The service is announced over avahi/mdns, so that applications such as
nautilus can quickly notice existence of a new remote filesystem.
---
 Makefile.am         |   6 +-
 configure.ac        |   4 +-
 src/spice-webdavd.c | 611 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 619 insertions(+), 2 deletions(-)
 create mode 100644 src/spice-webdavd.c

diff --git a/Makefile.am b/Makefile.am
index 74cc313..6832f22 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -2,7 +2,7 @@ ACLOCAL_AMFLAGS = ${ACLOCAL_FLAGS}
 NULL =
 
 bin_PROGRAMS = src/spice-vdagent
-sbin_PROGRAMS = src/spice-vdagentd
+sbin_PROGRAMS = src/spice-vdagentd src/spice-webdavd
 
 src_spice_vdagent_CFLAGS = $(X_CFLAGS) $(SPICE_CFLAGS) $(GLIB2_CFLAGS)
 src_spice_vdagent_LDADD = $(X_LIBS) $(SPICE_LIBS) $(GLIB2_LIBS)
@@ -27,6 +27,10 @@ src_spice_vdagentd_SOURCES += src/dummy-session-info.c
 endif
 endif
 
+src_spice_webdavd_CFLAGS = $(WEBDAVD_CFLAGS) $(PIE_CFLAGS)
+src_spice_webdavd_LDADD = $(WEBDAVD_LIBS) $(PIE_LDFLAGS)
+src_spice_webdavd_SOURCES = src/spice-webdavd.c
+
 noinst_HEADERS = src/glib-compat.h \
                  src/session-info.h \
                  src/udscs.h \
diff --git a/configure.ac b/configure.ac
index 79905a8..b6dc823 100644
--- a/configure.ac
+++ b/configure.ac
@@ -76,10 +76,12 @@ AC_ARG_ENABLE([static-uinput],
               [enable_static_uinput="$enableval"],
               [enable_static_uinput="no"])
 
-PKG_CHECK_MODULES([GLIB2], [glib-2.0 >= 2.12])
+PKG_CHECK_MODULES(GLIB2, [glib-2.0 >= 2.12])
 PKG_CHECK_MODULES(X, [xfixes xrandr >= 1.3 xinerama x11])
 PKG_CHECK_MODULES(SPICE, [spice-protocol >= 0.12.5])
 
+PKG_CHECK_MODULES(WEBDAVD, [gio-unix-2.0 avahi-gobject avahi-client])
+
 if test "$with_session_info" = "auto" || test "$with_session_info" = "systemd"; then
     PKG_CHECK_MODULES([LIBSYSTEMD_LOGIN],
                       [libsystemd-login >= 42],
diff --git a/src/spice-webdavd.c b/src/spice-webdavd.c
new file mode 100644
index 0000000..e371e6e
--- /dev/null
+++ b/src/spice-webdavd.c
@@ -0,0 +1,611 @@
+#include <stdlib.h>
+#include <gio/gio.h>
+#include <gio/gunixsocketaddress.h>
+
+#include <avahi-gobject/ga-client.h>
+#include <avahi-gobject/ga-entry-group.h>
+
+typedef struct _OutputQueue {
+    GOutputStream *output;
+    gboolean flushing;
+    guint idle_id;
+    GQueue *queue;
+} OutputQueue;
+
+typedef struct _OutputQueueElem {
+    OutputQueue *queue;
+    const guint8 *buf;
+    gsize size;
+    GFunc cb;
+    gpointer user_data;
+} OutputQueueElem;
+
+static
+OutputQueue* output_queue_new(GOutputStream *output)
+{
+    OutputQueue *queue = g_new0(OutputQueue, 1);
+
+    queue->output = g_object_ref(output);
+    queue->queue = g_queue_new();
+
+    return queue;
+}
+
+static
+void output_queue_free(OutputQueue *queue)
+{
+    g_warn_if_fail(g_queue_get_length(queue->queue) == 0);
+    g_warn_if_fail(!queue->flushing);
+    g_warn_if_fail(!queue->idle_id);
+
+    g_queue_free_full(queue->queue, g_free);
+    g_clear_object(&queue->output);
+    g_free(queue);
+}
+
+static gboolean
+output_queue_idle(gpointer user_data);
+
+static void
+output_queue_flush_cb(GObject *source_object,
+                      GAsyncResult *res,
+                      gpointer user_data)
+{
+    GError *error = NULL;
+    OutputQueueElem *e = user_data;
+    OutputQueue *q = e->queue;
+
+    q->flushing = FALSE;
+    g_output_stream_flush_finish(G_OUTPUT_STREAM(source_object),
+                                 res, &error);
+    if (error)
+        g_warning("error: %s", error->message);
+
+    g_clear_error(&error);
+
+    if (!q->idle_id)
+        q->idle_id = g_idle_add(output_queue_idle, q);
+
+    g_free(e);
+}
+
+static gboolean
+output_queue_idle(gpointer user_data)
+{
+    OutputQueue *q = user_data;
+    OutputQueueElem *e;
+    GError *error = NULL;
+
+    if (q->flushing) {
+        g_debug("already flushing");
+        q->idle_id = 0;
+        return FALSE;
+    }
+
+    e = g_queue_pop_head(q->queue);
+    if (!e) {
+        g_debug("No more data to flush");
+        q->idle_id = 0;
+        return FALSE;
+    }
+
+    g_debug("flushing %" G_GSIZE_FORMAT, e->size);
+    g_output_stream_write_all(q->output, e->buf, e->size, NULL, NULL, &error);
+    if (error)
+        goto err;
+    else if (e->cb)
+        e->cb(q, e->user_data);
+
+    q->flushing = TRUE;
+    g_output_stream_flush_async(q->output, G_PRIORITY_DEFAULT, NULL, output_queue_flush_cb, e);
+
+    return TRUE;
+
+err:
+    g_warning("error: %s", error->message);
+    g_clear_error(&error);
+
+    q->idle_id = 0;
+    return FALSE;
+}
+
+static void
+output_queue_push(OutputQueue *q, const guint8 *buf, gsize size,
+                  GFunc pushed_cb, gpointer user_data)
+{
+    OutputQueueElem *e = g_new(OutputQueueElem, 1);
+
+    e->buf = buf;
+    e->size = size;
+    e->cb = pushed_cb;
+    e->user_data = user_data;
+    e->queue = q;
+    g_queue_push_tail(q->queue, e);
+
+    if (!q->idle_id && !q->flushing)
+        q->idle_id = g_idle_add(output_queue_idle, q);
+}
+
+
+static struct _DemuxData {
+    gint64 client;
+    guint16 size;
+    gchar buf[G_MAXUINT16];
+} demux;
+
+typedef struct _Client {
+    gint64 id;
+    guint8 buf[G_MAXUINT16];
+    guint16 size;
+    GSocketConnection *client_connection;
+    OutputQueue *queue;
+} Client;
+
+static GMainLoop *loop;
+static GIOStream *mux_iostream;
+static OutputQueue *mux_queue;
+static GHashTable *clients;
+
+static void
+start_mux_read(GIOStream *iostream);
+
+static void
+quit(int sig)
+{
+    g_main_loop_quit(loop);
+}
+
+static Client *
+add_client(GSocketConnection *client_connection)
+{
+    GIOStream *iostream = G_IO_STREAM(client_connection);
+    GOutputStream *ostream = g_io_stream_get_output_stream(iostream);
+    GOutputStream *bostream;
+    Client *client;
+
+    bostream = g_buffered_output_stream_new(ostream);
+    g_buffered_output_stream_set_auto_grow(G_BUFFERED_OUTPUT_STREAM(bostream), TRUE);
+
+    client = g_new0(Client, 1);
+    client->client_connection = client_connection;
+    client->id = GPOINTER_TO_INT(client_connection);
+    client->queue = output_queue_new(bostream);
+    g_object_unref(bostream);
+
+    g_hash_table_insert(clients, g_object_ref(client_connection), client);
+
+    return client;
+}
+
+static void
+remove_client(Client *client)
+{
+    g_debug("remove client %p", client);
+
+    output_queue_free(client->queue);
+    g_hash_table_remove(clients, client->client_connection);
+}
+
+typedef struct ReadData {
+    void *buffer;
+    gsize count;
+    gssize size;
+} ReadData;
+
+static void
+read_thread(GSimpleAsyncResult *simple,
+            GObject *object,
+            GCancellable *cancellable)
+{
+    GError *error = NULL;
+    GInputStream *stream = G_INPUT_STREAM(object);
+    ReadData *data;
+    gsize bread;
+
+    data = g_simple_async_result_get_op_res_gpointer(simple);
+
+    g_debug("my read %" G_GSIZE_FORMAT, data->count);
+    g_input_stream_read_all(stream,
+                            data->buffer, data->count, &bread,
+                            cancellable, &error);
+    if (bread != data->count)
+        data->size = -1;
+    else
+        data->size = bread;
+
+    if (error) {
+        g_debug("error: %s", error->message);
+        g_simple_async_result_set_from_error(simple, error);
+    }
+}
+
+static void
+my_input_stream_read_async(GInputStream *stream,
+                           void *buffer,
+                           gsize count,
+                           int io_priority,
+                           GCancellable *cancellable,
+                           GAsyncReadyCallback callback,
+                           gpointer user_data)
+{
+    GSimpleAsyncResult *simple;
+    ReadData *data = g_new(ReadData, 1);
+
+    data->buffer = buffer;
+    data->count = count;
+
+    simple = g_simple_async_result_new(G_OBJECT(stream),
+                                       callback, user_data,
+                                       my_input_stream_read_async);
+
+    g_simple_async_result_set_op_res_gpointer(simple, data, g_free);
+    g_simple_async_result_run_in_thread(simple, read_thread, io_priority, cancellable);
+}
+
+static gssize
+my_input_stream_read_finish(GInputStream *stream,
+                            GAsyncResult *result,
+                            GError      **error)
+{
+    GSimpleAsyncResult *simple;
+    ReadData *data;
+
+    g_return_val_if_fail(g_simple_async_result_is_valid(result,
+                                                        G_OBJECT(stream),
+                                                        my_input_stream_read_async),
+                         -1);
+
+    simple = G_SIMPLE_ASYNC_RESULT(result);
+
+    if (g_simple_async_result_propagate_error(simple, error))
+        return -1;
+
+    data = g_simple_async_result_get_op_res_gpointer(simple);
+
+    return data->size;
+}
+
+static void
+mux_pushed_client_cb(OutputQueue *q, gpointer user_data)
+{
+    start_mux_read(mux_iostream);
+}
+
+static void
+mux_data_read_cb(GObject *source_object,
+                 GAsyncResult *res,
+                 gpointer user_data)
+{
+    GError *error = NULL;
+    gssize size;
+
+    size = my_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+    g_return_if_fail(size == demux.size);
+    if (error) {
+        g_warning("error: %s", error->message);
+        g_clear_error(&error);
+        quit(0);
+        return;
+    }
+
+    Client *c = g_hash_table_lookup(clients, GINT_TO_POINTER(demux.client));
+    g_warn_if_fail(c != NULL);
+
+    if (c)
+        output_queue_push(c->queue, (guint8 *)demux.buf, demux.size,
+                          (GFunc)mux_pushed_client_cb, c);
+    else
+        start_mux_read(mux_iostream);
+}
+
+static void
+mux_size_read_cb(GObject *source_object,
+                 GAsyncResult *res,
+                 gpointer user_data)
+{
+    GInputStream *istream = G_INPUT_STREAM(source_object);
+    GError *error = NULL;
+    gssize size;
+
+    size = my_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+    if (error || size != sizeof(guint16))
+        goto end;
+
+    my_input_stream_read_async(istream,
+                               &demux.buf, demux.size, G_PRIORITY_DEFAULT,
+                               NULL, mux_data_read_cb, NULL);
+    return;
+
+end:
+    if (error) {
+        g_warning("error: %s", error->message);
+        g_clear_error(&error);
+    }
+
+    quit(0);
+}
+
+static void
+mux_client_read_cb(GObject *source_object,
+               GAsyncResult *res,
+               gpointer user_data)
+{
+    GInputStream *istream = G_INPUT_STREAM(source_object);
+    GError *error = NULL;
+    gssize size;
+
+    size = my_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+    if (error || size != sizeof(gint64))
+        goto end;
+
+    my_input_stream_read_async(istream,
+                               &demux.size, sizeof(guint16), G_PRIORITY_DEFAULT,
+                               NULL, mux_size_read_cb, NULL);
+    return;
+
+end:
+    if (error) {
+        g_warning("error: %s", error->message);
+        g_clear_error(&error);
+    }
+
+    quit(0);
+}
+
+static void
+start_mux_read(GIOStream *iostream)
+{
+    GInputStream *istream = g_io_stream_get_input_stream(iostream);
+
+    my_input_stream_read_async(istream,
+                               &demux.client, sizeof(gint64), G_PRIORITY_DEFAULT,
+                               NULL, mux_client_read_cb, NULL);
+}
+
+static void
+client_start_read(Client *client);
+
+static void
+mux_pushed_cb(OutputQueue *q, gpointer user_data)
+{
+    Client *client = user_data;
+
+    if (client->size == 0) {
+        g_debug("fixme, disconn");
+        remove_client(client);
+        return;
+    }
+
+    client_start_read(client);
+}
+
+static void
+client_read_cb(GObject *source_object,
+               GAsyncResult *res,
+               gpointer user_data)
+{
+    Client *client = user_data;
+    GError *error = NULL;
+    gsize size;
+
+    size = g_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+    if (error) {
+        g_warning("error: %s", error->message);
+        g_clear_error(&error);
+        remove_client(client);
+        return;
+    }
+
+    g_return_if_fail(size < G_MAXUINT16);
+    g_return_if_fail(size >= 0);
+    client->size = size;
+
+    output_queue_push(mux_queue, (guint8 *) &client->id, sizeof(gint64), NULL, NULL);
+    output_queue_push(mux_queue, (guint8 *) &client->size, sizeof(guint16), NULL, NULL);
+    output_queue_push(mux_queue, (guint8 *) client->buf, size, (GFunc)mux_pushed_cb, client);
+
+    return;
+}
+
+static void
+client_start_read(Client *client)
+{
+    GIOStream *iostream = G_IO_STREAM(client->client_connection);
+    GInputStream *istream = g_io_stream_get_input_stream(iostream);
+
+    g_input_stream_read_async(istream,
+                              client->buf, G_MAXUINT16, G_PRIORITY_DEFAULT,
+                              NULL, client_read_cb, client);
+}
+
+static gboolean
+incoming_callback(GSocketService *service,
+                  GSocketConnection *client_connection,
+                  GObject *source_object,
+                  gpointer user_data)
+{
+    Client *client;
+
+    g_debug("new client!");
+    client = add_client(client_connection);
+    client_start_read(client);
+
+    return FALSE;
+}
+
+static GaClient *mdns_client;
+static GaEntryGroup *mdns_group;
+static GaEntryGroupService *mdns_service;
+static int port;
+
+static void
+mdns_register_service(void)
+{
+    GError *error = NULL;
+
+    if (!mdns_group) {
+        mdns_group = ga_entry_group_new();
+
+        if (!ga_entry_group_attach (mdns_group, mdns_client, &error)) {
+            g_warning("Could not attach MDNS group to client: %s", error->message);
+            g_error_free(error);
+            return;
+        }
+    }
+
+    gchar *name = g_strdup_printf("%s\'s public share", g_get_user_name ());
+    mdns_service = ga_entry_group_add_service(mdns_group,
+                                              name, "_webdav._tcp",
+                                              port, &error,
+                                              NULL);
+    g_free(name);
+    if (!mdns_service) {
+        g_warning("Could not create service: %s", error->message);
+        g_error_free(error);
+        return;
+    }
+
+    gchar *record = g_strdup_printf("u=,p=,path=/");
+    if (!ga_entry_group_service_set(mdns_service, name, record, &error)) {
+        g_warning("Could not update TXT record: %s", error->message);
+        g_error_free(error);
+    }
+    g_free(record);
+
+    if (!ga_entry_group_commit(mdns_group, &error)) {
+        g_warning("Could not announce MDNS service: %s", error->message);
+        g_error_free(error);
+        return;
+    }
+}
+
+static void
+mdns_state_changed(GaClient *client, GaClientState state, gpointer user_data)
+{
+    switch (state) {
+      case GA_CLIENT_STATE_FAILURE:
+        g_warning("MDNS client state failure");
+        break;
+
+      case GA_CLIENT_STATE_S_RUNNING:
+        g_debug("MDNS client found server running");
+        mdns_register_service();
+        break;
+
+      case GA_CLIENT_STATE_S_COLLISION:
+      case GA_CLIENT_STATE_S_REGISTERING:
+        g_message("MDNS collision");
+        if (mdns_group) {
+            ga_entry_group_reset (mdns_group, NULL);
+            mdns_service = 0;
+        }
+        break;
+
+    default:
+        // Do nothing
+        break;
+    }
+}
+
+static void
+open_mux_path(const char *path)
+{
+    GError *error = NULL;
+    GFile *file;
+    GFileIOStream *fio;
+
+    g_return_if_fail(path);
+    g_return_if_fail(!mux_iostream);
+    g_return_if_fail(!mux_queue);
+
+    file = g_file_new_for_path(path);
+    fio = g_file_open_readwrite(file, NULL, &error);
+    g_object_unref(file);
+
+    if (error) {
+        g_printerr("%s\n", error->message);
+        exit(1);
+    }
+
+    mux_iostream = G_IO_STREAM(fio);
+    GOutputStream *ostream = g_io_stream_get_output_stream(mux_iostream);
+    mux_queue = output_queue_new(G_OUTPUT_STREAM(ostream));
+
+    start_mux_read(mux_iostream);
+}
+
+static GOptionEntry entries[] = {
+    { "port", 'p', 0,
+      G_OPTION_ARG_INT, &port,
+      "Port to listen on", NULL },
+    { NULL }
+};
+
+int main(int argc, char *argv[])
+{
+    GOptionContext *opts;
+    GError *error = NULL;
+
+    opts = g_option_context_new(NULL);
+    g_option_context_add_main_entries(opts, entries, NULL);
+    if (!g_option_context_parse(opts, &argc, &argv, &error)) {
+        g_printerr("Could not parse arguments: %s\n",
+                   error->message);
+        g_printerr("%s",
+                   g_option_context_get_help(opts, TRUE, NULL));
+        exit(1);
+    }
+    if (port == 0) {
+        g_printerr("please specify a valid port\n");
+        exit(1);
+    }
+    g_option_context_free(opts);
+
+    signal(SIGINT, quit);
+
+
+    GSocketService *service = g_socket_service_new();
+    GInetAddress *iaddr = g_inet_address_new_loopback(G_SOCKET_FAMILY_IPV4);
+    GSocketAddress *saddr = g_inet_socket_address_new(iaddr, port);
+    g_object_unref(iaddr);
+
+    g_socket_listener_add_address(G_SOCKET_LISTENER(service), saddr,
+                                  G_SOCKET_TYPE_STREAM,
+                                  G_SOCKET_PROTOCOL_TCP,
+                                  NULL,
+                                  NULL,
+                                  &error);
+    if (error) {
+        g_printerr("%s\n", error->message);
+        exit(1);
+    }
+
+    g_signal_connect(service,
+                     "incoming", G_CALLBACK(incoming_callback),
+                     NULL);
+
+    clients = g_hash_table_new_full(g_direct_hash, g_direct_equal, g_object_unref, g_free);
+    open_mux_path("/dev/virtio-ports/org.spice-space.webdav.0");
+
+    /* listen on port for incoming clients, multiplex there input into
+       virtio path, demultiplex input from there to the respective
+       clients */
+
+    g_socket_service_start(service);
+
+    mdns_client = ga_client_new(GA_CLIENT_FLAG_NO_FLAGS);
+    g_signal_connect(mdns_client, "state-changed", G_CALLBACK(mdns_state_changed), NULL);
+    if (!ga_client_start(mdns_client, &error)) {
+        g_printerr("%s\n", error->message);
+        exit(1);
+    }
+
+    loop = g_main_loop_new(NULL, TRUE);
+    g_main_loop_run(loop);
+    g_main_loop_unref(loop);
+
+    output_queue_free(mux_queue);
+    g_hash_table_unref(clients);
+
+    return 0;
+}
-- 
1.8.4.2



More information about the Spice-devel mailing list