[Spice-devel] [PATCH 01/16] worker: use glib main loop
Jonathon Jongsma
jjongsma at redhat.com
Thu Jan 28 07:43:19 PST 2016
I've looked at this patch so many times now that it's hard to review it
objectively anymore ;) But it looks good to me. I think we still want the
follow-up patch that removes the pushing after this goes in, though.
Acked-by: Jonathon Jongsma <jjongsma at redhat.com>
On Wed, 2016-01-27 at 12:48 +0000, Frediano Ziglio wrote:
> Use the glib mainloop instead of writing our own. The glib loop is both
> cleaner to use and is more extensible. It is also very mature and
> reduces the maintenance burden on the spice server.
>
> Signed-off-by: Marc-André Lureau <marcandre.lureau at gmail.com>
> Signed-off-by: Jonathon Jongsma <jjongsma at redhat.com>
> Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> ---
> server/Makefile.am | 2 -
> server/red-worker.c | 232 +++++++++++++--------------------------
> server/spice_timer_queue.c | 267 --------------------------------------------
> -
> server/spice_timer_queue.h | 44 --------
> 4 files changed, 76 insertions(+), 469 deletions(-)
> delete mode 100644 server/spice_timer_queue.c
> delete mode 100644 server/spice_timer_queue.h
>
> diff --git a/server/Makefile.am b/server/Makefile.am
> index 92b716f..411a0d9 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -121,8 +121,6 @@ libserver_la_SOURCES = \
> sound.h \
> stat.h \
> spicevmc.c \
> - spice_timer_queue.c \
> - spice_timer_queue.h \
> zlib-encoder.c \
> zlib-encoder.h \
> image-cache.h \
> diff --git a/server/red-worker.c b/server/red-worker.c
> index 6196682..738a890 100644
> --- a/server/red-worker.c
> +++ b/server/red-worker.c
> @@ -49,22 +49,14 @@
>
> #include "spice.h"
> #include "red-worker.h"
> -#include "spice_timer_queue.h"
> #include "cursor-channel.h"
> #include "tree.h"
>
> #define CMD_RING_POLL_TIMEOUT 10 //milli
> #define CMD_RING_POLL_RETRIES 200
>
> -#define MAX_EVENT_SOURCES 20
> #define INF_EVENT_WAIT ~0
>
> -struct SpiceWatch {
> - struct RedWorker *worker;
> - SpiceWatchFunc watch_func;
> - void *watch_func_opaque;
> -};
> -
> struct RedWorker {
> pthread_t thread;
> QXLInstance *qxl;
> @@ -72,8 +64,7 @@ struct RedWorker {
> SpiceWatch *dispatch_watch;
> int running;
> SpiceCoreInterfaceInternal core;
> - struct pollfd poll_fds[MAX_EVENT_SOURCES];
> - struct SpiceWatch watches[MAX_EVENT_SOURCES];
> +
> unsigned int event_timeout;
>
> DisplayChannel *display_channel;
> @@ -509,84 +500,6 @@ static int common_channel_config_socket(RedChannelClient
> *rcc)
> return TRUE;
> }
>
> -static void worker_watch_update_mask(SpiceWatch *watch, int event_mask)
> -{
> - struct RedWorker *worker;
> - int i;
> -
> - if (!watch) {
> - return;
> - }
> -
> - worker = watch->worker;
> - i = watch - worker->watches;
> -
> - worker->poll_fds[i].events = 0;
> - if (event_mask & SPICE_WATCH_EVENT_READ) {
> - worker->poll_fds[i].events |= POLLIN;
> - }
> - if (event_mask & SPICE_WATCH_EVENT_WRITE) {
> - worker->poll_fds[i].events |= POLLOUT;
> - }
> -}
> -
> -static SpiceWatch *worker_watch_add(const SpiceCoreInterfaceInternal *iface,
> - int fd, int event_mask, SpiceWatchFunc
> func, void *opaque)
> -{
> - RedWorker *worker = SPICE_CONTAINEROF(iface, RedWorker, core);
> - int i;
> -
> - /* Search for a free slot in our poll_fds & watches arrays */
> - for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> - if (worker->poll_fds[i].fd == -1) {
> - break;
> - }
> - }
> - if (i == MAX_EVENT_SOURCES) {
> - /* Since we are a channel core implementation, we always get called
> from
> - red_channel_client_create(), so opaque always is our rcc */
> - RedChannelClient *rcc = opaque;
> - spice_warning("could not add a watch for channel type %u id %u",
> - rcc->channel->type, rcc->channel->id);
> - return NULL;
> - }
> -
> - worker->poll_fds[i].fd = fd;
> - worker->watches[i].worker = worker;
> - worker->watches[i].watch_func = func;
> - worker->watches[i].watch_func_opaque = opaque;
> - worker_watch_update_mask(&worker->watches[i], event_mask);
> -
> - return &worker->watches[i];
> -}
> -
> -static void worker_watch_remove(SpiceWatch *watch)
> -{
> - if (!watch) {
> - return;
> - }
> -
> - /* Note we don't touch the poll_fd here, to avoid the
> - poll_fds/watches table entry getting re-used in the same
> - red_worker_main loop over the fds as it is removed.
> -
> - This is done because re-using it while events were pending on
> - the fd previously occupying the slot would lead to incorrectly
> - calling the watch_func for the new fd. */
> - memset(watch, 0, sizeof(SpiceWatch));
> -}
> -
> -static const SpiceCoreInterfaceInternal worker_core_initializer = {
> - .timer_add = spice_timer_queue_add,
> - .timer_start = spice_timer_set,
> - .timer_cancel = spice_timer_cancel,
> - .timer_remove = spice_timer_remove,
> -
> - .watch_update_mask = worker_watch_update_mask,
> - .watch_add = worker_watch_add,
> - .watch_remove = worker_watch_remove,
> -};
> -
> CommonChannelClient *common_channel_new_client(CommonChannel *common,
> int size,
> RedClient *client,
> @@ -1530,18 +1443,81 @@ static void handle_dev_input(int fd, int event, void
> *opaque)
> dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker
> ->red_dispatcher));
> }
>
> +typedef struct RedWorkerSource {
> + GSource source;
> + RedWorker *worker;
> +} RedWorkerSource;
> +
> +static gboolean worker_source_prepare(GSource *source, gint *p_timeout)
> +{
> + RedWorkerSource *wsource = SPICE_CONTAINEROF(source, RedWorkerSource,
> source);
> + RedWorker *worker = wsource->worker;
> + unsigned int timeout;
> +
> + timeout = MIN(worker->event_timeout,
> + display_channel_get_streams_timeout(worker
> ->display_channel));
> +
> + *p_timeout = (timeout == INF_EVENT_WAIT) ? -1 : timeout;
> + if (*p_timeout == 0)
> + return TRUE;
> +
> + return FALSE;
> +}
> +
> +static gboolean worker_source_check(GSource *source)
> +{
> + RedWorkerSource *wsource = SPICE_CONTAINEROF(source, RedWorkerSource,
> source);
> + RedWorker *worker = wsource->worker;
> +
> + return worker->running /* TODO && worker->pending_process */;
> +}
> +
> +static gboolean worker_source_dispatch(GSource *source, GSourceFunc callback,
> + gpointer user_data)
> +{
> + RedWorkerSource *wsource = SPICE_CONTAINEROF(source, RedWorkerSource,
> source);
> + RedWorker *worker = wsource->worker;
> + DisplayChannel *display = worker->display_channel;
> + int ring_is_empty;
> +
> + /* during migration, in the dest, the display channel can be initialized
> + while the global lz data not since migrate data msg hasn't been
> + received yet */
> + /* TODO: why is this here, and not in display_channel_create */
> + display_channel_free_glz_drawables_to_free(display);
> +
> + /* TODO: could use its own source */
> + stream_timeout(display);
> +
> + worker->event_timeout = INF_EVENT_WAIT;
> + red_process_cursor(worker, &ring_is_empty);
> + red_process_display(worker, &ring_is_empty);
> +
> + /* TODO: remove me? that should be handled by watch out condition */
> + red_push(worker);
> +
> + return TRUE;
> +}
> +
> +/* cannot be const */
> +static GSourceFuncs worker_source_funcs = {
> + .prepare = worker_source_prepare,
> + .check = worker_source_check,
> + .dispatch = worker_source_dispatch,
> +};
> +
> RedWorker* red_worker_new(QXLInstance *qxl, RedDispatcher *red_dispatcher)
> {
> QXLDevInitInfo init_info;
> RedWorker *worker;
> Dispatcher *dispatcher;
> - int i;
> const char *record_filename;
>
> qxl->st->qif->get_init_info(qxl, &init_info);
>
> worker = spice_new0(RedWorker, 1);
> - worker->core = worker_core_initializer;
> + worker->core = event_loop_core;
> + worker->core.main_context = g_main_context_new();
>
> record_filename = getenv("SPICE_WORKER_RECORD_FILENAME");
> if (record_filename) {
> @@ -1575,15 +1551,17 @@ RedWorker* red_worker_new(QXLInstance *qxl,
> RedDispatcher *red_dispatcher)
> worker->wakeup_counter = stat_add_counter(worker->stat, "wakeups", TRUE);
> worker->command_counter = stat_add_counter(worker->stat, "commands",
> TRUE);
> #endif
> - for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> - worker->poll_fds[i].fd = -1;
> - }
>
> worker->dispatch_watch =
> worker->core.watch_add(&worker->core,
> dispatcher_get_recv_fd(dispatcher),
> SPICE_WATCH_EVENT_READ, handle_dev_input,
> worker);
> spice_assert(worker->dispatch_watch != NULL);
>
> + GSource *source = g_source_new(&worker_source_funcs,
> sizeof(RedWorkerSource));
> + SPICE_CONTAINEROF(source, RedWorkerSource, source)->worker = worker;
> + g_source_attach(source, worker->core.main_context);
> + g_source_unref(source);
> +
> memslot_info_init(&worker->mem_slots,
> init_info.num_memslots_groups,
> init_info.num_memslots,
> @@ -1611,73 +1589,15 @@ SPICE_GNUC_NORETURN static void *red_worker_main(void
> *arg)
> spice_assert(MAX_PIPE_SIZE > WIDE_CLIENT_ACK_WINDOW &&
> MAX_PIPE_SIZE > NARROW_CLIENT_ACK_WINDOW); //ensure wakeup by ack
> message
>
> - if (!spice_timer_queue_create()) {
> - spice_error("failed to create timer queue");
> - }
> -
> RED_CHANNEL(worker->cursor_channel)->thread_id = pthread_self();
> RED_CHANNEL(worker->display_channel)->thread_id = pthread_self();
>
> - for (;;) {
> - int i, num_events;
> - unsigned int timeout;
> -
> - timeout = spice_timer_queue_get_timeout_ms();
> - worker->event_timeout = MIN(timeout, worker->event_timeout);
> - timeout = display_channel_get_streams_timeout(worker
> ->display_channel);
> - worker->event_timeout = MIN(timeout, worker->event_timeout);
> - num_events = poll(worker->poll_fds, MAX_EVENT_SOURCES, worker
> ->event_timeout);
> - stream_timeout(worker->display_channel);
> - spice_timer_queue_cb();
> -
> - if (worker->display_channel) {
> - /* during migration, in the dest, the display channel can be
> initialized
> - while the global lz data not since migrate data msg hasn't
> been
> - received yet */
> - display_channel_free_glz_drawables_to_free(worker
> ->display_channel);
> - }
> -
> - worker->event_timeout = INF_EVENT_WAIT;
> - if (num_events == -1) {
> - if (errno != EINTR) {
> - spice_error("poll failed, %s", strerror(errno));
> - }
> - }
> -
> - for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> - /* The watch may have been removed by the watch-func from
> - another fd (ie a disconnect through the dispatcher),
> - in this case watch_func is NULL. */
> - if (worker->poll_fds[i].revents && worker->watches[i].watch_func)
> {
> - int events = 0;
> - if (worker->poll_fds[i].revents & POLLIN) {
> - events |= SPICE_WATCH_EVENT_READ;
> - }
> - if (worker->poll_fds[i].revents & POLLOUT) {
> - events |= SPICE_WATCH_EVENT_WRITE;
> - }
> - worker->watches[i].watch_func(worker->poll_fds[i].fd, events,
> - worker
> ->watches[i].watch_func_opaque);
> - }
> - }
> -
> - /* Clear the poll_fd for any removed watches, see the comment in
> - watch_remove for why we don't do this there. */
> - for (i = 0; i < MAX_EVENT_SOURCES; i++) {
> - if (!worker->watches[i].watch_func) {
> - worker->poll_fds[i].fd = -1;
> - }
> - }
> -
> - if (worker->running) {
> - int ring_is_empty;
> - red_process_cursor(worker, &ring_is_empty);
> - red_process_display(worker, &ring_is_empty);
> - }
> - red_push(worker);
> - }
> + GMainLoop *loop = g_main_loop_new(worker->core.main_context, FALSE);
> + g_main_loop_run(loop);
> + g_main_loop_unref(loop);
>
> - spice_warn_if_reached();
> + /* FIXME: free worker, and join threads */
> + exit(0);
> }
>
> bool red_worker_run(RedWorker *worker)
> diff --git a/server/spice_timer_queue.c b/server/spice_timer_queue.c
> deleted file mode 100644
> index 421b090..0000000
> --- a/server/spice_timer_queue.c
> +++ /dev/null
> @@ -1,267 +0,0 @@
> -/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
> -/*
> - Copyright (C) 2013 Red Hat, Inc.
> -
> - This library is free software; you can redistribute it and/or
> - modify it under the terms of the GNU Lesser General Public
> - License as published by the Free Software Foundation; either
> - version 2.1 of the License, or (at your option) any later version.
> -
> - This library is distributed in the hope that it will be useful,
> - but WITHOUT ANY WARRANTY; without even the implied warranty of
> - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> - Lesser General Public License for more details.
> -
> - You should have received a copy of the GNU Lesser General Public
> - License along with this library; if not, see <http://www.gnu.org/licenses/
> >.
> -*/
> -#include <config.h>
> -#include <pthread.h>
> -#include "red-common.h"
> -#include "spice_timer_queue.h"
> -#include "common/ring.h"
> -#include "utils.h"
> -
> -static Ring timer_queue_list;
> -static int queue_count = 0;
> -static pthread_mutex_t queue_list_lock = PTHREAD_MUTEX_INITIALIZER;
> -
> -static void spice_timer_queue_init(void)
> -{
> - ring_init(&timer_queue_list);
> -}
> -
> -struct SpiceTimer {
> - RingItem link;
> - RingItem active_link;
> -
> - SpiceTimerFunc func;
> - void *opaque;
> -
> - SpiceTimerQueue *queue;
> -
> - int is_active;
> - uint32_t ms;
> - uint64_t expiry_time;
> -};
> -
> -struct SpiceTimerQueue {
> - RingItem link;
> - pthread_t thread;
> - Ring timers;
> - Ring active_timers;
> -};
> -
> -static SpiceTimerQueue *spice_timer_queue_find(void)
> -{
> - pthread_t self = pthread_self();
> - RingItem *queue_item;
> -
> - RING_FOREACH(queue_item, &timer_queue_list) {
> - SpiceTimerQueue *queue = SPICE_CONTAINEROF(queue_item,
> SpiceTimerQueue, link);
> -
> - if (pthread_equal(self, queue->thread) != 0) {
> - return queue;
> - }
> - }
> -
> - return NULL;
> -}
> -
> -static SpiceTimerQueue *spice_timer_queue_find_with_lock(void)
> -{
> - SpiceTimerQueue *queue;
> -
> - pthread_mutex_lock(&queue_list_lock);
> - queue = spice_timer_queue_find();
> - pthread_mutex_unlock(&queue_list_lock);
> - return queue;
> -}
> -
> -int spice_timer_queue_create(void)
> -{
> - SpiceTimerQueue *queue;
> -
> - pthread_mutex_lock(&queue_list_lock);
> - if (queue_count == 0) {
> - spice_timer_queue_init();
> - }
> -
> - if (spice_timer_queue_find() != NULL) {
> - spice_printerr("timer queue was already created for the thread");
> - return FALSE;
> - }
> -
> - queue = spice_new0(SpiceTimerQueue, 1);
> - queue->thread = pthread_self();
> - ring_init(&queue->timers);
> - ring_init(&queue->active_timers);
> -
> - ring_add(&timer_queue_list, &queue->link);
> - queue_count++;
> -
> - pthread_mutex_unlock(&queue_list_lock);
> -
> - return TRUE;
> -}
> -
> -void spice_timer_queue_destroy(void)
> -{
> - RingItem *item;
> - SpiceTimerQueue *queue;
> -
> - pthread_mutex_lock(&queue_list_lock);
> - queue = spice_timer_queue_find();
> -
> - spice_assert(queue != NULL);
> -
> - while ((item = ring_get_head(&queue->timers))) {
> - SpiceTimer *timer;
> -
> - timer = SPICE_CONTAINEROF(item, SpiceTimer, link);
> - spice_timer_remove(timer);
> - }
> -
> - ring_remove(&queue->link);
> - free(queue);
> - queue_count--;
> -
> - pthread_mutex_unlock(&queue_list_lock);
> -}
> -
> -SpiceTimer *spice_timer_queue_add(const SpiceCoreInterfaceInternal *iface,
> - SpiceTimerFunc func, void *opaque)
> -{
> - SpiceTimer *timer = spice_new0(SpiceTimer, 1);
> - SpiceTimerQueue *queue = spice_timer_queue_find_with_lock();
> -
> - spice_assert(queue != NULL);
> -
> - ring_item_init(&timer->link);
> - ring_item_init(&timer->active_link);
> -
> - timer->opaque = opaque;
> - timer->func = func;
> - timer->queue = queue;
> -
> - ring_add(&queue->timers, &timer->link);
> -
> - return timer;
> -}
> -
> -static void _spice_timer_set(SpiceTimer *timer, uint32_t ms, uint64_t now)
> -{
> - RingItem *next_item;
> - SpiceTimerQueue *queue;
> -
> - if (timer->is_active) {
> - spice_timer_cancel(timer);
> - }
> -
> - queue = timer->queue;
> - timer->expiry_time = now + ms;
> - timer->ms = ms;
> -
> - RING_FOREACH(next_item, &queue->active_timers) {
> - SpiceTimer *next_timer = SPICE_CONTAINEROF(next_item, SpiceTimer,
> active_link);
> -
> - if (timer->expiry_time <= next_timer->expiry_time) {
> - break;
> - }
> - }
> -
> - if (next_item) {
> - ring_add_before(&timer->active_link, next_item);
> - } else {
> - ring_add_before(&timer->active_link, &queue->active_timers);
> - }
> - timer->is_active = TRUE;
> -}
> -
> -void spice_timer_set(SpiceTimer *timer, uint32_t ms)
> -{
> - spice_assert(pthread_equal(timer->queue->thread, pthread_self()) != 0);
> -
> - _spice_timer_set(timer, ms, spice_get_monotonic_time_ms());
> -}
> -
> -void spice_timer_cancel(SpiceTimer *timer)
> -{
> - spice_assert(pthread_equal(timer->queue->thread, pthread_self()) != 0);
> -
> - if (!ring_item_is_linked(&timer->active_link)) {
> - spice_assert(!timer->is_active);
> - return;
> - }
> -
> - spice_assert(timer->is_active);
> - ring_remove(&timer->active_link);
> - timer->is_active = FALSE;
> -}
> -
> -void spice_timer_remove(SpiceTimer *timer)
> -{
> - spice_assert(timer->queue);
> - spice_assert(ring_item_is_linked(&timer->link));
> - spice_assert(pthread_equal(timer->queue->thread, pthread_self()) != 0);
> -
> - if (timer->is_active) {
> - spice_assert(ring_item_is_linked(&timer->active_link));
> - ring_remove(&timer->active_link);
> - }
> - ring_remove(&timer->link);
> - free(timer);
> -}
> -
> -unsigned int spice_timer_queue_get_timeout_ms(void)
> -{
> - int64_t now_ms;
> - RingItem *head;
> - SpiceTimer *head_timer;
> - SpiceTimerQueue *queue = spice_timer_queue_find_with_lock();
> -
> - spice_assert(queue != NULL);
> -
> - if (ring_is_empty(&queue->active_timers)) {
> - return -1;
> - }
> -
> - head = ring_get_head(&queue->active_timers);
> - head_timer = SPICE_CONTAINEROF(head, SpiceTimer, active_link);
> -
> - now_ms = spice_get_monotonic_time_ms();
> -
> - return MAX(0, ((int64_t)head_timer->expiry_time - now_ms));
> -}
> -
> -
> -void spice_timer_queue_cb(void)
> -{
> - uint64_t now_ms;
> - RingItem *head;
> - SpiceTimerQueue *queue = spice_timer_queue_find_with_lock();
> -
> - spice_assert(queue != NULL);
> -
> - if (ring_is_empty(&queue->active_timers)) {
> - return;
> - }
> -
> - now_ms = spice_get_monotonic_time_ms();
> -
> - while ((head = ring_get_head(&queue->active_timers))) {
> - SpiceTimer *timer = SPICE_CONTAINEROF(head, SpiceTimer, active_link);
> -
> - if (timer->expiry_time > now_ms) {
> - break;
> - } else {
> - /* Remove active timer before calling the timer function.
> - * Timer function could delete the timer making the timer
> - * pointer point to freed data.
> - */
> - spice_timer_cancel(timer);
> - timer->func(timer->opaque);
> - /* timer could now be invalid ! */
> - }
> - }
> -}
> diff --git a/server/spice_timer_queue.h b/server/spice_timer_queue.h
> deleted file mode 100644
> index b17cecf..0000000
> --- a/server/spice_timer_queue.h
> +++ /dev/null
> @@ -1,44 +0,0 @@
> -/*
> - Copyright (C) 2013 Red Hat, Inc.
> -
> - This library is free software; you can redistribute it and/or
> - modify it under the terms of the GNU Lesser General Public
> - License as published by the Free Software Foundation; either
> - version 2.1 of the License, or (at your option) any later version.
> -
> - This library is distributed in the hope that it will be useful,
> - but WITHOUT ANY WARRANTY; without even the implied warranty of
> - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> - Lesser General Public License for more details.
> -
> - You should have received a copy of the GNU Lesser General Public
> - License along with this library; if not, see <http://www.gnu.org/licenses/
> >.
> -*/
> -
> -#ifndef _H_SPICE_TIMER_QUEUE
> -#define _H_SPICE_TIMER_QUEUE
> -
> -#include <stdint.h>
> -#include "spice.h"
> -
> -typedef struct SpiceTimerQueue SpiceTimerQueue;
> -
> -/* create/destroy a timer queue for the current thread.
> - * In order to execute the timers functions, spice_timer_queue_cb should be
> called
> - * periodically, according to spice_timer_queue_get_timeout_ms */
> -int spice_timer_queue_create(void);
> -void spice_timer_queue_destroy(void);
> -
> -SpiceTimer *spice_timer_queue_add(const SpiceCoreInterfaceInternal *iface,
> - SpiceTimerFunc func, void *opaque);
> -void spice_timer_set(SpiceTimer *timer, uint32_t ms);
> -void spice_timer_cancel(SpiceTimer *timer);
> -void spice_timer_remove(SpiceTimer *timer);
> -
> -/* returns the time left till the earliest timer in the queue expires.
> - * returns (unsigned)-1 if there are no active timers */
> -unsigned int spice_timer_queue_get_timeout_ms(void);
> -/* call the timeout callbacks of all the expired timers */
> -void spice_timer_queue_cb(void);
> -
> -#endif
More information about the Spice-devel
mailing list