[Spice-devel] [PATCH spice 4/5] red_worker: Rework poll code to use the watch interface

Hans de Goede hdegoede at redhat.com
Sat Mar 10 11:31:28 PST 2012


Commit 143a1df24e83e9c1e173c16aeb76d61ffdce9598 changed red_worker_main
from epoll to poll. But epoll has edge-triggered semantics (when requested,
and we did request them), whereas poll is always level-triggered. And
red_worker was relying on the edge-triggered semantics, as it was always
polling for POLLOUT, which, when edge-triggered, would only cause epoll
to report an event after we had blocked on a write. But after the
switch to regular poll, with its level-triggered semantics, the POLLOUT
condition would almost always be true, causing red_worker_main to not
block on the poll and burn CPU as fast as it can as soon as a client was
connected.

Luckily, we already have a mechanism, in the form of watches, to switch
from polling for read only to polling for read+write and back again. So this
patch changes the red_worker dummy watch implementation into a proper watch
implementation, and drops the entire EventListener concept, since it is then
no longer needed.

This fixes spice-server using 400% CPU on my quad-core machine as soon as
a client was connected to a multi-head VM and, as an added bonus, is a nice
cleanup IMHO.

Signed-off-by: Hans de Goede <hdegoede at redhat.com>
---
 server/red_worker.c |  170 ++++++++++++++++++++++++---------------------------
 1 file changed, 79 insertions(+), 91 deletions(-)

diff --git a/server/red_worker.c b/server/red_worker.c
index 3da5161..f7c6ccb 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -234,12 +234,11 @@ double inline stat_byte_to_mega(uint64_t size)
 #define MAX_EVENT_SOURCES 20
 #define INF_EVENT_WAIT ~0
 
-
-typedef struct EventListener EventListener;
-typedef void (*event_listener_action_proc)(EventListener *ctx, struct pollfd *pfd);
-struct EventListener {
-    event_listener_action_proc action;
-};
+typedef struct SpiceWatch {
+    struct RedWorker *worker;
+    SpiceWatchFunc watch_func;
+    void *watch_func_opaque;
+} SpiceWatch;
 
 enum {
     BUF_TYPE_RAW = 1,
@@ -577,7 +576,6 @@ typedef struct GlzSharedDictionary {
 
 typedef struct CommonChannel {
     RedChannel base; // Must be the first thing
-    event_listener_action_proc listener_action;
     struct RedWorker *worker;
     uint8_t recv_buf[RECIVE_BUF_SIZE];
     uint32_t id_alloc; // bitfield. TODO - use this instead of shift scheme.
@@ -585,7 +583,6 @@ typedef struct CommonChannel {
 
 typedef struct CommonChannelClient {
     RedChannelClient base;
-    EventListener listener;
     uint32_t id;
     struct RedWorker *worker;
 } CommonChannelClient;
@@ -864,7 +861,6 @@ typedef struct ItemTrace {
 #define NUM_CURSORS 100
 
 typedef struct RedWorker {
-    EventListener dev_listener;
     DisplayChannel *display_channel;
     CursorChannel *cursor_channel;
     QXLInstance *qxl;
@@ -875,7 +871,7 @@ typedef struct RedWorker {
     int running;
     uint32_t *pending;
     struct pollfd poll_fds[MAX_EVENT_SOURCES];
-    EventListener *listeners[MAX_EVENT_SOURCES];
+    struct SpiceWatch watches[MAX_EVENT_SOURCES];
     unsigned int event_timeout;
     uint32_t repoll_cmd_ring;
     uint32_t repoll_cursor_ring;
@@ -8719,22 +8715,6 @@ void red_show_tree(RedWorker *worker)
     }
 }
 
-static void poll_channel_client_pre_disconnect(RedChannelClient *rcc)
-{
-    CommonChannel *common;
-    int i;
-
-    common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
-    for (i = 0; i < MAX_EVENT_SOURCES; i++) {
-        struct pollfd *pfd = common->worker->poll_fds + i;
-        if (pfd->fd == rcc->stream->socket) {
-            pfd->fd = -1;
-            common->worker->listeners[i] = NULL;
-            break;
-        }
-    }
-}
-
 static void display_channel_client_on_disconnect(RedChannelClient *rcc)
 {
     DisplayChannel *display_channel;
@@ -9534,15 +9514,61 @@ static int common_channel_config_socket(RedChannelClient *rcc)
 
 static void worker_watch_update_mask(SpiceWatch *watch, int event_mask)
 {
+    struct RedWorker *worker = watch->worker;
+    int i = watch - worker->watches;
+
+    worker->poll_fds[i].events = 0;
+    if (event_mask & SPICE_WATCH_EVENT_READ) {
+        worker->poll_fds[i].events |= POLLIN;
+    }
+    if (event_mask & SPICE_WATCH_EVENT_WRITE) {
+        worker->poll_fds[i].events |= POLLOUT;
+    }
 }
 
 static SpiceWatch *worker_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque)
 {
-    return NULL; // apparently allowed?
+    /* Since we are a channel core implementation, we always get called from
+       red_channel_client_create(), so opaque always is our rcc */
+    RedChannelClient *rcc = opaque;
+    struct RedWorker *worker;
+    int i;
+
+    /* Since we are called from red_channel_client_create()
+       CommonChannelClient->worker has not been set yet! */
+    worker = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base)->worker;
+
+    /* Search for a free slot in our poll_fds & watches arrays */
+    for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+        if (worker->poll_fds[i].fd == -1) {
+            break;
+        }
+    }
+    if (i == MAX_EVENT_SOURCES) {
+        red_printf("ERROR could not add a watch for channel type %u id %u",
+                   rcc->channel->type, rcc->channel->id);
+        return NULL;
+    }
+
+    worker->poll_fds[i].fd = fd;
+    worker->watches[i].worker = worker;
+    worker->watches[i].watch_func = func;
+    worker->watches[i].watch_func_opaque = opaque;
+    worker_watch_update_mask(&worker->watches[i], event_mask);
+
+    return &worker->watches[i];
 }
 
 static void worker_watch_remove(SpiceWatch *watch)
 {
+    /* Note we don't touch the poll_fd here, to avoid the
+       poll_fds/watches table entry getting re-used in the same
+       red_worker_main loop over the fds as it is removed.
+
+       This is done because re-using it while events were pending on
+       the fd previously occupying the slot would lead to incorrectly
+       calling the watch_func for the new fd. */
+    memset(watch, 0, sizeof(SpiceWatch));
 }
 
 SpiceCoreInterface worker_core = {
@@ -9566,6 +9592,7 @@ static CommonChannelClient *common_channel_client_create(int size,
                                   num_common_caps, common_caps, num_caps, caps);
     CommonChannelClient *common_cc = (CommonChannelClient*)rcc;
     common_cc->worker = common->worker;
+    common_cc->id = common->worker->id;
 
     // TODO: move wide/narrow ack setting to red_channel.
     red_channel_client_ack_set_client_window(rcc,
@@ -9615,32 +9642,7 @@ CursorChannelClient *cursor_channel_create_rcc(CommonChannel *common,
     return ccc;
 }
 
-static int listen_to_new_client_channel(CommonChannel *common,
-    CommonChannelClient *common_cc, RedsStream *stream)
-{
-    int i;
-
-    common_cc->listener.action = common->listener_action;
-    ASSERT(common->base.clients_num);
-    common_cc->id = common->worker->id;
-    red_printf("NEW ID = %d", common_cc->id);
-
-    for (i = 0; i < MAX_EVENT_SOURCES; i++) {
-        struct pollfd *pfd = common->worker->poll_fds + i;
-        if (pfd->fd < 0) {
-            red_printf("new poll event %d (fd %d)", i, stream->socket);
-            pfd->fd = stream->socket;
-            pfd->events = POLLIN | POLLOUT;
-            common->worker->listeners[i] = &common_cc->listener;
-            return TRUE;
-        }
-    }
-    return FALSE;
-}
-
 static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
-                                 event_listener_action_proc handler,
-                                 channel_disconnect_proc pre_disconnect,
                                  channel_disconnect_proc on_disconnect,
                                  channel_send_pipe_item_proc send_item,
                                  channel_hold_pipe_item_proc hold_item,
@@ -9655,7 +9657,6 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
     ChannelCbs channel_cbs = { NULL, };
 
     channel_cbs.config_socket = common_channel_config_socket;
-    channel_cbs.pre_disconnect = pre_disconnect;
     channel_cbs.on_disconnect = on_disconnect;
     channel_cbs.send_item = send_item;
     channel_cbs.hold_item = hold_item;
@@ -9678,7 +9679,6 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
         goto error;
     }
     common->worker = worker;
-    common->listener_action = handler;
     return channel;
 
 error:
@@ -9686,20 +9686,6 @@ error:
     return NULL;
 }
 
-static void handle_channel_events(EventListener *in_listener, struct pollfd *pfd)
-{
-    CommonChannelClient *common_cc = SPICE_CONTAINEROF(in_listener, CommonChannelClient, listener);
-    RedChannelClient *rcc = &common_cc->base;
-
-    if ((pfd->events & POLLIN) && red_channel_client_is_connected(rcc)) {
-        red_channel_client_receive(rcc);
-    }
-
-    if (rcc->send_data.blocked && red_channel_client_is_connected(rcc)) {
-        red_channel_client_push(rcc);
-    }
-}
-
 static void display_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
 {
     ASSERT(item);
@@ -9845,8 +9831,6 @@ static void display_channel_create(RedWorker *worker, int migrate)
     if (!(worker->display_channel = (DisplayChannel *)__new_channel(
             worker, sizeof(*display_channel),
             SPICE_CHANNEL_DISPLAY, migrate,
-            handle_channel_events,
-            poll_channel_client_pre_disconnect,
             display_channel_client_on_disconnect,
             display_channel_send_item,
             display_channel_hold_pipe_item,
@@ -9934,15 +9918,8 @@ static void handle_new_display_channel(RedWorker *worker, RedClient *client, Red
 
     // todo: tune level according to bandwidth
     display_channel->zlib_level = ZLIB_DEFAULT_COMPRESSION_LEVEL;
-    if (!listen_to_new_client_channel(&display_channel->common, &dcc->common, stream)) {
-        goto error;
-    }
     red_display_client_init_streams(dcc);
     on_new_display_channel_client(dcc);
-    return;
-
-error:
-    red_channel_client_destroy(&dcc->common.base);
 }
 
 static void cursor_channel_client_on_disconnect(RedChannelClient *rcc)
@@ -10060,8 +10037,6 @@ static void cursor_channel_create(RedWorker *worker, int migrate)
     worker->cursor_channel = (CursorChannel *)__new_channel(
         worker, sizeof(*worker->cursor_channel),
         SPICE_CHANNEL_CURSOR, migrate,
-        handle_channel_events,
-        poll_channel_client_pre_disconnect,
         cursor_channel_client_on_disconnect,
         cursor_channel_send_item,
         cursor_channel_hold_pipe_item,
@@ -10096,7 +10071,6 @@ static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream
     channel->stat = stat_add_node(worker->stat, "cursor_channel", TRUE);
     channel->common.base.out_bytes_counter = stat_add_counter(channel->stat, "out_bytes", TRUE);
 #endif
-    listen_to_new_client_channel(&channel->common, &ccc->common, stream);
     on_new_cursor_channel(worker, &ccc->common.base);
 }
 
@@ -11040,9 +11014,9 @@ static void register_callbacks(Dispatcher *dispatcher)
 
 
 
-static void handle_dev_input(EventListener *listener, struct pollfd *pfd)
+static void handle_dev_input(int fd, int event, void *opaque)
 {
-    RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
+    RedWorker *worker = opaque;
 
     dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher));
 }
@@ -11064,7 +11038,6 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
     worker->channel = dispatcher_get_recv_fd(dispatcher);
     register_callbacks(dispatcher);
     worker->pending = init_data->pending;
-    worker->dev_listener.action = handle_dev_input;
     worker->cursor_visible = TRUE;
     ASSERT(init_data->num_renderers > 0);
     worker->num_renderers = init_data->num_renderers;
@@ -11097,7 +11070,9 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
 
     worker->poll_fds[0].fd = worker->channel;
     worker->poll_fds[0].events = POLLIN;
-    worker->listeners[0] = &worker->dev_listener;
+    worker->watches[0].worker = worker;
+    worker->watches[0].watch_func = handle_dev_input;
+    worker->watches[0].watch_func_opaque = worker;
 
     red_memslot_info_init(&worker->mem_slots,
                           init_data->num_memslots_groups,
@@ -11160,17 +11135,30 @@ void *red_worker_main(void *arg)
             if (errno != EINTR) {
                 red_error("poll failed, %s", strerror(errno));
             }
-            num_events = 0;
         }
 
         for (i = 0; i < MAX_EVENT_SOURCES; i++) {
-            struct pollfd *pfd = worker.poll_fds + i;
-            if (pfd->revents) {
-                EventListener *evt_listener = worker.listeners[i];
-
-                if (evt_listener) {
-                    evt_listener->action(evt_listener, pfd);
+            /* The watch may have been removed by the watch-func from
+               another fd (ie a disconnect through the dispatcher),
+               in this case watch_func is NULL. */
+            if (worker.poll_fds[i].revents && worker.watches[i].watch_func) {
+                int events = 0;
+                if (worker.poll_fds[i].revents & POLLIN) {
+                    events |= SPICE_WATCH_EVENT_READ;
+                }
+                if (worker.poll_fds[i].revents & POLLOUT) {
+                    events |= SPICE_WATCH_EVENT_WRITE;
                 }
+                worker.watches[i].watch_func(worker.poll_fds[i].fd, events,
+                                        worker.watches[i].watch_func_opaque);
+            }
+        }
+
+        /* Clear the poll_fd for any removed watches, see the comment in
+           watch_remove for why we don't do this there. */
+        for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+            if (!worker.watches[i].watch_func) {
+                worker.poll_fds[i].fd = -1;
             }
         }
 
-- 
1.7.9.3



More information about the Spice-devel mailing list