[Spice-devel] [RFC 2/3] server: introduce dispatcher

Alon Levy alevy at redhat.com
Tue Nov 1 01:10:02 PDT 2011


used for main_dispatcher only in this patch.

Dispatcher is meant to be used for Main<->any low frequency messages.

Its interface is meant to include the red_dispatcher usage:
 fixed size messages per message type
 some messages require an ack

Some methods are added to be used by RedDispatcher later:
 dispatcher_handle_read - to be called directly by RedDispatcher epoll
  based loop
 dispatcher_set_opaque - to be set from red_worker pthread
 dispatcher_init - allow NULL core as used by red_worker
---
 server/Makefile.am       |    2 +
 server/dispatcher.c      |  162 ++++++++++++++++++++++++++++++++++++++++++++++
 server/dispatcher.h      |   78 ++++++++++++++++++++++
 server/main_dispatcher.c |  105 +++++++-----------------------
 4 files changed, 265 insertions(+), 82 deletions(-)
 create mode 100644 server/dispatcher.c
 create mode 100644 server/dispatcher.h

diff --git a/server/Makefile.am b/server/Makefile.am
index 418d707..34a6b47 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -78,6 +78,8 @@ libspice_server_la_SOURCES =			\
 	red_client_cache.h			\
 	red_client_shared_cache.h		\
 	red_common.h				\
+	dispatcher.c				\
+	dispatcher.h				\
 	red_dispatcher.c			\
 	red_dispatcher.h			\
 	main_dispatcher.c			\
diff --git a/server/dispatcher.c b/server/dispatcher.c
new file mode 100644
index 0000000..ca111b4
--- /dev/null
+++ b/server/dispatcher.c
@@ -0,0 +1,198 @@
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+#include <sys/socket.h>
+
+#include "mem.h"
+#include "spice_common.h"
+#include "dispatcher.h"
+
+#define ACK 0xffffffff
+
+/*
+ * read_with_eintr
+ * helper. reads until size bytes are accumulated in buf, retrying on EINTR.
+ * returns 0 on success, -1 on any other error or on EOF before size bytes.
+ */
+int read_with_eintr(int fd, void *buf, size_t size)
+{
+    size_t read_size = 0;
+    ssize_t ret;
+
+    while (read_size < size) {
+        ret = read(fd, (uint8_t *)buf + read_size, size - read_size);
+        if (ret == -1 && errno == EINTR) {
+            continue;
+        }
+        if (ret <= 0) {
+            /* error, or EOF: a read of 0 bytes must not spin forever */
+            return -1;
+        }
+        read_size += ret; /* count only what was actually read */
+    }
+    return 0;
+}
+
+/*
+ * write_with_eintr
+ * helper. writes until all size bytes of buf are sent: EINTR is retried and
+ * a short write is continued from where it stopped. returns -1 on any other
+ * error, otherwise returns 0.
+ */
+int write_with_eintr(int fd, void *buf, size_t size)
+{
+    size_t written = 0;
+    ssize_t ret;
+
+    while (written < size) {
+        ret = write(fd, (uint8_t *)buf + written, size - written);
+        if (ret == -1) {
+            if (errno != EINTR) {
+                return -1;
+            }
+            continue;
+        }
+        written += ret;
+    }
+    return 0;
+}
+
+/*
+ * dispatcher_handle_read
+ * reads one message (type, then the payload of the size registered for that
+ * type), runs its handler and sends the ack if the type asks for one.
+ * doesn't handle being in the middle of a message. all reads are blocking.
+ */
+void dispatcher_handle_read(int fd, int event, void *opaque)
+{
+    Dispatcher *dispatcher = opaque;
+    uint32_t type;
+    DispatcherMessage *msg = NULL;
+    uint8_t *payload = dispatcher->payload;
+    uint32_t ack = ACK;
+
+    if (read_with_eintr(dispatcher->recv_fd, &type, sizeof(type)) == -1) {
+        red_printf("error reading from dispatcher: %d\n", errno);
+        return;
+    }
+    assert(type < dispatcher->max_message_type);
+    msg = &dispatcher->messages[type];
+    if (read_with_eintr(dispatcher->recv_fd, payload, msg->size) == -1) {
+        red_printf("error reading from dispatcher: %d\n", errno);
+        /* TODO: close socketpair? */
+        return;
+    }
+    if (msg->handler) {
+        msg->handler(dispatcher->opaque, (void *)payload);
+    } else {
+        red_printf("error: no handler for message type %d\n", type);
+    }
+    /* recv_fd is one end of a socketpair, so the ack travels back on it */
+    if (msg->send_ack && write_with_eintr(dispatcher->recv_fd,
+                                          &ack, sizeof(ack)) == -1) {
+        red_printf("error writing ack for message %d\n", type);
+        /* TODO: close socketpair? */
+    }
+}
+
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload)
+{
+    DispatcherMessage *msg;
+    uint32_t ack;
+    int send_fd = dispatcher->send_fd;
+
+    assert(dispatcher->max_message_type > message_type);
+    assert(dispatcher->messages[message_type].handler);
+    msg = &dispatcher->messages[message_type];
+    /* one sender at a time, so type, payload and ack are never interleaved */
+    pthread_mutex_lock(&dispatcher->lock);
+    if (write_with_eintr(send_fd, &message_type, sizeof(message_type)) == -1) {
+        red_printf("error: failed to send message type for message %d\n",
+                   message_type);
+        goto unlock;
+    }
+    if (write_with_eintr(send_fd, payload, msg->size) == -1) {
+        red_printf("error: failed to send message body for message %d\n",
+                   message_type);
+        goto unlock;
+    }
+    if (msg->send_ack) {
+        if (read_with_eintr(send_fd, &ack, sizeof(ack)) == -1) {
+            red_printf("error: failed to read ack for message %d\n",
+                       message_type);
+            /* ack was never filled in, so there is nothing to compare */
+            goto unlock;
+        }
+        if (ack != ACK) {
+            red_printf("error: got wrong ack value in dispatcher "
+                       "for message %d\n", message_type);
+            /* TODO handling error? */
+        }
+    }
+unlock:
+    pthread_mutex_unlock(&dispatcher->lock);
+}
+
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+                                 dispatcher_handle_message handler, size_t size,
+                                 int send_ack)
+{
+    DispatcherMessage *msg;
+    void *payload;
+
+    assert(message_type < dispatcher->max_message_type);
+    assert(dispatcher->messages[message_type].handler == 0);
+    /* the receive buffer is shared by all types, keep it as large as the
+     * biggest payload. grow it before registering so that a failed realloc
+     * neither loses the old buffer nor leaves a handler it can't serve. */
+    if (size > dispatcher->payload_size) {
+        payload = realloc(dispatcher->payload, size);
+        if (!payload) {
+            red_error("realloc failed %s", strerror(errno));
+            return;
+        }
+        dispatcher->payload = payload;
+        dispatcher->payload_size = size;
+    }
+    msg = &dispatcher->messages[message_type];
+    msg->handler = handler;
+    msg->size = size;
+    msg->send_ack = send_ack;
+}
+
+void dispatcher_init(Dispatcher *dispatcher, SpiceCoreInterface *recv_core,
+                     size_t max_message_type, void *opaque)
+{
+    int channels[2];
+
+    dispatcher->recv_core = recv_core;
+    dispatcher->opaque = opaque;
+    /* no receive buffer yet, dispatcher_register_handler allocates it */
+    dispatcher->payload = NULL;
+    dispatcher->payload_size = 0;
+    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
+        red_error("socketpair failed %s", strerror(errno));
+        return;
+    }
+    pthread_mutex_init(&dispatcher->lock, NULL);
+    dispatcher->recv_fd = channels[0];
+    dispatcher->send_fd = channels[1];
+    dispatcher->self = pthread_self();
+
+    dispatcher->messages = spice_malloc0_n(max_message_type,
+                                           sizeof(dispatcher->messages[0]));
+    dispatcher->max_message_type = max_message_type;
+    if (recv_core) {
+        recv_core->watch_add(dispatcher->recv_fd, SPICE_WATCH_EVENT_READ,
+                             dispatcher_handle_read, dispatcher);
+    }
+}
+
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
+{
+    dispatcher->opaque = opaque;
+}
diff --git a/server/dispatcher.h b/server/dispatcher.h
new file mode 100644
index 0000000..9212ca6
--- /dev/null
+++ b/server/dispatcher.h
@@ -0,0 +1,88 @@
+#ifndef DISPATCHER_H
+#define DISPATCHER_H
+
+#include <stddef.h>
+#include <stdint.h>
+#include <pthread.h>
+
+#include <spice.h>
+
+typedef struct Dispatcher Dispatcher;
+
+/* handler run by dispatcher_handle_read: gets the dispatcher opaque and the
+ * received payload */
+typedef void (*dispatcher_handle_message)(void *opaque,
+                                          void *payload);
+
+typedef struct DispatcherMessage {
+    size_t size;
+    int send_ack;
+    dispatcher_handle_message handler;
+} DispatcherMessage;
+
+struct Dispatcher {
+    SpiceCoreInterface *recv_core;
+    int recv_fd;
+    int send_fd;
+    pthread_t self;
+    pthread_mutex_t lock;
+    DispatcherMessage *messages;
+    int stage;  /* message parser stage - sender has no stages */
+    size_t max_message_type;
+    void *payload; /* allocated as max of message sizes */
+    size_t payload_size; /* used to track realloc calls */
+    void *opaque;
+};
+
+/*
+ * dispatcher_send_message
+ * @dispatcher:   dispatcher
+ * @message_type: message type
+ * @payload:      payload, must hold the size registered for message_type
+ */
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload);
+
+/*
+ * dispatcher_init
+ * @dispatcher: dispatcher to initialize
+ * @recv_core: core interface instance used by receiver. May be NULL, in
+ *  which case no watch is added and the receiver has to call
+ *  dispatcher_handle_read itself.
+ * @max_message_type: number of message types. Allows allocation of the
+ *  DispatcherMessage list up front, and registration in any order wanted.
+ * @opaque: opaque passed to the message handlers
+ */
+void dispatcher_init(Dispatcher *dispatcher, SpiceCoreInterface *recv_core,
+                     size_t max_message_type, void *opaque);
+
+/*
+ * dispatcher_register_handler
+ * @dispatcher:     dispatcher
+ * @message_type:   message type
+ * @handler:        message handler
+ * @size:           message size. Each type has a fixed associated size.
+ * @send_ack:       send an ack. This is per message type - you can't send the same
+ *                  message type with and without. Register two different messages
+ *                  if that is what you want.
+ */
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+                                 dispatcher_handle_message handler, size_t size,
+                                 int send_ack);
+
+/*
+ *  dispatcher_handle_read
+ *  @fd: fd triggered. Value is ignored
+ *  @event: event (read/write). Value is ignored
+ *  @opaque: must be Dispatcher instance
+ */
+void dispatcher_handle_read(int fd, int event, void *opaque);
+
+/*
+ * dispatcher_set_opaque
+ * @dispatcher: Dispatcher instance
+ * @opaque: opaque to use for callbacks
+ */
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
+
+#endif //DISPATCHER_H
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
index 73856bf..5cddab7 100644
--- a/server/main_dispatcher.c
+++ b/server/main_dispatcher.c
@@ -5,6 +5,7 @@
 #include <assert.h>
 
 #include "red_common.h"
+#include "dispatcher.h"
 #include "main_dispatcher.h"
 
 /*
@@ -28,11 +29,7 @@
  */
 
 typedef struct {
-    SpiceCoreInterface *core;
-    int main_fd;
-    int other_fd;
-    pthread_t self;
-    pthread_mutex_t lock;
+    Dispatcher base;
 } MainDispatcher;
 
 MainDispatcher main_dispatcher;
@@ -43,103 +40,47 @@ enum {
     MAIN_DISPATCHER_NUM_MESSAGES
 };
 
-typedef struct MainDispatcherMessage {
-    uint32_t type;
-    union {
-        struct {
-            int event;
-            SpiceChannelEventInfo *info;
-        } channel_event;
-    } data;
-} MainDispatcherMessage;
+typedef struct MainDispatcherChannelEventMessage {
+    int event;
+    SpiceChannelEventInfo *info;
+} MainDispatcherChannelEventMessage;
 
 /* channel_event - calls core->channel_event, must be done in main thread */
 static void main_dispatcher_self_handle_channel_event(
                                                 int event,
                                                 SpiceChannelEventInfo *info)
 {
-    main_dispatcher.core->channel_event(event, info);
+    main_dispatcher.base.recv_core->channel_event(event, info);
 }
 
-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
+static void main_dispatcher_handle_channel_event(void *opaque,
+                                                 void *payload)
 {
-    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
-                                              msg->data.channel_event.info);
+    MainDispatcherChannelEventMessage *channel_event = payload;
+
+    main_dispatcher_self_handle_channel_event(channel_event->event,
+                                              channel_event->info);
 }
 
 void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
 {
-    MainDispatcherMessage msg;
-    ssize_t written = 0;
-    ssize_t ret;
+    MainDispatcherChannelEventMessage msg;
 
-    if (pthread_self() == main_dispatcher.self) {
+    if (pthread_self() == main_dispatcher.base.self) {
         main_dispatcher_self_handle_channel_event(event, info);
         return;
     }
-    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
-    msg.data.channel_event.event = event;
-    msg.data.channel_event.info = info;
-    pthread_mutex_lock(&main_dispatcher.lock);
-    while (written < sizeof(msg)) {
-        ret = write(main_dispatcher.other_fd, &msg + written,
-                    sizeof(msg) - written);
-        if (ret == -1) {
-            assert(errno == EINTR);
-            continue;
-        }
-        written += ret;
-    }
-    pthread_mutex_unlock(&main_dispatcher.lock);
-}
-
-
-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
-{
-    int ret;
-    MainDispatcher *md = opaque;
-    MainDispatcherMessage msg;
-    int read_size = 0;
-
-    while (read_size < sizeof(msg)) {
-        /* blocks until sizeof(msg) is read */
-        ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size);
-        if (ret == -1) {
-            if (errno != EINTR) {
-                red_printf("error reading from main dispatcher: %d\n", errno);
-                /* TODO: close channel? */
-                return;
-            }
-            continue;
-        }
-        read_size += ret;
-    }
-    switch (msg.type) {
-        case MAIN_DISPATCHER_CHANNEL_EVENT:
-            main_dispatcher_handle_channel_event(&msg);
-            break;
-        default:
-            red_printf("error: unhandled main dispatcher message type %d\n",
-                       msg.type);
-    }
+    msg.event = event;
+    msg.info = info;
+    dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+                            &msg);
 }
 
 void main_dispatcher_init(SpiceCoreInterface *core)
 {
-    int channels[2];
-
     memset(&main_dispatcher, 0, sizeof(main_dispatcher));
-    main_dispatcher.core = core;
-
-    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
-        red_error("socketpair failed %s", strerror(errno));
-        return;
-    }
-    pthread_mutex_init(&main_dispatcher.lock, NULL);
-    main_dispatcher.main_fd = channels[0];
-    main_dispatcher.other_fd = channels[1];
-    main_dispatcher.self = pthread_self();
-
-    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
-                    main_dispatcher_handle_read, &main_dispatcher);
+    dispatcher_init(&main_dispatcher.base, core, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base);
+    dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+                                main_dispatcher_handle_channel_event,
+                                sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
 }
-- 
1.7.7



More information about the Spice-devel mailing list