[Spice-devel] [PATCH 5/7] red_worker: reimplement event loop using poll()
Alon Levy
alevy at redhat.com
Fri Feb 17 01:05:24 PST 2012
On Thu, Feb 16, 2012 at 11:30:11PM -0600, Dan McGee wrote:
> This removes the epoll dependency we had in red_worker, which was the
> last Linux-specific call we were using in the entire Spice server. Given
> we never have more than 10 file descriptors involved, there is little
> performance gain to be had here by using epoll() over poll().
>
> The biggest change is introduction of a new pre_disconnect callback;
> this is because poll, unlike epoll, cannot automatically remove file
> descriptors as they are closed from the pollfd set. This cannot be done
> in the existing on_disconnect callback; that is too late as the stream
> has already been closed and the file descriptor lost. The on_disconnect
> callback cannot easily be moved before the close and other operations
> because some behavior relies on client_num being set to a certain
> value.
>
> Signed-off-by: Dan McGee <dpmcgee at gmail.com>
> ---
>
> Notes:
>
> From what I can tell, the performance aspects of epoll() were never really
> necessary here. Unless you have multiple client mode turned on, the epoll
> descriptor never has more than 3 registered events. Thus, the switch to poll()
> does not seem like a problem.
>
> The tradeoff comes with MAX_EVENT_SOURCES. You'll note I bumped it to 20 (for
> now) below from the 10 passed to epoll before. The catch is, epoll only uses
> that value as a suggestion and will continue to accept as many FDs as you hand
> it. poll() on the other hand does not do allocation of its own, so once we fill
> our 20-element array, we are done. What does this mean? When multi-client mode
> is active, we can't accept more than about 10 clients right now.
>
> I'd like to know how people use multi-client mode, and how many clients we
> should be expected to support. I would be fine with writing a follow-on patch
> that dynamically resizes the pollfd array as necessary.
>
> A fair amount of the new code is due to the fact that epoll() can automatically
> drop events on file descriptors as they are closed; poll() doesn't do that for
> us so we need to do the bookkeeping ourselves.
>
The patch itself looks ok, but I'm not sure I like the direction. Why
not specialize: use epoll on Linux, and poll where epoll isn't supported?
- con: double the code
+ pro: no possible regression
You could do this with a vtable or with #ifdefs.
On the other hand, multiple client support is not used by anyone afaict.
There is a major problem with it, which is why I kept it experimental:
it crashes if the clients do not all have the same bandwidth (this can
happen with a hiccup too).
> server/red_channel.c | 3 +
> server/red_channel.h | 1 +
> server/red_worker.c | 106 ++++++++++++++++++++++++++++++--------------------
> server/reds.c | 1 +
> 4 files changed, 69 insertions(+), 42 deletions(-)
>
> diff --git a/server/red_channel.c b/server/red_channel.c
> index 767f907..c07216e 100644
> --- a/server/red_channel.c
> +++ b/server/red_channel.c
> @@ -1190,6 +1190,9 @@ void red_channel_client_disconnect(RedChannelClient *rcc)
> return;
> }
> red_channel_client_pipe_clear(rcc);
> + if (rcc->channel->channel_cbs.pre_disconnect) {
> + rcc->channel->channel_cbs.pre_disconnect(rcc);
> + }
> reds_stream_free(rcc->stream);
> rcc->stream = NULL;
> red_channel_remove_client(rcc);
> diff --git a/server/red_channel.h b/server/red_channel.h
> index 045bfd4..7bafecb 100644
> --- a/server/red_channel.h
> +++ b/server/red_channel.h
> @@ -182,6 +182,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base);
> */
> typedef struct {
> channel_configure_socket_proc config_socket;
> + channel_disconnect_proc pre_disconnect;
> channel_disconnect_proc on_disconnect;
> channel_send_pipe_item_proc send_item;
> channel_hold_pipe_item_proc hold_item;
> diff --git a/server/red_worker.c b/server/red_worker.c
> index 3973f3e..e88dbc0 100644
> --- a/server/red_worker.c
> +++ b/server/red_worker.c
> @@ -31,7 +31,6 @@
>
> #include <stdio.h>
> #include <stdarg.h>
> -#include <sys/epoll.h>
> #include <fcntl.h>
> #include <sys/socket.h>
> #include <netinet/in.h>
> @@ -39,6 +38,7 @@
> #include <errno.h>
> #include <string.h>
> #include <unistd.h>
> +#include <poll.h>
> #include <pthread.h>
> #include <netinet/tcp.h>
> #include <setjmp.h>
> @@ -231,12 +231,12 @@ double inline stat_byte_to_mega(uint64_t size)
> #define stat_compress_add(a, b, c, d)
> #endif
>
> -#define MAX_EPOLL_SOURCES 10
> +#define MAX_EVENT_SOURCES 20
> #define INF_EVENT_WAIT ~0
>
>
> typedef struct EventListener EventListener;
> -typedef void (*event_listener_action_proc)(EventListener *ctx, uint32_t events);
> +typedef void (*event_listener_action_proc)(EventListener *ctx, struct pollfd *pfd);
> typedef void (*event_listener_free_proc)(EventListener *ctx);
> struct EventListener {
> uint32_t refs;
> @@ -877,7 +877,8 @@ typedef struct RedWorker {
> int id;
> int running;
> uint32_t *pending;
> - int epoll;
> + struct pollfd poll_fds[MAX_EVENT_SOURCES];
> + EventListener *listeners[MAX_EVENT_SOURCES];
> unsigned int event_timeout;
> uint32_t repoll_cmd_ring;
> uint32_t repoll_cursor_ring;
> @@ -8721,6 +8722,21 @@ void red_show_tree(RedWorker *worker)
> }
> }
>
> +static void poll_channel_client_pre_disconnect(RedChannelClient *rcc)
> +{
> + CommonChannel *common;
> + int i;
> +
> + common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
> + for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> + struct pollfd *pfd = common->worker->poll_fds + i;
> + if (pfd->fd == rcc->stream->socket) {
> + pfd->fd = -1;
> + break;
> + }
> + }
> +}
> +
> static void display_channel_client_on_disconnect(RedChannelClient *rcc)
> {
> DisplayChannel *display_channel;
> @@ -9612,7 +9628,7 @@ CursorChannelClient *cursor_channel_create_rcc(CommonChannel *common,
> static int listen_to_new_client_channel(CommonChannel *common,
> CommonChannelClient *common_cc, RedsStream *stream)
> {
> - struct epoll_event event;
> + int i;
>
> common_cc->listener.refs = 1;
> common_cc->listener.action = common->listener_action;
> @@ -9620,17 +9636,23 @@ static int listen_to_new_client_channel(CommonChannel *common,
> ASSERT(common->base.clients_num);
> common_cc->id = common->worker->id;
> red_printf("NEW ID = %d", common_cc->id);
> - event.events = EPOLLIN | EPOLLOUT | EPOLLET;
> - event.data.ptr = &common_cc->listener;
> - if (epoll_ctl(common->worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) {
> - red_printf("epoll_ctl failed, %s", strerror(errno));
> - return FALSE;
> +
> + for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> + struct pollfd *pfd = common->worker->poll_fds + i;
> + if (pfd->fd < 0) {
> + red_printf("new poll event %d (fd %d)", i, stream->socket);
> + pfd->fd = stream->socket;
> + pfd->events = POLLIN | POLLOUT;
> + common->worker->listeners[i] = &common_cc->listener;
> + return TRUE;
> + }
> }
> - return TRUE;
> + return FALSE;
> }
>
> static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
> event_listener_action_proc handler,
> + channel_disconnect_proc pre_disconnect,
> channel_disconnect_proc on_disconnect,
> channel_send_pipe_item_proc send_item,
> channel_hold_pipe_item_proc hold_item,
> @@ -9645,6 +9667,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
> ChannelCbs channel_cbs;
>
> channel_cbs.config_socket = common_channel_config_socket;
> + channel_cbs.pre_disconnect = pre_disconnect;
> channel_cbs.on_disconnect = on_disconnect;
> channel_cbs.send_item = send_item;
> channel_cbs.hold_item = hold_item;
> @@ -9675,12 +9698,12 @@ error:
> return NULL;
> }
>
> -static void handle_channel_events(EventListener *in_listener, uint32_t events)
> +static void handle_channel_events(EventListener *in_listener, struct pollfd *pfd)
> {
> CommonChannelClient *common_cc = SPICE_CONTAINEROF(in_listener, CommonChannelClient, listener);
> RedChannelClient *rcc = &common_cc->base;
>
> - if ((events & EPOLLIN) && red_channel_client_is_connected(rcc)) {
> + if ((pfd->events & POLLIN) && red_channel_client_is_connected(rcc)) {
> red_channel_client_receive(rcc);
> }
>
> @@ -9835,6 +9858,7 @@ static void display_channel_create(RedWorker *worker, int migrate)
> worker, sizeof(*display_channel),
> SPICE_CHANNEL_DISPLAY, migrate,
> handle_channel_events,
> + poll_channel_client_pre_disconnect,
> display_channel_client_on_disconnect,
> display_channel_send_item,
> display_channel_hold_pipe_item,
> @@ -10049,6 +10073,7 @@ static void cursor_channel_create(RedWorker *worker, int migrate)
> worker, sizeof(*worker->cursor_channel),
> SPICE_CHANNEL_CURSOR, migrate,
> handle_channel_events,
> + poll_channel_client_pre_disconnect,
> cursor_channel_client_on_disconnect,
> cursor_channel_send_item,
> cursor_channel_hold_pipe_item,
> @@ -11027,7 +11052,7 @@ static void register_callbacks(Dispatcher *dispatcher)
>
>
>
> -static void handle_dev_input(EventListener *listener, uint32_t events)
> +static void handle_dev_input(EventListener *listener, struct pollfd *pfd)
> {
> RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
>
> @@ -11041,10 +11066,9 @@ static void handle_dev_free(EventListener *ctx)
>
> static void red_init(RedWorker *worker, WorkerInitData *init_data)
> {
> - struct epoll_event event;
> RedWorkerMessage message;
> - int epoll;
> Dispatcher *dispatcher;
> + int i;
>
> ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE);
>
> @@ -11086,17 +11110,13 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
> worker->wakeup_counter = stat_add_counter(worker->stat, "wakeups", TRUE);
> worker->command_counter = stat_add_counter(worker->stat, "commands", TRUE);
> #endif
> - if ((epoll = epoll_create(MAX_EPOLL_SOURCES)) == -1) {
> - red_error("epoll_create failed, %s", strerror(errno));
> + for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> + worker->poll_fds[i].fd = -1;
> }
> - worker->epoll = epoll;
>
> - event.events = EPOLLIN;
> - event.data.ptr = &worker->dev_listener;
> -
> - if (epoll_ctl(epoll, EPOLL_CTL_ADD, worker->channel, &event) == -1) {
> - red_error("add channel failed, %s", strerror(errno));
> - }
> + worker->poll_fds[0].fd = worker->channel;
> + worker->poll_fds[0].events = POLLIN;
> + worker->listeners[0] = &worker->dev_listener;
>
> red_memslot_info_init(&worker->mem_slots,
> init_data->num_memslots_groups,
> @@ -11140,13 +11160,10 @@ void *red_worker_main(void *arg)
> red_init_zlib(&worker);
> worker.event_timeout = INF_EVENT_WAIT;
> for (;;) {
> - struct epoll_event events[MAX_EPOLL_SOURCES];
> - int num_events;
> - struct epoll_event *event;
> - struct epoll_event *end;
> + int i, num_events;
>
> worker.event_timeout = MIN(red_get_streams_timout(&worker), worker.event_timeout);
> - num_events = epoll_wait(worker.epoll, events, MAX_EPOLL_SOURCES, worker.event_timeout);
> + num_events = poll(worker.poll_fds, MAX_EVENT_SOURCES, worker.event_timeout);
> red_handle_streams_timout(&worker);
>
> if (worker.display_channel) {
> @@ -11160,27 +11177,32 @@ void *red_worker_main(void *arg)
> worker.event_timeout = INF_EVENT_WAIT;
> if (num_events == -1) {
> if (errno != EINTR) {
> - red_error("poll_wait failed, %s", strerror(errno));
> + red_error("poll failed, %s", strerror(errno));
> }
> num_events = 0;
> }
>
> - for (event = events, end = event + num_events; event < end; event++) {
> - EventListener *evt_listener = (EventListener *)event->data.ptr;
> - evt_listener->refs++;
> + for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> + struct pollfd *pfd = worker.poll_fds + i;
> + if (pfd->revents) {
> + worker.listeners[i]->refs++;
> + }
> }
>
> - for (event = events, end = event + num_events; event < end; event++) {
> - EventListener *evt_listener = (EventListener *)event->data.ptr;
> + for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> + struct pollfd *pfd = worker.poll_fds + i;
> + if (pfd->revents) {
> + EventListener *evt_listener = worker.listeners[i];
>
> - if (evt_listener->refs > 1) {
> - evt_listener->action(evt_listener, event->events);
> - if (--evt_listener->refs) {
> - continue;
> + if (evt_listener->refs > 1) {
> + evt_listener->action(evt_listener, pfd);
> + if (--evt_listener->refs) {
> + continue;
> + }
> }
> + red_printf("freeing event listener");
> + evt_listener->free(evt_listener);
> }
> - red_printf("freeing event listener");
> - evt_listener->free(evt_listener);
> }
>
> if (worker.running) {
> diff --git a/server/reds.c b/server/reds.c
> index daadb56..250e0ca 100644
> --- a/server/reds.c
> +++ b/server/reds.c
> @@ -4204,6 +4204,7 @@ void reds_stream_free(RedsStream *s)
> }
>
> reds_stream_remove_watch(s);
> + red_printf("close socket fd %d", s->socket);
> close(s->socket);
>
> free(s);
> --
> 1.7.9.1
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
More information about the Spice-devel
mailing list