[Spice-devel] [RFC spice-vdagent 05/18] add VDAgentConnection

Jakub Janku jjanku at redhat.com
Mon Sep 3 16:49:38 UTC 2018


Hey,

On Tue, Aug 28, 2018 at 10:04 AM Victor Toso <victortoso at redhat.com> wrote:
>
> Hi,
>
> On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote:
> > Add a set of helper functions built around GIO that can be used to
> > easily write messages to and read from the given FD.
> >
> > Since VDAgentConnection uses GIO,
> > it integrates well with GMainLoop.
> >
> > Read messages must begin with a header of a fixed size.
> > Message body size can vary.
> >
> > User of VDAgentConnection is notified
> > through callbacks about the following events:
> > - message header read
> > - message body read
> > - I/O error
> >
> > A new VDAgentConnection can be constructed using
> > vdagent_connection_new() based on a GIOStream.
> >
> > A new GIOStream can be obtained using
> > vdagent_file_open() or vdagent_socket_connect().
> >
> > vdagent_connection_destroy() destroyes the connection.
> > However, due to the asynchronous nature of used GIO functions,
> > this does NOT close the underlying FD immediately.
>
> Yep, commented about it on 00/18 but I take that making this a
> GObject might help. Not giving a full review here, just small
> note after looking at the patch and the follow up ones.
>
> > ---
> >  Makefile.am              |   2 +
> >  src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
> >  src/vdagent-connection.h | 103 ++++++++++++++
> >  3 files changed, 406 insertions(+)
> >  create mode 100644 src/vdagent-connection.c
> >  create mode 100644 src/vdagent-connection.h
> >
> > diff --git a/Makefile.am b/Makefile.am
> > index fa54bbc..b291b19 100644
> > --- a/Makefile.am
> > +++ b/Makefile.am
> > @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
> >  common_sources =                             \
> >       src/udscs.c                             \
> >       src/udscs.h                             \
> > +     src/vdagent-connection.c                \
> > +     src/vdagent-connection.h                \
> >       src/vdagentd-proto-strings.h            \
> >       src/vdagentd-proto.h                    \
> >       $(NULL)
> > diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
> > new file mode 100644
> > index 0000000..0eb2ec9
> > --- /dev/null
> > +++ b/src/vdagent-connection.c
> > @@ -0,0 +1,301 @@
> > +/*  vdagent-connection.c
> > +
> > +    Copyright 2018 Red Hat, Inc.
> > +
> > +    This program is free software: you can redistribute it and/or modify
> > +    it under the terms of the GNU General Public License as published by
> > +    the Free Software Foundation, either version 3 of the License, or
> > +    (at your option) any later version.
> > +
> > +    This program is distributed in the hope that it will be useful,
> > +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> > +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> > +    GNU General Public License for more details.
> > +
> > +    You should have received a copy of the GNU General Public License
> > +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> > +*/
> > +
> > +#include <syslog.h>
> > +#include <fcntl.h>
> > +#include <glib/gstdio.h>
> > +#include <gio/gunixinputstream.h>
> > +#include <gio/gunixoutputstream.h>
> > +#include <gio/gunixsocketaddress.h>
> > +
> > +#include "vdagent-connection.h"
> > +
> > +struct VDAgentConnection {
> > +    GIOStream              *io_stream;
> > +    gboolean                opening;
> > +    GCancellable           *cancellable;
> > +
> > +    GQueue                 *write_queue;
> > +    GMainLoop              *flush_loop;
> > +
> > +    VDAgentConnReadCb       read_cb;
> > +    gpointer                read_buff;
> > +    gpointer                header_buff;
> > +    gsize                   header_size;
> > +    VDAgentConnHeaderReadCb header_read_cb;
> > +
> > +    VDAgentConnErrorCb      error_cb;
> > +
> > +    GCredentials           *credentials;
> > +
> > +    gpointer                user_data;
> > +};
> > +
> > +static void request_message_write(VDAgentConnection *conn);
> > +static void request_message_read(VDAgentConnection *conn);
> > +
> > +GIOStream *vdagent_file_open(const gchar *path)
> > +{
> > +    gint fd;
> > +
> > +    fd = g_open(path, O_RDWR);
> > +    if (fd == -1) {
> > +        syslog(LOG_ERR, "%s: %m", __func__);
> > +        return NULL;
> > +    }
> > +
> > +    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
> > +                                  g_unix_output_stream_new(fd, TRUE));
> > +}
> > +
> > +GIOStream *vdagent_socket_connect(const gchar *address)
> > +{
> > +    GSocketConnection *socket_conn;
> > +    GSocketClient *client;
> > +    GSocketConnectable *connectable;
> > +    GError *err = NULL;
> > +
> > +    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
> > +    client = g_object_new(G_TYPE_SOCKET_CLIENT,
> > +                          "family", G_SOCKET_FAMILY_UNIX,
> > +                          "type", G_SOCKET_TYPE_STREAM,
> > +                          NULL);
> > +
> > +    socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
> > +    g_object_unref(client);
> > +    g_object_unref(connectable);
> > +    if (err) {
> > +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> > +        g_error_free(err);
> > +    }
> > +    return G_IO_STREAM(socket_conn);
> > +}
>
> Not convinced that this API is really needed? The function can be
> kept but to be used internally by vdagent_connection_new()
> itself, I guess. I don't see other components using GIOStream
> unless to pass it to vdagent_connection_new ().

The rationale behind this was actually purely cosmetic.
We could have something like vdagent_connection_new_file() and
vdagent_connection_new_socket(),
but both functions would need to have 7 arguments and we would need to
initialize all the members of VDAgentConnection struct in both of the
functions.
So having just one common vdagent_connection_new() function seemed
cleaner to me.
>
> Might make sense to make socket's address as VDAgentConnection's
> property which should be set on g_object_new () too.
>
> Reviewed-by: Victor Toso <victortoso at redhat.com>
>
> Victor
>
> > +
> > +VDAgentConnection *vdagent_connection_new(
> > +    GIOStream              *io_stream,
> > +    gboolean                wait_on_opening,
> > +    gsize                   header_size,
> > +    VDAgentConnHeaderReadCb header_read_cb,
> > +    VDAgentConnReadCb       read_cb,
> > +    VDAgentConnErrorCb      error_cb,
> > +    gpointer                user_data)
> > +{
> > +    VDAgentConnection *conn;
> > +
> > +    conn = g_new(VDAgentConnection, 1);
> > +    conn->io_stream = io_stream;
> > +    conn->cancellable = g_cancellable_new();
> > +    conn->opening = wait_on_opening;
> > +    conn->write_queue = g_queue_new();
> > +    conn->flush_loop = NULL;
> > +    conn->read_cb = read_cb;
> > +    conn->read_buff = NULL;
> > +    conn->header_buff = g_malloc(header_size);
> > +    conn->header_size = header_size;
> > +    conn->header_read_cb = header_read_cb;
> > +    conn->error_cb = error_cb;
> > +    conn->credentials = NULL;
> > +    conn->user_data = user_data;
> > +
> > +    request_message_read(conn);
> > +
> > +    return conn;
> > +}
> > +
> > +static gboolean connection_has_pending(VDAgentConnection *conn)
> > +{
> > +    GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
> > +    GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
> > +
> > +    return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
> > +}
> > +
> > +/* Free up all resources used by VDAgentConnection
> > + * once all I/O operations have finished. */
> > +static void connection_finalize(VDAgentConnection *conn)
> > +{
> > +    g_object_unref(conn->cancellable);
> > +    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
> > +    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> > +    g_clear_object(&conn->credentials);
> > +    g_free(conn->header_buff);
> > +    g_free(conn->read_buff);
> > +    g_object_unref(conn->io_stream);
> > +    g_free(conn);
> > +}
> > +
> > +void vdagent_connection_destroy(VDAgentConnection *conn)
> > +{
> > +    /* If there's a pending I/O operation on either of the streams, cancel it,
> > +     * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
> > +    if (connection_has_pending(conn))
> > +        g_cancellable_cancel(conn->cancellable);
> > +    else
> > +        connection_finalize(conn);
> > +}
> > +
> > +static void handle_io_error(VDAgentConnection *conn, GError *err)
> > +{
> > +    if (g_cancellable_is_cancelled(conn->cancellable)) {
> > +        if (!connection_has_pending(conn))
> > +            connection_finalize(conn);
> > +    } else {
> > +        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
> > +        conn->error_cb(conn->user_data);
> > +    }
> > +    g_error_free(err);
> > +}
> > +
> > +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
> > +{
> > +    GSocket *socket;
> > +    GError *err = NULL;
> > +
> > +    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
> > +
> > +    if (conn->credentials)
> > +        return conn->credentials;
> > +
> > +    socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
> > +    conn->credentials = g_socket_get_credentials(socket, &err);
> > +    if (err) {
> > +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> > +        g_error_free(err);
> > +    }
> > +    return conn->credentials;
> > +}
> > +
> > +static void message_write_cb(GObject      *source_object,
> > +                             GAsyncResult *res,
> > +                             gpointer      user_data)
> > +{
> > +    VDAgentConnection *conn = user_data;
> > +    GOutputStream *out = G_OUTPUT_STREAM(source_object);
> > +    GError *err = NULL;
> > +
> > +    g_output_stream_write_all_finish(out, res, NULL, &err);
> > +    g_bytes_unref(g_queue_pop_head(conn->write_queue));
> > +
> > +    if (err)
> > +        return handle_io_error(conn, err);
> > +
> > +    conn->opening = FALSE;
> > +
> > +    if (g_queue_is_empty(conn->write_queue))
> > +        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> > +    else
> > +        request_message_write(conn);
> > +}
> > +
> > +static void request_message_write(VDAgentConnection *conn)
> > +{
> > +    GBytes *msg;
> > +    GOutputStream *out;
> > +
> > +    msg = g_queue_peek_head(conn->write_queue);
> > +    out = g_io_stream_get_output_stream(conn->io_stream);
> > +
> > +    g_output_stream_write_all_async(out,
> > +        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
> > +        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
> > +}
> > +
> > +void vdagent_connection_write(VDAgentConnection *conn,
> > +                              gpointer           data,
> > +                              gsize              size)
> > +{
> > +    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
> > +
> > +    if (g_queue_get_length(conn->write_queue) == 1)
> > +        request_message_write(conn);
> > +}
> > +
> > +void vdagent_connection_flush(VDAgentConnection *conn)
> > +{
> > +    GMainLoop *loop;
> > +    /* TODO: allow multiple flush calls at once? */
> > +    g_return_if_fail(conn->flush_loop == NULL);
> > +
> > +    if (g_queue_is_empty(conn->write_queue))
> > +        return;
> > +
> > +    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
> > +    /* When using GTK+, this should be wrapped with
> > +     * gdk_threads_leave() and gdk_threads_enter(),
> > +     * but since flush is used in virtio-port.c only
> > +     * let's leave it as it is for now. */
> > +    g_main_loop_run(loop);
> > +    g_main_loop_unref(loop);
> > +}
> > +
> > +static void message_read_cb(GObject      *source_object,
> > +                            GAsyncResult *res,
> > +                            gpointer      user_data)
> > +{
> > +    VDAgentConnection *conn = user_data;
> > +    GInputStream *in = G_INPUT_STREAM(source_object);
> > +    GError *err = NULL;
> > +    gsize bytes_read, data_size;
> > +
> > +    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
> > +    if (err)
> > +        return handle_io_error(conn, err);
> > +    if (bytes_read == 0) {
> > +        /* see virtio-port.c for the rationale behind this */
> > +        if (conn->opening) {
> > +            g_usleep(10000);
> > +            request_message_read(conn);
> > +        } else {
> > +            conn->error_cb(conn->user_data);
> > +        }
> > +        return;
> > +    }
> > +    conn->opening = FALSE;
> > +
> > +    if (conn->read_buff == NULL) {
> > +        /* we've read the message header, now let's read its body */
> > +        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
> > +            return;
> > +        if (data_size > 0) {
> > +            conn->read_buff = g_malloc(data_size);
> > +            /* TODO: if allocation fails, we could try g_input_stream_skip()
> > +             * and hope that the message wasn't crucial for proper functiong.
> > +             * An example might be when a user tries to copy large clipboard.
> > +             * Not sure whether it's worth implementing.
> > +             * Other stuff might just as well fall apart
> > +             * when the system is running out of memory? */
> > +            g_input_stream_read_all_async(in, conn->read_buff, data_size,
> > +                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> > +            return;
> > +        }
> > +    }
> > +
> > +    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
> > +        return;
> > +    g_clear_pointer(&conn->read_buff, g_free);
> > +    request_message_read(conn);
> > +}
> > +
> > +static void request_message_read(VDAgentConnection *conn)
> > +{
> > +    GInputStream *in;
> > +    in = g_io_stream_get_input_stream(conn->io_stream);
> > +
> > +    g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
> > +        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> > +}
> > diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
> > new file mode 100644
> > index 0000000..6fc0081
> > --- /dev/null
> > +++ b/src/vdagent-connection.h
> > @@ -0,0 +1,103 @@
> > +/*  vdagent-connection.h
> > +
> > +    Copyright 2018 Red Hat, Inc.
> > +
> > +    This program is free software: you can redistribute it and/or modify
> > +    it under the terms of the GNU General Public License as published by
> > +    the Free Software Foundation, either version 3 of the License, or
> > +    (at your option) any later version.
> > +
> > +    This program is distributed in the hope that it will be useful,
> > +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> > +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> > +    GNU General Public License for more details.
> > +
> > +    You should have received a copy of the GNU General Public License
> > +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> > +*/
> > +
> > +#ifndef __VDAGENT_CONNECTION_H
> > +#define __VDAGENT_CONNECTION_H
> > +
> > +#include <glib.h>
> > +#include <gio/gio.h>
> > +
> > +typedef struct VDAgentConnection VDAgentConnection;
> > +
> > +/* Called when a message header has been read.
> > + *
> > + * If the handler wishes to continue reading,
> > + * it must set @body_size to the size of message's body and return TRUE.
> > + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
> > + *
> > + * Otherwise the handler should return FALSE
> > + * and call vdagent_connection_destroy().
> > + *
> > + * @header_buff is owned by VDAgentConnection and must not be freed. */
> > +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
> > +                                            gsize   *body_size,
> > +                                            gpointer user_data);
> > +
> > +/* Called when a full message has been read.
> > + *
> > + * If the handler wished to continue reading, it must return TRUE,
> > + * otherwise FALSE and call vdagent_connection_destroy().
> > + *
> > + * @header, @data are owned by VDAgentConnection and must not be freed. */
> > +typedef gboolean (*VDAgentConnReadCb)(gpointer header,
> > +                                      gpointer data,
> > +                                      gpointer user_data);
> > +
> > +/* Called when an error occured during read or wirte.
> > + * The handler is expected to call vdagent_connection_destroy(). */
> > +typedef void (*VDAgentConnErrorCb)(gpointer user_data);
> > +
> > +/* Open a file in @path for read and write.
> > + * Returns a GIOStream to the given file or NULL on error. */
> > +GIOStream *vdagent_file_open(const gchar *path);
> > +
> > +/* Create a socket and initiate a new connection to the socket on @address.
> > + * Returns a GIOStream corresponding to the new connection or NULL on error. */
> > +GIOStream *vdagent_socket_connect(const gchar *address);
> > +
> > +/* Create new VDAgentConnection and start reading incoming messages.
> > + *
> > + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
> > + * until the first message is successfully read or written to the @io_stream.
> > + *
> > + * @user_data will be passed to the supplied callbacks. */
> > +VDAgentConnection *vdagent_connection_new(
> > +    GIOStream              *io_stream,
> > +    gboolean                wait_on_opening,
> > +    gsize                   header_size,
> > +    VDAgentConnHeaderReadCb header_read_cb,
> > +    VDAgentConnReadCb       read_cb,
> > +    VDAgentConnErrorCb      error_cb,
> > +    gpointer                user_data);
> > +
> > +/* Free up all resources associated with the VDAgentConnection.
> > + *
> > + * This operation can be asynchronous. */
> > +void vdagent_connection_destroy(VDAgentConnection *conn);
> > +
> > +/* Append a message to write queue.
> > + *
> > + * VDAgentConnection takes ownership of the @data
> > + * and frees it once the message is flushed. */
> > +void vdagent_connection_write(VDAgentConnection *conn,
> > +                              gpointer           data,
> > +                              gsize              size);
> > +
> > +/* Waits until all queued messages get written to the output stream.
> > + *
> > + * Note: other GSources can be triggered during this call */
> > +void vdagent_connection_flush(VDAgentConnection *conn);
> > +
> > +/* Returns the credentials of the foreign process connected to the socket.
> > + *
> > + * It is an error to call this function with a VDAgentConnection
> > + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
> > +GCredentials *vdagent_connection_get_peer_credentials(
> > +    VDAgentConnection *conn);
> > +
> > +#endif
> > --
> > 2.17.1
> >
> > _______________________________________________
> > Spice-devel mailing list
> > Spice-devel at lists.freedesktop.org
> > https://lists.freedesktop.org/mailman/listinfo/spice-devel


More information about the Spice-devel mailing list