[Spice-devel] [PATCH v2 2/6] server: introduce dispatcher
Alon Levy
alevy at redhat.com
Mon Nov 7 03:44:42 PST 2011
used for main_dispatcher only in this patch.
Dispatcher is meant to be used for Main<->any low frequency messages.
Its interface is meant to include the red_dispatcher usage:
fixed size messages per message type
some messages require an ack
Some methods are added to be used by RedDispatcher later:
dispatcher_handle_read - to be called directly by RedDispatcher epoll
based loop
dispatcher_set_opaque - to be set from red_worker pthread
dispatcher_init - allow NULL core as used by red_worker
Read and Write behavior: (changed from RFC)
Sender: blocking write, blocking read for ack (if any).
Reader: non blocking check for 4 bytes (message type is uint32_t),
blocking read for the rest of the message, repeat until fail to
get message_type.
FDO Bugzilla: 42463
v1->v2:
drop O_NONBLOCK, use poll (Paolo Bonzini)
write_size/written_size (Paolo)
fold red_dispatcher change into dispatcher introduction
RFC->v1:
read_safe, write_safe instead of read_with_eintr (Paolo Bonzini)
read_safe can be blocking or non blocking
statics where missing, fix read_safe arithmetic (Yonit Halperin)
dispatcher_handle_read: read until no data available. will block
if the start of a message is available. (Yonit)
renamed DispatcherMessage's send_ack to ack (Yonit)
---
server/Makefile.am | 2 +
server/dispatcher.c | 206 ++++++++++++++++++++++++++++++++++++++++++++++
server/dispatcher.h | 81 ++++++++++++++++++
server/main_dispatcher.c | 102 ++++++-----------------
4 files changed, 316 insertions(+), 75 deletions(-)
create mode 100644 server/dispatcher.c
create mode 100644 server/dispatcher.h
diff --git a/server/Makefile.am b/server/Makefile.am
index 418d707..34a6b47 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -78,6 +78,8 @@ libspice_server_la_SOURCES = \
red_client_cache.h \
red_client_shared_cache.h \
red_common.h \
+ dispatcher.c \
+ dispatcher.h \
red_dispatcher.c \
red_dispatcher.h \
main_dispatcher.c \
diff --git a/server/dispatcher.c b/server/dispatcher.c
new file mode 100644
index 0000000..e2973a4
--- /dev/null
+++ b/server/dispatcher.c
@@ -0,0 +1,206 @@
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include "mem.h"
+#include "spice_common.h"
+#include "dispatcher.h"
+
+#define ACK 0xffffffff
+
+/*
+ * read_safe
+ * helper. reads until size bytes accumulated in buf, retrying reads that
+ * were interrupted by EINTR.
+ * @block if 1 the read will block (the fd is always blocking).
+ *        if 0 poll first, return immediately if no bytes available, otherwise
+ *        read size in blocking mode.
+ * @return -1 if poll or read failed with an error other than EINTR, or if
+ *         read reported end of file;
+ *         0 if nothing was read (@block is 0 and no data was pending, or
+ *         size is 0);
+ *         otherwise the number of bytes read, which equals size.
+ */
+static int read_safe(int fd, void *buf, size_t size, int block)
+{
+    /* byte pointer: arithmetic on void * is a compiler extension */
+    uint8_t *dest = buf;
+    /* size_t so the loop condition does not mix signed and unsigned */
+    size_t read_size = 0;
+    ssize_t ret;
+    struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0};
+
+    if (!block) {
+        /* zero timeout: only checks whether data is already pending */
+        if (poll(&pollfd, 1, 0) == -1) {
+            return -1;
+        }
+        if (!(pollfd.revents & POLLIN)) {
+            return 0;
+        }
+    }
+    while (read_size < size) {
+        ret = read(fd, dest + read_size, size - read_size);
+        if (ret == -1) {
+            if (errno == EINTR) {
+                continue;
+            }
+            return -1;
+        }
+        if (ret == 0) {
+            /* end of file: read() leaves errno untouched, so set one for
+             * callers that print errno after a -1 return */
+            red_error("broken pipe on read");
+            errno = ECONNRESET;
+            return -1;
+        }
+        read_size += (size_t)ret;
+    }
+    return (int)read_size;
+}
+
+/*
+ * write_safe
+ * helper. writes until size bytes from buf were written to fd, retrying
+ * writes that were interrupted by EINTR.
+ * @return -1 for error (other than EINTR), otherwise number of written
+ *         bytes, which equals size. may be zero.
+ */
+static int write_safe(int fd, void *buf, size_t size)
+{
+    /* byte pointer: arithmetic on void * is a compiler extension */
+    const uint8_t *src = buf;
+    /* size_t so the loop condition does not mix signed and unsigned */
+    size_t written_size = 0;
+    ssize_t ret;
+
+    while (written_size < size) {
+        ret = write(fd, src + written_size, size - written_size);
+        if (ret == -1) {
+            if (errno != EINTR) {
+                return -1;
+            }
+            continue;
+        }
+        written_size += (size_t)ret;
+    }
+    return (int)written_size;
+}
+
+/*
+ * dispatcher_handle_single_read
+ * reads at most one message from recv_fd and runs its handler.
+ * the message type is read without blocking; once a type has been read the
+ * payload is read in blocking mode. if the message type was registered
+ * with ack, ACK is written back on recv_fd after the handler has run.
+ * @return 1 if a message was read, 0 if no message was pending, reading
+ *         failed or the message type was out of range.
+ */
+static int dispatcher_handle_single_read(Dispatcher *dispatcher)
+{
+    int ret;
+    uint32_t type;
+    DispatcherMessage *msg = NULL;
+    uint8_t *payload = dispatcher->payload;
+    uint32_t ack = ACK;
+
+    if ((ret = read_safe(dispatcher->recv_fd, &type, sizeof(type), 0)) == -1) {
+        red_printf("error reading from dispatcher: %d\n", errno);
+        return 0;
+    }
+    if (ret == 0) {
+        /* no message */
+        return 0;
+    }
+    if (type >= dispatcher->max_message_type) {
+        /* messages[] only has max_message_type entries; indexing it with
+         * an unchecked value read from the socket would go out of bounds.
+         * the payload size is unknown for such a type, so nothing more
+         * can be consumed here. */
+        red_printf("error: invalid message type %u\n", type);
+        /* TODO: close socketpair? */
+        return 0;
+    }
+    msg = &dispatcher->messages[type];
+    /* blocking read of the payload. for msg->size == 0 read_safe reads
+     * nothing and returns 0. */
+    if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
+        red_printf("error reading from dispatcher: %d\n", errno);
+        /* TODO: close socketpair? */
+        return 0;
+    }
+    if (msg->handler) {
+        msg->handler(dispatcher->opaque, (void *)payload);
+    } else {
+        red_printf("error: no handler for message type %d\n", type);
+    }
+    if (msg->ack && write_safe(dispatcher->recv_fd,
+                               &ack, sizeof(ack)) == -1) {
+        red_printf("error writing ack for message %d\n", type);
+        /* TODO: close socketpair? */
+    }
+    return 1;
+}
+
+/*
+ * dispatcher_handle_recv_read
+ * keeps handling messages, one per iteration, until
+ * dispatcher_handle_single_read reports that it handled none.
+ * doesn't handle being in the middle of a message. all reads are blocking.
+ */
+void dispatcher_handle_recv_read(Dispatcher *dispatcher)
+{
+    int handled;
+
+    do {
+        handled = dispatcher_handle_single_read(dispatcher);
+    } while (handled);
+}
+
+/*
+ * dispatcher_send_message
+ * writes the message type followed by the payload to send_fd while holding
+ * the dispatcher lock. if the message type was registered with ack, blocks
+ * until the ack value has been read back from send_fd.
+ * @message_type: must be below max_message_type and have a registered
+ *                handler (both are asserted).
+ * @payload: the registered size for @message_type is written from here.
+ */
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload)
+{
+    DispatcherMessage *msg;
+    uint32_t ack;
+    int send_fd = dispatcher->send_fd;
+
+    assert(dispatcher->max_message_type > message_type);
+    assert(dispatcher->messages[message_type].handler);
+    msg = &dispatcher->messages[message_type];
+    /* the lock keeps type, payload and ack of one message together */
+    pthread_mutex_lock(&dispatcher->lock);
+    if (write_safe(send_fd, &message_type, sizeof(message_type)) == -1) {
+        red_printf("error: failed to send message type for message %d\n",
+                   message_type);
+        goto unlock;
+    }
+    if (write_safe(send_fd, payload, msg->size) == -1) {
+        red_printf("error: failed to send message body for message %d\n",
+                   message_type);
+        goto unlock;
+    }
+    if (msg->ack) {
+        if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) {
+            /* ack was not filled in, so it must not be compared below */
+            red_printf("error: failed to read ack for message %d\n",
+                       message_type);
+        } else if (ack != ACK) {
+            red_printf("error: got wrong ack value in dispatcher "
+                       "for message %d\n", message_type);
+            /* TODO handling error? */
+        }
+    }
+unlock:
+    pthread_mutex_unlock(&dispatcher->lock);
+}
+
+/*
+ * dispatcher_register_handler
+ * records handler, size and ack for @message_type and grows the shared
+ * payload buffer so it can hold the largest registered message.
+ * @message_type: must be below max_message_type and not registered yet
+ *                (both are asserted).
+ * @handler: called with the dispatcher opaque and the received payload.
+ * @size: fixed payload size of this message type.
+ * @ack: non zero if the reader should send an ack after the handler ran.
+ */
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+                                 dispatcher_handle_message handler, size_t size,
+                                 int ack)
+{
+    DispatcherMessage *msg;
+
+    assert(message_type < dispatcher->max_message_type);
+    assert(dispatcher->messages[message_type].handler == 0);
+    /* grow the buffer before registering, so that a failed allocation
+     * leaves both the old buffer and the message table untouched */
+    if (size > dispatcher->payload_size) {
+        void *new_payload = realloc(dispatcher->payload, size);
+
+        if (new_payload == NULL) {
+            /* realloc keeps the old block valid on failure; do not
+             * overwrite the only pointer to it */
+            red_error("failed to allocate %zu bytes for dispatcher payload",
+                      size);
+            return;
+        }
+        dispatcher->payload = new_payload;
+        dispatcher->payload_size = size;
+    }
+    msg = &dispatcher->messages[message_type];
+    msg->handler = handler;
+    msg->size = size;
+    msg->ack = ack;
+}
+
+/*
+ * dispatcher_init
+ * creates the socketpair used to pass messages, initializes the lock,
+ * records the calling thread in self and allocates a zeroed table of
+ * @max_message_type message descriptions.
+ * @max_message_type: number of message types that can be registered.
+ * @opaque: passed as first argument to every message handler.
+ */
+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
+                     void *opaque)
+{
+    int channels[2];
+
+    dispatcher->opaque = opaque;
+    /* dispatcher_register_handler compares against payload_size and
+     * reallocs payload, so both need defined values even when the caller
+     * did not zero the struct */
+    dispatcher->payload = NULL;
+    dispatcher->payload_size = 0;
+    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
+        red_error("socketpair failed %s", strerror(errno));
+        return;
+    }
+    pthread_mutex_init(&dispatcher->lock, NULL);
+    dispatcher->recv_fd = channels[0];
+    dispatcher->send_fd = channels[1];
+    dispatcher->self = pthread_self();
+
+    dispatcher->messages = spice_malloc0_n(max_message_type,
+                                           sizeof(dispatcher->messages[0]));
+    dispatcher->max_message_type = max_message_type;
+}
+
+/*
+ * dispatcher_set_opaque
+ * replaces the opaque pointer that is handed to message handlers as their
+ * first argument.
+ */
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
+{
+ dispatcher->opaque = opaque;
+}
+
+/*
+ * dispatcher_get_recv_fd
+ * @return the file descriptor messages are read from (recv_fd).
+ */
+int dispatcher_get_recv_fd(Dispatcher *dispatcher)
+{
+ return dispatcher->recv_fd;
+}
diff --git a/server/dispatcher.h b/server/dispatcher.h
new file mode 100644
index 0000000..04e6b46
--- /dev/null
+++ b/server/dispatcher.h
@@ -0,0 +1,81 @@
+#ifndef DISPATCHER_H
+#define DISPATCHER_H
+
+/* the guard is named after this file so it cannot clash with the guard of
+ * main_dispatcher.h, which is included next to this header */
+
+#include <pthread.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <spice.h>
+
+typedef struct Dispatcher Dispatcher;
+
+/*
+ * dispatcher_handle_message
+ * message handler callback.
+ * @opaque: the opaque pointer stored in the Dispatcher
+ * @payload: the received message payload
+ */
+typedef void (*dispatcher_handle_message)(void *opaque,
+                                          void *payload);
+
+/* per message type description, filled by dispatcher_register_handler */
+typedef struct DispatcherMessage {
+    size_t size; /* fixed payload size of this message type */
+    int ack; /* non zero if the reader sends an ack after handling */
+    dispatcher_handle_message handler;
+} DispatcherMessage;
+
+struct Dispatcher {
+    SpiceCoreInterface *recv_core;
+    int recv_fd; /* read side of the socketpair */
+    int send_fd; /* write side of the socketpair */
+    pthread_t self; /* thread that called dispatcher_init */
+    pthread_mutex_t lock; /* held while a message is being sent */
+    DispatcherMessage *messages; /* max_message_type entries */
+    int stage; /* message parser stage - sender has no stages */
+    size_t max_message_type;
+    void *payload; /* allocated as max of message sizes */
+    size_t payload_size; /* used to track realloc calls */
+    void *opaque; /* first argument of every handler call */
+};
+
+/*
+ * dispatcher_send_message
+ * @dispatcher: Dispatcher instance
+ * @message_type: message type
+ * @payload: payload
+ */
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload);
+
+/*
+ * dispatcher_init
+ * @dispatcher: Dispatcher instance to initialize
+ * @max_message_type: number of message types. Allows allocation of the
+ *                    DispatcherMessage list up front, and registration
+ *                    in any order wanted.
+ * @opaque: opaque to use for callbacks
+ */
+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
+                     void *opaque);
+
+/*
+ * dispatcher_register_handler
+ * @dispatcher: dispatcher
+ * @message_type: message type
+ * @handler: message handler
+ * @size: message size. Each type has a fixed associated size.
+ * @ack: send an ack. This is per message type - you can't send the
+ *       same message type with and without. Register two different
+ *       messages if that is what you want.
+ */
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+                                 dispatcher_handle_message handler, size_t size,
+                                 int ack);
+
+/*
+ * dispatcher_handle_recv_read
+ * @dispatcher: Dispatcher instance
+ */
+void dispatcher_handle_recv_read(Dispatcher *dispatcher);
+
+/*
+ * dispatcher_get_recv_fd
+ * @dispatcher: Dispatcher instance
+ * @return: receive file descriptor of the dispatcher
+ */
+int dispatcher_get_recv_fd(Dispatcher *dispatcher);
+
+/*
+ * dispatcher_set_opaque
+ * @dispatcher: Dispatcher instance
+ * @opaque: opaque to use for callbacks
+ */
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
+
+#endif //DISPATCHER_H
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
index 73856bf..a5967fa 100644
--- a/server/main_dispatcher.c
+++ b/server/main_dispatcher.c
@@ -5,6 +5,7 @@
#include <assert.h>
#include "red_common.h"
+#include "dispatcher.h"
#include "main_dispatcher.h"
/*
@@ -28,11 +29,8 @@
*/
typedef struct {
+ Dispatcher base;
SpiceCoreInterface *core;
- int main_fd;
- int other_fd;
- pthread_t self;
- pthread_mutex_t lock;
} MainDispatcher;
MainDispatcher main_dispatcher;
@@ -43,15 +41,10 @@ enum {
MAIN_DISPATCHER_NUM_MESSAGES
};
-typedef struct MainDispatcherMessage {
- uint32_t type;
- union {
- struct {
- int event;
- SpiceChannelEventInfo *info;
- } channel_event;
- } data;
-} MainDispatcherMessage;
+typedef struct MainDispatcherChannelEventMessage {
+ int event;
+ SpiceChannelEventInfo *info;
+} MainDispatcherChannelEventMessage;
/* channel_event - calls core->channel_event, must be done in main thread */
static void main_dispatcher_self_handle_channel_event(
@@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
main_dispatcher.core->channel_event(event, info);
}
-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
+static void main_dispatcher_handle_channel_event(void *opaque,
+ void *payload)
{
- main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
- msg->data.channel_event.info);
+ MainDispatcherChannelEventMessage *channel_event = payload;
+
+ main_dispatcher_self_handle_channel_event(channel_event->event,
+ channel_event->info);
}
void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
{
- MainDispatcherMessage msg;
- ssize_t written = 0;
- ssize_t ret;
+ MainDispatcherChannelEventMessage msg;
- if (pthread_self() == main_dispatcher.self) {
+ if (pthread_self() == main_dispatcher.base.self) {
main_dispatcher_self_handle_channel_event(event, info);
return;
}
- msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
- msg.data.channel_event.event = event;
- msg.data.channel_event.info = info;
- pthread_mutex_lock(&main_dispatcher.lock);
- while (written < sizeof(msg)) {
- ret = write(main_dispatcher.other_fd, &msg + written,
- sizeof(msg) - written);
- if (ret == -1) {
- assert(errno == EINTR);
- continue;
- }
- written += ret;
- }
- pthread_mutex_unlock(&main_dispatcher.lock);
+ msg.event = event;
+ msg.info = info;
+ dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+ &msg);
}
-
-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
+static void dispatcher_handle_read(int fd, int event, void *opaque)
{
- int ret;
- MainDispatcher *md = opaque;
- MainDispatcherMessage msg;
- int read_size = 0;
+ Dispatcher *dispatcher = opaque;
- while (read_size < sizeof(msg)) {
- /* blocks until sizeof(msg) is read */
- ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size);
- if (ret == -1) {
- if (errno != EINTR) {
- red_printf("error reading from main dispatcher: %d\n", errno);
- /* TODO: close channel? */
- return;
- }
- continue;
- }
- read_size += ret;
- }
- switch (msg.type) {
- case MAIN_DISPATCHER_CHANNEL_EVENT:
- main_dispatcher_handle_channel_event(&msg);
- break;
- default:
- red_printf("error: unhandled main dispatcher message type %d\n",
- msg.type);
- }
+ dispatcher_handle_recv_read(dispatcher);
}
void main_dispatcher_init(SpiceCoreInterface *core)
{
- int channels[2];
-
memset(&main_dispatcher, 0, sizeof(main_dispatcher));
main_dispatcher.core = core;
-
- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
- red_error("socketpair failed %s", strerror(errno));
- return;
- }
- pthread_mutex_init(&main_dispatcher.lock, NULL);
- main_dispatcher.main_fd = channels[0];
- main_dispatcher.other_fd = channels[1];
- main_dispatcher.self = pthread_self();
-
- core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
- main_dispatcher_handle_read, &main_dispatcher);
+ dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base);
+ core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
+ dispatcher_handle_read, &main_dispatcher.base);
+ dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+ main_dispatcher_handle_channel_event,
+ sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
}
--
1.7.7.1
More information about the Spice-devel
mailing list