[Spice-devel] [RFC spice-vdagent 05/18] add VDAgentConnection
Jakub Janků
jjanku at redhat.com
Tue Aug 14 18:53:39 UTC 2018
Add a set of helper functions built around GIO that can be used to
easily write messages to and read from the given FD.
Since VDAgentConnection uses GIO,
it integrates well with GMainLoop.
Read messages must begin with a header of a fixed size.
Message body size can vary.
User of VDAgentConnection is notified
through callbacks about the following events:
- message header read
- message body read
- I/O error
A new VDAgentConnection can be constructed using
vdagent_connection_new() based on a GIOStream.
A new GIOStream can be obtained using
vdagent_file_open() or vdagent_socket_connect().
vdagent_connection_destroy() destroys the connection.
However, due to the asynchronous nature of used GIO functions,
this does NOT close the underlying FD immediately.
---
Makefile.am | 2 +
src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
src/vdagent-connection.h | 103 ++++++++++++++
3 files changed, 406 insertions(+)
create mode 100644 src/vdagent-connection.c
create mode 100644 src/vdagent-connection.h
diff --git a/Makefile.am b/Makefile.am
index fa54bbc..b291b19 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
common_sources = \
src/udscs.c \
src/udscs.h \
+ src/vdagent-connection.c \
+ src/vdagent-connection.h \
src/vdagentd-proto-strings.h \
src/vdagentd-proto.h \
$(NULL)
diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
new file mode 100644
index 0000000..0eb2ec9
--- /dev/null
+++ b/src/vdagent-connection.c
@@ -0,0 +1,301 @@
+/* vdagent-connection.c
+
+ Copyright 2018 Red Hat, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <syslog.h>
+#include <fcntl.h>
+#include <glib/gstdio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixsocketaddress.h>
+
+#include "vdagent-connection.h"
+
+/* Private state of one connection; allocated in vdagent_connection_new()
+ * and released in connection_finalize(). */
+struct VDAgentConnection {
+ /* stream all messages are read from and written to */
+ GIOStream *io_stream;
+ /* while TRUE, a read returning 0 bytes is retried instead of being
+  * reported through error_cb; cleared after the first successful
+  * read or write */
+ gboolean opening;
+ /* passed to every async read/write, cancelled on destroy */
+ GCancellable *cancellable;
+
+ /* queue of GBytes messages waiting to be written;
+  * the head is the message currently being written */
+ GQueue *write_queue;
+ /* non-NULL only while vdagent_connection_flush() is running */
+ GMainLoop *flush_loop;
+
+ /* invoked once a complete message (header + body) has been read */
+ VDAgentConnReadCb read_cb;
+ /* body of the message being read, NULL while a header is being read */
+ gpointer read_buff;
+ /* buffer of header_size bytes that each message header is read into */
+ gpointer header_buff;
+ gsize header_size;
+ /* invoked once a message header has been read */
+ VDAgentConnHeaderReadCb header_read_cb;
+
+ /* invoked on I/O errors that were not caused by cancellation, and on EOF */
+ VDAgentConnErrorCb error_cb;
+
+ /* peer credentials, fetched on first request and then cached */
+ GCredentials *credentials;
+
+ /* opaque pointer handed to all three callbacks */
+ gpointer user_data;
+};
+
+/* Forward declarations: both helpers are called by code
+ * that appears before their definitions in this file. */
+static void request_message_write(VDAgentConnection *conn);
+static void request_message_read(VDAgentConnection *conn);
+
+/* Open the file at @path for reading and writing and wrap it in a GIOStream.
+ *
+ * The input and the output stream share a single file descriptor, so only
+ * the input stream is created with close_fd set; otherwise closing the
+ * GIOStream would close the same descriptor twice.
+ *
+ * Returns a new GIOStream owned by the caller,
+ * or NULL if the file could not be opened (the error is logged to syslog). */
+GIOStream *vdagent_file_open(const gchar *path)
+{
+    GInputStream *in;
+    GOutputStream *out;
+    GIOStream *io_stream;
+    gint fd;
+
+    /* mode is only consulted when a file is created, which O_RDWR never does */
+    fd = g_open(path, O_RDWR, 0);
+    if (fd == -1) {
+        syslog(LOG_ERR, "%s: %m", __func__);
+        return NULL;
+    }
+
+    in = g_unix_input_stream_new(fd, TRUE);
+    out = g_unix_output_stream_new(fd, FALSE);
+    io_stream = g_simple_io_stream_new(in, out);
+
+    /* g_simple_io_stream_new() holds its own references to both streams,
+     * drop ours so that they are released together with io_stream */
+    g_object_unref(in);
+    g_object_unref(out);
+
+    return io_stream;
+}
+
+/* Connect a stream socket to the UNIX socket address @address.
+ *
+ * Returns the resulting connection as a GIOStream.
+ * A connection error is logged to syslog. */
+GIOStream *vdagent_socket_connect(const gchar *address)
+{
+    GError *error = NULL;
+    GSocketConnection *connection;
+    GSocketAddress *sock_addr = g_unix_socket_address_new(address);
+    GSocketClient *sock_client = g_object_new(G_TYPE_SOCKET_CLIENT,
+                                              "family", G_SOCKET_FAMILY_UNIX,
+                                              "type", G_SOCKET_TYPE_STREAM,
+                                              NULL);
+
+    connection = g_socket_client_connect(sock_client,
+                                         G_SOCKET_CONNECTABLE(sock_addr),
+                                         NULL, &error);
+
+    /* neither helper object is needed once the attempt has finished */
+    g_object_unref(sock_client);
+    g_object_unref(sock_addr);
+
+    if (error != NULL) {
+        syslog(LOG_ERR, "%s: %s", __func__, error->message);
+        g_error_free(error);
+    }
+
+    return G_IO_STREAM(connection);
+}
+
+/* Allocate a VDAgentConnection on top of @io_stream
+ * and request the read of the first message header.
+ *
+ * The stream reference is stored as is (no extra reference is taken)
+ * and dropped again in connection_finalize(). */
+VDAgentConnection *vdagent_connection_new(
+    GIOStream *io_stream,
+    gboolean wait_on_opening,
+    gsize header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb read_cb,
+    VDAgentConnErrorCb error_cb,
+    gpointer user_data)
+{
+    /* zero-filled: flush_loop, read_buff and credentials start out as NULL */
+    VDAgentConnection *self = g_new0(VDAgentConnection, 1);
+
+    /* stream state */
+    self->io_stream = io_stream;
+    self->opening = wait_on_opening;
+    self->cancellable = g_cancellable_new();
+
+    /* outgoing messages */
+    self->write_queue = g_queue_new();
+
+    /* incoming messages */
+    self->header_size = header_size;
+    self->header_buff = g_malloc(header_size);
+
+    /* user callbacks */
+    self->header_read_cb = header_read_cb;
+    self->read_cb = read_cb;
+    self->error_cb = error_cb;
+    self->user_data = user_data;
+
+    request_message_read(self);
+
+    return self;
+}
+
+/* Tell whether the input or the output stream of @conn
+ * still has an outstanding I/O operation. */
+static gboolean connection_has_pending(VDAgentConnection *conn)
+{
+    if (g_input_stream_has_pending(
+            g_io_stream_get_input_stream(conn->io_stream))) {
+        return TRUE;
+    }
+
+    return g_output_stream_has_pending(
+               g_io_stream_get_output_stream(conn->io_stream)) ? TRUE : FALSE;
+}
+
+/* Release everything owned by the VDAgentConnection, including @conn itself.
+ * Must only run once no I/O operation is outstanding. */
+static void connection_finalize(VDAgentConnection *conn)
+{
+    GBytes *queued;
+
+    g_object_unref(conn->cancellable);
+
+    /* drop all messages that never made it to the output stream */
+    while ((queued = g_queue_pop_head(conn->write_queue)) != NULL) {
+        g_bytes_unref(queued);
+    }
+    g_queue_free(conn->write_queue);
+
+    /* wake up a vdagent_connection_flush() that might still be waiting */
+    if (conn->flush_loop != NULL) {
+        GMainLoop *waiting = conn->flush_loop;
+
+        conn->flush_loop = NULL;
+        g_main_loop_quit(waiting);
+    }
+
+    if (conn->credentials != NULL) {
+        g_object_unref(conn->credentials);
+        conn->credentials = NULL;
+    }
+
+    g_free(conn->header_buff);
+    g_free(conn->read_buff);
+    g_object_unref(conn->io_stream);
+    g_free(conn);
+}
+
+/* Tear down @conn, either right away or after pending I/O got cancelled. */
+void vdagent_connection_destroy(VDAgentConnection *conn)
+{
+    if (!connection_has_pending(conn)) {
+        /* nothing is in flight, the connection can be freed immediately */
+        connection_finalize(conn);
+        return;
+    }
+
+    /* An I/O operation is still running on one of the streams. Cancel it;
+     * the cancelled operation's callback reaches connection_finalize()
+     * through handle_io_error() in a later GMainLoop iteration. */
+    g_cancellable_cancel(conn->cancellable);
+}
+
+/* Common handler for a failed async read or write. Takes ownership of @err.
+ *
+ * A failure on a cancelled connection finishes the destruction once
+ * the last pending operation is gone. Any other failure is logged
+ * and reported through error_cb. */
+static void handle_io_error(VDAgentConnection *conn, GError *err)
+{
+    if (!g_cancellable_is_cancelled(conn->cancellable)) {
+        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
+        conn->error_cb(conn->user_data);
+        g_error_free(err);
+        return;
+    }
+
+    if (!connection_has_pending(conn)) {
+        connection_finalize(conn);
+    }
+    g_error_free(err);
+}
+
+/* Return the credentials of the peer connected to the socket of @conn.
+ *
+ * The credentials are queried on the first call and cached afterwards.
+ * If the query fails, the error is logged to syslog. */
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
+{
+    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
+
+    if (conn->credentials == NULL) {
+        GError *error = NULL;
+        GSocket *sock;
+
+        sock = g_socket_connection_get_socket(
+                   G_SOCKET_CONNECTION(conn->io_stream));
+        conn->credentials = g_socket_get_credentials(sock, &error);
+        if (error != NULL) {
+            syslog(LOG_ERR, "%s: %s", __func__, error->message);
+            g_error_free(error);
+        }
+    }
+
+    return conn->credentials;
+}
+
+/* GAsyncReadyCallback for the write started in request_message_write().
+ *
+ * @source_object is the output stream, @user_data the VDAgentConnection.
+ * Removes the message that was just handled from the queue, then either
+ * reports the failure, starts writing the next queued message,
+ * or wakes up a waiting vdagent_connection_flush() if the queue is empty. */
+static void message_write_cb(GObject *source_object,
+                             GAsyncResult *res,
+                             gpointer user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GOutputStream *out = G_OUTPUT_STREAM(source_object);
+    GError *err = NULL;
+
+    g_output_stream_write_all_finish(out, res, NULL, &err);
+    /* the head of the queue is the message this callback belongs to */
+    g_bytes_unref(g_queue_pop_head(conn->write_queue));
+
+    if (err) {
+        /* handle_io_error() may free conn, do not touch it afterwards */
+        handle_io_error(conn, err);
+        return;
+    }
+
+    conn->opening = FALSE;
+
+    if (g_queue_is_empty(conn->write_queue))
+        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    else
+        request_message_write(conn);
+}
+
+/* Start an async write of the message at the head of the write queue.
+ * The message stays queued until message_write_cb() removes it. */
+static void request_message_write(VDAgentConnection *conn)
+{
+    gsize length = 0;
+    GBytes *head = g_queue_peek_head(conn->write_queue);
+    gconstpointer payload = g_bytes_get_data(head, &length);
+
+    g_output_stream_write_all_async(
+        g_io_stream_get_output_stream(conn->io_stream),
+        payload, length,
+        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
+}
+
+/* Queue @size bytes at @data for writing; the connection takes over @data.
+ * The write is started right away unless another message is already queued,
+ * in which case message_write_cb() picks the new one up in turn. */
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer data,
+                              gsize size)
+{
+    gboolean was_idle = g_queue_is_empty(conn->write_queue);
+
+    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
+
+    if (was_idle) {
+        request_message_write(conn);
+    }
+}
+
+/* Block in a nested GMainLoop until the write queue has been drained.
+ * Returns immediately when nothing is queued. */
+void vdagent_connection_flush(VDAgentConnection *conn)
+{
+    /* TODO: allow multiple flush calls at once? */
+    g_return_if_fail(conn->flush_loop == NULL);
+
+    if (!g_queue_is_empty(conn->write_queue)) {
+        GMainLoop *nested = g_main_loop_new(NULL, FALSE);
+
+        conn->flush_loop = nested;
+        /* With GTK+ this call would have to be surrounded by
+         * gdk_threads_leave() and gdk_threads_enter().
+         * Flush is used in virtio-port.c only,
+         * so it is left as it is for now. */
+        g_main_loop_run(nested);
+        /* conn->flush_loop was already reset by whoever quit the loop,
+         * only the local reference is left to drop */
+        g_main_loop_unref(nested);
+    }
+}
+
+/* GAsyncReadyCallback for both the header read started in
+ * request_message_read() and the body read started below.
+ *
+ * @source_object is the input stream, @user_data the VDAgentConnection.
+ * conn->read_buff tells the two phases apart: it is NULL while a header
+ * is being read and points to the body buffer while a body is being read.
+ *
+ * Any callback returning FALSE, as well as handle_io_error(),
+ * may have freed conn, so conn is not touched after those calls. */
+static void message_read_cb(GObject *source_object,
+                            GAsyncResult *res,
+                            gpointer user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GInputStream *in = G_INPUT_STREAM(source_object);
+    GError *err = NULL;
+    gsize bytes_read = 0;
+    /* defaults to "no body" in case the header handler does not set it */
+    gsize data_size = 0;
+
+    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
+    if (err) {
+        handle_io_error(conn, err);
+        return;
+    }
+    if (bytes_read == 0) {
+        /* see virtio-port.c for the rationale behind this */
+        if (conn->opening) {
+            g_usleep(10000);
+            request_message_read(conn);
+        } else {
+            conn->error_cb(conn->user_data);
+        }
+        return;
+    }
+    conn->opening = FALSE;
+
+    if (conn->read_buff == NULL) {
+        /* A read that hits EOF finishes without an error but with fewer
+         * bytes than requested. Never hand such a truncated header over.
+         * NOTE: the same can happen while reading a body; that case is not
+         * detected here because the expected body size is not stored. */
+        if (bytes_read < conn->header_size) {
+            syslog(LOG_ERR, "vdagent-connection: "
+                            "unexpected EOF while reading message header");
+            conn->error_cb(conn->user_data);
+            return;
+        }
+
+        /* we've read the message header, now let's read its body */
+        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
+            return;
+        if (data_size > 0) {
+            conn->read_buff = g_malloc(data_size);
+            /* TODO: if allocation fails, we could try g_input_stream_skip()
+             * and hope that the message wasn't crucial for proper functioning.
+             * An example might be when a user tries to copy large clipboard.
+             * Not sure whether it's worth implementing.
+             * Other stuff might just as well fall apart
+             * when the system is running out of memory? */
+            g_input_stream_read_all_async(in, conn->read_buff, data_size,
+                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
+            return;
+        }
+    }
+
+    /* the message is complete; read_buff is NULL if it has no body */
+    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
+        return;
+    g_clear_pointer(&conn->read_buff, g_free);
+    request_message_read(conn);
+}
+
+/* Start an async read of the next message header into conn->header_buff;
+ * message_read_cb() is invoked once the read has finished. */
+static void request_message_read(VDAgentConnection *conn)
+{
+    g_input_stream_read_all_async(
+        g_io_stream_get_input_stream(conn->io_stream),
+        conn->header_buff, conn->header_size,
+        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
+}
diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
new file mode 100644
index 0000000..6fc0081
--- /dev/null
+++ b/src/vdagent-connection.h
@@ -0,0 +1,103 @@
+/* vdagent-connection.h
+
+ Copyright 2018 Red Hat, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __VDAGENT_CONNECTION_H
+#define __VDAGENT_CONNECTION_H
+
+#include <glib.h>
+#include <gio/gio.h>
+
+/* Opaque connection handle; the struct is defined in vdagent-connection.c. */
+typedef struct VDAgentConnection VDAgentConnection;
+
+/* Called when a message header has been read.
+ *
+ * If the handler wishes to continue reading,
+ * it must set @body_size to the size of message's body and return TRUE.
+ * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
+ * If @body_size is set to 0, VDAgentConnReadCb() is invoked right away
+ * with NULL as its data argument.
+ *
+ * Otherwise the handler should return FALSE
+ * and call vdagent_connection_destroy().
+ *
+ * @header_buff is owned by VDAgentConnection and must not be freed.
+ * @user_data is the pointer that was passed to vdagent_connection_new(). */
+typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
+ gsize *body_size,
+ gpointer user_data);
+
+/* Called when a full message has been read.
+ *
+ * If the handler wishes to continue reading, it must return TRUE,
+ * otherwise FALSE and call vdagent_connection_destroy().
+ *
+ * @header, @data are owned by VDAgentConnection and must not be freed.
+ * @data is NULL for a message without a body and is released
+ * by VDAgentConnection once the handler has returned TRUE.
+ * @user_data is the pointer that was passed to vdagent_connection_new(). */
+typedef gboolean (*VDAgentConnReadCb)(gpointer header,
+ gpointer data,
+ gpointer user_data);
+
+/* Called when an error occurred during read or write,
+ * or when the end of the input stream was reached.
+ * The handler is expected to call vdagent_connection_destroy().
+ *
+ * @user_data is the pointer that was passed to vdagent_connection_new(). */
+typedef void (*VDAgentConnErrorCb)(gpointer user_data);
+
+/* Open a file in @path for read and write.
+ * Returns a GIOStream to the given file or NULL on error.
+ * The reason of a failure is logged to syslog. */
+GIOStream *vdagent_file_open(const gchar *path);
+
+/* Create a socket and initiate a new connection to the socket on @address.
+ * @address is used as a UNIX socket address.
+ * Returns a GIOStream corresponding to the new connection or NULL on error.
+ * The reason of a failure is logged to syslog. */
+GIOStream *vdagent_socket_connect(const gchar *address);
+
+/* Create new VDAgentConnection and start reading incoming messages.
+ *
+ * The connection does not take an additional reference to @io_stream;
+ * it releases the caller's reference when the connection is freed.
+ *
+ * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
+ * until the first message is successfully read or written to the @io_stream.
+ *
+ * @header_size is the fixed size in bytes of every message header.
+ *
+ * @user_data will be passed to the supplied callbacks. */
+VDAgentConnection *vdagent_connection_new(
+ GIOStream *io_stream,
+ gboolean wait_on_opening,
+ gsize header_size,
+ VDAgentConnHeaderReadCb header_read_cb,
+ VDAgentConnReadCb read_cb,
+ VDAgentConnErrorCb error_cb,
+ gpointer user_data);
+
+/* Free up all resources associated with the VDAgentConnection.
+ *
+ * This operation can be asynchronous: if an I/O operation is still pending,
+ * it is cancelled first and the resources are freed once it has finished.
+ * @conn must not be used after this call. */
+void vdagent_connection_destroy(VDAgentConnection *conn);
+
+/* Append a message to write queue.
+ *
+ * VDAgentConnection takes ownership of the @data
+ * and frees it once the message is flushed.
+ * @size is the length of @data in bytes. */
+void vdagent_connection_write(VDAgentConnection *conn,
+ gpointer data,
+ gsize size);
+
+/* Waits until all queued messages get written to the output stream.
+ * Returns immediately if the write queue is empty.
+ *
+ * Must not be called while another flush of the same connection is running.
+ *
+ * Note: other GSources can be triggered during this call */
+void vdagent_connection_flush(VDAgentConnection *conn);
+
+/* Returns the credentials of the foreign process connected to the socket.
+ *
+ * The returned object is cached and owned by the VDAgentConnection,
+ * the caller must not unref it.
+ *
+ * It is an error to call this function with a VDAgentConnection
+ * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
+GCredentials *vdagent_connection_get_peer_credentials(
+ VDAgentConnection *conn);
+
+#endif
--
2.17.1
More information about the Spice-devel
mailing list