[Spice-devel] [PATCH v3] [0.8 branch] server: add main_dispatcher

Alon Levy alevy at redhat.com
Mon Oct 24 05:15:52 PDT 2011


On Mon, Oct 24, 2011 at 12:17:19PM +0200, Yonit Halperin wrote:
> On 10/23/2011 07:10 PM, Alon Levy wrote:
> >add main_dispatcher, a message passing mechanism for sending messages to
> >the main thread. The main thread is the thread that implements
> >SpiceCoreInterface, which is assumed to be a single thread.
> >
> >Similar to the async operation of red_worker, a socket pair is created
> >and used to pass messages. The messages are a fixed size to ease
> >parsing. A single message is defined to pass a channel_event.
> >
> >RHBZ: 746950
> >FDBZ: 41858
> >
> >This patch is 0.8 branch only, for the master branch there should be a
> >better approach to share code with red_dispatcher and ready the way for
> >later adding more threads.
> >---
> >v2->v3:
> >  Resending with the write loop change.
> >
> >v1->v2:
> >  channel_event: push a pointer, not a copy, like the old code did.
> >  add a lock.
> >  loop on write until all bytes written.
> >  loop on read until all bytes read.
> >
> Hi,
> It would be nice to add a patch that will also solve
> reds_update_mouse_mode being called from the wrong threads. Unless
> you think it is not necessary at this point.

ok, I'll try to do that in an additional patch, since it isn't in the
scope of this bz. I'll address all of your other comments which are all
good points and send v2.

> 
> Other comments and nitpicks are inlined.
> >  server/Makefile.am       |    2 +
> >  server/main_dispatcher.c |  143 ++++++++++++++++++++++++++++++++++++++++++++++
> >  server/main_dispatcher.h |    9 +++
> >  server/reds.c            |    4 +-
> >  4 files changed, 157 insertions(+), 1 deletions(-)
> >  create mode 100644 server/main_dispatcher.c
> >  create mode 100644 server/main_dispatcher.h
> >
> >diff --git a/server/Makefile.am b/server/Makefile.am
> >index 93ed312..ebb0d3f 100644
> >--- a/server/Makefile.am
> >+++ b/server/Makefile.am
> >@@ -114,6 +114,8 @@ libspice_server_la_SOURCES =			\
> >  	red_common.h				\
> >  	red_dispatcher.c			\
> >  	red_dispatcher.h			\
> >+	main_dispatcher.c			\
> >+	main_dispatcher.h			\
> >  	red_memslots.c				\
> >  	red_memslots.h				\
> >  	red_parse_qxl.c				\
> >diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> >new file mode 100644
> >index 0000000..93d95b3
> >--- /dev/null
> >+++ b/server/main_dispatcher.c
> >@@ -0,0 +1,143 @@
> >+#include<unistd.h>
> >+#include<string.h>
> >+#include<errno.h>
> >+#include<pthread.h>
> >+#include<assert.h>
> >+
> >+#include "red_common.h"
> >+#include "main_dispatcher.h"
> >+
> >+/*
> >+ * Main Dispatcher
> >+ * ===============
> >+ *
> >+ * Communication channel between any non main thread and the main thread.
> >+ *
> >+ * The main thread is that from which spice_server_init is called.
> >+ *
> >+ * Messages are single sized, sent from the non-main thread to the main-thread.
> >+ * No acknowledge is sent back. This prevents a possible deadlock with the main
> >+ * thread already waiting on a response for the existing red_dispatcher used
> >+ * by the worker thread.
> >+ *
> >+ * All events have three functions:
> >+ * main_dispatcher_<event_name>  - non static, public function
> >+ * main_dispatcher_self_<event_name>  - handler for main thread
> >+ * main_dispatcher_handle_<event_name>  - handler for callback from main thread
> >+ *   seperate from self because it may send an ack or do other work in the future.
> >+ */
> >+
> >+typedef struct {
> >+    SpiceCoreInterface *core;
> >+    int main_fd;
> >+    int other_fd;
> >+    pthread_t self;
> >+    pthread_mutex_t lock;
> >+} MainDispatcher;
> >+
> >+MainDispatcher main_dispatcher;
> >+
> >+enum {
> >+    MAIN_DISPATCHER_CHANNEL_EVENT = 0,
> >+
> >+    MAIN_DISPATCHER_NUM_MESSAGES
> >+};
> >+
> >+typedef struct MainDispatcherMessage {
> >+    uint32_t type;
> >+    union {
> >+        struct {
> >+            int event;
> >+            SpiceChannelEventInfo *info;
> >+        } channel_event;
> >+    } data;
> >+} MainDispatcherMessage;
> >+
> >+/* channel_event - calls core->channel_event, must be done in main thread */
> >+static void main_dispatcher_self_handle_channel_event(int event,
> >+    SpiceChannelEventInfo *info)
> the params of the routine should be aligned
> >+{
> >+    main_dispatcher.core->channel_event(event, info);
> >+}
> >+
> >+static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> >+{
> >+    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> >+        msg->data.channel_event.info);
> alignment
> >+}
> >+
> >+void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
> >+{
> >+    MainDispatcherMessage msg;
> >+    ssize_t written = 0;
> >+    ssize_t ret;
> >+
> >+    if (pthread_self() == main_dispatcher.self) {
> >+        main_dispatcher_self_handle_channel_event(event, info);
> >+        return;
> >+    }
> >+    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> >+    msg.data.channel_event.event = event;
> >+    msg.data.channel_event.info = info;
> >+    pthread_mutex_lock(&main_dispatcher.lock);
> >+    while (written<  sizeof(msg)) {
> space after written
> >+        ret = write(main_dispatcher.other_fd,&msg + written,
> >+                    sizeof(msg) - written);
> >+        if (ret == -1) {
> >+            assert(errno == EAGAIN);
> Shouldn't it be EINTR? (you didn't mark the socket O_NONBLOCK).
> >+            continue;
> >+        }
> >+        written += ret;
> >+    }
> >+    pthread_mutex_unlock(&main_dispatcher.lock);
> >+}
> >+
> >+
> >+static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> >+{
> >+    int ret;
> >+    MainDispatcher *md = opaque;
> >+    MainDispatcherMessage msg;
> >+    int read_size = 0;
> >+
> >+    while (read_size<  sizeof(msg)) {
> >+        /* blocks until sizeof(msg) is read */
> >+        ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size);
> >+        if (ret == -1) {
> >+            if (errno != EAGAIN&&  errno != EPIPE) {
> Shouldn't it be EINTR? (you didn't mark the socket O_NONBLOCK).
> >+                red_printf("error reading from main dispatcher: %d\n", errno);
> >+                /* TODO: close channel? */
> >+                return;
> >+            }
> "continue" is missing
> >+        }
> >+        read_size += ret;
> >+    }
> >+    switch (msg.type) {
> >+        case MAIN_DISPATCHER_CHANNEL_EVENT:
> >+            main_dispatcher_handle_channel_event(&msg);
> >+            break;
> >+        default:
> >+            red_printf("error: unhandled main dispatcher message type %d\n",
> >+                       msg.type);
> >+    }
> >+}
> >+
> >+void main_dispatcher_init(SpiceCoreInterface *core)
> >+{
> >+    int channels[2];
> >+
> >+    memset(&main_dispatcher, 0, sizeof(main_dispatcher));
> >+    main_dispatcher.core = core;
> >+
> >+    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> >+        red_error("socketpair failed %s", strerror(errno));
> >+        return;
> >+    }
> >+    pthread_mutex_init(&main_dispatcher.lock, NULL);
> >+    main_dispatcher.main_fd = channels[0];
> >+    main_dispatcher.other_fd = channels[1];
> >+    main_dispatcher.self = pthread_self();
> >+
> >+    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> >+                    main_dispatcher_handle_read,&main_dispatcher);
> 						   ^ missing space
> >+}
> >diff --git a/server/main_dispatcher.h b/server/main_dispatcher.h
> >new file mode 100644
> >index 0000000..2c201c7
> >--- /dev/null
> >+++ b/server/main_dispatcher.h
> >@@ -0,0 +1,9 @@
> >+#ifndef MAIN_DISPATCHER_H
> >+#define MAIN_DISPATCHER_H
> >+
> >+#include<spice.h>
> >+
> >+void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info);
> >+void main_dispatcher_init(SpiceCoreInterface *core);
> >+
> >+#endif //MAIN_DISPATCHER_H
> >diff --git a/server/reds.c b/server/reds.c
> >index 8e83b99..0bb7e96 100644
> >--- a/server/reds.c
> >+++ b/server/reds.c
> >@@ -53,6 +53,7 @@
> >
> >  #include "red_common.h"
> >  #include "red_dispatcher.h"
> >+#include "main_dispatcher.h"
> >  #include "snd_worker.h"
> >  #include<spice/stats.h>
> >  #include "stat.h"
> >@@ -415,7 +416,7 @@ static void reds_channel_event(RedsStream *stream, int event)
> >  {
> >      if (core->base.minor_version<  3 || core->channel_event == NULL)
> >          return;
> >-    core->channel_event(event,&stream->info);
> >+    main_dispatcher_channel_event(event,&stream->info);
> >  }
> >
> >  static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
> >@@ -4685,6 +4686,7 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
> >      reds->outgoing.vec = reds->outgoing.vec_buf;
> >
> >      init_vd_agent_resources();
> >+    main_dispatcher_init(core);
> >
> >      if (!(reds->mig_timer = core->timer_add(migrate_timeout, NULL))) {
> >          red_error("migration timer create failed");
> 


More information about the Spice-devel mailing list