[Spice-devel] [WIP vdagent] Add spice-webdavd
Marc-André Lureau
marcandre.lureau at gmail.com
Sun Jan 12 09:41:08 PST 2014
This daemon publishes a local port to proxy a webdav server served over
/dev/virtio-ports/org.spice-space.webdav.0 (See spice-common protocol
documentation for details of this channel)
The service is announced over avahi/mdns, so that applications such as
nautilus can quickly notice existence of a new remote filesystem.
---
Makefile.am | 6 +-
configure.ac | 4 +-
src/spice-webdavd.c | 611 ++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 619 insertions(+), 2 deletions(-)
create mode 100644 src/spice-webdavd.c
diff --git a/Makefile.am b/Makefile.am
index 74cc313..6832f22 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -2,7 +2,7 @@ ACLOCAL_AMFLAGS = ${ACLOCAL_FLAGS}
NULL =
bin_PROGRAMS = src/spice-vdagent
-sbin_PROGRAMS = src/spice-vdagentd
+sbin_PROGRAMS = src/spice-vdagentd src/spice-webdavd
src_spice_vdagent_CFLAGS = $(X_CFLAGS) $(SPICE_CFLAGS) $(GLIB2_CFLAGS)
src_spice_vdagent_LDADD = $(X_LIBS) $(SPICE_LIBS) $(GLIB2_LIBS)
@@ -27,6 +27,10 @@ src_spice_vdagentd_SOURCES += src/dummy-session-info.c
endif
endif
+src_spice_webdavd_CFLAGS = $(WEBDAVD_CFLAGS) $(PIE_CFLAGS)
+src_spice_webdavd_LDADD = $(WEBDAVD_LIBS) $(PIE_LDFLAGS)
+src_spice_webdavd_SOURCES = src/spice-webdavd.c
+
noinst_HEADERS = src/glib-compat.h \
src/session-info.h \
src/udscs.h \
diff --git a/configure.ac b/configure.ac
index 79905a8..b6dc823 100644
--- a/configure.ac
+++ b/configure.ac
@@ -76,10 +76,12 @@ AC_ARG_ENABLE([static-uinput],
[enable_static_uinput="$enableval"],
[enable_static_uinput="no"])
-PKG_CHECK_MODULES([GLIB2], [glib-2.0 >= 2.12])
+PKG_CHECK_MODULES(GLIB2, [glib-2.0 >= 2.12])
PKG_CHECK_MODULES(X, [xfixes xrandr >= 1.3 xinerama x11])
PKG_CHECK_MODULES(SPICE, [spice-protocol >= 0.12.5])
+PKG_CHECK_MODULES(WEBDAVD, [gio-unix-2.0 avahi-gobject avahi-client])
+
if test "$with_session_info" = "auto" || test "$with_session_info" = "systemd"; then
PKG_CHECK_MODULES([LIBSYSTEMD_LOGIN],
[libsystemd-login >= 42],
diff --git a/src/spice-webdavd.c b/src/spice-webdavd.c
new file mode 100644
index 0000000..e371e6e
--- /dev/null
+++ b/src/spice-webdavd.c
@@ -0,0 +1,611 @@
+#include <stdlib.h>
+#include <gio/gio.h>
+#include <gio/gunixsocketaddress.h>
+
+#include <avahi-gobject/ga-client.h>
+#include <avahi-gobject/ga-entry-group.h>
+
+typedef struct _OutputQueue {
+ GOutputStream *output;
+ gboolean flushing;
+ guint idle_id;
+ GQueue *queue;
+} OutputQueue;
+
+typedef struct _OutputQueueElem {
+ OutputQueue *queue;
+ const guint8 *buf;
+ gsize size;
+ GFunc cb;
+ gpointer user_data;
+} OutputQueueElem;
+
+static
+OutputQueue* output_queue_new(GOutputStream *output)
+{
+ OutputQueue *queue = g_new0(OutputQueue, 1);
+
+ queue->output = g_object_ref(output);
+ queue->queue = g_queue_new();
+
+ return queue;
+}
+
+static
+void output_queue_free(OutputQueue *queue)
+{
+ g_warn_if_fail(g_queue_get_length(queue->queue) == 0);
+ g_warn_if_fail(!queue->flushing);
+ g_warn_if_fail(!queue->idle_id);
+
+ g_queue_free_full(queue->queue, g_free);
+ g_clear_object(&queue->output);
+ g_free(queue);
+}
+
+static gboolean
+output_queue_idle(gpointer user_data);
+
+static void
+output_queue_flush_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GError *error = NULL;
+ OutputQueueElem *e = user_data;
+ OutputQueue *q = e->queue;
+
+ q->flushing = FALSE;
+ g_output_stream_flush_finish(G_OUTPUT_STREAM(source_object),
+ res, &error);
+ if (error)
+ g_warning("error: %s", error->message);
+
+ g_clear_error(&error);
+
+ if (!q->idle_id)
+ q->idle_id = g_idle_add(output_queue_idle, q);
+
+ g_free(e);
+}
+
+static gboolean
+output_queue_idle(gpointer user_data)
+{
+ OutputQueue *q = user_data;
+ OutputQueueElem *e;
+ GError *error = NULL;
+
+ if (q->flushing) {
+ g_debug("already flushing");
+ q->idle_id = 0;
+ return FALSE;
+ }
+
+ e = g_queue_pop_head(q->queue);
+ if (!e) {
+ g_debug("No more data to flush");
+ q->idle_id = 0;
+ return FALSE;
+ }
+
+ g_debug("flushing %" G_GSIZE_FORMAT, e->size);
+ g_output_stream_write_all(q->output, e->buf, e->size, NULL, NULL, &error);
+ if (error)
+ goto err;
+ else if (e->cb)
+ e->cb(q, e->user_data);
+
+ q->flushing = TRUE;
+ g_output_stream_flush_async(q->output, G_PRIORITY_DEFAULT, NULL, output_queue_flush_cb, e);
+
+ return TRUE;
+
+err:
+ g_warning("error: %s", error->message);
+ g_clear_error(&error);
+
+ q->idle_id = 0;
+ return FALSE;
+}
+
+static void
+output_queue_push(OutputQueue *q, const guint8 *buf, gsize size,
+ GFunc pushed_cb, gpointer user_data)
+{
+ OutputQueueElem *e = g_new(OutputQueueElem, 1);
+
+ e->buf = buf;
+ e->size = size;
+ e->cb = pushed_cb;
+ e->user_data = user_data;
+ e->queue = q;
+ g_queue_push_tail(q->queue, e);
+
+ if (!q->idle_id && !q->flushing)
+ q->idle_id = g_idle_add(output_queue_idle, q);
+}
+
+
+static struct _DemuxData {
+ gint64 client;
+ guint16 size;
+ gchar buf[G_MAXUINT16];
+} demux;
+
+typedef struct _Client {
+ gint64 id;
+ guint8 buf[G_MAXUINT16];
+ guint16 size;
+ GSocketConnection *client_connection;
+ OutputQueue *queue;
+} Client;
+
+static GMainLoop *loop;
+static GIOStream *mux_iostream;
+static OutputQueue *mux_queue;
+static GHashTable *clients;
+
+static void
+start_mux_read(GIOStream *iostream);
+
+static void
+quit(int sig)
+{
+ g_main_loop_quit(loop);
+}
+
+static Client *
+add_client(GSocketConnection *client_connection)
+{
+ GIOStream *iostream = G_IO_STREAM(client_connection);
+ GOutputStream *ostream = g_io_stream_get_output_stream(iostream);
+ GOutputStream *bostream;
+ Client *client;
+
+ bostream = g_buffered_output_stream_new(ostream);
+ g_buffered_output_stream_set_auto_grow(G_BUFFERED_OUTPUT_STREAM(bostream), TRUE);
+
+ client = g_new0(Client, 1);
+ client->client_connection = client_connection;
+ client->id = GPOINTER_TO_INT(client_connection);
+ client->queue = output_queue_new(bostream);
+ g_object_unref(bostream);
+
+ g_hash_table_insert(clients, g_object_ref(client_connection), client);
+
+ return client;
+}
+
+static void
+remove_client(Client *client)
+{
+ g_debug("remove client %p", client);
+
+ output_queue_free(client->queue);
+ g_hash_table_remove(clients, client->client_connection);
+}
+
+typedef struct ReadData {
+ void *buffer;
+ gsize count;
+ gssize size;
+} ReadData;
+
+static void
+read_thread(GSimpleAsyncResult *simple,
+ GObject *object,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+ GInputStream *stream = G_INPUT_STREAM(object);
+ ReadData *data;
+ gsize bread;
+
+ data = g_simple_async_result_get_op_res_gpointer(simple);
+
+ g_debug("my read %" G_GSIZE_FORMAT, data->count);
+ g_input_stream_read_all(stream,
+ data->buffer, data->count, &bread,
+ cancellable, &error);
+ if (bread != data->count)
+ data->size = -1;
+ else
+ data->size = bread;
+
+ if (error) {
+ g_debug("error: %s", error->message);
+ g_simple_async_result_set_from_error(simple, error);
+ }
+}
+
+static void
+my_input_stream_read_async(GInputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *simple;
+ ReadData *data = g_new(ReadData, 1);
+
+ data->buffer = buffer;
+ data->count = count;
+
+ simple = g_simple_async_result_new(G_OBJECT(stream),
+ callback, user_data,
+ my_input_stream_read_async);
+
+ g_simple_async_result_set_op_res_gpointer(simple, data, g_free);
+ g_simple_async_result_run_in_thread(simple, read_thread, io_priority, cancellable);
+}
+
+static gssize
+my_input_stream_read_finish(GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+ ReadData *data;
+
+ g_return_val_if_fail(g_simple_async_result_is_valid(result,
+ G_OBJECT(stream),
+ my_input_stream_read_async),
+ -1);
+
+ simple = G_SIMPLE_ASYNC_RESULT(result);
+
+ if (g_simple_async_result_propagate_error(simple, error))
+ return -1;
+
+ data = g_simple_async_result_get_op_res_gpointer(simple);
+
+ return data->size;
+}
+
+static void
+mux_pushed_client_cb(OutputQueue *q, gpointer user_data)
+{
+ start_mux_read(mux_iostream);
+}
+
+static void
+mux_data_read_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GError *error = NULL;
+ gssize size;
+
+ size = my_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+ g_return_if_fail(size == demux.size);
+ if (error) {
+ g_warning("error: %s", error->message);
+ g_clear_error(&error);
+ quit(0);
+ return;
+ }
+
+ Client *c = g_hash_table_lookup(clients, GINT_TO_POINTER(demux.client));
+ g_warn_if_fail(c != NULL);
+
+ if (c)
+ output_queue_push(c->queue, (guint8 *)demux.buf, demux.size,
+ (GFunc)mux_pushed_client_cb, c);
+ else
+ start_mux_read(mux_iostream);
+}
+
+static void
+mux_size_read_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GInputStream *istream = G_INPUT_STREAM(source_object);
+ GError *error = NULL;
+ gssize size;
+
+ size = my_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+ if (error || size != sizeof(guint16))
+ goto end;
+
+ my_input_stream_read_async(istream,
+ &demux.buf, demux.size, G_PRIORITY_DEFAULT,
+ NULL, mux_data_read_cb, NULL);
+ return;
+
+end:
+ if (error) {
+ g_warning("error: %s", error->message);
+ g_clear_error(&error);
+ }
+
+ quit(0);
+}
+
+static void
+mux_client_read_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GInputStream *istream = G_INPUT_STREAM(source_object);
+ GError *error = NULL;
+ gssize size;
+
+ size = my_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+ if (error || size != sizeof(gint64))
+ goto end;
+
+ my_input_stream_read_async(istream,
+ &demux.size, sizeof(guint16), G_PRIORITY_DEFAULT,
+ NULL, mux_size_read_cb, NULL);
+ return;
+
+end:
+ if (error) {
+ g_warning("error: %s", error->message);
+ g_clear_error(&error);
+ }
+
+ quit(0);
+}
+
+static void
+start_mux_read(GIOStream *iostream)
+{
+ GInputStream *istream = g_io_stream_get_input_stream(iostream);
+
+ my_input_stream_read_async(istream,
+ &demux.client, sizeof(gint64), G_PRIORITY_DEFAULT,
+ NULL, mux_client_read_cb, NULL);
+}
+
+static void
+client_start_read(Client *client);
+
+static void
+mux_pushed_cb(OutputQueue *q, gpointer user_data)
+{
+ Client *client = user_data;
+
+ if (client->size == 0) {
+ g_debug("fixme, disconn");
+ remove_client(client);
+ return;
+ }
+
+ client_start_read(client);
+}
+
+static void
+client_read_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ Client *client = user_data;
+ GError *error = NULL;
+ gsize size;
+
+ size = g_input_stream_read_finish(G_INPUT_STREAM(source_object), res, &error);
+ if (error) {
+ g_warning("error: %s", error->message);
+ g_clear_error(&error);
+ remove_client(client);
+ return;
+ }
+
+ g_return_if_fail(size < G_MAXUINT16);
+ g_return_if_fail(size >= 0);
+ client->size = size;
+
+ output_queue_push(mux_queue, (guint8 *) &client->id, sizeof(gint64), NULL, NULL);
+ output_queue_push(mux_queue, (guint8 *) &client->size, sizeof(guint16), NULL, NULL);
+ output_queue_push(mux_queue, (guint8 *) client->buf, size, (GFunc)mux_pushed_cb, client);
+
+ return;
+}
+
+static void
+client_start_read(Client *client)
+{
+ GIOStream *iostream = G_IO_STREAM(client->client_connection);
+ GInputStream *istream = g_io_stream_get_input_stream(iostream);
+
+ g_input_stream_read_async(istream,
+ client->buf, G_MAXUINT16, G_PRIORITY_DEFAULT,
+ NULL, client_read_cb, client);
+}
+
+static gboolean
+incoming_callback(GSocketService *service,
+ GSocketConnection *client_connection,
+ GObject *source_object,
+ gpointer user_data)
+{
+ Client *client;
+
+ g_debug("new client!");
+ client = add_client(client_connection);
+ client_start_read(client);
+
+ return FALSE;
+}
+
+static GaClient *mdns_client;
+static GaEntryGroup *mdns_group;
+static GaEntryGroupService *mdns_service;
+static int port;
+
+static void
+mdns_register_service(void)
+{
+ GError *error = NULL;
+
+ if (!mdns_group) {
+ mdns_group = ga_entry_group_new();
+
+ if (!ga_entry_group_attach (mdns_group, mdns_client, &error)) {
+ g_warning("Could not attach MDNS group to client: %s", error->message);
+ g_error_free(error);
+ return;
+ }
+ }
+
+ gchar *name = g_strdup_printf("%s\'s public share", g_get_user_name ());
+ mdns_service = ga_entry_group_add_service(mdns_group,
+ name, "_webdav._tcp",
+ port, &error,
+ NULL);
+ g_free(name);
+ if (!mdns_service) {
+ g_warning("Could not create service: %s", error->message);
+ g_error_free(error);
+ return;
+ }
+
+ gchar *record = g_strdup_printf("u=,p=,path=/");
+ if (!ga_entry_group_service_set(mdns_service, name, record, &error)) {
+ g_warning("Could not update TXT record: %s", error->message);
+ g_error_free(error);
+ }
+ g_free(record);
+
+ if (!ga_entry_group_commit(mdns_group, &error)) {
+ g_warning("Could not announce MDNS service: %s", error->message);
+ g_error_free(error);
+ return;
+ }
+}
+
+static void
+mdns_state_changed(GaClient *client, GaClientState state, gpointer user_data)
+{
+ switch (state) {
+ case GA_CLIENT_STATE_FAILURE:
+ g_warning("MDNS client state failure");
+ break;
+
+ case GA_CLIENT_STATE_S_RUNNING:
+ g_debug("MDNS client found server running");
+ mdns_register_service();
+ break;
+
+ case GA_CLIENT_STATE_S_COLLISION:
+ case GA_CLIENT_STATE_S_REGISTERING:
+ g_message("MDNS collision");
+ if (mdns_group) {
+ ga_entry_group_reset (mdns_group, NULL);
+ mdns_service = 0;
+ }
+ break;
+
+ default:
+ // Do nothing
+ break;
+ }
+}
+
+static void
+open_mux_path(const char *path)
+{
+ GError *error = NULL;
+ GFile *file;
+ GFileIOStream *fio;
+
+ g_return_if_fail(path);
+ g_return_if_fail(!mux_iostream);
+ g_return_if_fail(!mux_queue);
+
+ file = g_file_new_for_path(path);
+ fio = g_file_open_readwrite(file, NULL, &error);
+ g_object_unref(file);
+
+ if (error) {
+ g_printerr("%s\n", error->message);
+ exit(1);
+ }
+
+ mux_iostream = G_IO_STREAM(fio);
+ GOutputStream *ostream = g_io_stream_get_output_stream(mux_iostream);
+ mux_queue = output_queue_new(G_OUTPUT_STREAM(ostream));
+
+ start_mux_read(mux_iostream);
+}
+
+static GOptionEntry entries[] = {
+ { "port", 'p', 0,
+ G_OPTION_ARG_INT, &port,
+ "Port to listen on", NULL },
+ { NULL }
+};
+
+int main(int argc, char *argv[])
+{
+ GOptionContext *opts;
+ GError *error = NULL;
+
+ opts = g_option_context_new(NULL);
+ g_option_context_add_main_entries(opts, entries, NULL);
+ if (!g_option_context_parse(opts, &argc, &argv, &error)) {
+ g_printerr("Could not parse arguments: %s\n",
+ error->message);
+ g_printerr("%s",
+ g_option_context_get_help(opts, TRUE, NULL));
+ exit(1);
+ }
+ if (port == 0) {
+ g_printerr("please specify a valid port\n");
+ exit(1);
+ }
+ g_option_context_free(opts);
+
+ signal(SIGINT, quit);
+
+
+ GSocketService *service = g_socket_service_new();
+ GInetAddress *iaddr = g_inet_address_new_loopback(G_SOCKET_FAMILY_IPV4);
+ GSocketAddress *saddr = g_inet_socket_address_new(iaddr, port);
+ g_object_unref(iaddr);
+
+ g_socket_listener_add_address(G_SOCKET_LISTENER(service), saddr,
+ G_SOCKET_TYPE_STREAM,
+ G_SOCKET_PROTOCOL_TCP,
+ NULL,
+ NULL,
+ &error);
+ if (error) {
+ g_printerr("%s\n", error->message);
+ exit(1);
+ }
+
+ g_signal_connect(service,
+ "incoming", G_CALLBACK(incoming_callback),
+ NULL);
+
+ clients = g_hash_table_new_full(g_direct_hash, g_direct_equal, g_object_unref, g_free);
+ open_mux_path("/dev/virtio-ports/org.spice-space.webdav.0");
+
+ /* listen on port for incoming clients, multiplex there input into
+ virtio path, demultiplex input from there to the respective
+ clients */
+
+ g_socket_service_start(service);
+
+ mdns_client = ga_client_new(GA_CLIENT_FLAG_NO_FLAGS);
+ g_signal_connect(mdns_client, "state-changed", G_CALLBACK(mdns_state_changed), NULL);
+ if (!ga_client_start(mdns_client, &error)) {
+ g_printerr("%s\n", error->message);
+ exit(1);
+ }
+
+ loop = g_main_loop_new(NULL, TRUE);
+ g_main_loop_run(loop);
+ g_main_loop_unref(loop);
+
+ output_queue_free(mux_queue);
+ g_hash_table_unref(clients);
+
+ return 0;
+}
--
1.8.4.2
More information about the Spice-devel
mailing list