[Spice-devel] [RFC spice-vdagent 05/18] add VDAgentConnection
Victor Toso
victortoso at redhat.com
Tue Aug 28 08:04:06 UTC 2018
Hi,
On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote:
> Add a set of helper functions built around GIO that can be used to
> easily write messages to and read from the given FD.
>
> Since VDAgentConnection uses GIO,
> it integrates well with GMainLoop.
>
> Read messages must begin with a header of a fixed size.
> Message body size can vary.
>
> The user of VDAgentConnection is notified
> through callbacks about the following events:
> - message header read
> - message body read
> - I/O error
>
> A new VDAgentConnection can be constructed using
> vdagent_connection_new() based on a GIOStream.
>
> A new GIOStream can be obtained using
> vdagent_file_open() or vdagent_socket_connect().
>
> vdagent_connection_destroy() destroys the connection.
> However, due to the asynchronous nature of the GIO functions used,
> this does NOT close the underlying FD immediately.
Yep, I commented on this in 00/18, but I take it that making this
a GObject might help. I'm not giving a full review here, just a
small note after looking at this patch and the follow-up ones.
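
For readers skimming the thread, the framing described in the commit
message (a fixed-size header, then a body whose size the header
announces) can be sketched with blocking POSIX reads. This is only an
illustration under stated assumptions: struct demo_header, read_all()
and demo_read_message() are hypothetical names that do not appear in
the patch, and the patch itself does this asynchronously through GIO.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical fixed-size header; the real header layouts live elsewhere. */
struct demo_header {
    uint32_t type;
    uint32_t size;   /* body size in bytes, may be 0 */
};

/* Read exactly len bytes. Returns 0 on success, -1 on EOF or error. */
static int read_all(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n < 0 && errno == EINTR)
            continue;               /* interrupted, try again */
        if (n <= 0)
            return -1;              /* EOF or a real error */
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Read one message: the header first, then a body of hdr->size bytes.
 * On success returns 0 and stores a malloc()ed body in *body
 * (NULL when the body is empty); the caller frees it. */
int demo_read_message(int fd, struct demo_header *hdr, void **body)
{
    assert(hdr != NULL && body != NULL);
    *body = NULL;

    if (read_all(fd, hdr, sizeof(*hdr)) != 0)
        return -1;
    if (hdr->size == 0)
        return 0;

    *body = malloc(hdr->size);
    if (*body == NULL)
        return -1;
    if (read_all(fd, *body, hdr->size) != 0) {
        free(*body);
        *body = NULL;
        return -1;
    }
    return 0;
}
```

The asynchronous version in the patch follows the same two steps, but
each step is a separate GIO request that completes in a callback.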
> ---
> Makefile.am | 2 +
> src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
> src/vdagent-connection.h | 103 ++++++++++++++
> 3 files changed, 406 insertions(+)
> create mode 100644 src/vdagent-connection.c
> create mode 100644 src/vdagent-connection.h
>
> diff --git a/Makefile.am b/Makefile.am
> index fa54bbc..b291b19 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
> common_sources = \
> src/udscs.c \
> src/udscs.h \
> + src/vdagent-connection.c \
> + src/vdagent-connection.h \
> src/vdagentd-proto-strings.h \
> src/vdagentd-proto.h \
> $(NULL)
> diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
> new file mode 100644
> index 0000000..0eb2ec9
> --- /dev/null
> +++ b/src/vdagent-connection.c
> @@ -0,0 +1,301 @@
> +/* vdagent-connection.c
> +
> + Copyright 2018 Red Hat, Inc.
> +
> + This program is free software: you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation, either version 3 of the License, or
> + (at your option) any later version.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program. If not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#include <syslog.h>
> +#include <fcntl.h>
> +#include <glib/gstdio.h>
> +#include <gio/gunixinputstream.h>
> +#include <gio/gunixoutputstream.h>
> +#include <gio/gunixsocketaddress.h>
> +
> +#include "vdagent-connection.h"
> +
> +struct VDAgentConnection {
> + GIOStream *io_stream;
> + gboolean opening;
> + GCancellable *cancellable;
> +
> + GQueue *write_queue;
> + GMainLoop *flush_loop;
> +
> + VDAgentConnReadCb read_cb;
> + gpointer read_buff;
> + gpointer header_buff;
> + gsize header_size;
> + VDAgentConnHeaderReadCb header_read_cb;
> +
> + VDAgentConnErrorCb error_cb;
> +
> + GCredentials *credentials;
> +
> + gpointer user_data;
> +};
> +
> +static void request_message_write(VDAgentConnection *conn);
> +static void request_message_read(VDAgentConnection *conn);
> +
> +GIOStream *vdagent_file_open(const gchar *path)
> +{
> + gint fd;
> +
> + fd = g_open(path, O_RDWR);
> + if (fd == -1) {
> + syslog(LOG_ERR, "%s: %m", __func__);
> + return NULL;
> + }
> +
> + return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
> + g_unix_output_stream_new(fd, TRUE));
> +}
> +
> +GIOStream *vdagent_socket_connect(const gchar *address)
> +{
> + GSocketConnection *socket_conn;
> + GSocketClient *client;
> + GSocketConnectable *connectable;
> + GError *err = NULL;
> +
> + connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
> + client = g_object_new(G_TYPE_SOCKET_CLIENT,
> + "family", G_SOCKET_FAMILY_UNIX,
> + "type", G_SOCKET_TYPE_STREAM,
> + NULL);
> +
> + socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
> + g_object_unref(client);
> + g_object_unref(connectable);
> + if (err) {
> + syslog(LOG_ERR, "%s: %s", __func__, err->message);
> + g_error_free(err);
> + }
> + return G_IO_STREAM(socket_conn);
> +}
I'm not convinced that this API is really needed. The function can
be kept, but I guess it should only be used internally by
vdagent_connection_new() itself. I don't see other components
using a GIOStream except to pass it to vdagent_connection_new().
It might make sense to make the socket's address a property of
VDAgentConnection, set on g_object_new() too.
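
To make the shape of that suggestion concrete, here is a minimal
sketch in plain POSIX C rather than GObject: the connect helper is
static, and the only public constructor takes the socket address.
All names (struct demo_client, demo_socket_connect(),
demo_client_new_for_socket(), demo_client_free()) are hypothetical
and not part of the patch; a GObject version would presumably use a
construct-only property instead of a function argument.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical stand-in for VDAgentConnection. */
struct demo_client {
    int fd;
};

/* Internal helper: connect to a UNIX stream socket, return fd or -1. */
static int demo_socket_connect(const char *address)
{
    struct sockaddr_un addr;
    int fd;

    if (strlen(address) >= sizeof(addr.sun_path))
        return -1;                  /* path would not fit */

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, address); /* length was checked above */

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        close(fd);
        return -1;
    }
    return fd;
}

/* The only public entry point: callers pass an address, never a stream. */
struct demo_client *demo_client_new_for_socket(const char *address)
{
    struct demo_client *client;
    int fd;

    assert(address != NULL);
    fd = demo_socket_connect(address);
    if (fd == -1)
        return NULL;

    client = malloc(sizeof(*client));
    if (client == NULL) {
        close(fd);
        return NULL;
    }
    client->fd = fd;
    return client;
}

void demo_client_free(struct demo_client *client)
{
    if (client == NULL)
        return;
    close(client->fd);
    free(client);
}
```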
Reviewed-by: Victor Toso <victortoso at redhat.com>
Victor
> +
> +VDAgentConnection *vdagent_connection_new(
> + GIOStream *io_stream,
> + gboolean wait_on_opening,
> + gsize header_size,
> + VDAgentConnHeaderReadCb header_read_cb,
> + VDAgentConnReadCb read_cb,
> + VDAgentConnErrorCb error_cb,
> + gpointer user_data)
> +{
> + VDAgentConnection *conn;
> +
> + conn = g_new(VDAgentConnection, 1);
> + conn->io_stream = io_stream;
> + conn->cancellable = g_cancellable_new();
> + conn->opening = wait_on_opening;
> + conn->write_queue = g_queue_new();
> + conn->flush_loop = NULL;
> + conn->read_cb = read_cb;
> + conn->read_buff = NULL;
> + conn->header_buff = g_malloc(header_size);
> + conn->header_size = header_size;
> + conn->header_read_cb = header_read_cb;
> + conn->error_cb = error_cb;
> + conn->credentials = NULL;
> + conn->user_data = user_data;
> +
> + request_message_read(conn);
> +
> + return conn;
> +}
> +
> +static gboolean connection_has_pending(VDAgentConnection *conn)
> +{
> + GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
> + GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
> +
> + return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
> +}
> +
> +/* Free up all resources used by VDAgentConnection
> + * once all I/O operations have finished. */
> +static void connection_finalize(VDAgentConnection *conn)
> +{
> + g_object_unref(conn->cancellable);
> + g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
> + g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> + g_clear_object(&conn->credentials);
> + g_free(conn->header_buff);
> + g_free(conn->read_buff);
> + g_object_unref(conn->io_stream);
> + g_free(conn);
> +}
> +
> +void vdagent_connection_destroy(VDAgentConnection *conn)
> +{
> + /* If there's a pending I/O operation on either of the streams, cancel it,
> + * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
> + if (connection_has_pending(conn))
> + g_cancellable_cancel(conn->cancellable);
> + else
> + connection_finalize(conn);
> +}
> +
> +static void handle_io_error(VDAgentConnection *conn, GError *err)
> +{
> + if (g_cancellable_is_cancelled(conn->cancellable)) {
> + if (!connection_has_pending(conn))
> + connection_finalize(conn);
> + } else {
> + syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
> + conn->error_cb(conn->user_data);
> + }
> + g_error_free(err);
> +}
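
The destroy path above defers the actual cleanup until no I/O is in
flight. The following sketch models that idea without GIO, using a
counter in place of g_input_stream_has_pending() and
g_output_stream_has_pending(). Everything here (struct demo_conn,
the freed_flag test hook, demo_op_started(), demo_op_finished()) is
a hypothetical simplification, not the patch's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for VDAgentConnection. */
struct demo_conn {
    int pending;        /* number of I/O operations in flight */
    bool cancelled;     /* set once destruction was requested */
    bool *freed_flag;   /* test hook: set to true on finalize */
};

struct demo_conn *demo_conn_new(bool *freed_flag)
{
    struct demo_conn *conn = calloc(1, sizeof(*conn));

    assert(conn != NULL);
    conn->freed_flag = freed_flag;
    return conn;
}

/* Release everything; only called when nothing is in flight. */
static void demo_finalize(struct demo_conn *conn)
{
    if (conn->freed_flag)
        *conn->freed_flag = true;
    free(conn);
}

/* An asynchronous operation was started. */
void demo_op_started(struct demo_conn *conn)
{
    conn->pending++;
}

/* Request destruction; finalize right away only if nothing is pending. */
void demo_destroy(struct demo_conn *conn)
{
    if (conn->pending > 0)
        conn->cancelled = true;     /* cleanup happens in the callback */
    else
        demo_finalize(conn);
}

/* Completion callback of one operation (finished or cancelled).
 * Returns false if the connection was finalized and must not be used. */
bool demo_op_finished(struct demo_conn *conn)
{
    assert(conn->pending > 0);
    conn->pending--;
    if (conn->cancelled && conn->pending == 0) {
        demo_finalize(conn);
        return false;
    }
    return true;
}
```

The consequence for callers is the one noted in the commit message:
after a destroy request the underlying resources may stay alive until
the last pending callback has run.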
> +
> +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
> +{
> + GSocket *socket;
> + GError *err = NULL;
> +
> + g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
> +
> + if (conn->credentials)
> + return conn->credentials;
> +
> + socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
> + conn->credentials = g_socket_get_credentials(socket, &err);
> + if (err) {
> + syslog(LOG_ERR, "%s: %s", __func__, err->message);
> + g_error_free(err);
> + }
> + return conn->credentials;
> +}
> +
> +static void message_write_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + VDAgentConnection *conn = user_data;
> + GOutputStream *out = G_OUTPUT_STREAM(source_object);
> + GError *err = NULL;
> +
> + g_output_stream_write_all_finish(out, res, NULL, &err);
> + g_bytes_unref(g_queue_pop_head(conn->write_queue));
> +
> + if (err)
> + return handle_io_error(conn, err);
> +
> + conn->opening = FALSE;
> +
> + if (g_queue_is_empty(conn->write_queue))
> + g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> + else
> + request_message_write(conn);
> +}
> +
> +static void request_message_write(VDAgentConnection *conn)
> +{
> + GBytes *msg;
> + GOutputStream *out;
> +
> + msg = g_queue_peek_head(conn->write_queue);
> + out = g_io_stream_get_output_stream(conn->io_stream);
> +
> + g_output_stream_write_all_async(out,
> + g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
> + G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
> +}
> +
> +void vdagent_connection_write(VDAgentConnection *conn,
> + gpointer data,
> + gsize size)
> +{
> + g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
> +
> + if (g_queue_get_length(conn->write_queue) == 1)
> + request_message_write(conn);
> +}
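
The write path above keeps at most one write in flight: a new write
is requested only when the queue goes from empty to one element, and
the completion callback starts the next one. A GLib-free sketch of
that queue discipline follows. struct demo_writer, demo_write() and
demo_write_done() are hypothetical names, and the "write" is only
simulated by recording which message is in flight.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct demo_msg {
    const char *data;
    struct demo_msg *next;
};

/* Hypothetical stand-in for the write side of VDAgentConnection. */
struct demo_writer {
    struct demo_msg *head, *tail;
    size_t length;
    const char *in_flight;  /* message currently being written, or NULL */
    int writes_started;     /* how many writes were requested so far */
};

/* Start writing the head of the queue (stands in for the async write). */
static void demo_request_write(struct demo_writer *w)
{
    w->in_flight = w->head->data;
    w->writes_started++;
}

/* Queue a message; start a write only if the queue was empty before. */
void demo_write(struct demo_writer *w, const char *data)
{
    struct demo_msg *m = malloc(sizeof(*m));

    assert(m != NULL);
    m->data = data;
    m->next = NULL;
    if (w->tail)
        w->tail->next = m;
    else
        w->head = m;
    w->tail = m;

    if (++w->length == 1)
        demo_request_write(w);
}

/* Completion callback: drop the finished message, continue with the next. */
void demo_write_done(struct demo_writer *w)
{
    struct demo_msg *m = w->head;

    assert(m != NULL);
    w->head = m->next;
    if (w->head == NULL)
        w->tail = NULL;
    w->length--;
    w->in_flight = NULL;
    free(m);

    if (w->length > 0)
        demo_request_write(w);
}
```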
> +
> +void vdagent_connection_flush(VDAgentConnection *conn)
> +{
> + GMainLoop *loop;
> + /* TODO: allow multiple flush calls at once? */
> + g_return_if_fail(conn->flush_loop == NULL);
> +
> + if (g_queue_is_empty(conn->write_queue))
> + return;
> +
> + loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
> + /* When using GTK+, this should be wrapped with
> + * gdk_threads_leave() and gdk_threads_enter(),
> + * but since flush is used in virtio-port.c only
> + * let's leave it as it is for now. */
> + g_main_loop_run(loop);
> + g_main_loop_unref(loop);
> +}
> +
> +static void message_read_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + VDAgentConnection *conn = user_data;
> + GInputStream *in = G_INPUT_STREAM(source_object);
> + GError *err = NULL;
> + gsize bytes_read, data_size;
> +
> + g_input_stream_read_all_finish(in, res, &bytes_read, &err);
> + if (err)
> + return handle_io_error(conn, err);
> + if (bytes_read == 0) {
> + /* see virtio-port.c for the rationale behind this */
> + if (conn->opening) {
> + g_usleep(10000);
> + request_message_read(conn);
> + } else {
> + conn->error_cb(conn->user_data);
> + }
> + return;
> + }
> + conn->opening = FALSE;
> +
> + if (conn->read_buff == NULL) {
> + /* we've read the message header, now let's read its body */
> + if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
> + return;
> + if (data_size > 0) {
> + conn->read_buff = g_malloc(data_size);
> + /* TODO: if allocation fails, we could try g_input_stream_skip()
> + * and hope that the message wasn't crucial for proper functioning.
> + * An example might be when a user tries to copy large clipboard.
> + * Not sure whether it's worth implementing.
> + * Other stuff might just as well fall apart
> + * when the system is running out of memory? */
> + g_input_stream_read_all_async(in, conn->read_buff, data_size,
> + G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> + return;
> + }
> + }
> +
> + if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
> + return;
> + g_clear_pointer(&conn->read_buff, g_free);
> + request_message_read(conn);
> +}
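
The zero-byte branch in message_read_cb() depends on the "opening"
flag: EOF is retried while the connection is still opening and is
reported as an error afterwards. Reduced to a pure decision function,
the rule might look like the sketch below. The enum, struct
demo_reader and demo_on_read() are hypothetical, and the 10 ms sleep
before the retry is left to the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum demo_action {
    DEMO_RETRY,    /* EOF while still opening: wait and read again */
    DEMO_ERROR,    /* EOF on an established connection */
    DEMO_DELIVER   /* data arrived: hand it to the callbacks */
};

/* Hypothetical stand-in holding only the "opening" flag. */
struct demo_reader {
    bool opening;
};

/* Decide what to do after a read completed with bytes_read bytes. */
enum demo_action demo_on_read(struct demo_reader *r, size_t bytes_read)
{
    assert(r != NULL);
    if (bytes_read == 0)
        return r->opening ? DEMO_RETRY : DEMO_ERROR;

    /* The first successful read ends the opening phase. */
    r->opening = false;
    return DEMO_DELIVER;
}
```

In the patch a successful write clears the flag as well, which this
sketch does not model.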
> +
> +static void request_message_read(VDAgentConnection *conn)
> +{
> + GInputStream *in;
> + in = g_io_stream_get_input_stream(conn->io_stream);
> +
> + g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
> + G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> +}
> diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
> new file mode 100644
> index 0000000..6fc0081
> --- /dev/null
> +++ b/src/vdagent-connection.h
> @@ -0,0 +1,103 @@
> +/* vdagent-connection.h
> +
> + Copyright 2018 Red Hat, Inc.
> +
> + This program is free software: you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation, either version 3 of the License, or
> + (at your option) any later version.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program. If not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#ifndef __VDAGENT_CONNECTION_H
> +#define __VDAGENT_CONNECTION_H
> +
> +#include <glib.h>
> +#include <gio/gio.h>
> +
> +typedef struct VDAgentConnection VDAgentConnection;
> +
> +/* Called when a message header has been read.
> + *
> + * If the handler wishes to continue reading,
> + * it must set @body_size to the size of message's body and return TRUE.
> + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
> + *
> + * Otherwise the handler should return FALSE
> + * and call vdagent_connection_destroy().
> + *
> + * @header_buff is owned by VDAgentConnection and must not be freed. */
> +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
> + gsize *body_size,
> + gpointer user_data);
> +
> +/* Called when a full message has been read.
> + *
> + * If the handler wishes to continue reading, it must return TRUE,
> + * otherwise FALSE and call vdagent_connection_destroy().
> + *
> + * @header, @data are owned by VDAgentConnection and must not be freed. */
> +typedef gboolean (*VDAgentConnReadCb)(gpointer header,
> + gpointer data,
> + gpointer user_data);
> +
> +/* Called when an error occurred during read or write.
> + * The handler is expected to call vdagent_connection_destroy(). */
> +typedef void (*VDAgentConnErrorCb)(gpointer user_data);
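
As an illustration of the header callback contract documented above,
here is a sketch of a handler that accepts a message only if the
announced body size is below a cap. It uses plain C types in place of
gboolean, gsize and gpointer so that it builds without GLib. struct
demo_header, struct demo_state and DEMO_MAX_BODY_SIZE are
assumptions made for the example; the patch defines neither a header
layout nor a size limit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_MAX_BODY_SIZE (1024u * 1024u)   /* arbitrary illustrative cap */

/* Hypothetical header layout. */
struct demo_header {
    uint32_t type;
    uint32_t size;   /* announced body size in bytes */
};

/* Hypothetical user data passed to the callbacks. */
struct demo_state {
    bool destroy_requested;
};

/* Mirrors the VDAgentConnHeaderReadCb contract with plain C types:
 * set *body_size and return true to keep reading, or return false
 * after arranging for the connection to be destroyed. */
bool demo_header_read_cb(void *header_buff, size_t *body_size,
                         void *user_data)
{
    const struct demo_header *hdr = header_buff;
    struct demo_state *state = user_data;

    assert(hdr != NULL && body_size != NULL && state != NULL);
    if (hdr->size > DEMO_MAX_BODY_SIZE) {
        /* Real code would call vdagent_connection_destroy() here. */
        state->destroy_requested = true;
        return false;
    }
    *body_size = hdr->size;
    return true;
}
```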
> +
> +/* Open a file in @path for read and write.
> + * Returns a GIOStream to the given file or NULL on error. */
> +GIOStream *vdagent_file_open(const gchar *path);
> +
> +/* Create a socket and initiate a new connection to the socket on @address.
> + * Returns a GIOStream corresponding to the new connection or NULL on error. */
> +GIOStream *vdagent_socket_connect(const gchar *address);
> +
> +/* Create new VDAgentConnection and start reading incoming messages.
> + *
> + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
> + * until the first message is successfully read or written to the @io_stream.
> + *
> + * @user_data will be passed to the supplied callbacks. */
> +VDAgentConnection *vdagent_connection_new(
> + GIOStream *io_stream,
> + gboolean wait_on_opening,
> + gsize header_size,
> + VDAgentConnHeaderReadCb header_read_cb,
> + VDAgentConnReadCb read_cb,
> + VDAgentConnErrorCb error_cb,
> + gpointer user_data);
> +
> +/* Free up all resources associated with the VDAgentConnection.
> + *
> + * This operation can be asynchronous. */
> +void vdagent_connection_destroy(VDAgentConnection *conn);
> +
> +/* Append a message to write queue.
> + *
> + * VDAgentConnection takes ownership of the @data
> + * and frees it once the message is flushed. */
> +void vdagent_connection_write(VDAgentConnection *conn,
> + gpointer data,
> + gsize size);
> +
> +/* Waits until all queued messages get written to the output stream.
> + *
> + * Note: other GSources can be triggered during this call */
> +void vdagent_connection_flush(VDAgentConnection *conn);
> +
> +/* Returns the credentials of the foreign process connected to the socket.
> + *
> + * It is an error to call this function with a VDAgentConnection
> + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
> +GCredentials *vdagent_connection_get_peer_credentials(
> + VDAgentConnection *conn);
> +
> +#endif
> --
> 2.17.1
>