[Spice-devel] [RFC spice-vdagent 05/18] add VDAgentConnection

Jakub Janků jjanku at redhat.com
Tue Aug 14 18:53:39 UTC 2018


Add a set of helper functions built around GIO that can be used to
easily write messages to and read from the given FD.

Since VDAgentConnection uses GIO,
it integrates well with GMainLoop.

Read messages must begin with a header of a fixed size.
Message body size can vary.

User of VDAgentConnection is notified
through callbacks about the following events:
- message header read
- message body read
- I/O error

A new VDAgentConnection can be constructed using
vdagent_connection_new() based on a GIOStream.

A new GIOStream can be obtained using
vdagent_file_open() or vdagent_socket_connect().

vdagent_connection_destroy() destroyes the connection.
However, due to the asynchronous nature of used GIO functions,
this does NOT close the underlying FD immediately.
---
 Makefile.am              |   2 +
 src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
 src/vdagent-connection.h | 103 ++++++++++++++
 3 files changed, 406 insertions(+)
 create mode 100644 src/vdagent-connection.c
 create mode 100644 src/vdagent-connection.h

diff --git a/Makefile.am b/Makefile.am
index fa54bbc..b291b19 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
 common_sources =				\
 	src/udscs.c				\
 	src/udscs.h				\
+	src/vdagent-connection.c		\
+	src/vdagent-connection.h		\
 	src/vdagentd-proto-strings.h		\
 	src/vdagentd-proto.h			\
 	$(NULL)
diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
new file mode 100644
index 0000000..0eb2ec9
--- /dev/null
+++ b/src/vdagent-connection.c
@@ -0,0 +1,301 @@
+/*  vdagent-connection.c
+
+    Copyright 2018 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <syslog.h>
+#include <fcntl.h>
+#include <glib/gstdio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixsocketaddress.h>
+
+#include "vdagent-connection.h"
+
+struct VDAgentConnection {
+    GIOStream              *io_stream;
+    gboolean                opening;
+    GCancellable           *cancellable;
+
+    GQueue                 *write_queue;
+    GMainLoop              *flush_loop;
+
+    VDAgentConnReadCb       read_cb;
+    gpointer                read_buff;
+    gpointer                header_buff;
+    gsize                   header_size;
+    VDAgentConnHeaderReadCb header_read_cb;
+
+    VDAgentConnErrorCb      error_cb;
+
+    GCredentials           *credentials;
+
+    gpointer                user_data;
+};
+
+static void request_message_write(VDAgentConnection *conn);
+static void request_message_read(VDAgentConnection *conn);
+
+GIOStream *vdagent_file_open(const gchar *path)
+{
+    gint fd;
+
+    fd = g_open(path, O_RDWR);
+    if (fd == -1) {
+        syslog(LOG_ERR, "%s: %m", __func__);
+        return NULL;
+    }
+
+    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
+                                  g_unix_output_stream_new(fd, TRUE));
+}
+
+GIOStream *vdagent_socket_connect(const gchar *address)
+{
+    GSocketConnection *socket_conn;
+    GSocketClient *client;
+    GSocketConnectable *connectable;
+    GError *err = NULL;
+
+    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
+    client = g_object_new(G_TYPE_SOCKET_CLIENT,
+                          "family", G_SOCKET_FAMILY_UNIX,
+                          "type", G_SOCKET_TYPE_STREAM,
+                          NULL);
+
+    socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
+    g_object_unref(client);
+    g_object_unref(connectable);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
+    }
+    return G_IO_STREAM(socket_conn);
+}
+
+VDAgentConnection *vdagent_connection_new(
+    GIOStream              *io_stream,
+    gboolean                wait_on_opening,
+    gsize                   header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb       read_cb,
+    VDAgentConnErrorCb      error_cb,
+    gpointer                user_data)
+{
+    VDAgentConnection *conn;
+
+    conn = g_new(VDAgentConnection, 1);
+    conn->io_stream = io_stream;
+    conn->cancellable = g_cancellable_new();
+    conn->opening = wait_on_opening;
+    conn->write_queue = g_queue_new();
+    conn->flush_loop = NULL;
+    conn->read_cb = read_cb;
+    conn->read_buff = NULL;
+    conn->header_buff = g_malloc(header_size);
+    conn->header_size = header_size;
+    conn->header_read_cb = header_read_cb;
+    conn->error_cb = error_cb;
+    conn->credentials = NULL;
+    conn->user_data = user_data;
+
+    request_message_read(conn);
+
+    return conn;
+}
+
+static gboolean connection_has_pending(VDAgentConnection *conn)
+{
+    GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
+    GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
+
+    return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
+}
+
+/* Free up all resources used by VDAgentConnection
+ * once all I/O operations have finished. */
+static void connection_finalize(VDAgentConnection *conn)
+{
+    g_object_unref(conn->cancellable);
+    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
+    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    g_clear_object(&conn->credentials);
+    g_free(conn->header_buff);
+    g_free(conn->read_buff);
+    g_object_unref(conn->io_stream);
+    g_free(conn);
+}
+
+void vdagent_connection_destroy(VDAgentConnection *conn)
+{
+    /* If there's a pending I/O operation on either of the streams, cancel it,
+     * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
+    if (connection_has_pending(conn))
+        g_cancellable_cancel(conn->cancellable);
+    else
+        connection_finalize(conn);
+}
+
+static void handle_io_error(VDAgentConnection *conn, GError *err)
+{
+    if (g_cancellable_is_cancelled(conn->cancellable)) {
+        if (!connection_has_pending(conn))
+            connection_finalize(conn);
+    } else {
+        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
+        conn->error_cb(conn->user_data);
+    }
+    g_error_free(err);
+}
+
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
+{
+    GSocket *socket;
+    GError *err = NULL;
+
+    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
+
+    if (conn->credentials)
+        return conn->credentials;
+
+    socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
+    conn->credentials = g_socket_get_credentials(socket, &err);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
+    }
+    return conn->credentials;
+}
+
+static void message_write_cb(GObject      *source_object,
+                             GAsyncResult *res,
+                             gpointer      user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GOutputStream *out = G_OUTPUT_STREAM(source_object);
+    GError *err = NULL;
+
+    g_output_stream_write_all_finish(out, res, NULL, &err);
+    g_bytes_unref(g_queue_pop_head(conn->write_queue));
+
+    if (err)
+        return handle_io_error(conn, err);
+
+    conn->opening = FALSE;
+
+    if (g_queue_is_empty(conn->write_queue))
+        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    else
+        request_message_write(conn);
+}
+
+static void request_message_write(VDAgentConnection *conn)
+{
+    GBytes *msg;
+    GOutputStream *out;
+
+    msg = g_queue_peek_head(conn->write_queue);
+    out = g_io_stream_get_output_stream(conn->io_stream);
+
+    g_output_stream_write_all_async(out,
+        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
+        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
+}
+
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer           data,
+                              gsize              size)
+{
+    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
+
+    if (g_queue_get_length(conn->write_queue) == 1)
+        request_message_write(conn);
+}
+
+void vdagent_connection_flush(VDAgentConnection *conn)
+{
+    GMainLoop *loop;
+    /* TODO: allow multiple flush calls at once? */
+    g_return_if_fail(conn->flush_loop == NULL);
+
+    if (g_queue_is_empty(conn->write_queue))
+        return;
+
+    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
+    /* When using GTK+, this should be wrapped with
+     * gdk_threads_leave() and gdk_threads_enter(),
+     * but since flush is used in virtio-port.c only
+     * let's leave it as it is for now. */
+    g_main_loop_run(loop);
+    g_main_loop_unref(loop);
+}
+
+static void message_read_cb(GObject      *source_object,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GInputStream *in = G_INPUT_STREAM(source_object);
+    GError *err = NULL;
+    gsize bytes_read, data_size;
+
+    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
+    if (err)
+        return handle_io_error(conn, err);
+    if (bytes_read == 0) {
+        /* see virtio-port.c for the rationale behind this */
+        if (conn->opening) {
+            g_usleep(10000);
+            request_message_read(conn);
+        } else {
+            conn->error_cb(conn->user_data);
+        }
+        return;
+    }
+    conn->opening = FALSE;
+
+    if (conn->read_buff == NULL) {
+        /* we've read the message header, now let's read its body */
+        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
+            return;
+        if (data_size > 0) {
+            conn->read_buff = g_malloc(data_size);
+            /* TODO: if allocation fails, we could try g_input_stream_skip()
+             * and hope that the message wasn't crucial for proper functiong.
+             * An example might be when a user tries to copy large clipboard.
+             * Not sure whether it's worth implementing.
+             * Other stuff might just as well fall apart
+             * when the system is running out of memory? */
+            g_input_stream_read_all_async(in, conn->read_buff, data_size,
+                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
+            return;
+        }
+    }
+
+    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
+        return;
+    g_clear_pointer(&conn->read_buff, g_free);
+    request_message_read(conn);
+}
+
+static void request_message_read(VDAgentConnection *conn)
+{
+    GInputStream *in;
+    in = g_io_stream_get_input_stream(conn->io_stream);
+
+    g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
+        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
+}
diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
new file mode 100644
index 0000000..6fc0081
--- /dev/null
+++ b/src/vdagent-connection.h
@@ -0,0 +1,103 @@
+/*  vdagent-connection.h
+
+    Copyright 2018 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __VDAGENT_CONNECTION_H
+#define __VDAGENT_CONNECTION_H
+
+#include <glib.h>
+#include <gio/gio.h>
+
+typedef struct VDAgentConnection VDAgentConnection;
+
+/* Called when a message header has been read.
+ *
+ * If the handler wishes to continue reading,
+ * it must set @body_size to the size of message's body and return TRUE.
+ * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
+ *
+ * Otherwise the handler should return FALSE
+ * and call vdagent_connection_destroy().
+ *
+ * @header_buff is owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
+                                            gsize   *body_size,
+                                            gpointer user_data);
+
+/* Called when a full message has been read.
+ *
+ * If the handler wished to continue reading, it must return TRUE,
+ * otherwise FALSE and call vdagent_connection_destroy().
+ *
+ * @header, @data are owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnReadCb)(gpointer header,
+                                      gpointer data,
+                                      gpointer user_data);
+
+/* Called when an error occured during read or wirte.
+ * The handler is expected to call vdagent_connection_destroy(). */
+typedef void (*VDAgentConnErrorCb)(gpointer user_data);
+
+/* Open a file in @path for read and write.
+ * Returns a GIOStream to the given file or NULL on error. */
+GIOStream *vdagent_file_open(const gchar *path);
+
+/* Create a socket and initiate a new connection to the socket on @address.
+ * Returns a GIOStream corresponding to the new connection or NULL on error. */
+GIOStream *vdagent_socket_connect(const gchar *address);
+
+/* Create new VDAgentConnection and start reading incoming messages.
+ *
+ * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
+ * until the first message is successfully read or written to the @io_stream.
+ *
+ * @user_data will be passed to the supplied callbacks. */
+VDAgentConnection *vdagent_connection_new(
+    GIOStream              *io_stream,
+    gboolean                wait_on_opening,
+    gsize                   header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb       read_cb,
+    VDAgentConnErrorCb      error_cb,
+    gpointer                user_data);
+
+/* Free up all resources associated with the VDAgentConnection.
+ *
+ * This operation can be asynchronous. */
+void vdagent_connection_destroy(VDAgentConnection *conn);
+
+/* Append a message to write queue.
+ *
+ * VDAgentConnection takes ownership of the @data
+ * and frees it once the message is flushed. */
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer           data,
+                              gsize              size);
+
+/* Waits until all queued messages get written to the output stream.
+ *
+ * Note: other GSources can be triggered during this call */
+void vdagent_connection_flush(VDAgentConnection *conn);
+
+/* Returns the credentials of the foreign process connected to the socket.
+ *
+ * It is an error to call this function with a VDAgentConnection
+ * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
+GCredentials *vdagent_connection_get_peer_credentials(
+    VDAgentConnection *conn);
+
+#endif
-- 
2.17.1



More information about the Spice-devel mailing list