[Spice-devel] [RFC 2/3] server: introduce dispatcher

Alon Levy alevy at redhat.com
Wed Nov 2 08:08:37 PDT 2011


On Wed, Nov 02, 2011 at 04:24:05PM +0200, Yonit Halperin wrote:
> On 11/01/2011 10:10 AM, Alon Levy wrote:
> >used for main_dispatcher only in this patch.
> >
> >Dispatcher is meant to be used for Main<->any low frequency messages.
> >
> >Its interface is meant to include the red_dispatcher usage:
> >  fixed size messages per message type
> >  some messages require an ack
> >
> >Some methods are added to be used by RedDispatcher later:
> >  dispatcher_handle_read - to be called directly by RedDispatcher epoll
> >   based loop
> >  dispatcher_set_opaque - to be set from red_worker pthread
> >  dispatcher_init - allow NULL core as used by red_worker
> Can you add a comment that by default a dispatcher is watched by the
> "main thread" and that if the user doesn't want this to happen, core
> must be NULL? Maybe it will be safer to just make it the dispatcher
> user's responsibility and not start the watch in dispatcher_init.

I'll do the latter.

> >---
> >  server/Makefile.am       |    2 +
> >  server/dispatcher.c      |  162 ++++++++++++++++++++++++++++++++++++++++++++++
> >  server/dispatcher.h      |   78 ++++++++++++++++++++++
> >  server/main_dispatcher.c |  105 +++++++-----------------------
> >  4 files changed, 265 insertions(+), 82 deletions(-)
> >  create mode 100644 server/dispatcher.c
> >  create mode 100644 server/dispatcher.h
> >
> >diff --git a/server/Makefile.am b/server/Makefile.am
> >index 418d707..34a6b47 100644
> >--- a/server/Makefile.am
> >+++ b/server/Makefile.am
> >@@ -78,6 +78,8 @@ libspice_server_la_SOURCES =			\
> >  	red_client_cache.h			\
> >  	red_client_shared_cache.h		\
> >  	red_common.h				\
> >+	dispatcher.c				\
> >+	dispatcher.h				\
> >  	red_dispatcher.c			\
> >  	red_dispatcher.h			\
> >  	main_dispatcher.c			\
> >diff --git a/server/dispatcher.c b/server/dispatcher.c
> >new file mode 100644
> >index 0000000..ca111b4
> >--- /dev/null
> >+++ b/server/dispatcher.c
> >@@ -0,0 +1,162 @@
> >+#include<unistd.h>
> >+#include<errno.h>
> >+#include<assert.h>
> >+#include<string.h>
> >+#include<pthread.h>
> >+
> >+#include "mem.h"
> >+#include "spice_common.h"
> >+#include "dispatcher.h"
> >+
> >+#define ACK 0xffffffff
> >+
> >+/*
> >+ * read_with_eintr
> >+ * helper. reads until size bytes accumulated in buf, if an error other then
> >+ * EINTR is encountered returns -1, otherwise returns 0.
> >+ */
> static
> >+int read_with_eintr(int fd, void *buf, size_t size)
> >+{
> >+    int read_size = 0;
> >+    int ret;
> >+
> >+    while (read_size<  size) {
> >+        ret = read(fd, buf + read_size, size - read_size);
> >+        if (ret == -1) {
> >+            if (errno != EINTR) {
> >+                return -1;
> >+            }
> >+            continue;
> >+        }
> >+        read_size += size;
> should be += ret

eek, good catch.

> >+    }
> >+    return 0;
> >+}
> >+
> static
> >+int write_with_eintr(int fd, void *buf, size_t size)
> >+{
> >+    /* according to the man page EINTR will mean no data written, so
> >+     * no need to take account of short writes. */
> That only holds when write() returns -1 (EINTR); it does not cover the
> case where write(fd, buf, size) returns a value > 0 but != size. So you
> should still track short writes.

I'll just change this to assume short writes are possible, like I
answered Paolo.

> >+    while (write(fd, buf, size) != size) {
> >+        if (errno != EINTR) {
> >+            return -1;
> >+        }
> >+    }
> >+    return 0;
> >+}
> >+/*
> >+ * dispatcher_handle_read
> >+ * doesn't handle being in the middle of a message. all reads are blocking.
> >+ */
> >+void dispatcher_handle_read(int fd, int event, void *opaque)
> >+{
> >+    Dispatcher *dispatcher = opaque;
> >+    uint32_t type;
> >+    DispatcherMessage *msg = NULL;
> >+    uint8_t *payload = dispatcher->payload;
> >+    uint32_t ack = ACK;
> >+
> >+    if (read_with_eintr(dispatcher->recv_fd,&type, sizeof(type)) == -1) {
> >+        red_printf("error reading from dispatcher: %d\n", errno);
> >+        return;
> >+    }
> >+    assert(type<  dispatcher->max_message_type);
> >+    msg =&dispatcher->messages[type];
> >+    if (read_with_eintr(dispatcher->recv_fd, payload, msg->size) == -1) {
> >+        red_printf("error reading from dispatcher: %d\n", errno);
> >+        /* TODO: close socketpair? */
> >+        return;
> >+    }
> >+    if (msg->handler) {
> >+        msg->handler(dispatcher->opaque, (void *)payload);
> >+    } else {
> >+        red_printf("error: no handler for message type %d\n", type);
> >+    }
> >+    if (msg->send_ack&&  write_with_eintr(dispatcher->recv_fd,
> >+&ack, sizeof(ack)) == -1) {
> >+        red_printf("error writing ack for message %d\n", type);
> >+        /* TODO: close socketpair? */
> >+    }
> >+}
> >+
> Looks like a problem (one we already had): we read only one message
> when we are woken up, although it is possible that there will be
> several messages in the pipe. What if several display channels
> disconnected and sent channel_event through the main_dispatcher?
> Another example is any message written to the red_dispatcher without
> waiting for Ack.
> (already talked with Alon - the fd will be non blocking)

Right, thanks for updating our audience.

> >+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> >+                             void *payload)
> >+{
> >+    DispatcherMessage *msg;
> >+    uint32_t ack;
> >+    int send_fd = dispatcher->send_fd;
> >+
> >+    assert(dispatcher->max_message_type>  message_type);
> >+    assert(dispatcher->messages[message_type].handler);
> >+    msg =&dispatcher->messages[message_type];
> >+    pthread_mutex_lock(&dispatcher->lock);
> >+    if (write_with_eintr(send_fd,&message_type, sizeof(message_type)) == -1) {
> >+        red_printf("error: failed to send message type for message %d\n",
> >+                   message_type);
> >+        goto unlock;
> >+    }
> >+    if (write_with_eintr(send_fd, payload, msg->size) == -1) {
> >+        red_printf("error: failed to send message body for message %d\n",
> >+                   message_type);
> >+        goto unlock;
> >+    }
> >+    if (msg->send_ack) {
> >+        if (read_with_eintr(send_fd,&ack, sizeof(ack)) == -1) {
> >+        }
> >+        if (ack != ACK) {
> >+            red_printf("error: got wrong ack value in dispatcher "
> >+                       "for message %d\n", message_type);
> >+            /* TODO handling error? */
> >+        }
> >+    }
> >+unlock:
> >+    pthread_mutex_unlock(&dispatcher->lock);
> >+}
> >+
> >+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> >+                                 dispatcher_handle_message handler, size_t size,
> >+                                 int send_ack)
> >+{
> >+    DispatcherMessage *msg;
> >+
> >+    assert(message_type<  dispatcher->max_message_type);
> >+    assert(dispatcher->messages[message_type].handler == 0);
> >+    msg =&dispatcher->messages[message_type];
> >+    msg->handler = handler;
> >+    msg->size = size;
> >+    msg->send_ack = send_ack;
> >+    if (msg->size>  dispatcher->payload_size) {
> >+        dispatcher->payload = realloc(dispatcher->payload, msg->size);
> >+        dispatcher->payload_size = msg->size;
> >+    }
> >+}
> >+
> >+void dispatcher_init(Dispatcher *dispatcher, SpiceCoreInterface *recv_core,
> >+                     size_t max_message_type, void *opaque)
> >+{
> >+    int channels[2];
> >+
> >+    dispatcher->recv_core = recv_core;
> >+    dispatcher->opaque = opaque;
> >+    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> >+        red_error("socketpair failed %s", strerror(errno));
> >+        return;
> >+    }
> >+    pthread_mutex_init(&dispatcher->lock, NULL);
> >+    dispatcher->recv_fd = channels[0];
> >+    dispatcher->send_fd = channels[1];
> >+    dispatcher->self = pthread_self();
> >+
> >+    dispatcher->messages = spice_malloc0_n(max_message_type,
> >+                                           sizeof(dispatcher->messages[0]));
> >+    dispatcher->max_message_type = max_message_type;
> >+    if (recv_core) {
> >+        recv_core->watch_add(dispatcher->recv_fd, SPICE_WATCH_EVENT_READ,
> >+                             dispatcher_handle_read, dispatcher);
> >+    }
> >+}
> >+
> >+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
> >+{
> >+    dispatcher->opaque = opaque;
> >+}
> >diff --git a/server/dispatcher.h b/server/dispatcher.h
> >new file mode 100644
> >index 0000000..9212ca6
> >--- /dev/null
> >+++ b/server/dispatcher.h
> >@@ -0,0 +1,78 @@
> >+#ifndef MAIN_DISPATCHER_H
> >+#define MAIN_DISPATCHER_H
> >+
> >+#include<spice.h>
> >+
> >+typedef struct Dispatcher Dispatcher;
> >+
> >+typedef void (*dispatcher_handle_message)(void *opaque,
> >+                                          void *payload);
> >+
> >+typedef struct DispatcherMessage {
> >+    size_t size;
> >+    int send_ack;
> a bit confusing. s/send_ack/wait_ack ?

It's equivalent - I don't see the purpose of changing the name. I mean, it
could just be 'ack': if it's 1 then the sender waits for an ack and the
receiver sends an ack; if it is 0 neither happens.

> >+    dispatcher_handle_message handler;
> >+} DispatcherMessage;
> >+
> >+struct Dispatcher {
> >+    SpiceCoreInterface *recv_core;
> >+    int recv_fd;
> >+    int send_fd;
> >+    pthread_t self;
> >+    pthread_mutex_t lock;
> >+    DispatcherMessage *messages;
> >+    int stage;  /* message parser stage - sender has no stages */
> >+    size_t max_message_type;
> >+    void *payload; /* allocated as max of message sizes */
> >+    size_t payload_size; /* used to track realloc calls */
> >+    void *opaque;
> >+};
> >+
> >+/*
> >+ * dispatcher_send_message
> >+ * @message_type: message type
> >+ * @payload:      payload
> >+ */
> >+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> >+                             void *payload);
> >+
> >+/*
> >+ * dispatcher_init
> >+ * @recv_core: core interface instance used by receiver
> >+ * @max_message_type: number of message types. Allows upfront allocation
> >+ *  of a DispatcherMessage list.
> >+ * up front, and registration in any order wanted.
> >+ */
> >+void dispatcher_init(Dispatcher *dispatcher, SpiceCoreInterface *recv_core,
> >+                     size_t max_message_type, void *opaque);
> >+
> >+/*
> >+ * dispatcher_register_handler
> >+ * @dispatcher:           dispatcher
> >+ * @messsage_type:  message type
> >+ * @handler:        message handler
> >+ * @size:           message size. Each type has a fixed associated size.
> >+ * @send_ack:       send an ack. This is per message type - you can't send the same
> >+ *                  message type with and without. Register two different messages
> >+ *                  if that is what you want.
> >+ */
> >+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> >+                                 dispatcher_handle_message handler, size_t size,
> >+                                 int send_ack);
> >+
> >+/*
> >+ *  dispatcher_handle_read
> >+ *  @fd: fd triggered. Value is ignored
> >+ *  @event: event (read/write). Value is ignored
> >+ *  @opaque: must be Dispatcher instance
> >+ */
> >+void dispatcher_handle_read(int fd, int event, void *opaque);
> >+
> >+/*
> >+ * dispatcher_set_opaque
> >+ * @dispatcher: Dispatcher instance
> >+ * @opaque: opaque to use for callbacks
> >+ */
> >+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
> >+
> >+#endif //MAIN_DISPATCHER_H
> >diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> >index 73856bf..5cddab7 100644
> >--- a/server/main_dispatcher.c
> >+++ b/server/main_dispatcher.c
> >@@ -5,6 +5,7 @@
> >  #include<assert.h>
> >
> >  #include "red_common.h"
> >+#include "dispatcher.h"
> >  #include "main_dispatcher.h"
> >
> >  /*
> >@@ -28,11 +29,7 @@
> >   */
> >
> >  typedef struct {
> >-    SpiceCoreInterface *core;
> >-    int main_fd;
> >-    int other_fd;
> >-    pthread_t self;
> >-    pthread_mutex_t lock;
> >+    Dispatcher base;
> >  } MainDispatcher;
> >
> >  MainDispatcher main_dispatcher;
> >@@ -43,103 +40,47 @@ enum {
> >      MAIN_DISPATCHER_NUM_MESSAGES
> >  };
> >
> >-typedef struct MainDispatcherMessage {
> >-    uint32_t type;
> >-    union {
> >-        struct {
> >-            int event;
> >-            SpiceChannelEventInfo *info;
> >-        } channel_event;
> >-    } data;
> >-} MainDispatcherMessage;
> >+typedef struct MainDispatcherChannelEventMessage {
> >+    int event;
> >+    SpiceChannelEventInfo *info;
> >+} MainDispatcherChannelEventMessage;
> >
> >  /* channel_event - calls core->channel_event, must be done in main thread */
> >  static void main_dispatcher_self_handle_channel_event(
> >                                                  int event,
> >                                                  SpiceChannelEventInfo *info)
> >  {
> >-    main_dispatcher.core->channel_event(event, info);
> >+    main_dispatcher.base.recv_core->channel_event(event, info);
> >  }
> >
> >-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> >+static void main_dispatcher_handle_channel_event(void *opaque,
> >+                                                 void *payload)
> >  {
> >-    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> >-                                              msg->data.channel_event.info);
> >+    MainDispatcherChannelEventMessage *channel_event = payload;
> >+
> >+    main_dispatcher_self_handle_channel_event(channel_event->event,
> >+                                              channel_event->info);
> >  }
> >
> >  void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
> >  {
> >-    MainDispatcherMessage msg;
> >-    ssize_t written = 0;
> >-    ssize_t ret;
> >+    MainDispatcherChannelEventMessage msg;
> >
> >-    if (pthread_self() == main_dispatcher.self) {
> >+    if (pthread_self() == main_dispatcher.base.self) {
> >          main_dispatcher_self_handle_channel_event(event, info);
> >          return;
> >      }
> >-    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> >-    msg.data.channel_event.event = event;
> >-    msg.data.channel_event.info = info;
> >-    pthread_mutex_lock(&main_dispatcher.lock);
> >-    while (written<  sizeof(msg)) {
> >-        ret = write(main_dispatcher.other_fd,&msg + written,
> >-                    sizeof(msg) - written);
> >-        if (ret == -1) {
> >-            assert(errno == EINTR);
> >-            continue;
> >-        }
> >-        written += ret;
> >-    }
> >-    pthread_mutex_unlock(&main_dispatcher.lock);
> >-}
> >-
> >-
> >-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> >-{
> >-    int ret;
> >-    MainDispatcher *md = opaque;
> >-    MainDispatcherMessage msg;
> >-    int read_size = 0;
> >-
> >-    while (read_size<  sizeof(msg)) {
> >-        /* blocks until sizeof(msg) is read */
> >-        ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size);
> >-        if (ret == -1) {
> >-            if (errno != EINTR) {
> >-                red_printf("error reading from main dispatcher: %d\n", errno);
> >-                /* TODO: close channel? */
> >-                return;
> >-            }
> >-            continue;
> >-        }
> >-        read_size += ret;
> >-    }
> >-    switch (msg.type) {
> >-        case MAIN_DISPATCHER_CHANNEL_EVENT:
> >-            main_dispatcher_handle_channel_event(&msg);
> >-            break;
> >-        default:
> >-            red_printf("error: unhandled main dispatcher message type %d\n",
> >-                       msg.type);
> >-    }
> >+    msg.event = event;
> >+    msg.info = info;
> >+    dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> >+&msg);
> >  }
> >
> >  void main_dispatcher_init(SpiceCoreInterface *core)
> >  {
> >-    int channels[2];
> >-
> >      memset(&main_dispatcher, 0, sizeof(main_dispatcher));
> >-    main_dispatcher.core = core;
> >-
> >-    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> >-        red_error("socketpair failed %s", strerror(errno));
> >-        return;
> >-    }
> >-    pthread_mutex_init(&main_dispatcher.lock, NULL);
> >-    main_dispatcher.main_fd = channels[0];
> >-    main_dispatcher.other_fd = channels[1];
> >-    main_dispatcher.self = pthread_self();
> >-
> >-    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> >-                    main_dispatcher_handle_read,&main_dispatcher);
> >+    dispatcher_init(&main_dispatcher.base, core, MAIN_DISPATCHER_NUM_MESSAGES,&main_dispatcher.base);
> >+    dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> >+                                main_dispatcher_handle_channel_event,
> >+                                sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
> >  }
> 


More information about the Spice-devel mailing list