[Spice-devel] [RFC spice-vdagent 10/18] vport: use VDAgentConnection

Jakub Janků jjanku at redhat.com
Tue Aug 14 18:53:44 UTC 2018


Rewrite virtio-port.c using VDAgentConnection to integrate it
into GMainLoop and simplify the code.

virtio_port_destroy() does NOT close the underlying FD immediately.
GSources attached to the GMainContext can be processed during a
vdagent_virtio_port_flush() call.
Apart from that, the behavior stays the same.

Drop support for select(), remove
vdagent_virtio_port_fill_fds(), vdagent_virtio_port_handle_fds().
---
 src/vdagentd/virtio-port.c | 369 +++++++++++--------------------------
 src/vdagentd/virtio-port.h |  17 --
 2 files changed, 110 insertions(+), 276 deletions(-)

diff --git a/src/vdagentd/virtio-port.c b/src/vdagentd/virtio-port.c
index 642c848..9731086 100644
--- a/src/vdagentd/virtio-port.c
+++ b/src/vdagentd/virtio-port.c
@@ -19,27 +19,19 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
-#include <errno.h>
 #include <stdlib.h>
 #include <string.h>
 #include <syslog.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
+#include <glib-unix.h>
 
+#include "vdagent-connection.h"
 #include "virtio-port.h"
 
 
 struct vdagent_virtio_port_buf {
     uint8_t *buf;
-    size_t pos;
     size_t size;
     size_t write_pos;
-
-    struct vdagent_virtio_port_buf *next;
 };
 
 /* Data to keep track of the assembling of vdagent messages per chunk port,
@@ -52,21 +44,11 @@ struct vdagent_virtio_port_chunk_port_data {
 };
 
 struct vdagent_virtio_port {
-    int fd;
-    int opening;
-    int is_uds;
-
-    /* Chunk read stuff, single buffer, separate header and data buffer */
-    int chunk_header_read;
-    int chunk_data_pos;
-    VDIChunkHeader chunk_header;
-    uint8_t chunk_data[VD_AGENT_MAX_DATA_SIZE];
+    VDAgentConnection *conn;
 
     /* Per chunk port data */
     struct vdagent_virtio_port_chunk_port_data port_data[VDP_END_PORT];
 
-    /* Writes are stored in a linked list of buffers, with both the header
-       + data for a single message in 1 buffer. */
     struct vdagent_virtio_port_buf *write_buf;
 
     /* Callbacks */
@@ -74,58 +56,106 @@ struct vdagent_virtio_port {
     vdagent_virtio_port_disconnect_callback disconnect_callback;
 };
 
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp);
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp);
+static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp,
+                                         VDIChunkHeader *chunk_header,
+                                         const guchar *chunk_data);
+
+static void virtio_port_destroy(struct vdagent_virtio_port **vportp, int by_user);
+
+static gboolean conn_header_read_cb(gpointer header_buff,
+                                    gsize   *body_size,
+                                    gpointer user_data)
+{
+    struct vdagent_virtio_port *vport = user_data;
+    VDIChunkHeader *header = header_buff;
+
+    header->size = GUINT32_FROM_LE(header->size);
+    header->port = GUINT32_FROM_LE(header->port);
+
+    if (header->size > VD_AGENT_MAX_DATA_SIZE) {
+        syslog(LOG_ERR, "chunk size %u too large", header->size);
+        virtio_port_destroy(&vport, FALSE);
+        return FALSE;
+    }
+    if (header->port >= VDP_END_PORT) {
+        syslog(LOG_ERR, "chunk port %u out of range", header->port);
+        virtio_port_destroy(&vport, FALSE);
+        return FALSE;
+    }
+
+    *body_size = header->size;
+    return TRUE;
+}
+
+static gboolean conn_read_cb(gpointer header,
+                             gpointer data,
+                             gpointer user_data)
+{
+    struct vdagent_virtio_port *vport = user_data;
+    vdagent_virtio_port_do_chunk(&vport, header, data);
+    return vport != NULL;
+}
+
+static void conn_error_cb(gpointer user_data)
+{
+    struct vdagent_virtio_port *vport = user_data;
+    virtio_port_destroy(&vport, FALSE);
+}
 
 struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
     vdagent_virtio_port_read_callback read_callback,
     vdagent_virtio_port_disconnect_callback disconnect_callback)
 {
     struct vdagent_virtio_port *vport;
-    struct sockaddr_un address;
-    int c;
+    GIOStream *io_stream;
+
+    io_stream = vdagent_file_open(portname);
+    if (io_stream == NULL) {
+        io_stream = vdagent_socket_connect(portname);
+        if (io_stream == NULL)
+            return NULL;
+    }
 
     vport = calloc(1, sizeof(*vport));
     if (!vport)
         return 0;
 
-    vport->fd = open(portname, O_RDWR);
-    if (vport->fd == -1) {
-        vport->fd = socket(PF_UNIX, SOCK_STREAM, 0);
-        if (vport->fd == -1) {
-            goto error;
-        }
-        address.sun_family = AF_UNIX;
-        snprintf(address.sun_path, sizeof(address.sun_path), "%s", portname);
-        c = connect(vport->fd, (struct sockaddr *)&address, sizeof(address));
-        if (c == 0) {
-            vport->is_uds = 1;
-        } else {
-            goto error;
-        }
-    } else {
-        vport->is_uds = 0;
-    }
-    vport->opening = 1;
-
+    /* When calling vdagent_connection_new(),
+     * @wait_on_opening MUST be set to TRUE:
+     *
+     * When we open the virtio serial port, the following happens:
+     * 1) The linux kernel virtio_console driver sends a
+     *    VIRTIO_CONSOLE_PORT_OPEN message to qemu
+     * 2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
+     *    register the agent chardev with the spice-server
+     * 3) spice-server then calls the spicevmc chardev driver's state
+     *    callback to let it know it is ready to receive data
+     * 4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
+     *    chardev backend
+     * 5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
+     *    to the linux kernel virtio_console driver
+     *
+     * Until steps 1 - 5 have completed the linux kernel virtio_console
+     * driver sees the virtio serial port as being in a disconnected state
+     * and read will return 0 ! So if we blindly assume that a read 0 means
+     * that the channel is closed we will hit a race here.
+     */
+    vport->conn = vdagent_connection_new(io_stream,
+                                         TRUE,
+                                         sizeof(VDIChunkHeader),
+                                         conn_header_read_cb,
+                                         conn_read_cb,
+                                         conn_error_cb,
+                                         vport);
     vport->read_callback = read_callback;
     vport->disconnect_callback = disconnect_callback;
 
     return vport;
-
-error:
-    syslog(LOG_ERR, "open %s: %m", portname);
-    if (vport->fd != -1) {
-        close(vport->fd);
-    }
-    free(vport);
-    return NULL;
 }
 
 static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
                                 gboolean by_user)
 {
-    struct vdagent_virtio_port_buf *wbuf, *next_wbuf;
     struct vdagent_virtio_port *vport = *vportp;
     int i;
 
@@ -135,19 +165,16 @@ static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
     if (vport->disconnect_callback)
         vport->disconnect_callback(vport, by_user);
 
-    wbuf = vport->write_buf;
-    while (wbuf) {
-        next_wbuf = wbuf->next;
-        free(wbuf->buf);
-        free(wbuf);
-        wbuf = next_wbuf;
+    if (vport->write_buf) {
+        free(vport->write_buf->buf);
+        free(vport->write_buf);
     }
 
     for (i = 0; i < VDP_END_PORT; i++) {
         free(vport->port_data[i].message_data);
     }
 
-    close(vport->fd);
+    vdagent_connection_destroy(vport->conn);
     free(vport);
     *vportp = NULL;
 }
@@ -157,47 +184,6 @@ void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp)
     virtio_port_destroy(vportp, TRUE);
 }
 
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
-        fd_set *readfds, fd_set *writefds)
-{
-    if (!vport)
-        return -1;
-
-    FD_SET(vport->fd, readfds);
-    if (vport->write_buf)
-        FD_SET(vport->fd, writefds);
-
-    return vport->fd + 1;
-}
-
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
-        fd_set *readfds, fd_set *writefds)
-{
-    if (!*vportp)
-        return;
-
-    if (FD_ISSET((*vportp)->fd, readfds))
-        vdagent_virtio_port_do_read(vportp);
-
-    if (*vportp && FD_ISSET((*vportp)->fd, writefds))
-        vdagent_virtio_port_do_write(vportp);
-}
-
-static struct vdagent_virtio_port_buf* vdagent_virtio_port_get_last_wbuf(
-    struct vdagent_virtio_port *vport)
-{
-    struct vdagent_virtio_port_buf *wbuf;
-
-    wbuf = vport->write_buf;
-    if (!wbuf)
-        return NULL;
-
-    while (wbuf->next)
-        wbuf = wbuf->next;
-
-    return wbuf;
-}
-
 int vdagent_virtio_port_write_start(
         struct vdagent_virtio_port *vport,
         uint32_t port_nr,
@@ -205,7 +191,7 @@ int vdagent_virtio_port_write_start(
         uint32_t message_opaque,
         uint32_t data_size)
 {
-    struct vdagent_virtio_port_buf *wbuf, *new_wbuf;
+    struct vdagent_virtio_port_buf *new_wbuf;
     VDIChunkHeader chunk_header;
     VDAgentMessage message_header;
 
@@ -213,10 +199,8 @@ int vdagent_virtio_port_write_start(
     if (!new_wbuf)
         return -1;
 
-    new_wbuf->pos = 0;
     new_wbuf->write_pos = 0;
     new_wbuf->size = sizeof(chunk_header) + sizeof(message_header) + data_size;
-    new_wbuf->next = NULL;
     new_wbuf->buf = malloc(new_wbuf->size);
     if (!new_wbuf->buf) {
         free(new_wbuf);
@@ -237,14 +221,7 @@ int vdagent_virtio_port_write_start(
            sizeof(message_header));
     new_wbuf->write_pos += sizeof(message_header);
 
-    if (!vport->write_buf) {
-        vport->write_buf = new_wbuf;
-        return 0;
-    }
-
-    wbuf = vdagent_virtio_port_get_last_wbuf(vport);
-    wbuf->next = new_wbuf;
-
+    vport->write_buf = new_wbuf;
     return 0;
 }
 
@@ -253,7 +230,10 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
 {
     struct vdagent_virtio_port_buf *wbuf;
 
-    wbuf = vdagent_virtio_port_get_last_wbuf(vport);
+    if (size == 0)
+        return 0;
+
+    wbuf = vport->write_buf;
     if (!wbuf) {
         syslog(LOG_ERR, "can't append without a buffer");
         return -1;
@@ -266,6 +246,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
 
     memcpy(wbuf->buf + wbuf->write_pos, data, size);
     wbuf->write_pos += size;
+
+    if (wbuf->write_pos == wbuf->size) {
+        vdagent_connection_write(vport->conn, wbuf->buf, wbuf->size);
+        g_clear_pointer(&vport->write_buf, free);
+    }
     return 0;
 }
 
@@ -287,8 +272,8 @@ int vdagent_virtio_port_write(
 
 void vdagent_virtio_port_flush(struct vdagent_virtio_port **vportp)
 {
-    while (*vportp && (*vportp)->write_buf)
-        vdagent_virtio_port_do_write(vportp);
+    if (*vportp)
+        vdagent_connection_flush((*vportp)->conn);
 }
 
 void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
@@ -301,20 +286,22 @@ void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
     memset(&vport->port_data[port], 0, sizeof(vport->port_data[0]));
 }
 
-static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
+static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp,
+                                         VDIChunkHeader *chunk_header,
+                                         const guchar *chunk_data)
 {
     int avail, read, pos = 0;
     struct vdagent_virtio_port *vport = *vportp;
     struct vdagent_virtio_port_chunk_port_data *port =
-        &vport->port_data[vport->chunk_header.port];
+        &vport->port_data[chunk_header->port];
 
     if (port->message_header_read < sizeof(port->message_header)) {
         read = sizeof(port->message_header) - port->message_header_read;
-        if (read > vport->chunk_header.size) {
-            read = vport->chunk_header.size;
+        if (read > chunk_header->size) {
+            read = chunk_header->size;
         }
         memcpy((uint8_t *)&port->message_header + port->message_header_read,
-               vport->chunk_data, read);
+               chunk_data, read);
         port->message_header_read += read;
         if (port->message_header_read == sizeof(port->message_header)) {
 
@@ -337,7 +324,7 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
 
     if (port->message_header_read == sizeof(port->message_header)) {
         read  = port->message_header.size - port->message_data_pos;
-        avail = vport->chunk_header.size - pos;
+        avail = chunk_header->size - pos;
 
         if (avail > read) {
             syslog(LOG_ERR, "chunk larger than message, lost sync?");
@@ -350,13 +337,13 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
 
         if (read) {
             memcpy(port->message_data + port->message_data_pos,
-                   vport->chunk_data + pos, read);
+                   chunk_data + pos, read);
             port->message_data_pos += read;
         }
 
         if (port->message_data_pos == port->message_header.size) {
             if (vport->read_callback) {
-                int r = vport->read_callback(vport, vport->chunk_header.port,
+                int r = vport->read_callback(vport, chunk_header->port,
                                  &port->message_header, port->message_data);
                 if (r == -1) {
                     virtio_port_destroy(vportp, TRUE);
@@ -370,139 +357,3 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
         }
     }
 }
-
-static int vport_read(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
-    if (vport->is_uds) {
-        return recv(vport->fd, buf, len, 0);
-    } else {
-        return read(vport->fd, buf, len);
-    }
-}
-
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp)
-{
-    ssize_t n;
-    size_t to_read;
-    uint8_t *dest;
-    struct vdagent_virtio_port *vport = *vportp;
-
-    if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
-        to_read = sizeof(vport->chunk_header) - vport->chunk_header_read;
-        dest = (uint8_t *)&vport->chunk_header + vport->chunk_header_read;
-    } else {
-        to_read = vport->chunk_header.size - vport->chunk_data_pos;
-        dest = vport->chunk_data + vport->chunk_data_pos;
-    }
-
-    n = vport_read(vport, dest, to_read);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "reading from vdagent virtio port: %m");
-    }
-    if (n == 0 && vport->opening) {
-        /* When we open the virtio serial port, the following happens:
-           1) The linux kernel virtio_console driver sends a
-              VIRTIO_CONSOLE_PORT_OPEN message to qemu
-           2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
-              register the agent chardev with the spice-server
-           3) spice-server then calls the spicevmc chardev driver's state
-              callback to let it know it is ready to receive data
-           4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
-              chardev backend
-           5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
-              to the linux kernel virtio_console driver
-
-           Until steps 1 - 5 have completed the linux kernel virtio_console
-           driver sees the virtio serial port as being in a disconnected state
-           and read will return 0 ! So if we blindly assume that a read 0 means
-           that the channel is closed we will hit a race here.
-
-           Therefore we ignore read returning 0 until we've successfully read
-           or written some data. If we hit this race we also sleep a bit here
-           to avoid busy waiting until the above steps complete */
-        usleep(10000);
-        return;
-    }
-    if (n <= 0) {
-        virtio_port_destroy(vportp, FALSE);
-        return;
-    }
-    vport->opening = 0;
-
-    if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
-        vport->chunk_header_read += n;
-        if (vport->chunk_header_read == sizeof(vport->chunk_header)) {
-            vport->chunk_header.size = GUINT32_FROM_LE(vport->chunk_header.size);
-            vport->chunk_header.port = GUINT32_FROM_LE(vport->chunk_header.port);
-            if (vport->chunk_header.size > VD_AGENT_MAX_DATA_SIZE) {
-                syslog(LOG_ERR, "chunk size %u too large",
-                       vport->chunk_header.size);
-                virtio_port_destroy(vportp, FALSE);
-                return;
-            }
-            if (vport->chunk_header.port >= VDP_END_PORT) {
-                syslog(LOG_ERR, "chunk port %u out of range",
-                       vport->chunk_header.port);
-                virtio_port_destroy(vportp, FALSE);
-                return;
-            }
-        }
-    } else {
-        vport->chunk_data_pos += n;
-        if (vport->chunk_data_pos == vport->chunk_header.size) {
-            vdagent_virtio_port_do_chunk(vportp);
-            if (!*vportp)
-                return;
-            vport->chunk_header_read = 0;
-            vport->chunk_data_pos = 0;
-        }
-    }
-}
-
-static int vport_write(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
-    if (vport->is_uds) {
-        return send(vport->fd, buf, len, 0);
-    } else {
-        return write(vport->fd, buf, len);
-    }
-}
-
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp)
-{
-    ssize_t n;
-    size_t to_write;
-    struct vdagent_virtio_port *vport = *vportp;
-
-    struct vdagent_virtio_port_buf* wbuf = vport->write_buf;
-    if (!wbuf) {
-        syslog(LOG_ERR, "do_write called on a port without a write buf ?!");
-        return;
-    }
-
-    if (wbuf->write_pos != wbuf->size) {
-        syslog(LOG_ERR, "do_write: buffer is incomplete!!");
-        return;
-    }
-
-    to_write = wbuf->size - wbuf->pos;
-    n = vport_write(vport, wbuf->buf + wbuf->pos, to_write);
-    if (n < 0) {
-        if (errno == EINTR)
-            return;
-        syslog(LOG_ERR, "writing to vdagent virtio port: %m");
-        virtio_port_destroy(vportp, FALSE);
-        return;
-    }
-    if (n > 0)
-        vport->opening = 0;
-
-    wbuf->pos += n;
-    if (wbuf->pos == wbuf->size) {
-        vport->write_buf = wbuf->next;
-        free(wbuf->buf);
-        free(wbuf);
-    }
-}
diff --git a/src/vdagentd/virtio-port.h b/src/vdagentd/virtio-port.h
index 3c701d6..87b75cf 100644
--- a/src/vdagentd/virtio-port.h
+++ b/src/vdagentd/virtio-port.h
@@ -22,9 +22,7 @@
 #ifndef __VIRTIO_PORT_H
 #define __VIRTIO_PORT_H
 
-#include <stdio.h>
 #include <stdint.h>
-#include <sys/select.h>
 #include <spice/vd_agent.h>
 
 struct vdagent_virtio_port;
@@ -60,21 +58,6 @@ struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
 void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp);
 
 
-/* Given a vdagent_virtio_port fill the fd_sets pointed to by readfds and
-   writefds for select() usage.
-
-   Return value: value of the highest fd + 1 */
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
-        fd_set *readfds, fd_set *writefds);
-
-/* Handle any events flagged by select for the given vdagent_virtio_port.
-   Note the port may be destroyed (when disconnected) by this call
-   in this case the disconnect calllback will get called before the
-   destruction and the contents of connp will be made NULL */
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
-        fd_set *readfds, fd_set *writefds);
-
-
 /* Queue a message for delivery, either bit by bit, or all at once
 
    Returns 0 on success -1 on error (only happens when malloc fails) */
-- 
2.17.1



More information about the Spice-devel mailing list