No subject
Thu Feb 16 21:16:00 PST 2012
necessary here. Unless you have multiple client mode turned on, the epoll
descriptor never has more than 3 registered events. Thus, the switch to poll()
does not seem like a problem.
The tradeoff comes with MAX_EVENT_SOURCES. You'll note I bumped it to 20 (for
now) below from the 10 passed to epoll before. The catch is, epoll only uses
that value as a suggestion and will continue to accept as many FDs as you hand
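it. poll(), on the other hand, does no allocation of its own, so once we fill
our 20-element array, no further descriptors can be registered. What does this
mean? Slot 0 is taken by the dispatcher channel, and each client needs two
slots (one for its display channel, one for its cursor channel), so when
multi-client mode is active, we can't accept more than about 10 clients right
now.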
I'd like to know how people use multi-client mode, and how many clients we
should be expected to support. I would be fine with writing a follow-on patch
that can dynamically resize the pollfd array as necessary.
A fair amount of the new code is due to the fact that epoll automatically
drops events on file descriptors as they are closed; poll() doesn't do that for
us, so we need to do the bookkeeping ourselves.
server/red_channel.c | 3 +
server/red_channel.h | 1 +
server/red_worker.c | 106 ++++++++++++++++++++++++++++++--------------------
server/reds.c | 1 +
4 files changed, 69 insertions(+), 42 deletions(-)
diff --git a/server/red_channel.c b/server/red_channel.c
index 767f907..c07216e 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -1190,6 +1190,9 @@ void red_channel_client_disconnect(RedChannelClient *rcc)
return;
}
red_channel_client_pipe_clear(rcc);
+ if (rcc->channel->channel_cbs.pre_disconnect) {
+ rcc->channel->channel_cbs.pre_disconnect(rcc);
+ }
reds_stream_free(rcc->stream);
rcc->stream = NULL;
red_channel_remove_client(rcc);
diff --git a/server/red_channel.h b/server/red_channel.h
index 045bfd4..7bafecb 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -182,6 +182,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base);
*/
typedef struct {
channel_configure_socket_proc config_socket;
+ channel_disconnect_proc pre_disconnect;
channel_disconnect_proc on_disconnect;
channel_send_pipe_item_proc send_item;
channel_hold_pipe_item_proc hold_item;
diff --git a/server/red_worker.c b/server/red_worker.c
index 3973f3e..e88dbc0 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -31,7 +31,6 @@
#include <stdio.h>
#include <stdarg.h>
-#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -39,6 +38,7 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
+#include <poll.h>
#include <pthread.h>
#include <netinet/tcp.h>
#include <setjmp.h>
@@ -231,12 +231,12 @@ double inline stat_byte_to_mega(uint64_t size)
#define stat_compress_add(a, b, c, d)
#endif
-#define MAX_EPOLL_SOURCES 10
+#define MAX_EVENT_SOURCES 20
#define INF_EVENT_WAIT ~0
typedef struct EventListener EventListener;
-typedef void (*event_listener_action_proc)(EventListener *ctx, uint32_t events);
+typedef void (*event_listener_action_proc)(EventListener *ctx, struct pollfd *pfd);
typedef void (*event_listener_free_proc)(EventListener *ctx);
struct EventListener {
uint32_t refs;
@@ -877,7 +877,8 @@ typedef struct RedWorker {
int id;
int running;
uint32_t *pending;
- int epoll;
+ struct pollfd poll_fds[MAX_EVENT_SOURCES];
+ EventListener *listeners[MAX_EVENT_SOURCES];
unsigned int event_timeout;
uint32_t repoll_cmd_ring;
uint32_t repoll_cursor_ring;
@@ -8721,6 +8722,21 @@ void red_show_tree(RedWorker *worker)
}
}
+static void poll_channel_client_pre_disconnect(RedChannelClient *rcc)
+{
+ CommonChannel *common;
+ int i;
+
+ common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = common->worker->poll_fds + i;
+ if (pfd->fd == rcc->stream->socket) {
+ pfd->fd = -1;
+ break;
+ }
+ }
+}
+
static void display_channel_client_on_disconnect(RedChannelClient *rcc)
{
DisplayChannel *display_channel;
@@ -9612,7 +9628,7 @@ CursorChannelClient *cursor_channel_create_rcc(CommonChannel *common,
static int listen_to_new_client_channel(CommonChannel *common,
CommonChannelClient *common_cc, RedsStream *stream)
{
- struct epoll_event event;
+ int i;
common_cc->listener.refs = 1;
common_cc->listener.action = common->listener_action;
@@ -9620,17 +9636,23 @@ static int listen_to_new_client_channel(CommonChannel *common,
ASSERT(common->base.clients_num);
common_cc->id = common->worker->id;
red_printf("NEW ID = %d", common_cc->id);
- event.events = EPOLLIN | EPOLLOUT | EPOLLET;
- event.data.ptr = &common_cc->listener;
- if (epoll_ctl(common->worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) {
- red_printf("epoll_ctl failed, %s", strerror(errno));
- return FALSE;
+
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = common->worker->poll_fds + i;
+ if (pfd->fd < 0) {
+ red_printf("new poll event %d (fd %d)", i, stream->socket);
+ pfd->fd = stream->socket;
+ pfd->events = POLLIN | POLLOUT;
+ common->worker->listeners[i] = &common_cc->listener;
+ return TRUE;
+ }
}
- return TRUE;
+ return FALSE;
}
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
event_listener_action_proc handler,
+ channel_disconnect_proc pre_disconnect,
channel_disconnect_proc on_disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
@@ -9645,6 +9667,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
ChannelCbs channel_cbs;
channel_cbs.config_socket = common_channel_config_socket;
+ channel_cbs.pre_disconnect = pre_disconnect;
channel_cbs.on_disconnect = on_disconnect;
channel_cbs.send_item = send_item;
channel_cbs.hold_item = hold_item;
@@ -9675,12 +9698,12 @@ error:
return NULL;
}
-static void handle_channel_events(EventListener *in_listener, uint32_t events)
+static void handle_channel_events(EventListener *in_listener, struct pollfd *pfd)
{
CommonChannelClient *common_cc = SPICE_CONTAINEROF(in_listener, CommonChannelClient, listener);
RedChannelClient *rcc = &common_cc->base;
- if ((events & EPOLLIN) && red_channel_client_is_connected(rcc)) {
+ if ((pfd->events & POLLIN) && red_channel_client_is_connected(rcc)) {
red_channel_client_receive(rcc);
}
@@ -9835,6 +9858,7 @@ static void display_channel_create(RedWorker *worker, int migrate)
worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, migrate,
handle_channel_events,
+ poll_channel_client_pre_disconnect,
display_channel_client_on_disconnect,
display_channel_send_item,
display_channel_hold_pipe_item,
@@ -10049,6 +10073,7 @@ static void cursor_channel_create(RedWorker *worker, int migrate)
worker, sizeof(*worker->cursor_channel),
SPICE_CHANNEL_CURSOR, migrate,
handle_channel_events,
+ poll_channel_client_pre_disconnect,
cursor_channel_client_on_disconnect,
cursor_channel_send_item,
cursor_channel_hold_pipe_item,
@@ -11027,7 +11052,7 @@ static void register_callbacks(Dispatcher *dispatcher)
-static void handle_dev_input(EventListener *listener, uint32_t events)
+static void handle_dev_input(EventListener *listener, struct pollfd *pfd)
{
RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
@@ -11041,10 +11066,9 @@ static void handle_dev_free(EventListener *ctx)
static void red_init(RedWorker *worker, WorkerInitData *init_data)
{
- struct epoll_event event;
RedWorkerMessage message;
- int epoll;
Dispatcher *dispatcher;
+ int i;
ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE);
@@ -11086,17 +11110,13 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
worker->wakeup_counter = stat_add_counter(worker->stat, "wakeups", TRUE);
worker->command_counter = stat_add_counter(worker->stat, "commands", TRUE);
#endif
- if ((epoll = epoll_create(MAX_EPOLL_SOURCES)) == -1) {
- red_error("epoll_create failed, %s", strerror(errno));
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ worker->poll_fds[i].fd = -1;
}
- worker->epoll = epoll;
- event.events = EPOLLIN;
- event.data.ptr = &worker->dev_listener;
-
- if (epoll_ctl(epoll, EPOLL_CTL_ADD, worker->channel, &event) == -1) {
- red_error("add channel failed, %s", strerror(errno));
- }
+ worker->poll_fds[0].fd = worker->channel;
+ worker->poll_fds[0].events = POLLIN;
+ worker->listeners[0] = &worker->dev_listener;
red_memslot_info_init(&worker->mem_slots,
init_data->num_memslots_groups,
@@ -11140,13 +11160,10 @@ void *red_worker_main(void *arg)
red_init_zlib(&worker);
worker.event_timeout = INF_EVENT_WAIT;
for (;;) {
- struct epoll_event events[MAX_EPOLL_SOURCES];
- int num_events;
- struct epoll_event *event;
- struct epoll_event *end;
+ int i, num_events;
worker.event_timeout = MIN(red_get_streams_timout(&worker), worker.event_timeout);
- num_events = epoll_wait(worker.epoll, events, MAX_EPOLL_SOURCES, worker.event_timeout);
+ num_events = poll(worker.poll_fds, MAX_EVENT_SOURCES, worker.event_timeout);
red_handle_streams_timout(&worker);
if (worker.display_channel) {
@@ -11160,27 +11177,32 @@ void *red_worker_main(void *arg)
worker.event_timeout = INF_EVENT_WAIT;
if (num_events == -1) {
if (errno != EINTR) {
- red_error("poll_wait failed, %s", strerror(errno));
+ red_error("poll failed, %s", strerror(errno));
}
num_events = 0;
}
- for (event = events, end = event + num_events; event < end; event++) {
- EventListener *evt_listener = (EventListener *)event->data.ptr;
- evt_listener->refs++;
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = worker.poll_fds + i;
+ if (pfd->revents) {
+ worker.listeners[i]->refs++;
+ }
}
- for (event = events, end = event + num_events; event < end; event++) {
- EventListener *evt_listener = (EventListener *)event->data.ptr;
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = worker.poll_fds + i;
+ if (pfd->revents) {
+ EventListener *evt_listener = worker.listeners[i];
- if (evt_listener->refs > 1) {
- evt_listener->action(evt_listener, event->events);
- if (--evt_listener->refs) {
- continue;
+ if (evt_listener->refs > 1) {
+ evt_listener->action(evt_listener, pfd);
+ if (--evt_listener->refs) {
+ continue;
+ }
}
+ red_printf("freeing event listener");
+ evt_listener->free(evt_listener);
}
- red_printf("freeing event listener");
- evt_listener->free(evt_listener);
}
if (worker.running) {
diff --git a/server/reds.c b/server/reds.c
index daadb56..250e0ca 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -4204,6 +4204,7 @@ void reds_stream_free(RedsStream *s)
}
reds_stream_remove_watch(s);
+ red_printf("close socket fd %d", s->socket);
close(s->socket);
free(s);
--
1.7.9.1
More information about the Spice-devel
mailing list