[Spice-commits] 5 commits - server/dispatcher.c server/dispatcher.h server/main_dispatcher.c server/Makefile.am server/red_dispatcher.c server/red_dispatcher.h server/red_worker.c server/red_worker.h
Alon Levy
alon at kemper.freedesktop.org
Tue Nov 8 06:24:54 PST 2011
server/Makefile.am | 2
server/dispatcher.c | 261 +++++++++++++
server/dispatcher.h | 106 +++++
server/main_dispatcher.c | 102 +----
server/red_dispatcher.c | 489 ++++++++++++++-----------
server/red_dispatcher.h | 149 +++++++
server/red_worker.c | 903 +++++++++++++++++++++++++++++------------------
server/red_worker.h | 5
8 files changed, 1394 insertions(+), 623 deletions(-)
New commits:
commit 8e049ce3b03c5e0034702d2a4b49b7ca36aaff92
Author: Alon Levy <alevy at redhat.com>
Date: Mon Oct 31 17:35:30 2011 +0200
server/red_worker: reuse dispatcher
This patch reuses Dispatcher in RedDispatcher. It adds two helpers
to red_worker to keep RedWorker opaque to the outside. The dispatcher is
abused in three places that use the underlying socket directly:
once sending a READY after red_init completes
once for each channel creation, replying with the RedChannel instance
for cursor and display.
FDO Bugzilla: 42463
rfc->v1:
* move callbacks to red_worker.c including registration (Yonit)
* rename dispatcher to red_dispatcher in red_worker.c and red_dispatcher.c
* add accessor red_dispatcher_get_dispatcher
* s/dispatcher_handle_recv/dispatcher_handle_recv_read/ and change sig to
just Dispatcher *dispatcher (was the SpiceCoreInterface one)
* remove SpiceCoreInterface parameter from dispatcher_init (Yonit)
* main_dispatcher needed it for channel_event so it has it in
struct MainDispatcher
* add dispatcher_get_recv_fd for red_worker
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 0c7a875..17b469e 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -37,6 +37,7 @@
#include "reds_gl_canvas.h"
#endif // USE_OPENGL
#include "reds.h"
+#include "dispatcher.h"
#include "red_dispatcher.h"
#include "red_parse_qxl.h"
@@ -58,7 +59,7 @@ struct AsyncCommand {
struct RedDispatcher {
QXLWorker base;
QXLInstance *qxl;
- int channel;
+ Dispatcher dispatcher;
pthread_t worker_thread;
uint32_t pending;
int primary_active;
@@ -93,19 +94,22 @@ static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *clie
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
+ RedWorkerMessageDisplayConnect payload;
RedDispatcher *dispatcher;
red_printf("");
dispatcher = (RedDispatcher *)channel->data;
- RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &client, sizeof(RedClient *));
- send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
- send_data(dispatcher->channel, &migration, sizeof(int));
+ payload.client = client;
+ payload.stream = stream;
+ payload.migration = migration;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_CONNECT,
+ &payload);
}
static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
{
+ RedWorkerMessageDisplayDisconnect payload;
RedDispatcher *dispatcher;
if (!rcc->channel) {
@@ -115,27 +119,28 @@ static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
dispatcher = (RedDispatcher *)rcc->channel->data;
red_printf("");
- RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+ payload.rcc = rcc;
// TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count
// for channels
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+ &payload);
}
static void red_dispatcher_display_migrate(RedChannelClient *rcc)
{
+ RedWorkerMessageDisplayMigrate payload;
RedDispatcher *dispatcher;
if (!rcc->channel) {
return;
}
dispatcher = (RedDispatcher *)rcc->channel->data;
red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
- RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+ payload.rcc = rcc;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
+ &payload);
}
static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
@@ -143,17 +148,20 @@ static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *clien
uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
+ RedWorkerMessageCursorConnect payload;
RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
red_printf("");
- RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &client, sizeof(RedClient *));
- send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
- send_data(dispatcher->channel, &migration, sizeof(int));
+ payload.client = client;
+ payload.stream = stream;
+ payload.migration = migration;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_CONNECT,
+ &payload);
}
static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
{
+ RedWorkerMessageCursorDisconnect payload;
RedDispatcher *dispatcher;
if (!rcc->channel) {
@@ -162,16 +170,16 @@ static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
dispatcher = (RedDispatcher *)rcc->channel->data;
red_printf("");
- RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+ payload.rcc = rcc;
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+ &payload);
}
static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
{
+ RedWorkerMessageCursorMigrate payload;
RedDispatcher *dispatcher;
if (!rcc->channel) {
@@ -179,8 +187,10 @@ static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
}
dispatcher = (RedDispatcher *)rcc->channel->data;
red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
- RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE;
- write_message(dispatcher->channel, &message);
+ payload.rcc = rcc;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_MIGRATE,
+ &payload);
}
typedef struct RendererInfo {
@@ -263,16 +273,16 @@ static void red_dispatcher_update_area(RedDispatcher *dispatcher, uint32_t surfa
QXLRect *qxl_area, QXLRect *qxl_dirty_rects,
uint32_t num_dirty_rects, uint32_t clear_dirty_region)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE;
+ RedWorkerMessageUpdate payload;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
- send_data(dispatcher->channel, &qxl_area, sizeof(QXLRect *));
- send_data(dispatcher->channel, &qxl_dirty_rects, sizeof(QXLRect *));
- send_data(dispatcher->channel, &num_dirty_rects, sizeof(uint32_t));
- send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t));
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ payload.surface_id = surface_id;
+ payload.qxl_area = qxl_area;
+ payload.qxl_dirty_rects = qxl_dirty_rects;
+ payload.num_dirty_rects = num_dirty_rects;
+ payload.clear_dirty_region = clear_dirty_region;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_UPDATE,
+ &payload);
}
static AsyncCommand *async_command_alloc(RedDispatcher *dispatcher,
@@ -297,13 +307,15 @@ static void red_dispatcher_update_area_async(RedDispatcher *dispatcher,
uint64_t cookie)
{
RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE_ASYNC;
- AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
+ RedWorkerMessageUpdateAsync payload;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
- send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
- send_data(dispatcher->channel, qxl_area, sizeof(QXLRect));
- send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t));
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ payload.surface_id = surface_id;
+ payload.qxl_area = *qxl_area;
+ payload.clear_dirty_region = clear_dirty_region;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ message,
+ &payload);
}
static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id,
@@ -316,12 +328,12 @@ static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id,
static void red_dispatcher_add_memslot(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT;
+ RedWorkerMessageAddMemslot payload;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot));
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ payload.mem_slot = *mem_slot;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_ADD_MEMSLOT,
+ &payload);
}
static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slot)
@@ -331,21 +343,22 @@ static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slo
static void red_dispatcher_add_memslot_async(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot, uint64_t cookie)
{
+ RedWorkerMessageAddMemslotAsync payload;
RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC;
- AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
- send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot));
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ payload.mem_slot = *mem_slot;
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
}
static void red_dispatcher_del_memslot(RedDispatcher *dispatcher, uint32_t slot_group_id, uint32_t slot_id)
{
+ RedWorkerMessageDelMemslot payload;
RedWorkerMessage message = RED_WORKER_MESSAGE_DEL_MEMSLOT;
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &slot_group_id, sizeof(uint32_t));
- send_data(dispatcher->channel, &slot_id, sizeof(uint32_t));
+ payload.slot_group_id = slot_group_id;
+ payload.slot_id = slot_id;
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
}
static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id, uint32_t slot_id)
@@ -355,11 +368,11 @@ static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id
static void red_dispatcher_destroy_surfaces(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES;
+ RedWorkerMessageDestroySurfaces payload;
- write_message(dispatcher->channel, &message);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_SURFACES,
+ &payload);
}
static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker)
@@ -369,11 +382,11 @@ static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker)
static void red_dispatcher_destroy_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie)
{
+ RedWorkerMessageDestroySurfacesAsync payload;
RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC;
- AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
}
static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispatcher)
@@ -387,28 +400,37 @@ static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispa
}
static void
+red_dispatcher_destroy_primary_surface_sync(RedDispatcher *dispatcher,
+ uint32_t surface_id)
+{
+ RedWorkerMessageDestroyPrimarySurface payload;
+ payload.surface_id = surface_id;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
+ &payload);
+ red_dispatcher_destroy_primary_surface_complete(dispatcher);
+}
+
+static void
+red_dispatcher_destroy_primary_surface_async(RedDispatcher *dispatcher,
+ uint32_t surface_id, uint64_t cookie)
+{
+ RedWorkerMessageDestroyPrimarySurfaceAsync payload;
+ RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC;
+
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ payload.surface_id = surface_id;
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void
red_dispatcher_destroy_primary_surface(RedDispatcher *dispatcher,
uint32_t surface_id, int async, uint64_t cookie)
{
- RedWorkerMessage message;
- AsyncCommand *cmd = NULL;
-
if (async) {
- message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC;
- cmd = async_command_alloc(dispatcher, message, cookie);
+ red_dispatcher_destroy_primary_surface_async(dispatcher, surface_id, cookie);
} else {
- message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE;
- }
-
- write_message(dispatcher->channel, &message);
- if (async) {
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
- }
- send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
- if (!async) {
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
- red_dispatcher_destroy_primary_surface_complete(dispatcher);
+ red_dispatcher_destroy_primary_surface_sync(dispatcher, surface_id);
}
}
@@ -431,30 +453,42 @@ static void red_dispatcher_create_primary_surface_complete(RedDispatcher *dispat
}
static void
-red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id,
- QXLDevSurfaceCreate *surface, int async, uint64_t cookie)
+red_dispatcher_create_primary_surface_async(RedDispatcher *dispatcher, uint32_t surface_id,
+ QXLDevSurfaceCreate *surface, uint64_t cookie)
{
- RedWorkerMessage message;
- AsyncCommand *cmd = NULL;
+ RedWorkerMessageCreatePrimarySurfaceAsync payload;
+ RedWorkerMessage message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC;
+
+ dispatcher->surface_create = *surface;
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ payload.surface_id = surface_id;
+ payload.surface = *surface;
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void
+red_dispatcher_create_primary_surface_sync(RedDispatcher *dispatcher, uint32_t surface_id,
+ QXLDevSurfaceCreate *surface)
+{
+ RedWorkerMessageCreatePrimarySurface payload;
- if (async) {
- message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC;
- cmd = async_command_alloc(dispatcher, message, cookie);
- } else {
- message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE;
- }
dispatcher->surface_create = *surface;
+ payload.surface_id = surface_id;
+ payload.surface = *surface;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
+ &payload);
+ red_dispatcher_create_primary_surface_complete(dispatcher);
+}
- write_message(dispatcher->channel, &message);
+static void
+red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id,
+ QXLDevSurfaceCreate *surface, int async, uint64_t cookie)
+{
if (async) {
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
- }
- send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
- send_data(dispatcher->channel, surface, sizeof(QXLDevSurfaceCreate));
- if (!async) {
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
- red_dispatcher_create_primary_surface_complete(dispatcher);
+ red_dispatcher_create_primary_surface_async(dispatcher, surface_id, surface, cookie);
+ } else {
+ red_dispatcher_create_primary_surface_sync(dispatcher, surface_id, surface);
}
}
@@ -466,11 +500,11 @@ static void qxl_worker_create_primary_surface(QXLWorker *qxl_worker, uint32_t su
static void red_dispatcher_reset_image_cache(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_IMAGE_CACHE;
+ RedWorkerMessageResetImageCache payload;
- write_message(dispatcher->channel, &message);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
+ &payload);
}
static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker)
@@ -480,11 +514,11 @@ static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker)
static void red_dispatcher_reset_cursor(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_CURSOR;
+ RedWorkerMessageResetCursor payload;
- write_message(dispatcher->channel, &message);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_RESET_CURSOR,
+ &payload);
}
static void qxl_worker_reset_cursor(QXLWorker *qxl_worker)
@@ -492,29 +526,38 @@ static void qxl_worker_reset_cursor(QXLWorker *qxl_worker)
red_dispatcher_reset_cursor((RedDispatcher*)qxl_worker);
}
-static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher, uint32_t surface_id,
- int async, uint64_t cookie)
+static void red_dispatcher_destroy_surface_wait_sync(RedDispatcher *dispatcher,
+ uint32_t surface_id)
{
- RedWorkerMessage message;
- AsyncCommand *cmd = NULL;
+ RedWorkerMessageDestroySurfaceWait payload;
- if (async ) {
- message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC;
- cmd = async_command_alloc(dispatcher, message, cookie);
- } else {
- message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT;
- }
+ payload.surface_id = surface_id;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
+ &payload);
+}
- write_message(dispatcher->channel, &message);
- if (async) {
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
- }
- send_data(dispatcher->channel, &surface_id, sizeof(uint32_t));
+static void red_dispatcher_destroy_surface_wait_async(RedDispatcher *dispatcher,
+ uint32_t surface_id,
+ uint64_t cookie)
+{
+ RedWorkerMessageDestroySurfaceWaitAsync payload;
+ RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC;
+
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ payload.surface_id = surface_id;
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
+}
+
+static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher,
+ uint32_t surface_id,
+ int async, uint64_t cookie)
+{
if (async) {
- return;
+ red_dispatcher_destroy_surface_wait_async(dispatcher, surface_id, cookie);
+ } else {
+ red_dispatcher_destroy_surface_wait_sync(dispatcher, surface_id);
}
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
}
static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surface_id)
@@ -524,9 +567,11 @@ static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surf
static void red_dispatcher_reset_memslots(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_MEMSLOTS;
+ RedWorkerMessageResetMemslots payload;
- write_message(dispatcher->channel, &message);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_RESET_MEMSLOTS,
+ &payload);
}
static void qxl_worker_reset_memslots(QXLWorker *qxl_worker)
@@ -536,11 +581,15 @@ static void qxl_worker_reset_memslots(QXLWorker *qxl_worker)
static void red_dispatcher_wakeup(RedDispatcher *dispatcher)
{
- if (!test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) {
- RedWorkerMessage message = RED_WORKER_MESSAGE_WAKEUP;
- set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending);
- write_message(dispatcher->channel, &message);
+ RedWorkerMessageWakeup payload;
+
+ if (test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) {
+ return;
}
+ set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_WAKEUP,
+ &payload);
}
static void qxl_worker_wakeup(QXLWorker *qxl_worker)
@@ -550,11 +599,15 @@ static void qxl_worker_wakeup(QXLWorker *qxl_worker)
static void red_dispatcher_oom(RedDispatcher *dispatcher)
{
- if (!test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) {
- RedWorkerMessage message = RED_WORKER_MESSAGE_OOM;
- set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending);
- write_message(dispatcher->channel, &message);
+ RedWorkerMessageOom payload;
+
+ if (test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) {
+ return;
}
+ set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_OOM,
+ &payload);
}
static void qxl_worker_oom(QXLWorker *qxl_worker)
@@ -564,9 +617,11 @@ static void qxl_worker_oom(QXLWorker *qxl_worker)
static void red_dispatcher_start(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_START;
+ RedWorkerMessageStart payload;
- write_message(dispatcher->channel, &message);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_START,
+ &payload);
}
static void qxl_worker_start(QXLWorker *qxl_worker)
@@ -576,20 +631,20 @@ static void qxl_worker_start(QXLWorker *qxl_worker)
static void red_dispatcher_flush_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie)
{
+ RedWorkerMessageFlushSurfacesAsync payload;
RedWorkerMessage message = RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC;
- AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie);
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &cmd, sizeof(cmd));
+ payload.base.cmd = async_command_alloc(dispatcher, message, cookie);
+ dispatcher_send_message(&dispatcher->dispatcher, message, &payload);
}
static void red_dispatcher_stop(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+ RedWorkerMessageStop payload;
- write_message(dispatcher->channel, &message);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_STOP,
+ &payload);
}
static void qxl_worker_stop(QXLWorker *qxl_worker)
@@ -601,14 +656,14 @@ static void red_dispatcher_loadvm_commands(RedDispatcher *dispatcher,
struct QXLCommandExt *ext,
uint32_t count)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_LOADVM_COMMANDS;
+ RedWorkerMessageLoadvmCommands payload;
red_printf("");
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &count, sizeof(uint32_t));
- send_data(dispatcher->channel, ext, sizeof(QXLCommandExt) * count);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ payload.count = count;
+ payload.ext = ext;
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_LOADVM_COMMANDS,
+ &payload);
}
static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
@@ -640,37 +695,44 @@ static inline int calc_compression_level(void)
void red_dispatcher_on_ic_change(void)
{
+ RedWorkerMessageSetCompression payload;
int compression_level = calc_compression_level();
RedDispatcher *now = dispatchers;
+
while (now) {
- RedWorkerMessage message = RED_WORKER_MESSAGE_SET_COMPRESSION;
now->qxl->st->qif->set_compression_level(now->qxl, compression_level);
- write_message(now->channel, &message);
- send_data(now->channel, &image_compression, sizeof(spice_image_compression_t));
+ payload.image_compression = image_compression;
+ dispatcher_send_message(&now->dispatcher,
+ RED_WORKER_MESSAGE_SET_COMPRESSION,
+ &payload);
now = now->next;
}
}
void red_dispatcher_on_sv_change(void)
{
+ RedWorkerMessageSetStreamingVideo payload;
int compression_level = calc_compression_level();
RedDispatcher *now = dispatchers;
while (now) {
- RedWorkerMessage message = RED_WORKER_MESSAGE_SET_STREAMING_VIDEO;
now->qxl->st->qif->set_compression_level(now->qxl, compression_level);
- write_message(now->channel, &message);
- send_data(now->channel, &streaming_video, sizeof(uint32_t));
+ payload.streaming_video = streaming_video;
+ dispatcher_send_message(&now->dispatcher,
+ RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
+ &payload);
now = now->next;
}
}
void red_dispatcher_set_mouse_mode(uint32_t mode)
{
+ RedWorkerMessageSetMouseMode payload;
RedDispatcher *now = dispatchers;
while (now) {
- RedWorkerMessage message = RED_WORKER_MESSAGE_SET_MOUSE_MODE;
- write_message(now->channel, &message);
- send_data(now->channel, &mode, sizeof(uint32_t));
+ payload.mode = mode;
+ dispatcher_send_message(&now->dispatcher,
+ RED_WORKER_MESSAGE_SET_MOUSE_MODE,
+ &payload);
now = now->next;
}
}
@@ -873,34 +935,31 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher,
static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE;
+ RedWorkerMessageDisplayChannelCreate payload;
RedChannel *display_channel;
- write_message(dispatcher->channel, &message);
-
- receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *));
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+ &payload);
+ receive_data(dispatcher->dispatcher.send_fd, &display_channel, sizeof(RedChannel *));
return display_channel;
}
static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher)
{
- RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE;
+ RedWorkerMessageCursorChannelCreate payload;
RedChannel *cursor_channel;
- write_message(dispatcher->channel, &message);
-
- receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *));
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
+ dispatcher_send_message(&dispatcher->dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+ &payload);
+ receive_data(dispatcher->dispatcher.send_fd, &cursor_channel, sizeof(RedChannel *));
return cursor_channel;
}
RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
{
- RedDispatcher *dispatcher;
- int channels[2];
+ RedDispatcher *red_dispatcher;
RedWorkerMessage message;
WorkerInitData init_data;
QXLDevInitInfo init_info;
@@ -917,45 +976,41 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
gl_canvas_init();
#endif // USE_OPENGL
- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
- red_error("socketpair failed %s", strerror(errno));
- }
-
- dispatcher = spice_new0(RedDispatcher, 1);
- dispatcher->channel = channels[0];
- ring_init(&dispatcher->async_commands);
- DBG_ASYNC("dispatcher->async_commands.next %p", dispatcher->async_commands.next);
- init_data.qxl = dispatcher->qxl = qxl;
+ red_dispatcher = spice_new0(RedDispatcher, 1);
+ ring_init(&red_dispatcher->async_commands);
+ DBG_ASYNC("red_dispatcher->async_commands.next %p", red_dispatcher->async_commands.next);
+ dispatcher_init(&red_dispatcher->dispatcher, RED_WORKER_MESSAGE_COUNT, NULL);
+ init_data.qxl = red_dispatcher->qxl = qxl;
init_data.id = qxl->id;
- init_data.channel = channels[1];
- init_data.pending = &dispatcher->pending;
+ init_data.red_dispatcher = red_dispatcher;
+ init_data.pending = &red_dispatcher->pending;
init_data.num_renderers = num_renderers;
memcpy(init_data.renderers, renderers, sizeof(init_data.renderers));
- pthread_mutex_init(&dispatcher->async_lock, NULL);
+ pthread_mutex_init(&red_dispatcher->async_lock, NULL);
init_data.image_compression = image_compression;
init_data.jpeg_state = jpeg_state;
init_data.zlib_glz_state = zlib_glz_state;
init_data.streaming_video = streaming_video;
- dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR;
- dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR;
- dispatcher->base.wakeup = qxl_worker_wakeup;
- dispatcher->base.oom = qxl_worker_oom;
- dispatcher->base.start = qxl_worker_start;
- dispatcher->base.stop = qxl_worker_stop;
- dispatcher->base.update_area = qxl_worker_update_area;
- dispatcher->base.add_memslot = qxl_worker_add_memslot;
- dispatcher->base.del_memslot = qxl_worker_del_memslot;
- dispatcher->base.reset_memslots = qxl_worker_reset_memslots;
- dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces;
- dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface;
- dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface;
-
- dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache;
- dispatcher->base.reset_cursor = qxl_worker_reset_cursor;
- dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait;
- dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands;
+ red_dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR;
+ red_dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR;
+ red_dispatcher->base.wakeup = qxl_worker_wakeup;
+ red_dispatcher->base.oom = qxl_worker_oom;
+ red_dispatcher->base.start = qxl_worker_start;
+ red_dispatcher->base.stop = qxl_worker_stop;
+ red_dispatcher->base.update_area = qxl_worker_update_area;
+ red_dispatcher->base.add_memslot = qxl_worker_add_memslot;
+ red_dispatcher->base.del_memslot = qxl_worker_del_memslot;
+ red_dispatcher->base.reset_memslots = qxl_worker_reset_memslots;
+ red_dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces;
+ red_dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface;
+ red_dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface;
+
+ red_dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache;
+ red_dispatcher->base.reset_cursor = qxl_worker_reset_cursor;
+ red_dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait;
+ red_dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands;
qxl->st->qif->get_init_info(qxl, &init_info);
@@ -965,7 +1020,6 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
init_data.num_memslots_groups = init_info.num_memslots_groups;
init_data.internal_groupslot_id = init_info.internal_groupslot_id;
init_data.n_surfaces = init_info.n_surfaces;
- init_data.dispatcher = dispatcher;
num_active_workers = 1;
@@ -974,40 +1028,51 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
sigdelset(&thread_sig_mask, SIGFPE);
sigdelset(&thread_sig_mask, SIGSEGV);
pthread_sigmask(SIG_SETMASK, &thread_sig_mask, &curr_sig_mask);
- if ((r = pthread_create(&dispatcher->worker_thread, NULL, red_worker_main, &init_data))) {
+ if ((r = pthread_create(&red_dispatcher->worker_thread, NULL, red_worker_main, &init_data))) {
red_error("create thread failed %d", r);
}
pthread_sigmask(SIG_SETMASK, &curr_sig_mask, NULL);
- read_message(dispatcher->channel, &message);
+ read_message(red_dispatcher->dispatcher.send_fd, &message);
ASSERT(message == RED_WORKER_MESSAGE_READY);
- display_channel = red_dispatcher_display_channel_create(dispatcher);
+ display_channel = red_dispatcher_display_channel_create(red_dispatcher);
if (display_channel) {
client_cbs.connect = red_dispatcher_set_display_peer;
client_cbs.disconnect = red_dispatcher_disconnect_display_peer;
client_cbs.migrate = red_dispatcher_display_migrate;
red_channel_register_client_cbs(display_channel, &client_cbs);
- red_channel_set_data(display_channel, dispatcher);
+ red_channel_set_data(display_channel, red_dispatcher);
reds_register_channel(display_channel);
}
- cursor_channel = red_dispatcher_cursor_channel_create(dispatcher);
+ cursor_channel = red_dispatcher_cursor_channel_create(red_dispatcher);
if (cursor_channel) {
client_cbs.connect = red_dispatcher_set_cursor_peer;
client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer;
client_cbs.migrate = red_dispatcher_cursor_migrate;
red_channel_register_client_cbs(cursor_channel, &client_cbs);
- red_channel_set_data(cursor_channel, dispatcher);
+ red_channel_set_data(cursor_channel, red_dispatcher);
reds_register_channel(cursor_channel);
}
- qxl->st->qif->attache_worker(qxl, &dispatcher->base);
+ qxl->st->qif->attache_worker(qxl, &red_dispatcher->base);
qxl->st->qif->set_compression_level(qxl, calc_compression_level());
- dispatcher->next = dispatchers;
- dispatchers = dispatcher;
- return dispatcher;
+ red_dispatcher->next = dispatchers;
+ dispatchers = red_dispatcher;
+ return red_dispatcher;
+}
+
+struct Dispatcher *red_dispatcher_get_dispatcher(RedDispatcher *red_dispatcher)
+{
+ return &red_dispatcher->dispatcher;
+}
+
+void red_dispatcher_set_dispatcher_opaque(struct RedDispatcher *red_dispatcher,
+ void *opaque)
+{
+ dispatcher_set_opaque(&red_dispatcher->dispatcher, opaque);
}
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index c2582f4..7417aac 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -32,5 +32,154 @@ int red_dispatcher_add_renderer(const char *name);
uint32_t red_dispatcher_qxl_ram_size(void);
int red_dispatcher_qxl_count(void);
void red_dispatcher_async_complete(struct RedDispatcher *, AsyncCommand *);
+struct Dispatcher *red_dispatcher_get_dispatcher(struct RedDispatcher *);
+
+typedef struct RedWorkerMessageDisplayConnect {
+ RedClient * client;
+ RedsStream * stream;
+ int migration;
+} RedWorkerMessageDisplayConnect;
+
+typedef struct RedWorkerMessageDisplayDisconnect {
+ RedChannelClient *rcc;
+} RedWorkerMessageDisplayDisconnect;
+
+typedef struct RedWorkerMessageDisplayMigrate {
+ RedChannelClient *rcc;
+} RedWorkerMessageDisplayMigrate;
+
+typedef struct RedWorkerMessageCursorConnect {
+ RedClient *client;
+ RedsStream *stream;
+ int migration;
+} RedWorkerMessageCursorConnect;
+
+typedef struct RedWorkerMessageCursorDisconnect {
+ RedChannelClient *rcc;
+} RedWorkerMessageCursorDisconnect;
+
+typedef struct RedWorkerMessageCursorMigrate {
+ RedChannelClient *rcc;
+} RedWorkerMessageCursorMigrate;
+
+typedef struct RedWorkerMessageUpdate {
+ uint32_t surface_id;
+ QXLRect * qxl_area;
+ QXLRect * qxl_dirty_rects;
+ uint32_t num_dirty_rects;
+ uint32_t clear_dirty_region;
+} RedWorkerMessageUpdate;
+
+typedef struct RedWorkerMessageAsync {
+ AsyncCommand *cmd;
+} RedWorkerMessageAsync;
+
+typedef struct RedWorkerMessageUpdateAsync {
+ RedWorkerMessageAsync base;
+ uint32_t surface_id;
+ QXLRect qxl_area;
+ uint32_t clear_dirty_region;
+} RedWorkerMessageUpdateAsync;
+
+typedef struct RedWorkerMessageAddMemslot {
+ QXLDevMemSlot mem_slot;
+} RedWorkerMessageAddMemslot;
+
+typedef struct RedWorkerMessageAddMemslotAsync {
+ RedWorkerMessageAsync base;
+ QXLDevMemSlot mem_slot;
+} RedWorkerMessageAddMemslotAsync;
+
+typedef struct RedWorkerMessageDelMemslot {
+ uint32_t slot_group_id;
+ uint32_t slot_id;
+} RedWorkerMessageDelMemslot;
+
+typedef struct RedWorkerMessageDestroySurfaces {
+} RedWorkerMessageDestroySurfaces;
+
+typedef struct RedWorkerMessageDestroySurfacesAsync {
+ RedWorkerMessageAsync base;
+} RedWorkerMessageDestroySurfacesAsync;
+
+
+typedef struct RedWorkerMessageDestroyPrimarySurface {
+ uint32_t surface_id;
+} RedWorkerMessageDestroyPrimarySurface;
+
+typedef struct RedWorkerMessageDestroyPrimarySurfaceAsync {
+ RedWorkerMessageAsync base;
+ uint32_t surface_id;
+} RedWorkerMessageDestroyPrimarySurfaceAsync;
+
+typedef struct RedWorkerMessageCreatePrimarySurfaceAsync {
+ RedWorkerMessageAsync base;
+ uint32_t surface_id;
+ QXLDevSurfaceCreate surface;
+} RedWorkerMessageCreatePrimarySurfaceAsync;
+
+typedef struct RedWorkerMessageCreatePrimarySurface {
+ uint32_t surface_id;
+ QXLDevSurfaceCreate surface;
+} RedWorkerMessageCreatePrimarySurface;
+
+typedef struct RedWorkerMessageResetImageCache {
+} RedWorkerMessageResetImageCache;
+
+typedef struct RedWorkerMessageResetCursor {
+} RedWorkerMessageResetCursor;
+
+typedef struct RedWorkerMessageWakeup {
+} RedWorkerMessageWakeup;
+
+typedef struct RedWorkerMessageOom {
+} RedWorkerMessageOom;
+
+typedef struct RedWorkerMessageStart {
+} RedWorkerMessageStart;
+
+typedef struct RedWorkerMessageFlushSurfacesAsync {
+ RedWorkerMessageAsync base;
+} RedWorkerMessageFlushSurfacesAsync;
+
+typedef struct RedWorkerMessageStop {
+} RedWorkerMessageStop;
+
+/* this command is sync, so it's ok to pass a pointer */
+typedef struct RedWorkerMessageLoadvmCommands {
+ uint32_t count;
+ QXLCommandExt *ext;
+} RedWorkerMessageLoadvmCommands;
+
+typedef struct RedWorkerMessageSetCompression {
+ spice_image_compression_t image_compression;
+} RedWorkerMessageSetCompression;
+
+typedef struct RedWorkerMessageSetStreamingVideo {
+ uint32_t streaming_video;
+} RedWorkerMessageSetStreamingVideo;
+
+typedef struct RedWorkerMessageSetMouseMode {
+ uint32_t mode;
+} RedWorkerMessageSetMouseMode;
+
+typedef struct RedWorkerMessageDisplayChannelCreate {
+} RedWorkerMessageDisplayChannelCreate;
+
+typedef struct RedWorkerMessageCursorChannelCreate {
+} RedWorkerMessageCursorChannelCreate;
+
+typedef struct RedWorkerMessageDestroySurfaceWait {
+ uint32_t surface_id;
+} RedWorkerMessageDestroySurfaceWait;
+
+typedef struct RedWorkerMessageDestroySurfaceWaitAsync {
+ RedWorkerMessageAsync base;
+ uint32_t surface_id;
+} RedWorkerMessageDestroySurfaceWaitAsync;
+
+typedef struct RedWorkerMessageResetMemslots {
+} RedWorkerMessageResetMemslots;
+
#endif
diff --git a/server/red_worker.c b/server/red_worker.c
index de8a820..cb48f09 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -72,6 +72,7 @@
#include "zlib_encoder.h"
#include "red_channel.h"
#include "red_dispatcher.h"
+#include "dispatcher.h"
#include "main_channel.h"
//#define COMPRESS_STAT
@@ -865,9 +866,10 @@ typedef struct RedWorker {
DisplayChannel *display_channel;
CursorChannel *cursor_channel;
QXLInstance *qxl;
- RedDispatcher *dispatcher;
- int id;
+ RedDispatcher *red_dispatcher;
+
int channel;
+ int id;
int running;
uint32_t *pending;
int epoll;
@@ -10103,21 +10105,19 @@ static void surface_dirty_region_to_rects(RedSurface *surface,
free(dirty_rects);
}
-static inline void handle_dev_update_async(RedWorker *worker)
+void handle_dev_update_async(void *opaque, void *payload)
{
- QXLRect qxl_rect;
+ RedWorker *worker = opaque;
+ RedWorkerMessageUpdateAsync *msg = payload;
SpiceRect rect;
- uint32_t surface_id;
- uint32_t clear_dirty_region;
QXLRect *qxl_dirty_rects;
uint32_t num_dirty_rects;
RedSurface *surface;
+ uint32_t surface_id = msg->surface_id;
+ QXLRect qxl_area = msg->qxl_area;
+ uint32_t clear_dirty_region = msg->clear_dirty_region;
- receive_data(worker->channel, &surface_id, sizeof(uint32_t));
- receive_data(worker->channel, &qxl_rect, sizeof(QXLRect));
- receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t));
-
- red_get_rect_ptr(&rect, &qxl_rect);
+ red_get_rect_ptr(&rect, &qxl_area);
flush_display_commands(worker);
ASSERT(worker->running);
@@ -10140,24 +10140,20 @@ static inline void handle_dev_update_async(RedWorker *worker)
free(qxl_dirty_rects);
}
-static inline void handle_dev_update(RedWorker *worker)
+void handle_dev_update(void *opaque, void *payload)
{
- const QXLRect *qxl_rect;
+ RedWorker *worker = opaque;
+ RedWorkerMessageUpdate *msg = payload;
SpiceRect *rect = spice_new0(SpiceRect, 1);
- QXLRect *qxl_dirty_rects;
RedSurface *surface;
- uint32_t num_dirty_rects;
- uint32_t surface_id;
- uint32_t clear_dirty_region;
-
- receive_data(worker->channel, &surface_id, sizeof(uint32_t));
- receive_data(worker->channel, &qxl_rect, sizeof(QXLRect *));
- receive_data(worker->channel, &qxl_dirty_rects, sizeof(QXLRect *));
- receive_data(worker->channel, &num_dirty_rects, sizeof(uint32_t));
- receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t));
+ uint32_t surface_id = msg->surface_id;
+ const QXLRect *qxl_area = msg->qxl_area;
+ uint32_t num_dirty_rects = msg->num_dirty_rects;
+ QXLRect *qxl_dirty_rects = msg->qxl_dirty_rects;
+ uint32_t clear_dirty_region = msg->clear_dirty_region;
surface = &worker->surfaces[surface_id];
- red_get_rect_ptr(rect, qxl_rect);
+ red_get_rect_ptr(rect, qxl_area);
flush_display_commands(worker);
ASSERT(worker->running);
@@ -10170,28 +10166,36 @@ static inline void handle_dev_update(RedWorker *worker)
clear_dirty_region);
}
-static inline void handle_dev_add_memslot(RedWorker *worker)
+static void dev_add_memslot(RedWorker *worker, QXLDevMemSlot mem_slot)
{
- QXLDevMemSlot dev_slot;
+ red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id,
+ mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end,
+ mem_slot.generation);
+}
- receive_data(worker->channel, &dev_slot, sizeof(QXLDevMemSlot));
+void handle_dev_add_memslot(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+ RedWorkerMessageAddMemslot *msg = payload;
+ QXLDevMemSlot mem_slot = msg->mem_slot;
- red_memslot_info_add_slot(&worker->mem_slots, dev_slot.slot_group_id, dev_slot.slot_id,
- dev_slot.addr_delta, dev_slot.virt_start, dev_slot.virt_end,
- dev_slot.generation);
+ red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id,
+ mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end,
+ mem_slot.generation);
}
-static inline void handle_dev_del_memslot(RedWorker *worker)
+void handle_dev_del_memslot(void *opaque, void *payload)
{
- uint32_t slot_id;
- uint32_t slot_group_id;
-
- receive_data(worker->channel, &slot_group_id, sizeof(uint32_t));
- receive_data(worker->channel, &slot_id, sizeof(uint32_t));
+ RedWorker *worker = opaque;
+ RedWorkerMessageDelMemslot *msg = payload;
+ uint32_t slot_id = msg->slot_id;
+ uint32_t slot_group_id = msg->slot_group_id;
red_memslot_info_del_slot(&worker->mem_slots, slot_group_id, slot_id);
}
+/* TODO: destroy_surface_wait, dev_destroy_surface_wait - confusing. one asserts
+ * surface_id == 0, maybe move the assert upward and merge the two functions? */
static inline void destroy_surface_wait(RedWorker *worker, int surface_id)
{
if (!worker->surfaces[surface_id].context.canvas) {
@@ -10206,12 +10210,8 @@ static inline void destroy_surface_wait(RedWorker *worker, int surface_id)
red_clear_surface_drawables_from_pipes(worker, surface_id, TRUE, TRUE);
}
-static inline void handle_dev_destroy_surface_wait(RedWorker *worker)
+static void dev_destroy_surface_wait(RedWorker *worker, uint32_t surface_id)
{
- uint32_t surface_id;
-
- receive_data(worker->channel, &surface_id, sizeof(uint32_t));
-
ASSERT(surface_id == 0);
flush_all_qxl_commands(worker);
@@ -10221,6 +10221,14 @@ static inline void handle_dev_destroy_surface_wait(RedWorker *worker)
}
}
+void handle_dev_destroy_surface_wait(void *opaque, void *payload)
+{
+ RedWorkerMessageDestroySurfaceWait *msg = payload;
+ RedWorker *worker = opaque;
+
+ dev_destroy_surface_wait(worker, msg->surface_id);
+}
+
static inline void red_cursor_reset(RedWorker *worker)
{
if (worker->cursor) {
@@ -10244,7 +10252,7 @@ static inline void red_cursor_reset(RedWorker *worker)
/* called upon device reset */
/* TODO: split me*/
-static inline void handle_dev_destroy_surfaces(RedWorker *worker)
+static inline void dev_destroy_surfaces(RedWorker *worker)
{
int i;
@@ -10273,14 +10281,17 @@ static inline void handle_dev_destroy_surfaces(RedWorker *worker)
red_cursor_reset(worker);
}
-static inline void handle_dev_create_primary_surface(RedWorker *worker)
+void handle_dev_destroy_surfaces(void *opaque, void *payload)
{
- uint32_t surface_id;
- QXLDevSurfaceCreate surface;
- uint8_t *line_0;
+ RedWorker *worker = opaque;
- receive_data(worker->channel, &surface_id, sizeof(uint32_t));
- receive_data(worker->channel, &surface, sizeof(QXLDevSurfaceCreate));
+ dev_destroy_surfaces(worker);
+}
+
+static void dev_create_primary_surface(RedWorker *worker, uint32_t surface_id,
+ QXLDevSurfaceCreate surface)
+{
+ uint8_t *line_0;
PANIC_ON(surface_id != 0);
PANIC_ON(surface.height == 0);
@@ -10308,12 +10319,16 @@ static inline void handle_dev_create_primary_surface(RedWorker *worker)
}
}
-static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
+void handle_dev_create_primary_surface(void *opaque, void *payload)
{
- uint32_t surface_id;
+ RedWorkerMessageCreatePrimarySurface *msg = payload;
+ RedWorker *worker = opaque;
- receive_data(worker->channel, &surface_id, sizeof(uint32_t));
+ dev_create_primary_surface(worker, msg->surface_id, msg->surface);
+}
+static void dev_destroy_primary_surface(RedWorker *worker, uint32_t surface_id)
+{
PANIC_ON(surface_id != 0);
if (!worker->surfaces[surface_id].context.canvas) {
@@ -10322,7 +10337,7 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
}
flush_all_qxl_commands(worker);
- destroy_surface_wait(worker, 0);
+ dev_destroy_surface_wait(worker, 0);
red_destroy_surface(worker, 0);
ASSERT(ring_is_empty(&worker->streams));
@@ -10331,6 +10346,24 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker)
red_cursor_reset(worker);
}
+void handle_dev_destroy_primary_surface(void *opaque, void *payload)
+{
+ RedWorkerMessageDestroyPrimarySurface *msg = payload;
+ RedWorker *worker = opaque;
+ uint32_t surface_id = msg->surface_id;
+
+ dev_destroy_primary_surface(worker, surface_id);
+}
+
+void handle_dev_destroy_primary_surface_async(void *opaque, void *payload)
+{
+ RedWorkerMessageDestroyPrimarySurfaceAsync *msg = payload;
+ RedWorker *worker = opaque;
+ uint32_t surface_id = msg->surface_id;
+
+ dev_destroy_primary_surface(worker, surface_id);
+}
+
static void flush_all_surfaces(RedWorker *worker)
{
int x;
@@ -10342,7 +10375,7 @@ static void flush_all_surfaces(RedWorker *worker)
}
}
-static void handle_dev_flush_surfaces(RedWorker *worker)
+static void dev_flush_surfaces(RedWorker *worker)
{
flush_all_qxl_commands(worker);
flush_all_surfaces(worker);
@@ -10350,8 +10383,25 @@ static void handle_dev_flush_surfaces(RedWorker *worker)
red_wait_outgoing_items(&worker->cursor_channel->common.base);
}
-static void handle_dev_stop(RedWorker *worker)
+void handle_dev_flush_surfaces(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+
+ dev_flush_surfaces(worker);
+}
+
+void handle_dev_flush_surfaces_async(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+
+ dev_flush_surfaces(worker);
+}
+
+void handle_dev_stop(void *opaque, void *payload)
{
+ RedWorker *worker = opaque;
+
+ red_printf("stop");
ASSERT(worker->running);
worker->running = FALSE;
red_display_clear_glz_drawables(worker->display_channel);
@@ -10360,8 +10410,9 @@ static void handle_dev_stop(RedWorker *worker)
red_wait_outgoing_items(&worker->cursor_channel->common.base);
}
-static void handle_dev_start(RedWorker *worker)
+void handle_dev_start(void *opaque, void *payload)
{
+ RedWorker *worker = opaque;
RedChannel *cursor_red_channel = &worker->cursor_channel->common.base;
RedChannel *display_red_channel = &worker->display_channel->common.base;
@@ -10375,308 +10426,488 @@ static void handle_dev_start(RedWorker *worker)
worker->running = TRUE;
}
-static void handle_dev_input(EventListener *listener, uint32_t events)
+void handle_dev_wakeup(void *opaque, void *payload)
{
- RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
- RedWorkerMessage message;
+ RedWorker *worker = opaque;
+
+ clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending);
+ stat_inc_counter(worker->wakeup_counter, 1);
+}
+
+void handle_dev_oom(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+
RedChannel *display_red_channel = &worker->display_channel->common.base;
int ring_is_empty;
- int call_async_complete = 0;
- int write_ready = 0;
- AsyncCommand *cmd;
-
- read_message(worker->channel, &message);
-
- /* for async messages we do the common work in the handler, and
- * send a ready or call async_complete from here, hence the added switch. */
- switch (message) {
- case RED_WORKER_MESSAGE_UPDATE_ASYNC:
- case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC:
- case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC:
- case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC:
- case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC:
- case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC:
- case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC:
- call_async_complete = 1;
- receive_data(worker->channel, &cmd, sizeof(cmd));
- break;
- case RED_WORKER_MESSAGE_UPDATE:
- case RED_WORKER_MESSAGE_ADD_MEMSLOT:
- case RED_WORKER_MESSAGE_DESTROY_SURFACES:
- case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE:
- case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
- case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT:
- case RED_WORKER_MESSAGE_RESET_CURSOR:
- case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
- case RED_WORKER_MESSAGE_STOP:
- case RED_WORKER_MESSAGE_LOADVM_COMMANDS:
- case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE:
- case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE:
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
- write_ready = 1;
- default:
- break;
- }
- switch (message) {
- case RED_WORKER_MESSAGE_UPDATE_ASYNC:
- handle_dev_update_async(worker);
- break;
- case RED_WORKER_MESSAGE_UPDATE:
- handle_dev_update(worker);
- break;
- case RED_WORKER_MESSAGE_WAKEUP:
- clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending);
- stat_inc_counter(worker->wakeup_counter, 1);
- break;
- case RED_WORKER_MESSAGE_OOM:
- ASSERT(worker->running);
- // streams? but without streams also leak
- red_printf_debug(1, "WORKER",
- "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
- worker->drawable_count,
- worker->red_drawable_count,
- worker->glz_drawable_count,
- worker->current_size,
- worker->display_channel ?
- red_channel_sum_pipes_size(display_red_channel) : 0);
- while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) {
- red_channel_push(&worker->display_channel->common.base);
- }
- if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) {
- red_free_some(worker);
- worker->qxl->st->qif->flush_resources(worker->qxl);
- }
- red_printf_debug(1, "WORKER",
- "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
- worker->drawable_count,
- worker->red_drawable_count,
- worker->glz_drawable_count,
- worker->current_size,
- worker->display_channel ?
- red_channel_sum_pipes_size(display_red_channel) : 0);
- clear_bit(RED_WORKER_PENDING_OOM, worker->pending);
- break;
- case RED_WORKER_MESSAGE_RESET_CURSOR:
- red_cursor_reset(worker);
- break;
- case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
- image_cache_reset(&worker->image_cache);
- break;
- case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC:
- case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT:
- handle_dev_destroy_surface_wait(worker);
- break;
- case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC:
- case RED_WORKER_MESSAGE_DESTROY_SURFACES:
- handle_dev_destroy_surfaces(worker);
- break;
- case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC:
- case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE:
- handle_dev_create_primary_surface(worker);
- break;
- case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC:
- case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
- handle_dev_destroy_primary_surface(worker);
- break;
- case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: {
- RedChannel *red_channel;
- // TODO: handle seemless migration. Temp, setting migrate to FALSE
- ensure_display_channel_created(worker, FALSE);
- red_channel = &worker->display_channel->common.base;
- send_data(worker->channel, &red_channel, sizeof(RedChannel *));
- break;
+ ASSERT(worker->running);
+ // streams? but without streams also leak
+ red_printf_debug(1, "WORKER",
+ "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
+ worker->drawable_count,
+ worker->red_drawable_count,
+ worker->glz_drawable_count,
+ worker->current_size,
+ worker->display_channel ?
+ red_channel_sum_pipes_size(display_red_channel) : 0);
+ while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) {
+ red_channel_push(&worker->display_channel->common.base);
}
- case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
- RedsStream *stream;
- RedClient *client;
- int migrate;
- red_printf("connect");
-
- receive_data(worker->channel, &client, sizeof(RedClient *));
- receive_data(worker->channel, &stream, sizeof(RedsStream *));
- receive_data(worker->channel, &migrate, sizeof(int));
- handle_new_display_channel(worker, client, stream, migrate);
- break;
+ if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) {
+ red_free_some(worker);
+ worker->qxl->st->qif->flush_resources(worker->qxl);
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: {
- RedChannelClient *rcc;
+ red_printf_debug(1, "WORKER",
+ "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u",
+ worker->drawable_count,
+ worker->red_drawable_count,
+ worker->glz_drawable_count,
+ worker->current_size,
+ worker->display_channel ?
+ red_channel_sum_pipes_size(display_red_channel) : 0);
+ clear_bit(RED_WORKER_PENDING_OOM, worker->pending);
+}
- red_printf("disconnect display client");
- receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
- ASSERT(rcc);
- display_channel_client_disconnect(rcc);
- break;
- }
- case RED_WORKER_MESSAGE_STOP: {
- red_printf("stop");
- handle_dev_stop(worker);
+void handle_dev_reset_cursor(void *opaque, void *payload)
+{
+ red_cursor_reset((RedWorker *)opaque);
+}
+
+void handle_dev_reset_image_cache(void *opaque, void *payload)
+{
+ image_cache_reset(&((RedWorker *)opaque)->image_cache);
+}
+
+void handle_dev_destroy_surface_wait_async(void *opaque, void *payload)
+{
+ RedWorkerMessageDestroySurfaceWaitAsync *msg = payload;
+ RedWorker *worker = opaque;
+
+ dev_destroy_surface_wait(worker, msg->surface_id);
+}
+
+void handle_dev_destroy_surfaces_async(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+
+ dev_destroy_surfaces(worker);
+}
+
+void handle_dev_create_primary_surface_async(void *opaque, void *payload)
+{
+ RedWorkerMessageCreatePrimarySurfaceAsync *msg = payload;
+ RedWorker *worker = opaque;
+
+ dev_create_primary_surface(worker, msg->surface_id, msg->surface);
+}
+
+/* exception for Dispatcher, data going from red_worker to main thread,
+ * TODO: use a different dispatcher?
+ * TODO: leave direct usage of channel(fd)? It's only used right after the
+ * pthread is created, since the channel duration is the lifetime of the spice
+ * server. */
+
+void handle_dev_display_channel_create(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_display_channel_created(worker, FALSE);
+ red_channel = &worker->display_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+}
+
+void handle_dev_display_connect(void *opaque, void *payload)
+{
+ RedWorkerMessageDisplayConnect *msg = payload;
+ RedWorker *worker = opaque;
+ RedsStream *stream = msg->stream;
+ RedClient *client = msg->client;
+ int migration = msg->migration;
+
+ red_printf("connect");
+ handle_new_display_channel(worker, client, stream, migration);
+}
+
+void handle_dev_display_disconnect(void *opaque, void *payload)
+{
+ RedWorkerMessageDisplayDisconnect *msg = payload;
+ RedChannelClient *rcc = msg->rcc;
+
+ red_printf("disconnect display client");
+ ASSERT(rcc);
+ display_channel_client_disconnect(rcc);
+}
+
+void handle_dev_display_migrate(void *opaque, void *payload)
+{
+ RedWorkerMessageDisplayMigrate *msg = payload;
+ RedWorker *worker = opaque;
+
+ RedChannelClient *rcc = msg->rcc;
+ red_printf("migrate display client");
+ ASSERT(rcc);
+ red_migrate_display(worker, rcc);
+}
+
+/* TODO: special, perhaps use another dispatcher? */
+void handle_dev_cursor_channel_create(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+ RedChannel *red_channel;
+
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_cursor_channel_created(worker, FALSE);
+ red_channel = &worker->cursor_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+}
+
+void handle_dev_cursor_connect(void *opaque, void *payload)
+{
+ RedWorkerMessageCursorConnect *msg = payload;
+ RedWorker *worker = opaque;
+ RedsStream *stream = msg->stream;
+ RedClient *client = msg->client;
+ int migration = msg->migration;
+
+ red_printf("cursor connect");
+ red_connect_cursor(worker, client, stream, migration);
+}
+
+void handle_dev_cursor_disconnect(void *opaque, void *payload)
+{
+ RedWorkerMessageCursorDisconnect *msg = payload;
+ RedChannelClient *rcc = msg->rcc;
+
+ red_printf("disconnect cursor client");
+ ASSERT(rcc);
+ cursor_channel_client_disconnect(rcc);
+}
+
+void handle_dev_cursor_migrate(void *opaque, void *payload)
+{
+ RedWorkerMessageCursorMigrate *msg = payload;
+ RedWorker *worker = opaque;
+ RedChannelClient *rcc = msg->rcc;
+
+ red_printf("migrate cursor client");
+ ASSERT(rcc);
+ red_migrate_cursor(worker, rcc);
+}
+
+void handle_dev_set_compression(void *opaque, void *payload)
+{
+ RedWorkerMessageSetCompression *msg = payload;
+ RedWorker *worker = opaque;
+
+ worker->image_compression = msg->image_compression;
+ switch (worker->image_compression) {
+ case SPICE_IMAGE_COMPRESS_AUTO_LZ:
+ red_printf("ic auto_lz");
break;
- }
- case RED_WORKER_MESSAGE_START:
- red_printf("start");
- handle_dev_start(worker);
+ case SPICE_IMAGE_COMPRESS_AUTO_GLZ:
+ red_printf("ic auto_glz");
break;
- case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: {
- RedChannelClient *rcc;
- red_printf("migrate display client");
- receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
- ASSERT(rcc);
- red_migrate_display(worker, rcc);
+ case SPICE_IMAGE_COMPRESS_QUIC:
+ red_printf("ic quic");
break;
- }
- case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: {
- RedChannel *red_channel;
- // TODO: handle seemless migration. Temp, setting migrate to FALSE
- ensure_cursor_channel_created(worker, FALSE);
- red_channel = &worker->cursor_channel->common.base;
- send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+ case SPICE_IMAGE_COMPRESS_LZ:
+ red_printf("ic lz");
break;
- }
- case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
- RedsStream *stream;
- RedClient *client;
- int migrate;
-
- red_printf("cursor connect");
- receive_data(worker->channel, &client, sizeof(RedClient *));
- receive_data(worker->channel, &stream, sizeof(RedsStream *));
- receive_data(worker->channel, &migrate, sizeof(int));
- red_connect_cursor(worker, client, stream, migrate);
+ case SPICE_IMAGE_COMPRESS_GLZ:
+ red_printf("ic glz");
break;
- }
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: {
- RedChannelClient *rcc;
-
- red_printf("disconnect cursor client");
- receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
- ASSERT(rcc);
- cursor_channel_client_disconnect(rcc);
+ case SPICE_IMAGE_COMPRESS_OFF:
+ red_printf("ic off");
break;
+ default:
+ red_printf("ic invalid");
}
- case RED_WORKER_MESSAGE_CURSOR_MIGRATE: {
- RedChannelClient *rcc;
- red_printf("migrate cursor client");
- receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
- ASSERT(rcc);
- red_migrate_cursor(worker, rcc);
- break;
+#ifdef COMPRESS_STAT
+ print_compress_stats(worker->display_channel);
+ if (worker->display_channel) {
+ stat_reset(&worker->display_channel->quic_stat);
+ stat_reset(&worker->display_channel->lz_stat);
+ stat_reset(&worker->display_channel->glz_stat);
+ stat_reset(&worker->display_channel->jpeg_stat);
+ stat_reset(&worker->display_channel->zlib_glz_stat);
+ stat_reset(&worker->display_channel->jpeg_alpha_stat);
}
- case RED_WORKER_MESSAGE_SET_COMPRESSION:
- receive_data(worker->channel, &worker->image_compression,
- sizeof(spice_image_compression_t));
- switch (worker->image_compression) {
- case SPICE_IMAGE_COMPRESS_AUTO_LZ:
- red_printf("ic auto_lz");
- break;
- case SPICE_IMAGE_COMPRESS_AUTO_GLZ:
- red_printf("ic auto_glz");
+#endif
+}
+
+void handle_dev_set_streaming_video(void *opaque, void *payload)
+{
+ RedWorkerMessageSetStreamingVideo *msg = payload;
+ RedWorker *worker = opaque;
+
+ worker->streaming_video = msg->streaming_video;
+ ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID);
+ switch(worker->streaming_video) {
+ case STREAM_VIDEO_ALL:
+ red_printf("sv all");
break;
- case SPICE_IMAGE_COMPRESS_QUIC:
- red_printf("ic quic");
+ case STREAM_VIDEO_FILTER:
+ red_printf("sv filter");
break;
- case SPICE_IMAGE_COMPRESS_LZ:
- red_printf("ic lz");
+ case STREAM_VIDEO_OFF:
+ red_printf("sv off");
break;
- case SPICE_IMAGE_COMPRESS_GLZ:
- red_printf("ic glz");
+ default:
+ red_printf("sv invalid");
+ }
+}
+
+void handle_dev_set_mouse_mode(void *opaque, void *payload)
+{
+ RedWorkerMessageSetMouseMode *msg = payload;
+ RedWorker *worker = opaque;
+
+ worker->mouse_mode = msg->mode;
+ red_printf("mouse mode %u", worker->mouse_mode);
+}
+
+void handle_dev_add_memslot_async(void *opaque, void *payload)
+{
+ RedWorkerMessageAddMemslotAsync *msg = payload;
+ RedWorker *worker = opaque;
+
+ dev_add_memslot(worker, msg->mem_slot);
+}
+
+void handle_dev_reset_memslots(void *opaque, void *payload)
+{
+ RedWorker *worker = opaque;
+
+ red_memslot_info_reset(&worker->mem_slots);
+}
+
+void handle_dev_loadvm_commands(void *opaque, void *payload)
+{
+ RedWorkerMessageLoadvmCommands *msg = payload;
+ RedWorker *worker = opaque;
+ uint32_t i;
+ RedCursorCmd *cursor_cmd;
+ RedSurfaceCmd *surface_cmd;
+ uint32_t count = msg->count;
+ QXLCommandExt *ext = msg->ext;
+
+ red_printf("loadvm_commands");
+ for (i = 0 ; i < count ; ++i) {
+ switch (ext[i].cmd.type) {
+ case QXL_CMD_CURSOR:
+ cursor_cmd = spice_new0(RedCursorCmd, 1);
+ red_get_cursor_cmd(&worker->mem_slots, ext[i].group_id,
+ cursor_cmd, ext[i].cmd.data);
+ qxl_process_cursor(worker, cursor_cmd, ext[i].group_id);
break;
- case SPICE_IMAGE_COMPRESS_OFF:
- red_printf("ic off");
+ case QXL_CMD_SURFACE:
+ surface_cmd = spice_new0(RedSurfaceCmd, 1);
+ red_get_surface_cmd(&worker->mem_slots, ext[i].group_id,
+ surface_cmd, ext[i].cmd.data);
+ red_process_surface(worker, surface_cmd, ext[i].group_id, TRUE);
break;
default:
- red_printf("ic invalid");
- }
-#ifdef COMPRESS_STAT
- print_compress_stats(worker->display_channel);
- if (worker->display_channel) {
- stat_reset(&worker->display_channel->quic_stat);
- stat_reset(&worker->display_channel->lz_stat);
- stat_reset(&worker->display_channel->glz_stat);
- stat_reset(&worker->display_channel->jpeg_stat);
- stat_reset(&worker->display_channel->zlib_glz_stat);
- stat_reset(&worker->display_channel->jpeg_alpha_stat);
- }
-#endif
- break;
- case RED_WORKER_MESSAGE_SET_STREAMING_VIDEO:
- receive_data(worker->channel, &worker->streaming_video, sizeof(uint32_t));
- ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID);
- switch(worker->streaming_video) {
- case STREAM_VIDEO_ALL:
- red_printf("sv all");
- break;
- case STREAM_VIDEO_FILTER:
- red_printf("sv filter");
- break;
- case STREAM_VIDEO_OFF:
- red_printf("sv off");
- break;
- default:
- red_printf("sv invalid");
- }
- break;
- case RED_WORKER_MESSAGE_SET_MOUSE_MODE:
- receive_data(worker->channel, &worker->mouse_mode, sizeof(uint32_t));
- red_printf("mouse mode %u", worker->mouse_mode);
- break;
- case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC:
- case RED_WORKER_MESSAGE_ADD_MEMSLOT:
- handle_dev_add_memslot(worker);
- break;
- case RED_WORKER_MESSAGE_DEL_MEMSLOT:
- handle_dev_del_memslot(worker);
- break;
- case RED_WORKER_MESSAGE_RESET_MEMSLOTS:
- red_memslot_info_reset(&worker->mem_slots);
- break;
- case RED_WORKER_MESSAGE_LOADVM_COMMANDS: {
- uint32_t count;
- QXLCommandExt ext;
- RedCursorCmd *cursor_cmd;
- RedSurfaceCmd *surface_cmd;
-
- red_printf("loadvm_commands");
- receive_data(worker->channel, &count, sizeof(uint32_t));
- while (count > 0) {
- receive_data(worker->channel, &ext, sizeof(QXLCommandExt));
- switch (ext.cmd.type) {
- case QXL_CMD_CURSOR:
- cursor_cmd = spice_new0(RedCursorCmd, 1);
- red_get_cursor_cmd(&worker->mem_slots, ext.group_id,
- cursor_cmd, ext.cmd.data);
- qxl_process_cursor(worker, cursor_cmd, ext.group_id);
- break;
- case QXL_CMD_SURFACE:
- surface_cmd = spice_new0(RedSurfaceCmd, 1);
- red_get_surface_cmd(&worker->mem_slots, ext.group_id,
- surface_cmd, ext.cmd.data);
- red_process_surface(worker, surface_cmd, ext.group_id, TRUE);
- break;
- default:
- red_printf("unhandled loadvm command type (%d)", ext.cmd.type);
- break;
- }
- count--;
+ red_printf("unhandled loadvm command type (%d)", ext[i].cmd.type);
+ break;
}
- break;
- }
- case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC:
- handle_dev_flush_surfaces(worker);
- break;
- default:
- red_error("message error");
- }
- if (call_async_complete) {
- red_dispatcher_async_complete(worker->dispatcher, cmd);
- }
- if (write_ready) {
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
}
}
+static void worker_handle_dispatcher_async_done(void *opaque,
+ uint32_t message_type,
+ void *payload)
+{
+ RedWorker *worker = opaque;
+ RedWorkerMessageAsync *msg_async = payload;
+
+ red_printf_debug(2, "WORKER", "");
+ red_dispatcher_async_complete(worker->red_dispatcher, msg_async->cmd);
+}
+
+static void register_callbacks(Dispatcher *dispatcher)
+{
+ dispatcher_register_async_done_callback(
+ dispatcher,
+ worker_handle_dispatcher_async_done);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_CONNECT,
+ handle_dev_display_connect,
+ sizeof(RedWorkerMessageDisplayConnect),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+ handle_dev_display_disconnect,
+ sizeof(RedWorkerMessageDisplayDisconnect),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
+ handle_dev_display_migrate,
+ sizeof(RedWorkerMessageDisplayMigrate),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_CONNECT,
+ handle_dev_cursor_connect,
+ sizeof(RedWorkerMessageCursorConnect),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+ handle_dev_cursor_disconnect,
+ sizeof(RedWorkerMessageCursorDisconnect),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_MIGRATE,
+ handle_dev_cursor_migrate,
+ sizeof(RedWorkerMessageCursorMigrate),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_UPDATE,
+ handle_dev_update,
+ sizeof(RedWorkerMessageUpdate),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_UPDATE_ASYNC,
+ handle_dev_update_async,
+ sizeof(RedWorkerMessageUpdateAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_ADD_MEMSLOT,
+ handle_dev_add_memslot,
+ sizeof(RedWorkerMessageAddMemslot),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC,
+ handle_dev_add_memslot_async,
+ sizeof(RedWorkerMessageAddMemslotAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DEL_MEMSLOT,
+ handle_dev_del_memslot,
+ sizeof(RedWorkerMessageDelMemslot),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_SURFACES,
+ handle_dev_destroy_surfaces,
+ sizeof(RedWorkerMessageDestroySurfaces),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC,
+ handle_dev_destroy_surfaces_async,
+ sizeof(RedWorkerMessageDestroySurfacesAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
+ handle_dev_destroy_primary_surface,
+ sizeof(RedWorkerMessageDestroyPrimarySurface),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC,
+ handle_dev_destroy_primary_surface_async,
+ sizeof(RedWorkerMessageDestroyPrimarySurfaceAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC,
+ handle_dev_create_primary_surface_async,
+ sizeof(RedWorkerMessageCreatePrimarySurfaceAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
+ handle_dev_create_primary_surface,
+ sizeof(RedWorkerMessageCreatePrimarySurface),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
+ handle_dev_reset_image_cache,
+ sizeof(RedWorkerMessageResetImageCache),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_RESET_CURSOR,
+ handle_dev_reset_cursor,
+ sizeof(RedWorkerMessageResetCursor),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_WAKEUP,
+ handle_dev_wakeup,
+ sizeof(RedWorkerMessageWakeup),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_OOM,
+ handle_dev_oom,
+ sizeof(RedWorkerMessageOom),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_START,
+ handle_dev_start,
+ sizeof(RedWorkerMessageStart),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
+ handle_dev_flush_surfaces_async,
+ sizeof(RedWorkerMessageFlushSurfacesAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_STOP,
+ handle_dev_stop,
+ sizeof(RedWorkerMessageStop),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_LOADVM_COMMANDS,
+ handle_dev_loadvm_commands,
+ sizeof(RedWorkerMessageLoadvmCommands),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_SET_COMPRESSION,
+ handle_dev_set_compression,
+ sizeof(RedWorkerMessageSetCompression),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
+ handle_dev_set_streaming_video,
+ sizeof(RedWorkerMessageSetStreamingVideo),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_SET_MOUSE_MODE,
+ handle_dev_set_mouse_mode,
+ sizeof(RedWorkerMessageSetMouseMode),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+ handle_dev_display_channel_create,
+ sizeof(RedWorkerMessageDisplayChannelCreate),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+ handle_dev_cursor_channel_create,
+ sizeof(RedWorkerMessageCursorChannelCreate),
+ DISPATCHER_NONE);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
+ handle_dev_destroy_surface_wait,
+ sizeof(RedWorkerMessageDestroySurfaceWait),
+ DISPATCHER_ACK);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
+ handle_dev_destroy_surface_wait_async,
+ sizeof(RedWorkerMessageDestroySurfaceWaitAsync),
+ DISPATCHER_ASYNC);
+ dispatcher_register_handler(dispatcher,
+ RED_WORKER_MESSAGE_RESET_MEMSLOTS,
+ handle_dev_reset_memslots,
+ sizeof(RedWorkerMessageResetMemslots),
+ DISPATCHER_NONE);
+}
+
+
+
+static void handle_dev_input(EventListener *listener, uint32_t events)
+{
+ RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
+
+ dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher));
+}
+
static void handle_dev_free(EventListener *ctx)
{
free(ctx);
@@ -10687,14 +10918,18 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
struct epoll_event event;
RedWorkerMessage message;
int epoll;
+ Dispatcher *dispatcher;
ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE);
memset(worker, 0, sizeof(RedWorker));
- worker->dispatcher = init_data->dispatcher;
+ dispatcher = red_dispatcher_get_dispatcher(init_data->red_dispatcher);
+ dispatcher_set_opaque(dispatcher, worker);
+ worker->red_dispatcher = init_data->red_dispatcher;
worker->qxl = init_data->qxl;
worker->id = init_data->id;
- worker->channel = init_data->channel;
+ worker->channel = dispatcher_get_recv_fd(dispatcher);
+ register_callbacks(dispatcher);
worker->pending = init_data->pending;
worker->dev_listener.refs = 1;
worker->dev_listener.action = handle_dev_input;
diff --git a/server/red_worker.h b/server/red_worker.h
index 26c43ad..08c7b22 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -84,6 +84,8 @@ enum {
RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
+
+ RED_WORKER_MESSAGE_COUNT // LAST
};
typedef uint32_t RedWorkerMessage;
@@ -102,7 +104,6 @@ typedef struct RedDispatcher RedDispatcher;
typedef struct WorkerInitData {
struct QXLInstance *qxl;
int id;
- int channel;
uint32_t *pending;
uint32_t num_renderers;
uint32_t renderers[RED_MAX_RENDERERS];
@@ -116,7 +117,7 @@ typedef struct WorkerInitData {
uint8_t memslot_id_bits;
uint8_t internal_groupslot_id;
uint32_t n_surfaces;
- RedDispatcher *dispatcher;
+ RedDispatcher *red_dispatcher;
} WorkerInitData;
void *red_worker_main(void *arg);
commit ca5776f40e60f5bf5c0bf19c242492c2082d3dfc
Author: Alon Levy <alevy at redhat.com>
Date: Mon Nov 7 12:01:26 2011 +0200
server/dispatcher: add dispatcher_register_async_done_callback
diff --git a/server/dispatcher.c b/server/dispatcher.c
index 81abbdf..90e46e3 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -126,6 +126,9 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
red_printf("error writing ack for message %d\n", type);
/* TODO: close socketpair? */
}
+ } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->handle_async_done) {
+ dispatcher->handle_async_done(dispatcher->opaque, type,
+ (void *)payload);
}
return 1;
}
@@ -174,9 +177,17 @@ unlock:
pthread_mutex_unlock(&dispatcher->lock);
}
+void dispatcher_register_async_done_callback(
+ Dispatcher *dispatcher,
+ dispatcher_handle_async_done handler)
+{
+ assert(dispatcher->handle_async_done == NULL);
+ dispatcher->handle_async_done = handler;
+}
+
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
- dispatcher_handle_message handler, size_t size,
- int ack)
+ dispatcher_handle_message handler,
+ size_t size, int ack)
{
DispatcherMessage *msg;
diff --git a/server/dispatcher.h b/server/dispatcher.h
index 95b6bfc..a468c58 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -8,6 +8,11 @@ typedef struct Dispatcher Dispatcher;
typedef void (*dispatcher_handle_message)(void *opaque,
void *payload);
+typedef void (*dispatcher_handle_async_done)(void *opaque,
+ uint32_t message_type,
+ void *payload);
+
+
typedef struct DispatcherMessage {
size_t size;
int ack;
@@ -26,6 +31,7 @@ struct Dispatcher {
void *payload; /* allocated as max of message sizes */
size_t payload_size; /* used to track realloc calls */
void *opaque;
+ dispatcher_handle_async_done handle_async_done;
};
/*
@@ -69,6 +75,16 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
int ack);
/*
+ * dispatcher_register_async_done_callback
+ * @dispatcher: dispatcher
+ * @handler: callback on the receiver side called *after* the
+ * message callback in case ack == DISPATCHER_ASYNC.
+ */
+void dispatcher_register_async_done_callback(
+ Dispatcher *dispatcher,
+ dispatcher_handle_async_done handler);
+
+/*
* dispatcher_handle_recv_read
* @dispatcher: Dispatcher instance
*/
commit fff04e867c02eb5c3582f17810c1a092ce240fc6
Author: Alon Levy <alevy at redhat.com>
Date: Mon Nov 7 13:27:54 2011 +0200
introduce DISPATCHER_{NONE,ACK,ASYNC}
diff --git a/server/dispatcher.c b/server/dispatcher.c
index 1cd9e43..81abbdf 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -120,10 +120,12 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
} else {
red_printf("error: no handler for message type %d\n", type);
}
- if (msg->ack && write_safe(dispatcher->recv_fd,
- &ack, sizeof(ack)) == -1) {
- red_printf("error writing ack for message %d\n", type);
- /* TODO: close socketpair? */
+ if (msg->ack == DISPATCHER_ACK) {
+ if (write_safe(dispatcher->recv_fd,
+ &ack, sizeof(ack)) == -1) {
+ red_printf("error writing ack for message %d\n", type);
+ /* TODO: close socketpair? */
+ }
}
return 1;
}
@@ -159,7 +161,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
message_type);
goto unlock;
}
- if (msg->ack) {
+ if (msg->ack == DISPATCHER_ACK) {
if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) {
red_printf("error: failed to read ack");
} else if (ack != ACK) {
diff --git a/server/dispatcher.h b/server/dispatcher.h
index 04e6b46..95b6bfc 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -45,13 +45,22 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
void *opaque);
+enum {
+ DISPATCHER_NONE = 0,
+ DISPATCHER_ACK,
+ DISPATCHER_ASYNC
+};
+
/*
* dispatcher_register_handler
- * @dispatcher: dispatcher
+ * @dispatcher: dispatcher
* @messsage_type: message type
* @handler: message handler
* @size: message size. Each type has a fixed associated size.
- * @ack: send an ack. This is per message type - you can't send the
+ * @ack: One of DISPATCHER_NONE, DISPATCHER_ACK, DISPATCHER_ASYNC.
+ * DISPATCHER_NONE - only send the message
+ * DISPATCHER_ACK - send an ack after the message
+ * DISPATCHER_ASYNC - call send an ack. This is per message type - you can't send the
* same message type with and without. Register two different
* messages if that is what you want.
*/
commit 776bdd6c95715dcd8e609dc3ff647d0ad73fd339
Author: Alon Levy <alevy at redhat.com>
Date: Sun Oct 30 17:04:59 2011 +0200
server: introduce dispatcher
used for main_dispatcher only in this patch.
Dispatcher is meant to be used for Main<->any low frequency messages.
It's interface is meant to include the red_dispatcher usage:
fixed size messages per message type
some messages require an ack
Some methods are added to be used by RedDispatcher later:
dispatcher_handle_read - to be called directly by RedDispatcher epoll
based loop
dispatcher_set_opaque - to be set from red_worker pthread
dispatcher_init - allow NULL core as used by red_worker
Read and Write behavior:
Sender: blocking write, blocking read for ack (if any).
Reader: poll for any data, if such then blocking read for a
message_type and following message. repeat until poll returns
with no pending data to read.
FDO Bugzilla: 42463
diff --git a/server/Makefile.am b/server/Makefile.am
index 418d707..34a6b47 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -78,6 +78,8 @@ libspice_server_la_SOURCES = \
red_client_cache.h \
red_client_shared_cache.h \
red_common.h \
+ dispatcher.c \
+ dispatcher.h \
red_dispatcher.c \
red_dispatcher.h \
main_dispatcher.c \
diff --git a/server/dispatcher.c b/server/dispatcher.c
new file mode 100644
index 0000000..1cd9e43
--- /dev/null
+++ b/server/dispatcher.c
@@ -0,0 +1,248 @@
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include "mem.h"
+#include "spice_common.h"
+#include "dispatcher.h"
+
+#define DISPATCHER_DEBUG_PRINTF(level, ...) \
+ red_printf_debug(level, "DISP", ##__VA_ARGS__)
+
+//#define DEBUG_DISPATCHER
+
+#ifdef DEBUG_DISPATCHER
+#include <signal.h>
+#endif
+
+#define ACK 0xffffffff
+
+/*
+ * read_safe
+ * helper. reads until size bytes accumulated in buf, if an error other then
+ * EINTR is encountered returns -1, otherwise returns 0.
+ * @block if 1 the read will block (the fd is always blocking).
+ * if 0 poll first, return immediately if no bytes available, otherwise
+ * read size in blocking mode.
+ */
+static int read_safe(int fd, void *buf, size_t size, int block)
+{
+ int read_size = 0;
+ int ret;
+ struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0};
+
+ if (size == 0) {
+ return 0;
+ }
+
+ if (!block) {
+ while ((ret = poll(&pollfd, 1, 0)) == -1) {
+ if (errno == EINTR) {
+ DISPATCHER_DEBUG_PRINTF(3, "EINTR in poll");
+ continue;
+ }
+ red_error("poll failed");
+ return -1;
+ }
+ if (!(pollfd.revents & POLLIN)) {
+ return 0;
+ }
+ }
+ while (read_size < size) {
+ ret = read(fd, buf + read_size, size - read_size);
+ if (ret == -1) {
+ if (errno == EINTR) {
+ DISPATCHER_DEBUG_PRINTF(3, "EINTR in read");
+ continue;
+ }
+ return -1;
+ }
+ if (ret == 0) {
+ red_error("broken pipe on read");
+ return -1;
+ }
+ read_size += ret;
+ }
+ return read_size;
+}
+
+/*
+ * write_safe
+ * @return -1 for error, otherwise number of written bytes. may be zero.
+ */
+static int write_safe(int fd, void *buf, size_t size)
+{
+ int written_size = 0;
+ int ret;
+
+ while (written_size < size) {
+ ret = write(fd, buf + written_size, size - written_size);
+ if (ret == -1) {
+ if (errno != EINTR) {
+ DISPATCHER_DEBUG_PRINTF(3, "EINTR in write\n");
+ return -1;
+ }
+ continue;
+ }
+ written_size += ret;
+ }
+ return written_size;
+}
+
+static int dispatcher_handle_single_read(Dispatcher *dispatcher)
+{
+ int ret;
+ uint32_t type;
+ DispatcherMessage *msg = NULL;
+ uint8_t *payload = dispatcher->payload;
+ uint32_t ack = ACK;
+
+ if ((ret = read_safe(dispatcher->recv_fd, &type, sizeof(type), 0)) == -1) {
+ red_printf("error reading from dispatcher: %d\n", errno);
+ return 0;
+ }
+ if (ret == 0) {
+ /* no messsage */
+ return 0;
+ }
+ msg = &dispatcher->messages[type];
+ if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
+ red_printf("error reading from dispatcher: %d\n", errno);
+ /* TODO: close socketpair? */
+ return 0;
+ }
+ if (msg->handler) {
+ msg->handler(dispatcher->opaque, (void *)payload);
+ } else {
+ red_printf("error: no handler for message type %d\n", type);
+ }
+ if (msg->ack && write_safe(dispatcher->recv_fd,
+ &ack, sizeof(ack)) == -1) {
+ red_printf("error writing ack for message %d\n", type);
+ /* TODO: close socketpair? */
+ }
+ return 1;
+}
+
+/*
+ * dispatcher_handle_recv_read
+ * doesn't handle being in the middle of a message. all reads are blocking.
+ */
+void dispatcher_handle_recv_read(Dispatcher *dispatcher)
+{
+ while (dispatcher_handle_single_read(dispatcher)) {
+ }
+}
+
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+ void *payload)
+{
+ DispatcherMessage *msg;
+ uint32_t ack;
+ int send_fd = dispatcher->send_fd;
+
+ assert(dispatcher->max_message_type > message_type);
+ assert(dispatcher->messages[message_type].handler);
+ msg = &dispatcher->messages[message_type];
+ pthread_mutex_lock(&dispatcher->lock);
+ if (write_safe(send_fd, &message_type, sizeof(message_type)) == -1) {
+ red_printf("error: failed to send message type for message %d\n",
+ message_type);
+ goto unlock;
+ }
+ if (write_safe(send_fd, payload, msg->size) == -1) {
+ red_printf("error: failed to send message body for message %d\n",
+ message_type);
+ goto unlock;
+ }
+ if (msg->ack) {
+ if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) {
+ red_printf("error: failed to read ack");
+ } else if (ack != ACK) {
+ red_printf("error: got wrong ack value in dispatcher "
+ "for message %d\n", message_type);
+ /* TODO handling error? */
+ }
+ }
+unlock:
+ pthread_mutex_unlock(&dispatcher->lock);
+}
+
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+ dispatcher_handle_message handler, size_t size,
+ int ack)
+{
+ DispatcherMessage *msg;
+
+ assert(message_type < dispatcher->max_message_type);
+ assert(dispatcher->messages[message_type].handler == 0);
+ msg = &dispatcher->messages[message_type];
+ msg->handler = handler;
+ msg->size = size;
+ msg->ack = ack;
+ if (msg->size > dispatcher->payload_size) {
+ dispatcher->payload = realloc(dispatcher->payload, msg->size);
+ dispatcher->payload_size = msg->size;
+ }
+}
+
+#ifdef DEBUG_DISPATCHER
+static void dummy_handler(int bla)
+{
+}
+
+static void setup_dummy_signal_handler(void)
+{
+ static int inited = 0;
+ struct sigaction act = {
+ .sa_handler = &dummy_handler,
+ };
+ if (inited) {
+ return;
+ }
+ inited = 1;
+ /* handle SIGRTMIN+10 in order to test the loops for EINTR */
+ if (sigaction(SIGRTMIN + 10, &act, NULL) == -1) {
+ fprintf(stderr,
+ "failed to set dummy sigaction for DEBUG_DISPATCHER\n");
+ exit(1);
+ }
+}
+#endif
+
+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
+ void *opaque)
+{
+ int channels[2];
+
+#ifdef DEBUG_DISPATCHER
+ setup_dummy_signal_handler();
+#endif
+ dispatcher->opaque = opaque;
+ if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
+ red_error("socketpair failed %s", strerror(errno));
+ return;
+ }
+ pthread_mutex_init(&dispatcher->lock, NULL);
+ dispatcher->recv_fd = channels[0];
+ dispatcher->send_fd = channels[1];
+ dispatcher->self = pthread_self();
+
+ dispatcher->messages = spice_malloc0_n(max_message_type,
+ sizeof(dispatcher->messages[0]));
+ dispatcher->max_message_type = max_message_type;
+}
+
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
+{
+ dispatcher->opaque = opaque;
+}
+
+int dispatcher_get_recv_fd(Dispatcher *dispatcher)
+{
+ return dispatcher->recv_fd;
+}
diff --git a/server/dispatcher.h b/server/dispatcher.h
new file mode 100644
index 0000000..04e6b46
--- /dev/null
+++ b/server/dispatcher.h
@@ -0,0 +1,81 @@
+#ifndef MAIN_DISPATCHER_H
+#define MAIN_DISPATCHER_H
+
+#include <spice.h>
+
+typedef struct Dispatcher Dispatcher;
+
+typedef void (*dispatcher_handle_message)(void *opaque,
+ void *payload);
+
+typedef struct DispatcherMessage {
+ size_t size;
+ int ack;
+ dispatcher_handle_message handler;
+} DispatcherMessage;
+
+struct Dispatcher {
+ SpiceCoreInterface *recv_core;
+ int recv_fd;
+ int send_fd;
+ pthread_t self;
+ pthread_mutex_t lock;
+ DispatcherMessage *messages;
+ int stage; /* message parser stage - sender has no stages */
+ size_t max_message_type;
+ void *payload; /* allocated as max of message sizes */
+ size_t payload_size; /* used to track realloc calls */
+ void *opaque;
+};
+
+/*
+ * dispatcher_send_message
+ * @message_type: message type
+ * @payload: payload
+ */
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+ void *payload);
+
+/*
+ * dispatcher_init
+ * @max_message_type: number of message types. Allows upfront allocation
+ * of a DispatcherMessage list.
+ * up front, and registration in any order wanted.
+ */
+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
+ void *opaque);
+
+/*
+ * dispatcher_register_handler
+ * @dispatcher: dispatcher
+ * @messsage_type: message type
+ * @handler: message handler
+ * @size: message size. Each type has a fixed associated size.
+ * @ack: send an ack. This is per message type - you can't send the
+ * same message type with and without. Register two different
+ * messages if that is what you want.
+ */
+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
+ dispatcher_handle_message handler, size_t size,
+ int ack);
+
+/*
+ * dispatcher_handle_recv_read
+ * @dispatcher: Dispatcher instance
+ */
+void dispatcher_handle_recv_read(Dispatcher *);
+
+/*
+ * dispatcher_get_recv_fd
+ * @return: receive file descriptor of the dispatcher
+ */
+int dispatcher_get_recv_fd(Dispatcher *);
+
+/*
+ * dispatcher_set_opaque
+ * @dispatcher: Dispatcher instance
+ * @opaque: opaque to use for callbacks
+ */
+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
+
+#endif //MAIN_DISPATCHER_H
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
index 73856bf..a5967fa 100644
--- a/server/main_dispatcher.c
+++ b/server/main_dispatcher.c
@@ -5,6 +5,7 @@
#include <assert.h>
#include "red_common.h"
+#include "dispatcher.h"
#include "main_dispatcher.h"
/*
@@ -28,11 +29,8 @@
*/
typedef struct {
+ Dispatcher base;
SpiceCoreInterface *core;
- int main_fd;
- int other_fd;
- pthread_t self;
- pthread_mutex_t lock;
} MainDispatcher;
MainDispatcher main_dispatcher;
@@ -43,15 +41,10 @@ enum {
MAIN_DISPATCHER_NUM_MESSAGES
};
-typedef struct MainDispatcherMessage {
- uint32_t type;
- union {
- struct {
- int event;
- SpiceChannelEventInfo *info;
- } channel_event;
- } data;
-} MainDispatcherMessage;
+typedef struct MainDispatcherChannelEventMessage {
+ int event;
+ SpiceChannelEventInfo *info;
+} MainDispatcherChannelEventMessage;
/* channel_event - calls core->channel_event, must be done in main thread */
static void main_dispatcher_self_handle_channel_event(
@@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
main_dispatcher.core->channel_event(event, info);
}
-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
+static void main_dispatcher_handle_channel_event(void *opaque,
+ void *payload)
{
- main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
- msg->data.channel_event.info);
+ MainDispatcherChannelEventMessage *channel_event = payload;
+
+ main_dispatcher_self_handle_channel_event(channel_event->event,
+ channel_event->info);
}
void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
{
- MainDispatcherMessage msg;
- ssize_t written = 0;
- ssize_t ret;
+ MainDispatcherChannelEventMessage msg;
- if (pthread_self() == main_dispatcher.self) {
+ if (pthread_self() == main_dispatcher.base.self) {
main_dispatcher_self_handle_channel_event(event, info);
return;
}
- msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
- msg.data.channel_event.event = event;
- msg.data.channel_event.info = info;
- pthread_mutex_lock(&main_dispatcher.lock);
- while (written < sizeof(msg)) {
- ret = write(main_dispatcher.other_fd, &msg + written,
- sizeof(msg) - written);
- if (ret == -1) {
- assert(errno == EINTR);
- continue;
- }
- written += ret;
- }
- pthread_mutex_unlock(&main_dispatcher.lock);
+ msg.event = event;
+ msg.info = info;
+ dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+ &msg);
}
-
-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
+static void dispatcher_handle_read(int fd, int event, void *opaque)
{
- int ret;
- MainDispatcher *md = opaque;
- MainDispatcherMessage msg;
- int read_size = 0;
+ Dispatcher *dispatcher = opaque;
- while (read_size < sizeof(msg)) {
- /* blocks until sizeof(msg) is read */
- ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size);
- if (ret == -1) {
- if (errno != EINTR) {
- red_printf("error reading from main dispatcher: %d\n", errno);
- /* TODO: close channel? */
- return;
- }
- continue;
- }
- read_size += ret;
- }
- switch (msg.type) {
- case MAIN_DISPATCHER_CHANNEL_EVENT:
- main_dispatcher_handle_channel_event(&msg);
- break;
- default:
- red_printf("error: unhandled main dispatcher message type %d\n",
- msg.type);
- }
+ dispatcher_handle_recv_read(dispatcher);
}
void main_dispatcher_init(SpiceCoreInterface *core)
{
- int channels[2];
-
memset(&main_dispatcher, 0, sizeof(main_dispatcher));
main_dispatcher.core = core;
-
- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
- red_error("socketpair failed %s", strerror(errno));
- return;
- }
- pthread_mutex_init(&main_dispatcher.lock, NULL);
- main_dispatcher.main_fd = channels[0];
- main_dispatcher.other_fd = channels[1];
- main_dispatcher.self = pthread_self();
-
- core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
- main_dispatcher_handle_read, &main_dispatcher);
+ dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base);
+ core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
+ dispatcher_handle_read, &main_dispatcher.base);
+ dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+ main_dispatcher_handle_channel_event,
+ sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
}
commit 9174b67160157f74cc00faf0b42cb7c5d2b710a1
Author: Alon Levy <alevy at redhat.com>
Date: Mon Nov 7 12:11:12 2011 +0200
server/red_dispatcher: remove semicolon from DBG_ASYNC
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 5257e6b..0c7a875 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -47,7 +47,7 @@ static int num_active_workers = 0;
#define DBG_ASYNC(s, ...) \
do { \
red_printf_debug(2, "ASYNC", s, ##__VA_ARGS__); \
- } while (0);
+ } while (0)
struct AsyncCommand {
RingItem link;
More information about the Spice-commits
mailing list