[Spice-devel] [PATCH v2 5/6] server/red_worker: reuse dispatcher

Alon Levy alevy at redhat.com
Mon Nov 7 03:44:45 PST 2011


This patch reuses Dispatcher in RedDispatcher. It adds two helpers
to red_worker to keep RedWorker opaque to the outside. The dispatcher is
abused in three places that use the underlying socket directly:
 once sending a READY after red_init completes
 once for each channel creation, replying with the RedChannel instance
  for cursor and display.

FDO Bugzilla: 42463

rfc->v1:
* move callbacks to red_worker.c including registration (Yonit)
* rename dispatcher to red_dispatcher in red_worker.c and red_dispatcher.c
* add accessor red_dispatcher_get_dispatcher
* s/dispatcher_handle_recv/dispatcher_handle_recv_read/ and change sig to
  just Dispatcher *dispatcher (was the SpiceCoreInterface one)
* remove SpiceCoreInterface parameter from dispatcher_init (Yonit)
* main_dispatcher needed it for channel_event so it has it in
  struct MainDispatcher
* add dispatcher_get_recv_fd for red_worker
---
 server/red_dispatcher.c |  487 +++++++++++++++-----------
 server/red_dispatcher.h |  145 ++++++++
 server/red_worker.c     |  904 +++++++++++++++++++++++++++++------------------
 server/red_worker.h     |    5 +-
 4 files changed, 992 insertions(+), 549 deletions(-)

diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 0c7a875..eaa0164 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -37,6 +37,7 @@
 #include "reds_gl_canvas.h"
 #endif // USE_OPENGL
 #include "reds.h"
+#include "dispatcher.h"
 #include "red_dispatcher.h"
 #include "red_parse_qxl.h"
 
@@ -58,7 +59,7 @@ struct AsyncCommand {
 struct RedDispatcher {
     QXLWorker base;
     QXLInstance *qxl;
-    int channel;
+    Dispatcher dispatcher;
     pthread_t worker_thread;
     uint32_t pending;
     int primary_active;
@@ -93,19 +94,22 @@ static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *clie
                                             int num_common_caps, uint32_t *common_caps, int num_caps,
                                             uint32_t *caps)
 {
+    RedWorkerMessageDisplayConnect payload;
     RedDispatcher *dispatcher;
 
     red_printf("");
     dispatcher = (RedDispatcher *)channel->data;
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &client, sizeof(RedClient *));
-    send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
-    send_data(dispatcher->channel, &migration, sizeof(int));
+    payload.client = client;
+    payload.stream = stream;
+    payload.migration = migration;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_CONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
 {
+    RedWorkerMessageDisplayDisconnect payload;
     RedDispatcher *dispatcher;
 
     if (!rcc->channel) {
@@ -115,27 +119,28 @@ static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
     dispatcher = (RedDispatcher *)rcc->channel->data;
 
     red_printf("");
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+    payload.rcc = rcc;
 
     // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count
     // for channels
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_display_migrate(RedChannelClient *rcc)
 {
+    RedWorkerMessageDisplayMigrate payload;
     RedDispatcher *dispatcher;
     if (!rcc->channel) {
         return;
     }
     dispatcher = (RedDispatcher *)rcc->channel->data;
     red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+    payload.rcc = rcc;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
+                            &payload);
 }
 
 static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
@@ -143,17 +148,20 @@ static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *clien
                                            uint32_t *common_caps, int num_caps,
                                            uint32_t *caps)
 {
+    RedWorkerMessageCursorConnect payload;
     RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
     red_printf("");
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &client, sizeof(RedClient *));
-    send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
-    send_data(dispatcher->channel, &migration, sizeof(int));
+    payload.client = client;
+    payload.stream = stream;
+    payload.migration = migration;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_CONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
 {
+    RedWorkerMessageCursorDisconnect payload;
     RedDispatcher *dispatcher;
 
     if (!rcc->channel) {
@@ -162,16 +170,16 @@ static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
 
     dispatcher = (RedDispatcher *)rcc->channel->data;
     red_printf("");
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT;
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+    payload.rcc = rcc;
 
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+                            &payload);
 }
 
 static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
 {
+    RedWorkerMessageCursorMigrate payload;
     RedDispatcher *dispatcher;
 
     if (!rcc->channel) {
@@ -179,8 +187,10 @@ static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
     }
     dispatcher = (RedDispatcher *)rcc->channel->data;
     red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE;
-    write_message(dispatcher->channel, &message);
+    payload.rcc = rcc;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_MIGRATE,
+                            &payload);
 }
 
 typedef struct RendererInfo {
@@ -263,16 +273,16 @@ static void red_dispatcher_update_area(RedDispatcher *dispatcher, uint32_t surfa
                                    QXLRect *qxl_area, QXLRect *qxl_dirty_rects,
                                    uint32_t num_dirty_rects, uint32_t clear_dirty_region)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE;
+    RedWorkerMessageUpdate payload;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, &qxl_area, sizeof(QXLRect *));
-    send_data(dispatcher->channel, &qxl_dirty_rects, sizeof(QXLRect *));
-    send_data(dispatcher->channel, &num_dirty_rects, sizeof(uint32_t));
-    send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    payload.surface_id = surface_id;
+    payload.qxl_area = qxl_area;
+    payload.qxl_dirty_rects = qxl_dirty_rects;
+    payload.num_dirty_rects = num_dirty_rects;
+    payload.clear_dirty_region = clear_dirty_region;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_UPDATE,
+                            &payload);
 }
 
 static AsyncCommand *async_command_alloc(RedDispatcher *dispatcher,
@@ -297,13 +307,15 @@ static void red_dispatcher_update_area_async(RedDispatcher *dispatcher,
                                          uint64_t cookie)
 {
     RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
+    RedWorkerMessageUpdateAsync payload;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, qxl_area, sizeof(QXLRect));
-    send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t));
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    payload.qxl_area = *qxl_area;
+    payload.clear_dirty_region = clear_dirty_region;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            message,
+                            &payload);
 }
 
 static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id,
@@ -316,12 +328,12 @@ static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id,
 
 static void red_dispatcher_add_memslot(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT;
+    RedWorkerMessageAddMemslot payload;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    payload.mem_slot = *mem_slot;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_ADD_MEMSLOT,
+                            &payload);
 }
 
 static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slot)
@@ -331,21 +343,22 @@ static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slo
 
 static void red_dispatcher_add_memslot_async(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot, uint64_t cookie)
 {
+    RedWorkerMessageAddMemslotAsync payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot));
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.mem_slot = *mem_slot;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void red_dispatcher_del_memslot(RedDispatcher *dispatcher, uint32_t slot_group_id, uint32_t slot_id)
 {
+    RedWorkerMessageDelMemslot payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DEL_MEMSLOT;
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &slot_group_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, &slot_id, sizeof(uint32_t));
+    payload.slot_group_id = slot_group_id;
+    payload.slot_id = slot_id;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id, uint32_t slot_id)
@@ -355,11 +368,11 @@ static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id
 
 static void red_dispatcher_destroy_surfaces(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES;
+    RedWorkerMessageDestroySurfaces payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DESTROY_SURFACES,
+                            &payload);
 }
 
 static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker)
@@ -369,11 +382,11 @@ static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker)
 
 static void red_dispatcher_destroy_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie)
 {
+    RedWorkerMessageDestroySurfacesAsync payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispatcher)
@@ -387,28 +400,37 @@ static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispa
 }
 
 static void
+red_dispatcher_destroy_primary_surface_sync(RedDispatcher *dispatcher,
+                                            uint32_t surface_id)
+{
+    RedWorkerMessageDestroyPrimarySurface payload;
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
+                            &payload);
+    red_dispatcher_destroy_primary_surface_complete(dispatcher);
+}
+
+static void
+red_dispatcher_destroy_primary_surface_async(RedDispatcher *dispatcher,
+                                             uint32_t surface_id, uint64_t cookie)
+{
+    RedWorkerMessageDestroyPrimarySurfaceAsync payload;
+    RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC;
+
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void
 red_dispatcher_destroy_primary_surface(RedDispatcher *dispatcher,
                                        uint32_t surface_id, int async, uint64_t cookie)
 {
-    RedWorkerMessage message;
-    AsyncCommand *cmd = NULL;
-
     if (async) {
-        message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC;
-        cmd = async_command_alloc(dispatcher, message, cookie);
+        red_dispatcher_destroy_primary_surface_async(dispatcher, surface_id, cookie);
     } else {
-        message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE;
-    }
-
-    write_message(dispatcher->channel, &message);
-    if (async) {
-        send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    }
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    if (!async) {
-        read_message(dispatcher->channel, &message);
-        ASSERT(message == RED_WORKER_MESSAGE_READY);
-        red_dispatcher_destroy_primary_surface_complete(dispatcher);
+        red_dispatcher_destroy_primary_surface_sync(dispatcher, surface_id);
     }
 }
 
@@ -431,30 +453,42 @@ static void red_dispatcher_create_primary_surface_complete(RedDispatcher *dispat
 }
 
 static void
-red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id,
-                                      QXLDevSurfaceCreate *surface, int async, uint64_t cookie)
+red_dispatcher_create_primary_surface_async(RedDispatcher *dispatcher, uint32_t surface_id,
+                                            QXLDevSurfaceCreate *surface, uint64_t cookie)
 {
-    RedWorkerMessage message;
-    AsyncCommand *cmd = NULL;
+    RedWorkerMessageCreatePrimarySurfaceAsync payload;
+    RedWorkerMessage message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC;
+
+    dispatcher->surface_create = *surface;
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    payload.surface = *surface;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void
+red_dispatcher_create_primary_surface_sync(RedDispatcher *dispatcher, uint32_t surface_id,
+                                           QXLDevSurfaceCreate *surface)
+{
+    RedWorkerMessageCreatePrimarySurface payload;
 
-    if (async) {
-        message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC;
-        cmd = async_command_alloc(dispatcher, message, cookie);
-    } else {
-        message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE;
-    }
     dispatcher->surface_create = *surface;
+    payload.surface_id = surface_id;
+    payload.surface = *surface;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
+                            &payload);
+    red_dispatcher_create_primary_surface_complete(dispatcher);
+}
 
-    write_message(dispatcher->channel, &message);
+static void
+red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id,
+                                      QXLDevSurfaceCreate *surface, int async, uint64_t cookie)
+{
     if (async) {
-        send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    }
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
-    send_data(dispatcher->channel, surface, sizeof(QXLDevSurfaceCreate));
-    if (!async) {
-        read_message(dispatcher->channel, &message);
-        ASSERT(message == RED_WORKER_MESSAGE_READY);
-        red_dispatcher_create_primary_surface_complete(dispatcher);
+        red_dispatcher_create_primary_surface_async(dispatcher, surface_id, surface, cookie);
+    } else {
+        red_dispatcher_create_primary_surface_sync(dispatcher, surface_id, surface);
     }
 }
 
@@ -466,11 +500,11 @@ static void qxl_worker_create_primary_surface(QXLWorker *qxl_worker, uint32_t su
 
 static void red_dispatcher_reset_image_cache(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_IMAGE_CACHE;
+    RedWorkerMessageResetImageCache payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
+                            &payload);
 }
 
 static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker)
@@ -480,11 +514,11 @@ static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker)
 
 static void red_dispatcher_reset_cursor(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_CURSOR;
+    RedWorkerMessageResetCursor payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_RESET_CURSOR,
+                            &payload);
 }
 
 static void qxl_worker_reset_cursor(QXLWorker *qxl_worker)
@@ -492,29 +526,38 @@ static void qxl_worker_reset_cursor(QXLWorker *qxl_worker)
     red_dispatcher_reset_cursor((RedDispatcher*)qxl_worker);
 }
 
-static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher, uint32_t surface_id,
-                                                int async, uint64_t cookie)
+static void red_dispatcher_destroy_surface_wait_sync(RedDispatcher *dispatcher,
+                                                     uint32_t surface_id)
 {
-    RedWorkerMessage message;
-    AsyncCommand *cmd = NULL;
+    RedWorkerMessageDestroySurfaceWait payload;
 
-    if (async ) {
-        message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC;
-        cmd = async_command_alloc(dispatcher, message, cookie);
-    } else {
-        message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT;
-    }
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
+                            &payload);
+}
 
-    write_message(dispatcher->channel, &message);
-    if (async) {
-        send_data(dispatcher->channel, &cmd, sizeof(cmd));
-    }
-    send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
+static void red_dispatcher_destroy_surface_wait_async(RedDispatcher *dispatcher,
+                                                      uint32_t surface_id,
+                                                      uint64_t cookie)
+{
+    RedWorkerMessageDestroySurfaceWaitAsync payload;
+    RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC;
+
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    payload.surface_id = surface_id;
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher,
+                                                uint32_t surface_id,
+                                                int async, uint64_t cookie)
+{
     if (async) {
-        return;
+        red_dispatcher_destroy_surface_wait_async(dispatcher, surface_id, cookie);
+    } else {
+        red_dispatcher_destroy_surface_wait_sync(dispatcher, surface_id);
     }
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
 }
 
 static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surface_id)
@@ -524,9 +567,11 @@ static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surf
 
 static void red_dispatcher_reset_memslots(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_MEMSLOTS;
+    RedWorkerMessageResetMemslots payload;
 
-    write_message(dispatcher->channel, &message);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_RESET_MEMSLOTS,
+                            &payload);
 }
 
 static void qxl_worker_reset_memslots(QXLWorker *qxl_worker)
@@ -536,11 +581,15 @@ static void qxl_worker_reset_memslots(QXLWorker *qxl_worker)
 
 static void red_dispatcher_wakeup(RedDispatcher *dispatcher)
 {
-    if (!test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_WAKEUP;
-        set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending);
-        write_message(dispatcher->channel, &message);
+    RedWorkerMessageWakeup payload;
+
+    if (test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) {
+        return;
     }
+    set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_WAKEUP,
+                            &payload);
 }
 
 static void qxl_worker_wakeup(QXLWorker *qxl_worker)
@@ -550,11 +599,15 @@ static void qxl_worker_wakeup(QXLWorker *qxl_worker)
 
 static void red_dispatcher_oom(RedDispatcher *dispatcher)
 {
-    if (!test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_OOM;
-        set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending);
-        write_message(dispatcher->channel, &message);
+    RedWorkerMessageOom payload;
+
+    if (test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) {
+        return;
     }
+    set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_OOM,
+                            &payload);
 }
 
 static void qxl_worker_oom(QXLWorker *qxl_worker)
@@ -564,9 +617,11 @@ static void qxl_worker_oom(QXLWorker *qxl_worker)
 
 static void red_dispatcher_start(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_START;
+    RedWorkerMessageStart payload;
 
-    write_message(dispatcher->channel, &message);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_START,
+                            &payload);
 }
 
 static void qxl_worker_start(QXLWorker *qxl_worker)
@@ -576,20 +631,20 @@ static void qxl_worker_start(QXLWorker *qxl_worker)
 
 static void red_dispatcher_flush_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie)
 {
+    RedWorkerMessageFlushSurfacesAsync payload;
     RedWorkerMessage message = RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC;
-    AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
 
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &cmd, sizeof(cmd));
+    payload.cmd = async_command_alloc(dispatcher, message, cookie);
+    dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
 }
 
 static void red_dispatcher_stop(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+    RedWorkerMessageStop payload;
 
-    write_message(dispatcher->channel, &message);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_STOP,
+                            &payload);
 }
 
 static void qxl_worker_stop(QXLWorker *qxl_worker)
@@ -601,14 +656,14 @@ static void red_dispatcher_loadvm_commands(RedDispatcher *dispatcher,
                                            struct QXLCommandExt *ext,
                                            uint32_t count)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_LOADVM_COMMANDS;
+    RedWorkerMessageLoadvmCommands payload;
 
     red_printf("");
-    write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &count, sizeof(uint32_t));
-    send_data(dispatcher->channel, ext, sizeof(QXLCommandExt) * count);
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    payload.count = count;
+    payload.ext = ext;
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_LOADVM_COMMANDS,
+                            &payload);
 }
 
 static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
@@ -640,37 +695,44 @@ static inline int calc_compression_level(void)
 
 void red_dispatcher_on_ic_change(void)
 {
+    RedWorkerMessageSetCompression payload;
     int compression_level = calc_compression_level();
     RedDispatcher *now = dispatchers;
+
     while (now) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_SET_COMPRESSION;
         now->qxl->st->qif->set_compression_level(now->qxl, compression_level);
-        write_message(now->channel, &message);
-        send_data(now->channel, &image_compression, sizeof(spice_image_compression_t));
+        payload.image_compression = image_compression;
+        dispatcher_send_message(&now->dispatcher,
+                                RED_WORKER_MESSAGE_SET_COMPRESSION,
+                                &payload);
         now = now->next;
     }
 }
 
 void red_dispatcher_on_sv_change(void)
 {
+    RedWorkerMessageSetStreamingVideo payload;
     int compression_level = calc_compression_level();
     RedDispatcher *now = dispatchers;
     while (now) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_SET_STREAMING_VIDEO;
         now->qxl->st->qif->set_compression_level(now->qxl, compression_level);
-        write_message(now->channel, &message);
-        send_data(now->channel, &streaming_video, sizeof(uint32_t));
+        payload.streaming_video = streaming_video;
+        dispatcher_send_message(&now->dispatcher,
+                                RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
+                                &payload);
         now = now->next;
     }
 }
 
 void red_dispatcher_set_mouse_mode(uint32_t mode)
 {
+    RedWorkerMessageSetMouseMode payload;
     RedDispatcher *now = dispatchers;
     while (now) {
-        RedWorkerMessage message = RED_WORKER_MESSAGE_SET_MOUSE_MODE;
-        write_message(now->channel, &message);
-        send_data(now->channel, &mode, sizeof(uint32_t));
+        payload.mode = mode;
+        dispatcher_send_message(&now->dispatcher,
+                                RED_WORKER_MESSAGE_SET_MOUSE_MODE,
+                                &payload);
         now = now->next;
     }
 }
@@ -873,34 +935,31 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher,
 
 static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE;
+    RedWorkerMessageDisplayChannelCreate payload;
     RedChannel *display_channel;
 
-    write_message(dispatcher->channel, &message);
-
-    receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+                            &payload);
+    receive_data(dispatcher->dispatcher.send_fd, &display_channel, sizeof(RedChannel *));
     return display_channel;
 }
 
 static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher)
 {
-    RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE;
+    RedWorkerMessageCursorChannelCreate payload;
     RedChannel *cursor_channel;
 
-    write_message(dispatcher->channel, &message);
-
-    receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *));
-    read_message(dispatcher->channel, &message);
-    ASSERT(message == RED_WORKER_MESSAGE_READY);
+    dispatcher_send_message(&dispatcher->dispatcher,
+                            RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+                            &payload);
+    receive_data(dispatcher->dispatcher.send_fd, &cursor_channel, sizeof(RedChannel *));
     return cursor_channel;
 }
 
 RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
 {
-    RedDispatcher *dispatcher;
-    int channels[2];
+    RedDispatcher *red_dispatcher;
     RedWorkerMessage message;
     WorkerInitData init_data;
     QXLDevInitInfo init_info;
@@ -917,45 +976,41 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
     gl_canvas_init();
 #endif // USE_OPENGL
 
-    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
-        red_error("socketpair failed %s", strerror(errno));
-    }
-
-    dispatcher = spice_new0(RedDispatcher, 1);
-    dispatcher->channel = channels[0];
-    ring_init(&dispatcher->async_commands);
-    DBG_ASYNC("dispatcher->async_commands.next %p", dispatcher->async_commands.next);
-    init_data.qxl = dispatcher->qxl = qxl;
+    red_dispatcher = spice_new0(RedDispatcher, 1);
+    ring_init(&red_dispatcher->async_commands);
+    DBG_ASYNC("red_dispatcher->async_commands.next %p", red_dispatcher->async_commands.next);
+    dispatcher_init(&red_dispatcher->dispatcher, RED_WORKER_MESSAGE_COUNT, NULL);
+    init_data.qxl = red_dispatcher->qxl = qxl;
     init_data.id = qxl->id;
-    init_data.channel = channels[1];
-    init_data.pending = &dispatcher->pending;
+    init_data.red_dispatcher = red_dispatcher;
+    init_data.pending = &red_dispatcher->pending;
     init_data.num_renderers = num_renderers;
     memcpy(init_data.renderers, renderers, sizeof(init_data.renderers));
 
-    pthread_mutex_init(&dispatcher->async_lock, NULL);
+    pthread_mutex_init(&red_dispatcher->async_lock, NULL);
     init_data.image_compression = image_compression;
     init_data.jpeg_state = jpeg_state;
     init_data.zlib_glz_state = zlib_glz_state;
     init_data.streaming_video = streaming_video;
 
-    dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR;
-    dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR;
-    dispatcher->base.wakeup = qxl_worker_wakeup;
-    dispatcher->base.oom = qxl_worker_oom;
-    dispatcher->base.start = qxl_worker_start;
-    dispatcher->base.stop = qxl_worker_stop;
-    dispatcher->base.update_area = qxl_worker_update_area;
-    dispatcher->base.add_memslot = qxl_worker_add_memslot;
-    dispatcher->base.del_memslot = qxl_worker_del_memslot;
-    dispatcher->base.reset_memslots = qxl_worker_reset_memslots;
-    dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces;
-    dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface;
-    dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface;
-
-    dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache;
-    dispatcher->base.reset_cursor = qxl_worker_reset_cursor;
-    dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait;
-    dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands;
+    red_dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR;
+    red_dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR;
+    red_dispatcher->base.wakeup = qxl_worker_wakeup;
+    red_dispatcher->base.oom = qxl_worker_oom;
+    red_dispatcher->base.start = qxl_worker_start;
+    red_dispatcher->base.stop = qxl_worker_stop;
+    red_dispatcher->base.update_area = qxl_worker_update_area;
+    red_dispatcher->base.add_memslot = qxl_worker_add_memslot;
+    red_dispatcher->base.del_memslot = qxl_worker_del_memslot;
+    red_dispatcher->base.reset_memslots = qxl_worker_reset_memslots;
+    red_dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces;
+    red_dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface;
+    red_dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface;
+
+    red_dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache;
+    red_dispatcher->base.reset_cursor = qxl_worker_reset_cursor;
+    red_dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait;
+    red_dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands;
 
     qxl->st->qif->get_init_info(qxl, &init_info);
 
@@ -965,7 +1020,6 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
     init_data.num_memslots_groups = init_info.num_memslots_groups;
     init_data.internal_groupslot_id = init_info.internal_groupslot_id;
     init_data.n_surfaces = init_info.n_surfaces;
-    init_data.dispatcher = dispatcher;
 
     num_active_workers = 1;
 
@@ -974,40 +1028,51 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
     sigdelset(&thread_sig_mask, SIGFPE);
     sigdelset(&thread_sig_mask, SIGSEGV);
     pthread_sigmask(SIG_SETMASK, &thread_sig_mask, &curr_sig_mask);
-    if ((r = pthread_create(&dispatcher->worker_thread, NULL, red_worker_main, &init_data))) {
+    if ((r = pthread_create(&red_dispatcher->worker_thread, NULL, red_worker_main, &init_data))) {
         red_error("create thread failed %d", r);
     }
     pthread_sigmask(SIG_SETMASK, &curr_sig_mask, NULL);
 
-    read_message(dispatcher->channel, &message);
+    read_message(red_dispatcher->dispatcher.send_fd, &message);
     ASSERT(message == RED_WORKER_MESSAGE_READY);
 
-    display_channel = red_dispatcher_display_channel_create(dispatcher);
+    display_channel = red_dispatcher_display_channel_create(red_dispatcher);
 
     if (display_channel) {
         client_cbs.connect = red_dispatcher_set_display_peer;
         client_cbs.disconnect = red_dispatcher_disconnect_display_peer;
         client_cbs.migrate = red_dispatcher_display_migrate;
         red_channel_register_client_cbs(display_channel, &client_cbs);
-        red_channel_set_data(display_channel, dispatcher);
+        red_channel_set_data(display_channel, red_dispatcher);
         reds_register_channel(display_channel);
     }
 
-    cursor_channel = red_dispatcher_cursor_channel_create(dispatcher);
+    cursor_channel = red_dispatcher_cursor_channel_create(red_dispatcher);
 
     if (cursor_channel) {
         client_cbs.connect = red_dispatcher_set_cursor_peer;
         client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer;
         client_cbs.migrate = red_dispatcher_cursor_migrate;
         red_channel_register_client_cbs(cursor_channel, &client_cbs);
-        red_channel_set_data(cursor_channel, dispatcher);
+        red_channel_set_data(cursor_channel, red_dispatcher);
         reds_register_channel(cursor_channel);
     }
 
-    qxl->st->qif->attache_worker(qxl, &dispatcher->base);
+    qxl->st->qif->attache_worker(qxl, &red_dispatcher->base);
     qxl->st->qif->set_compression_level(qxl, calc_compression_level());
 
-    dispatcher->next = dispatchers;
-    dispatchers = dispatcher;
-    return dispatcher;
+    red_dispatcher->next = dispatchers;
+    dispatchers = red_dispatcher;
+    return red_dispatcher;
+}
+
+struct Dispatcher *red_dispatcher_get_dispatcher(RedDispatcher *red_dispatcher)
+{
+    return &red_dispatcher->dispatcher;
+}
+
+void red_dispatcher_set_dispatcher_opaque(struct RedDispatcher *red_dispatcher,
+                                          void *opaque)
+{
+    dispatcher_set_opaque(&red_dispatcher->dispatcher, opaque);
 }
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index c2582f4..59edb44 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -32,5 +32,150 @@ int red_dispatcher_add_renderer(const char *name);
 uint32_t red_dispatcher_qxl_ram_size(void);
 int red_dispatcher_qxl_count(void);
 void red_dispatcher_async_complete(struct RedDispatcher *, AsyncCommand *);
+struct Dispatcher *red_dispatcher_get_dispatcher(struct RedDispatcher *);
+
+typedef struct RedWorkerMessageDisplayConnect {
+    RedClient * client;
+    RedsStream * stream;
+    int migration;
+} RedWorkerMessageDisplayConnect;
+
+typedef struct RedWorkerMessageDisplayDisconnect {
+    RedChannelClient *rcc;
+} RedWorkerMessageDisplayDisconnect;
+
+typedef struct RedWorkerMessageDisplayMigrate {
+    RedChannelClient *rcc;
+} RedWorkerMessageDisplayMigrate;
+
+typedef struct RedWorkerMessageCursorConnect {
+    RedClient *client;
+    RedsStream *stream;
+    int migration;
+} RedWorkerMessageCursorConnect;
+
+typedef struct RedWorkerMessageCursorDisconnect {
+    RedChannelClient *rcc;
+} RedWorkerMessageCursorDisconnect;
+
+typedef struct RedWorkerMessageCursorMigrate {
+    RedChannelClient *rcc;
+} RedWorkerMessageCursorMigrate;
+
+typedef struct RedWorkerMessageUpdate {
+    uint32_t surface_id;
+    QXLRect * qxl_area;
+    QXLRect * qxl_dirty_rects;
+    uint32_t num_dirty_rects;
+    uint32_t clear_dirty_region;
+} RedWorkerMessageUpdate;
+
+typedef struct RedWorkerMessageUpdateAsync {
+    AsyncCommand *cmd;
+    uint32_t surface_id;
+    QXLRect qxl_area;
+    uint32_t clear_dirty_region;
+} RedWorkerMessageUpdateAsync;
+
+typedef struct RedWorkerMessageAddMemslot {
+    QXLDevMemSlot mem_slot;
+} RedWorkerMessageAddMemslot;
+
+typedef struct RedWorkerMessageAddMemslotAsync {
+    AsyncCommand *cmd;
+    QXLDevMemSlot mem_slot;
+} RedWorkerMessageAddMemslotAsync;
+
+typedef struct RedWorkerMessageDelMemslot {
+    uint32_t slot_group_id;
+    uint32_t slot_id;
+} RedWorkerMessageDelMemslot;
+
+typedef struct RedWorkerMessageDestroySurfaces {
+} RedWorkerMessageDestroySurfaces;
+
+typedef struct RedWorkerMessageDestroySurfacesAsync {
+    AsyncCommand *cmd;
+} RedWorkerMessageDestroySurfacesAsync;
+
+
+typedef struct RedWorkerMessageDestroyPrimarySurface {
+    uint32_t surface_id;
+} RedWorkerMessageDestroyPrimarySurface;
+
+typedef struct RedWorkerMessageDestroyPrimarySurfaceAsync {
+    AsyncCommand *cmd;
+    uint32_t surface_id;
+} RedWorkerMessageDestroyPrimarySurfaceAsync;
+
+typedef struct RedWorkerMessageCreatePrimarySurfaceAsync {
+    AsyncCommand *cmd;
+    uint32_t surface_id;
+    QXLDevSurfaceCreate surface;
+} RedWorkerMessageCreatePrimarySurfaceAsync;
+
+typedef struct RedWorkerMessageCreatePrimarySurface {
+    uint32_t surface_id;
+    QXLDevSurfaceCreate surface;
+} RedWorkerMessageCreatePrimarySurface;
+
+typedef struct RedWorkerMessageResetImageCache {
+} RedWorkerMessageResetImageCache;
+
+typedef struct RedWorkerMessageResetCursor {
+} RedWorkerMessageResetCursor;
+
+typedef struct RedWorkerMessageWakeup {
+} RedWorkerMessageWakeup;
+
+typedef struct RedWorkerMessageOom {
+} RedWorkerMessageOom;
+
+typedef struct RedWorkerMessageStart {
+} RedWorkerMessageStart;
+
+typedef struct RedWorkerMessageFlushSurfacesAsync {
+    AsyncCommand *cmd;
+} RedWorkerMessageFlushSurfacesAsync;
+
+typedef struct RedWorkerMessageStop {
+} RedWorkerMessageStop;
+
+/* this command is sync, so it's ok to pass a pointer */
+typedef struct RedWorkerMessageLoadvmCommands {
+    uint32_t count;
+    QXLCommandExt *ext;
+} RedWorkerMessageLoadvmCommands;
+
+typedef struct RedWorkerMessageSetCompression {
+    spice_image_compression_t image_compression;
+} RedWorkerMessageSetCompression;
+
+typedef struct RedWorkerMessageSetStreamingVideo {
+    uint32_t streaming_video;
+} RedWorkerMessageSetStreamingVideo;
+
+typedef struct RedWorkerMessageSetMouseMode {
+    uint32_t mode;
+} RedWorkerMessageSetMouseMode;
+
+typedef struct RedWorkerMessageDisplayChannelCreate {
+} RedWorkerMessageDisplayChannelCreate;
+
+typedef struct RedWorkerMessageCursorChannelCreate {
+} RedWorkerMessageCursorChannelCreate;
+
+typedef struct RedWorkerMessageDestroySurfaceWait {
+    uint32_t surface_id;
+} RedWorkerMessageDestroySurfaceWait;
+
+typedef struct RedWorkerMessageDestroySurfaceWaitAsync {
+    AsyncCommand *cmd;
+    uint32_t surface_id;
+} RedWorkerMessageDestroySurfaceWaitAsync;
+
+typedef struct RedWorkerMessageResetMemslots {
+} RedWorkerMessageResetMemslots;
+
 
 #endif
diff --git a/server/red_worker.c b/server/red_worker.c
index de8a820..0bb5b0c 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -72,6 +72,7 @@
 #include "zlib_encoder.h"
 #include "red_channel.h"
 #include "red_dispatcher.h"
+#include "dispatcher.h"
 #include "main_channel.h"
 
 //#define COMPRESS_STAT
@@ -865,9 +866,10 @@ typedef struct RedWorker {
     DisplayChannel *display_channel;
     CursorChannel *cursor_channel;
     QXLInstance *qxl;
-    RedDispatcher *dispatcher;
-    int id;
+    RedDispatcher *red_dispatcher;
+
     int channel;
+    int id;
     int running;
     uint32_t *pending;
     int epoll;
@@ -10103,21 +10105,19 @@ static void surface_dirty_region_to_rects(RedSurface *surface,
     free(dirty_rects);
 }
 
-static inline void handle_dev_update_async(RedWorker *worker)
+void handle_dev_update_async(void *opaque, void *payload)
 {
-    QXLRect qxl_rect;
+    RedWorker *worker = opaque;
+    RedWorkerMessageUpdateAsync *msg = payload;
     SpiceRect rect;
-    uint32_t surface_id;
-    uint32_t clear_dirty_region;
     QXLRect *qxl_dirty_rects;
     uint32_t num_dirty_rects;
     RedSurface *surface;
+    uint32_t surface_id = msg->surface_id;
+    QXLRect qxl_area = msg->qxl_area;
+    uint32_t clear_dirty_region = msg->clear_dirty_region;
 
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-    receive_data(worker->channel, &qxl_rect, sizeof(QXLRect));
-    receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t));
-
-    red_get_rect_ptr(&rect, &qxl_rect);
+    red_get_rect_ptr(&rect, &qxl_area);
     flush_display_commands(worker);
 
     ASSERT(worker->running);
@@ -10125,12 +10125,12 @@ static inline void handle_dev_update_async(RedWorker *worker)
     validate_surface(worker, surface_id);
     red_update_area(worker, &rect, surface_id);
     if (!worker->qxl->st->qif->update_area_complete) {
-        return;
+        goto complete;
     }
     surface = &worker->surfaces[surface_id];
     num_dirty_rects = pixman_region32_n_rects(&surface->draw_dirty_region);
     if (num_dirty_rects == 0) {
-        return;
+        goto complete;
     }
     qxl_dirty_rects = spice_new0(QXLRect, num_dirty_rects);
     surface_dirty_region_to_rects(surface, qxl_dirty_rects, num_dirty_rects,
@@ -10138,26 +10138,25 @@ static inline void handle_dev_update_async(RedWorker *worker)
     worker->qxl->st->qif->update_area_complete(worker->qxl, surface_id,
                                           qxl_dirty_rects, num_dirty_rects);
     free(qxl_dirty_rects);
+
+complete:
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
 }
 
-static inline void handle_dev_update(RedWorker *worker)
+void handle_dev_update(void *opaque, void *payload)
 {
-    const QXLRect *qxl_rect;
+    RedWorker *worker = opaque;
+    RedWorkerMessageUpdate *msg = payload;
     SpiceRect *rect = spice_new0(SpiceRect, 1);
-    QXLRect *qxl_dirty_rects;
     RedSurface *surface;
-    uint32_t num_dirty_rects;
-    uint32_t surface_id;
-    uint32_t clear_dirty_region;
-
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-    receive_data(worker->channel, &qxl_rect, sizeof(QXLRect *));
-    receive_data(worker->channel, &qxl_dirty_rects, sizeof(QXLRect *));
-    receive_data(worker->channel, &num_dirty_rects, sizeof(uint32_t));
-    receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t));
+    uint32_t surface_id = msg->surface_id;
+    const QXLRect *qxl_area = msg->qxl_area;
+    uint32_t num_dirty_rects = msg->num_dirty_rects;
+    QXLRect *qxl_dirty_rects = msg->qxl_dirty_rects;
+    uint32_t clear_dirty_region = msg->clear_dirty_region;
 
     surface = &worker->surfaces[surface_id];
-    red_get_rect_ptr(rect, qxl_rect);
+    red_get_rect_ptr(rect, qxl_area);
     flush_display_commands(worker);
 
     ASSERT(worker->running);
@@ -10170,28 +10169,36 @@ static inline void handle_dev_update(RedWorker *worker)
                                   clear_dirty_region);
 }
 
-static inline void handle_dev_add_memslot(RedWorker *worker)
+static void dev_add_memslot(RedWorker *worker, QXLDevMemSlot mem_slot)
 {
-    QXLDevMemSlot dev_slot;
+    red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id,
+                              mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end,
+                              mem_slot.generation);
+}
 
-    receive_data(worker->channel, &dev_slot, sizeof(QXLDevMemSlot));
+void handle_dev_add_memslot(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+    RedWorkerMessageAddMemslot *msg = payload;
+    QXLDevMemSlot mem_slot = msg->mem_slot;
 
-    red_memslot_info_add_slot(&worker->mem_slots, dev_slot.slot_group_id, dev_slot.slot_id,
-                              dev_slot.addr_delta, dev_slot.virt_start, dev_slot.virt_end,
-                              dev_slot.generation);
+    red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id,
+                              mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end,
+                              mem_slot.generation);
 }
 
-static inline void handle_dev_del_memslot(RedWorker *worker)
+void handle_dev_del_memslot(void *opaque, void *payload)
 {
-    uint32_t slot_id;
-    uint32_t slot_group_id;
-
-    receive_data(worker->channel, &slot_group_id, sizeof(uint32_t));
-    receive_data(worker->channel, &slot_id, sizeof(uint32_t));
+    RedWorker *worker = opaque;
+    RedWorkerMessageDelMemslot *msg = payload;
+    uint32_t slot_id = msg->slot_id;
+    uint32_t slot_group_id = msg->slot_group_id;
 
     red_memslot_info_del_slot(&worker->mem_slots, slot_group_id, slot_id);
 }
 
+/* TODO: destroy_surface_wait, dev_destroy_surface_wait - confusing. one asserts
+ * surface_id == 0, maybe move the assert upward and merge the two functions? */
 static inline void destroy_surface_wait(RedWorker *worker, int surface_id)
 {
     if (!worker->surfaces[surface_id].context.canvas) {
@@ -10206,12 +10213,8 @@ static inline void destroy_surface_wait(RedWorker *worker, int surface_id)
     red_clear_surface_drawables_from_pipes(worker, surface_id, TRUE, TRUE);
 }
 
-static inline void handle_dev_destroy_surface_wait(RedWorker *worker)
+static void dev_destroy_surface_wait(RedWorker *worker, uint32_t surface_id)
 {
-    uint32_t surface_id;
-
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-
     ASSERT(surface_id == 0);
 
     flush_all_qxl_commands(worker);
@@ -10221,6 +10224,14 @@ static inline void handle_dev_destroy_surface_wait(RedWorker *worker)
     }
 }
 
+void handle_dev_destroy_surface_wait(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroySurfaceWait *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_destroy_surface_wait(worker, msg->surface_id);
+}
+
 static inline void red_cursor_reset(RedWorker *worker)
 {
     if (worker->cursor) {
@@ -10244,7 +10255,7 @@ static inline void red_cursor_reset(RedWorker *worker)
 /* called upon device reset */
 
 /* TODO: split me*/
-static inline void handle_dev_destroy_surfaces(RedWorker *worker)
+static inline void dev_destroy_surfaces(RedWorker *worker)
 {
     int i;
 
@@ -10273,14 +10284,17 @@ static inline void handle_dev_destroy_surfaces(RedWorker *worker)
     red_cursor_reset(worker);
 }
 
-static inline void handle_dev_create_primary_surface(RedWorker *worker)
+void handle_dev_destroy_surfaces(void *opaque, void *payload)
 {
-    uint32_t surface_id;
-    QXLDevSurfaceCreate surface;
-    uint8_t *line_0;
+    RedWorker *worker = opaque;
 
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-    receive_data(worker->channel, &surface, sizeof(QXLDevSurfaceCreate));
+    dev_destroy_surfaces(worker);
+}
+
+static void dev_create_primary_surface(RedWorker *worker, uint32_t surface_id,
+                                       QXLDevSurfaceCreate surface)
+{
+    uint8_t *line_0;
 
     PANIC_ON(surface_id != 0);
     PANIC_ON(surface.height == 0);
@@ -10308,12 +10322,16 @@ static inline void handle_dev_create_primary_surface(RedWorker *worker)
     }
 }
 
-static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
+void handle_dev_create_primary_surface(void *opaque, void *payload)
 {
-    uint32_t surface_id;
+    RedWorkerMessageCreatePrimarySurface *msg = payload;
+    RedWorker *worker = opaque;
 
-    receive_data(worker->channel, &surface_id, sizeof(uint32_t));
+    dev_create_primary_surface(worker, msg->surface_id, msg->surface);
+}
 
+static void dev_destroy_primary_surface(RedWorker *worker, uint32_t surface_id)
+{
     PANIC_ON(surface_id != 0);
 
     if (!worker->surfaces[surface_id].context.canvas) {
@@ -10322,7 +10340,7 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
     }
 
     flush_all_qxl_commands(worker);
-    destroy_surface_wait(worker, 0);
+    dev_destroy_surface_wait(worker, 0);
     red_destroy_surface(worker, 0);
     ASSERT(ring_is_empty(&worker->streams));
 
@@ -10331,6 +10349,25 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
     red_cursor_reset(worker);
 }
 
+void handle_dev_destroy_primary_surface(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroyPrimarySurface *msg = payload;
+    RedWorker *worker = opaque;
+    uint32_t surface_id = msg->surface_id;
+
+    dev_destroy_primary_surface(worker, surface_id);
+}
+
+void handle_dev_destroy_primary_surface_async(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroyPrimarySurfaceAsync *msg = payload;
+    RedWorker *worker = opaque;
+    uint32_t surface_id = msg->surface_id;
+
+    dev_destroy_primary_surface(worker, surface_id);
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
+}
+
 static void flush_all_surfaces(RedWorker *worker)
 {
     int x;
@@ -10342,7 +10379,7 @@ static void flush_all_surfaces(RedWorker *worker)
     }
 }
 
-static void handle_dev_flush_surfaces(RedWorker *worker)
+static void dev_flush_surfaces(RedWorker *worker)
 {
     flush_all_qxl_commands(worker);
     flush_all_surfaces(worker);
@@ -10350,8 +10387,27 @@ static void handle_dev_flush_surfaces(RedWorker *worker)
     red_wait_outgoing_items(&worker->cursor_channel->common.base);
 }
 
-static void handle_dev_stop(RedWorker *worker)
+void handle_dev_flush_surfaces(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    dev_flush_surfaces(worker);
+}
+
+void handle_dev_flush_surfaces_async(void *opaque, void *payload)
 {
+    RedWorkerMessageFlushSurfacesAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_flush_surfaces(worker);
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
+}
+
+void handle_dev_stop(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    red_printf("stop");
     ASSERT(worker->running);
     worker->running = FALSE;
     red_display_clear_glz_drawables(worker->display_channel);
@@ -10360,8 +10416,9 @@ static void handle_dev_stop(RedWorker *worker)
     red_wait_outgoing_items(&worker->cursor_channel->common.base);
 }
 
-static void handle_dev_start(RedWorker *worker)
+void handle_dev_start(void *opaque, void *payload)
 {
+    RedWorker *worker = opaque;
     RedChannel *cursor_red_channel = &worker->cursor_channel->common.base;
     RedChannel *display_red_channel = &worker->display_channel->common.base;
 
@@ -10375,308 +10432,479 @@ static void handle_dev_start(RedWorker *worker)
     worker->running = TRUE;
 }
 
-static void handle_dev_input(EventListener *listener, uint32_t events)
+void handle_dev_wakeup(void *opaque, void *payload)
 {
-    RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
-    RedWorkerMessage message;
+    RedWorker *worker = opaque;
+
+    clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending);
+    stat_inc_counter(worker->wakeup_counter, 1);
+}
+
+void handle_dev_oom(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
     RedChannel *display_red_channel = &worker->display_channel->common.base;
     int ring_is_empty;
-    int call_async_complete = 0;
-    int write_ready = 0;
-    AsyncCommand *cmd;
-
-    read_message(worker->channel, &message);
-
-    /* for async messages we do the common work in the handler, and
-     * send a ready or call async_complete from here, hence the added switch. */
-    switch (message) {
-    case RED_WORKER_MESSAGE_UPDATE_ASYNC:
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC:
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC:
-    case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC:
-        call_async_complete = 1;
-        receive_data(worker->channel, &cmd, sizeof(cmd));
-        break;
-    case RED_WORKER_MESSAGE_UPDATE:
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES:
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE:
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT:
-    case RED_WORKER_MESSAGE_RESET_CURSOR:
-    case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
-    case RED_WORKER_MESSAGE_STOP:
-    case RED_WORKER_MESSAGE_LOADVM_COMMANDS:
-    case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE:
-    case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE:
-    case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
-    case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
-        write_ready = 1;
-    default:
-        break;
-    }
 
-    switch (message) {
-    case RED_WORKER_MESSAGE_UPDATE_ASYNC:
-        handle_dev_update_async(worker);
-        break;
-    case RED_WORKER_MESSAGE_UPDATE:
-        handle_dev_update(worker);
-        break;
-    case RED_WORKER_MESSAGE_WAKEUP:
-        clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending);
-        stat_inc_counter(worker->wakeup_counter, 1);
-        break;
-    case RED_WORKER_MESSAGE_OOM:
-        ASSERT(worker->running);
-        // streams? but without streams also leak
-        red_printf_debug(1, "WORKER",
-                         "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
-                         worker->drawable_count,
-                         worker->red_drawable_count,
-                         worker->glz_drawable_count,
-                         worker->current_size,
-                         worker->display_channel ?
-                         red_channel_sum_pipes_size(display_red_channel) : 0);
-        while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) {
-            red_channel_push(&worker->display_channel->common.base);
-        }
-        if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) {
-            red_free_some(worker);
-            worker->qxl->st->qif->flush_resources(worker->qxl);
-        }
-        red_printf_debug(1, "WORKER",
-                         "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
-                         worker->drawable_count,
-                         worker->red_drawable_count,
-                         worker->glz_drawable_count,
-                         worker->current_size,
-                         worker->display_channel ?
-                         red_channel_sum_pipes_size(display_red_channel) : 0);
-        clear_bit(RED_WORKER_PENDING_OOM, worker->pending);
-        break;
-    case RED_WORKER_MESSAGE_RESET_CURSOR:
-        red_cursor_reset(worker);
-        break;
-    case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
-        image_cache_reset(&worker->image_cache);
-        break;
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT:
-        handle_dev_destroy_surface_wait(worker);
-        break;
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_SURFACES:
-        handle_dev_destroy_surfaces(worker);
-        break;
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE:
-        handle_dev_create_primary_surface(worker);
-        break;
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC:
-    case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
-        handle_dev_destroy_primary_surface(worker);
-        break;
-    case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: {
-        RedChannel *red_channel;
-        // TODO: handle seemless migration. Temp, setting migrate to FALSE
-        ensure_display_channel_created(worker, FALSE);
-        red_channel = &worker->display_channel->common.base;
-        send_data(worker->channel, &red_channel, sizeof(RedChannel *));
-        break;
+    ASSERT(worker->running);
+    // streams? but without streams also leak
+    red_printf_debug(1, "WORKER",
+                     "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
+                     worker->drawable_count,
+                     worker->red_drawable_count,
+                     worker->glz_drawable_count,
+                     worker->current_size,
+                     worker->display_channel ?
+                     red_channel_sum_pipes_size(display_red_channel) : 0);
+    while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) {
+        red_channel_push(&worker->display_channel->common.base);
     }
-    case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
-        RedsStream *stream;
-        RedClient *client;
-        int migrate;
-        red_printf("connect");
-
-        receive_data(worker->channel, &client, sizeof(RedClient *));
-        receive_data(worker->channel, &stream, sizeof(RedsStream *));
-        receive_data(worker->channel, &migrate, sizeof(int));
-        handle_new_display_channel(worker, client, stream, migrate);
-        break;
+    if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) {
+        red_free_some(worker);
+        worker->qxl->st->qif->flush_resources(worker->qxl);
     }
-    case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: {
-        RedChannelClient *rcc;
+    red_printf_debug(1, "WORKER",
+                     "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
+                     worker->drawable_count,
+                     worker->red_drawable_count,
+                     worker->glz_drawable_count,
+                     worker->current_size,
+                     worker->display_channel ?
+                     red_channel_sum_pipes_size(display_red_channel) : 0);
+    clear_bit(RED_WORKER_PENDING_OOM, worker->pending);
+}
 
-        red_printf("disconnect display client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        display_channel_client_disconnect(rcc);
-        break;
-    }
-    case RED_WORKER_MESSAGE_STOP: {
-        red_printf("stop");
-        handle_dev_stop(worker);
+void handle_dev_reset_cursor(void *opaque, void *payload)
+{
+    red_cursor_reset((RedWorker *)opaque);
+}
+
+void handle_dev_reset_image_cache(void *opaque, void *payload)
+{
+    image_cache_reset(&((RedWorker *)opaque)->image_cache);
+}
+
+void handle_dev_destroy_surface_wait_async(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroySurfaceWaitAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_destroy_surface_wait(worker, msg->surface_id);
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
+}
+
+void handle_dev_destroy_surfaces_async(void *opaque, void *payload)
+{
+    RedWorkerMessageDestroySurfacesAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_destroy_surfaces(worker);
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
+}
+
+void handle_dev_create_primary_surface_async(void *opaque, void *payload)
+{
+    RedWorkerMessageCreatePrimarySurfaceAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_create_primary_surface(worker, msg->surface_id, msg->surface);
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
+}
+
+/* exception for Dispatcher, data going from red_worker to main thread,
+ * TODO: use a different dispatcher?
+ * TODO: leave direct usage of channel(fd)? It's only used right after the
+ * pthread is created, since the channel duration is the lifetime of the spice
+ * server. */
+
+void handle_dev_display_channel_create(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    RedChannel *red_channel;
+    // TODO: handle seemless migration. Temp, setting migrate to FALSE
+    ensure_display_channel_created(worker, FALSE);
+    red_channel = &worker->display_channel->common.base;
+    send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+}
+
+void handle_dev_display_connect(void *opaque, void *payload)
+{
+    RedWorkerMessageDisplayConnect *msg = payload;
+    RedWorker *worker = opaque;
+    RedsStream *stream = msg->stream;
+    RedClient *client = msg->client;
+    int migration = msg->migration;
+
+    red_printf("connect");
+    handle_new_display_channel(worker, client, stream, migration);
+}
+
+void handle_dev_display_disconnect(void *opaque, void *payload)
+{
+    RedWorkerMessageDisplayDisconnect *msg = payload;
+    RedChannelClient *rcc = msg->rcc;
+
+    red_printf("disconnect display client");
+    ASSERT(rcc);
+    display_channel_client_disconnect(rcc);
+}
+
+void handle_dev_display_migrate(void *opaque, void *payload)
+{
+    RedWorkerMessageDisplayMigrate *msg = payload;
+    RedWorker *worker = opaque;
+
+    RedChannelClient *rcc = msg->rcc;
+    red_printf("migrate display client");
+    ASSERT(rcc);
+    red_migrate_display(worker, rcc);
+}
+
+/* TODO: special, perhaps use another dispatcher? */
+void handle_dev_cursor_channel_create(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+    RedChannel *red_channel;
+
+    // TODO: handle seemless migration. Temp, setting migrate to FALSE
+    ensure_cursor_channel_created(worker, FALSE);
+    red_channel = &worker->cursor_channel->common.base;
+    send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+}
+
+void handle_dev_cursor_connect(void *opaque, void *payload)
+{
+    RedWorkerMessageCursorConnect *msg = payload;
+    RedWorker *worker = opaque;
+    RedsStream *stream = msg->stream;
+    RedClient *client = msg->client;
+    int migration = msg->migration;
+
+    red_printf("cursor connect");
+    red_connect_cursor(worker, client, stream, migration);
+}
+
+void handle_dev_cursor_disconnect(void *opaque, void *payload)
+{
+    RedWorkerMessageCursorDisconnect *msg = payload;
+    RedChannelClient *rcc = msg->rcc;
+
+    red_printf("disconnect cursor client");
+    ASSERT(rcc);
+    cursor_channel_client_disconnect(rcc);
+}
+
+void handle_dev_cursor_migrate(void *opaque, void *payload)
+{
+    RedWorkerMessageCursorMigrate *msg = payload;
+    RedWorker *worker = opaque;
+    RedChannelClient *rcc = msg->rcc;
+
+    red_printf("migrate cursor client");
+    ASSERT(rcc);
+    red_migrate_cursor(worker, rcc);
+}
+
+void handle_dev_set_compression(void *opaque, void *payload)
+{
+    RedWorkerMessageSetCompression *msg = payload;
+    RedWorker *worker = opaque;
+
+    worker->image_compression = msg->image_compression;
+    switch (worker->image_compression) {
+    case SPICE_IMAGE_COMPRESS_AUTO_LZ:
+        red_printf("ic auto_lz");
         break;
-    }
-    case RED_WORKER_MESSAGE_START:
-        red_printf("start");
-        handle_dev_start(worker);
+    case SPICE_IMAGE_COMPRESS_AUTO_GLZ:
+        red_printf("ic auto_glz");
         break;
-    case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: {
-        RedChannelClient *rcc;
-        red_printf("migrate display client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        red_migrate_display(worker, rcc);
+    case SPICE_IMAGE_COMPRESS_QUIC:
+        red_printf("ic quic");
         break;
-    }
-    case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: {
-        RedChannel *red_channel;
-        // TODO: handle seemless migration. Temp, setting migrate to FALSE
-        ensure_cursor_channel_created(worker, FALSE);
-        red_channel = &worker->cursor_channel->common.base;
-        send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+    case SPICE_IMAGE_COMPRESS_LZ:
+        red_printf("ic lz");
         break;
-    }
-    case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
-        RedsStream *stream;
-        RedClient *client;
-        int migrate;
-
-        red_printf("cursor connect");
-        receive_data(worker->channel, &client, sizeof(RedClient *));
-        receive_data(worker->channel, &stream, sizeof(RedsStream *));
-        receive_data(worker->channel, &migrate, sizeof(int));
-        red_connect_cursor(worker, client, stream, migrate);
+    case SPICE_IMAGE_COMPRESS_GLZ:
+        red_printf("ic glz");
         break;
-    }
-    case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: {
-        RedChannelClient *rcc;
-
-        red_printf("disconnect cursor client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        cursor_channel_client_disconnect(rcc);
+    case SPICE_IMAGE_COMPRESS_OFF:
+        red_printf("ic off");
         break;
+    default:
+        red_printf("ic invalid");
     }
-    case RED_WORKER_MESSAGE_CURSOR_MIGRATE: {
-        RedChannelClient *rcc;
-        red_printf("migrate cursor client");
-        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
-        ASSERT(rcc);
-        red_migrate_cursor(worker, rcc);
-        break;
+#ifdef COMPRESS_STAT
+    print_compress_stats(worker->display_channel);
+    if (worker->display_channel) {
+        stat_reset(&worker->display_channel->quic_stat);
+        stat_reset(&worker->display_channel->lz_stat);
+        stat_reset(&worker->display_channel->glz_stat);
+        stat_reset(&worker->display_channel->jpeg_stat);
+        stat_reset(&worker->display_channel->zlib_glz_stat);
+        stat_reset(&worker->display_channel->jpeg_alpha_stat);
     }
-    case RED_WORKER_MESSAGE_SET_COMPRESSION:
-        receive_data(worker->channel, &worker->image_compression,
-                     sizeof(spice_image_compression_t));
-        switch (worker->image_compression) {
-        case SPICE_IMAGE_COMPRESS_AUTO_LZ:
-            red_printf("ic auto_lz");
-            break;
-        case SPICE_IMAGE_COMPRESS_AUTO_GLZ:
-            red_printf("ic auto_glz");
+#endif
+}
+
+void handle_dev_set_streaming_video(void *opaque, void *payload)
+{
+    RedWorkerMessageSetStreamingVideo *msg = payload;
+    RedWorker *worker = opaque;
+
+    worker->streaming_video = msg->streaming_video;
+    ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID);
+    switch(worker->streaming_video) {
+        case STREAM_VIDEO_ALL:
+            red_printf("sv all");
             break;
-        case SPICE_IMAGE_COMPRESS_QUIC:
-            red_printf("ic quic");
+        case STREAM_VIDEO_FILTER:
+            red_printf("sv filter");
             break;
-        case SPICE_IMAGE_COMPRESS_LZ:
-            red_printf("ic lz");
+        case STREAM_VIDEO_OFF:
+            red_printf("sv off");
             break;
-        case SPICE_IMAGE_COMPRESS_GLZ:
-            red_printf("ic glz");
+        default:
+            red_printf("sv invalid");
+    }
+}
+
+void handle_dev_set_mouse_mode(void *opaque, void *payload)
+{
+    RedWorkerMessageSetMouseMode *msg = payload;
+    RedWorker *worker = opaque;
+
+    worker->mouse_mode = msg->mode;
+    red_printf("mouse mode %u", worker->mouse_mode);
+}
+
+void handle_dev_add_memslot_async(void *opaque, void *payload)
+{
+    RedWorkerMessageAddMemslotAsync *msg = payload;
+    RedWorker *worker = opaque;
+
+    dev_add_memslot(worker, msg->mem_slot);
+    red_dispatcher_async_complete(worker->red_dispatcher, msg->cmd);
+}
+
+void handle_dev_reset_memslots(void *opaque, void *payload)
+{
+    RedWorker *worker = opaque;
+
+    red_memslot_info_reset(&worker->mem_slots);
+}
+
+void handle_dev_loadvm_commands(void *opaque, void *payload)
+{
+    RedWorkerMessageLoadvmCommands *msg = payload;
+    RedWorker *worker = opaque;
+    uint32_t i;
+    RedCursorCmd *cursor_cmd;
+    RedSurfaceCmd *surface_cmd;
+    uint32_t count = msg->count;
+    QXLCommandExt *ext = msg->ext;
+
+    red_printf("loadvm_commands");
+    for (i = 0 ; i < count ; ++i) {
+        switch (ext[i].cmd.type) {
+        case QXL_CMD_CURSOR:
+            cursor_cmd = spice_new0(RedCursorCmd, 1);
+            red_get_cursor_cmd(&worker->mem_slots, ext[i].group_id,
+                               cursor_cmd, ext[i].cmd.data);
+            qxl_process_cursor(worker, cursor_cmd, ext[i].group_id);
             break;
-        case SPICE_IMAGE_COMPRESS_OFF:
-            red_printf("ic off");
+        case QXL_CMD_SURFACE:
+            surface_cmd = spice_new0(RedSurfaceCmd, 1);
+            red_get_surface_cmd(&worker->mem_slots, ext[i].group_id,
+                                surface_cmd, ext[i].cmd.data);
+            red_process_surface(worker, surface_cmd, ext[i].group_id, TRUE);
             break;
         default:
-            red_printf("ic invalid");
-        }
-#ifdef COMPRESS_STAT
-        print_compress_stats(worker->display_channel);
-        if (worker->display_channel) {
-            stat_reset(&worker->display_channel->quic_stat);
-            stat_reset(&worker->display_channel->lz_stat);
-            stat_reset(&worker->display_channel->glz_stat);
-            stat_reset(&worker->display_channel->jpeg_stat);
-            stat_reset(&worker->display_channel->zlib_glz_stat);
-            stat_reset(&worker->display_channel->jpeg_alpha_stat);
-        }
-#endif
-        break;
-    case RED_WORKER_MESSAGE_SET_STREAMING_VIDEO:
-        receive_data(worker->channel, &worker->streaming_video, sizeof(uint32_t));
-        ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID);
-        switch(worker->streaming_video) {
-            case STREAM_VIDEO_ALL:
-                red_printf("sv all");
-                break;
-            case STREAM_VIDEO_FILTER:
-                red_printf("sv filter");
-                break;
-            case STREAM_VIDEO_OFF:
-                red_printf("sv off");
-                break;
-            default:
-                red_printf("sv invalid");
-        }
-        break;
-    case RED_WORKER_MESSAGE_SET_MOUSE_MODE:
-        receive_data(worker->channel, &worker->mouse_mode, sizeof(uint32_t));
-        red_printf("mouse mode %u", worker->mouse_mode);
-        break;
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC:
-    case RED_WORKER_MESSAGE_ADD_MEMSLOT:
-        handle_dev_add_memslot(worker);
-        break;
-    case RED_WORKER_MESSAGE_DEL_MEMSLOT:
-        handle_dev_del_memslot(worker);
-        break;
-    case RED_WORKER_MESSAGE_RESET_MEMSLOTS:
-        red_memslot_info_reset(&worker->mem_slots);
-        break;
-    case RED_WORKER_MESSAGE_LOADVM_COMMANDS: {
-        uint32_t count;
-        QXLCommandExt ext;
-        RedCursorCmd *cursor_cmd;
-        RedSurfaceCmd *surface_cmd;
-
-        red_printf("loadvm_commands");
-        receive_data(worker->channel, &count, sizeof(uint32_t));
-        while (count > 0) {
-            receive_data(worker->channel, &ext, sizeof(QXLCommandExt));
-            switch (ext.cmd.type) {
-            case QXL_CMD_CURSOR:
-                cursor_cmd = spice_new0(RedCursorCmd, 1);
-                red_get_cursor_cmd(&worker->mem_slots, ext.group_id,
-                                   cursor_cmd, ext.cmd.data);
-                qxl_process_cursor(worker, cursor_cmd, ext.group_id);
-                break;
-            case QXL_CMD_SURFACE:
-                surface_cmd = spice_new0(RedSurfaceCmd, 1);
-                red_get_surface_cmd(&worker->mem_slots, ext.group_id,
-                                    surface_cmd, ext.cmd.data);
-                red_process_surface(worker, surface_cmd, ext.group_id, TRUE);
-                break;
-            default:
-                red_printf("unhandled loadvm command type (%d)", ext.cmd.type);
-                break;
-            }
-            count--;
+            red_printf("unhandled loadvm command type (%d)", ext[i].cmd.type);
+            break;
         }
-        break;
-    }
-    case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC:
-        handle_dev_flush_surfaces(worker);
-        break;
-    default:
-        red_error("message error");
-    }
-    if (call_async_complete) {
-        red_dispatcher_async_complete(worker->dispatcher, cmd);
-    }
-    if (write_ready) {
-        message = RED_WORKER_MESSAGE_READY;
-        write_message(worker->channel, &message);
     }
 }
 
+static void register_callbacks(Dispatcher *dispatcher)
+{
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_CONNECT,
+                                handle_dev_display_connect,
+                                sizeof(RedWorkerMessageDisplayConnect),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+                                handle_dev_display_disconnect,
+                                sizeof(RedWorkerMessageDisplayDisconnect),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
+                                handle_dev_display_migrate,
+                                sizeof(RedWorkerMessageDisplayMigrate),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_CONNECT,
+                                handle_dev_cursor_connect,
+                                sizeof(RedWorkerMessageCursorConnect),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+                                handle_dev_cursor_disconnect,
+                                sizeof(RedWorkerMessageCursorDisconnect),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_MIGRATE,
+                                handle_dev_cursor_migrate,
+                                sizeof(RedWorkerMessageCursorMigrate),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_UPDATE,
+                                handle_dev_update,
+                                sizeof(RedWorkerMessageUpdate),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_UPDATE_ASYNC,
+                                handle_dev_update_async,
+                                sizeof(RedWorkerMessageUpdateAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_ADD_MEMSLOT,
+                                handle_dev_add_memslot,
+                                sizeof(RedWorkerMessageAddMemslot),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC,
+                                handle_dev_add_memslot_async,
+                                sizeof(RedWorkerMessageAddMemslotAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DEL_MEMSLOT,
+                                handle_dev_del_memslot,
+                                sizeof(RedWorkerMessageDelMemslot),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACES,
+                                handle_dev_destroy_surfaces,
+                                sizeof(RedWorkerMessageDestroySurfaces),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC,
+                                handle_dev_destroy_surfaces_async,
+                                sizeof(RedWorkerMessageDestroySurfacesAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
+                                handle_dev_destroy_primary_surface,
+                                sizeof(RedWorkerMessageDestroyPrimarySurface),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC,
+                                handle_dev_destroy_primary_surface_async,
+                                sizeof(RedWorkerMessageDestroyPrimarySurfaceAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC,
+                                handle_dev_create_primary_surface_async,
+                                sizeof(RedWorkerMessageCreatePrimarySurfaceAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
+                                handle_dev_create_primary_surface,
+                                sizeof(RedWorkerMessageCreatePrimarySurface),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
+                                handle_dev_reset_image_cache,
+                                sizeof(RedWorkerMessageResetImageCache),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_RESET_CURSOR,
+                                handle_dev_reset_cursor,
+                                sizeof(RedWorkerMessageResetCursor),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_WAKEUP,
+                                handle_dev_wakeup,
+                                sizeof(RedWorkerMessageWakeup),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_OOM,
+                                handle_dev_oom,
+                                sizeof(RedWorkerMessageOom),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_START,
+                                handle_dev_start,
+                                sizeof(RedWorkerMessageStart),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
+                                handle_dev_flush_surfaces_async,
+                                sizeof(RedWorkerMessageFlushSurfacesAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_STOP,
+                                handle_dev_stop,
+                                sizeof(RedWorkerMessageStop),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_LOADVM_COMMANDS,
+                                handle_dev_loadvm_commands,
+                                sizeof(RedWorkerMessageLoadvmCommands),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_SET_COMPRESSION,
+                                handle_dev_set_compression,
+                                sizeof(RedWorkerMessageSetCompression),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
+                                handle_dev_set_streaming_video,
+                                sizeof(RedWorkerMessageSetStreamingVideo),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_SET_MOUSE_MODE,
+                                handle_dev_set_mouse_mode,
+                                sizeof(RedWorkerMessageSetMouseMode),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+                                handle_dev_display_channel_create,
+                                sizeof(RedWorkerMessageDisplayChannelCreate),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+                                handle_dev_cursor_channel_create,
+                                sizeof(RedWorkerMessageCursorChannelCreate),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
+                                handle_dev_destroy_surface_wait,
+                                sizeof(RedWorkerMessageDestroySurfaceWait),
+                                1);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
+                                handle_dev_destroy_surface_wait_async,
+                                sizeof(RedWorkerMessageDestroySurfaceWaitAsync),
+                                0);
+    dispatcher_register_handler(dispatcher,
+                                RED_WORKER_MESSAGE_RESET_MEMSLOTS,
+                                handle_dev_reset_memslots,
+                                sizeof(RedWorkerMessageResetMemslots),
+                                0);
+}
+
+
+
+static void handle_dev_input(EventListener *listener, uint32_t events)
+{
+    RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
+
+    dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher));
+}
+
 static void handle_dev_free(EventListener *ctx)
 {
     free(ctx);
@@ -10687,14 +10915,18 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
     struct epoll_event event;
     RedWorkerMessage message;
     int epoll;
+    Dispatcher *dispatcher;
 
     ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE);
 
     memset(worker, 0, sizeof(RedWorker));
-    worker->dispatcher = init_data->dispatcher;
+    dispatcher = red_dispatcher_get_dispatcher(init_data->red_dispatcher);
+    dispatcher_set_opaque(dispatcher, worker);
+    worker->red_dispatcher = init_data->red_dispatcher;
     worker->qxl = init_data->qxl;
     worker->id = init_data->id;
-    worker->channel = init_data->channel;
+    worker->channel = dispatcher_get_recv_fd(dispatcher);
+    register_callbacks(dispatcher);
     worker->pending = init_data->pending;
     worker->dev_listener.refs = 1;
     worker->dev_listener.action = handle_dev_input;
diff --git a/server/red_worker.h b/server/red_worker.h
index 26c43ad..08c7b22 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -84,6 +84,8 @@ enum {
 
     RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
     RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+
+    RED_WORKER_MESSAGE_COUNT // LAST
 };
 
 typedef uint32_t RedWorkerMessage;
@@ -102,7 +104,6 @@ typedef struct RedDispatcher RedDispatcher;
 typedef struct WorkerInitData {
     struct QXLInstance *qxl;
     int id;
-    int channel;
     uint32_t *pending;
     uint32_t num_renderers;
     uint32_t renderers[RED_MAX_RENDERERS];
@@ -116,7 +117,7 @@ typedef struct WorkerInitData {
     uint8_t memslot_id_bits;
     uint8_t internal_groupslot_id;
     uint32_t n_surfaces;
-    RedDispatcher *dispatcher;
+    RedDispatcher *red_dispatcher;
 } WorkerInitData;
 
 void *red_worker_main(void *arg);
-- 
1.7.7.1



More information about the Spice-devel mailing list