[Spice-devel] [PATCH vdagent-linux 3/4] introduce VDAgentConnection
Jakub Janků
jjanku at redhat.com
Sun Sep 30 18:05:22 UTC 2018
1) VDAgentConnection
Add vdagent-connection.{c,h} files.
Define a new GObject: VDAgentConnection which can be used to easily
write messages to and read from the given FD.
VDAgentConnection uses GIO and therefore integrates well with GMainLoop.
Read messages must begin with a header of a fixed size.
Message body size can vary.
User of VDAgentConnection is notified
through callbacks about the following events:
* message header read
* whole message read
* I/O error
A new VDAgentConnection can be constructed using
vdagent_connection_new() based on a GIOStream that can be obtained using
vdagent_file_open() or vdagent_socket_connect().
vdagent_connection_destroy() destroyes the connection.
However, due to the asynchronous nature of used GIO functions,
this does NOT close the underlying FD immediately.
If vdagent_connection_destroy() is called outside of GMainLoop
(or the loop quits right after the function ivocation),
g_main_context_iteration() should be called to ensure that the
VDAgentConnection finalizes properly.
2) udscs
Rewrite udscs.c to use the new VDAgentConnection.
Use GSocketService in udscs_server.
Drop support for select(), remove:
* udscs_server_fill_fds()
* udscs_server_handle_fds()
3) virtio_port
Rewrite virtio-port.c to use the new VDAgentConnection.
Drop support for select(), remove:
* vdagent_virtio_port_fill_fds()
* vdagent_virtio_port_handle_fds()
2) vdagentd
Replace the main_loop() with a GMainLoop.
Use g_unix_signal_add() to handle SIGINT, SIGHUP, SIGTERM.
SIGQUIT handling is not supported by GLib.
Integrate the session_info into the loop using
GIOChannel and g_io_add_watch().
Signed-off-by: Jakub Janků <jjanku at redhat.com>
---
Makefile.am | 2 +
src/udscs.c | 483 +++++++++++--------------------------
src/udscs.h | 15 --
src/vdagent-connection.c | 300 +++++++++++++++++++++++
src/vdagent-connection.h | 124 ++++++++++
src/vdagent/vdagent.c | 3 +
src/vdagentd/vdagentd.c | 169 ++++++-------
src/vdagentd/virtio-port.c | 389 ++++++++++-------------------
src/vdagentd/virtio-port.h | 18 --
9 files changed, 772 insertions(+), 731 deletions(-)
create mode 100644 src/vdagent-connection.c
create mode 100644 src/vdagent-connection.h
diff --git a/Makefile.am b/Makefile.am
index fa54bbc..b291b19 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
common_sources = \
src/udscs.c \
src/udscs.h \
+ src/vdagent-connection.c \
+ src/vdagent-connection.h \
src/vdagentd-proto-strings.h \
src/vdagentd-proto.h \
$(NULL)
diff --git a/src/udscs.c b/src/udscs.c
index 62abc97..3bf0089 100644
--- a/src/udscs.c
+++ b/src/udscs.c
@@ -24,42 +24,22 @@
#include <config.h>
#endif
-#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
-#include <unistd.h>
-#include <errno.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
#include <glib-unix.h>
+#include <gio/gunixsocketaddress.h>
#include "udscs.h"
#include "vdagentd-proto-strings.h"
-
-struct udscs_buf {
- uint8_t *buf;
- size_t pos;
- size_t size;
-
- struct udscs_buf *next;
-};
+#include "vdagent-connection.h"
struct udscs_connection {
- int fd;
int debug;
void *user_data;
#ifndef UDSCS_NO_SERVER
- struct ucred peer_cred;
+ gint peer_pid;
#endif
- /* Read stuff, single buffer, separate header and data buffer */
- int header_read;
- struct udscs_message_header header;
- struct udscs_buf data;
-
- /* Writes are stored in a linked list of buffers, with both the header
- + data for a single message in 1 buffer. */
- struct udscs_buf *write_buf;
+ VDAgentConnection *conn;
/* Callbacks */
udscs_read_callback read_callback;
@@ -67,16 +47,8 @@ struct udscs_connection {
struct udscs_connection *next;
struct udscs_connection *prev;
-
- GIOChannel *io_channel;
- guint write_watch_id;
- guint read_watch_id;
};
-static gboolean udscs_io_channel_cb(GIOChannel *source,
- GIOCondition condition,
- gpointer data);
-
static void debug_print_message_header(struct udscs_connection *conn,
struct udscs_message_header *header,
const gchar *direction)
@@ -93,47 +65,61 @@ static void debug_print_message_header(struct udscs_connection *conn,
conn, direction, type, header->arg1, header->arg2, header->size);
}
+static gboolean conn_header_read_cb(gpointer header_buff,
+ gsize *body_size,
+ gpointer user_data)
+{
+ struct udscs_message_header *header = header_buff;
+ *body_size = header->size;
+ return TRUE;
+}
+
+static gboolean conn_read_cb(gpointer header_buff,
+ gpointer data,
+ gpointer user_data)
+{
+ struct udscs_connection *conn = user_data;
+ struct udscs_message_header *header = header_buff;
+
+ debug_print_message_header(conn, header, "received");
+
+ conn->read_callback(&conn, header, data);
+ return conn != NULL;
+}
+
+static void conn_error_cb(GError *err, gpointer user_data)
+{
+ struct udscs_connection *conn = user_data;
+ if (err)
+ syslog(LOG_ERR, "%p error: %s", conn, err->message);
+ udscs_destroy_connection(&conn);
+}
+
struct udscs_connection *udscs_connect(const char *socketname,
udscs_read_callback read_callback,
udscs_disconnect_callback disconnect_callback,
int debug)
{
- int c;
- struct sockaddr_un address;
+ GIOStream *io_stream;
struct udscs_connection *conn;
+ GError *err = NULL;
- conn = g_new0(struct udscs_connection, 1);
- conn->debug = debug;
-
- conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
- if (conn->fd == -1) {
- syslog(LOG_ERR, "creating unix domain socket: %m");
- g_free(conn);
- return NULL;
- }
-
- address.sun_family = AF_UNIX;
- snprintf(address.sun_path, sizeof(address.sun_path), "%s", socketname);
- c = connect(conn->fd, (struct sockaddr *)&address, sizeof(address));
- if (c != 0) {
- if (conn->debug) {
- syslog(LOG_DEBUG, "connect %s: %m", socketname);
- }
- g_free(conn);
- return NULL;
- }
-
- conn->io_channel = g_io_channel_unix_new(conn->fd);
- if (!conn->io_channel) {
- udscs_destroy_connection(&conn);
+ io_stream = vdagent_socket_connect(socketname, &err);
+ if (err) {
+ syslog(LOG_ERR, "%s: %s", __func__, err->message);
+ g_error_free(err);
return NULL;
}
- conn->read_watch_id =
- g_io_add_watch(conn->io_channel,
- G_IO_IN | G_IO_ERR | G_IO_NVAL,
- udscs_io_channel_cb,
- conn);
+ conn = g_new0(struct udscs_connection, 1);
+ conn->debug = debug;
+ conn->conn = vdagent_connection_new(io_stream,
+ FALSE,
+ sizeof(struct udscs_message_header),
+ conn_header_read_cb,
+ conn_read_cb,
+ conn_error_cb,
+ conn);
conn->read_callback = read_callback;
conn->disconnect_callback = disconnect_callback;
@@ -145,7 +131,6 @@ struct udscs_connection *udscs_connect(const char *socketname,
void udscs_destroy_connection(struct udscs_connection **connp)
{
- struct udscs_buf *wbuf, *next_wbuf;
struct udscs_connection *conn = *connp;
if (!conn)
@@ -154,28 +139,12 @@ void udscs_destroy_connection(struct udscs_connection **connp)
if (conn->disconnect_callback)
conn->disconnect_callback(conn);
- wbuf = conn->write_buf;
- while (wbuf) {
- next_wbuf = wbuf->next;
- g_free(wbuf->buf);
- g_free(wbuf);
- wbuf = next_wbuf;
- }
-
- g_clear_pointer(&conn->data.buf, g_free);
-
if (conn->next)
conn->next->prev = conn->prev;
if (conn->prev)
conn->prev->next = conn->next;
- close(conn->fd);
-
- if (conn->write_watch_id != 0)
- g_source_remove(conn->write_watch_id);
- if (conn->read_watch_id != 0)
- g_source_remove(conn->read_watch_id);
- g_clear_pointer(&conn->io_channel, g_io_channel_unref);
+ vdagent_connection_destroy(conn->conn);
if (conn->debug)
syslog(LOG_DEBUG, "%p disconnected", conn);
@@ -199,174 +168,33 @@ void *udscs_get_user_data(struct udscs_connection *conn)
void udscs_write(struct udscs_connection *conn, uint32_t type, uint32_t arg1,
uint32_t arg2, const uint8_t *data, uint32_t size)
{
- struct udscs_buf *wbuf, *new_wbuf;
+ gpointer buff;
+ guint buff_size;
struct udscs_message_header header;
- new_wbuf = g_new(struct udscs_buf, 1);
- new_wbuf->pos = 0;
- new_wbuf->size = sizeof(header) + size;
- new_wbuf->next = NULL;
- new_wbuf->buf = g_malloc(new_wbuf->size);
+ buff_size = sizeof(header) + size;
+ buff = g_malloc(buff_size);
header.type = type;
header.arg1 = arg1;
header.arg2 = arg2;
header.size = size;
- memcpy(new_wbuf->buf, &header, sizeof(header));
- memcpy(new_wbuf->buf + sizeof(header), data, size);
+ memcpy(buff, &header, sizeof(header));
+ memcpy(buff + sizeof(header), data, size);
debug_print_message_header(conn, &header, "sent");
- if (conn->io_channel && conn->write_watch_id == 0)
- conn->write_watch_id =
- g_io_add_watch(conn->io_channel,
- G_IO_OUT | G_IO_ERR | G_IO_NVAL,
- udscs_io_channel_cb,
- conn);
-
- if (!conn->write_buf) {
- conn->write_buf = new_wbuf;
- return;
- }
-
- /* maybe we should limit the write_buf stack depth ? */
- wbuf = conn->write_buf;
- while (wbuf->next)
- wbuf = wbuf->next;
-
- wbuf->next = new_wbuf;
-}
-
-/* A helper for udscs_do_read() */
-static void udscs_read_complete(struct udscs_connection **connp)
-{
- struct udscs_connection *conn = *connp;
-
- debug_print_message_header(conn, &conn->header, "received");
-
- if (conn->read_callback) {
- conn->read_callback(connp, &conn->header, conn->data.buf);
- if (!*connp) /* Was the connection disconnected by the callback ? */
- return;
- }
-
- g_free(conn->data.buf);
- memset(&conn->data, 0, sizeof(conn->data)); /* data.buf = NULL */
- conn->header_read = 0;
+ vdagent_connection_write(conn->conn, buff, buff_size);
}
-static void udscs_do_read(struct udscs_connection **connp)
-{
- ssize_t n;
- size_t to_read;
- uint8_t *dest;
- struct udscs_connection *conn = *connp;
-
- if (conn->header_read < sizeof(conn->header)) {
- to_read = sizeof(conn->header) - conn->header_read;
- dest = (uint8_t *)&conn->header + conn->header_read;
- } else {
- to_read = conn->data.size - conn->data.pos;
- dest = conn->data.buf + conn->data.pos;
- }
-
- n = read(conn->fd, dest, to_read);
- if (n < 0) {
- if (errno == EINTR)
- return;
- syslog(LOG_ERR, "reading unix domain socket: %m, disconnecting %p",
- conn);
- }
- if (n <= 0) {
- udscs_destroy_connection(connp);
- return;
- }
-
- if (conn->header_read < sizeof(conn->header)) {
- conn->header_read += n;
- if (conn->header_read == sizeof(conn->header)) {
- if (conn->header.size == 0) {
- udscs_read_complete(connp);
- return;
- }
- conn->data.pos = 0;
- conn->data.size = conn->header.size;
- conn->data.buf = g_malloc(conn->data.size);
- }
- } else {
- conn->data.pos += n;
- if (conn->data.pos == conn->data.size)
- udscs_read_complete(connp);
- }
-}
-
-static void udscs_do_write(struct udscs_connection **connp)
-{
- ssize_t n;
- size_t to_write;
- struct udscs_connection *conn = *connp;
-
- struct udscs_buf* wbuf = conn->write_buf;
- if (!wbuf) {
- syslog(LOG_ERR,
- "%p do_write called on a connection without a write buf ?!",
- conn);
- return;
- }
-
- to_write = wbuf->size - wbuf->pos;
- n = write(conn->fd, wbuf->buf + wbuf->pos, to_write);
- if (n < 0) {
- if (errno == EINTR)
- return;
- syslog(LOG_ERR, "writing to unix domain socket: %m, disconnecting %p",
- conn);
- udscs_destroy_connection(connp);
- return;
- }
-
- wbuf->pos += n;
- if (wbuf->pos == wbuf->size) {
- conn->write_buf = wbuf->next;
- g_free(wbuf->buf);
- g_free(wbuf);
- }
-}
-
-static gboolean udscs_io_channel_cb(GIOChannel *source,
- GIOCondition condition,
- gpointer data)
-{
- struct udscs_connection *conn = data;
-
- if (condition & G_IO_IN) {
- udscs_do_read(&conn);
- if (conn == NULL)
- return G_SOURCE_REMOVE;
- return G_SOURCE_CONTINUE;
- }
- if (condition & G_IO_OUT) {
- udscs_do_write(&conn);
- if (conn == NULL)
- return G_SOURCE_REMOVE;
- if (conn->write_buf)
- return G_SOURCE_CONTINUE;
- conn->write_watch_id = 0;
- return G_SOURCE_REMOVE;
- }
-
- udscs_destroy_connection(&conn);
- return G_SOURCE_REMOVE;
-}
-
-
#ifndef UDSCS_NO_SERVER
/* ---------- Server-side implementation ---------- */
struct udscs_server {
- int fd;
+ GSocketService *service;
+
int debug;
struct udscs_connection connections_head;
udscs_connect_callback connect_callback;
@@ -374,7 +202,12 @@ struct udscs_server {
udscs_disconnect_callback disconnect_callback;
};
-struct udscs_server *udscs_create_server_for_fd(int fd,
+static gboolean udscs_server_accept_cb(GSocketService *service,
+ GSocketConnection *socket_conn,
+ GObject *source_object,
+ gpointer user_data);
+
+static struct udscs_server *udscs_server_new(
udscs_connect_callback connect_callback,
udscs_read_callback read_callback,
udscs_disconnect_callback disconnect_callback,
@@ -382,59 +215,74 @@ struct udscs_server *udscs_create_server_for_fd(int fd,
{
struct udscs_server *server;
- if (fd <= 0) {
- syslog(LOG_ERR, "Invalid file descriptor: %i", fd);
- return NULL;
- }
-
server = g_new0(struct udscs_server, 1);
server->debug = debug;
- server->fd = fd;
server->connect_callback = connect_callback;
server->read_callback = read_callback;
server->disconnect_callback = disconnect_callback;
+ server->service = g_socket_service_new();
+
+ g_signal_connect(server->service, "incoming",
+ G_CALLBACK(udscs_server_accept_cb), server);
return server;
}
-struct udscs_server *udscs_create_server(const char *socketname,
+struct udscs_server *udscs_create_server_for_fd(int fd,
udscs_connect_callback connect_callback,
udscs_read_callback read_callback,
udscs_disconnect_callback disconnect_callback,
int debug)
{
- int c;
- int fd;
- struct sockaddr_un address;
struct udscs_server *server;
+ GSocket *socket;
+ GError *err = NULL;
- fd = socket(PF_UNIX, SOCK_STREAM, 0);
- if (fd == -1) {
- syslog(LOG_ERR, "creating unix domain socket: %m");
- return NULL;
- }
+ server = udscs_server_new(connect_callback, read_callback,
+ disconnect_callback, debug);
- address.sun_family = AF_UNIX;
- snprintf(address.sun_path, sizeof(address.sun_path), "%s", socketname);
- c = bind(fd, (struct sockaddr *)&address, sizeof(address));
- if (c != 0) {
- syslog(LOG_ERR, "bind %s: %m", socketname);
- close(fd);
- return NULL;
- }
+ socket = g_socket_new_from_fd(fd, &err);
+ if (err)
+ goto error;
+ g_socket_listener_add_socket(G_SOCKET_LISTENER(server->service),
+ socket, NULL, &err);
+ g_object_unref(socket);
+ if (err)
+ goto error;
- c = listen(fd, 5);
- if (c != 0) {
- syslog(LOG_ERR, "listen: %m");
- close(fd);
- return NULL;
- }
-
- server = udscs_create_server_for_fd(fd, connect_callback, read_callback,
- disconnect_callback, debug);
+ return server;
+error:
+ syslog(LOG_ERR, "%s: %s", __func__, err->message);
+ g_error_free(err);
+ udscs_destroy_server(server);
+ return NULL;
+}
- if (!server) {
- close(fd);
+struct udscs_server *udscs_create_server(const char *socketname,
+ udscs_connect_callback connect_callback,
+ udscs_read_callback read_callback,
+ udscs_disconnect_callback disconnect_callback,
+ int debug)
+{
+ struct udscs_server *server;
+ GSocketAddress *socket_addr;
+ GError *err = NULL;
+
+ server = udscs_server_new(connect_callback, read_callback,
+ disconnect_callback, debug);
+
+ socket_addr = g_unix_socket_address_new(socketname);
+ g_socket_listener_add_address(G_SOCKET_LISTENER(server->service),
+ socket_addr,
+ G_SOCKET_TYPE_STREAM,
+ G_SOCKET_PROTOCOL_DEFAULT,
+ NULL, NULL, &err);
+ g_object_unref(socket_addr);
+ if (err) {
+ syslog(LOG_ERR, "%s: %s", __func__, err->message);
+ g_error_free(err);
+ udscs_destroy_server(server);
+ return NULL;
}
return server;
@@ -453,43 +301,51 @@ void udscs_destroy_server(struct udscs_server *server)
udscs_destroy_connection(&conn);
conn = next_conn;
}
- close(server->fd);
+ g_object_unref(server->service);
g_free(server);
}
int udscs_get_peer_pid(struct udscs_connection *conn)
{
- return (int)conn->peer_cred.pid;
+ return conn->peer_pid;
}
-static void udscs_server_accept(struct udscs_server *server) {
+static gboolean udscs_server_accept_cb(GSocketService *service,
+ GSocketConnection *socket_conn,
+ GObject *source_object,
+ gpointer user_data)
+{
+ struct udscs_server *server = user_data;
struct udscs_connection *new_conn, *conn;
- struct sockaddr_un address;
- socklen_t length = sizeof(address);
- int r, fd;
-
- fd = accept(server->fd, (struct sockaddr *)&address, &length);
- if (fd == -1) {
- if (errno == EINTR)
- return;
- syslog(LOG_ERR, "accept: %m");
- return;
- }
+ GCredentials *cred;
+ GError *err = NULL;
new_conn = g_new0(struct udscs_connection, 1);
- new_conn->fd = fd;
new_conn->debug = server->debug;
new_conn->read_callback = server->read_callback;
new_conn->disconnect_callback = server->disconnect_callback;
- length = sizeof(new_conn->peer_cred);
- r = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &new_conn->peer_cred, &length);
- if (r != 0) {
- syslog(LOG_ERR, "Could not get peercred, disconnecting new client");
- close(fd);
+ g_object_ref(socket_conn);
+ new_conn->conn = vdagent_connection_new(G_IO_STREAM(socket_conn),
+ FALSE,
+ sizeof(struct udscs_message_header),
+ conn_header_read_cb,
+ conn_read_cb,
+ conn_error_cb,
+ new_conn);
+
+
+ cred = vdagent_connection_get_peer_credentials(new_conn->conn, &err);
+ if (err) {
+ syslog(LOG_ERR, "Could not get peer PID, disconnecting new client: %s",
+ err->message);
+ g_error_free(err);
+ vdagent_connection_destroy(new_conn->conn);
g_free(new_conn);
- return;
+ return TRUE;
}
+ new_conn->peer_pid = g_credentials_get_unix_pid(cred, NULL);
+ g_object_unref(cred);
conn = &server->connections_head;
while (conn->next)
@@ -504,59 +360,8 @@ static void udscs_server_accept(struct udscs_server *server) {
if (server->connect_callback)
server->connect_callback(new_conn);
-}
-
-int udscs_server_fill_fds(struct udscs_server *server, fd_set *readfds,
- fd_set *writefds)
-{
- struct udscs_connection *conn;
- int nfds;
- if (!server)
- return -1;
-
- nfds = server->fd + 1;
- FD_SET(server->fd, readfds);
-
- conn = server->connections_head.next;
- while (conn) {
- FD_SET(conn->fd, readfds);
- if (conn->write_buf)
- FD_SET(conn->fd, writefds);
-
- if (conn->fd >= nfds)
- nfds = conn->fd + 1;
-
- conn = conn->next;
- }
-
- return nfds;
-}
-
-void udscs_server_handle_fds(struct udscs_server *server, fd_set *readfds,
- fd_set *writefds)
-{
- struct udscs_connection *conn, *next_conn;
-
- if (!server)
- return;
-
- if (FD_ISSET(server->fd, readfds))
- udscs_server_accept(server);
-
- conn = server->connections_head.next;
- while (conn) {
- /* conn may be destroyed by udscs_do_read() or udscs_do_write()
- * (when disconnected), so get the next connection first. */
- next_conn = conn->next;
-
- if (FD_ISSET(conn->fd, readfds))
- udscs_do_read(&conn);
- if (conn && FD_ISSET(conn->fd, writefds))
- udscs_do_write(&conn);
-
- conn = next_conn;
- }
+ return TRUE;
}
void udscs_server_write_all(struct udscs_server *server,
diff --git a/src/udscs.h b/src/udscs.h
index 363ca18..1c7fa2b 100644
--- a/src/udscs.h
+++ b/src/udscs.h
@@ -22,9 +22,7 @@
#ifndef __UDSCS_H
#define __UDSCS_H
-#include <stdio.h>
#include <stdint.h>
-#include <sys/select.h>
#include <sys/socket.h>
@@ -151,19 +149,6 @@ typedef int (*udscs_for_all_clients_callback)(struct udscs_connection **connp,
int udscs_server_for_all_clients(struct udscs_server *server,
udscs_for_all_clients_callback func, void *priv);
-/* Given a udscs server, fill the fd_sets pointed to by readfds and
- * writefds for select() usage.
- * Return value: value of the highest fd + 1 or -1 if server is NULL
- */
-int udscs_server_fill_fds(struct udscs_server *server, fd_set *readfds,
- fd_set *writefds);
-
-/* Handle any events flagged by select for the given udscs server.
- * Does nothing if server is NULL.
- */
-void udscs_server_handle_fds(struct udscs_server *server, fd_set *readfds,
- fd_set *writefds);
-
/* Returns the peer's PID. */
int udscs_get_peer_pid(struct udscs_connection *conn);
diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
new file mode 100644
index 0000000..e492770
--- /dev/null
+++ b/src/vdagent-connection.c
@@ -0,0 +1,300 @@
+/* vdagent-connection.c
+
+ Copyright 2018 Red Hat, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <syslog.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <glib/gstdio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixsocketaddress.h>
+
+#include "vdagent-connection.h"
+
+struct VDAgentConnection {
+ GObject parent_instance;
+
+ GIOStream *io_stream;
+ gboolean opening;
+ GCancellable *cancellable;
+
+ GQueue *write_queue;
+ GMainLoop *flush_loop;
+
+ gsize header_size;
+ gpointer header_buff;
+ gpointer read_buff;
+
+ VDAgentConnHeaderReadCb header_read_cb;
+ VDAgentConnReadCb read_cb;
+ VDAgentConnErrorCb error_cb;
+
+ gpointer user_data;
+};
+
+G_DEFINE_TYPE(VDAgentConnection, vdagent_connection, G_TYPE_OBJECT);
+
+static void write_next_message(VDAgentConnection *conn);
+static void read_next_message(VDAgentConnection *conn);
+
+GIOStream *vdagent_file_open(const gchar *path, GError **err)
+{
+ gint fd, errsv;
+
+ fd = g_open(path, O_RDWR);
+ if (fd == -1) {
+ errsv = errno;
+ g_set_error_literal(err, G_FILE_ERROR,
+ g_file_error_from_errno(errsv),
+ g_strerror(errsv));
+ return NULL;
+ }
+
+ return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
+ g_unix_output_stream_new(fd, TRUE));
+}
+
+GIOStream *vdagent_socket_connect(const gchar *address, GError **err)
+{
+ GSocketConnection *socket_conn;
+ GSocketClient *client;
+ GSocketConnectable *connectable;
+
+ connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
+ client = g_object_new(G_TYPE_SOCKET_CLIENT,
+ "family", G_SOCKET_FAMILY_UNIX,
+ "type", G_SOCKET_TYPE_STREAM,
+ NULL);
+
+ socket_conn = g_socket_client_connect(client, connectable, NULL, err);
+ g_object_unref(client);
+ g_object_unref(connectable);
+ return G_IO_STREAM(socket_conn);
+}
+
+static void vdagent_connection_init(VDAgentConnection *conn)
+{
+ conn->cancellable = g_cancellable_new();
+ conn->write_queue = g_queue_new();
+ conn->flush_loop = NULL;
+ conn->read_buff = NULL;
+}
+
+static void vdagent_connection_dispose(GObject *obj)
+{
+ VDAgentConnection *conn = VDAGENT_CONNECTION(obj);
+ g_clear_object(&conn->cancellable);
+ g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+ g_clear_object(&conn->io_stream);
+
+ G_OBJECT_CLASS(vdagent_connection_parent_class)->dispose(obj);
+}
+
+static void vdagent_connection_finalize(GObject *obj)
+{
+ VDAgentConnection *conn = VDAGENT_CONNECTION(obj);
+ g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
+ g_free(conn->header_buff);
+ g_free(conn->read_buff);
+
+ G_OBJECT_CLASS(vdagent_connection_parent_class)->finalize(obj);
+}
+
+static void vdagent_connection_class_init(VDAgentConnectionClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+ gobject_class->dispose = vdagent_connection_dispose;
+ gobject_class->finalize = vdagent_connection_finalize;
+}
+
+VDAgentConnection *vdagent_connection_new(
+ GIOStream *io_stream,
+ gboolean wait_on_opening,
+ gsize header_size,
+ VDAgentConnHeaderReadCb header_read_cb,
+ VDAgentConnReadCb read_cb,
+ VDAgentConnErrorCb error_cb,
+ gpointer user_data)
+{
+ VDAgentConnection *conn;
+ conn = g_object_new(VDAGENT_TYPE_CONNECTION, NULL);
+ conn->io_stream = io_stream;
+ conn->opening = wait_on_opening;
+ conn->header_size = header_size;
+ conn->header_buff = g_malloc(header_size);
+ conn->header_read_cb = header_read_cb;
+ conn->read_cb = read_cb;
+ conn->error_cb = error_cb;
+ conn->user_data = user_data;
+
+ read_next_message(conn);
+
+ return conn;
+}
+
+void vdagent_connection_destroy(VDAgentConnection *conn)
+{
+ g_cancellable_cancel(conn->cancellable);
+ g_object_unref(conn);
+}
+
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn,
+ GError **err)
+{
+ g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
+
+ GSocketConnection *socket_conn = G_SOCKET_CONNECTION(conn->io_stream);
+ return g_socket_get_credentials(
+ g_socket_connection_get_socket(socket_conn), err);
+}
+
+static void message_write_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ VDAgentConnection *conn = user_data;
+ GOutputStream *out = G_OUTPUT_STREAM(source_object);
+ GError *err = NULL;
+
+ g_output_stream_write_all_finish(out, res, NULL, &err);
+ g_bytes_unref(g_queue_pop_head(conn->write_queue));
+
+ if (err) {
+ if (!g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+ conn->error_cb(err, conn->user_data);
+ g_error_free(err);
+ g_object_unref(conn);
+ return;
+ }
+ g_object_unref(conn);
+
+ conn->opening = FALSE;
+
+ if (g_queue_is_empty(conn->write_queue))
+ g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+ else
+ write_next_message(conn);
+}
+
+static void write_next_message(VDAgentConnection *conn)
+{
+ GBytes *msg;
+ GOutputStream *out;
+
+ msg = g_queue_peek_head(conn->write_queue);
+ out = g_io_stream_get_output_stream(conn->io_stream);
+
+ g_output_stream_write_all_async(out,
+ g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
+ G_PRIORITY_DEFAULT, conn->cancellable,
+ message_write_cb, g_object_ref(conn));
+}
+
+void vdagent_connection_write(VDAgentConnection *conn,
+ gpointer data,
+ gsize size)
+{
+ g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
+
+ if (g_queue_get_length(conn->write_queue) == 1)
+ write_next_message(conn);
+}
+
+void vdagent_connection_flush(VDAgentConnection *conn)
+{
+ GMainLoop *loop;
+ /* TODO: allow multiple flush calls at once? */
+ g_return_if_fail(conn->flush_loop == NULL);
+
+ if (g_queue_is_empty(conn->write_queue))
+ return;
+
+ loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
+ /* When using GTK+, this should be wrapped with
+ * gdk_threads_leave() and gdk_threads_enter(),
+ * but since flush is used in virtio-port.c only
+ * let's leave it as it is for now. */
+ g_main_loop_run(loop);
+ g_main_loop_unref(loop);
+}
+
+static void message_read_cb(GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ VDAgentConnection *conn = user_data;
+ GInputStream *in = G_INPUT_STREAM(source_object);
+ GError *err = NULL;
+ gsize bytes_read, data_size;
+
+ g_input_stream_read_all_finish(in, res, &bytes_read, &err);
+ if (err) {
+ if (!g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+ conn->error_cb(err, conn->user_data);
+ g_error_free(err);
+ g_object_unref(conn);
+ return;
+ }
+ g_object_unref(conn);
+ if (bytes_read == 0) {
+ /* see virtio-port.c for the rationale behind this */
+ if (conn->opening) {
+ g_usleep(10000);
+ read_next_message(conn);
+ } else {
+ conn->error_cb(NULL, conn->user_data);
+ }
+ return;
+ }
+ conn->opening = FALSE;
+
+ if (conn->read_buff == NULL) {
+ /* we've read the message header, now let's read its body */
+ if (conn->header_read_cb(conn->header_buff,
+ &data_size,
+ conn->user_data) == FALSE)
+ return;
+
+ if (data_size > 0) {
+ conn->read_buff = g_malloc(data_size);
+ g_input_stream_read_all_async(in,
+ conn->read_buff, data_size,
+ G_PRIORITY_DEFAULT, conn->cancellable,
+ message_read_cb, g_object_ref(conn));
+ return;
+ }
+ }
+
+ if (conn->read_cb(conn->header_buff,
+ conn->read_buff,
+ conn->user_data) == FALSE)
+ return;
+ g_clear_pointer(&conn->read_buff, g_free);
+ read_next_message(conn);
+}
+
+static void read_next_message(VDAgentConnection *conn)
+{
+ GInputStream *in;
+ in = g_io_stream_get_input_stream(conn->io_stream);
+
+ g_input_stream_read_all_async(in,
+ conn->header_buff, conn->header_size,
+ G_PRIORITY_DEFAULT, conn->cancellable,
+ message_read_cb, g_object_ref(conn));
+}
diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
new file mode 100644
index 0000000..fbfc2fb
--- /dev/null
+++ b/src/vdagent-connection.h
@@ -0,0 +1,124 @@
+/* vdagent-connection.h
+
+ Copyright 2018 Red Hat, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __VDAGENT_CONNECTION_H
+#define __VDAGENT_CONNECTION_H
+
+#include <glib.h>
+#include <gio/gio.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define VDAGENT_TYPE_CONNECTION (vdagent_connection_get_type())
+#define VDAGENT_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), VDAGENT_TYPE_CONNECTION, VDAgentConnection))
+#define VDAGENT_IS_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), VDAGENT_TYPE_CONNECTION))
+#define VDAGENT_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), VDAGENT_TYPE_CONNECTION, VDAgentConnectionClass))
+#define VDAGENT_IS_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), VDAGENT_TYPE_CONNECTION))
+#define VDAGENT_CONNECTION_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), VDAGENT_TYPE_CONNECTION, VDAgentConnectionClass))
+
+typedef struct VDAgentConnection VDAgentConnection;
+typedef struct VDAgentConnectionClass VDAgentConnectionClass;
+
+struct VDAgentConnectionClass {
+ GObjectClass parent_class;
+};
+
+GType vdagent_connection_get_type(void);
+
+/* Called when a message header has been read.
+ *
+ * If the handler wishes to continue reading,
+ * it must set @body_size to the size of message's body and return TRUE.
+ * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
+ *
+ * Otherwise the handler should return FALSE
+ * and call vdagent_connection_destroy().
+ *
+ * @header_buff is owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
+ gsize *body_size,
+ gpointer user_data);
+
+/* Called when a full message has been read.
+ *
+ * If the handler wished to continue reading, it must return TRUE,
+ * otherwise FALSE and call vdagent_connection_destroy().
+ *
+ * @header, @data are owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnReadCb)(gpointer header,
+ gpointer data,
+ gpointer user_data);
+
+/* Called when an error occured during read or write.
+ * If @err is NULL, the connection was closed by the remote side.
+ * The handler is expected to call vdagent_connection_destroy(). */
+typedef void (*VDAgentConnErrorCb)(GError *err, gpointer user_data);
+
+/* Open a file in @path for read and write.
+ * Returns a new GIOStream to the given file or NULL when @err is set. */
+GIOStream *vdagent_file_open(const gchar *path, GError **err);
+
+/* Create a socket and initiate a new connection to the socket on @address.
+ * Returns a new GIOStream or NULL when @err is set. */
+GIOStream *vdagent_socket_connect(const gchar *address, GError **err);
+
+/* Create new VDAgentConnection and start reading incoming messages.
+ *
+ * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
+ * until the first message is successfully read or written to the @io_stream.
+ *
+ * @user_data will be passed to the supplied callbacks. */
+VDAgentConnection *vdagent_connection_new(
+ GIOStream *io_stream,
+ gboolean wait_on_opening,
+ gsize header_size,
+ VDAgentConnHeaderReadCb header_read_cb,
+ VDAgentConnReadCb read_cb,
+ VDAgentConnErrorCb error_cb,
+ gpointer user_data);
+
+/* Free up all resources associated with the VDAgentConnection.
+ *
+ * This operation can be asynchronous. */
+void vdagent_connection_destroy(VDAgentConnection *conn);
+
+/* Append a message to the write queue.
+ *
+ * VDAgentConnection takes ownership of the @data
+ * and frees it once the message is flushed. */
+void vdagent_connection_write(VDAgentConnection *conn,
+ gpointer data,
+ gsize size);
+
+/* Waits until all queued messages get written to the output stream.
+ *
+ * Note: other GSources can be triggered during this call */
+void vdagent_connection_flush(VDAgentConnection *conn);
+
+/* Returns the credentials of the foreign process connected to the socket.
+ * The returned object must be freed using g_object_unref().
+ *
+ * It is an error to call this function with a VDAgentConnection
+ * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn,
+ GError **err);
+
+G_END_DECLS
+
+#endif
diff --git a/src/vdagent/vdagent.c b/src/vdagent/vdagent.c
index f7c8b72..d7e2aca 100644
--- a/src/vdagent/vdagent.c
+++ b/src/vdagent/vdagent.c
@@ -471,6 +471,9 @@ reconnect:
vdagent_destroy(agent);
agent = NULL;
+ /* allow the VDAgentConnection to close and finalize properly */
+ g_main_context_iteration(NULL, FALSE);
+
if (!quit && do_daemonize)
goto reconnect;
diff --git a/src/vdagentd/vdagentd.c b/src/vdagentd/vdagentd.c
index 99683da..f52f039 100644
--- a/src/vdagentd/vdagentd.c
+++ b/src/vdagentd/vdagentd.c
@@ -31,10 +31,9 @@
#include <errno.h>
#include <signal.h>
#include <syslog.h>
-#include <sys/select.h>
#include <sys/stat.h>
#include <spice/vd_agent.h>
-#include <glib.h>
+#include <glib-unix.h>
#ifdef WITH_SYSTEMD_SOCKET_ACTIVATION
#include <systemd/sd-daemon.h>
@@ -81,11 +80,18 @@ static const char *active_session = NULL;
static unsigned int session_count = 0;
static struct udscs_connection *active_session_conn = NULL;
static int agent_owns_clipboard[256] = { 0, };
-static int quit = 0;
static int retval = 0;
static int client_connected = 0;
static int max_clipboard = -1;
+static GMainLoop *loop;
+
+static void vdagentd_quit(gint exit_code)
+{
+ retval = exit_code;
+ g_main_loop_quit(loop);
+}
+
/* utility functions */
static void virtio_msg_uint32_to_le(uint8_t *_msg, uint32_t size, uint32_t offset)
{
@@ -168,8 +174,7 @@ void do_client_mouse(struct vdagentd_uinput **uinputp, VDAgentMouseState *mouse)
uinput_fake);
if (!*uinputp) {
syslog(LOG_CRIT, "Fatal uinput error");
- retval = 1;
- quit = 1;
+ vdagentd_quit(1);
}
}
}
@@ -510,6 +515,11 @@ static int virtio_port_read_complete(
VDAgentMessage *message_header,
uint8_t *data)
{
+ /* This callback could be invoked during vdagent_virtio_port_flush(),
+ * don't process any incoming messages when quitting. */
+ if (!g_main_loop_is_running(loop))
+ return 0;
+
if (!vdagent_message_check_size(message_header))
return 0;
@@ -565,6 +575,27 @@ static int virtio_port_read_complete(
return 0;
}
+static void virtio_port_disconnect_cb(struct vdagent_virtio_port *vport,
+ GError *err)
+{
+ if (err == NULL)
+ return;
+
+ gboolean old_client_connected = client_connected;
+ syslog(LOG_CRIT, "AIIEEE lost spice client connection, reconnecting: %s",
+ err->message);
+ virtio_port = vdagent_virtio_port_create(portdev,
+ virtio_port_read_complete,
+ virtio_port_disconnect_cb);
+ if (virtio_port == NULL) {
+ syslog(LOG_CRIT, "Fatal error opening vdagent virtio channel");
+ vdagentd_quit(1);
+ return;
+ }
+ do_client_disconnect();
+ client_connected = old_client_connected;
+}
+
static void virtio_write_clipboard(uint8_t selection, uint32_t msg_type,
uint32_t data_type, uint8_t *data, uint32_t data_size)
{
@@ -703,8 +734,7 @@ static void check_xorg_resolution(void)
agent_data->screen_count);
if (!uinput) {
syslog(LOG_CRIT, "Fatal uinput error");
- retval = 1;
- quit = 1;
+ vdagentd_quit(1);
return;
}
@@ -712,11 +742,10 @@ static void check_xorg_resolution(void)
syslog(LOG_INFO, "opening vdagent virtio channel");
virtio_port = vdagent_virtio_port_create(portdev,
virtio_port_read_complete,
- NULL);
+ virtio_port_disconnect_cb);
if (!virtio_port) {
syslog(LOG_CRIT, "Fatal error opening vdagent virtio channel");
- retval = 1;
- quit = 1;
+ vdagentd_quit(1);
return;
}
send_capabilities(virtio_port, 1);
@@ -726,6 +755,11 @@ static void check_xorg_resolution(void)
vdagentd_uinput_destroy(&uinput);
#endif
if (virtio_port) {
+ if (only_once) {
+ syslog(LOG_INFO, "Exiting after one client session.");
+ vdagentd_quit(0);
+ return;
+ }
vdagent_virtio_port_flush(&virtio_port);
vdagent_virtio_port_destroy(&virtio_port);
syslog(LOG_INFO, "closed vdagent virtio channel");
@@ -939,6 +973,15 @@ static void agent_read_complete(struct udscs_connection **connp,
}
}
+static gboolean si_io_channel_cb(GIOChannel *source,
+ GIOCondition condition,
+ gpointer data)
+{
+ active_session = session_info_get_active_session(session_info);
+ update_active_session_connection(NULL);
+ return G_SOURCE_CONTINUE;
+}
+
/* main */
static void daemonize(void)
@@ -967,76 +1010,10 @@ static void daemonize(void)
}
}
-static void main_loop(void)
-{
- fd_set readfds, writefds;
- int n, nfds;
- int ck_fd = 0;
- int once = 0;
-
- while (!quit) {
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
-
- nfds = udscs_server_fill_fds(server, &readfds, &writefds);
- n = vdagent_virtio_port_fill_fds(virtio_port, &readfds, &writefds);
- if (n >= nfds)
- nfds = n + 1;
-
- if (session_info) {
- ck_fd = session_info_get_fd(session_info);
- FD_SET(ck_fd, &readfds);
- if (ck_fd >= nfds)
- nfds = ck_fd + 1;
- }
-
- n = select(nfds, &readfds, &writefds, NULL, NULL);
- if (n == -1) {
- if (errno == EINTR)
- continue;
- syslog(LOG_CRIT, "Fatal error select: %m");
- retval = 1;
- break;
- }
-
- udscs_server_handle_fds(server, &readfds, &writefds);
-
- if (virtio_port) {
- once = 1;
- vdagent_virtio_port_handle_fds(&virtio_port, &readfds, &writefds);
- if (!virtio_port) {
- int old_client_connected = client_connected;
- syslog(LOG_CRIT,
- "AIIEEE lost spice client connection, reconnecting");
- virtio_port = vdagent_virtio_port_create(portdev,
- virtio_port_read_complete,
- NULL);
- if (!virtio_port) {
- syslog(LOG_CRIT,
- "Fatal error opening vdagent virtio channel");
- retval = 1;
- break;
- }
- do_client_disconnect();
- client_connected = old_client_connected;
- }
- }
- else if (only_once && once)
- {
- syslog(LOG_INFO, "Exiting after one client session.");
- break;
- }
-
- if (session_info && FD_ISSET(ck_fd, &readfds)) {
- active_session = session_info_get_active_session(session_info);
- update_active_session_connection(NULL);
- }
- }
-}
-
-static void quit_handler(int sig)
+static gboolean signal_handler(gpointer user_data)
{
- quit = 1;
+ vdagentd_quit(0);
+ return G_SOURCE_REMOVE;
}
static gboolean parse_debug_level_cb(const gchar *option_name,
@@ -1090,8 +1067,8 @@ int main(int argc, char *argv[])
{
GOptionContext *context;
GError *err = NULL;
- struct sigaction act;
gboolean own_socket = TRUE;
+ GIOChannel *si_io_channel = NULL;
context = g_option_context_new(NULL);
g_option_context_add_main_entries(context, cmd_entries, NULL);
@@ -1116,13 +1093,9 @@ int main(int argc, char *argv[])
uinput_device = g_strdup(DEFAULT_UINPUT_DEVICE);
}
- memset(&act, 0, sizeof(act));
- act.sa_flags = SA_RESTART;
- act.sa_handler = quit_handler;
- sigaction(SIGINT, &act, NULL);
- sigaction(SIGHUP, &act, NULL);
- sigaction(SIGTERM, &act, NULL);
- sigaction(SIGQUIT, &act, NULL);
+ g_unix_signal_add(SIGINT, signal_handler, NULL);
+ g_unix_signal_add(SIGHUP, signal_handler, NULL);
+ g_unix_signal_add(SIGTERM, signal_handler, NULL);
openlog("spice-vdagentd", do_daemonize ? 0 : LOG_PERROR, LOG_USER);
@@ -1154,7 +1127,7 @@ int main(int argc, char *argv[])
syslog(LOG_CRIT, "Fatal the server socket %s exists already. Delete it?",
vdagentd_socket);
} else {
- syslog(LOG_CRIT, "Fatal could not create the server socket %s: %m",
+ syslog(LOG_CRIT, "Fatal could not create the server socket %s",
vdagentd_socket);
}
return 1;
@@ -1184,19 +1157,31 @@ int main(int argc, char *argv[])
if (want_session_info)
session_info = session_info_create(debug);
- if (!session_info)
+ if (session_info) {
+ si_io_channel = g_io_channel_unix_new(
+ session_info_get_fd(session_info));
+ g_io_add_watch(si_io_channel, G_IO_IN, si_io_channel_cb, NULL);
+ } else
syslog(LOG_WARNING, "no session info, max 1 session agent allowed");
active_xfers = g_hash_table_new(g_direct_hash, g_direct_equal);
- main_loop();
+
+ loop = g_main_loop_new(NULL, FALSE);
+ g_main_loop_run(loop);
release_clipboards();
vdagentd_uinput_destroy(&uinput);
+ g_clear_pointer(&si_io_channel, g_io_channel_unref);
+ g_clear_pointer(&session_info, session_info_destroy);
+ g_clear_pointer(&server, udscs_destroy_server);
vdagent_virtio_port_flush(&virtio_port);
vdagent_virtio_port_destroy(&virtio_port);
- session_info_destroy(session_info);
- udscs_destroy_server(server);
+
+ /* allow the VDAgentConnection(s) to close and finalize properly */
+ g_main_context_iteration(NULL, FALSE);
+
+ g_main_loop_unref(loop);
/* leave the socket around if it was provided by systemd */
if (own_socket) {
diff --git a/src/vdagentd/virtio-port.c b/src/vdagentd/virtio-port.c
index 497811e..5e65a7f 100644
--- a/src/vdagentd/virtio-port.c
+++ b/src/vdagentd/virtio-port.c
@@ -19,28 +19,20 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <glib.h>
#include <gio/gio.h>
+#include <glib-unix.h>
+#include "vdagent-connection.h"
#include "virtio-port.h"
struct vdagent_virtio_port_buf {
uint8_t *buf;
- size_t pos;
size_t size;
size_t write_pos;
-
- struct vdagent_virtio_port_buf *next;
};
/* Data to keep track of the assembling of vdagent messages per chunk port,
@@ -53,78 +45,126 @@ struct vdagent_virtio_port_chunk_port_data {
};
struct vdagent_virtio_port {
- int fd;
- int opening;
- int is_uds;
-
- /* Chunk read stuff, single buffer, separate header and data buffer */
- int chunk_header_read;
- int chunk_data_pos;
- VDIChunkHeader chunk_header;
- uint8_t chunk_data[VD_AGENT_MAX_DATA_SIZE];
+ VDAgentConnection *conn;
/* Per chunk port data */
struct vdagent_virtio_port_chunk_port_data port_data[VDP_END_PORT];
- /* Writes are stored in a linked list of buffers, with both the header
- + data for a single message in 1 buffer. */
- struct vdagent_virtio_port_buf *write_buf;
+ struct vdagent_virtio_port_buf write_buf;
/* Callbacks */
vdagent_virtio_port_read_callback read_callback;
vdagent_virtio_port_disconnect_callback disconnect_callback;
};
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp);
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp);
+static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp,
+ VDIChunkHeader *chunk_header,
+ gconstpointer chunk_data);
+
+static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
+ GError *err);
+
+static gboolean conn_header_read_cb(gpointer header_buff,
+ gsize *body_size,
+ gpointer user_data)
+{
+ struct vdagent_virtio_port *vport = user_data;
+ VDIChunkHeader *header = header_buff;
+ GError *err;
+
+ header->size = GUINT32_FROM_LE(header->size);
+ header->port = GUINT32_FROM_LE(header->port);
+
+ if (header->size > VD_AGENT_MAX_DATA_SIZE) {
+ err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+ "chunk size %u too large", header->size);
+ virtio_port_destroy(&vport, err);
+ return FALSE;
+ }
+ if (header->port >= VDP_END_PORT) {
+ err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
+ "chunk port %u out of range", header->port);
+ virtio_port_destroy(&vport, err);
+ return FALSE;
+ }
+
+ *body_size = header->size;
+ return TRUE;
+}
+
+static gboolean conn_read_cb(gpointer header,
+ gpointer data,
+ gpointer user_data)
+{
+ struct vdagent_virtio_port *vport = user_data;
+ vdagent_virtio_port_do_chunk(&vport, header, data);
+ return vport != NULL;
+}
+
+static void conn_error_cb(GError *err, gpointer user_data)
+{
+ struct vdagent_virtio_port *vport = user_data;
+ virtio_port_destroy(&vport, err ? g_error_copy(err) : NULL);
+}
struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
vdagent_virtio_port_read_callback read_callback,
vdagent_virtio_port_disconnect_callback disconnect_callback)
{
struct vdagent_virtio_port *vport;
- struct sockaddr_un address;
- int c;
-
- vport = g_new0(struct vdagent_virtio_port, 1);
-
- vport->fd = open(portname, O_RDWR);
- if (vport->fd == -1) {
- vport->fd = socket(PF_UNIX, SOCK_STREAM, 0);
- if (vport->fd == -1) {
- goto error;
- }
- address.sun_family = AF_UNIX;
- snprintf(address.sun_path, sizeof(address.sun_path), "%s", portname);
- c = connect(vport->fd, (struct sockaddr *)&address, sizeof(address));
- if (c == 0) {
- vport->is_uds = 1;
- } else {
- goto error;
+ GIOStream *io_stream;
+ GError *err = NULL;
+
+ io_stream = vdagent_file_open(portname, &err);
+ if (err) {
+ syslog(LOG_ERR, "%s: %s", __func__, err->message);
+ g_clear_error(&err);
+ io_stream = vdagent_socket_connect(portname, &err);
+ if (err) {
+ syslog(LOG_ERR, "%s: %s", __func__, err->message);
+ g_error_free(err);
+ return NULL;
}
- } else {
- vport->is_uds = 0;
}
- vport->opening = 1;
+ vport = g_new0(struct vdagent_virtio_port, 1);
+
+ /* When calling vdagent_connection_new(),
+ * @wait_on_opening MUST be set to TRUE:
+ *
+ * When we open the virtio serial port, the following happens:
+ * 1) The linux kernel virtio_console driver sends a
+ * VIRTIO_CONSOLE_PORT_OPEN message to qemu
+ * 2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
+ * register the agent chardev with the spice-server
+ * 3) spice-server then calls the spicevmc chardev driver's state
+ * callback to let it know it is ready to receive data
+ * 4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
+ * chardev backend
+ * 5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
+ * to the linux kernel virtio_console driver
+ *
+ * Until steps 1 - 5 have completed the linux kernel virtio_console
+ * driver sees the virtio serial port as being in a disconnected state
+ * and read will return 0 ! So if we blindly assume that a read 0 means
+ * that the channel is closed we will hit a race here.
+ */
+ vport->conn = vdagent_connection_new(io_stream,
+ TRUE,
+ sizeof(VDIChunkHeader),
+ conn_header_read_cb,
+ conn_read_cb,
+ conn_error_cb,
+ vport);
vport->read_callback = read_callback;
vport->disconnect_callback = disconnect_callback;
return vport;
-
-error:
- syslog(LOG_ERR, "open %s: %m", portname);
- if (vport->fd != -1) {
- close(vport->fd);
- }
- g_free(vport);
- return NULL;
}
static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
GError *err)
{
- struct vdagent_virtio_port_buf *wbuf, *next_wbuf;
struct vdagent_virtio_port *vport = *vportp;
int i;
@@ -136,19 +176,13 @@ static void virtio_port_destroy(struct vdagent_virtio_port **vportp,
g_clear_error(&err);
- wbuf = vport->write_buf;
- while (wbuf) {
- next_wbuf = wbuf->next;
- g_free(wbuf->buf);
- g_free(wbuf);
- wbuf = next_wbuf;
- }
+ g_free(vport->write_buf.buf);
for (i = 0; i < VDP_END_PORT; i++) {
g_free(vport->port_data[i].message_data);
}
- close(vport->fd);
+ vdagent_connection_destroy(vport->conn);
g_clear_pointer(vportp, g_free);
}
@@ -157,47 +191,6 @@ void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp)
virtio_port_destroy(vportp, NULL);
}
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
- fd_set *readfds, fd_set *writefds)
-{
- if (!vport)
- return -1;
-
- FD_SET(vport->fd, readfds);
- if (vport->write_buf)
- FD_SET(vport->fd, writefds);
-
- return vport->fd + 1;
-}
-
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
- fd_set *readfds, fd_set *writefds)
-{
- if (!*vportp)
- return;
-
- if (FD_ISSET((*vportp)->fd, readfds))
- vdagent_virtio_port_do_read(vportp);
-
- if (*vportp && FD_ISSET((*vportp)->fd, writefds))
- vdagent_virtio_port_do_write(vportp);
-}
-
-static struct vdagent_virtio_port_buf* vdagent_virtio_port_get_last_wbuf(
- struct vdagent_virtio_port *vport)
-{
- struct vdagent_virtio_port_buf *wbuf;
-
- wbuf = vport->write_buf;
- if (!wbuf)
- return NULL;
-
- while (wbuf->next)
- wbuf = wbuf->next;
-
- return wbuf;
-}
-
void vdagent_virtio_port_write_start(
struct vdagent_virtio_port *vport,
uint32_t port_nr,
@@ -205,15 +198,15 @@ void vdagent_virtio_port_write_start(
uint32_t message_opaque,
uint32_t data_size)
{
- struct vdagent_virtio_port_buf *wbuf, *new_wbuf;
+ struct vdagent_virtio_port_buf *new_wbuf;
VDIChunkHeader chunk_header;
VDAgentMessage message_header;
- new_wbuf = g_new(struct vdagent_virtio_port_buf, 1);
- new_wbuf->pos = 0;
+ g_return_if_fail(vport->write_buf.buf == NULL);
+
+ new_wbuf = &vport->write_buf;
new_wbuf->write_pos = 0;
new_wbuf->size = sizeof(chunk_header) + sizeof(message_header) + data_size;
- new_wbuf->next = NULL;
new_wbuf->buf = g_malloc(new_wbuf->size);
chunk_header.port = GUINT32_TO_LE(port_nr);
@@ -229,14 +222,6 @@ void vdagent_virtio_port_write_start(
memcpy(new_wbuf->buf + new_wbuf->write_pos, &message_header,
sizeof(message_header));
new_wbuf->write_pos += sizeof(message_header);
-
- if (!vport->write_buf) {
- vport->write_buf = new_wbuf;
- return;
- }
-
- wbuf = vdagent_virtio_port_get_last_wbuf(vport);
- wbuf->next = new_wbuf;
}
int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
@@ -244,8 +229,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
{
struct vdagent_virtio_port_buf *wbuf;
- wbuf = vdagent_virtio_port_get_last_wbuf(vport);
- if (!wbuf) {
+ if (size == 0)
+ return 0;
+
+ wbuf = &vport->write_buf;
+ if (!wbuf->buf) {
syslog(LOG_ERR, "can't append without a buffer");
return -1;
}
@@ -257,6 +245,11 @@ int vdagent_virtio_port_write_append(struct vdagent_virtio_port *vport,
memcpy(wbuf->buf + wbuf->write_pos, data, size);
wbuf->write_pos += size;
+
+ if (wbuf->write_pos == wbuf->size) {
+ vdagent_connection_write(vport->conn, wbuf->buf, wbuf->size);
+ wbuf->buf = NULL;
+ }
return 0;
}
@@ -275,8 +268,8 @@ void vdagent_virtio_port_write(
void vdagent_virtio_port_flush(struct vdagent_virtio_port **vportp)
{
- while (*vportp && (*vportp)->write_buf)
- vdagent_virtio_port_do_write(vportp);
+ if (*vportp)
+ vdagent_connection_flush((*vportp)->conn);
}
void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
@@ -289,20 +282,22 @@ void vdagent_virtio_port_reset(struct vdagent_virtio_port *vport, int port)
memset(&vport->port_data[port], 0, sizeof(vport->port_data[0]));
}
-static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
+static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp,
+ VDIChunkHeader *chunk_header,
+ gconstpointer chunk_data)
{
int avail, read, pos = 0;
struct vdagent_virtio_port *vport = *vportp;
struct vdagent_virtio_port_chunk_port_data *port =
- &vport->port_data[vport->chunk_header.port];
+ &vport->port_data[chunk_header->port];
if (port->message_header_read < sizeof(port->message_header)) {
read = sizeof(port->message_header) - port->message_header_read;
- if (read > vport->chunk_header.size) {
- read = vport->chunk_header.size;
+ if (read > chunk_header->size) {
+ read = chunk_header->size;
}
memcpy((uint8_t *)&port->message_header + port->message_header_read,
- vport->chunk_data, read);
+ chunk_data, read);
port->message_header_read += read;
if (port->message_header_read == sizeof(port->message_header)) {
@@ -320,7 +315,7 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
if (port->message_header_read == sizeof(port->message_header)) {
read = port->message_header.size - port->message_data_pos;
- avail = vport->chunk_header.size - pos;
+ avail = chunk_header->size - pos;
if (avail > read) {
GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
@@ -334,13 +329,13 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
if (read) {
memcpy(port->message_data + port->message_data_pos,
- vport->chunk_data + pos, read);
+ chunk_data + pos, read);
port->message_data_pos += read;
}
if (port->message_data_pos == port->message_header.size) {
if (vport->read_callback) {
- int r = vport->read_callback(vport, vport->chunk_header.port,
+ int r = vport->read_callback(vport, chunk_header->port,
&port->message_header, port->message_data);
if (r == -1) {
virtio_port_destroy(vportp, NULL);
@@ -353,143 +348,3 @@ static void vdagent_virtio_port_do_chunk(struct vdagent_virtio_port **vportp)
}
}
}
-
-static int vport_read(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
- if (vport->is_uds) {
- return recv(vport->fd, buf, len, 0);
- } else {
- return read(vport->fd, buf, len);
- }
-}
-
-static void vdagent_virtio_port_do_read(struct vdagent_virtio_port **vportp)
-{
- ssize_t n;
- size_t to_read;
- uint8_t *dest;
- struct vdagent_virtio_port *vport = *vportp;
-
- if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
- to_read = sizeof(vport->chunk_header) - vport->chunk_header_read;
- dest = (uint8_t *)&vport->chunk_header + vport->chunk_header_read;
- } else {
- to_read = vport->chunk_header.size - vport->chunk_data_pos;
- dest = vport->chunk_data + vport->chunk_data_pos;
- }
-
- n = vport_read(vport, dest, to_read);
- if (n < 0) {
- if (errno == EINTR)
- return;
- }
- if (n == 0 && vport->opening) {
- /* When we open the virtio serial port, the following happens:
- 1) The linux kernel virtio_console driver sends a
- VIRTIO_CONSOLE_PORT_OPEN message to qemu
- 2) qemu's spicevmc chardev driver calls qemu_spice_add_interface to
- register the agent chardev with the spice-server
- 3) spice-server then calls the spicevmc chardev driver's state
- callback to let it know it is ready to receive data
- 4) The state callback sends a CHR_EVENT_OPENED to the virtio-console
- chardev backend
- 5) The virtio-console chardev backend sends VIRTIO_CONSOLE_PORT_OPEN
- to the linux kernel virtio_console driver
-
- Until steps 1 - 5 have completed the linux kernel virtio_console
- driver sees the virtio serial port as being in a disconnected state
- and read will return 0 ! So if we blindly assume that a read 0 means
- that the channel is closed we will hit a race here.
-
- Therefore we ignore read returning 0 until we've successfully read
- or written some data. If we hit this race we also sleep a bit here
- to avoid busy waiting until the above steps complete */
- usleep(10000);
- return;
- }
- if (n <= 0) {
- GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
- "reading from vdagent virtio port: %m");
- virtio_port_destroy(vportp, err);
- return;
- }
- vport->opening = 0;
-
- if (vport->chunk_header_read < sizeof(vport->chunk_header)) {
- vport->chunk_header_read += n;
- if (vport->chunk_header_read == sizeof(vport->chunk_header)) {
- vport->chunk_header.size = GUINT32_FROM_LE(vport->chunk_header.size);
- vport->chunk_header.port = GUINT32_FROM_LE(vport->chunk_header.port);
- if (vport->chunk_header.size > VD_AGENT_MAX_DATA_SIZE) {
- GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
- "chunk size %u too large",
- vport->chunk_header.size);
- virtio_port_destroy(vportp, err);
- return;
- }
- if (vport->chunk_header.port >= VDP_END_PORT) {
- GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
- "chunk port %u out of range",
- vport->chunk_header.port);
- virtio_port_destroy(vportp, err);
- return;
- }
- }
- } else {
- vport->chunk_data_pos += n;
- if (vport->chunk_data_pos == vport->chunk_header.size) {
- vdagent_virtio_port_do_chunk(vportp);
- if (!*vportp)
- return;
- vport->chunk_header_read = 0;
- vport->chunk_data_pos = 0;
- }
- }
-}
-
-static int vport_write(struct vdagent_virtio_port *vport, uint8_t *buf, int len)
-{
- if (vport->is_uds) {
- return send(vport->fd, buf, len, 0);
- } else {
- return write(vport->fd, buf, len);
- }
-}
-
-static void vdagent_virtio_port_do_write(struct vdagent_virtio_port **vportp)
-{
- ssize_t n;
- size_t to_write;
- struct vdagent_virtio_port *vport = *vportp;
-
- struct vdagent_virtio_port_buf* wbuf = vport->write_buf;
- if (!wbuf) {
- syslog(LOG_ERR, "do_write called on a port without a write buf ?!");
- return;
- }
-
- if (wbuf->write_pos != wbuf->size) {
- syslog(LOG_ERR, "do_write: buffer is incomplete!!");
- return;
- }
-
- to_write = wbuf->size - wbuf->pos;
- n = vport_write(vport, wbuf->buf + wbuf->pos, to_write);
- if (n < 0) {
- if (errno == EINTR)
- return;
- GError *err = g_error_new(G_IO_ERROR, G_IO_ERROR_FAILED,
- "writing to vdagent virtio port: %m");
- virtio_port_destroy(vportp, err);
- return;
- }
- if (n > 0)
- vport->opening = 0;
-
- wbuf->pos += n;
- if (wbuf->pos == wbuf->size) {
- vport->write_buf = wbuf->next;
- g_free(wbuf->buf);
- g_free(wbuf);
- }
-}
diff --git a/src/vdagentd/virtio-port.h b/src/vdagentd/virtio-port.h
index dfbe27b..7d14deb 100644
--- a/src/vdagentd/virtio-port.h
+++ b/src/vdagentd/virtio-port.h
@@ -22,9 +22,7 @@
#ifndef __VIRTIO_PORT_H
#define __VIRTIO_PORT_H
-#include <stdio.h>
#include <stdint.h>
-#include <sys/select.h>
#include <spice/vd_agent.h>
struct vdagent_virtio_port;
@@ -57,22 +55,6 @@ struct vdagent_virtio_port *vdagent_virtio_port_create(const char *portname,
/* The contents of portp will be made NULL */
void vdagent_virtio_port_destroy(struct vdagent_virtio_port **vportp);
-
-/* Given a vdagent_virtio_port fill the fd_sets pointed to by readfds and
- writefds for select() usage.
-
- Return value: value of the highest fd + 1 */
-int vdagent_virtio_port_fill_fds(struct vdagent_virtio_port *vport,
- fd_set *readfds, fd_set *writefds);
-
-/* Handle any events flagged by select for the given vdagent_virtio_port.
- Note the port may be destroyed (when disconnected) by this call
- in this case the disconnect calllback will get called before the
- destruction and the contents of connp will be made NULL */
-void vdagent_virtio_port_handle_fds(struct vdagent_virtio_port **vportp,
- fd_set *readfds, fd_set *writefds);
-
-
/* Queue a message for delivery, either bit by bit, or all at once */
void vdagent_virtio_port_write_start(
struct vdagent_virtio_port *vport,
--
2.17.1
More information about the Spice-devel
mailing list