[Spice-commits] 5 commits - server/dispatcher.c server/dispatcher.h server/main_dispatcher.c server/Makefile.am server/red_dispatcher.c server/red_dispatcher.h server/red_worker.c server/red_worker.h

Alon Levy alon at kemper.freedesktop.org
Tue Nov 8 06:24:54 PST 2011


 server/Makefile.am       |    2 
 server/dispatcher.c      |  261 +++++++++++++
 server/dispatcher.h      |  106 +++++
 server/main_dispatcher.c |  102 +----
 server/red_dispatcher.c  |  489 ++++++++++++++-----------
 server/red_dispatcher.h  |  149 +++++++
 server/red_worker.c      |  903 +++++++++++++++++++++++++++++------------------
 server/red_worker.h      |    5 
 8 files changed, 1394 insertions(+), 623 deletions(-)

New commits:
commit 8e049ce3b03c5e0034702d2a4b49b7ca36aaff92
Author: Alon Levy <alevy at redhat.com>
Date:   Mon Oct 31 17:35:30 2011 +0200

    server/red_worker: reuse dispatcher
    
    This patch reuses Dispatcher in RedDispatcher. It adds two helpers
    to red_worker to keep RedWorker opaque to the outside. The dispatcher is
    abused in three places that use the underlying socket directly:
     * once to send a READY message after red_init completes
     * once for each channel creation (display and cursor), to reply with
       the RedChannel instance.
    
    FDO Bugzilla: 42463
    
    rfc->v1:
    * move callbacks to red_worker.c including registration (Yonit)
    * rename dispatcher to red_dispatcher in red_worker.c and red_dispatcher.c
    * add accessor red_dispatcher_get_dispatcher
    * s/dispatcher_handle_recv/dispatcher_handle_recv_read/ and change its
      signature to take just Dispatcher *dispatcher (was the
      SpiceCoreInterface one)
    * remove SpiceCoreInterface parameter from dispatcher_init (Yonit)
    * main_dispatcher needed SpiceCoreInterface for channel_event, so it
      keeps it in struct MainDispatcher
    * add dispatcher_get_recv_fd for red_worker

diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 0c7a875..17b469e 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -37,6 +37,7 @@
 #include "reds_gl_canvas.h"
 #endif // USE_OPENGL
 #include "reds.h"
+#include "dispatcher.h"
 #include "red_dispatcher.h"
 #include "red_parse_qxl.h"
 
@@ -58,7 +59,7 @@ struct AsyncCommand {
 struct RedDispatcher {
     QXLWorker base;
     QXLInstance *qxl;
-    int channel;
+    Dispatcher dispatcher;
     pthread_t worker_thread;
     uint32_t pending;
     int primary_active;
@@ -93,19 +94,22 @@ static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *clie
                                             int num_common_caps, uint32_t *common_caps, int num_caps,
                                             uint32_t *caps)
 {
+    RedWorkerMessageDisplayConnect payload;
     RedDispatcher *dispatcher;
 
     red_printf("");
     dispatcher = (RedDispatcher *)channel->data;
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &client, sizeof(RedClient *));
-    send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
-    send_data(dispatcher->channel, &migration, sizeof(int));
+    payload.client = client;
+    payload.stream = stream;
+    payload.migration = migration;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_CONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
 {
+    RedWorkerMessageDisplayDisconnect payload;
     RedDispatcher *dispatcher;
 
     if (!rcc->channel) {
@@ -115,27 +119,28 @@ static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
     dispatcher = (RedDispatcher *)rcc->channel->data;
 
     red_printf("");
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+    payload.rcc = rcc;
 
     // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count
     // for channels
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_display_migrate(RedChannelClient *rcc)
 {
+    RedWorkerMessageDisplayMigrate payload;
     RedDispatcher *dispatcher;
     if (!rcc->channel) {
         return;
     }
     dispatcher = (RedDispatcher *)rcc->channel->data;
     red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+    payload.rcc = rcc;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
+                            &payload);
 }
 
 static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
@@ -143,17 +148,20 @@ static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *clien
                                            uint32_t *common_caps, int num_caps,
                                            uint32_t *caps)
 {
+    RedWorkerMessageCursorConnect payload;
     RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
     red_printf("");
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &client, sizeof(RedClient *));
-    send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
-    send_data(dispatcher->channel, &migration, sizeof(int));
+    payload.client = client;
+    payload.stream = stream;
+    payload.migration = migration;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_CONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
 {
+    RedWorkerMessageCursorDisconnect payload;
     RedDispatcher *dispatcher;
 
     if (!rcc->channel) {
@@ -162,16 +170,16 @@ static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
 
     dispatcher = (RedDispatcher *)rcc->channel->data;
     red_printf("");
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+    payload.rcc = rcc;
 
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
 {
+    RedWorkerMessageCursorMigrate payload;
     RedDispatcher *dispatcher;
 
     if (!rcc->channel) {
@@ -179,8 +187,10 @@ static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
     }
     dispatcher = (RedDispatcher *)rcc->channel->data;
     red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE;
-    write_message(dispatcher->channel, &message);
+    payload.rcc = rcc;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_MIGRATE,
+                            &payload);
 }
 
 typedef struct RendererInfo {
@@ -263,16 +273,16 @@ static void red_dispatcher_update_area(RedDispatcher *dispatcher, uint32_t surfa
                                    QXLRect *qxl_area, QXLRect *qxl_dirty_rects,
                                    uint32_t num_dirty_rects, uint32_t clear_dirty_region)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE;
+    RedWorkerMessageUpdate payload;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, &qxl_area, sizeof(QXLRect *));
-    send_data(dispatcher->channel, &qxl_dirty_rects, sizeof(QXLRect *));
-    send_data(dispatcher->channel, &num_dirty_rects, sizeof(uint32_t));
-    send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    payload.surface_id = surface_id;
+    payload.qxl_area = qxl_area;
+    payload.qxl_dirty_rects = qxl_dirty_rects;
+    payload.num_dirty_rects = num_dirty_rects;
+    payload.clear_dirty_region = clear_dirty_region;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_UPDATE,
+                            &payload);
 }
 
 static AsyncCommand *async_command_alloc(RedDispatcher *dispatcher,
@@ -297,13 +307,15 @@ static void red_dispatcher_update_area_async(RedDispatcher *dispatcher,
                                          uint64_t cookie)
 {
     RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
+    RedWorkerMessageUpdateAsync payload;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, qxl_area, sizeof(QXLRect));
-    send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t));
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    payload.qxl_area = *qxl_area;
+    payload.clear_dirty_region = clear_dirty_region;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            message,
+                            &payload);
 }
 
 static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id,
@@ -316,12 +328,12 @@ static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id,
 
 static void red_dispatcher_add_memslot(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT;
+    RedWorkerMessageAddMemslot payload;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    payload.mem_slot = *mem_slot;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_ADD_MEMSLOT,
+                            &payload);
 }
 
 static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slot)
@@ -331,21 +343,22 @@ static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slo
 
 static void red_dispatcher_add_memslot_async(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot, uint64_t cookie)
 {
+    RedWorkerMessageAddMemslotAsync payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot));
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.mem_slot = *mem_slot;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void red_dispatcher_del_memslot(RedDispatcher *dispatcher, uint32_t slot_group_id, uint32_t slot_id)
 {
+    RedWorkerMessageDelMemslot payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DEL_MEMSLOT;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &slot_group_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, &slot_id, sizeof(uint32_t));
+    payload.slot_group_id = slot_group_id;
+    payload.slot_id = slot_id;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id, uint32_t slot_id)
@@ -355,11 +368,11 @@ static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id
 
 static void red_dispatcher_destroy_surfaces(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES;
+    RedWorkerMessageDestroySurfaces payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DESTROY_SURFACES,
+                            &payload);
 }
 
 static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker)
@@ -369,11 +382,11 @@ static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker)
 
 static void red_dispatcher_destroy_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie)
 {
+    RedWorkerMessageDestroySurfacesAsync payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispatcher)
@@ -387,28 +400,37 @@ static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispa
 }
 
 static void
+red_dispatcher_destroy_primary_surface_sync(RedDispatcher *dispatcher,
+                                            uint32_t surface_id)
+{
+    RedWorkerMessageDestroyPrimarySurface payload;
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
+                            &payload);
+    red_dispatcher_destroy_primary_surface_complete(dispatcher);
+}
+
+static void
+red_dispatcher_destroy_primary_surface_async(RedDispatcher *dispatcher,
+                                             uint32_t surface_id, uint64_t cookie)
+{
+    RedWorkerMessageDestroyPrimarySurfaceAsync payload;
+    RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC;
+
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void
 red_dispatcher_destroy_primary_surface(RedDispatcher *dispatcher,
                                        uint32_t surface_id, int async, uint64_t cookie)
 {
-    RedWorkerMessage message;
-    AsyncCommand *cmd = NULL;
-
     if (async) {
-        message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC;
-        cmd = async_command_alloc(dispatcher, message, cookie);
+        red_dispatcher_destroy_primary_surface_async(dispatcher, surface_id, cookie);
     } else {
-        message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE;
-    }
-
-    write_message(dispatcher->channel, &message);
-    if (async) {
-        send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    }
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    if (!async) {
-        read_message(dispatcher->channel, &message);
-        ASSERT(message == RED_WORKER_MESSAGE_READY);
-        red_dispatcher_destroy_primary_surface_complete(dispatcher);
+        red_dispatcher_destroy_primary_surface_sync(dispatcher, surface_id);
     }
 }
 
@@ -431,30 +453,42 @@ static void red_dispatcher_create_primary_surface_complete(RedDispatcher *dispat
 }
 
 static void
-red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id,
-                                      QXLDevSurfaceCreate *surface, int async, uint64_t cookie)
+red_dispatcher_create_primary_surface_async(RedDispatcher *dispatcher, uint32_t surface_id,
+                                            QXLDevSurfaceCreate *surface, uint64_t cookie)
 {
-    RedWorkerMessage message;
-    AsyncCommand *cmd = NULL;
+    RedWorkerMessageCreatePrimarySurfaceAsync payload;
+    RedWorkerMessage message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC;
+
+    dispatcher->surface_create = *surface;
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    payload.surface = *surface;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void
+red_dispatcher_create_primary_surface_sync(RedDispatcher *dispatcher, uint32_t surface_id,
+                                           QXLDevSurfaceCreate *surface)
+{
+    RedWorkerMessageCreatePrimarySurface payload;
 
-    if (async) {
-        message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC;
-        cmd = async_command_alloc(dispatcher, message, cookie);
-    } else {
-        message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE;
-    }
     dispatcher->surface_create = *surface;
+    payload.surface_id = surface_id;
+    payload.surface = *surface;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
+                            &payload);
+    red_dispatcher_create_primary_surface_complete(dispatcher);
+}
 
-    write_message(dispatcher->channel, &message);
+static void
+red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id,
+                                      QXLDevSurfaceCreate *surface, int async, uint64_t cookie)
+{
     if (async) {
-        send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    }
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, surface, sizeof(QXLDevSurfaceCreate));
-    if (!async) {
-        read_message(dispatcher->channel, &message);
-        ASSERT(message == RED_WORKER_MESSAGE_READY);
-        red_dispatcher_create_primary_surface_complete(dispatcher);
+        red_dispatcher_create_primary_surface_async(dispatcher, surface_id, surface, cookie);
+    } else {
+        red_dispatcher_create_primary_surface_sync(dispatcher, surface_id, surface);
     }
 }
 
@@ -466,11 +500,11 @@ static void qxl_worker_create_primary_surface(QXLWorker *qxl_worker, uint32_t su
 
 static void red_dispatcher_reset_image_cache(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_IMAGE_CACHE;
+    RedWorkerMessageResetImageCache payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
+                            &payload);
 }
 
 static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker)
@@ -480,11 +514,11 @@ static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker)
 
 static void red_dispatcher_reset_cursor(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_CURSOR;
+    RedWorkerMessageResetCursor payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_RESET_CURSOR,
+                            &payload);
 }
 
 static void qxl_worker_reset_cursor(QXLWorker *qxl_worker)
@@ -492,29 +526,38 @@ static void qxl_worker_reset_cursor(QXLWorker *qxl_worker)
     red_dispatcher_reset_cursor((RedDispatcher*)qxl_worker);
 }
 
-static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher, uint32_t surface_id,
-                                                int async, uint64_t cookie)
+static void red_dispatcher_destroy_surface_wait_sync(RedDispatcher *dispatcher,
+                                                     uint32_t surface_id)
 {
-    RedWorkerMessage message;
-    AsyncCommand *cmd = NULL;
+    RedWorkerMessageDestroySurfaceWait payload;
 
-    if (async ) {
-        message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC;
-        cmd = async_command_alloc(dispatcher, message, cookie);
-    } else {
-        message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT;
-    }
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
+                            &payload);
+}
 
-    write_message(dispatcher->channel, &message);
-    if (async) {
-        send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    }
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
+static void red_dispatcher_destroy_surface_wait_async(RedDispatcher *dispatcher,
+                                                      uint32_t surface_id,
+                                                      uint64_t cookie)
+{
+    RedWorkerMessageDestroySurfaceWaitAsync payload;
+    RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC;
+
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher,
+                                                uint32_t surface_id,
+                                                int async, uint64_t cookie)
+{
     if (async) {
-        return;
+        red_dispatcher_destroy_surface_wait_async(dispatcher, surface_id, cookie);
+    } else {
+        red_dispatcher_destroy_surface_wait_sync(dispatcher, surface_id);
     }
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
 }
 
 static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surface_id)
@@ -524,9 +567,11 @@ static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surf
 
 static void red_dispatcher_reset_memslots(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_MEMSLOTS;
+    RedWorkerMessageResetMemslots payload;
 
-    write_message(dispatcher->channel, &message);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_RESET_MEMSLOTS,
+                            &payload);
 }
 
 static void qxl_worker_reset_memslots(QXLWorker *qxl_worker)
@@ -536,11 +581,15 @@ static void qxl_worker_reset_memslots(QXLWorker *qxl_worker)
 
 static void red_dispatcher_wakeup(RedDispatcher *dispatcher)
 {
-    if (!test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_WAKEUP;
-        set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending);
-        write_message(dispatcher->channel, &message);
+    RedWorkerMessageWakeup payload;
+
+    if (test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) {
+        return;
     }
+    set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_WAKEUP,
+                            &payload);
 }
 
 static void qxl_worker_wakeup(QXLWorker *qxl_worker)
@@ -550,11 +599,15 @@ static void qxl_worker_wakeup(QXLWorker *qxl_worker)
 
 static void red_dispatcher_oom(RedDispatcher *dispatcher)
 {
-    if (!test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_OOM;
-        set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending);
-        write_message(dispatcher->channel, &message);
+    RedWorkerMessageOom payload;
+
+    if (test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) {
+        return;
     }
+    set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_OOM,
+                            &payload);
 }
 
 static void qxl_worker_oom(QXLWorker *qxl_worker)
@@ -564,9 +617,11 @@ static void qxl_worker_oom(QXLWorker *qxl_worker)
 
 static void red_dispatcher_start(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_START;
+    RedWorkerMessageStart payload;
 
-    write_message(dispatcher->channel, &message);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_START,
+                            &payload);
 }
 
 static void qxl_worker_start(QXLWorker *qxl_worker)
@@ -576,20 +631,20 @@ static void qxl_worker_start(QXLWorker *qxl_worker)
 
 static void red_dispatcher_flush_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie)
 {
+    RedWorkerMessageFlushSurfacesAsync payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
+    payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void red_dispatcher_stop(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+    RedWorkerMessageStop payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_STOP,
+                            &payload);
 }
 
 static void qxl_worker_stop(QXLWorker *qxl_worker)
@@ -601,14 +656,14 @@ static void red_dispatcher_loadvm_commands(RedDispatcher *dispatcher,
                                            struct QXLCommandExt *ext,
                                            uint32_t count)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_LOADVM_COMMANDS;
+    RedWorkerMessageLoadvmCommands payload;
 
     red_printf("");
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &count, sizeof(uint32_t));
-    send_data(dispatcher->channel, ext, sizeof(QXLCommandExt) * count);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    payload.count = count;
+    payload.ext = ext;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_LOADVM_COMMANDS,
+                            &payload);
 }
 
 static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
@@ -640,37 +695,44 @@ static inline int calc_compression_level(void)
 
 void red_dispatcher_on_ic_change(void)
 {
+    RedWorkerMessageSetCompression payload;
     int compression_level = calc_compression_level();
     RedDispatcher *now = dispatchers;
+
     while (now) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_SET_COMPRESSION;
         now->qxl->st->qif->set_compression_level(now->qxl, compression_level);
-        write_message(now->channel, &message);
-        send_data(now->channel, &image_compression, sizeof(spice_image_compression_t));
+        payload.image_compression = image_compression;
+        dispatcher_send_message(&now->dispatcher,
+                                RED_WORKER_MESSAGE_SET_COMPRESSION,
+                                &payload);
         now = now->next;
     }
 }
 
 void red_dispatcher_on_sv_change(void)
 {
+    RedWorkerMessageSetStreamingVideo payload;
     int compression_level = calc_compression_level();
     RedDispatcher *now = dispatchers;
     while (now) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_SET_STREAMING_VIDEO;
         now->qxl->st->qif->set_compression_level(now->qxl, compression_level);
-        write_message(now->channel, &message);
-        send_data(now->channel, &streaming_video, sizeof(uint32_t));
+        payload.streaming_video = streaming_video;
+        dispatcher_send_message(&now->dispatcher,
+                                RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
+                                &payload);
         now = now->next;
     }
 }
 
 void red_dispatcher_set_mouse_mode(uint32_t mode)
 {
+    RedWorkerMessageSetMouseMode payload;
     RedDispatcher *now = dispatchers;
     while (now) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_SET_MOUSE_MODE;
-        write_message(now->channel, &message);
-        send_data(now->channel, &mode, sizeof(uint32_t));
+        payload.mode = mode;
+        dispatcher_send_message(&now->dispatcher,
+                                RED_WORKER_MESSAGE_SET_MOUSE_MODE,
+                                &payload);
         now = now->next;
     }
 }
@@ -873,34 +935,31 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher,
 
 static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE;
+    RedWorkerMessageDisplayChannelCreate payload;
     RedChannel *display_channel;
 
-    write_message(dispatcher->channel, &message);
-
-    receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+                            &payload);
+    receive_data(dispatcher->dispatcher.send_fd, &display_channel, sizeof(RedChannel *));
     return display_channel;
 }
 
 static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE;
+    RedWorkerMessageCursorChannelCreate payload;
     RedChannel *cursor_channel;
 
-    write_message(dispatcher->channel, &message);
-
-    receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+                            &payload);
+    receive_data(dispatcher->dispatcher.send_fd, &cursor_channel, sizeof(RedChannel *));
     return cursor_channel;
 }
 
 RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
 {
-    RedDispatcher *dispatcher;
-    int channels[2];
+    RedDispatcher *red_dispatcher;
     RedWorkerMessage message;
     WorkerInitData init_data;
     QXLDevInitInfo init_info;
@@ -917,45 +976,41 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
     gl_canvas_init();
 #endif // USE_OPENGL
 
-    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
-        red_error("socketpair failed %s", strerror(errno));
-    }
-
-    dispatcher = spice_new0(RedDispatcher, 1);
-    dispatcher->channel = channels[0];
-    ring_init(&dispatcher->async_commands);
-    DBG_ASYNC("dispatcher->async_commands.next %p", dispatcher->async_commands.next);
-    init_data.qxl = dispatcher->qxl = qxl;
+    red_dispatcher = spice_new0(RedDispatcher, 1);
+    ring_init(&red_dispatcher->async_commands);
+    DBG_ASYNC("red_dispatcher->async_commands.next %p", red_dispatcher->async_commands.next);
+    dispatcher_init(&red_dispatcher->dispatcher, RED_WORKER_MESSAGE_COUNT, NULL);
+    init_data.qxl = red_dispatcher->qxl = qxl;
     init_data.id = qxl->id;
-    init_data.channel = channels[1];
-    init_data.pending = &dispatcher->pending;
+    init_data.red_dispatcher = red_dispatcher;
+    init_data.pending = &red_dispatcher->pending;
     init_data.num_renderers = num_renderers;
     memcpy(init_data.renderers, renderers, sizeof(init_data.renderers));
 
-    pthread_mutex_init(&dispatcher->async_lock, NULL);
+    pthread_mutex_init(&red_dispatcher->async_lock, NULL);
     init_data.image_compression = image_compression;
     init_data.jpeg_state = jpeg_state;
     init_data.zlib_glz_state = zlib_glz_state;
     init_data.streaming_video = streaming_video;
 
-    dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR;
-    dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR;
-    dispatcher->base.wakeup = qxl_worker_wakeup;
-    dispatcher->base.oom = qxl_worker_oom;
-    dispatcher->base.start = qxl_worker_start;
-    dispatcher->base.stop = qxl_worker_stop;
-    dispatcher->base.update_area = qxl_worker_update_area;
-    dispatcher->base.add_memslot = qxl_worker_add_memslot;
-    dispatcher->base.del_memslot = qxl_worker_del_memslot;
-    dispatcher->base.reset_memslots = qxl_worker_reset_memslots;
-    dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces;
-    dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface;
-    dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface;
-
-    dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache;
-    dispatcher->base.reset_cursor = qxl_worker_reset_cursor;
-    dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait;
-    dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands;
+    red_dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR;
+    red_dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR;
+    red_dispatcher->base.wakeup = qxl_worker_wakeup;
+    red_dispatcher->base.oom = qxl_worker_oom;
+    red_dispatcher->base.start = qxl_worker_start;
+    red_dispatcher->base.stop = qxl_worker_stop;
+    red_dispatcher->base.update_area = qxl_worker_update_area;
+    red_dispatcher->base.add_memslot = qxl_worker_add_memslot;
+    red_dispatcher->base.del_memslot = qxl_worker_del_memslot;
+    red_dispatcher->base.reset_memslots = qxl_worker_reset_memslots;
+    red_dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces;
+    red_dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface;
+    red_dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface;
+
+    red_dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache;
+    red_dispatcher->base.reset_cursor = qxl_worker_reset_cursor;
+    red_dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait;
+    red_dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands;
 
     qxl->st->qif->get_init_info(qxl, &init_info);
 
@@ -965,7 +1020,6 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
     init_data.num_memslots_groups = init_info.num_memslots_groups;
     init_data.internal_groupslot_id = init_info.internal_groupslot_id;
     init_data.n_surfaces = init_info.n_surfaces;
-    init_data.dispatcher = dispatcher;
 
     num_active_workers = 1;
 
@@ -974,40 +1028,51 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
     sigdelset(&thread_sig_mask, SIGFPE);
     sigdelset(&thread_sig_mask, SIGSEGV);
     pthread_sigmask(SIG_SETMASK, &thread_sig_mask, &curr_sig_mask);
-    if ((r = pthread_create(&dispatcher->worker_thread, NULL, red_worker_main, &init_data))) {
+    if ((r = pthread_create(&red_dispatcher->worker_thread, NULL, red_worker_main, &init_data))) {
         red_error("create thread failed %d", r);
     }
     pthread_sigmask(SIG_SETMASK, &curr_sig_mask, NULL);
 
-    read_message(dispatcher->channel, &message);
+    read_message(red_dispatcher->dispatcher.send_fd, &message);
     ASSERT(message == RED_WORKER_MESSAGE_READY);
 
-    display_channel = red_dispatcher_display_channel_create(dispatcher);
+    display_channel = red_dispatcher_display_channel_create(red_dispatcher);
 
     if (display_channel) {
         client_cbs.connect = red_dispatcher_set_display_peer;
         client_cbs.disconnect = red_dispatcher_disconnect_display_peer;
         client_cbs.migrate = red_dispatcher_display_migrate;
         red_channel_register_client_cbs(display_channel, &client_cbs);
-        red_channel_set_data(display_channel, dispatcher);
+        red_channel_set_data(display_channel, red_dispatcher);
         reds_register_channel(display_channel);
     }
 
-    cursor_channel = red_dispatcher_cursor_channel_create(dispatcher);
+    cursor_channel = red_dispatcher_cursor_channel_create(red_dispatcher);
 
     if (cursor_channel) {
         client_cbs.connect = red_dispatcher_set_cursor_peer;
         client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer;
         client_cbs.migrate = red_dispatcher_cursor_migrate;
         red_channel_register_client_cbs(cursor_channel, &client_cbs);
-        red_channel_set_data(cursor_channel, dispatcher);
+        red_channel_set_data(cursor_channel, red_dispatcher);
         reds_register_channel(cursor_channel);
     }
 
-    qxl->st->qif->attache_worker(qxl, &dispatcher->base);
+    qxl->st->qif->attache_worker(qxl, &red_dispatcher->base);
     qxl->st->qif->set_compression_level(qxl, calc_compression_level());
 
-    dispatcher->next = dispatchers;
-    dispatchers = dispatcher;
-    return dispatcher;
+    red_dispatcher->next = dispatchers;
+    dispatchers = red_dispatcher;
+    return red_dispatcher;
+}
+
+struct Dispatcher *red_dispatcher_get_dispatcher(RedDispatcher *red_dispatcher)
+{
+    return &red_dispatcher->dispatcher;
+}
+
+void red_dispatcher_set_dispatcher_opaque(struct RedDispatcher *red_dispatcher,
+                                          void *opaque)
+{
+    dispatcher_set_opaque(&red_dispatcher->dispatcher, opaque);
 }
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index c2582f4..7417aac 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -32,5 +32,154 @@ int red_dispatcher_add_renderer(const char *name);
 uint32_t red_dispatcher_qxl_ram_size(void);
 int red_dispatcher_qxl_count(void);
 void red_dispatcher_async_complete(struct RedDispatcher *, AsyncCommand *);
+struct Dispatcher *red_dispatcher_get_dispatcher(struct RedDispatcher *);
+
+typedef struct RedWorkerMessageDisplayConnect {
+    RedClient * client;
+    RedsStream * stream;
+    int migration;
+} RedWorkerMessageDisplayConnect;
+
+typedef struct RedWorkerMessageDisplayDisconnect {
+    RedChannelClient *rcc;
+} RedWorkerMessageDisplayDisconnect;
+
+typedef struct RedWorkerMessageDisplayMigrate {
+    RedChannelClient *rcc;
+} RedWorkerMessageDisplayMigrate;
+
+typedef struct RedWorkerMessageCursorConnect {
+    RedClient *client;
+    RedsStream *stream;
+    int migration;
+} RedWorkerMessageCursorConnect;
+
+typedef struct RedWorkerMessageCursorDisconnect {
+    RedChannelClient *rcc;
+} RedWorkerMessageCursorDisconnect;
+
+typedef struct RedWorkerMessageCursorMigrate {
+    RedChannelClient *rcc;
+} RedWorkerMessageCursorMigrate;
+
+typedef struct RedWorkerMessageUpdate {
+    uint32_t surface_id;
+    QXLRect * qxl_area;
+    QXLRect * qxl_dirty_rects;
+    uint32_t num_dirty_rects;
+    uint32_t clear_dirty_region;
+} RedWorkerMessageUpdate;
+
+typedef struct RedWorkerMessageAsync {
+    AsyncCommand *cmd;
+} RedWorkerMessageAsync;
+
+typedef struct RedWorkerMessageUpdateAsync {
+    RedWorkerMessageAsync base;
+    uint32_t surface_id;
+    QXLRect qxl_area;
+    uint32_t clear_dirty_region;
+} RedWorkerMessageUpdateAsync;
+
+typedef struct RedWorkerMessageAddMemslot {
+    QXLDevMemSlot mem_slot;
+} RedWorkerMessageAddMemslot;
+
+typedef struct RedWorkerMessageAddMemslotAsync {
+    RedWorkerMessageAsync base;
+    QXLDevMemSlot mem_slot;
+} RedWorkerMessageAddMemslotAsync;
+
+typedef struct RedWorkerMessageDelMemslot {
+    uint32_t slot_group_id;
+    uint32_t slot_id;
+} RedWorkerMessageDelMemslot;
+
+typedef struct RedWorkerMessageDestroySurfaces {
+} RedWorkerMessageDestroySurfaces;
+
+typedef struct RedWorkerMessageDestroySurfacesAsync {
+    RedWorkerMessageAsync base;
+} RedWorkerMessageDestroySurfacesAsync;
+
+
+typedef struct RedWorkerMessageDestroyPrimarySurface {
+    uint32_t surface_id;
+} RedWorkerMessageDestroyPrimarySurface;
+
+typedef struct RedWorkerMessageDestroyPrimarySurfaceAsync {
+    RedWorkerMessageAsync base;
+    uint32_t surface_id;
+} RedWorkerMessageDestroyPrimarySurfaceAsync;
+
+typedef struct RedWorkerMessageCreatePrimarySurfaceAsync {
+    RedWorkerMessageAsync base;
+    uint32_t surface_id;
+    QXLDevSurfaceCreate surface;
+} RedWorkerMessageCreatePrimarySurfaceAsync;
+
+typedef struct RedWorkerMessageCreatePrimarySurface {
+    uint32_t surface_id;
+    QXLDevSurfaceCreate surface;
+} RedWorkerMessageCreatePrimarySurface;
+
+typedef struct RedWorkerMessageResetImageCache {
+} RedWorkerMessageResetImageCache;
+
+typedef struct RedWorkerMessageResetCursor {
+} RedWorkerMessageResetCursor;
+
+typedef struct RedWorkerMessageWakeup {
+} RedWorkerMessageWakeup;
+
+typedef struct RedWorkerMessageOom {
+} RedWorkerMessageOom;
+
+typedef struct RedWorkerMessageStart {
+} RedWorkerMessageStart;
+
+typedef struct RedWorkerMessageFlushSurfacesAsync {
+    RedWorkerMessageAsync base;
+} RedWorkerMessageFlushSurfacesAsync;
+
+typedef struct RedWorkerMessageStop {
+} RedWorkerMessageStop;
+
+/* this command is sync, so it's ok to pass a pointer */
+typedef struct RedWorkerMessageLoadvmCommands {
+    uint32_t count;
+    QXLCommandExt *ext;
+} RedWorkerMessageLoadvmCommands;
+
+typedef struct RedWorkerMessageSetCompression {
+    spice_image_compression_t image_compression;
+} RedWorkerMessageSetCompression;
+
+typedef struct RedWorkerMessageSetStreamingVideo {
+    uint32_t streaming_video;
+} RedWorkerMessageSetStreamingVideo;
+
+typedef struct RedWorkerMessageSetMouseMode {
+    uint32_t mode;
+} RedWorkerMessageSetMouseMode;
+
+typedef struct RedWorkerMessageDisplayChannelCreate {
+} RedWorkerMessageDisplayChannelCreate;
+
+typedef struct RedWorkerMessageCursorChannelCreate {
+} RedWorkerMessageCursorChannelCreate;
+
+typedef struct RedWorkerMessageDestroySurfaceWait {
+    uint32_t surface_id;
+} RedWorkerMessageDestroySurfaceWait;
+
+typedef struct RedWorkerMessageDestroySurfaceWaitAsync {
+    RedWorkerMessageAsync base;
+    uint32_t surface_id;
+} RedWorkerMessageDestroySurfaceWaitAsync;
+
+typedef struct RedWorkerMessageResetMemslots {
+} RedWorkerMessageResetMemslots;
+
 
 #endif
diff --git a/server/red_worker.c b/server/red_worker.c
index de8a820..cb48f09 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -72,6 +72,7 @@
 #include "zlib_encoder.h"
 #include "red_channel.h"
 #include "red_dispatcher.h"
+#include "dispatcher.h"
 #include "main_channel.h"
 
 //#define COMPRESS_STAT
@@ -865,9 +866,10 @@ typedef struct RedWorker {
     DisplayChannel *display_channel;
     CursorChannel *cursor_channel;
     QXLInstance *qxl;
-    RedDispatcher *dispatcher;
-    int id;
+    RedDispatcher *red_dispatcher;
+
     int channel;
+    int id;
     int running;
     uint32_t *pending;
     int epoll;
@@ -10103,21 +10105,19 @@ static void surface_dirty_region_to_rects(RedSurface *surface,
     free(dirty_rects);
 }
 
-static inline void handle_dev_update_async(RedWorker *worker)
+void handle_dev_update_async(void *opaque, void *payload)
 {
-    QXLRect qxl_rect;
+    RedWorker *worker = opaque;
+    RedWorkerMessageUpdateAsync *msg = payload;
     SpiceRect rect;
-    uint32_t surface_id;
-    uint32_t clear_dirty_region;
     QXLRect *qxl_dirty_rects;
     uint32_t num_dirty_rects;
     RedSurface *surface;
+    uint32_t surface_id = msg->surface_id;
+    QXLRect qxl_area = msg->qxl_area;
+    uint32_t clear_dirty_region = msg->clear_dirty_region;
 
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-    receive_data(worker->channel, &qxl_rect, sizeof(QXLRect));
-    receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t));
-
-    red_get_rect_ptr(&rect, &qxl_rect);
+    red_get_rect_ptr(&rect, &qxl_area);
     flush_display_commands(worker);
 
     ASSERT(worker->running);
@@ -10140,24 +10140,20 @@ static inline void handle_dev_update_async(RedWorker *worker)
     free(qxl_dirty_rects);
 }
 
-static inline void handle_dev_update(RedWorker *worker)
+void handle_dev_update(void *opaque, void *payload)
 {
-    const QXLRect *qxl_rect;
+    RedWorker *worker = opaque;
+    RedWorkerMessageUpdate *msg = payload;
     SpiceRect *rect = spice_new0(SpiceRect, 1);
-    QXLRect *qxl_dirty_rects;
     RedSurface *surface;
-    uint32_t num_dirty_rects;
-    uint32_t surface_id;
-    uint32_t clear_dirty_region;
-
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-    receive_data(worker->channel, &qxl_rect, sizeof(QXLRect *));
-    receive_data(worker->channel, &qxl_dirty_rects, sizeof(QXLRect *));
-    receive_data(worker->channel, &num_dirty_rects, sizeof(uint32_t));
-    receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t));
+    uint32_t surface_id = msg->surface_id;
+    const QXLRect *qxl_area = msg->qxl_area;
+    uint32_t num_dirty_rects = msg->num_dirty_rects;
+    QXLRect *qxl_dirty_rects = msg->qxl_dirty_rects;
+    uint32_t clear_dirty_region = msg->clear_dirty_region;
 
     surface = &worker->surfaces[surface_id];
-    red_get_rect_ptr(rect, qxl_rect);
+    red_get_rect_ptr(rect, qxl_area);
     flush_display_commands(worker);
 
     ASSERT(worker->running);
@@ -10170,28 +10166,36 @@ static inline void handle_dev_update(RedWorker *worker)
                                   clear_dirty_region);
 }
 
-static inline void handle_dev_add_memslot(RedWorker *worker)
+static void dev_add_memslot(RedWorker *worker, QXLDevMemSlot mem_slot)
 {
-    QXLDevMemSlot dev_slot;
+    red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id,
+                              mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end,
+                              mem_slot.generation);
+}
 
-    receive_data(worker->channel, &dev_slot, sizeof(QXLDevMemSlot));
+void handle_dev_add_memslot(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+    RedWorkerMessageAddMemslot *msg = payload;
+    QXLDevMemSlot mem_slot = msg->mem_slot;
 
-    red_memslot_info_add_slot(&worker->mem_slots, dev_slot.slot_group_id, dev_slot.slot_id,
-                              dev_slot.addr_delta, dev_slot.virt_start, dev_slot.virt_end,
-                              dev_slot.generation);
+    red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id,
+                              mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end,
+                              mem_slot.generation);
 }
 
-static inline void handle_dev_del_memslot(RedWorker *worker)
+void handle_dev_del_memslot(void *opaque, void *payload)
 {
-    uint32_t slot_id;
-    uint32_t slot_group_id;
-
-    receive_data(worker->channel, &slot_group_id, sizeof(uint32_t));
-    receive_data(worker->channel, &slot_id, sizeof(uint32_t));
+    RedWorker *worker = opaque;
+    RedWorkerMessageDelMemslot *msg = payload;
+    uint32_t slot_id = msg->slot_id;
+    uint32_t slot_group_id = msg->slot_group_id;
 
     red_memslot_info_del_slot(&worker->mem_slots, slot_group_id, slot_id);
 }
 
+/* TODO: destroy_surface_wait vs. dev_destroy_surface_wait is confusing. The
+ * latter asserts surface_id == 0; maybe move the assert up and merge the two? */
 static inline void destroy_surface_wait(RedWorker *worker, int surface_id)
 {
     if (!worker->surfaces[surface_id].context.canvas) {
@@ -10206,12 +10210,8 @@ static inline void destroy_surface_wait(RedWorker *worker, int surface_id)
     red_clear_surface_drawables_from_pipes(worker, surface_id, TRUE, TRUE);
 }
 
-static inline void handle_dev_destroy_surface_wait(RedWorker *worker)
+static void dev_destroy_surface_wait(RedWorker *worker, uint32_t surface_id)
 {
-    uint32_t surface_id;
-
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-
     ASSERT(surface_id == 0);
 
     flush_all_qxl_commands(worker);
@@ -10221,6 +10221,14 @@ static inline void handle_dev_destroy_surface_wait(RedWorker *worker)
     }
 }
 
+void handle_dev_destroy_surface_wait(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroySurfaceWait *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_destroy_surface_wait(worker, msg->surface_id);
+}
+
 static inline void red_cursor_reset(RedWorker *worker)
 {
     if (worker->cursor) {
@@ -10244,7 +10252,7 @@ static inline void red_cursor_reset(RedWorker *worker)
 /* called upon device reset */
 
 /* TODO: split me*/
-static inline void handle_dev_destroy_surfaces(RedWorker *worker)
+static inline void dev_destroy_surfaces(RedWorker *worker)
 {
     int i;
 
@@ -10273,14 +10281,17 @@ static inline void handle_dev_destroy_surfaces(RedWorker *worker)
     red_cursor_reset(worker);
 }
 
-static inline void handle_dev_create_primary_surface(RedWorker *worker)
+void handle_dev_destroy_surfaces(void *opaque, void *payload)
 {
-    uint32_t surface_id;
-    QXLDevSurfaceCreate surface;
-    uint8_t *line_0;
+    RedWorker *worker = opaque;
 
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-    receive_data(worker->channel, &surface, sizeof(QXLDevSurfaceCreate));
+    dev_destroy_surfaces(worker);
+}
+
+static void dev_create_primary_surface(RedWorker *worker, uint32_t surface_id,
+                                       QXLDevSurfaceCreate surface)
+{
+    uint8_t *line_0;
 
     PANIC_ON(surface_id != 0);
     PANIC_ON(surface.height == 0);
@@ -10308,12 +10319,16 @@ static inline void handle_dev_create_primary_surface(RedWorker *worker)
     }
 }
 
-static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
+void handle_dev_create_primary_surface(void *opaque, void *payload)
 {
-    uint32_t surface_id;
+    RedWorkerMessageCreatePrimarySurface *msg = payload;
+    RedWorker *worker = opaque;
 
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
+    dev_create_primary_surface(worker, msg->surface_id, msg->surface);
+}
 
+static void dev_destroy_primary_surface(RedWorker *worker, uint32_t surface_id)
+{
     PANIC_ON(surface_id != 0);
 
     if (!worker->surfaces[surface_id].context.canvas) {
@@ -10322,7 +10337,7 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
     }
 
     flush_all_qxl_commands(worker);
-    destroy_surface_wait(worker, 0);
+    dev_destroy_surface_wait(worker, 0);
     red_destroy_surface(worker, 0);
     ASSERT(ring_is_empty(&worker->streams));
 
@@ -10331,6 +10346,24 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
     red_cursor_reset(worker);
 }
 
+void handle_dev_destroy_primary_surface(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroyPrimarySurface *msg = payload;
+    RedWorker *worker = opaque;
+    uint32_t surface_id = msg->surface_id;
+
+    dev_destroy_primary_surface(worker, surface_id);
+}
+
+void handle_dev_destroy_primary_surface_async(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroyPrimarySurfaceAsync *msg = payload;
+    RedWorker *worker = opaque;
+    uint32_t surface_id = msg->surface_id;
+
+    dev_destroy_primary_surface(worker, surface_id);
+}
+
 static void flush_all_surfaces(RedWorker *worker)
 {
     int x;
@@ -10342,7 +10375,7 @@ static void flush_all_surfaces(RedWorker *worker)
     }
 }
 
-static void handle_dev_flush_surfaces(RedWorker *worker)
+static void dev_flush_surfaces(RedWorker *worker)
 {
     flush_all_qxl_commands(worker);
     flush_all_surfaces(worker);
@@ -10350,8 +10383,25 @@ static void handle_dev_flush_surfaces(RedWorker *worker)
     red_wait_outgoing_items(&worker->cursor_channel->common.base);
 }
 
-static void handle_dev_stop(RedWorker *worker)
+void handle_dev_flush_surfaces(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    dev_flush_surfaces(worker);
+}
+
+void handle_dev_flush_surfaces_async(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    dev_flush_surfaces(worker);
+}
+
+void handle_dev_stop(void *opaque, void *payload)
 {
+    RedWorker *worker = opaque;
+
+    red_printf("stop");
     ASSERT(worker->running);
     worker->running = FALSE;
     red_display_clear_glz_drawables(worker->display_channel);
@@ -10360,8 +10410,9 @@ static void handle_dev_stop(RedWorker *worker)
     red_wait_outgoing_items(&worker->cursor_channel->common.base);
 }
 
-static void handle_dev_start(RedWorker *worker)
+void handle_dev_start(void *opaque, void *payload)
 {
+    RedWorker *worker = opaque;
     RedChannel *cursor_red_channel = &worker->cursor_channel->common.base;
     RedChannel *display_red_channel = &worker->display_channel->common.base;
 
@@ -10375,308 +10426,488 @@ static void handle_dev_start(RedWorker *worker)
     worker->running = TRUE;
 }
 
-static void handle_dev_input(EventListener *listener, uint32_t events)
+void handle_dev_wakeup(void *opaque, void *payload)
 {
-    RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
-    RedWorkerMessage message;
+    RedWorker *worker = opaque;
+
+    clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending);
+    stat_inc_counter(worker->wakeup_counter, 1);
+}
+
+void handle_dev_oom(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
     RedChannel *display_red_channel = &worker->display_channel->common.base;
     int ring_is_empty;
-    int call_async_complete = 0;
-    int write_ready = 0;
-    AsyncCommand *cmd;
-
-    read_message(worker->channel, &message);
-
-    /* for async messages we do the common work in the handler, and
-     * send a ready or call async_complete from here, hence the added switch. */
-    switch (message) {
-    case RED_WORKER_MESSAGE_UPDATE_ASYNC:
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC:
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC:
-    case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC:
-        call_async_complete = 1;
-        receive_data(worker->channel, &cmd, sizeof(cmd));
-        break;
-    case RED_WORKER_MESSAGE_UPDATE:
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES:
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE:
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT:
-    case RED_WORKER_MESSAGE_RESET_CURSOR:
-    case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
-    case RED_WORKER_MESSAGE_STOP:
-    case RED_WORKER_MESSAGE_LOADVM_COMMANDS:
-    case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE:
-    case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE:
-    case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
-    case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
-        write_ready = 1;
-    default:
-        break;
-    }
 
-    switch (message) {
-    case RED_WORKER_MESSAGE_UPDATE_ASYNC:
-        handle_dev_update_async(worker);
-        break;
-    case RED_WORKER_MESSAGE_UPDATE:
-        handle_dev_update(worker);
-        break;
-    case RED_WORKER_MESSAGE_WAKEUP:
-        clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending);
-        stat_inc_counter(worker->wakeup_counter, 1);
-        break;
-    case RED_WORKER_MESSAGE_OOM:
-        ASSERT(worker->running);
-        // streams? but without streams also leak
-        red_printf_debug(1, "WORKER",
-                         "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
-                         worker->drawable_count,
-                         worker->red_drawable_count,
-                         worker->glz_drawable_count,
-                         worker->current_size,
-                         worker->display_channel ?
-                         red_channel_sum_pipes_size(display_red_channel) : 0);
-        while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) {
-            red_channel_push(&worker->display_channel->common.base);
-        }
-        if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) {
-            red_free_some(worker);
-            worker->qxl->st->qif->flush_resources(worker->qxl);
-        }
-        red_printf_debug(1, "WORKER",
-                         "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
-                         worker->drawable_count,
-                         worker->red_drawable_count,
-                         worker->glz_drawable_count,
-                         worker->current_size,
-                         worker->display_channel ?
-                         red_channel_sum_pipes_size(display_red_channel) : 0);
-        clear_bit(RED_WORKER_PENDING_OOM, worker->pending);
-        break;
-    case RED_WORKER_MESSAGE_RESET_CURSOR:
-        red_cursor_reset(worker);
-        break;
-    case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
-        image_cache_reset(&worker->image_cache);
-        break;
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT:
-        handle_dev_destroy_surface_wait(worker);
-        break;
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES:
-        handle_dev_destroy_surfaces(worker);
-        break;
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE:
-        handle_dev_create_primary_surface(worker);
-        break;
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
-        handle_dev_destroy_primary_surface(worker);
-        break;
-    case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: {
-        RedChannel *red_channel;
-        // TODO: handle seemless migration. Temp, setting migrate to FALSE
-        ensure_display_channel_created(worker, FALSE);
-        red_channel = &worker->display_channel->common.base;
-        send_data(worker->channel, &red_channel, sizeof(RedChannel *));
-        break;
+    ASSERT(worker->running);
+    // streams? but without streams also leak
+    red_printf_debug(1, "WORKER",
+                     "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
+                     worker->drawable_count,
+                     worker->red_drawable_count,
+                     worker->glz_drawable_count,
+                     worker->current_size,
+                     worker->display_channel ?
+                     red_channel_sum_pipes_size(display_red_channel) : 0);
+    while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) {
+        red_channel_push(&worker->display_channel->common.base);
     }
-    case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
-        RedsStream *stream;
-        RedClient *client;
-        int migrate;
-        red_printf("connect");
-
-        receive_data(worker->channel, &client, sizeof(RedClient *));
-        receive_data(worker->channel, &stream, sizeof(RedsStream *));
-        receive_data(worker->channel, &migrate, sizeof(int));
-        handle_new_display_channel(worker, client, stream, migrate);
-        break;
+    if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) {
+        red_free_some(worker);
+        worker->qxl->st->qif->flush_resources(worker->qxl);
     }
-    case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: {
-        RedChannelClient *rcc;
+    red_printf_debug(1, "WORKER",
+                     "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
+                     worker->drawable_count,
+                     worker->red_drawable_count,
+                     worker->glz_drawable_count,
+                     worker->current_size,
+                     worker->display_channel ?
+                     red_channel_sum_pipes_size(display_red_channel) : 0);
+    clear_bit(RED_WORKER_PENDING_OOM, worker->pending);
+}
 
-        red_printf("disconnect display client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        display_channel_client_disconnect(rcc);
-        break;
-    }
-    case RED_WORKER_MESSAGE_STOP: {
-        red_printf("stop");
-        handle_dev_stop(worker);
+void handle_dev_reset_cursor(void *opaque, void *payload)
+{
+    red_cursor_reset((RedWorker *)opaque);
+}
+
+void handle_dev_reset_image_cache(void *opaque, void *payload)
+{
+    image_cache_reset(&((RedWorker *)opaque)->image_cache);
+}
+
+void handle_dev_destroy_surface_wait_async(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroySurfaceWaitAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_destroy_surface_wait(worker, msg->surface_id);
+}
+
+void handle_dev_destroy_surfaces_async(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    dev_destroy_surfaces(worker);
+}
+
+void handle_dev_create_primary_surface_async(void *opaque, void *payload)
+{
+    RedWorkerMessageCreatePrimarySurfaceAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_create_primary_surface(worker, msg->surface_id, msg->surface);
+}
+
+/* Exception to normal Dispatcher usage: data goes from red_worker to the main
+ * thread. TODO: use a different dispatcher?
+ * TODO: leave direct usage of channel(fd)? It's only used right after the
+ * pthread is created, since the channel duration is the lifetime of the spice
+ * server. */
+
+void handle_dev_display_channel_create(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    RedChannel *red_channel;
+    // TODO: handle seamless migration. Temp, setting migrate to FALSE
+    ensure_display_channel_created(worker, FALSE);
+    red_channel = &worker->display_channel->common.base;
+    send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+}
+
+void handle_dev_display_connect(void *opaque, void *payload)
+{
+    RedWorkerMessageDisplayConnect *msg = payload;
+    RedWorker *worker = opaque;
+    RedsStream *stream = msg->stream;
+    RedClient *client = msg->client;
+    int migration = msg->migration;
+
+    red_printf("connect");
+    handle_new_display_channel(worker, client, stream, migration);
+}
+
+void handle_dev_display_disconnect(void *opaque, void *payload)
+{
+    RedWorkerMessageDisplayDisconnect *msg = payload;
+    RedChannelClient *rcc = msg->rcc;
+
+    red_printf("disconnect display client");
+    ASSERT(rcc);
+    display_channel_client_disconnect(rcc);
+}
+
+void handle_dev_display_migrate(void *opaque, void *payload)
+{
+    RedWorkerMessageDisplayMigrate *msg = payload;
+    RedWorker *worker = opaque;
+
+    RedChannelClient *rcc = msg->rcc;
+    red_printf("migrate display client");
+    ASSERT(rcc);
+    red_migrate_display(worker, rcc);
+}
+
+/* TODO: special, perhaps use another dispatcher? */
+void handle_dev_cursor_channel_create(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+    RedChannel *red_channel;
+
+    // TODO: handle seamless migration. Temp, setting migrate to FALSE
+    ensure_cursor_channel_created(worker, FALSE);
+    red_channel = &worker->cursor_channel->common.base;
+    send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+}
+
+void handle_dev_cursor_connect(void *opaque, void *payload)
+{
+    RedWorkerMessageCursorConnect *msg = payload;
+    RedWorker *worker = opaque;
+    RedsStream *stream = msg->stream;
+    RedClient *client = msg->client;
+    int migration = msg->migration;
+
+    red_printf("cursor connect");
+    red_connect_cursor(worker, client, stream, migration);
+}
+
+void handle_dev_cursor_disconnect(void *opaque, void *payload)
+{
+    RedWorkerMessageCursorDisconnect *msg = payload;
+    RedChannelClient *rcc = msg->rcc;
+
+    red_printf("disconnect cursor client");
+    ASSERT(rcc);
+    cursor_channel_client_disconnect(rcc);
+}
+
+void handle_dev_cursor_migrate(void *opaque, void *payload)
+{
+    RedWorkerMessageCursorMigrate *msg = payload;
+    RedWorker *worker = opaque;
+    RedChannelClient *rcc = msg->rcc;
+
+    red_printf("migrate cursor client");
+    ASSERT(rcc);
+    red_migrate_cursor(worker, rcc);
+}
+
+void handle_dev_set_compression(void *opaque, void *payload)
+{
+    RedWorkerMessageSetCompression *msg = payload;
+    RedWorker *worker = opaque;
+
+    worker->image_compression = msg->image_compression;
+    switch (worker->image_compression) {
+    case SPICE_IMAGE_COMPRESS_AUTO_LZ:
+        red_printf("ic auto_lz");
         break;
-    }
-    case RED_WORKER_MESSAGE_START:
-        red_printf("start");
-        handle_dev_start(worker);
+    case SPICE_IMAGE_COMPRESS_AUTO_GLZ:
+        red_printf("ic auto_glz");
         break;
-    case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: {
-        RedChannelClient *rcc;
-        red_printf("migrate display client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        red_migrate_display(worker, rcc);
+    case SPICE_IMAGE_COMPRESS_QUIC:
+        red_printf("ic quic");
         break;
-    }
-    case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: {
-        RedChannel *red_channel;
-        // TODO: handle seemless migration. Temp, setting migrate to FALSE
-        ensure_cursor_channel_created(worker, FALSE);
-        red_channel = &worker->cursor_channel->common.base;
-        send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+    case SPICE_IMAGE_COMPRESS_LZ:
+        red_printf("ic lz");
         break;
-    }
-    case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
-        RedsStream *stream;
-        RedClient *client;
-        int migrate;
-
-        red_printf("cursor connect");
-        receive_data(worker->channel, &client, sizeof(RedClient *));
-        receive_data(worker->channel, &stream, sizeof(RedsStream *));
-        receive_data(worker->channel, &migrate, sizeof(int));
-        red_connect_cursor(worker, client, stream, migrate);
+    case SPICE_IMAGE_COMPRESS_GLZ:
+        red_printf("ic glz");
         break;
-    }
-    case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: {
-        RedChannelClient *rcc;
-
-        red_printf("disconnect cursor client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        cursor_channel_client_disconnect(rcc);
+    case SPICE_IMAGE_COMPRESS_OFF:
+        red_printf("ic off");
         break;
+    default:
+        red_printf("ic invalid");
     }
-    case RED_WORKER_MESSAGE_CURSOR_MIGRATE: {
-        RedChannelClient *rcc;
-        red_printf("migrate cursor client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        red_migrate_cursor(worker, rcc);
-        break;
+#ifdef COMPRESS_STAT
+    print_compress_stats(worker->display_channel);
+    if (worker->display_channel) {
+        stat_reset(&worker->display_channel->quic_stat);
+        stat_reset(&worker->display_channel->lz_stat);
+        stat_reset(&worker->display_channel->glz_stat);
+        stat_reset(&worker->display_channel->jpeg_stat);
+        stat_reset(&worker->display_channel->zlib_glz_stat);
+        stat_reset(&worker->display_channel->jpeg_alpha_stat);
     }
-    case RED_WORKER_MESSAGE_SET_COMPRESSION:
-        receive_data(worker->channel, &worker->image_compression,
-                     sizeof(spice_image_compression_t));
-        switch (worker->image_compression) {
-        case SPICE_IMAGE_COMPRESS_AUTO_LZ:
-            red_printf("ic auto_lz");
-            break;
-        case SPICE_IMAGE_COMPRESS_AUTO_GLZ:
-            red_printf("ic auto_glz");
+#endif
+}
+
+void handle_dev_set_streaming_video(void *opaque, void *payload)
+{
+    RedWorkerMessageSetStreamingVideo *msg = payload;
+    RedWorker *worker = opaque;
+
+    worker->streaming_video = msg->streaming_video;
+    ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID);
+    switch(worker->streaming_video) {
+        case STREAM_VIDEO_ALL:
+            red_printf("sv all");
             break;
-        case SPICE_IMAGE_COMPRESS_QUIC:
-            red_printf("ic quic");
+        case STREAM_VIDEO_FILTER:
+            red_printf("sv filter");
             break;
-        case SPICE_IMAGE_COMPRESS_LZ:
-            red_printf("ic lz");
+        case STREAM_VIDEO_OFF:
+            red_printf("sv off");
             break;
-        case SPICE_IMAGE_COMPRESS_GLZ:
-            red_printf("ic glz");
+        default:
+            red_printf("sv invalid");
+    }
+}
+
+void handle_dev_set_mouse_mode(void *opaque, void *payload)
+{
+    RedWorkerMessageSetMouseMode *msg = payload;
+    RedWorker *worker = opaque;
+
+    worker->mouse_mode = msg->mode;
+    red_printf("mouse mode %u", worker->mouse_mode);
+}
+
+void handle_dev_add_memslot_async(void *opaque, void *payload)
+{
+    RedWorkerMessageAddMemslotAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_add_memslot(worker, msg->mem_slot);
+}
+
+void handle_dev_reset_memslots(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    red_memslot_info_reset(&worker->mem_slots);
+}
+
+void handle_dev_loadvm_commands(void *opaque, void *payload)
+{
+    RedWorkerMessageLoadvmCommands *msg = payload;
+    RedWorker *worker = opaque;
+    uint32_t i;
+    RedCursorCmd *cursor_cmd;
+    RedSurfaceCmd *surface_cmd;
+    uint32_t count = msg->count;
+    QXLCommandExt *ext = msg->ext;
+
+    red_printf("loadvm_commands");
+    for (i = 0 ; i < count ; ++i) {
+        switch (ext[i].cmd.type) {
+        case QXL_CMD_CURSOR:
+            cursor_cmd = spice_new0(RedCursorCmd, 1);
+            red_get_cursor_cmd(&worker->mem_slots, ext[i].group_id,
+                               cursor_cmd, ext[i].cmd.data);
+            qxl_process_cursor(worker, cursor_cmd, ext[i].group_id);
             break;
-        case SPICE_IMAGE_COMPRESS_OFF:
-            red_printf("ic off");
+        case QXL_CMD_SURFACE:
+            surface_cmd = spice_new0(RedSurfaceCmd, 1);
+            red_get_surface_cmd(&worker->mem_slots, ext[i].group_id,
+                                surface_cmd, ext[i].cmd.data);
+            red_process_surface(worker, surface_cmd, ext[i].group_id, TRUE);
             break;
         default:
-            red_printf("ic invalid");
-        }
-#ifdef COMPRESS_STAT
-        print_compress_stats(worker->display_channel);
-        if (worker->display_channel) {
-            stat_reset(&worker->display_channel->quic_stat);
-            stat_reset(&worker->display_channel->lz_stat);
-            stat_reset(&worker->display_channel->glz_stat);
-            stat_reset(&worker->display_channel->jpeg_stat);
-            stat_reset(&worker->display_channel->zlib_glz_stat);
-            stat_reset(&worker->display_channel->jpeg_alpha_stat);
-        }
-#endif
-        break;
-    case RED_WORKER_MESSAGE_SET_STREAMING_VIDEO:
-        receive_data(worker->channel, &worker->streaming_video, sizeof(uint32_t));
-        ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID);
-        switch(worker->streaming_video) {
-            case STREAM_VIDEO_ALL:
-                red_printf("sv all");
-                break;
-            case STREAM_VIDEO_FILTER:
-                red_printf("sv filter");
-                break;
-            case STREAM_VIDEO_OFF:
-                red_printf("sv off");
-                break;
-            default:
-                red_printf("sv invalid");
-        }
-        break;
-    case RED_WORKER_MESSAGE_SET_MOUSE_MODE:
-        receive_data(worker->channel, &worker->mouse_mode, sizeof(uint32_t));
-        red_printf("mouse mode %u", worker->mouse_mode);
-        break;
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC:
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT:
-        handle_dev_add_memslot(worker);
-        break;
-    case RED_WORKER_MESSAGE_DEL_MEMSLOT:
-        handle_dev_del_memslot(worker);
-        break;
-    case RED_WORKER_MESSAGE_RESET_MEMSLOTS:
-        red_memslot_info_reset(&worker->mem_slots);
-        break;
-    case RED_WORKER_MESSAGE_LOADVM_COMMANDS: {
-        uint32_t count;
-        QXLCommandExt ext;
-        RedCursorCmd *cursor_cmd;
-        RedSurfaceCmd *surface_cmd;
-
-        red_printf("loadvm_commands");
-        receive_data(worker->channel, &count, sizeof(uint32_t));
-        while (count > 0) {
-            receive_data(worker->channel, &ext, sizeof(QXLCommandExt));
-            switch (ext.cmd.type) {
-            case QXL_CMD_CURSOR:
-                cursor_cmd = spice_new0(RedCursorCmd, 1);
-                red_get_cursor_cmd(&worker->mem_slots, ext.group_id,
-                                   cursor_cmd, ext.cmd.data);
-                qxl_process_cursor(worker, cursor_cmd, ext.group_id);
-                break;
-            case QXL_CMD_SURFACE:
-                surface_cmd = spice_new0(RedSurfaceCmd, 1);
-                red_get_surface_cmd(&worker->mem_slots, ext.group_id,
-                                    surface_cmd, ext.cmd.data);
-                red_process_surface(worker, surface_cmd, ext.group_id, TRUE);
-                break;
-            default:
-                red_printf("unhandled loadvm command type (%d)", ext.cmd.type);
-                break;
-            }
-            count--;
+            red_printf("unhandled loadvm command type (%d)", ext[i].cmd.type);
+            break;
         }
-        break;
-    }
-    case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC:
-        handle_dev_flush_surfaces(worker);
-        break;
-    default:
-        red_error("message error");
-    }
-    if (call_async_complete) {
-        red_dispatcher_async_complete(worker->dispatcher, cmd);
-    }
-    if (write_ready) {
-        message = RED_WORKER_MESSAGE_READY;
-        write_message(worker->channel, &message);
     }
 }
 
+static void worker_handle_dispatcher_async_done(void *opaque,
+                                                uint32_t message_type,
+                                                void *payload)
+{
+    RedWorker *worker = opaque;
+    RedWorkerMessageAsync *msg_async = payload;
+
+    red_printf_debug(2, "WORKER", "");
+    red_dispatcher_async_complete(worker->red_dispatcher, msg_async->cmd);
+}
+
+static void register_callbacks(Dispatcher *dispatcher)
+{
+    dispatcher_register_async_done_callback(
+                                    dispatcher,
+                                    worker_handle_dispatcher_async_done);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_CONNECT,
+                                handle_dev_display_connect,
+                                sizeof(RedWorkerMessageDisplayConnect),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+                                handle_dev_display_disconnect,
+                                sizeof(RedWorkerMessageDisplayDisconnect),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
+                                handle_dev_display_migrate,
+                                sizeof(RedWorkerMessageDisplayMigrate),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_CONNECT,
+                                handle_dev_cursor_connect,
+                                sizeof(RedWorkerMessageCursorConnect),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+                                handle_dev_cursor_disconnect,
+                                sizeof(RedWorkerMessageCursorDisconnect),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_MIGRATE,
+                                handle_dev_cursor_migrate,
+                                sizeof(RedWorkerMessageCursorMigrate),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_UPDATE,
+                                handle_dev_update,
+                                sizeof(RedWorkerMessageUpdate),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_UPDATE_ASYNC,
+                                handle_dev_update_async,
+                                sizeof(RedWorkerMessageUpdateAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_ADD_MEMSLOT,
+                                handle_dev_add_memslot,
+                                sizeof(RedWorkerMessageAddMemslot),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC,
+                                handle_dev_add_memslot_async,
+                                sizeof(RedWorkerMessageAddMemslotAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DEL_MEMSLOT,
+                                handle_dev_del_memslot,
+                                sizeof(RedWorkerMessageDelMemslot),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACES,
+                                handle_dev_destroy_surfaces,
+                                sizeof(RedWorkerMessageDestroySurfaces),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC,
+                                handle_dev_destroy_surfaces_async,
+                                sizeof(RedWorkerMessageDestroySurfacesAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
+                                handle_dev_destroy_primary_surface,
+                                sizeof(RedWorkerMessageDestroyPrimarySurface),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC,
+                                handle_dev_destroy_primary_surface_async,
+                                sizeof(RedWorkerMessageDestroyPrimarySurfaceAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC,
+                                handle_dev_create_primary_surface_async,
+                                sizeof(RedWorkerMessageCreatePrimarySurfaceAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
+                                handle_dev_create_primary_surface,
+                                sizeof(RedWorkerMessageCreatePrimarySurface),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
+                                handle_dev_reset_image_cache,
+                                sizeof(RedWorkerMessageResetImageCache),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_RESET_CURSOR,
+                                handle_dev_reset_cursor,
+                                sizeof(RedWorkerMessageResetCursor),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_WAKEUP,
+                                handle_dev_wakeup,
+                                sizeof(RedWorkerMessageWakeup),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_OOM,
+                                handle_dev_oom,
+                                sizeof(RedWorkerMessageOom),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_START,
+                                handle_dev_start,
+                                sizeof(RedWorkerMessageStart),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
+                                handle_dev_flush_surfaces_async,
+                                sizeof(RedWorkerMessageFlushSurfacesAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_STOP,
+                                handle_dev_stop,
+                                sizeof(RedWorkerMessageStop),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_LOADVM_COMMANDS,
+                                handle_dev_loadvm_commands,
+                                sizeof(RedWorkerMessageLoadvmCommands),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_SET_COMPRESSION,
+                                handle_dev_set_compression,
+                                sizeof(RedWorkerMessageSetCompression),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
+                                handle_dev_set_streaming_video,
+                                sizeof(RedWorkerMessageSetStreamingVideo),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_SET_MOUSE_MODE,
+                                handle_dev_set_mouse_mode,
+                                sizeof(RedWorkerMessageSetMouseMode),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+                                handle_dev_display_channel_create,
+                                sizeof(RedWorkerMessageDisplayChannelCreate),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+                                handle_dev_cursor_channel_create,
+                                sizeof(RedWorkerMessageCursorChannelCreate),
+                                DISPATCHER_NONE);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
+                                handle_dev_destroy_surface_wait,
+                                sizeof(RedWorkerMessageDestroySurfaceWait),
+                                DISPATCHER_ACK);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
+                                handle_dev_destroy_surface_wait_async,
+                                sizeof(RedWorkerMessageDestroySurfaceWaitAsync),
+                                DISPATCHER_ASYNC);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_RESET_MEMSLOTS,
+                                handle_dev_reset_memslots,
+                                sizeof(RedWorkerMessageResetMemslots),
+                                DISPATCHER_NONE);
+}
+
+
+
+static void handle_dev_input(EventListener *listener, uint32_t events)
+{
+    RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
+
+    dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher));
+}
+
 static void handle_dev_free(EventListener *ctx)
 {
     free(ctx);
@@ -10687,14 +10918,18 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
     struct epoll_event event;
     RedWorkerMessage message;
     int epoll;
+    Dispatcher *dispatcher;
 
     ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE);
 
     memset(worker, 0, sizeof(RedWorker));
-    worker->dispatcher = init_data->dispatcher;
+    dispatcher = red_dispatcher_get_dispatcher(init_data->red_dispatcher);
+    dispatcher_set_opaque(dispatcher, worker);
+    worker->red_dispatcher = init_data->red_dispatcher;
     worker->qxl = init_data->qxl;
     worker->id = init_data->id;
-    worker->channel = init_data->channel;
+    worker->channel = dispatcher_get_recv_fd(dispatcher);
+    register_callbacks(dispatcher);
     worker->pending = init_data->pending;
     worker->dev_listener.refs = 1;
     worker->dev_listener.action = handle_dev_input;
diff --git a/server/red_worker.h b/server/red_worker.h
index 26c43ad..08c7b22 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -84,6 +84,8 @@ enum {
 
     RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
     RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+
+    RED_WORKER_MESSAGE_COUNT // LAST
 };
 
 typedef uint32_t RedWorkerMessage;
@@ -102,7 +104,6 @@ typedef struct RedDispatcher RedDispatcher;
 typedef struct WorkerInitData {
     struct QXLInstance *qxl;
     int id;
-    int channel;
     uint32_t *pending;
     uint32_t num_renderers;
     uint32_t renderers[RED_MAX_RENDERERS];
@@ -116,7 +117,7 @@ typedef struct WorkerInitData {
     uint8_t memslot_id_bits;
     uint8_t internal_groupslot_id;
     uint32_t n_surfaces;
-    RedDispatcher *dispatcher;
+    RedDispatcher *red_dispatcher;
 } WorkerInitData;
 
 void *red_worker_main(void *arg);
commit ca5776f40e60f5bf5c0bf19c242492c2082d3dfc
Author: Alon Levy <alevy at redhat.com>
Date:   Mon Nov 7 12:01:26 2011 +0200

    server/dispatcher: add dispatcher_register_async_done_callback

diff --git a/server/dispatcher.c b/server/dispatcher.c
index 81abbdf..90e46e3 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -126,6 +126,9 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
             red_printf("error writing ack for message %d\n", type);
             /* TODO: close socketpair? */
         }
+    } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->handle_async_done) {
+        dispatcher->handle_async_done(dispatcher->opaque, type,
+                                      (void *)payload);
     }
     return 1;
 }
@@ -174,9 +177,17 @@ unlock:
     pthread_mutex_unlock(&dispatcher->lock);
 }
 
+void dispatcher_register_async_done_callback(
+                                        Dispatcher *dispatcher,
+                                        dispatcher_handle_async_done handler)
+{
+    assert(dispatcher->handle_async_done == NULL);
+    dispatcher->handle_async_done = handler;
+}
+
 void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
-                                 dispatcher_handle_message handler, size_t size,
-                                 int ack)
+                                 dispatcher_handle_message handler,
+                                 size_t size, int ack)
 {
     DispatcherMessage *msg;
 
diff --git a/server/dispatcher.h b/server/dispatcher.h
index 95b6bfc..a468c58 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -8,6 +8,11 @@ typedef struct Dispatcher Dispatcher;
 typedef void (*dispatcher_handle_message)(void *opaque,
                                           void *payload);
 
+typedef void (*dispatcher_handle_async_done)(void *opaque,
+                                             uint32_t message_type,
+                                             void *payload);
+
+
 typedef struct DispatcherMessage {
     size_t size;
     int ack;
@@ -26,6 +31,7 @@ struct Dispatcher {
     void *payload; /* allocated as max of message sizes */
     size_t payload_size; /* used to track realloc calls */
     void *opaque;
+    dispatcher_handle_async_done handle_async_done;
 };
 
 /*
@@ -69,6 +75,16 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
                                  int ack);
 
 /*
+ * dispatcher_register_async_done_callback
+ * @dispatcher:     dispatcher
+ * @handler:        callback on the receiver side called *after* the
+ *                  message callback in case ack == DISPATCHER_ASYNC.
+ */
+void dispatcher_register_async_done_callback(
+                                    Dispatcher *dispatcher,
+                                    dispatcher_handle_async_done handler);
+
+/*
  *  dispatcher_handle_recv_read
  *  @dispatcher: Dispatcher instance
  */
commit fff04e867c02eb5c3582f17810c1a092ce240fc6
Author: Alon Levy <alevy at redhat.com>
Date:   Mon Nov 7 13:27:54 2011 +0200

    introduce DISPATCHER_{NONE,ACK,ASYNC}

diff --git a/server/dispatcher.c b/server/dispatcher.c
index 1cd9e43..81abbdf 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -120,10 +120,12 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
     } else {
         red_printf("error: no handler for message type %d\n", type);
     }
-    if (msg->ack && write_safe(dispatcher->recv_fd,
-                               &ack, sizeof(ack)) == -1) {
-        red_printf("error writing ack for message %d\n", type);
-        /* TODO: close socketpair? */
+    if (msg->ack == DISPATCHER_ACK) {
+        if (write_safe(dispatcher->recv_fd,
+                       &ack, sizeof(ack)) == -1) {
+            red_printf("error writing ack for message %d\n", type);
+            /* TODO: close socketpair? */
+        }
     }
     return 1;
 }
@@ -159,7 +161,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
                    message_type);
         goto unlock;
     }
-    if (msg->ack) {
+    if (msg->ack == DISPATCHER_ACK) {
         if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) {
             red_printf("error: failed to read ack");
         } else if (ack != ACK) {
diff --git a/server/dispatcher.h b/server/dispatcher.h
index 04e6b46..95b6bfc 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -45,13 +45,22 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
 void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
                      void *opaque);
 
+enum {
+    DISPATCHER_NONE = 0,
+    DISPATCHER_ACK,
+    DISPATCHER_ASYNC
+};
+
 /*
  * dispatcher_register_handler
- * @dispatcher:           dispatcher
+ * @dispatcher:     dispatcher
  * @messsage_type:  message type
  * @handler:        message handler
  * @size:           message size. Each type has a fixed associated size.
- * @ack:            send an ack. This is per message type - you can't send the
+ * @ack:            One of DISPATCHER_NONE, DISPATCHER_ACK, DISPATCHER_ASYNC.
+ *                  DISPATCHER_NONE - only send the message
+ *                  DISPATCHER_ACK - send an ack after the message
+ *                  DISPATCHER_ASYNC - no ack; the async done callback is called after the handler. This is per message type - you can't send the
  *                  same message type with and without. Register two different
  *                  messages if that is what you want.
  */
commit 776bdd6c95715dcd8e609dc3ff647d0ad73fd339
Author: Alon Levy <alevy at redhat.com>
Date:   Sun Oct 30 17:04:59 2011 +0200

    server: introduce dispatcher
    
    used for main_dispatcher only in this patch.
    
    Dispatcher is meant to be used for Main<->any low frequency messages.
    
    Its interface is meant to include the red_dispatcher usage:
     fixed size messages per message type
     some messages require an ack
    
    Some methods are added to be used by RedDispatcher later:
     dispatcher_handle_read - to be called directly by RedDispatcher epoll
      based loop
     dispatcher_set_opaque - to be set from red_worker pthread
     dispatcher_init - allow NULL core as used by red_worker
    
    Read and Write behavior:
     Sender: blocking write, blocking read for ack (if any).
     Reader: poll for any data, if such then blocking read for a
     message_type and following message. repeat until poll returns
     with no pending data to read.
    
    FDO Bugzilla: 42463

diff --git a/server/Makefile.am b/server/Makefile.am
index 418d707..34a6b47 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -78,6 +78,8 @@ libspice_server_la_SOURCES =			\
 	red_client_cache.h			\
 	red_client_shared_cache.h		\
 	red_common.h				\
+	dispatcher.c				\
+	dispatcher.h				\
 	red_dispatcher.c			\
 	red_dispatcher.h			\
 	main_dispatcher.c			\
diff --git a/server/dispatcher.c b/server/dispatcher.c
new file mode 100644
index 0000000..1cd9e43
--- /dev/null
+++ b/server/dispatcher.c
@@ -0,0 +1,248 @@
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include "mem.h"
+#include "spice_common.h"
+#include "dispatcher.h"
+
+#define DISPATCHER_DEBUG_PRINTF(level, ...) \
+    red_printf_debug(level, "DISP", ##__VA_ARGS__)
+
+//#define DEBUG_DISPATCHER
+
+#ifdef DEBUG_DISPATCHER
+#include <signal.h>
+#endif
+
+#define ACK 0xffffffff
+
+/*
+ * read_safe
+ * helper. reads until size bytes accumulated in buf, if an error other than
+ * EINTR is encountered returns -1, otherwise returns the number of bytes read.
+ * @block if 1 the read will block (the fd is always blocking).
+ *        if 0 poll first, return immediately if no bytes available, otherwise
+ *         read size in blocking mode.
+ */
+static int read_safe(int fd, void *buf, size_t size, int block)
+{
+    int read_size = 0;
+    int ret;
+    struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0};
+
+    if (size == 0) {
+        return 0;
+    }
+
+    if (!block) {
+        while ((ret = poll(&pollfd, 1, 0)) == -1) {
+            if (errno == EINTR) {
+                DISPATCHER_DEBUG_PRINTF(3, "EINTR in poll");
+                continue;
+            }
+            red_error("poll failed");
+            return -1;
+        }
+        if (!(pollfd.revents & POLLIN)) {
+            return 0;
+        }
+    }
+    while (read_size < size) {
+        ret = read(fd, buf + read_size, size - read_size);
+        if (ret == -1) {
+            if (errno == EINTR) {
+                DISPATCHER_DEBUG_PRINTF(3, "EINTR in read");
+                continue;
+            }
+            return -1;
+        }
+        if (ret == 0) {
+            red_error("broken pipe on read");
+            return -1;
+        }
+        read_size += ret;
+    }
+    return read_size;
+}
+
+/*
+ * write_safe
+ * @return -1 for error, otherwise number of written bytes. may be zero.
+ */
+static int write_safe(int fd, void *buf, size_t size)
+{
+    int written_size = 0;
+    int ret;
+
+    while (written_size < size) {
+        ret = write(fd, buf + written_size, size - written_size);
+        if (ret == -1) {
+            if (errno != EINTR) {
+                DISPATCHER_DEBUG_PRINTF(3, "EINTR in write\n");
+                return -1;
+            }
+            continue;
+        }
+        written_size += ret;
+    }
+    return written_size;
+}
+
+static int dispatcher_handle_single_read(Dispatcher *dispatcher)
+{
+    int ret;
+    uint32_t type;
+    DispatcherMessage *msg = NULL;
+    uint8_t *payload = dispatcher->payload;
+    uint32_t ack = ACK;
+
+    if ((ret = read_safe(dispatcher->recv_fd, &type, sizeof(type), 0)) == -1) {
+        red_printf("error reading from dispatcher: %d\n", errno);
+        return 0;
+    }
+    if (ret == 0) {
+        /* no message */
+        return 0;
+    }
+    msg = &dispatcher->messages[type];
+    if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
+        red_printf("error reading from dispatcher: %d\n", errno);
+        /* TODO: close socketpair? */
+        return 0;
+    }
+    if (msg->handler) {
+        msg->handler(dispatcher->opaque, (void *)payload);
+    } else {
+        red_printf("error: no handler for message type %d\n", type);
+    }
+    if (msg->ack && write_safe(dispatcher->recv_fd,
+                               &ack, sizeof(ack)) == -1) {
+        red_printf("error writing ack for message %d\n", type);
+        /* TODO: close socketpair? */
+    }
+    return 1;
+}
+
+/*
+ * dispatcher_handle_recv_read
+ * doesn't handle being in the middle of a message. all reads are blocking.
+ */
+void dispatcher_handle_recv_read(Dispatcher *dispatcher)
+{
+    while (dispatcher_handle_single_read(dispatcher)) {
+    }
+}
+
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload)
+{
+    DispatcherMessage *msg;
+    uint32_t ack;
+    int send_fd = dispatcher->send_fd;
+
+    assert(dispatcher->max_message_type > message_type);
+    assert(dispatcher->messages[message_type].handler);
+    msg = &dispatcher->messages[message_type];
+    pthread_mutex_lock(&dispatcher->lock);
+    if (write_safe(send_fd, &message_type, sizeof(message_type)) == -1) {
+        red_printf("error: failed to send message type for message %d\n",
+                   message_type);
+        goto unlock;
+    }
+    if (write_safe(send_fd, payload, msg->size) == -1) {
+        red_printf("error: failed to send message body for message %d\n",
+                   message_type);
+        goto unlock;
+    }
+    if (msg->ack) {
+        if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) {
+            red_printf("error: failed to read ack");
+        } else if (ack != ACK) {
+            red_printf("error: got wrong ack value in dispatcher "
+                       "for message %d\n", message_type);
+            /* TODO handling error? */
+        }
+    }
+unlock:
+    pthread_mutex_unlock(&dispatcher->lock);
+}
+
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+                                 dispatcher_handle_message handler, size_t size,
+                                 int ack)
+{
+    DispatcherMessage *msg;
+
+    assert(message_type < dispatcher->max_message_type);
+    assert(dispatcher->messages[message_type].handler == 0);
+    msg = &dispatcher->messages[message_type];
+    msg->handler = handler;
+    msg->size = size;
+    msg->ack = ack;
+    if (msg->size > dispatcher->payload_size) {
+        dispatcher->payload = realloc(dispatcher->payload, msg->size);
+        dispatcher->payload_size = msg->size;
+    }
+}
+
+#ifdef DEBUG_DISPATCHER
+static void dummy_handler(int bla)
+{
+}
+
+static void setup_dummy_signal_handler(void)
+{
+    static int inited = 0;
+    struct sigaction act = {
+        .sa_handler = &dummy_handler,
+    };
+    if (inited) {
+        return;
+    }
+    inited = 1;
+    /* handle SIGRTMIN+10 in order to test the loops for EINTR */
+    if (sigaction(SIGRTMIN + 10, &act, NULL) == -1) {
+        fprintf(stderr,
+            "failed to set dummy sigaction for DEBUG_DISPATCHER\n");
+        exit(1);
+    }
+}
+#endif
+
+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
+                     void *opaque)
+{
+    int channels[2];
+
+#ifdef DEBUG_DISPATCHER
+    setup_dummy_signal_handler();
+#endif
+    dispatcher->opaque = opaque;
+    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
+        red_error("socketpair failed %s", strerror(errno));
+        return;
+    }
+    pthread_mutex_init(&dispatcher->lock, NULL);
+    dispatcher->recv_fd = channels[0];
+    dispatcher->send_fd = channels[1];
+    dispatcher->self = pthread_self();
+
+    dispatcher->messages = spice_malloc0_n(max_message_type,
+                                           sizeof(dispatcher->messages[0]));
+    dispatcher->max_message_type = max_message_type;
+}
+
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
+{
+    dispatcher->opaque = opaque;
+}
+
+int dispatcher_get_recv_fd(Dispatcher *dispatcher)
+{
+    return dispatcher->recv_fd;
+}
diff --git a/server/dispatcher.h b/server/dispatcher.h
new file mode 100644
index 0000000..04e6b46
--- /dev/null
+++ b/server/dispatcher.h
@@ -0,0 +1,81 @@
+#ifndef MAIN_DISPATCHER_H
+#define MAIN_DISPATCHER_H
+
+#include <spice.h>
+
+typedef struct Dispatcher Dispatcher;
+
+typedef void (*dispatcher_handle_message)(void *opaque,
+                                          void *payload);
+
+typedef struct DispatcherMessage {
+    size_t size;
+    int ack;
+    dispatcher_handle_message handler;
+} DispatcherMessage;
+
+struct Dispatcher {
+    SpiceCoreInterface *recv_core;
+    int recv_fd;
+    int send_fd;
+    pthread_t self;
+    pthread_mutex_t lock;
+    DispatcherMessage *messages;
+    int stage;  /* message parser stage - sender has no stages */
+    size_t max_message_type;
+    void *payload; /* allocated as max of message sizes */
+    size_t payload_size; /* used to track realloc calls */
+    void *opaque;
+};
+
+/*
+ * dispatcher_send_message
+ * @message_type: message type
+ * @payload:      payload
+ */
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload);
+
+/*
+ * dispatcher_init
+ * @max_message_type: number of message types. Allows allocation of the
+ *  DispatcherMessage list up front, and registration of handlers in any
+ *  order wanted.
+ */
+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
+                     void *opaque);
+
+/*
+ * dispatcher_register_handler
+ * @dispatcher:           dispatcher
+ * @message_type:   message type
+ * @handler:        message handler
+ * @size:           message size. Each type has a fixed associated size.
+ * @ack:            send an ack. This is per message type - you can't send the
+ *                  same message type with and without. Register two different
+ *                  messages if that is what you want.
+ */
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+                                 dispatcher_handle_message handler, size_t size,
+                                 int ack);
+
+/*
+ *  dispatcher_handle_recv_read
+ *  @dispatcher: Dispatcher instance
+ */
+void dispatcher_handle_recv_read(Dispatcher *);
+
+/*
+ *  dispatcher_get_recv_fd
+ *  @return: receive file descriptor of the dispatcher
+ */
+int dispatcher_get_recv_fd(Dispatcher *);
+
+/*
+ * dispatcher_set_opaque
+ * @dispatcher: Dispatcher instance
+ * @opaque: opaque to use for callbacks
+ */
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
+
+#endif //MAIN_DISPATCHER_H
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
index 73856bf..a5967fa 100644
--- a/server/main_dispatcher.c
+++ b/server/main_dispatcher.c
@@ -5,6 +5,7 @@
 #include <assert.h>
 
 #include "red_common.h"
+#include "dispatcher.h"
 #include "main_dispatcher.h"
 
 /*
@@ -28,11 +29,8 @@
  */
 
 typedef struct {
+    Dispatcher base;
     SpiceCoreInterface *core;
-    int main_fd;
-    int other_fd;
-    pthread_t self;
-    pthread_mutex_t lock;
 } MainDispatcher;
 
 MainDispatcher main_dispatcher;
@@ -43,15 +41,10 @@ enum {
     MAIN_DISPATCHER_NUM_MESSAGES
 };
 
-typedef struct MainDispatcherMessage {
-    uint32_t type;
-    union {
-        struct {
-            int event;
-            SpiceChannelEventInfo *info;
-        } channel_event;
-    } data;
-} MainDispatcherMessage;
+typedef struct MainDispatcherChannelEventMessage {
+    int event;
+    SpiceChannelEventInfo *info;
+} MainDispatcherChannelEventMessage;
 
 /* channel_event - calls core->channel_event, must be done in main thread */
 static void main_dispatcher_self_handle_channel_event(
@@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
     main_dispatcher.core->channel_event(event, info);
 }
 
-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
+static void main_dispatcher_handle_channel_event(void *opaque,
+                                                 void *payload)
 {
-    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
-                                              msg->data.channel_event.info);
+    MainDispatcherChannelEventMessage *channel_event = payload;
+
+    main_dispatcher_self_handle_channel_event(channel_event->event,
+                                              channel_event->info);
 }
 
 void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
 {
-    MainDispatcherMessage msg;
-    ssize_t written = 0;
-    ssize_t ret;
+    MainDispatcherChannelEventMessage msg;
 
-    if (pthread_self() == main_dispatcher.self) {
+    if (pthread_self() == main_dispatcher.base.self) {
         main_dispatcher_self_handle_channel_event(event, info);
         return;
     }
-    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
-    msg.data.channel_event.event = event;
-    msg.data.channel_event.info = info;
-    pthread_mutex_lock(&main_dispatcher.lock);
-    while (written < sizeof(msg)) {
-        ret = write(main_dispatcher.other_fd, &msg + written,
-                    sizeof(msg) - written);
-        if (ret == -1) {
-            assert(errno == EINTR);
-            continue;
-        }
-        written += ret;
-    }
-    pthread_mutex_unlock(&main_dispatcher.lock);
+    msg.event = event;
+    msg.info = info;
+    dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+                            &msg);
 }
 
-
-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
+static void dispatcher_handle_read(int fd, int event, void *opaque)
 {
-    int ret;
-    MainDispatcher *md = opaque;
-    MainDispatcherMessage msg;
-    int read_size = 0;
+    Dispatcher *dispatcher = opaque;
 
-    while (read_size < sizeof(msg)) {
-        /* blocks until sizeof(msg) is read */
-        ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size);
-        if (ret == -1) {
-            if (errno != EINTR) {
-                red_printf("error reading from main dispatcher: %d\n", errno);
-                /* TODO: close channel? */
-                return;
-            }
-            continue;
-        }
-        read_size += ret;
-    }
-    switch (msg.type) {
-        case MAIN_DISPATCHER_CHANNEL_EVENT:
-            main_dispatcher_handle_channel_event(&msg);
-            break;
-        default:
-            red_printf("error: unhandled main dispatcher message type %d\n",
-                       msg.type);
-    }
+    dispatcher_handle_recv_read(dispatcher);
 }
 
 void main_dispatcher_init(SpiceCoreInterface *core)
 {
-    int channels[2];
-
     memset(&main_dispatcher, 0, sizeof(main_dispatcher));
     main_dispatcher.core = core;
-
-    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
-        red_error("socketpair failed %s", strerror(errno));
-        return;
-    }
-    pthread_mutex_init(&main_dispatcher.lock, NULL);
-    main_dispatcher.main_fd = channels[0];
-    main_dispatcher.other_fd = channels[1];
-    main_dispatcher.self = pthread_self();
-
-    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
-                    main_dispatcher_handle_read, &main_dispatcher);
+    dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base);
+    core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
+                    dispatcher_handle_read, &main_dispatcher.base);
+    dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+                                main_dispatcher_handle_channel_event,
+                                sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
 }
commit 9174b67160157f74cc00faf0b42cb7c5d2b710a1
Author: Alon Levy <alevy at redhat.com>
Date:   Mon Nov 7 12:11:12 2011 +0200

    server/red_dispatcher: remove semicolon from DBG_ASYNC

diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 5257e6b..0c7a875 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -47,7 +47,7 @@ static int num_active_workers = 0;
 #define DBG_ASYNC(s, ...)   \
     do {                    \
         red_printf_debug(2, "ASYNC", s, ##__VA_ARGS__);   \
-    } while (0);
+    } while (0)
 
 struct AsyncCommand {
     RingItem link;


More information about the Spice-commits mailing list