[Spice-devel] [vdagent-linux v2 2/5] introduce VDAgentConnection

Jakub Janků jjanku at redhat.com
Tue Apr 30 09:33:36 UTC 2019


1) VDAgentConnection

Add vdagent-connection.{c,h} files,
define a new derivable class VDAgentConnection.

VDAgentConnection uses GIO and therefore integrates well with GMainLoop.

Read messages must begin with a header of a fixed size.
Message body size can vary.

Note:
vdagent_connection_destroy() cancels ongoing I/O-ops
and closes the underlying FD.
However, the I/O-ops do not finish immediately,
but asynchronously in the next iteration of the main loop.
At the beginning of each op, VDAgentConnection is referenced,
so the object isn't finalized until the ops themselves finish.
To solve this, call g_main_context_iteration() before the agent exits.

2) udscs

Make udscs_connection a subclass of VDAgentConnection.

Rewrite udscs_server using GSocketService.
Use GList to store server's connections.

Drop support for select(), remove:
 * udscs_server_fill_fds()
 * udscs_server_handle_fds()

Remove udscs_{set,get}_user_data(). Custom data can be
associated with the GObject using g_object_set_data().

3) virtio_port

Make vdagent_virtio_port a subclass of VDAgentConnection.

Drop support for select(), remove:
 * vdagent_virtio_port_fill_fds()
 * vdagent_virtio_port_handle_fds()

4) vdagentd

Replace the main_loop() with a GMainLoop.

Use g_unix_signal_add() to handle SIGINT, SIGHUP, SIGTERM.
SIGQUIT handling is not supported by GLib.

Integrate the session_info into the loop using
GIOChannel and g_io_add_watch().

Signed-off-by: Jakub Janků <jjanku at redhat.com>
---
 Makefile.am                |   2 +
 src/udscs.c                | 586 ++++++++++---------------------------
 src/udscs.h                |  80 ++---
 src/vdagent-connection.c   | 322 ++++++++++++++++++++
 src/vdagent-connection.h   | 102 +++++++
 src/vdagent/vdagent.c      |  30 +-
 src/vdagentd/vdagentd.c    | 256 ++++++++--------
 src/vdagentd/virtio-port.c | 410 ++++++++------------------
 src/vdagentd/virtio-port.h |  64 ++--
 9 files changed, 912 insertions(+), 940 deletions(-)
 create mode 100644 src/vdagent-connection.c
 create mode 100644 src/vdagent-connection.h

diff --git a/Makefile.am b/Makefile.am
index bf937b9..787f158 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -9,6 +9,8 @@ TESTS = $(check_PROGRAMS)
 common_sources =				\
 	src/udscs.c				\
 	src/udscs.h				\
+	src/vdagent-connection.c		\
+	src/vdagent-connection.h		\
 	src/vdagentd-proto-strings.h		\
 	src/vdagentd-proto.h			\
 	$(NULL)
diff --git a/src/udscs.c b/src/udscs.c
index 32bd6e6..7ae1ebe 100644
--- a/src/udscs.c
+++ b/src/udscs.c
@@ -22,58 +22,24 @@
 */
 #include <config.h>
 
-#include <stdio.h>
 #include <stdlib.h>
 #include <syslog.h>
-#include <unistd.h>
-#include <errno.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
 #include <glib-unix.h>
+#include <gio/gunixsocketaddress.h>
 #include "udscs.h"
 #include "vdagentd-proto-strings.h"
-
-struct udscs_buf {
-    uint8_t *buf;
-    size_t pos;
-    size_t size;
-
-    struct udscs_buf *next;
-};
+#include "vdagent-connection.h"
 
 struct udscs_connection {
-    int fd;
-    int debug;
-    void *user_data;
-#ifndef UDSCS_NO_SERVER
-    struct ucred peer_cred;
-#endif
-
-    /* Read stuff, single buffer, separate header and data buffer */
-    int header_read;
-    struct udscs_message_header header;
-    struct udscs_buf data;
+    VDAgentConnection parent_instance;
 
-    /* Writes are stored in a linked list of buffers, with both the header
-       + data for a single message in 1 buffer. */
-    struct udscs_buf *write_buf;
+    int debug;
 
     /* Callbacks */
     udscs_read_callback read_callback;
-    udscs_disconnect_callback disconnect_callback;
-
-    struct udscs_connection *next;
-    struct udscs_connection *prev;
-
-    GIOChannel                     *io_channel;
-    guint                           write_watch_id;
-    guint                           read_watch_id;
 };
 
-static gboolean udscs_io_channel_cb(GIOChannel *source,
-                                    GIOCondition condition,
-                                    gpointer data);
+G_DEFINE_TYPE(UdscsConnection, udscs_connection, VDAGENT_TYPE_CONNECTION);
 
 static void debug_print_message_header(struct udscs_connection     *conn,
                                        struct udscs_message_header *header,
@@ -91,485 +57,256 @@ static void debug_print_message_header(struct udscs_connection     *conn,
         conn, direction, type, header->arg1, header->arg2, header->size);
 }
 
-struct udscs_connection *udscs_connect(const char *socketname,
-    udscs_read_callback read_callback,
-    udscs_disconnect_callback disconnect_callback,
-    int debug)
+static gsize conn_handle_header(VDAgentConnection *conn,
+                                gpointer           header_buf)
 {
-    int c;
-    struct sockaddr_un address;
-    struct udscs_connection *conn;
-
-    conn = g_new0(struct udscs_connection, 1);
-    conn->debug = debug;
-
-    conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (conn->fd == -1) {
-        syslog(LOG_ERR, "creating unix domain socket: %m");
-        g_free(conn);
-        return NULL;
-    }
-
-    address.sun_family = AF_UNIX;
-    snprintf(address.sun_path, sizeof(address.sun_path), "%s", socketname);
-    c = connect(conn->fd, (struct sockaddr *)&address, sizeof(address));
-    if (c != 0) {
-        if (conn->debug) {
-            syslog(LOG_DEBUG, "connect %s: %m", socketname);
-        }
-        close(conn->fd);
-        g_free(conn);
-        return NULL;
-    }
-
-    conn->io_channel = g_io_channel_unix_new(conn->fd);
-    if (!conn->io_channel) {
-        udscs_destroy_connection(&conn);
-        return NULL;
-    }
-    conn->read_watch_id =
-        g_io_add_watch(conn->io_channel,
-                       G_IO_IN | G_IO_ERR | G_IO_NVAL,
-                       udscs_io_channel_cb,
-                       conn);
-
-    conn->read_callback = read_callback;
-    conn->disconnect_callback = disconnect_callback;
-
-    if (conn->debug)
-        syslog(LOG_DEBUG, "%p connected to %s", conn, socketname);
-
-    return conn;
+    return ((struct udscs_message_header *)header_buf)->size;
 }
 
-void udscs_destroy_connection(struct udscs_connection **connp)
+static void conn_handle_message(VDAgentConnection *conn,
+                                gpointer           header_buf,
+                                gpointer           data)
 {
-    struct udscs_buf *wbuf, *next_wbuf;
-    struct udscs_connection *conn = *connp;
+    UdscsConnection *self = UDSCS_CONNECTION(conn);
+    struct udscs_message_header *header = header_buf;
 
-    if (!conn)
-        return;
+    debug_print_message_header(self, header, "received");
 
-    if (conn->disconnect_callback)
-        conn->disconnect_callback(conn);
+    self->read_callback(self, header, data);
+}
 
-    wbuf = conn->write_buf;
-    while (wbuf) {
-        next_wbuf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-        wbuf = next_wbuf;
-    }
+static void udscs_connection_init(UdscsConnection *self)
+{
+}
 
-    g_clear_pointer(&conn->data.buf, g_free);
+static void udscs_connection_finalize(GObject *obj)
+{
+    UdscsConnection *self = UDSCS_CONNECTION(obj);
 
-    if (conn->next)
-        conn->next->prev = conn->prev;
-    if (conn->prev)
-        conn->prev->next = conn->next;
+    if (self->debug)
+        syslog(LOG_DEBUG, "%p disconnected", self);
 
-    close(conn->fd);
+    G_OBJECT_CLASS(udscs_connection_parent_class)->finalize(obj);
+}
 
-    if (conn->write_watch_id != 0)
-        g_source_remove(conn->write_watch_id);
-    if (conn->read_watch_id != 0)
-        g_source_remove(conn->read_watch_id);
-    g_clear_pointer(&conn->io_channel, g_io_channel_unref);
+static void udscs_connection_class_init(UdscsConnectionClass *klass)
+{
+    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+    VDAgentConnectionClass *conn_class = VDAGENT_CONNECTION_CLASS(klass);
 
-    if (conn->debug)
-        syslog(LOG_DEBUG, "%p disconnected", conn);
+    gobject_class->finalize = udscs_connection_finalize;
 
-    g_clear_pointer(connp, g_free);
+    conn_class->handle_header = conn_handle_header;
+    conn_class->handle_message = conn_handle_message;
 }
 
-void udscs_set_user_data(struct udscs_connection *conn, void *data)
+struct udscs_connection *udscs_connect(const char *socketname,
+    udscs_read_callback read_callback,
+    VDAgentConnErrorCb error_cb,
+    int debug)
 {
-    conn->user_data = data;
-}
+    GIOStream *io_stream;
+    struct udscs_connection *conn;
+    GError *err = NULL;
 
-void *udscs_get_user_data(struct udscs_connection *conn)
-{
-    if (!conn)
+    io_stream = vdagent_socket_connect(socketname, &err);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
         return NULL;
+    }
 
-    return conn->user_data;
+    conn = g_object_new(UDSCS_TYPE_CONNECTION, NULL);
+    conn->debug = debug;
+    conn->read_callback = read_callback;
+    vdagent_connection_setup(VDAGENT_CONNECTION(conn),
+                             io_stream,
+                             FALSE,
+                             sizeof(struct udscs_message_header),
+                             error_cb);
+
+    if (conn->debug)
+        syslog(LOG_DEBUG, "%p connected to %s", conn, socketname);
+
+    return conn;
 }
 
 void udscs_write(struct udscs_connection *conn, uint32_t type, uint32_t arg1,
     uint32_t arg2, const uint8_t *data, uint32_t size)
 {
-    struct udscs_buf *wbuf, *new_wbuf;
+    gpointer buf;
+    guint buf_size;
     struct udscs_message_header header;
 
-    new_wbuf = g_new(struct udscs_buf, 1);
-    new_wbuf->pos = 0;
-    new_wbuf->size = sizeof(header) + size;
-    new_wbuf->next = NULL;
-    new_wbuf->buf = g_malloc(new_wbuf->size);
+    buf_size = sizeof(header) + size;
+    buf = g_malloc(buf_size);
 
     header.type = type;
     header.arg1 = arg1;
     header.arg2 = arg2;
     header.size = size;
 
-    memcpy(new_wbuf->buf, &header, sizeof(header));
-    memcpy(new_wbuf->buf + sizeof(header), data, size);
+    memcpy(buf, &header, sizeof(header));
+    memcpy(buf + sizeof(header), data, size);
 
     debug_print_message_header(conn, &header, "sent");
 
-    if (conn->io_channel && conn->write_watch_id == 0)
-        conn->write_watch_id =
-            g_io_add_watch(conn->io_channel,
-                           G_IO_OUT | G_IO_ERR | G_IO_NVAL,
-                           udscs_io_channel_cb,
-                           conn);
-
-    if (!conn->write_buf) {
-        conn->write_buf = new_wbuf;
-        return;
-    }
-
-    /* maybe we should limit the write_buf stack depth ? */
-    wbuf = conn->write_buf;
-    while (wbuf->next)
-        wbuf = wbuf->next;
-
-    wbuf->next = new_wbuf;
-}
-
-/* A helper for udscs_do_read() */
-static void udscs_read_complete(struct udscs_connection **connp)
-{
-    struct udscs_connection *conn = *connp;
-
-    debug_print_message_header(conn, &conn->header, "received");
-
-    if (conn->read_callback) {
-        conn->read_callback(connp, &conn->header, conn->data.buf);
-        if (!*connp) /* Was the connection disconnected by the callback ? */
-            return;
-    }
-
-    g_free(conn->data.buf);
-    memset(&conn->data, 0, sizeof(conn->data)); /* data.buf = NULL */
-    conn->header_read = 0;
-}
-
-static void udscs_do_read(struct udscs_connection **connp)
-{
-    ssize_t n;
-    size_t to_read;
-    uint8_t *dest;
-    struct udscs_connection *conn = *connp;
-
-    if (conn->header_read < sizeof(conn->header)) {
-        to_read = sizeof(conn->header) - conn->header_read;
-        dest = (uint8_t *)&conn->header + conn->header_read;
-    } else {
-        to_read = conn->data.size - conn->data.pos;
-        dest = conn->data.buf + conn->data.pos;
-    }
-
-    n = read(conn->fd, dest, to_read);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "reading unix domain socket: %m, disconnecting %p",
-               conn);
-    }
-    if (n <= 0) {
-        udscs_destroy_connection(connp);
-        return;
-    }
-
-    if (conn->header_read < sizeof(conn->header)) {
-        conn->header_read += n;
-        if (conn->header_read == sizeof(conn->header)) {
-            if (conn->header.size == 0) {
-                udscs_read_complete(connp);
-                return;
-            }
-            conn->data.pos = 0;
-            conn->data.size = conn->header.size;
-            conn->data.buf = g_malloc(conn->data.size);
-        }
-    } else {
-        conn->data.pos += n;
-        if (conn->data.pos == conn->data.size)
-            udscs_read_complete(connp);
-    }
-}
-
-static void udscs_do_write(struct udscs_connection **connp)
-{
-    ssize_t n;
-    size_t to_write;
-    struct udscs_connection *conn = *connp;
-
-    struct udscs_buf* wbuf = conn->write_buf;
-    if (!wbuf) {
-        syslog(LOG_ERR,
-               "%p do_write called on a connection without a write buf ?!",
-               conn);
-        return;
-    }
-
-    to_write = wbuf->size - wbuf->pos;
-    n = write(conn->fd, wbuf->buf + wbuf->pos, to_write);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "writing to unix domain socket: %m, disconnecting %p",
-               conn);
-        udscs_destroy_connection(connp);
-        return;
-    }
-
-    wbuf->pos += n;
-    if (wbuf->pos == wbuf->size) {
-        conn->write_buf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-    }
-}
-
-static gboolean udscs_io_channel_cb(GIOChannel *source,
-                                    GIOCondition condition,
-                                    gpointer data)
-{
-    struct udscs_connection *conn = data;
-
-    if (condition & G_IO_IN) {
-        udscs_do_read(&conn);
-        // coverity[check_after_deref] previous function can change conn
-        if (conn == NULL)
-            return G_SOURCE_REMOVE;
-        return G_SOURCE_CONTINUE;
-    }
-    if (condition & G_IO_OUT) {
-        udscs_do_write(&conn);
-        // coverity[check_after_deref] previous function can change conn
-        if (conn == NULL)
-            return G_SOURCE_REMOVE;
-        if (conn->write_buf)
-            return G_SOURCE_CONTINUE;
-        conn->write_watch_id = 0;
-        return G_SOURCE_REMOVE;
-    }
-
-    udscs_destroy_connection(&conn);
-    return G_SOURCE_REMOVE;
+    vdagent_connection_write(VDAGENT_CONNECTION(conn), buf, buf_size);
 }
 
-
 #ifndef UDSCS_NO_SERVER
 
 /* ---------- Server-side implementation ---------- */
 
 struct udscs_server {
-    int fd;
+    GSocketService *service;
+    GList *connections;
+
     int debug;
-    struct udscs_connection connections_head;
     udscs_connect_callback connect_callback;
     udscs_read_callback read_callback;
-    udscs_disconnect_callback disconnect_callback;
+    VDAgentConnErrorCb error_cb;
 };
 
-struct udscs_server *udscs_create_server_for_fd(int fd,
+static gboolean udscs_server_accept_cb(GSocketService    *service,
+                                       GSocketConnection *socket_conn,
+                                       GObject           *source_object,
+                                       gpointer           user_data);
+
+static struct udscs_server *udscs_server_new(
     udscs_connect_callback connect_callback,
     udscs_read_callback read_callback,
-    udscs_disconnect_callback disconnect_callback,
+    VDAgentConnErrorCb error_cb,
     int debug)
 {
     struct udscs_server *server;
 
-    if (fd <= 0) {
-        syslog(LOG_ERR, "Invalid file descriptor: %i", fd);
-        return NULL;
-    }
-
     server = g_new0(struct udscs_server, 1);
     server->debug = debug;
-    server->fd = fd;
     server->connect_callback = connect_callback;
     server->read_callback = read_callback;
-    server->disconnect_callback = disconnect_callback;
+    server->error_cb = error_cb;
+    server->service = g_socket_service_new();
+
+    g_signal_connect(server->service, "incoming",
+        G_CALLBACK(udscs_server_accept_cb), server);
 
     return server;
 }
 
-struct udscs_server *udscs_create_server(const char *socketname,
+struct udscs_server *udscs_create_server_for_fd(int fd,
     udscs_connect_callback connect_callback,
     udscs_read_callback read_callback,
-    udscs_disconnect_callback disconnect_callback,
+    VDAgentConnErrorCb error_cb,
     int debug)
 {
-    int c;
-    int fd;
-    struct sockaddr_un address;
     struct udscs_server *server;
+    GSocket *socket;
+    GError *err = NULL;
 
-    fd = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (fd == -1) {
-        syslog(LOG_ERR, "creating unix domain socket: %m");
-        return NULL;
-    }
+    server = udscs_server_new(connect_callback, read_callback,
+                              error_cb, debug);
 
-    address.sun_family = AF_UNIX;
-    snprintf(address.sun_path, sizeof(address.sun_path), "%s", socketname);
-    c = bind(fd, (struct sockaddr *)&address, sizeof(address));
-    if (c != 0) {
-        syslog(LOG_ERR, "bind %s: %m", socketname);
-        close(fd);
-        return NULL;
-    }
+    socket = g_socket_new_from_fd(fd, &err);
+    if (err)
+        goto error;
+    g_socket_listener_add_socket(G_SOCKET_LISTENER(server->service),
+                                 socket, NULL, &err);
+    g_object_unref(socket);
+    if (err)
+        goto error;
 
-    c = listen(fd, 5);
-    if (c != 0) {
-        syslog(LOG_ERR, "listen: %m");
-        close(fd);
-        return NULL;
-    }
-
-    server = udscs_create_server_for_fd(fd, connect_callback, read_callback,
-                                        disconnect_callback, debug);
+    return server;
+error:
+    syslog(LOG_ERR, "%s: %s", __func__, err->message);
+    g_error_free(err);
+    udscs_destroy_server(server);
+    return NULL;
+}
 
-    if (!server) {
-        close(fd);
+struct udscs_server *udscs_create_server(const char *socketname,
+    udscs_connect_callback connect_callback,
+    udscs_read_callback read_callback,
+    VDAgentConnErrorCb error_cb,
+    int debug)
+{
+    struct udscs_server *server;
+    GSocketAddress *socket_addr;
+    GError *err = NULL;
+
+    server = udscs_server_new(connect_callback, read_callback,
+                              error_cb, debug);
+
+    socket_addr = g_unix_socket_address_new(socketname);
+    g_socket_listener_add_address(G_SOCKET_LISTENER(server->service),
+                                  socket_addr,
+                                  G_SOCKET_TYPE_STREAM,
+                                  G_SOCKET_PROTOCOL_DEFAULT,
+                                  NULL, NULL, &err);
+    g_object_unref(socket_addr);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
+        udscs_destroy_server(server);
+        return NULL;
     }
 
     return server;
 }
 
-void udscs_destroy_server(struct udscs_server *server)
+void udscs_server_destroy_connection(struct udscs_server *server,
+                                     UdscsConnection     *conn)
 {
-    struct udscs_connection *conn, *next_conn;
+    server->connections = g_list_remove(server->connections, conn);
+    vdagent_connection_destroy(conn);
+}
 
+void udscs_destroy_server(struct udscs_server *server)
+{
     if (!server)
         return;
 
-    conn = server->connections_head.next;
-    while (conn) {
-        next_conn = conn->next;
-        udscs_destroy_connection(&conn);
-        conn = next_conn;
-    }
-    close(server->fd);
+    g_list_free_full(server->connections, vdagent_connection_destroy);
+    g_object_unref(server->service);
     g_free(server);
 }
 
-int udscs_get_peer_pid(struct udscs_connection *conn)
+static gboolean udscs_server_accept_cb(GSocketService    *service,
+                                       GSocketConnection *socket_conn,
+                                       GObject           *source_object,
+                                       gpointer           user_data)
 {
-    return (int)conn->peer_cred.pid;
-}
-
-static void udscs_server_accept(struct udscs_server *server) {
-    struct udscs_connection *new_conn, *conn;
-    struct sockaddr_un address;
-    socklen_t length = sizeof(address);
-    int r, fd;
-
-    fd = accept(server->fd, (struct sockaddr *)&address, &length);
-    if (fd == -1) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "accept: %m");
-        return;
-    }
+    struct udscs_server *server = user_data;
+    struct udscs_connection *new_conn;
 
-    new_conn = g_new0(struct udscs_connection, 1);
-    new_conn->fd = fd;
+    new_conn = g_object_new(UDSCS_TYPE_CONNECTION, NULL);
     new_conn->debug = server->debug;
     new_conn->read_callback = server->read_callback;
-    new_conn->disconnect_callback = server->disconnect_callback;
-
-    length = sizeof(new_conn->peer_cred);
-    r = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &new_conn->peer_cred, &length);
-    if (r != 0) {
-        syslog(LOG_ERR, "Could not get peercred, disconnecting new client");
-        close(fd);
-        g_free(new_conn);
-        return;
-    }
-
-    conn = &server->connections_head;
-    while (conn->next)
-        conn = conn->next;
+    g_object_ref(socket_conn);
+    vdagent_connection_setup(VDAGENT_CONNECTION(new_conn),
+                             G_IO_STREAM(socket_conn),
+                             FALSE,
+                             sizeof(struct udscs_message_header),
+                             server->error_cb);
 
-    new_conn->prev = conn;
-    conn->next = new_conn;
+    server->connections = g_list_prepend(server->connections, new_conn);
 
     if (server->debug)
-        syslog(LOG_DEBUG, "new client accepted: %p, pid: %d",
-               new_conn, udscs_get_peer_pid(new_conn));
+        syslog(LOG_DEBUG, "new client accepted: %p", new_conn);
 
     if (server->connect_callback)
         server->connect_callback(new_conn);
-}
 
-int udscs_server_fill_fds(struct udscs_server *server, fd_set *readfds,
-        fd_set *writefds)
-{
-    struct udscs_connection *conn;
-    int nfds;
-
-    if (!server)
-        return -1;
-
-    nfds = server->fd + 1;
-    FD_SET(server->fd, readfds);
-
-    conn = server->connections_head.next;
-    while (conn) {
-        FD_SET(conn->fd, readfds);
-        if (conn->write_buf)
-            FD_SET(conn->fd, writefds);
-
-        if (conn->fd >= nfds)
-            nfds = conn->fd + 1;
-
-        conn = conn->next;
-    }
-
-    return nfds;
-}
-
-void udscs_server_handle_fds(struct udscs_server *server, fd_set *readfds,
-        fd_set *writefds)
-{
-    struct udscs_connection *conn, *next_conn;
-
-    if (!server)
-        return;
-
-    if (FD_ISSET(server->fd, readfds))
-        udscs_server_accept(server);
-
-    conn = server->connections_head.next;
-    while (conn) {
-        /* conn may be destroyed by udscs_do_read() or udscs_do_write()
-         * (when disconnected), so get the next connection first. */
-        next_conn = conn->next;
-
-        if (FD_ISSET(conn->fd, readfds))
-            udscs_do_read(&conn);
-        if (conn && FD_ISSET(conn->fd, writefds))
-            udscs_do_write(&conn);
-
-        conn = next_conn;
-    }
+    return TRUE;
 }
 
 void udscs_server_write_all(struct udscs_server *server,
         uint32_t type, uint32_t arg1, uint32_t arg2,
         const uint8_t *data, uint32_t size)
 {
-    struct udscs_connection *conn;
-
-    conn = server->connections_head.next;
-    while (conn) {
-        udscs_write(conn, type, arg1, arg2, data, size);
-        conn = conn->next;
+    GList *l;
+    for (l = server->connections; l; l = l->next) {
+        udscs_write(UDSCS_CONNECTION(l->data), type, arg1, arg2, data, size);
     }
 }
 
@@ -577,17 +314,16 @@ int udscs_server_for_all_clients(struct udscs_server *server,
     udscs_for_all_clients_callback func, void *priv)
 {
     int r = 0;
-    struct udscs_connection *conn, *next_conn;
+    GList *l, *next;
 
     if (!server)
         return 0;
 
-    conn = server->connections_head.next;
-    while (conn) {
-        /* Get next conn as func may destroy the current conn */
-        next_conn = conn->next;
-        r += func(&conn, priv);
-        conn = next_conn;
+    l = server->connections;
+    while (l) {
+        next = l->next;
+        r += func(l->data, priv);
+        l = next;
     }
     return r;
 }
diff --git a/src/udscs.h b/src/udscs.h
index 363ca18..382da06 100644
--- a/src/udscs.h
+++ b/src/udscs.h
@@ -22,11 +22,28 @@
 #ifndef __UDSCS_H
 #define __UDSCS_H
 
-#include <stdio.h>
 #include <stdint.h>
-#include <sys/select.h>
 #include <sys/socket.h>
+#include <glib-object.h>
+#include "vdagent-connection.h"
 
+G_BEGIN_DECLS
+
+#define UDSCS_TYPE_CONNECTION            (udscs_connection_get_type())
+#define UDSCS_CONNECTION(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj), UDSCS_TYPE_CONNECTION, UdscsConnection))
+#define UDSCS_IS_CONNECTION(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj), UDSCS_TYPE_CONNECTION))
+#define UDSCS_CONNECTION_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass), UDSCS_TYPE_CONNECTION, UdscsConnectionClass))
+#define UDSCS_IS_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UDSCS_TYPE_CONNECTION))
+#define UDSCS_CONNECTION_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS((obj), UDSCS_TYPE_CONNECTION, UdscsConnectionClass))
+
+typedef struct udscs_connection     UdscsConnection;
+typedef struct UdscsConnectionClass UdscsConnectionClass;
+
+struct UdscsConnectionClass {
+    VDAgentConnectionClass parent_class;
+};
+
+GType udscs_connection_get_type(void);
 
 /* ---------- Generic bits and client-side API ---------- */
 
@@ -41,22 +58,10 @@ struct udscs_message_header {
 /* Callbacks with this type will be called when a complete message has been
  * received. The callback does not own the data buffer and should not free it.
  * The data buffer will be freed shortly after the read callback returns.
- * The callback may call udscs_destroy_connection, in which case *connp must be
- * made NULL (which udscs_destroy_connection takes care of).
  */
-typedef void (*udscs_read_callback)(struct udscs_connection **connp,
+typedef void (*udscs_read_callback)(struct udscs_connection *connp,
     struct udscs_message_header *header, uint8_t *data);
 
-/* Callbacks with this type will be called when the connection is disconnected.
- * Note:
- * 1) udscs will destroy the connection in question itself after
- *    this callback has completed!
- * 2) This callback is always called, even if the disconnect is initiated
- *    by the udscs user through returning -1 from a read callback, or
- *    by explicitly calling udscs_destroy_connection.
- */
-typedef void (*udscs_disconnect_callback)(struct udscs_connection *conn);
-
 /* Connect to the unix domain socket specified by socketname.
  * Only sockets bound to a pathname are supported.
  *
@@ -65,30 +70,14 @@ typedef void (*udscs_disconnect_callback)(struct udscs_connection *conn);
  */
 struct udscs_connection *udscs_connect(const char *socketname,
     udscs_read_callback read_callback,
-    udscs_disconnect_callback disconnect_callback,
+    VDAgentConnErrorCb error_cb,
     int debug);
 
-/* Close the connection, releases the corresponding resources and
- * sets *connp to NULL.
- *
- * Does nothing if *connp is NULL.
- */
-void udscs_destroy_connection(struct udscs_connection **connp);
-
 /* Queue a message for delivery to the client connected through conn.
  */
 void udscs_write(struct udscs_connection *conn, uint32_t type, uint32_t arg1,
         uint32_t arg2, const uint8_t *data, uint32_t size);
 
-/* Associates the specified user data with the connection. */
-void udscs_set_user_data(struct udscs_connection *conn, void *data);
-
-/* Return value: the connection's associated user data,
- * NULL if conn is NULL.
- */
-void *udscs_get_user_data(struct udscs_connection *conn);
-
-
 #ifndef UDSCS_NO_SERVER
 
 /* ---------- Server-side API ---------- */
@@ -108,7 +97,7 @@ typedef void (*udscs_connect_callback)(struct udscs_connection *conn);
 struct udscs_server *udscs_create_server_for_fd(int fd,
     udscs_connect_callback connect_callback,
     udscs_read_callback read_callback,
-    udscs_disconnect_callback disconnect_callback,
+    VDAgentConnErrorCb error_cb,
     int debug);
 
 /* Create the unix domain socket specified by socketname and
@@ -122,9 +111,12 @@ struct udscs_server *udscs_create_server_for_fd(int fd,
 struct udscs_server *udscs_create_server(const char *socketname,
     udscs_connect_callback connect_callback,
     udscs_read_callback read_callback,
-    udscs_disconnect_callback disconnect_callback,
+    VDAgentConnErrorCb error_cb,
     int debug);
 
+void udscs_server_destroy_connection(struct udscs_server *server,
+                                     UdscsConnection     *conn);
+
 /* Close all the server's connections and releases the corresponding
  * resources.
  * Does nothing if server is NULL.
@@ -141,7 +133,7 @@ void udscs_server_write_all(struct udscs_server *server,
 /* Callback type for udscs_server_for_all_clients. Clients can be disconnected
  * from this callback just like with a read callback.
  */
-typedef int (*udscs_for_all_clients_callback)(struct udscs_connection **connp,
+typedef int (*udscs_for_all_clients_callback)(struct udscs_connection *conn,
     void *priv);
 
 /* Call func for all clients connected to the server, passing through
@@ -151,22 +143,8 @@ typedef int (*udscs_for_all_clients_callback)(struct udscs_connection **connp,
 int udscs_server_for_all_clients(struct udscs_server *server,
     udscs_for_all_clients_callback func, void *priv);
 
-/* Given a udscs server, fill the fd_sets pointed to by readfds and
- * writefds for select() usage.
- * Return value: value of the highest fd + 1 or -1 if server is NULL
- */
-int udscs_server_fill_fds(struct udscs_server *server, fd_set *readfds,
-    fd_set *writefds);
-
-/* Handle any events flagged by select for the given udscs server.
- * Does nothing if server is NULL.
- */
-void udscs_server_handle_fds(struct udscs_server *server, fd_set *readfds,
-    fd_set *writefds);
-
-/* Returns the peer's PID. */
-int udscs_get_peer_pid(struct udscs_connection *conn);
-
 #endif
 
+G_END_DECLS
+
 #endif
diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
new file mode 100644
index 0000000..4ff5879
--- /dev/null
+++ b/src/vdagent-connection.c
@@ -0,0 +1,322 @@
+/*  vdagent-connection.c
+
+    Copyright 2019 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <syslog.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <glib/gstdio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixsocketaddress.h>
+
+#include "vdagent-connection.h"
+
+typedef struct {
+    GIOStream         *io_stream;
+    gboolean           opening;
+    VDAgentConnErrorCb error_cb;
+    GCancellable      *cancellable;
+
+    GQueue            *write_queue;
+    gsize              bytes_written;
+
+    gsize              header_size;
+    gpointer           header_buf;
+    gpointer           data_buf;
+} VDAgentConnectionPrivate;
+
+G_DEFINE_TYPE_WITH_PRIVATE(VDAgentConnection, vdagent_connection, G_TYPE_OBJECT);
+
+static void read_next_message(VDAgentConnection *self);
+
+GIOStream *vdagent_file_open(const gchar *path, GError **err)
+{
+    gint fd, errsv;
+
+    fd = g_open(path, O_RDWR);
+    if (fd == -1) {
+        errsv = errno;
+        g_set_error_literal(err, G_FILE_ERROR,
+                            g_file_error_from_errno(errsv),
+                            g_strerror(errsv));
+        return NULL;
+    }
+
+    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
+                                  g_unix_output_stream_new(fd, TRUE));
+}
+
+GIOStream *vdagent_socket_connect(const gchar *path, GError **err)
+{
+    GSocketConnection *conn;
+    GSocketAddress *addr;
+    GSocketClient *client;
+
+    addr = g_unix_socket_address_new(path);
+    client = g_object_new(G_TYPE_SOCKET_CLIENT,
+                          "family", G_SOCKET_FAMILY_UNIX,
+                          "type", G_SOCKET_TYPE_STREAM,
+                          NULL);
+    conn = g_socket_client_connect(client, G_SOCKET_CONNECTABLE(addr), NULL, err);
+    g_object_unref(client);
+    g_object_unref(addr);
+    return G_IO_STREAM(conn);
+}
+
+static void vdagent_connection_init(VDAgentConnection *self)
+{
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    priv->cancellable = g_cancellable_new();
+    priv->write_queue = g_queue_new();
+}
+
+static void vdagent_connection_dispose(GObject *obj)
+{
+    VDAgentConnection *self = VDAGENT_CONNECTION(obj);
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+
+    g_clear_object(&priv->cancellable);
+    g_clear_object(&priv->io_stream);
+
+    G_OBJECT_CLASS(vdagent_connection_parent_class)->dispose(obj);
+}
+
+static void vdagent_connection_finalize(GObject *obj)
+{
+    VDAgentConnection *self = VDAGENT_CONNECTION(obj);
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+
+    g_queue_free_full(priv->write_queue, (GDestroyNotify)g_bytes_unref);
+    g_free(priv->header_buf);
+    g_free(priv->data_buf);
+
+    G_OBJECT_CLASS(vdagent_connection_parent_class)->finalize(obj);
+}
+
+static void vdagent_connection_class_init(VDAgentConnectionClass *klass)
+{
+    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+    gobject_class->dispose  = vdagent_connection_dispose;
+    gobject_class->finalize = vdagent_connection_finalize;
+}
+
+void vdagent_connection_setup(VDAgentConnection *self,
+                              GIOStream         *io_stream,
+                              gboolean           wait_on_opening,
+                              gsize              header_size,
+                              VDAgentConnErrorCb error_cb)
+{
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    priv->io_stream = io_stream;
+    priv->opening = wait_on_opening;
+    priv->header_size = header_size;
+    priv->header_buf = g_malloc(header_size);
+    priv->error_cb = error_cb;
+
+    read_next_message(self);
+}
+
+void vdagent_connection_destroy(gpointer p)
+{
+    g_return_if_fail(VDAGENT_IS_CONNECTION(p));
+
+    VDAgentConnection *self = VDAGENT_CONNECTION(p);
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    g_cancellable_cancel(priv->cancellable);
+    g_io_stream_close(priv->io_stream, NULL, NULL);
+    g_object_unref(self);
+}
+
+gint vdagent_connection_get_peer_pid(VDAgentConnection *self,
+                                     GError           **err)
+{
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    GSocket *sock;
+    GCredentials *cred;
+    gint pid = -1;
+
+    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(priv->io_stream), pid);
+
+    sock = g_socket_connection_get_socket(G_SOCKET_CONNECTION(priv->io_stream));
+    cred = g_socket_get_credentials(sock, err);
+    if (cred) {
+        pid = g_credentials_get_unix_pid(cred, NULL);
+        g_object_unref(cred);
+    }
+
+    return pid;
+}
+
+/* Performs single write operation,
+ * returns TRUE if there's still data to be written, otherwise FALSE. */
+static gboolean do_write(VDAgentConnection *self, gboolean block)
+{
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    GOutputStream *out;
+    GBytes *msg;
+    gssize res;
+    GError *err = NULL;
+
+    msg = g_queue_peek_head(priv->write_queue);
+    out = g_io_stream_get_output_stream(priv->io_stream);
+
+    if (!msg) {
+        return FALSE;
+    }
+
+    res = g_pollable_stream_write(out,
+        g_bytes_get_data(msg, NULL) + priv->bytes_written,
+        g_bytes_get_size(msg) - priv->bytes_written,
+        block, priv->cancellable, &err);
+
+    if (err) {
+        if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+            g_error_free(err);
+            return TRUE;
+        } else if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+            g_error_free(err);
+            return FALSE;
+        } else {
+            priv->error_cb(self, err);
+            return FALSE;
+        }
+    }
+
+    priv->bytes_written += res;
+
+    if (priv->bytes_written == g_bytes_get_size(msg)) {
+        g_bytes_unref(g_queue_pop_head(priv->write_queue));
+        priv->bytes_written = 0;
+    }
+
+    return !g_queue_is_empty(priv->write_queue);
+}
+
+static gboolean out_stream_ready_cb(GObject *pollable_stream,
+                                    gpointer user_data)
+{
+    if (do_write(user_data, FALSE)) {
+        return TRUE;
+    }
+    g_object_unref(user_data);
+    return FALSE;
+}
+
+void vdagent_connection_write(VDAgentConnection *self,
+                              gpointer           data,
+                              gsize              size)
+{
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    GPollableOutputStream *out;
+    GSource *source;
+
+    g_queue_push_tail(priv->write_queue, g_bytes_new_take(data, size));
+
+    if (g_queue_get_length(priv->write_queue) == 1) {
+        out = G_POLLABLE_OUTPUT_STREAM(g_io_stream_get_output_stream(priv->io_stream));
+
+        source = g_pollable_output_stream_create_source(out, priv->cancellable);
+        g_source_set_callback(source, G_SOURCE_FUNC(out_stream_ready_cb),
+            g_object_ref(self), NULL);
+        g_source_attach(source, NULL);
+        g_source_unref(source);
+    }
+}
+
+void vdagent_connection_flush(VDAgentConnection *self)
+{
+    while (do_write(self, TRUE));
+}
+
+static void message_read_cb(GObject      *source_object,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+    VDAgentConnection *self = user_data;
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    GInputStream *in = G_INPUT_STREAM(source_object);
+    GError *err = NULL;
+    gsize bytes_read, data_size;
+
+    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
+    if (err) {
+        if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+            g_error_free(err);
+        } else {
+            priv->error_cb(self, err);
+        }
+        goto unref;
+    }
+
+    if (bytes_read == 0) {
+        /* see virtio-port.c for the rationale behind this */
+        if (priv->opening) {
+            g_usleep(10000);
+            read_next_message(self);
+        } else {
+            priv->error_cb(self, NULL);
+        }
+        goto unref;
+    }
+    priv->opening = FALSE;
+
+    if (!priv->data_buf) {
+        /* we've read the message header, now let's read its body */
+        data_size = VDAGENT_CONNECTION_GET_CLASS(self)->handle_header(
+            self, priv->header_buf);
+
+        if (g_cancellable_is_cancelled(priv->cancellable)) {
+            goto unref;
+        }
+
+        if (data_size > 0) {
+            priv->data_buf = g_malloc(data_size);
+            g_input_stream_read_all_async(in,
+                priv->data_buf, data_size,
+                G_PRIORITY_DEFAULT, priv->cancellable,
+                message_read_cb, g_object_ref(self));
+            goto unref;
+        }
+    }
+
+    VDAGENT_CONNECTION_GET_CLASS(self)->handle_message(
+        self, priv->header_buf, priv->data_buf);
+
+    g_clear_pointer(&priv->data_buf, g_free);
+    read_next_message(self);
+
+unref:
+    g_object_unref(self);
+}
+
+static void read_next_message(VDAgentConnection *self)
+{
+    VDAgentConnectionPrivate *priv = vdagent_connection_get_instance_private(self);
+    GInputStream *in;
+
+    if (g_cancellable_is_cancelled(priv->cancellable)) {
+        return;
+    }
+
+    in = g_io_stream_get_input_stream(priv->io_stream);
+
+    g_input_stream_read_all_async(in,
+        priv->header_buf, priv->header_size,
+        G_PRIORITY_DEFAULT, priv->cancellable,
+        message_read_cb, g_object_ref(self));
+}
diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
new file mode 100644
index 0000000..bd5edc4
--- /dev/null
+++ b/src/vdagent-connection.h
@@ -0,0 +1,102 @@
+/*  vdagent-connection.h
+
+    Copyright 2019 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __VDAGENT_CONNECTION_H
+#define __VDAGENT_CONNECTION_H
+
+#include <glib.h>
+#include <gio/gio.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define VDAGENT_TYPE_CONNECTION vdagent_connection_get_type()
+G_DECLARE_DERIVABLE_TYPE(VDAgentConnection, vdagent_connection, VDAGENT, CONNECTION, GObject);
+
+/* Sublasses of VDAgentConnection must implement
+ * handle_header and handle_message. */
+struct _VDAgentConnectionClass {
+    GObjectClass parent_class;
+
+    /* Called when a message header has been read.
+    *
+    * Handler must parse the @header_buf and
+    * return the size of message's body.
+    *
+    * @header_buf must not be freed. */
+    gsize (*handle_header) (VDAgentConnection *self,
+                            gpointer           header_buf);
+
+    /* Called when a full message has been read.
+    *
+    * @header, @data must not be freed. */
+    void (*handle_message) (VDAgentConnection *self,
+                            gpointer           header_buf,
+                            gpointer           data_buf);
+};
+
+/* Invoked when an error occurs during read or write.
+ *
+ * If @err is NULL, the connection was closed by the remote side,
+ * otherwise the handler must free @err using g_error_free().
+ *
+ * VDAgentConnection will not continue with the given I/O-op that failed. */
+typedef void (*VDAgentConnErrorCb)(VDAgentConnection *self, GError *err);
+
+/* Open a file in @path for read and write.
+ * Returns a new GIOStream to the given file or NULL when @err is set. */
+GIOStream *vdagent_file_open(const gchar *path, GError **err);
+
+/* Create a socket and initiate a new connection to the socket in @path.
+ * Returns a new GIOStream or NULL when @err is set. */
+GIOStream *vdagent_socket_connect(const gchar *path, GError **err);
+
+/* Set up @self to use @io_stream and start reading from it.
+ *
+ * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
+ * until the first message is successfully read or written to the @io_stream. */
+void vdagent_connection_setup(VDAgentConnection *self,
+                              GIOStream         *io_stream,
+                              gboolean           wait_on_opening,
+                              gsize              header_size,
+                              VDAgentConnErrorCb error_cb);
+
+
+/* Cancel running I/O-operations, close the underlying FD and
+ * unref the VDAgentConnection object. */
+void vdagent_connection_destroy(gpointer p);
+
+/* Append a message to the write queue.
+ *
+ * VDAgentConnection takes ownership of @data
+ * and frees it once the message is flushed. */
+void vdagent_connection_write(VDAgentConnection *self,
+                              gpointer           data,
+                              gsize              size);
+
+/* Synchronously write all queued messages to the output stream. */
+void vdagent_connection_flush(VDAgentConnection *self);
+
+/* Returns the PID of the foreign process connected to the socket
+ * or -1 with @err set. */
+gint vdagent_connection_get_peer_pid(VDAgentConnection *self,
+                                     GError           **err);
+
+G_END_DECLS
+
+#endif
diff --git a/src/vdagent/vdagent.c b/src/vdagent/vdagent.c
index 13ef29f..181c51a 100644
--- a/src/vdagent/vdagent.c
+++ b/src/vdagent/vdagent.c
@@ -165,10 +165,10 @@ static void vdagent_quit_loop(VDAgent *agent)
         g_main_loop_quit(agent->loop);
 }
 
-static void daemon_read_complete(struct udscs_connection **connp,
+static void daemon_read_complete(struct udscs_connection *conn,
     struct udscs_message_header *header, uint8_t *data)
 {
-    VDAgent *agent = udscs_get_user_data(*connp);
+    VDAgent *agent = g_object_get_data(G_OBJECT(conn), "agent");
 
     switch (header->type) {
     case VDAGENTD_MONITORS_CONFIG:
@@ -201,7 +201,7 @@ static void daemon_read_complete(struct udscs_connection **connp,
             vdagent_file_xfers_start(agent->xfers,
                                      (VDAgentFileXferStartMessage *)data);
         } else {
-            vdagent_file_xfers_error_disabled(*connp,
+            vdagent_file_xfers_error_disabled(conn,
                                               ((VDAgentFileXferStartMessage *)data)->id);
         }
         break;
@@ -210,7 +210,7 @@ static void daemon_read_complete(struct udscs_connection **connp,
             vdagent_file_xfers_status(agent->xfers,
                                       (VDAgentFileXferStatusMessage *)data);
         } else {
-            vdagent_file_xfers_error_disabled(*connp,
+            vdagent_file_xfers_error_disabled(conn,
                                               ((VDAgentFileXferStatusMessage *)data)->id);
         }
         break;
@@ -234,7 +234,7 @@ static void daemon_read_complete(struct udscs_connection **connp,
             vdagent_file_xfers_data(agent->xfers,
                                     (VDAgentFileXferDataMessage *)data);
         } else {
-            vdagent_file_xfers_error_disabled(*connp,
+            vdagent_file_xfers_error_disabled(conn,
                                               ((VDAgentFileXferDataMessage *)data)->id);
         }
         break;
@@ -253,10 +253,15 @@ static void daemon_read_complete(struct udscs_connection **connp,
     }
 }
 
-static void daemon_disconnect_cb(struct udscs_connection *conn)
+static void daemon_error_cb(VDAgentConnection *conn, GError *err)
 {
-    VDAgent *agent = udscs_get_user_data(conn);
-    agent->conn = NULL;
+    VDAgent *agent = g_object_get_data(G_OBJECT(conn), "agent");
+
+    if (err) {
+        syslog(LOG_ERR, "%s", err->message);
+        g_error_free(err);
+    }
+    g_clear_pointer(&agent->conn, vdagent_connection_destroy);
     vdagent_quit_loop(agent);
 }
 
@@ -355,7 +360,7 @@ static void vdagent_destroy(VDAgent *agent)
 {
     vdagent_finalize_file_xfer(agent);
     vdagent_x11_destroy(agent->x11, agent->conn == NULL);
-    udscs_destroy_connection(&agent->conn);
+    g_clear_pointer(&agent->conn, vdagent_connection_destroy);
 
     while (g_source_remove_by_user_data(agent))
         continue;
@@ -370,13 +375,13 @@ static gboolean vdagent_init_async_cb(gpointer user_data)
     VDAgent *agent = user_data;
 
     agent->conn = udscs_connect(vdagentd_socket,
-                                daemon_read_complete, daemon_disconnect_cb,
+                                daemon_read_complete, daemon_error_cb,
                                 debug);
     if (agent->conn == NULL) {
         g_timeout_add_seconds(1, vdagent_init_async_cb, agent);
         return G_SOURCE_REMOVE;
     }
-    udscs_set_user_data(agent->conn, agent);
+    g_object_set_data(G_OBJECT(agent->conn), "agent", agent);
 
     agent->x11 = vdagent_x11_create(agent->conn, debug, x11_sync);
     if (agent->x11 == NULL)
@@ -475,6 +480,9 @@ reconnect:
     vdagent_destroy(agent);
     agent = NULL;
 
+    /* allow the VDAgentConnection to finalize properly */
+    g_main_context_iteration(NULL, FALSE);
+
     if (!quit && do_daemonize)
         goto reconnect;
 
diff --git a/src/vdagentd/vdagentd.c b/src/vdagentd/vdagentd.c
index 72a3e13..689a83a 100644
--- a/src/vdagentd/vdagentd.c
+++ b/src/vdagentd/vdagentd.c
@@ -29,10 +29,9 @@
 #include <errno.h>
 #include <signal.h>
 #include <syslog.h>
-#include <sys/select.h>
 #include <sys/stat.h>
 #include <spice/vd_agent.h>
-#include <glib.h>
+#include <glib-unix.h>
 
 #ifdef WITH_SYSTEMD_SOCKET_ACTIVATION
 #include <systemd/sd-daemon.h>
@@ -79,11 +78,18 @@ static const char *active_session = NULL;
 static unsigned int session_count = 0;
 static struct udscs_connection *active_session_conn = NULL;
 static int agent_owns_clipboard[256] = { 0, };
-static int quit = 0;
 static int retval = 0;
 static int client_connected = 0;
 static int max_clipboard = -1;
 
+static GMainLoop *loop;
+
+static void vdagentd_quit(gint exit_code)
+{
+    retval = exit_code;
+    g_main_loop_quit(loop);
+}
+
 /* utility functions */
 static void virtio_msg_uint32_to_le(uint8_t *_msg, uint32_t size, uint32_t offset)
 {
@@ -155,9 +161,9 @@ void do_client_mouse(struct vdagentd_uinput **uinputp, VDAgentMouseState *mouse)
     vdagentd_uinput_do_mouse(uinputp, mouse);
     if (!*uinputp) {
         /* Try to re-open the tablet */
-        struct agent_data *agent_data =
-            udscs_get_user_data(active_session_conn);
-        if (agent_data)
+        if (active_session_conn) {
+            struct agent_data *agent_data =
+            g_object_get_data(G_OBJECT(active_session_conn), "agent_data");
             *uinputp = vdagentd_uinput_create(uinput_device,
                                               agent_data->width,
                                               agent_data->height,
@@ -165,10 +171,10 @@ void do_client_mouse(struct vdagentd_uinput **uinputp, VDAgentMouseState *mouse)
                                               agent_data->screen_count,
                                               debug > 1,
                                               uinput_fake);
+        }
         if (!*uinputp) {
             syslog(LOG_CRIT, "Fatal uinput error");
-            retval = 1;
-            quit = 1;
+            vdagentd_quit(1);
         }
     }
 }
@@ -517,14 +523,14 @@ static gboolean vdagent_message_check_size(const VDAgentMessage *message_header)
 
 static VDAgentGraphicsDeviceInfo *device_info = NULL;
 static size_t device_info_size = 0;
-static int virtio_port_read_complete(
+static void virtio_port_read_complete(
         struct vdagent_virtio_port *vport,
         int port_nr,
         VDAgentMessage *message_header,
         uint8_t *data)
 {
     if (!vdagent_message_check_size(message_header))
-        return 0;
+        return;
 
     switch (message_header->type) {
     case VD_AGENT_MOUSE_STATE:
@@ -586,8 +592,26 @@ static int virtio_port_read_complete(
     default:
         g_warn_if_reached();
     }
+}
 
-    return 0;
+static void virtio_port_error_cb(VDAgentConnection *conn, GError *err)
+{
+    gboolean old_client_connected = client_connected;
+    syslog(LOG_CRIT, "AIIEEE lost spice client connection, reconnecting (err: %s)",
+                     err ? err->message : "");
+    g_clear_error(&err);
+
+    vdagent_connection_destroy(virtio_port);
+    virtio_port = vdagent_virtio_port_create(portdev,
+                                             virtio_port_read_complete,
+                                             virtio_port_error_cb);
+    if (virtio_port == NULL) {
+        syslog(LOG_CRIT, "Fatal error opening vdagent virtio channel");
+        vdagentd_quit(1);
+        return;
+    }
+    do_client_disconnect();
+    client_connected = old_client_connected;
 }
 
 static void virtio_write_clipboard(uint8_t selection, uint32_t msg_type,
@@ -622,7 +646,7 @@ static void virtio_write_clipboard(uint8_t selection, uint32_t msg_type,
 }
 
 /* vdagentd <-> vdagent communication handling */
-static int do_agent_clipboard(struct udscs_connection *conn,
+static void do_agent_clipboard(struct udscs_connection *conn,
         struct udscs_message_header *header, uint8_t *data)
 {
     uint8_t selection = header->arg1;
@@ -668,7 +692,7 @@ static int do_agent_clipboard(struct udscs_connection *conn,
             syslog(LOG_WARNING, "clipboard is too large (%d > %d), discarding",
                    size, max_clipboard);
             virtio_write_clipboard(selection, msg_type, data_type, NULL, 0);
-            return 0;
+            return;
         }
         break;
     case VDAGENTD_CLIPBOARD_RELEASE:
@@ -684,12 +708,13 @@ static int do_agent_clipboard(struct udscs_connection *conn,
     if (size != header->size) {
         syslog(LOG_ERR,
                "unexpected extra data in clipboard msg, disconnecting agent");
-        return -1;
+        udscs_server_destroy_connection(server, conn);
+        return;
     }
 
     virtio_write_clipboard(selection, msg_type, data_type, data, header->size);
 
-    return 0;
+    return;
 
 error:
     if (header->type == VDAGENTD_CLIPBOARD_REQUEST) {
@@ -697,7 +722,6 @@ error:
         udscs_write(conn, VDAGENTD_CLIPBOARD_DATA,
                     selection, VD_AGENT_CLIPBOARD_NONE, NULL, 0);
     }
-    return 0;
 }
 
 /* When we open the vdagent virtio channel, the server automatically goes into
@@ -709,7 +733,9 @@ error:
    closes both. */
 static void check_xorg_resolution(void)
 {
-    struct agent_data *agent_data = udscs_get_user_data(active_session_conn);
+    struct agent_data *agent_data = NULL;
+    if (active_session_conn)
+        agent_data = g_object_get_data(G_OBJECT(active_session_conn), "agent_data");
 
     if (agent_data && agent_data->screen_info) {
         if (!uinput)
@@ -728,8 +754,7 @@ static void check_xorg_resolution(void)
                                         agent_data->screen_count);
         if (!uinput) {
             syslog(LOG_CRIT, "Fatal uinput error");
-            retval = 1;
-            quit = 1;
+            vdagentd_quit(1);
             return;
         }
 
@@ -737,11 +762,10 @@ static void check_xorg_resolution(void)
             syslog(LOG_INFO, "opening vdagent virtio channel");
             virtio_port = vdagent_virtio_port_create(portdev,
                                                      virtio_port_read_complete,
-                                                     NULL);
+                                                     virtio_port_error_cb);
             if (!virtio_port) {
                 syslog(LOG_CRIT, "Fatal error opening vdagent virtio channel");
-                retval = 1;
-                quit = 1;
+                vdagentd_quit(1);
                 return;
             }
             send_capabilities(virtio_port, 1);
@@ -751,18 +775,23 @@ static void check_xorg_resolution(void)
         vdagentd_uinput_destroy(&uinput);
 #endif
         if (virtio_port) {
-            vdagent_virtio_port_flush(&virtio_port);
-            vdagent_virtio_port_destroy(&virtio_port);
+            if (only_once) {
+                syslog(LOG_INFO, "Exiting after one client session.");
+                vdagentd_quit(0);
+                return;
+            }
+            vdagent_connection_flush(VDAGENT_CONNECTION(virtio_port));
+            g_clear_pointer(&virtio_port, vdagent_connection_destroy);
             syslog(LOG_INFO, "closed vdagent virtio channel");
         }
     }
 }
 
-static int connection_matches_active_session(struct udscs_connection **connp,
+static int connection_matches_active_session(struct udscs_connection *conn,
     void *priv)
 {
     struct udscs_connection **conn_ret = (struct udscs_connection **)priv;
-    struct agent_data *agent_data = udscs_get_user_data(*connp);
+    struct agent_data *agent_data = g_object_get_data(G_OBJECT(conn), "agent_data");
 
     /* Check if this connection matches the currently active session */
     if (!agent_data->session || !active_session)
@@ -770,7 +799,7 @@ static int connection_matches_active_session(struct udscs_connection **connp,
     if (strcmp(agent_data->session, active_session))
         return 0;
 
-    *conn_ret = *connp;
+    *conn_ret = conn;
     return 1;
 }
 
@@ -852,13 +881,24 @@ static void agent_connect(struct udscs_connection *conn)
 {
     struct agent_data *agent_data;
     agent_data = g_new0(struct agent_data, 1);
+    GError *err = NULL;
+    gint pid;
 
     if (session_info) {
-        uint32_t pid = udscs_get_peer_pid(conn);
+        pid = vdagent_connection_get_peer_pid(VDAGENT_CONNECTION(conn), &err);
+        if (err) {
+            syslog(LOG_ERR, "Could not get peer PID, disconnecting new client: %s",
+                            err->message);
+            g_error_free(err);
+            g_free(agent_data);
+            udscs_server_destroy_connection(server, conn);
+            return;
+        }
+
         agent_data->session = session_info_session_for_pid(session_info, pid);
     }
 
-    udscs_set_user_data(conn, (void *)agent_data);
+    g_object_set_data(G_OBJECT(conn), "agent_data", agent_data);
     udscs_write(conn, VDAGENTD_VERSION, 0, 0,
                 (uint8_t *)VERSION, strlen(VERSION) + 1);
     update_active_session_connection(conn);
@@ -869,24 +909,29 @@ static void agent_connect(struct udscs_connection *conn)
     }
 }
 
-static void agent_disconnect(struct udscs_connection *conn)
+static void agent_disconnect(VDAgentConnection *conn, GError *err)
 {
-    struct agent_data *agent_data = udscs_get_user_data(conn);
+    struct agent_data *agent_data = g_object_get_data(G_OBJECT(conn), "agent_data");
 
     g_hash_table_foreach_remove(active_xfers, remove_active_xfers, conn);
 
     g_clear_pointer(&agent_data->session, g_free);
-    update_active_session_connection(NULL);
-
     g_free(agent_data->screen_info);
     g_free(agent_data);
+    if (err) {
+        syslog(LOG_ERR, "%s", err->message);
+        g_error_free(err);
+    }
+    udscs_server_destroy_connection(server, UDSCS_CONNECTION(conn));
+
+    update_active_session_connection(NULL);
 }
 
-static void do_agent_xorg_resolution(struct udscs_connection    **connp,
+static void do_agent_xorg_resolution(struct udscs_connection     *conn,
                                      struct udscs_message_header *header,
                                      guint8                      *data)
 {
-    struct agent_data *agent_data = udscs_get_user_data(*connp);
+    struct agent_data *agent_data = g_object_get_data(G_OBJECT(conn), "agent_data");
     guint res_size = sizeof(struct vdagentd_guest_xorg_resolution);
     guint n = header->size / res_size;
 
@@ -902,7 +947,7 @@ static void do_agent_xorg_resolution(struct udscs_connection    **connp,
     if (header->size != n * res_size) {
         syslog(LOG_ERR, "guest xorg resolution message has wrong size, "
                         "disconnecting agent");
-        udscs_destroy_connection(connp);
+        udscs_server_destroy_connection(server, conn);
         return;
     }
 
@@ -915,7 +960,7 @@ static void do_agent_xorg_resolution(struct udscs_connection    **connp,
     check_xorg_resolution();
 }
 
-static void do_agent_file_xfer_status(struct udscs_connection    **connp,
+static void do_agent_file_xfer_status(struct udscs_connection     *conn,
                                       struct udscs_message_header *header,
                                       guint8                      *data)
 {
@@ -938,29 +983,26 @@ static void do_agent_file_xfer_status(struct udscs_connection    **connp,
                           data, data_size);
 
     if (header->arg2 == VD_AGENT_FILE_XFER_STATUS_CAN_SEND_DATA)
-        g_hash_table_insert(active_xfers, task_id, *connp);
+        g_hash_table_insert(active_xfers, task_id, conn);
     else
         g_hash_table_remove(active_xfers, task_id);
 }
 
-static void agent_read_complete(struct udscs_connection **connp,
+static void agent_read_complete(struct udscs_connection *conn,
     struct udscs_message_header *header, uint8_t *data)
 {
     switch (header->type) {
     case VDAGENTD_GUEST_XORG_RESOLUTION:
-        do_agent_xorg_resolution(connp, header, data);
+        do_agent_xorg_resolution(conn, header, data);
         break;
     case VDAGENTD_CLIPBOARD_GRAB:
     case VDAGENTD_CLIPBOARD_REQUEST:
     case VDAGENTD_CLIPBOARD_DATA:
     case VDAGENTD_CLIPBOARD_RELEASE:
-        if (do_agent_clipboard(*connp, header, data)) {
-            udscs_destroy_connection(connp);
-            return;
-        }
+        do_agent_clipboard(conn, header, data);
         break;
     case VDAGENTD_FILE_XFER_STATUS:
-        do_agent_file_xfer_status(connp, header, data);
+        do_agent_file_xfer_status(conn, header, data);
         break;
 
     default:
@@ -969,6 +1011,15 @@ static void agent_read_complete(struct udscs_connection **connp,
     }
 }
 
+static gboolean si_io_channel_cb(GIOChannel  *source,
+                                 GIOCondition condition,
+                                 gpointer     data)
+{
+    active_session = session_info_get_active_session(session_info);
+    update_active_session_connection(NULL);
+    return G_SOURCE_CONTINUE;
+}
+
 /* main */
 
 static void daemonize(void)
@@ -1010,76 +1061,10 @@ static void daemonize(void)
     }
 }
 
-static void main_loop(void)
-{
-    fd_set readfds, writefds;
-    int n, nfds;
-    int ck_fd = 0;
-    int once = 0;
-
-    while (!quit) {
-        FD_ZERO(&readfds);
-        FD_ZERO(&writefds);
-
-        nfds = udscs_server_fill_fds(server, &readfds, &writefds);
-        n = vdagent_virtio_port_fill_fds(virtio_port, &readfds, &writefds);
-        if (n >= nfds)
-            nfds = n + 1;
-
-        if (session_info) {
-            ck_fd = session_info_get_fd(session_info);
-            FD_SET(ck_fd, &readfds);
-            if (ck_fd >= nfds)
-                nfds = ck_fd + 1;
-        }
-
-        n = select(nfds, &readfds, &writefds, NULL, NULL);
-        if (n == -1) {
-            if (errno == EINTR)
-                continue;
-            syslog(LOG_CRIT, "Fatal error select: %m");
-            retval = 1;
-            break;
-        }
-
-        udscs_server_handle_fds(server, &readfds, &writefds);
-
-        if (virtio_port) {
-            once = 1;
-            vdagent_virtio_port_handle_fds(&virtio_port, &readfds, &writefds);
-            if (!virtio_port) {
-                int old_client_connected = client_connected;
-                syslog(LOG_CRIT,
-                       "AIIEEE lost spice client connection, reconnecting");
-                virtio_port = vdagent_virtio_port_create(portdev,
-                                                     virtio_port_read_complete,
-                                                     NULL);
-                if (!virtio_port) {
-                    syslog(LOG_CRIT,
-                           "Fatal error opening vdagent virtio channel");
-                    retval = 1;
-                    break;
-                }
-                do_client_disconnect();
-                client_connected = old_client_connected;
-            }
-        }
-        else if (only_once && once)
-        {
-            syslog(LOG_INFO, "Exiting after one client session.");
-            break;
-        }
-
-        if (session_info && FD_ISSET(ck_fd, &readfds)) {
-            active_session = session_info_get_active_session(session_info);
-            update_active_session_connection(NULL);
-        }
-    }
-}
-
-static void quit_handler(int sig)
+static gboolean signal_handler(gpointer user_data)
 {
-    quit = 1;
+    vdagentd_quit(0);
+    return G_SOURCE_REMOVE;
 }
 
 static gboolean parse_debug_level_cb(const gchar *option_name,
@@ -1133,8 +1118,9 @@ int main(int argc, char *argv[])
 {
     GOptionContext *context;
     GError *err = NULL;
-    struct sigaction act;
     gboolean own_socket = TRUE;
+    GIOChannel *si_io_channel = NULL;
+    guint si_watch_id = 0;
 
     context = g_option_context_new(NULL);
     g_option_context_add_main_entries(context, cmd_entries, NULL);
@@ -1159,13 +1145,9 @@ int main(int argc, char *argv[])
         uinput_device = g_strdup(DEFAULT_UINPUT_DEVICE);
     }
 
-    memset(&act, 0, sizeof(act));
-    act.sa_flags = SA_RESTART;
-    act.sa_handler = quit_handler;
-    sigaction(SIGINT, &act, NULL);
-    sigaction(SIGHUP, &act, NULL);
-    sigaction(SIGTERM, &act, NULL);
-    sigaction(SIGQUIT, &act, NULL);
+    g_unix_signal_add(SIGINT, signal_handler, NULL);
+    g_unix_signal_add(SIGHUP, signal_handler, NULL);
+    g_unix_signal_add(SIGTERM, signal_handler, NULL);
 
     openlog("spice-vdagentd", do_daemonize ? 0 : LOG_PERROR, LOG_USER);
 
@@ -1197,7 +1179,7 @@ int main(int argc, char *argv[])
             syslog(LOG_CRIT, "Fatal the server socket %s exists already. Delete it?",
                    vdagentd_socket);
         } else {
-            syslog(LOG_CRIT, "Fatal could not create the server socket %s: %m",
+            syslog(LOG_CRIT, "Fatal could not create the server socket %s",
                    vdagentd_socket);
         }
         return 1;
@@ -1227,19 +1209,35 @@ int main(int argc, char *argv[])
 
     if (want_session_info)
         session_info = session_info_create(debug);
-    if (!session_info)
+    if (session_info) {
+        si_io_channel = g_io_channel_unix_new(session_info_get_fd(session_info));
+        si_watch_id = g_io_add_watch(si_io_channel, G_IO_IN, si_io_channel_cb, NULL);
+        g_io_channel_unref(si_io_channel);
+    } else
         syslog(LOG_WARNING, "no session info, max 1 session agent allowed");
 
     active_xfers = g_hash_table_new(g_direct_hash, g_direct_equal);
-    main_loop();
+
+    loop = g_main_loop_new(NULL, FALSE);
+    g_main_loop_run(loop);
 
     release_clipboards();
 
     vdagentd_uinput_destroy(&uinput);
-    vdagent_virtio_port_flush(&virtio_port);
-    vdagent_virtio_port_destroy(&virtio_port);
-    session_info_destroy(session_info);
-    udscs_destroy_server(server);
+    if (si_watch_id > 0) {
+        g_source_remove(si_watch_id);
+    }
+    g_clear_pointer(&session_info, session_info_destroy);
+    g_clear_pointer(&server, udscs_destroy_server);
+    if (virtio_port) {
+        vdagent_connection_flush(VDAGENT_CONNECTION(virtio_port));
+        g_clear_pointer(&virtio_port, vdagent_connection_destroy);
+    }
+
+    /* allow the VDAgentConnection(s) to finalize properly */
+    g_main_context_iteration(NULL, FALSE);
+
+    g_main_loop_unref(loop);
 
     /* leave the socket around if it was provided by systemd */
     if (own_socket) {
diff --git a/src/vdagentd/virtio-port.c b/src/vdagentd/virtio-port.c
index b0556ce..ad9d27b 100644
--- a/src/vdagentd/virtio-port.c
+++ b/src/vdagentd/virtio-port.c
@@ -20,27 +20,20 @@
 */
 #include <config.h>
 
-#include <errno.h>
 #include <stdlib.h>
 #include <string.h>
 #include <syslog.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
+#include <gio/gio.h>
+#include <glib-unix.h>
 
+#include "vdagent-connection.h"
 #include "virtio-port.h"
 
 
 struct vdagent_virtio_port_buf {
     uint8_t *buf;
-    size_t pos;
     size_t size;
     size_t write_pos;
-
-    struct vdagent_virtio_port_buf *next;
 };
 
 /* Data to keep track of the assembling of vdagent messages per chunk port,
@@ -53,141 +46,131 @@ struct vdagent_virtio_port_chunk_port_data {
 };
 
 struct vdagent_virtio_port {
-    int fd;
-    int opening;
-    int is_uds;
-
-    /* Chunk read stuff, single buffer, separate header and data buffer */
-    int chunk_header_read;
-    int chunk_data_pos;
-    VDIChunkHeader chunk_header;
-    uint8_t chunk_data[VD_AGENT_MAX_DATA_SIZE];
+    VDAgentConnection parent_instance;
 
     /* Per chunk port data */
     struct vdagent_virtio_port_chunk_port_data port_data[VDP_END_PORT];
 
-    /* Writes are stored in a linked list of buffers, with both the header
-       + data for a single message in 1 buffer. */
-    struct vdagent_virtio_port_buf *write_buf;
+    struct vdagent_virtio_port_buf write_buf;
 
     /* Callbacks */
     vdagent_virtio_port_read_callback read_callback;
-    vdagent_virtio_port_disconnect_callback disconnect_callback;
+    VDAgentConnErrorCb error_cb;
 };
 
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp);
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp);
+G_DEFINE_TYPE(VirtioPort, virtio_port, VDAGENT_TYPE_CONNECTION);
 
-struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
-    vdagent_virtio_port_read_callback read_callback,
-    vdagent_virtio_port_disconnect_callback disconnect_callback)
-{
-    struct vdagent_virtio_port *vport;
-    struct sockaddr_un address;
-    int c;
+static void vdagent_virtio_port_do_chunk(VDAgentConnection *conn,
+                                         gpointer header_data,
+                                         gpointer chunk_data);
 
-    vport = g_new0(struct vdagent_virtio_port, 1);
-
-    vport->fd = open(portname, O_RDWR);
-    if (vport->fd == -1) {
-        vport->fd = socket(PF_UNIX, SOCK_STREAM, 0);
-        if (vport->fd == -1) {
-            goto error;
-        }
-        address.sun_family = AF_UNIX;
-        snprintf(address.sun_path, sizeof(address.sun_path), "%s", portname);
-        c = connect(vport->fd, (struct sockaddr *)&address, sizeof(address));
-        if (c == 0) {
-            vport->is_uds = 1;
-        } else {
-            goto error;
-        }
-    } else {
-        vport->is_uds = 0;
+static gsize conn_handle_header(VDAgentConnection *conn,
+                                gpointer header_buf)
+{
+    VirtioPort *self = VIRTIO_PORT(conn);
+    VDIChunkHeader *header = header_buf;
+    GError *err;
+
+    header->size = GUINT32_FROM_LE(header->size);
+    header->port = GUINT32_FROM_LE(header->port);
+
+    if (header->size > VD_AGENT_MAX_DATA_SIZE) {
+        err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+                          "chunk size %u too large", header->size);
+        self->error_cb(conn, err);
+        return 0;
     }
-    vport->opening = 1;
-
-    vport->read_callback = read_callback;
-    vport->disconnect_callback = disconnect_callback;
-
-    return vport;
-
-error:
-    syslog(LOG_ERR, "open %s: %m", portname);
-    if (vport->fd != -1) {
-        close(vport->fd);
+    if (header->port >= VDP_END_PORT) {
+        err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+                          "chunk port %u out of range", header->port);
+        self->error_cb(conn, err);
+        return 0;
     }
-    g_free(vport);
-    return NULL;
+
+    return header->size;
 }
 
-void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp)
+static void virtio_port_init(VirtioPort *self)
 {
-    struct vdagent_virtio_port_buf *wbuf, *next_wbuf;
-    struct vdagent_virtio_port *vport = *vportp;
-    int i;
-
-    if (!vport)
-        return;
+}
 
-    if (vport->disconnect_callback)
-        vport->disconnect_callback(vport);
+static void virtio_port_finalize(GObject *obj)
+{
+    VirtioPort *self = VIRTIO_PORT(obj);
+    guint i;
 
-    wbuf = vport->write_buf;
-    while (wbuf) {
-        next_wbuf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-        wbuf = next_wbuf;
-    }
+    g_free(self->write_buf.buf);
 
     for (i = 0; i < VDP_END_PORT; i++) {
-        g_free(vport->port_data[i].message_data);
+        g_free(self->port_data[i].message_data);
     }
 
-    close(vport->fd);
-    g_clear_pointer(vportp, g_free);
+    G_OBJECT_CLASS(virtio_port_parent_class)->finalize(obj);
 }
 
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
-        fd_set *readfds, fd_set *writefds)
+static void virtio_port_class_init(VirtioPortClass *klass)
 {
-    if (!vport)
-        return -1;
+    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+    VDAgentConnectionClass *conn_class = VDAGENT_CONNECTION_CLASS(klass);
 
-    FD_SET(vport->fd, readfds);
-    if (vport->write_buf)
-        FD_SET(vport->fd, writefds);
+    gobject_class->finalize  = virtio_port_finalize;
 
-    return vport->fd + 1;
+    conn_class->handle_header = conn_handle_header;
+    conn_class->handle_message = vdagent_virtio_port_do_chunk;
 }
 
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
-        fd_set *readfds, fd_set *writefds)
-{
-    if (!*vportp)
-        return;
-
-    if (FD_ISSET((*vportp)->fd, readfds))
-        vdagent_virtio_port_do_read(vportp);
-
-    if (*vportp && FD_ISSET((*vportp)->fd, writefds))
-        vdagent_virtio_port_do_write(vportp);
-}
-
-static struct vdagent_virtio_port_buf* vdagent_virtio_port_get_last_wbuf(
-    struct vdagent_virtio_port *vport)
+struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
+    vdagent_virtio_port_read_callback read_callback,
+    VDAgentConnErrorCb error_cb)
 {
-    struct vdagent_virtio_port_buf *wbuf;
+    struct vdagent_virtio_port *vport;
+    GIOStream *io_stream;
+    GError *err = NULL;
+
+    io_stream = vdagent_file_open(portname, &err);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_clear_error(&err);
+        io_stream = vdagent_socket_connect(portname, &err);
+        if (err) {
+            syslog(LOG_ERR, "%s: %s", __func__, err->message);
+            g_error_free(err);
+            return NULL;
+        }
+    }
 
-    wbuf = vport->write_buf;
-    if (!wbuf)
-        return NULL;
+    vport = g_object_new(VIRTIO_TYPE_PORT, NULL);
+
+    /* When calling vdagent_connection_new(),
+     * @wait_on_opening MUST be set to TRUE:
+     *
+     * When we open the virtio serial port, the following happens:
+     * 1) The linux kernel virtio_console driver sends a
+     *    VIRTIO_CONSOLE_PORT_OPEN message to qemu
+     * 2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
+     *    register the agent chardev with the spice-server
+     * 3) spice-server then calls the spicevmc chardev driver's state
+     *    callback to let it know it is ready to receive data
+     * 4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
+     *    chardev backend
+     * 5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
+     *    to the linux kernel virtio_console driver
+     *
+     * Until steps 1 - 5 have completed the linux kernel virtio_console
+     * driver sees the virtio serial port as being in a disconnected state
+     * and read will return 0 ! So if we blindly assume that a read 0 means
+     * that the channel is closed we will hit a race here.
+     */
+    vdagent_connection_setup(VDAGENT_CONNECTION(vport),
+                             io_stream,
+                             TRUE,
+                             sizeof(VDIChunkHeader),
+                             error_cb);
 
-    while (wbuf->next)
-        wbuf = wbuf->next;
+    vport->read_callback = read_callback;
+    vport->error_cb = error_cb;
 
-    return wbuf;
+    return vport;
 }
 
 void vdagent_virtio_port_write_start(
@@ -197,15 +180,15 @@ void vdagent_virtio_port_write_start(
         uint32_t message_opaque,
         uint32_t data_size)
 {
-    struct vdagent_virtio_port_buf *wbuf, *new_wbuf;
+    struct vdagent_virtio_port_buf *new_wbuf;
     VDIChunkHeader *chunk_header;
     VDAgentMessage *message_header;
 
-    new_wbuf = g_new(struct vdagent_virtio_port_buf, 1);
-    new_wbuf->pos = 0;
+    g_return_if_fail(vport->write_buf.buf == NULL);
+
+    new_wbuf = &vport->write_buf;
     new_wbuf->write_pos = 0;
     new_wbuf->size = sizeof(*chunk_header) + sizeof(*message_header) + data_size;
-    new_wbuf->next = NULL;
     new_wbuf->buf = g_malloc(new_wbuf->size);
 
     chunk_header = (VDIChunkHeader *) (new_wbuf->buf + new_wbuf->write_pos);
@@ -219,14 +202,6 @@ void vdagent_virtio_port_write_start(
     message_header->opaque = GUINT64_TO_LE(message_opaque);
     message_header->size = GUINT32_TO_LE(data_size);
     new_wbuf->write_pos += sizeof(*message_header);
-
-    if (!vport->write_buf) {
-        vport->write_buf = new_wbuf;
-        return;
-    }
-
-    wbuf = vdagent_virtio_port_get_last_wbuf(vport);
-    wbuf->next = new_wbuf;
 }
 
 int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
@@ -234,8 +209,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
 {
     struct vdagent_virtio_port_buf *wbuf;
 
-    wbuf = vdagent_virtio_port_get_last_wbuf(vport);
-    if (!wbuf) {
+    if (size == 0)
+        return 0;
+
+    wbuf = &vport->write_buf;
+    if (!wbuf->buf) {
         syslog(LOG_ERR, "can't append without a buffer");
         return -1;
     }
@@ -247,6 +225,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
 
     memcpy(wbuf->buf + wbuf->write_pos, data, size);
     wbuf->write_pos += size;
+
+    if (wbuf->write_pos == wbuf->size) {
+        vdagent_connection_write(VDAGENT_CONNECTION(vport), wbuf->buf, wbuf->size);
+        wbuf->buf = NULL;
+    }
     return 0;
 }
 
@@ -263,12 +246,6 @@ void vdagent_virtio_port_write(
     vdagent_virtio_port_write_append(vport, data, data_size);
 }
 
-void vdagent_virtio_port_flush(struct vdagent_virtio_port **vportp)
-{
-    while (*vportp && (*vportp)->write_buf)
-        vdagent_virtio_port_do_write(vportp);
-}
-
 void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
 {
     if (port >= VDP_END_PORT) {
@@ -279,20 +256,23 @@ void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
     memset(&vport->port_data[port], 0, sizeof(vport->port_data[0]));
 }
 
-static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
+static void vdagent_virtio_port_do_chunk(VDAgentConnection *conn,
+                                         gpointer header_data,
+                                         gpointer chunk_data)
 {
     int avail, read, pos = 0;
-    struct vdagent_virtio_port *vport = *vportp;
+    struct vdagent_virtio_port *vport = VIRTIO_PORT(conn);
+    VDIChunkHeader *chunk_header = header_data;
     struct vdagent_virtio_port_chunk_port_data *port =
-        &vport->port_data[vport->chunk_header.port];
+        &vport->port_data[chunk_header->port];
 
     if (port->message_header_read < sizeof(port->message_header)) {
         read = sizeof(port->message_header) - port->message_header_read;
-        if (read > vport->chunk_header.size) {
-            read = vport->chunk_header.size;
+        if (read > chunk_header->size) {
+            read = chunk_header->size;
         }
         memcpy((uint8_t *)&port->message_header + port->message_header_read,
-               vport->chunk_data, read);
+               chunk_data, read);
         port->message_header_read += read;
         if (port->message_header_read == sizeof(port->message_header)) {
 
@@ -310,11 +290,12 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
 
     if (port->message_header_read == sizeof(port->message_header)) {
         read  = port->message_header.size - port->message_data_pos;
-        avail = vport->chunk_header.size - pos;
+        avail = chunk_header->size - pos;
 
         if (avail > read) {
-            syslog(LOG_ERR, "chunk larger than message, lost sync?");
-            vdagent_virtio_port_destroy(vportp);
+            GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+                                      "chunk larger than message, lost sync?");
+            vport->error_cb(VDAGENT_CONNECTION(vport), err);
             return;
         }
 
@@ -323,18 +304,14 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
 
         if (read) {
             memcpy(port->message_data + port->message_data_pos,
-                   vport->chunk_data + pos, read);
+                   chunk_data + pos, read);
             port->message_data_pos += read;
         }
 
         if (port->message_data_pos == port->message_header.size) {
             if (vport->read_callback) {
-                int r = vport->read_callback(vport, vport->chunk_header.port,
-                                 &port->message_header, port->message_data);
-                if (r == -1) {
-                    vdagent_virtio_port_destroy(vportp);
-                    return;
-                }
+                vport->read_callback(vport, chunk_header->port,
+                                     &port->message_header, port->message_data);
             }
             port->message_header_read = 0;
             port->message_data_pos = 0;
@@ -342,140 +319,3 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
         }
     }
 }
-
-static int vport_read(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
-    if (vport->is_uds) {
-        return recv(vport->fd, buf, len, 0);
-    } else {
-        return read(vport->fd, buf, len);
-    }
-}
-
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp)
-{
-    ssize_t n;
-    size_t to_read;
-    uint8_t *dest;
-    struct vdagent_virtio_port *vport = *vportp;
-
-    if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
-        to_read = sizeof(vport->chunk_header) - vport->chunk_header_read;
-        dest = (uint8_t *)&vport->chunk_header + vport->chunk_header_read;
-    } else {
-        to_read = vport->chunk_header.size - vport->chunk_data_pos;
-        dest = vport->chunk_data + vport->chunk_data_pos;
-    }
-
-    n = vport_read(vport, dest, to_read);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "reading from vdagent virtio port: %m");
-    }
-    if (n == 0 && vport->opening) {
-        /* When we open the virtio serial port, the following happens:
-           1) The linux kernel virtio_console driver sends a
-              VIRTIO_CONSOLE_PORT_OPEN message to qemu
-           2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
-              register the agent chardev with the spice-server
-           3) spice-server then calls the spicevmc chardev driver's state
-              callback to let it know it is ready to receive data
-           4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
-              chardev backend
-           5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
-              to the linux kernel virtio_console driver
-
-           Until steps 1 - 5 have completed the linux kernel virtio_console
-           driver sees the virtio serial port as being in a disconnected state
-           and read will return 0 ! So if we blindly assume that a read 0 means
-           that the channel is closed we will hit a race here.
-
-           Therefore we ignore read returning 0 until we've successfully read
-           or written some data. If we hit this race we also sleep a bit here
-           to avoid busy waiting until the above steps complete */
-        usleep(10000);
-        return;
-    }
-    if (n <= 0) {
-        vdagent_virtio_port_destroy(vportp);
-        return;
-    }
-    vport->opening = 0;
-
-    if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
-        vport->chunk_header_read += n;
-        if (vport->chunk_header_read == sizeof(vport->chunk_header)) {
-            vport->chunk_header.size = GUINT32_FROM_LE(vport->chunk_header.size);
-            vport->chunk_header.port = GUINT32_FROM_LE(vport->chunk_header.port);
-            if (vport->chunk_header.size > VD_AGENT_MAX_DATA_SIZE) {
-                syslog(LOG_ERR, "chunk size %u too large",
-                       vport->chunk_header.size);
-                vdagent_virtio_port_destroy(vportp);
-                return;
-            }
-            if (vport->chunk_header.port >= VDP_END_PORT) {
-                syslog(LOG_ERR, "chunk port %u out of range",
-                       vport->chunk_header.port);
-                vdagent_virtio_port_destroy(vportp);
-                return;
-            }
-        }
-    } else {
-        vport->chunk_data_pos += n;
-        if (vport->chunk_data_pos == vport->chunk_header.size) {
-            vdagent_virtio_port_do_chunk(vportp);
-            // coverity[check_after_deref] previous function can change *vportd
-            if (!*vportp)
-                return;
-            vport->chunk_header_read = 0;
-            vport->chunk_data_pos = 0;
-        }
-    }
-}
-
-static int vport_write(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
-    if (vport->is_uds) {
-        return send(vport->fd, buf, len, 0);
-    } else {
-        return write(vport->fd, buf, len);
-    }
-}
-
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp)
-{
-    ssize_t n;
-    size_t to_write;
-    struct vdagent_virtio_port *vport = *vportp;
-
-    struct vdagent_virtio_port_buf* wbuf = vport->write_buf;
-    if (!wbuf) {
-        syslog(LOG_ERR, "do_write called on a port without a write buf ?!");
-        return;
-    }
-
-    if (wbuf->write_pos != wbuf->size) {
-        syslog(LOG_ERR, "do_write: buffer is incomplete!!");
-        return;
-    }
-
-    to_write = wbuf->size - wbuf->pos;
-    n = vport_write(vport, wbuf->buf + wbuf->pos, to_write);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "writing to vdagent virtio port: %m");
-        vdagent_virtio_port_destroy(vportp);
-        return;
-    }
-    if (n > 0)
-        vport->opening = 0;
-
-    wbuf->pos += n;
-    if (wbuf->pos == wbuf->size) {
-        vport->write_buf = wbuf->next;
-        g_free(wbuf->buf);
-        g_free(wbuf);
-    }
-}
diff --git a/src/vdagentd/virtio-port.h b/src/vdagentd/virtio-port.h
index dffb410..32cf8b7 100644
--- a/src/vdagentd/virtio-port.h
+++ b/src/vdagentd/virtio-port.h
@@ -22,58 +22,43 @@
 #ifndef __VIRTIO_PORT_H
 #define __VIRTIO_PORT_H
 
-#include <stdio.h>
 #include <stdint.h>
-#include <sys/select.h>
 #include <spice/vd_agent.h>
+#include <glib-object.h>
+#include "vdagent-connection.h"
+
+G_BEGIN_DECLS
+
+#define VIRTIO_TYPE_PORT            (virtio_port_get_type())
+#define VIRTIO_PORT(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj), VIRTIO_TYPE_PORT, VirtioPort))
+#define VIRTIO_IS_PORT(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj), VIRTIO_TYPE_PORT))
+#define VIRTIO_PORT_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass), VIRTIO_TYPE_PORT, VirtioPortClass))
+#define VIRTIO_IS_PORT_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), VIRTIO_TYPE_PORT))
+#define VIRTIO_PORT_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS((obj), VIRTIO_TYPE_PORT, VirtioPortClass))
+
+typedef struct vdagent_virtio_port VirtioPort;
+typedef struct VirtioPortClass VirtioPortClass;
+
+struct VirtioPortClass {
+    VDAgentConnectionClass parent_class;
+};
+
+GType virtio_port_get_type(void);
 
 struct vdagent_virtio_port;
 
 /* Callbacks with this type will be called when a complete message has been
-   received. Sometimes the callback may want to close the port, in this
-   case do *not* call vdagent_virtio_port_destroy from the callback. The desire
-   to close the port can be indicated be returning -1 from the callback,
-   in other cases return 0. */
-typedef int (*vdagent_virtio_port_read_callback)(
+   received. */
+typedef void (*vdagent_virtio_port_read_callback)(
     struct vdagent_virtio_port *vport,
     int port_nr,
     VDAgentMessage *message_header,
     uint8_t *data);
 
-/* Callbacks with this type will be called when the port is disconnected.
-   Note:
-   1) vdagent_virtio_port will destroy the port in question itself after
-      this callback has completed!
-   2) This callback is always called, even if the disconnect is initiated
-      by the vdagent_virtio_port user through returning -1 from a read
-      callback, or by explicitly calling vdagent_virtio_port_destroy */
-typedef void (*vdagent_virtio_port_disconnect_callback)(
-    struct vdagent_virtio_port *conn);
-
-
 /* Create a vdagent virtio port object for port portname */
 struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
     vdagent_virtio_port_read_callback read_callback,
-    vdagent_virtio_port_disconnect_callback disconnect_callback);
-
-/* The contents of portp will be made NULL */
-void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp);
-
-
-/* Given a vdagent_virtio_port fill the fd_sets pointed to by readfds and
-   writefds for select() usage.
-
-   Return value: value of the highest fd + 1 */
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
-        fd_set *readfds, fd_set *writefds);
-
-/* Handle any events flagged by select for the given vdagent_virtio_port.
-   Note the port may be destroyed (when disconnected) by this call
-   in this case the disconnect calllback will get called before the
-   destruction and the contents of connp will be made NULL */
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
-        fd_set *readfds, fd_set *writefds);
-
+    VDAgentConnErrorCb error_cb);
 
 /* Queue a message for delivery, either bit by bit, or all at once */
 void vdagent_virtio_port_write_start(
@@ -96,7 +81,8 @@ void vdagent_virtio_port_write(
         const uint8_t *data,
         uint32_t data_size);
 
-void vdagent_virtio_port_flush(struct vdagent_virtio_port **vportp);
 void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port);
 
+G_END_DECLS
+
 #endif
-- 
2.20.1



More information about the Spice-devel mailing list