[Spice-devel] [PATCH vdagent-linux 3/4] introduce VDAgentConnection

Jakub Janků jjanku at redhat.com
Sun Sep 30 18:05:22 UTC 2018


1) VDAgentConnection

Add vdagent-connection.{c,h} files.

Define a new GObject: VDAgentConnection which can be used to easily
write messages to and read from the given FD.

VDAgentConnection uses GIO and therefore integrates well with GMainLoop.

Read messages must begin with a header of a fixed size.
Message body size can vary.

The user of VDAgentConnection is notified
through callbacks about the following events:
 * message header read
 * whole message read
 * I/O error

A new VDAgentConnection can be constructed using
vdagent_connection_new() based on a GIOStream that can be obtained using
vdagent_file_open() or vdagent_socket_connect().

vdagent_connection_destroy() destroys the connection.
However, due to the asynchronous nature of the GIO functions used,
this does NOT close the underlying FD immediately.

If vdagent_connection_destroy() is called outside of a GMainLoop
(or the loop quits right after the function invocation),
g_main_context_iteration() should be called to ensure that the
VDAgentConnection finalizes properly.

2) udscs

Rewrite udscs.c to use the new VDAgentConnection.
Use GSocketService in udscs_server.

Drop support for select(), remove:
 * udscs_server_fill_fds()
 * udscs_server_handle_fds()

3) virtio_port

Rewrite virtio-port.c to use the new VDAgentConnection.

Drop support for select(), remove:
 * vdagent_virtio_port_fill_fds()
 * vdagent_virtio_port_handle_fds()

4) vdagentd

Replace the main_loop() with a GMainLoop.

Use g_unix_signal_add() to handle SIGINT, SIGHUP, SIGTERM.
SIGQUIT handling is not supported by GLib.

Integrate the session_info into the loop using
GIOChannel and g_io_add_watch().

Signed-off-by: Jakub Janků <jjanku at redhat.com>
---
 Makefile.am                |   2 +
 src/udscs.c                | 483 +++++++++++--------------------------
 src/udscs.h                |  15 --
 src/vdagent-connection.c   | 300 +++++++++++++++++++++++
 src/vdagent-connection.h   | 124 ++++++++++
 src/vdagent/vdagent.c      |   3 +
 src/vdagentd/vdagentd.c    | 169 ++++++-------
 src/vdagentd/virtio-port.c | 389 ++++++++++-------------------
 src/vdagentd/virtio-port.h |  18 --
 9 files changed, 772 insertions(+), 731 deletions(-)
 create mode 100644 src/vdagent-connection.c
 create mode 100644 src/vdagent-connection.h

diff --git a/Makefile.am b/Makefile.am
index fa54bbc..b291b19 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
 common_sources =				\
 	src/udscs.c				\
 	src/udscs.h				\
+	src/vdagent-connection.c		\
+	src/vdagent-connection.h		\
 	src/vdagentd-proto-strings.h		\
 	src/vdagentd-proto.h			\
 	$(NULL)
diff --git a/src/udscs.c b/src/udscs.c
index 62abc97..3bf0089 100644
--- a/src/udscs.c
+++ b/src/udscs.c
@@ -24,42 +24,22 @@
 #include <config.h>
 #endif
 
-#include <stdio.h>
 #include <stdlib.h>
 #include <syslog.h>
-#include <unistd.h>
-#include <errno.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
 #include <glib-unix.h>
+#include <gio/gunixsocketaddress.h>
 #include "udscs.h"
 #include "vdagentd-proto-strings.h"
-
-struct udscs_buf {
-    uint8_t *buf;
-    size_t pos;
-    size_t size;
-
-    struct udscs_buf *next;
-};
+#include "vdagent-connection.h"
 
 struct udscs_connection {
-    int fd;
     int debug;
     void *user_data;
 #ifndef UDSCS_NO_SERVER
-    struct ucred peer_cred;
+    gint peer_pid;
 #endif
 
-    /* Read stuff, single buffer, separate header and data buffer */
-    int header_read;
-    struct udscs_message_header header;
-    struct udscs_buf data;
-
-    /* Writes are stored in a linked list of buffers, with both the header
-       + data for a single message in 1 buffer. */
-    struct udscs_buf *write_buf;
+    VDAgentConnection *conn;
 
     /* Callbacks */
     udscs_read_callback read_callback;
@@ -67,16 +47,8 @@ struct udscs_connection {
 
     struct udscs_connection *next;
     struct udscs_connection *prev;
-
-    GIOChannel                     *io_channel;
-    guint                           write_watch_id;
-    guint                           read_watch_id;
 };
 
-static gboolean udscs_io_channel_cb(GIOChannel *source,
-                                    GIOCondition condition,
-                                    gpointer data);
-
 static void debug_print_message_header(struct udscs_connection     *conn,
                                        struct udscs_message_header *header,
                                        const gchar                 *direction)
@@ -93,47 +65,61 @@ static void debug_print_message_header(struct udscs_connection     *conn,
         conn, direction, type, header->arg1, header->arg2, header->size);
 }
 
+static gboolean conn_header_read_cb(gpointer header_buff,
+                                    gsize   *body_size,
+                                    gpointer user_data)
+{
+    struct udscs_message_header *header = header_buff;
+    *body_size = header->size;
+    return TRUE;
+}
+
+static gboolean conn_read_cb(gpointer header_buff,
+                             gpointer data,
+                             gpointer user_data)
+{
+    struct udscs_connection *conn = user_data;
+    struct udscs_message_header *header = header_buff;
+
+    debug_print_message_header(conn, header, "received");
+
+    conn->read_callback(&conn, header, data);
+    return conn != NULL;
+}
+
+static void conn_error_cb(GError *err, gpointer user_data)
+{
+    struct udscs_connection *conn = user_data;
+    if (err)
+        syslog(LOG_ERR, "%p error: %s", conn, err->message);
+    udscs_destroy_connection(&conn);
+}
+
 struct udscs_connection *udscs_connect(const char *socketname,
     udscs_read_callback read_callback,
     udscs_disconnect_callback disconnect_callback,
     int debug)
 {
-    int c;
-    struct sockaddr_un address;
+    GIOStream *io_stream;
     struct udscs_connection *conn;
+    GError *err = NULL;
 
-    conn = g_new0(struct udscs_connection, 1);
-    conn->debug = debug;
-
-    conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (conn->fd == -1) {
-        syslog(LOG_ERR, "creating unix domain socket: %m");
-        g_free(conn);
-        return NULL;
-    }
-
-    address.sun_family = AF_UNIX;
-    snprintf(address.sun_path, sizeof(address.sun_path), "%s", socketname);
-    c = connect(conn->fd, (struct sockaddr *)&address, sizeof(address));
-    if (c != 0) {
-        if (conn->debug) {
-            syslog(LOG_DEBUG, "connect %s: %m", socketname);
-        }
-        g_free(conn);
-        return NULL;
-    }
-
-    conn->io_channel = g_io_channel_unix_new(conn->fd);
-    if (!conn->io_channel) {
-        udscs_destroy_connection(&conn);
+    io_stream = vdagent_socket_connect(socketname, &err);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
         return NULL;
     }
-    conn->read_watch_id =
-        g_io_add_watch(conn->io_channel,
-                       G_IO_IN | G_IO_ERR | G_IO_NVAL,
-                       udscs_io_channel_cb,
-                       conn);
 
+    conn = g_new0(struct udscs_connection, 1);
+    conn->debug = debug;
+    conn->conn = vdagent_connection_new(io_stream,
+                                        FALSE,
+                                        sizeof(struct udscs_message_header),
+                                        conn_header_read_cb,
+                                        conn_read_cb,
+                                        conn_error_cb,
+                                        conn);
     conn->read_callback = read_callback;
     conn->disconnect_callback = disconnect_callback;
 
@@ -145,7 +131,6 @@ struct udscs_connection *udscs_connect(const char *socketname,
 
 void udscs_destroy_connection(struct udscs_connection **connp)
 {
-    struct udscs_buf *wbuf, *next_wbuf;
     struct udscs_connection *conn = *connp;
 
     if (!conn)
@@ -154,28 +139,12 @@ void udscs_destroy_connection(struct udscs_connection **connp)
     if (conn->disconnect_callback)
         conn->disconnect_callback(conn);
 
-    wbuf = conn->write_buf;
-    while (wbuf) {
-        next_wbuf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-        wbuf = next_wbuf;
-    }
-
-    g_clear_pointer(&conn->data.buf, g_free);
-
     if (conn->next)
         conn->next->prev = conn->prev;
     if (conn->prev)
         conn->prev->next = conn->next;
 
-    close(conn->fd);
-
-    if (conn->write_watch_id != 0)
-        g_source_remove(conn->write_watch_id);
-    if (conn->read_watch_id != 0)
-        g_source_remove(conn->read_watch_id);
-    g_clear_pointer(&conn->io_channel, g_io_channel_unref);
+    vdagent_connection_destroy(conn->conn);
 
     if (conn->debug)
         syslog(LOG_DEBUG, "%p disconnected", conn);
@@ -199,174 +168,33 @@ void *udscs_get_user_data(struct udscs_connection *conn)
 void udscs_write(struct udscs_connection *conn, uint32_t type, uint32_t arg1,
     uint32_t arg2, const uint8_t *data, uint32_t size)
 {
-    struct udscs_buf *wbuf, *new_wbuf;
+    gpointer buff;
+    guint buff_size;
     struct udscs_message_header header;
 
-    new_wbuf = g_new(struct udscs_buf, 1);
-    new_wbuf->pos = 0;
-    new_wbuf->size = sizeof(header) + size;
-    new_wbuf->next = NULL;
-    new_wbuf->buf = g_malloc(new_wbuf->size);
+    buff_size = sizeof(header) + size;
+    buff = g_malloc(buff_size);
 
     header.type = type;
     header.arg1 = arg1;
     header.arg2 = arg2;
     header.size = size;
 
-    memcpy(new_wbuf->buf, &header, sizeof(header));
-    memcpy(new_wbuf->buf + sizeof(header), data, size);
+    memcpy(buff, &header, sizeof(header));
+    memcpy(buff + sizeof(header), data, size);
 
     debug_print_message_header(conn, &header, "sent");
 
-    if (conn->io_channel && conn->write_watch_id == 0)
-        conn->write_watch_id =
-            g_io_add_watch(conn->io_channel,
-                           G_IO_OUT | G_IO_ERR | G_IO_NVAL,
-                           udscs_io_channel_cb,
-                           conn);
-
-    if (!conn->write_buf) {
-        conn->write_buf = new_wbuf;
-        return;
-    }
-
-    /* maybe we should limit the write_buf stack depth ? */
-    wbuf = conn->write_buf;
-    while (wbuf->next)
-        wbuf = wbuf->next;
-
-    wbuf->next = new_wbuf;
-}
-
-/* A helper for udscs_do_read() */
-static void udscs_read_complete(struct udscs_connection **connp)
-{
-    struct udscs_connection *conn = *connp;
-
-    debug_print_message_header(conn, &conn->header, "received");
-
-    if (conn->read_callback) {
-        conn->read_callback(connp, &conn->header, conn->data.buf);
-        if (!*connp) /* Was the connection disconnected by the callback ? */
-            return;
-    }
-
-    g_free(conn->data.buf);
-    memset(&conn->data, 0, sizeof(conn->data)); /* data.buf = NULL */
-    conn->header_read = 0;
+    vdagent_connection_write(conn->conn, buff, buff_size);
 }
 
-static void udscs_do_read(struct udscs_connection **connp)
-{
-    ssize_t n;
-    size_t to_read;
-    uint8_t *dest;
-    struct udscs_connection *conn = *connp;
-
-    if (conn->header_read < sizeof(conn->header)) {
-        to_read = sizeof(conn->header) - conn->header_read;
-        dest = (uint8_t *)&conn->header + conn->header_read;
-    } else {
-        to_read = conn->data.size - conn->data.pos;
-        dest = conn->data.buf + conn->data.pos;
-    }
-
-    n = read(conn->fd, dest, to_read);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "reading unix domain socket: %m, disconnecting %p",
-               conn);
-    }
-    if (n <= 0) {
-        udscs_destroy_connection(connp);
-        return;
-    }
-
-    if (conn->header_read < sizeof(conn->header)) {
-        conn->header_read += n;
-        if (conn->header_read == sizeof(conn->header)) {
-            if (conn->header.size == 0) {
-                udscs_read_complete(connp);
-                return;
-            }
-            conn->data.pos = 0;
-            conn->data.size = conn->header.size;
-            conn->data.buf = g_malloc(conn->data.size);
-        }
-    } else {
-        conn->data.pos += n;
-        if (conn->data.pos == conn->data.size)
-            udscs_read_complete(connp);
-    }
-}
-
-static void udscs_do_write(struct udscs_connection **connp)
-{
-    ssize_t n;
-    size_t to_write;
-    struct udscs_connection *conn = *connp;
-
-    struct udscs_buf* wbuf = conn->write_buf;
-    if (!wbuf) {
-        syslog(LOG_ERR,
-               "%p do_write called on a connection without a write buf ?!",
-               conn);
-        return;
-    }
-
-    to_write = wbuf->size - wbuf->pos;
-    n = write(conn->fd, wbuf->buf + wbuf->pos, to_write);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "writing to unix domain socket: %m, disconnecting %p",
-               conn);
-        udscs_destroy_connection(connp);
-        return;
-    }
-
-    wbuf->pos += n;
-    if (wbuf->pos == wbuf->size) {
-        conn->write_buf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-    }
-}
-
-static gboolean udscs_io_channel_cb(GIOChannel *source,
-                                    GIOCondition condition,
-                                    gpointer data)
-{
-    struct udscs_connection *conn = data;
-
-    if (condition & G_IO_IN) {
-        udscs_do_read(&conn);
-        if (conn == NULL)
-            return G_SOURCE_REMOVE;
-        return G_SOURCE_CONTINUE;
-    }
-    if (condition & G_IO_OUT) {
-        udscs_do_write(&conn);
-        if (conn == NULL)
-            return G_SOURCE_REMOVE;
-        if (conn->write_buf)
-            return G_SOURCE_CONTINUE;
-        conn->write_watch_id = 0;
-        return G_SOURCE_REMOVE;
-    }
-
-    udscs_destroy_connection(&conn);
-    return G_SOURCE_REMOVE;
-}
-
-
 #ifndef UDSCS_NO_SERVER
 
 /* ---------- Server-side implementation ---------- */
 
 struct udscs_server {
-    int fd;
+    GSocketService *service;
+
     int debug;
     struct udscs_connection connections_head;
     udscs_connect_callback connect_callback;
@@ -374,7 +202,12 @@ struct udscs_server {
     udscs_disconnect_callback disconnect_callback;
 };
 
-struct udscs_server *udscs_create_server_for_fd(int fd,
+static gboolean udscs_server_accept_cb(GSocketService    *service,
+                                       GSocketConnection *socket_conn,
+                                       GObject           *source_object,
+                                       gpointer           user_data);
+
+static struct udscs_server *udscs_server_new(
     udscs_connect_callback connect_callback,
     udscs_read_callback read_callback,
     udscs_disconnect_callback disconnect_callback,
@@ -382,59 +215,74 @@ struct udscs_server *udscs_create_server_for_fd(int fd,
 {
     struct udscs_server *server;
 
-    if (fd <= 0) {
-        syslog(LOG_ERR, "Invalid file descriptor: %i", fd);
-        return NULL;
-    }
-
     server = g_new0(struct udscs_server, 1);
     server->debug = debug;
-    server->fd = fd;
     server->connect_callback = connect_callback;
     server->read_callback = read_callback;
     server->disconnect_callback = disconnect_callback;
+    server->service = g_socket_service_new();
+
+    g_signal_connect(server->service, "incoming",
+        G_CALLBACK(udscs_server_accept_cb), server);
 
     return server;
 }
 
-struct udscs_server *udscs_create_server(const char *socketname,
+struct udscs_server *udscs_create_server_for_fd(int fd,
     udscs_connect_callback connect_callback,
     udscs_read_callback read_callback,
     udscs_disconnect_callback disconnect_callback,
     int debug)
 {
-    int c;
-    int fd;
-    struct sockaddr_un address;
     struct udscs_server *server;
+    GSocket *socket;
+    GError *err = NULL;
 
-    fd = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (fd == -1) {
-        syslog(LOG_ERR, "creating unix domain socket: %m");
-        return NULL;
-    }
+    server = udscs_server_new(connect_callback, read_callback,
+                              disconnect_callback, debug);
 
-    address.sun_family = AF_UNIX;
-    snprintf(address.sun_path, sizeof(address.sun_path), "%s", socketname);
-    c = bind(fd, (struct sockaddr *)&address, sizeof(address));
-    if (c != 0) {
-        syslog(LOG_ERR, "bind %s: %m", socketname);
-        close(fd);
-        return NULL;
-    }
+    socket = g_socket_new_from_fd(fd, &err);
+    if (err)
+        goto error;
+    g_socket_listener_add_socket(G_SOCKET_LISTENER(server->service),
+                                 socket, NULL, &err);
+    g_object_unref(socket);
+    if (err)
+        goto error;
 
-    c = listen(fd, 5);
-    if (c != 0) {
-        syslog(LOG_ERR, "listen: %m");
-        close(fd);
-        return NULL;
-    }
-
-    server = udscs_create_server_for_fd(fd, connect_callback, read_callback,
-                                        disconnect_callback, debug);
+    return server;
+error:
+    syslog(LOG_ERR, "%s: %s", __func__, err->message);
+    g_error_free(err);
+    udscs_destroy_server(server);
+    return NULL;
+}
 
-    if (!server) {
-        close(fd);
+struct udscs_server *udscs_create_server(const char *socketname,
+    udscs_connect_callback connect_callback,
+    udscs_read_callback read_callback,
+    udscs_disconnect_callback disconnect_callback,
+    int debug)
+{
+    struct udscs_server *server;
+    GSocketAddress *socket_addr;
+    GError *err = NULL;
+
+    server = udscs_server_new(connect_callback, read_callback,
+                              disconnect_callback, debug);
+
+    socket_addr = g_unix_socket_address_new(socketname);
+    g_socket_listener_add_address(G_SOCKET_LISTENER(server->service),
+                                  socket_addr,
+                                  G_SOCKET_TYPE_STREAM,
+                                  G_SOCKET_PROTOCOL_DEFAULT,
+                                  NULL, NULL, &err);
+    g_object_unref(socket_addr);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
+        udscs_destroy_server(server);
+        return NULL;
     }
 
     return server;
@@ -453,43 +301,51 @@ void udscs_destroy_server(struct udscs_server *server)
         udscs_destroy_connection(&conn);
         conn = next_conn;
     }
-    close(server->fd);
+    g_object_unref(server->service);
     g_free(server);
 }
 
 int udscs_get_peer_pid(struct udscs_connection *conn)
 {
-    return (int)conn->peer_cred.pid;
+    return conn->peer_pid;
 }
 
-static void udscs_server_accept(struct udscs_server *server) {
+static gboolean udscs_server_accept_cb(GSocketService    *service,
+                                       GSocketConnection *socket_conn,
+                                       GObject           *source_object,
+                                       gpointer           user_data)
+{
+    struct udscs_server *server = user_data;
     struct udscs_connection *new_conn, *conn;
-    struct sockaddr_un address;
-    socklen_t length = sizeof(address);
-    int r, fd;
-
-    fd = accept(server->fd, (struct sockaddr *)&address, &length);
-    if (fd == -1) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "accept: %m");
-        return;
-    }
+    GCredentials *cred;
+    GError *err = NULL;
 
     new_conn = g_new0(struct udscs_connection, 1);
-    new_conn->fd = fd;
     new_conn->debug = server->debug;
     new_conn->read_callback = server->read_callback;
     new_conn->disconnect_callback = server->disconnect_callback;
 
-    length = sizeof(new_conn->peer_cred);
-    r = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &new_conn->peer_cred, &length);
-    if (r != 0) {
-        syslog(LOG_ERR, "Could not get peercred, disconnecting new client");
-        close(fd);
+    g_object_ref(socket_conn);
+    new_conn->conn = vdagent_connection_new(G_IO_STREAM(socket_conn),
+                                            FALSE,
+                                            sizeof(struct udscs_message_header),
+                                            conn_header_read_cb,
+                                            conn_read_cb,
+                                            conn_error_cb,
+                                            new_conn);
+
+
+    cred = vdagent_connection_get_peer_credentials(new_conn->conn, &err);
+    if (err) {
+        syslog(LOG_ERR, "Could not get peer PID, disconnecting new client: %s",
+                        err->message);
+        g_error_free(err);
+        vdagent_connection_destroy(new_conn->conn);
         g_free(new_conn);
-        return;
+        return TRUE;
     }
+    new_conn->peer_pid = g_credentials_get_unix_pid(cred, NULL);
+    g_object_unref(cred);
 
     conn = &server->connections_head;
     while (conn->next)
@@ -504,59 +360,8 @@ static void udscs_server_accept(struct udscs_server *server) {
 
     if (server->connect_callback)
         server->connect_callback(new_conn);
-}
-
-int udscs_server_fill_fds(struct udscs_server *server, fd_set *readfds,
-        fd_set *writefds)
-{
-    struct udscs_connection *conn;
-    int nfds;
 
-    if (!server)
-        return -1;
-
-    nfds = server->fd + 1;
-    FD_SET(server->fd, readfds);
-
-    conn = server->connections_head.next;
-    while (conn) {
-        FD_SET(conn->fd, readfds);
-        if (conn->write_buf)
-            FD_SET(conn->fd, writefds);
-
-        if (conn->fd >= nfds)
-            nfds = conn->fd + 1;
-
-        conn = conn->next;
-    }
-
-    return nfds;
-}
-
-void udscs_server_handle_fds(struct udscs_server *server, fd_set *readfds,
-        fd_set *writefds)
-{
-    struct udscs_connection *conn, *next_conn;
-
-    if (!server)
-        return;
-
-    if (FD_ISSET(server->fd, readfds))
-        udscs_server_accept(server);
-
-    conn = server->connections_head.next;
-    while (conn) {
-        /* conn may be destroyed by udscs_do_read() or udscs_do_write()
-         * (when disconnected), so get the next connection first. */
-        next_conn = conn->next;
-
-        if (FD_ISSET(conn->fd, readfds))
-            udscs_do_read(&conn);
-        if (conn && FD_ISSET(conn->fd, writefds))
-            udscs_do_write(&conn);
-
-        conn = next_conn;
-    }
+    return TRUE;
 }
 
 void udscs_server_write_all(struct udscs_server *server,
diff --git a/src/udscs.h b/src/udscs.h
index 363ca18..1c7fa2b 100644
--- a/src/udscs.h
+++ b/src/udscs.h
@@ -22,9 +22,7 @@
 #ifndef __UDSCS_H
 #define __UDSCS_H
 
-#include <stdio.h>
 #include <stdint.h>
-#include <sys/select.h>
 #include <sys/socket.h>
 
 
@@ -151,19 +149,6 @@ typedef int (*udscs_for_all_clients_callback)(struct udscs_connection **connp,
 int udscs_server_for_all_clients(struct udscs_server *server,
     udscs_for_all_clients_callback func, void *priv);
 
-/* Given a udscs server, fill the fd_sets pointed to by readfds and
- * writefds for select() usage.
- * Return value: value of the highest fd + 1 or -1 if server is NULL
- */
-int udscs_server_fill_fds(struct udscs_server *server, fd_set *readfds,
-    fd_set *writefds);
-
-/* Handle any events flagged by select for the given udscs server.
- * Does nothing if server is NULL.
- */
-void udscs_server_handle_fds(struct udscs_server *server, fd_set *readfds,
-    fd_set *writefds);
-
 /* Returns the peer's PID. */
 int udscs_get_peer_pid(struct udscs_connection *conn);
 
diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
new file mode 100644
index 0000000..e492770
--- /dev/null
+++ b/src/vdagent-connection.c
@@ -0,0 +1,300 @@
+/*  vdagent-connection.c
+
+    Copyright 2018 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <syslog.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <glib/gstdio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixsocketaddress.h>
+
+#include "vdagent-connection.h"
+
+struct VDAgentConnection {
+    GObject                 parent_instance;
+
+    GIOStream              *io_stream;
+    gboolean                opening;
+    GCancellable           *cancellable;
+
+    GQueue                 *write_queue;
+    GMainLoop              *flush_loop;
+
+    gsize                   header_size;
+    gpointer                header_buff;
+    gpointer                read_buff;
+
+    VDAgentConnHeaderReadCb header_read_cb;
+    VDAgentConnReadCb       read_cb;
+    VDAgentConnErrorCb      error_cb;
+
+    gpointer                user_data;
+};
+
+G_DEFINE_TYPE(VDAgentConnection, vdagent_connection, G_TYPE_OBJECT);
+
+static void write_next_message(VDAgentConnection *conn);
+static void read_next_message(VDAgentConnection *conn);
+
+GIOStream *vdagent_file_open(const gchar *path, GError **err)
+{
+    gint fd, errsv;
+
+    fd = g_open(path, O_RDWR);
+    if (fd == -1) {
+        errsv = errno;
+        g_set_error_literal(err, G_FILE_ERROR,
+                            g_file_error_from_errno(errsv),
+                            g_strerror(errsv));
+        return NULL;
+    }
+
+    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
+                                  g_unix_output_stream_new(fd, TRUE));
+}
+
+GIOStream *vdagent_socket_connect(const gchar *address, GError **err)
+{
+    GSocketConnection *socket_conn;
+    GSocketClient *client;
+    GSocketConnectable *connectable;
+
+    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
+    client = g_object_new(G_TYPE_SOCKET_CLIENT,
+                          "family", G_SOCKET_FAMILY_UNIX,
+                          "type", G_SOCKET_TYPE_STREAM,
+                          NULL);
+
+    socket_conn = g_socket_client_connect(client, connectable, NULL, err);
+    g_object_unref(client);
+    g_object_unref(connectable);
+    return G_IO_STREAM(socket_conn);
+}
+
+static void vdagent_connection_init(VDAgentConnection *conn)
+{
+    conn->cancellable = g_cancellable_new();
+    conn->write_queue = g_queue_new();
+    conn->flush_loop = NULL;
+    conn->read_buff = NULL;
+}
+
+static void vdagent_connection_dispose(GObject *obj)
+{
+    VDAgentConnection *conn = VDAGENT_CONNECTION(obj);
+    g_clear_object(&conn->cancellable);
+    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    g_clear_object(&conn->io_stream);
+
+    G_OBJECT_CLASS(vdagent_connection_parent_class)->dispose(obj);
+}
+
+static void vdagent_connection_finalize(GObject *obj)
+{
+    VDAgentConnection *conn = VDAGENT_CONNECTION(obj);
+    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
+    g_free(conn->header_buff);
+    g_free(conn->read_buff);
+
+    G_OBJECT_CLASS(vdagent_connection_parent_class)->finalize(obj);
+}
+
+static void vdagent_connection_class_init(VDAgentConnectionClass *klass)
+{
+    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+    gobject_class->dispose  = vdagent_connection_dispose;
+    gobject_class->finalize = vdagent_connection_finalize;
+}
+
+VDAgentConnection *vdagent_connection_new(
+    GIOStream              *io_stream,
+    gboolean                wait_on_opening,
+    gsize                   header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb       read_cb,
+    VDAgentConnErrorCb      error_cb,
+    gpointer                user_data)
+{
+    VDAgentConnection *conn;
+    conn = g_object_new(VDAGENT_TYPE_CONNECTION, NULL);
+    conn->io_stream = io_stream;
+    conn->opening = wait_on_opening;
+    conn->header_size = header_size;
+    conn->header_buff = g_malloc(header_size);
+    conn->header_read_cb = header_read_cb;
+    conn->read_cb = read_cb;
+    conn->error_cb = error_cb;
+    conn->user_data = user_data;
+
+    read_next_message(conn);
+
+    return conn;
+}
+
+void vdagent_connection_destroy(VDAgentConnection *conn)
+{
+    g_cancellable_cancel(conn->cancellable);
+    g_object_unref(conn);
+}
+
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn,
+                                                      GError           **err)
+{
+    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
+
+    GSocketConnection *socket_conn = G_SOCKET_CONNECTION(conn->io_stream);
+    return g_socket_get_credentials(
+               g_socket_connection_get_socket(socket_conn), err);
+}
+
+static void message_write_cb(GObject      *source_object,
+                             GAsyncResult *res,
+                             gpointer      user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GOutputStream *out = G_OUTPUT_STREAM(source_object);
+    GError *err = NULL;
+
+    g_output_stream_write_all_finish(out, res, NULL, &err);
+    g_bytes_unref(g_queue_pop_head(conn->write_queue));
+
+    if (err) {
+        if (!g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+            conn->error_cb(err, conn->user_data);
+        g_error_free(err);
+        g_object_unref(conn);
+        return;
+    }
+    g_object_unref(conn);
+
+    conn->opening = FALSE;
+
+    if (g_queue_is_empty(conn->write_queue))
+        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    else
+        write_next_message(conn);
+}
+
+static void write_next_message(VDAgentConnection *conn)
+{
+    GBytes *msg;
+    GOutputStream *out;
+
+    msg = g_queue_peek_head(conn->write_queue);
+    out = g_io_stream_get_output_stream(conn->io_stream);
+
+    g_output_stream_write_all_async(out,
+        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
+        G_PRIORITY_DEFAULT, conn->cancellable,
+        message_write_cb, g_object_ref(conn));
+}
+
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer           data,
+                              gsize              size)
+{
+    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
+
+    if (g_queue_get_length(conn->write_queue) == 1)
+        write_next_message(conn);
+}
+
+void vdagent_connection_flush(VDAgentConnection *conn)
+{
+    GMainLoop *loop;
+    /* TODO: allow multiple flush calls at once? */
+    g_return_if_fail(conn->flush_loop == NULL);
+
+    if (g_queue_is_empty(conn->write_queue))
+        return;
+
+    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
+    /* When using GTK+, this should be wrapped with
+     * gdk_threads_leave() and gdk_threads_enter(),
+     * but since flush is used in virtio-port.c only
+     * let's leave it as it is for now. */
+    g_main_loop_run(loop);
+    g_main_loop_unref(loop);
+}
+
+static void message_read_cb(GObject      *source_object,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GInputStream *in = G_INPUT_STREAM(source_object);
+    GError *err = NULL;
+    gsize bytes_read, data_size;
+
+    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
+    if (err) {
+        if (!g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+            conn->error_cb(err, conn->user_data);
+        g_error_free(err);
+        g_object_unref(conn);
+        return;
+    }
+    g_object_unref(conn);
+    if (bytes_read == 0) {
+        /* see virtio-port.c for the rationale behind this */
+        if (conn->opening) {
+            g_usleep(10000);
+            read_next_message(conn);
+        } else {
+            conn->error_cb(NULL, conn->user_data);
+        }
+        return;
+    }
+    conn->opening = FALSE;
+
+    if (conn->read_buff == NULL) {
+        /* we've read the message header, now let's read its body */
+        if (conn->header_read_cb(conn->header_buff,
+                                 &data_size,
+                                 conn->user_data) == FALSE)
+            return;
+
+        if (data_size > 0) {
+            conn->read_buff = g_malloc(data_size);
+            g_input_stream_read_all_async(in,
+                conn->read_buff, data_size,
+                G_PRIORITY_DEFAULT, conn->cancellable,
+                message_read_cb, g_object_ref(conn));
+            return;
+        }
+    }
+
+    if (conn->read_cb(conn->header_buff,
+                      conn->read_buff,
+                      conn->user_data) == FALSE)
+        return;
+    g_clear_pointer(&conn->read_buff, g_free);
+    read_next_message(conn);
+}
+
+static void read_next_message(VDAgentConnection *conn)
+{
+    GInputStream *in;
+    in = g_io_stream_get_input_stream(conn->io_stream);
+
+    g_input_stream_read_all_async(in,
+        conn->header_buff, conn->header_size,
+        G_PRIORITY_DEFAULT, conn->cancellable,
+        message_read_cb, g_object_ref(conn));
+}
diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
new file mode 100644
index 0000000..fbfc2fb
--- /dev/null
+++ b/src/vdagent-connection.h
@@ -0,0 +1,124 @@
+/*  vdagent-connection.h
+
+    Copyright 2018 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __VDAGENT_CONNECTION_H
+#define __VDAGENT_CONNECTION_H
+
+#include <glib.h>
+#include <gio/gio.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define VDAGENT_TYPE_CONNECTION            (vdagent_connection_get_type())
+#define VDAGENT_CONNECTION(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj), VDAGENT_TYPE_CONNECTION, VDAgentConnection))
+#define VDAGENT_IS_CONNECTION(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj), VDAGENT_TYPE_CONNECTION))
+#define VDAGENT_CONNECTION_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass), VDAGENT_TYPE_CONNECTION, VDAgentConnectionClass))
+#define VDAGENT_IS_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), VDAGENT_TYPE_CONNECTION))
+#define VDAGENT_CONNECTION_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS((obj), VDAGENT_TYPE_CONNECTION, VDAgentConnectionClass))
+
+typedef struct VDAgentConnection      VDAgentConnection;
+typedef struct VDAgentConnectionClass VDAgentConnectionClass;
+
+struct VDAgentConnectionClass {
+    GObjectClass parent_class;
+};
+
+GType vdagent_connection_get_type(void);
+
+/* Called when a message header has been read.
+ *
+ * If the handler wishes to continue reading,
+ * it must set @body_size to the size of the message's body and return TRUE.
+ * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
+ *
+ * Otherwise the handler should return FALSE
+ * and call vdagent_connection_destroy().
+ *
+ * @header_buff is owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
+                                            gsize   *body_size,
+                                            gpointer user_data);
+
+/* Called when a full message has been read.
+ *
+ * If the handler wishes to continue reading, it must return TRUE,
+ * otherwise FALSE and call vdagent_connection_destroy().
+ *
+ * @header, @data are owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnReadCb)(gpointer header,
+                                      gpointer data,
+                                      gpointer user_data);
+
+/* Called when an error occurred during read or write.
+ * If @err is NULL, the connection was closed by the remote side.
+ * The handler is expected to call vdagent_connection_destroy(). */
+typedef void (*VDAgentConnErrorCb)(GError *err, gpointer user_data);
+
+/* Open a file in @path for read and write.
+ * Returns a new GIOStream to the given file or NULL when @err is set. */
+GIOStream *vdagent_file_open(const gchar *path, GError **err);
+
+/* Create a socket and initiate a new connection to the socket on @address.
+ * Returns a new GIOStream or NULL when @err is set. */
+GIOStream *vdagent_socket_connect(const gchar *address, GError **err);
+
+/* Create new VDAgentConnection and start reading incoming messages.
+ *
+ * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
+ * until the first message is successfully read or written to the @io_stream.
+ *
+ * @user_data will be passed to the supplied callbacks. */
+VDAgentConnection *vdagent_connection_new(
+    GIOStream              *io_stream,
+    gboolean                wait_on_opening,
+    gsize                   header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb       read_cb,
+    VDAgentConnErrorCb      error_cb,
+    gpointer                user_data);
+
+/* Free up all resources associated with the VDAgentConnection.
+ *
+ * This operation can be asynchronous. */
+void vdagent_connection_destroy(VDAgentConnection *conn);
+
+/* Append a message to the write queue.
+ *
+ * VDAgentConnection takes ownership of the @data
+ * and frees it once the message is flushed. */
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer           data,
+                              gsize              size);
+
+/* Waits until all queued messages get written to the output stream.
+ *
+ * Note: other GSources can be triggered during this call */
+void vdagent_connection_flush(VDAgentConnection *conn);
+
+/* Returns the credentials of the foreign process connected to the socket.
+ * The returned object must be freed using g_object_unref().
+ *
+ * It is an error to call this function with a VDAgentConnection
+ * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn,
+                                                      GError           **err);
+
+G_END_DECLS
+
+#endif
diff --git a/src/vdagent/vdagent.c b/src/vdagent/vdagent.c
index f7c8b72..d7e2aca 100644
--- a/src/vdagent/vdagent.c
+++ b/src/vdagent/vdagent.c
@@ -471,6 +471,9 @@ reconnect:
     vdagent_destroy(agent);
     agent = NULL;
 
+    /* allow the VDAgentConnection to close and finalize properly */
+    g_main_context_iteration(NULL, FALSE);
+
     if (!quit && do_daemonize)
         goto reconnect;
 
diff --git a/src/vdagentd/vdagentd.c b/src/vdagentd/vdagentd.c
index 99683da..f52f039 100644
--- a/src/vdagentd/vdagentd.c
+++ b/src/vdagentd/vdagentd.c
@@ -31,10 +31,9 @@
 #include <errno.h>
 #include <signal.h>
 #include <syslog.h>
-#include <sys/select.h>
 #include <sys/stat.h>
 #include <spice/vd_agent.h>
-#include <glib.h>
+#include <glib-unix.h>
 
 #ifdef WITH_SYSTEMD_SOCKET_ACTIVATION
 #include <systemd/sd-daemon.h>
@@ -81,11 +80,18 @@ static const char *active_session = NULL;
 static unsigned int session_count = 0;
 static struct udscs_connection *active_session_conn = NULL;
 static int agent_owns_clipboard[256] = { 0, };
-static int quit = 0;
 static int retval = 0;
 static int client_connected = 0;
 static int max_clipboard = -1;
 
+static GMainLoop *loop;
+
+static void vdagentd_quit(gint exit_code)
+{
+    retval = exit_code;
+    g_main_loop_quit(loop);
+}
+
 /* utility functions */
 static void virtio_msg_uint32_to_le(uint8_t *_msg, uint32_t size, uint32_t offset)
 {
@@ -168,8 +174,7 @@ void do_client_mouse(struct vdagentd_uinput **uinputp, VDAgentMouseState *mouse)
                                               uinput_fake);
         if (!*uinputp) {
             syslog(LOG_CRIT, "Fatal uinput error");
-            retval = 1;
-            quit = 1;
+            vdagentd_quit(1);
         }
     }
 }
@@ -510,6 +515,11 @@ static int virtio_port_read_complete(
         VDAgentMessage *message_header,
         uint8_t *data)
 {
+    /* This callback could be invoked during vdagent_virtio_port_flush(),
+     * so don't process any incoming messages when quitting. */
+    if (!g_main_loop_is_running(loop))
+        return 0;
+
     if (!vdagent_message_check_size(message_header))
         return 0;
 
@@ -565,6 +575,27 @@ static int virtio_port_read_complete(
     return 0;
 }
 
+static void virtio_port_disconnect_cb(struct vdagent_virtio_port *vport,
+                                      GError *err)
+{
+    if (err == NULL)
+        return;
+
+    gboolean old_client_connected = client_connected;
+    syslog(LOG_CRIT, "AIIEEE lost spice client connection, reconnecting: %s",
+                     err->message);
+    virtio_port = vdagent_virtio_port_create(portdev,
+                                             virtio_port_read_complete,
+                                             virtio_port_disconnect_cb);
+    if (virtio_port == NULL) {
+        syslog(LOG_CRIT, "Fatal error opening vdagent virtio channel");
+        vdagentd_quit(1);
+        return;
+    }
+    do_client_disconnect();
+    client_connected = old_client_connected;
+}
+
 static void virtio_write_clipboard(uint8_t selection, uint32_t msg_type,
     uint32_t data_type, uint8_t *data, uint32_t data_size)
 {
@@ -703,8 +734,7 @@ static void check_xorg_resolution(void)
                                         agent_data->screen_count);
         if (!uinput) {
             syslog(LOG_CRIT, "Fatal uinput error");
-            retval = 1;
-            quit = 1;
+            vdagentd_quit(1);
             return;
         }
 
@@ -712,11 +742,10 @@ static void check_xorg_resolution(void)
             syslog(LOG_INFO, "opening vdagent virtio channel");
             virtio_port = vdagent_virtio_port_create(portdev,
                                                      virtio_port_read_complete,
-                                                     NULL);
+                                                     virtio_port_disconnect_cb);
             if (!virtio_port) {
                 syslog(LOG_CRIT, "Fatal error opening vdagent virtio channel");
-                retval = 1;
-                quit = 1;
+                vdagentd_quit(1);
                 return;
             }
             send_capabilities(virtio_port, 1);
@@ -726,6 +755,11 @@ static void check_xorg_resolution(void)
         vdagentd_uinput_destroy(&uinput);
 #endif
         if (virtio_port) {
+            if (only_once) {
+                syslog(LOG_INFO, "Exiting after one client session.");
+                vdagentd_quit(0);
+                return;
+            }
             vdagent_virtio_port_flush(&virtio_port);
             vdagent_virtio_port_destroy(&virtio_port);
             syslog(LOG_INFO, "closed vdagent virtio channel");
@@ -939,6 +973,15 @@ static void agent_read_complete(struct udscs_connection **connp,
     }
 }
 
+static gboolean si_io_channel_cb(GIOChannel  *source,
+                                 GIOCondition condition,
+                                 gpointer     data)
+{
+    active_session = session_info_get_active_session(session_info);
+    update_active_session_connection(NULL);
+    return G_SOURCE_CONTINUE;
+}
+
 /* main */
 
 static void daemonize(void)
@@ -967,76 +1010,10 @@ static void daemonize(void)
     }
 }
 
-static void main_loop(void)
-{
-    fd_set readfds, writefds;
-    int n, nfds;
-    int ck_fd = 0;
-    int once = 0;
-
-    while (!quit) {
-        FD_ZERO(&readfds);
-        FD_ZERO(&writefds);
-
-        nfds = udscs_server_fill_fds(server, &readfds, &writefds);
-        n = vdagent_virtio_port_fill_fds(virtio_port, &readfds, &writefds);
-        if (n >= nfds)
-            nfds = n + 1;
-
-        if (session_info) {
-            ck_fd = session_info_get_fd(session_info);
-            FD_SET(ck_fd, &readfds);
-            if (ck_fd >= nfds)
-                nfds = ck_fd + 1;
-        }
-
-        n = select(nfds, &readfds, &writefds, NULL, NULL);
-        if (n == -1) {
-            if (errno == EINTR)
-                continue;
-            syslog(LOG_CRIT, "Fatal error select: %m");
-            retval = 1;
-            break;
-        }
-
-        udscs_server_handle_fds(server, &readfds, &writefds);
-
-        if (virtio_port) {
-            once = 1;
-            vdagent_virtio_port_handle_fds(&virtio_port, &readfds, &writefds);
-            if (!virtio_port) {
-                int old_client_connected = client_connected;
-                syslog(LOG_CRIT,
-                       "AIIEEE lost spice client connection, reconnecting");
-                virtio_port = vdagent_virtio_port_create(portdev,
-                                                     virtio_port_read_complete,
-                                                     NULL);
-                if (!virtio_port) {
-                    syslog(LOG_CRIT,
-                           "Fatal error opening vdagent virtio channel");
-                    retval = 1;
-                    break;
-                }
-                do_client_disconnect();
-                client_connected = old_client_connected;
-            }
-        }
-        else if (only_once && once)
-        {
-            syslog(LOG_INFO, "Exiting after one client session.");
-            break;
-        }
-
-        if (session_info && FD_ISSET(ck_fd, &readfds)) {
-            active_session = session_info_get_active_session(session_info);
-            update_active_session_connection(NULL);
-        }
-    }
-}
-
-static void quit_handler(int sig)
+static gboolean signal_handler(gpointer user_data)
 {
-    quit = 1;
+    vdagentd_quit(0);
+    return G_SOURCE_REMOVE;
 }
 
 static gboolean parse_debug_level_cb(const gchar *option_name,
@@ -1090,8 +1067,8 @@ int main(int argc, char *argv[])
 {
     GOptionContext *context;
     GError *err = NULL;
-    struct sigaction act;
     gboolean own_socket = TRUE;
+    GIOChannel *si_io_channel = NULL;
 
     context = g_option_context_new(NULL);
     g_option_context_add_main_entries(context, cmd_entries, NULL);
@@ -1116,13 +1093,9 @@ int main(int argc, char *argv[])
         uinput_device = g_strdup(DEFAULT_UINPUT_DEVICE);
     }
 
-    memset(&act, 0, sizeof(act));
-    act.sa_flags = SA_RESTART;
-    act.sa_handler = quit_handler;
-    sigaction(SIGINT, &act, NULL);
-    sigaction(SIGHUP, &act, NULL);
-    sigaction(SIGTERM, &act, NULL);
-    sigaction(SIGQUIT, &act, NULL);
+    g_unix_signal_add(SIGINT, signal_handler, NULL);
+    g_unix_signal_add(SIGHUP, signal_handler, NULL);
+    g_unix_signal_add(SIGTERM, signal_handler, NULL);
 
     openlog("spice-vdagentd", do_daemonize ? 0 : LOG_PERROR, LOG_USER);
 
@@ -1154,7 +1127,7 @@ int main(int argc, char *argv[])
             syslog(LOG_CRIT, "Fatal the server socket %s exists already. Delete it?",
                    vdagentd_socket);
         } else {
-            syslog(LOG_CRIT, "Fatal could not create the server socket %s: %m",
+            syslog(LOG_CRIT, "Fatal could not create the server socket %s",
                    vdagentd_socket);
         }
         return 1;
@@ -1184,19 +1157,31 @@ int main(int argc, char *argv[])
 
     if (want_session_info)
         session_info = session_info_create(debug);
-    if (!session_info)
+    if (session_info) {
+        si_io_channel = g_io_channel_unix_new(
+                            session_info_get_fd(session_info));
+        g_io_add_watch(si_io_channel, G_IO_IN, si_io_channel_cb, NULL);
+    } else
         syslog(LOG_WARNING, "no session info, max 1 session agent allowed");
 
     active_xfers = g_hash_table_new(g_direct_hash, g_direct_equal);
-    main_loop();
+
+    loop = g_main_loop_new(NULL, FALSE);
+    g_main_loop_run(loop);
 
     release_clipboards();
 
     vdagentd_uinput_destroy(&uinput);
+    g_clear_pointer(&si_io_channel, g_io_channel_unref);
+    g_clear_pointer(&session_info, session_info_destroy);
+    g_clear_pointer(&server, udscs_destroy_server);
     vdagent_virtio_port_flush(&virtio_port);
     vdagent_virtio_port_destroy(&virtio_port);
-    session_info_destroy(session_info);
-    udscs_destroy_server(server);
+
+    /* allow the VDAgentConnection(s) to close and finalize properly */
+    g_main_context_iteration(NULL, FALSE);
+
+    g_main_loop_unref(loop);
 
     /* leave the socket around if it was provided by systemd */
     if (own_socket) {
diff --git a/src/vdagentd/virtio-port.c b/src/vdagentd/virtio-port.c
index 497811e..5e65a7f 100644
--- a/src/vdagentd/virtio-port.c
+++ b/src/vdagentd/virtio-port.c
@@ -19,28 +19,20 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
-#include <errno.h>
 #include <stdlib.h>
 #include <string.h>
 #include <syslog.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
 #include <gio/gio.h>
+#include <glib-unix.h>
 
+#include "vdagent-connection.h"
 #include "virtio-port.h"
 
 
 struct vdagent_virtio_port_buf {
     uint8_t *buf;
-    size_t pos;
     size_t size;
     size_t write_pos;
-
-    struct vdagent_virtio_port_buf *next;
 };
 
 /* Data to keep track of the assembling of vdagent messages per chunk port,
@@ -53,78 +45,126 @@ struct vdagent_virtio_port_chunk_port_data {
 };
 
 struct vdagent_virtio_port {
-    int fd;
-    int opening;
-    int is_uds;
-
-    /* Chunk read stuff, single buffer, separate header and data buffer */
-    int chunk_header_read;
-    int chunk_data_pos;
-    VDIChunkHeader chunk_header;
-    uint8_t chunk_data[VD_AGENT_MAX_DATA_SIZE];
+    VDAgentConnection *conn;
 
     /* Per chunk port data */
     struct vdagent_virtio_port_chunk_port_data port_data[VDP_END_PORT];
 
-    /* Writes are stored in a linked list of buffers, with both the header
-       + data for a single message in 1 buffer. */
-    struct vdagent_virtio_port_buf *write_buf;
+    struct vdagent_virtio_port_buf write_buf;
 
     /* Callbacks */
     vdagent_virtio_port_read_callback read_callback;
     vdagent_virtio_port_disconnect_callback disconnect_callback;
 };
 
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp);
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp);
+static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp,
+                                         VDIChunkHeader *chunk_header,
+                                         gconstpointer chunk_data);
+
+static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
+                                GError *err);
+
+static gboolean conn_header_read_cb(gpointer header_buff,
+                                    gsize   *body_size,
+                                    gpointer user_data)
+{
+    struct vdagent_virtio_port *vport = user_data;
+    VDIChunkHeader *header = header_buff;
+    GError *err;
+
+    header->size = GUINT32_FROM_LE(header->size);
+    header->port = GUINT32_FROM_LE(header->port);
+
+    if (header->size > VD_AGENT_MAX_DATA_SIZE) {
+        err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+                          "chunk size %u too large", header->size);
+        virtio_port_destroy(&vport, err);
+        return FALSE;
+    }
+    if (header->port >= VDP_END_PORT) {
+        err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+                          "chunk port %u out of range", header->port);
+        virtio_port_destroy(&vport, err);
+        return FALSE;
+    }
+
+    *body_size = header->size;
+    return TRUE;
+}
+
+static gboolean conn_read_cb(gpointer header,
+                             gpointer data,
+                             gpointer user_data)
+{
+    struct vdagent_virtio_port *vport = user_data;
+    vdagent_virtio_port_do_chunk(&vport, header, data);
+    return vport != NULL;
+}
+
+static void conn_error_cb(GError *err, gpointer user_data)
+{
+    struct vdagent_virtio_port *vport = user_data;
+    virtio_port_destroy(&vport, err ? g_error_copy(err) : NULL);
+}
 
 struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
     vdagent_virtio_port_read_callback read_callback,
     vdagent_virtio_port_disconnect_callback disconnect_callback)
 {
     struct vdagent_virtio_port *vport;
-    struct sockaddr_un address;
-    int c;
-
-    vport = g_new0(struct vdagent_virtio_port, 1);
-
-    vport->fd = open(portname, O_RDWR);
-    if (vport->fd == -1) {
-        vport->fd = socket(PF_UNIX, SOCK_STREAM, 0);
-        if (vport->fd == -1) {
-            goto error;
-        }
-        address.sun_family = AF_UNIX;
-        snprintf(address.sun_path, sizeof(address.sun_path), "%s", portname);
-        c = connect(vport->fd, (struct sockaddr *)&address, sizeof(address));
-        if (c == 0) {
-            vport->is_uds = 1;
-        } else {
-            goto error;
+    GIOStream *io_stream;
+    GError *err = NULL;
+
+    io_stream = vdagent_file_open(portname, &err);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_clear_error(&err);
+        io_stream = vdagent_socket_connect(portname, &err);
+        if (err) {
+            syslog(LOG_ERR, "%s: %s", __func__, err->message);
+            g_error_free(err);
+            return NULL;
         }
-    } else {
-        vport->is_uds = 0;
     }
-    vport->opening = 1;
 
+    vport = g_new0(struct vdagent_virtio_port, 1);
+
+    /* When calling vdagent_connection_new(),
+     * @wait_on_opening MUST be set to TRUE:
+     *
+     * When we open the virtio serial port, the following happens:
+     * 1) The linux kernel virtio_console driver sends a
+     *    VIRTIO_CONSOLE_PORT_OPEN message to qemu
+     * 2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
+     *    register the agent chardev with the spice-server
+     * 3) spice-server then calls the spicevmc chardev driver's state
+     *    callback to let it know it is ready to receive data
+     * 4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
+     *    chardev backend
+     * 5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
+     *    to the linux kernel virtio_console driver
+     *
+     * Until steps 1 - 5 have completed, the linux kernel virtio_console
+     * driver sees the virtio serial port as being in a disconnected state
+     * and read will return 0! So if we blindly assume that a read of 0 means
+     * that the channel is closed, we will hit a race here.
+     */
+    vport->conn = vdagent_connection_new(io_stream,
+                                         TRUE,
+                                         sizeof(VDIChunkHeader),
+                                         conn_header_read_cb,
+                                         conn_read_cb,
+                                         conn_error_cb,
+                                         vport);
     vport->read_callback = read_callback;
     vport->disconnect_callback = disconnect_callback;
 
     return vport;
-
-error:
-    syslog(LOG_ERR, "open %s: %m", portname);
-    if (vport->fd != -1) {
-        close(vport->fd);
-    }
-    g_free(vport);
-    return NULL;
 }
 
 static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
                                 GError *err)
 {
-    struct vdagent_virtio_port_buf *wbuf, *next_wbuf;
     struct vdagent_virtio_port *vport = *vportp;
     int i;
 
@@ -136,19 +176,13 @@ static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
 
     g_clear_error(&err);
 
-    wbuf = vport->write_buf;
-    while (wbuf) {
-        next_wbuf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-        wbuf = next_wbuf;
-    }
+    g_free(vport->write_buf.buf);
 
     for (i = 0; i < VDP_END_PORT; i++) {
         g_free(vport->port_data[i].message_data);
     }
 
-    close(vport->fd);
+    vdagent_connection_destroy(vport->conn);
     g_clear_pointer(vportp, g_free);
 }
 
@@ -157,47 +191,6 @@ void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp)
     virtio_port_destroy(vportp, NULL);
 }
 
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
-        fd_set *readfds, fd_set *writefds)
-{
-    if (!vport)
-        return -1;
-
-    FD_SET(vport->fd, readfds);
-    if (vport->write_buf)
-        FD_SET(vport->fd, writefds);
-
-    return vport->fd + 1;
-}
-
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
-        fd_set *readfds, fd_set *writefds)
-{
-    if (!*vportp)
-        return;
-
-    if (FD_ISSET((*vportp)->fd, readfds))
-        vdagent_virtio_port_do_read(vportp);
-
-    if (*vportp && FD_ISSET((*vportp)->fd, writefds))
-        vdagent_virtio_port_do_write(vportp);
-}
-
-static struct vdagent_virtio_port_buf* vdagent_virtio_port_get_last_wbuf(
-    struct vdagent_virtio_port *vport)
-{
-    struct vdagent_virtio_port_buf *wbuf;
-
-    wbuf = vport->write_buf;
-    if (!wbuf)
-        return NULL;
-
-    while (wbuf->next)
-        wbuf = wbuf->next;
-
-    return wbuf;
-}
-
 void vdagent_virtio_port_write_start(
         struct vdagent_virtio_port *vport,
         uint32_t port_nr,
@@ -205,15 +198,15 @@ void vdagent_virtio_port_write_start(
         uint32_t message_opaque,
         uint32_t data_size)
 {
-    struct vdagent_virtio_port_buf *wbuf, *new_wbuf;
+    struct vdagent_virtio_port_buf *new_wbuf;
     VDIChunkHeader chunk_header;
     VDAgentMessage message_header;
 
-    new_wbuf = g_new(struct vdagent_virtio_port_buf, 1);
-    new_wbuf->pos = 0;
+    g_return_if_fail(vport->write_buf.buf == NULL);
+
+    new_wbuf = &vport->write_buf;
     new_wbuf->write_pos = 0;
     new_wbuf->size = sizeof(chunk_header) + sizeof(message_header) + data_size;
-    new_wbuf->next = NULL;
     new_wbuf->buf = g_malloc(new_wbuf->size);
 
     chunk_header.port = GUINT32_TO_LE(port_nr);
@@ -229,14 +222,6 @@ void vdagent_virtio_port_write_start(
     memcpy(new_wbuf->buf + new_wbuf->write_pos, &message_header,
            sizeof(message_header));
     new_wbuf->write_pos += sizeof(message_header);
-
-    if (!vport->write_buf) {
-        vport->write_buf = new_wbuf;
-        return;
-    }
-
-    wbuf = vdagent_virtio_port_get_last_wbuf(vport);
-    wbuf->next = new_wbuf;
 }
 
 int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
@@ -244,8 +229,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
 {
     struct vdagent_virtio_port_buf *wbuf;
 
-    wbuf = vdagent_virtio_port_get_last_wbuf(vport);
-    if (!wbuf) {
+    if (size == 0)
+        return 0;
+
+    wbuf = &vport->write_buf;
+    if (!wbuf->buf) {
         syslog(LOG_ERR, "can't append without a buffer");
         return -1;
     }
@@ -257,6 +245,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
 
     memcpy(wbuf->buf + wbuf->write_pos, data, size);
     wbuf->write_pos += size;
+
+    if (wbuf->write_pos == wbuf->size) {
+        vdagent_connection_write(vport->conn, wbuf->buf, wbuf->size);
+        wbuf->buf = NULL;
+    }
     return 0;
 }
 
@@ -275,8 +268,8 @@ void vdagent_virtio_port_write(
 
 void vdagent_virtio_port_flush(struct vdagent_virtio_port **vportp)
 {
-    while (*vportp && (*vportp)->write_buf)
-        vdagent_virtio_port_do_write(vportp);
+    if (*vportp)
+        vdagent_connection_flush((*vportp)->conn);
 }
 
 void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
@@ -289,20 +282,22 @@ void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
     memset(&vport->port_data[port], 0, sizeof(vport->port_data[0]));
 }
 
-static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
+static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp,
+                                         VDIChunkHeader *chunk_header,
+                                         gconstpointer chunk_data)
 {
     int avail, read, pos = 0;
     struct vdagent_virtio_port *vport = *vportp;
     struct vdagent_virtio_port_chunk_port_data *port =
-        &vport->port_data[vport->chunk_header.port];
+        &vport->port_data[chunk_header->port];
 
     if (port->message_header_read < sizeof(port->message_header)) {
         read = sizeof(port->message_header) - port->message_header_read;
-        if (read > vport->chunk_header.size) {
-            read = vport->chunk_header.size;
+        if (read > chunk_header->size) {
+            read = chunk_header->size;
         }
         memcpy((uint8_t *)&port->message_header + port->message_header_read,
-               vport->chunk_data, read);
+               chunk_data, read);
         port->message_header_read += read;
         if (port->message_header_read == sizeof(port->message_header)) {
 
@@ -320,7 +315,7 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
 
     if (port->message_header_read == sizeof(port->message_header)) {
         read  = port->message_header.size - port->message_data_pos;
-        avail = vport->chunk_header.size - pos;
+        avail = chunk_header->size - pos;
 
         if (avail > read) {
             GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
@@ -334,13 +329,13 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
 
         if (read) {
             memcpy(port->message_data + port->message_data_pos,
-                   vport->chunk_data + pos, read);
+                   chunk_data + pos, read);
             port->message_data_pos += read;
         }
 
         if (port->message_data_pos == port->message_header.size) {
             if (vport->read_callback) {
-                int r = vport->read_callback(vport, vport->chunk_header.port,
+                int r = vport->read_callback(vport, chunk_header->port,
                                  &port->message_header, port->message_data);
                 if (r == -1) {
                     virtio_port_destroy(vportp, NULL);
@@ -353,143 +348,3 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
         }
     }
 }
-
-static int vport_read(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
-    if (vport->is_uds) {
-        return recv(vport->fd, buf, len, 0);
-    } else {
-        return read(vport->fd, buf, len);
-    }
-}
-
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp)
-{
-    ssize_t n;
-    size_t to_read;
-    uint8_t *dest;
-    struct vdagent_virtio_port *vport = *vportp;
-
-    if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
-        to_read = sizeof(vport->chunk_header) - vport->chunk_header_read;
-        dest = (uint8_t *)&vport->chunk_header + vport->chunk_header_read;
-    } else {
-        to_read = vport->chunk_header.size - vport->chunk_data_pos;
-        dest = vport->chunk_data + vport->chunk_data_pos;
-    }
-
-    n = vport_read(vport, dest, to_read);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-    }
-    if (n == 0 && vport->opening) {
-        /* When we open the virtio serial port, the following happens:
-           1) The linux kernel virtio_console driver sends a
-              VIRTIO_CONSOLE_PORT_OPEN message to qemu
-           2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
-              register the agent chardev with the spice-server
-           3) spice-server then calls the spicevmc chardev driver's state
-              callback to let it know it is ready to receive data
-           4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
-              chardev backend
-           5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
-              to the linux kernel virtio_console driver
-
-           Until steps 1 - 5 have completed the linux kernel virtio_console
-           driver sees the virtio serial port as being in a disconnected state
-           and read will return 0 ! So if we blindly assume that a read 0 means
-           that the channel is closed we will hit a race here.
-
-           Therefore we ignore read returning 0 until we've successfully read
-           or written some data. If we hit this race we also sleep a bit here
-           to avoid busy waiting until the above steps complete */
-        usleep(10000);
-        return;
-    }
-    if (n <= 0) {
-        GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
-                                  "reading from vdagent virtio port: %m");
-        virtio_port_destroy(vportp, err);
-        return;
-    }
-    vport->opening = 0;
-
-    if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
-        vport->chunk_header_read += n;
-        if (vport->chunk_header_read == sizeof(vport->chunk_header)) {
-            vport->chunk_header.size = GUINT32_FROM_LE(vport->chunk_header.size);
-            vport->chunk_header.port = GUINT32_FROM_LE(vport->chunk_header.port);
-            if (vport->chunk_header.size > VD_AGENT_MAX_DATA_SIZE) {
-                GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
-                                          "chunk size %u too large",
-                                          vport->chunk_header.size);
-                virtio_port_destroy(vportp, err);
-                return;
-            }
-            if (vport->chunk_header.port >= VDP_END_PORT) {
-                GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
-                                          "chunk port %u out of range",
-                                          vport->chunk_header.port);
-                virtio_port_destroy(vportp, err);
-                return;
-            }
-        }
-    } else {
-        vport->chunk_data_pos += n;
-        if (vport->chunk_data_pos == vport->chunk_header.size) {
-            vdagent_virtio_port_do_chunk(vportp);
-            if (!*vportp)
-                return;
-            vport->chunk_header_read = 0;
-            vport->chunk_data_pos = 0;
-        }
-    }
-}
-
-static int vport_write(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
-    if (vport->is_uds) {
-        return send(vport->fd, buf, len, 0);
-    } else {
-        return write(vport->fd, buf, len);
-    }
-}
-
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp)
-{
-    ssize_t n;
-    size_t to_write;
-    struct vdagent_virtio_port *vport = *vportp;
-
-    struct vdagent_virtio_port_buf* wbuf = vport->write_buf;
-    if (!wbuf) {
-        syslog(LOG_ERR, "do_write called on a port without a write buf ?!");
-        return;
-    }
-
-    if (wbuf->write_pos != wbuf->size) {
-        syslog(LOG_ERR, "do_write: buffer is incomplete!!");
-        return;
-    }
-
-    to_write = wbuf->size - wbuf->pos;
-    n = vport_write(vport, wbuf->buf + wbuf->pos, to_write);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
-                                  "writing to vdagent virtio port: %m");
-        virtio_port_destroy(vportp, err);
-        return;
-    }
-    if (n > 0)
-        vport->opening = 0;
-
-    wbuf->pos += n;
-    if (wbuf->pos == wbuf->size) {
-        vport->write_buf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-    }
-}
diff --git a/src/vdagentd/virtio-port.h b/src/vdagentd/virtio-port.h
index dfbe27b..7d14deb 100644
--- a/src/vdagentd/virtio-port.h
+++ b/src/vdagentd/virtio-port.h
@@ -22,9 +22,7 @@
 #ifndef __VIRTIO_PORT_H
 #define __VIRTIO_PORT_H
 
-#include <stdio.h>
 #include <stdint.h>
-#include <sys/select.h>
 #include <spice/vd_agent.h>
 
 struct vdagent_virtio_port;
@@ -57,22 +55,6 @@ struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
 /* The contents of portp will be made NULL */
 void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp);
 
-
-/* Given a vdagent_virtio_port fill the fd_sets pointed to by readfds and
-   writefds for select() usage.
-
-   Return value: value of the highest fd + 1 */
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
-        fd_set *readfds, fd_set *writefds);
-
-/* Handle any events flagged by select for the given vdagent_virtio_port.
-   Note the port may be destroyed (when disconnected) by this call
-   in this case the disconnect calllback will get called before the
-   destruction and the contents of connp will be made NULL */
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
-        fd_set *readfds, fd_set *writefds);
-
-
 /* Queue a message for delivery, either bit by bit, or all at once */
 void vdagent_virtio_port_write_start(
         struct vdagent_virtio_port *vport,
-- 
2.17.1



More information about the Spice-devel mailing list