[Spice-devel] [PATCH v2 2/6] server: introduce dispatcher
Alon Levy
alevy at redhat.com
Tue Nov 8 02:34:10 PST 2011
On Tue, Nov 08, 2011 at 09:31:27AM +0200, Yonit Halperin wrote:
> Hi,
> 2 comments bellow
> On 11/07/2011 01:44 PM, Alon Levy wrote:
> >used for main_dispatcher only in this patch.
> >
> >Dispatcher is meant to be used for Main<->any low frequency messages.
> >
> >It's interface is meant to include the red_dispatcher usage:
> > fixed size messages per message type
> > some messages require an ack
> >
> >Some methods are added to be used by RedDispatcher later:
> > dispatcher_handle_read - to be called directly by RedDispatcher epoll
> > based loop
> > dispatcher_set_opaque - to be set from red_worker pthread
> > dispatcher_init - allow NULL core as used by red_worker
> >
> >Read and Write behavior: (changed from RFC)
> > Sender: blocking write, blocking read for ack (if any).
> > Reader: non blocking check for 4 bytes (message type is uint32_t),
> > blocking read for the rest of the message, repeat until fail to
> > get message_type.
> >
> >FDO Bugzilla: 42463
> >
> >v1->v2:
> > drop O_NONBLOCK, use poll (Paolo Bonzini)
> > write_size/written_size (Paolo)
> > fold red_dispatcher change into dispatcher introduction
> >
> >RFC->v1:
> > read_safe, write_safe instead of read_with_eintr (Paolo Bonzini)
> > read_safe can be blocking or non blocking
> > statics where missing, fix read_safe arithmetic (Yonit Halperin)
> > dispatcher_handle_read: read until no data available. will block
> > if the start of a message is available. (Yonit)
> > renamed DispatcherMessage's send_ack to ack (Yonit)
> >---
> > server/Makefile.am | 2 +
> > server/dispatcher.c | 206 ++++++++++++++++++++++++++++++++++++++++++++++
> > server/dispatcher.h | 81 ++++++++++++++++++
> > server/main_dispatcher.c | 102 ++++++-----------------
> > 4 files changed, 316 insertions(+), 75 deletions(-)
> > create mode 100644 server/dispatcher.c
> > create mode 100644 server/dispatcher.h
> >
> >diff --git a/server/Makefile.am b/server/Makefile.am
> >index 418d707..34a6b47 100644
> >--- a/server/Makefile.am
> >+++ b/server/Makefile.am
> >@@ -78,6 +78,8 @@ libspice_server_la_SOURCES = \
> > red_client_cache.h \
> > red_client_shared_cache.h \
> > red_common.h \
> >+ dispatcher.c \
> >+ dispatcher.h \
> > red_dispatcher.c \
> > red_dispatcher.h \
> > main_dispatcher.c \
> >diff --git a/server/dispatcher.c b/server/dispatcher.c
> >new file mode 100644
> >index 0000000..e2973a4
> >--- /dev/null
> >+++ b/server/dispatcher.c
> >@@ -0,0 +1,206 @@
> >+#include<unistd.h>
> >+#include<errno.h>
> >+#include<assert.h>
> >+#include<string.h>
> >+#include<pthread.h>
> >+#include<fcntl.h>
> >+#include<poll.h>
> >+
> >+#include "mem.h"
> >+#include "spice_common.h"
> >+#include "dispatcher.h"
> >+
> >+#define ACK 0xffffffff
> >+
> >+/*
> >+ * read_safe
> >+ * helper. reads until size bytes accumulated in buf, if an error other then
> >+ * EINTR is encountered returns -1, otherwise returns 0.
> >+ * @block if 1 the read will block (the fd is always blocking).
> >+ * if 0 poll first, return immediately if no bytes available, otherwise
> >+ * read size in blocking mode.
> >+ */
> >+static int read_safe(int fd, void *buf, size_t size, int block)
> >+{
> >+ int read_size = 0;
> >+ int ret;
> >+ struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0};
> >+
> >+ if (!block) {
> >+ ret = poll(&pollfd, 1, 0);
> >+ if (ret == -1) {
> What if (ernno == EINTR)? shouldn't you call poll again? If you
> don't read from the fd now, will you receive another read event?
Yes for select, no for epoll (IIUC). So yes, I'll do a loop if errno ==
EINTR.
will fix both your other comments, thanks.
> >+ return -1;
> >+ }
> >+ if (!(pollfd.revents& POLLIN)) {
> >+ return 0;
> >+ }
> >+ }
> >+ while (read_size< size) {
> >+ ret = read(fd, buf + read_size, size - read_size);
> >+ if (ret == -1) {
> >+ if (errno == EINTR) {
> >+ continue;
> >+ }
> >+ return -1;
> >+ }
> >+ if (ret == 0) {
> >+ red_error("broken pipe on read");
> >+ errno = ECONNRESET; /* huh? */
> >+ return -1;
> >+ }
> >+ read_size += ret;
> >+ }
> >+ return read_size;
> >+}
> >+
> >+/*
> >+ * write_safe
> >+ * @return -1 for error, otherwise number of written bytes. may be zero.
> >+ */
> >+static int write_safe(int fd, void *buf, size_t size)
> >+{
> >+ int written_size = 0;
> >+ int ret;
> >+
> >+ while (written_size< size) {
> >+ ret = write(fd, buf + written_size, size - written_size);
> >+ if (ret == -1) {
> >+ if (errno != EINTR) {
> >+ return -1;
> >+ }
> >+ continue;
> >+ }
> >+ written_size += ret;
> >+ }
> >+ return written_size;
> >+}
> >+
> >+static int dispatcher_handle_single_read(Dispatcher *dispatcher)
> >+{
> >+ int ret;
> >+ uint32_t type;
> >+ DispatcherMessage *msg = NULL;
> >+ uint8_t *payload = dispatcher->payload;
> >+ uint32_t ack = ACK;
> >+
> >+ if ((ret = read_safe(dispatcher->recv_fd,&type, sizeof(type), 0)) == -1) {
> >+ red_printf("error reading from dispatcher: %d\n", errno);
> >+ return 0;
> >+ }
> >+ if (ret == 0) {
> >+ /* no messsage */
> >+ return 0;
> >+ }
> >+ msg =&dispatcher->messages[type];
> >+ /* we call read_safe even if msg->size == 0, this has the side
> >+ * effect of setting the fd to block, so the remaining send_data
> >+ * users will get a blocking fd as expected. */
> obsolete comment (and the poll is not necessary if msg->size == 0)
> >+ if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
> >+ red_printf("error reading from dispatcher: %d\n", errno);
> >+ /* TODO: close socketpair? */
> >+ return 0;
> >+ }
> >+ if (msg->handler) {
> >+ msg->handler(dispatcher->opaque, (void *)payload);
> >+ } else {
> >+ red_printf("error: no handler for message type %d\n", type);
> >+ }
> >+ if (msg->ack&& write_safe(dispatcher->recv_fd,
> >+&ack, sizeof(ack)) == -1) {
> >+ red_printf("error writing ack for message %d\n", type);
> >+ /* TODO: close socketpair? */
> >+ }
> >+ return 1;
> >+}
> >+
> >+/*
> >+ * dispatcher_handle_recv_read
> >+ * doesn't handle being in the middle of a message. all reads are blocking.
> >+ */
> >+void dispatcher_handle_recv_read(Dispatcher *dispatcher)
> >+{
> >+ while (dispatcher_handle_single_read(dispatcher)) {
> >+ }
> >+}
> >+
> >+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> >+ void *payload)
> >+{
> >+ DispatcherMessage *msg;
> >+ uint32_t ack;
> >+ int send_fd = dispatcher->send_fd;
> >+
> >+ assert(dispatcher->max_message_type> message_type);
> >+ assert(dispatcher->messages[message_type].handler);
> >+ msg =&dispatcher->messages[message_type];
> >+ pthread_mutex_lock(&dispatcher->lock);
> >+ if (write_safe(send_fd,&message_type, sizeof(message_type)) == -1) {
> >+ red_printf("error: failed to send message type for message %d\n",
> >+ message_type);
> >+ goto unlock;
> >+ }
> >+ if (write_safe(send_fd, payload, msg->size) == -1) {
> >+ red_printf("error: failed to send message body for message %d\n",
> >+ message_type);
> >+ goto unlock;
> >+ }
> >+ if (msg->ack) {
> >+ if (read_safe(send_fd,&ack, sizeof(ack), 1) == -1) {
> missing error print, goto unlock and return?
> >+ }
> >+ if (ack != ACK) {
> >+ red_printf("error: got wrong ack value in dispatcher "
> >+ "for message %d\n", message_type);
> >+ /* TODO handling error? */
> >+ }
> >+ }
> >+unlock:
> >+ pthread_mutex_unlock(&dispatcher->lock);
> >+}
> >+
> >+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> >+ dispatcher_handle_message handler, size_t size,
> >+ int ack)
> >+{
> >+ DispatcherMessage *msg;
> >+
> >+ assert(message_type< dispatcher->max_message_type);
> >+ assert(dispatcher->messages[message_type].handler == 0);
> >+ msg =&dispatcher->messages[message_type];
> >+ msg->handler = handler;
> >+ msg->size = size;
> >+ msg->ack = ack;
> >+ if (msg->size> dispatcher->payload_size) {
> >+ dispatcher->payload = realloc(dispatcher->payload, msg->size);
> >+ dispatcher->payload_size = msg->size;
> >+ }
> >+}
> >+
> >+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
> >+ void *opaque)
> >+{
> >+ int channels[2];
> >+
> >+ dispatcher->opaque = opaque;
> >+ if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> >+ red_error("socketpair failed %s", strerror(errno));
> >+ return;
> >+ }
> >+ pthread_mutex_init(&dispatcher->lock, NULL);
> >+ dispatcher->recv_fd = channels[0];
> >+ dispatcher->send_fd = channels[1];
> >+ dispatcher->self = pthread_self();
> >+
> >+ dispatcher->messages = spice_malloc0_n(max_message_type,
> >+ sizeof(dispatcher->messages[0]));
> >+ dispatcher->max_message_type = max_message_type;
> >+}
> >+
> >+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
> >+{
> >+ dispatcher->opaque = opaque;
> >+}
> >+
> >+int dispatcher_get_recv_fd(Dispatcher *dispatcher)
> >+{
> >+ return dispatcher->recv_fd;
> >+}
> >diff --git a/server/dispatcher.h b/server/dispatcher.h
> >new file mode 100644
> >index 0000000..04e6b46
> >--- /dev/null
> >+++ b/server/dispatcher.h
> >@@ -0,0 +1,81 @@
> >+#ifndef MAIN_DISPATCHER_H
> >+#define MAIN_DISPATCHER_H
> >+
> >+#include<spice.h>
> >+
> >+typedef struct Dispatcher Dispatcher;
> >+
> >+typedef void (*dispatcher_handle_message)(void *opaque,
> >+ void *payload);
> >+
> >+typedef struct DispatcherMessage {
> >+ size_t size;
> >+ int ack;
> >+ dispatcher_handle_message handler;
> >+} DispatcherMessage;
> >+
> >+struct Dispatcher {
> >+ SpiceCoreInterface *recv_core;
> >+ int recv_fd;
> >+ int send_fd;
> >+ pthread_t self;
> >+ pthread_mutex_t lock;
> >+ DispatcherMessage *messages;
> >+ int stage; /* message parser stage - sender has no stages */
> >+ size_t max_message_type;
> >+ void *payload; /* allocated as max of message sizes */
> >+ size_t payload_size; /* used to track realloc calls */
> >+ void *opaque;
> >+};
> >+
> >+/*
> >+ * dispatcher_send_message
> >+ * @message_type: message type
> >+ * @payload: payload
> >+ */
> >+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> >+ void *payload);
> >+
> >+/*
> >+ * dispatcher_init
> >+ * @max_message_type: number of message types. Allows upfront allocation
> >+ * of a DispatcherMessage list.
> >+ * up front, and registration in any order wanted.
> >+ */
> >+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
> >+ void *opaque);
> >+
> >+/*
> >+ * dispatcher_register_handler
> >+ * @dispatcher: dispatcher
> >+ * @messsage_type: message type
> >+ * @handler: message handler
> >+ * @size: message size. Each type has a fixed associated size.
> >+ * @ack: send an ack. This is per message type - you can't send the
> >+ * same message type with and without. Register two different
> >+ * messages if that is what you want.
> >+ */
> >+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> >+ dispatcher_handle_message handler, size_t size,
> >+ int ack);
> >+
> >+/*
> >+ * dispatcher_handle_recv_read
> >+ * @dispatcher: Dispatcher instance
> >+ */
> >+void dispatcher_handle_recv_read(Dispatcher *);
> >+
> >+/*
> >+ * dispatcher_get_recv_fd
> >+ * @return: receive file descriptor of the dispatcher
> >+ */
> >+int dispatcher_get_recv_fd(Dispatcher *);
> >+
> >+/*
> >+ * dispatcher_set_opaque
> >+ * @dispatcher: Dispatcher instance
> >+ * @opaque: opaque to use for callbacks
> >+ */
> >+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
> >+
> >+#endif //MAIN_DISPATCHER_H
> >diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> >index 73856bf..a5967fa 100644
> >--- a/server/main_dispatcher.c
> >+++ b/server/main_dispatcher.c
> >@@ -5,6 +5,7 @@
> > #include<assert.h>
> >
> > #include "red_common.h"
> >+#include "dispatcher.h"
> > #include "main_dispatcher.h"
> >
> > /*
> >@@ -28,11 +29,8 @@
> > */
> >
> > typedef struct {
> >+ Dispatcher base;
> > SpiceCoreInterface *core;
> >- int main_fd;
> >- int other_fd;
> >- pthread_t self;
> >- pthread_mutex_t lock;
> > } MainDispatcher;
> >
> > MainDispatcher main_dispatcher;
> >@@ -43,15 +41,10 @@ enum {
> > MAIN_DISPATCHER_NUM_MESSAGES
> > };
> >
> >-typedef struct MainDispatcherMessage {
> >- uint32_t type;
> >- union {
> >- struct {
> >- int event;
> >- SpiceChannelEventInfo *info;
> >- } channel_event;
> >- } data;
> >-} MainDispatcherMessage;
> >+typedef struct MainDispatcherChannelEventMessage {
> >+ int event;
> >+ SpiceChannelEventInfo *info;
> >+} MainDispatcherChannelEventMessage;
> >
> > /* channel_event - calls core->channel_event, must be done in main thread */
> > static void main_dispatcher_self_handle_channel_event(
> >@@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
> > main_dispatcher.core->channel_event(event, info);
> > }
> >
> >-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> >+static void main_dispatcher_handle_channel_event(void *opaque,
> >+ void *payload)
> > {
> >- main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> >- msg->data.channel_event.info);
> >+ MainDispatcherChannelEventMessage *channel_event = payload;
> >+
> >+ main_dispatcher_self_handle_channel_event(channel_event->event,
> >+ channel_event->info);
> > }
> >
> > void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
> > {
> >- MainDispatcherMessage msg;
> >- ssize_t written = 0;
> >- ssize_t ret;
> >+ MainDispatcherChannelEventMessage msg;
> >
> >- if (pthread_self() == main_dispatcher.self) {
> >+ if (pthread_self() == main_dispatcher.base.self) {
> > main_dispatcher_self_handle_channel_event(event, info);
> > return;
> > }
> >- msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> >- msg.data.channel_event.event = event;
> >- msg.data.channel_event.info = info;
> >- pthread_mutex_lock(&main_dispatcher.lock);
> >- while (written< sizeof(msg)) {
> >- ret = write(main_dispatcher.other_fd,&msg + written,
> >- sizeof(msg) - written);
> >- if (ret == -1) {
> >- assert(errno == EINTR);
> >- continue;
> >- }
> >- written += ret;
> >- }
> >- pthread_mutex_unlock(&main_dispatcher.lock);
> >+ msg.event = event;
> >+ msg.info = info;
> >+ dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> >+&msg);
> > }
> >
> >-
> >-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> >+static void dispatcher_handle_read(int fd, int event, void *opaque)
> > {
> >- int ret;
> >- MainDispatcher *md = opaque;
> >- MainDispatcherMessage msg;
> >- int read_size = 0;
> >+ Dispatcher *dispatcher = opaque;
> >
> >- while (read_size< sizeof(msg)) {
> >- /* blocks until sizeof(msg) is read */
> >- ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size);
> >- if (ret == -1) {
> >- if (errno != EINTR) {
> >- red_printf("error reading from main dispatcher: %d\n", errno);
> >- /* TODO: close channel? */
> >- return;
> >- }
> >- continue;
> >- }
> >- read_size += ret;
> >- }
> >- switch (msg.type) {
> >- case MAIN_DISPATCHER_CHANNEL_EVENT:
> >- main_dispatcher_handle_channel_event(&msg);
> >- break;
> >- default:
> >- red_printf("error: unhandled main dispatcher message type %d\n",
> >- msg.type);
> >- }
> >+ dispatcher_handle_recv_read(dispatcher);
> > }
> >
> > void main_dispatcher_init(SpiceCoreInterface *core)
> > {
> >- int channels[2];
> >-
> > memset(&main_dispatcher, 0, sizeof(main_dispatcher));
> > main_dispatcher.core = core;
> >-
> >- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> >- red_error("socketpair failed %s", strerror(errno));
> >- return;
> >- }
> >- pthread_mutex_init(&main_dispatcher.lock, NULL);
> >- main_dispatcher.main_fd = channels[0];
> >- main_dispatcher.other_fd = channels[1];
> >- main_dispatcher.self = pthread_self();
> >-
> >- core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> >- main_dispatcher_handle_read,&main_dispatcher);
> >+ dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES,&main_dispatcher.base);
> >+ core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
> >+ dispatcher_handle_read,&main_dispatcher.base);
> >+ dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> >+ main_dispatcher_handle_channel_event,
> >+ sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
> > }
>
More information about the Spice-devel
mailing list