[PATCH 4/8] client: Add wl_event_queue for multi-thread dispatching

Kristian Høgsberg krh at bitplanet.net
Tue Oct 9 19:38:01 PDT 2012


This introduces wl_event_queue, which is what will make multi-threaded
wayland clients possible and useful.  The driving use case is that of a
GL rendering thread that renders and calls eglSwapBuffers independently of
a "main thread" that owns the wl_display and handles input events and
everything else.  In general, the EGL and GL APIs have a threading model
that requires the wayland client library to be usable from several threads.
Finally, the current callback model gets into trouble even in a
single-threaded scenario: if we have to block in eglSwapBuffers, we may end up
doing unrelated callbacks from within EGL.

The wl_event_queue mechanism lets the application (or middleware such as
EGL or toolkits) assign a proxy to an event queue.  Only events from objects
associated with the queue will be put in the queue, and conversely,
events from objects associated with the queue will not be queued up anywhere
else.  The wl_display struct has a built-in event queue, which is considered
the main and default event queue.  New proxies are associated with the
same queue as the object that created them (either the object that a
request with a new-id argument was sent to or the object that sent an
event with a new-id argument).  A proxy can be moved to a different event
queue by calling wl_proxy_set_queue().

A subsystem, such as EGL, will then create its own event queue and associate
the objects it expects to receive events from with that queue.  If EGL
needs to block and wait for a certain event, it can keep dispatching events
from its queue until that event comes in.  This won't call out to unrelated
code with an EGL lock held.  Similarly, we don't risk the main thread
handling an event from an EGL object and then calling into EGL from a
different thread without the lock held.
---
 src/wayland-client.c |  148 ++++++++++++++++++++++++++++++++++++++------------
 src/wayland-client.h |    8 +++
 2 files changed, 122 insertions(+), 34 deletions(-)

diff --git a/src/wayland-client.c b/src/wayland-client.c
index 6a499e0..694fd39 100644
--- a/src/wayland-client.c
+++ b/src/wayland-client.c
@@ -51,6 +51,8 @@ struct wl_global_listener {
 struct wl_proxy {
 	struct wl_object object;
 	struct wl_display *display;
+	struct wl_event_queue *queue;
+	int id_deleted;
 	void *user_data;
 };
 
@@ -61,14 +63,21 @@ struct wl_global {
 	struct wl_list link;
 };
 
+struct wl_event_queue {
+	struct wl_list event_list;
+	pthread_cond_t cond;
+};
+
 struct wl_display {
 	struct wl_proxy proxy;
 	struct wl_connection *connection;
 	int fd;
 	int close_fd;
+	pthread_t display_thread;
 	struct wl_map objects;
 	struct wl_list global_listener_list;
 	struct wl_list global_list;
+	struct wl_event_queue queue;
 	pthread_mutex_t mutex;
 
 	wl_display_global_func_t global_handler;
@@ -77,6 +86,48 @@ struct wl_display {
 
 static int wl_debug = 0;
 
+static void
+wl_event_queue_init(struct wl_event_queue *queue)
+{
+	wl_list_init(&queue->event_list);
+	pthread_cond_init(&queue->cond, NULL);
+}
+
+static void
+wl_event_queue_release(struct wl_event_queue *queue)
+{
+	struct wl_closure *closure;
+
+	while (!wl_list_empty(&queue->event_list)) {
+		closure = container_of(queue->event_list.next,
+				       struct wl_closure, link);
+		wl_list_remove(&closure->link);
+		wl_closure_destroy(closure);
+	}
+	pthread_cond_destroy(&queue->cond);
+}
+
+WL_EXPORT void
+wl_event_queue_destroy(struct wl_event_queue *queue)
+{
+	wl_event_queue_release(queue);
+	free(queue);
+}
+
+WL_EXPORT struct wl_event_queue *
+wl_display_create_queue(struct wl_display *display)
+{
+	struct wl_event_queue *queue;
+
+	queue = malloc(sizeof *queue);
+	if (queue == NULL)
+		return NULL;
+
+	wl_event_queue_init(queue);
+
+	return queue;
+}
+
 WL_EXPORT struct wl_global_listener *
 wl_display_add_global_listener(struct wl_display *display,
 			       wl_display_global_func_t handler, void *data)
@@ -120,6 +171,8 @@ wl_proxy_create(struct wl_proxy *factory, const struct wl_interface *interface)
 	proxy->object.interface = interface;
 	proxy->object.implementation = NULL;
 	proxy->display = display;
+	proxy->queue = factory->queue;
+	proxy->id_deleted = 0;
 
 	pthread_mutex_lock(&display->mutex);
 	proxy->object.id = wl_map_insert_new(&display->objects,
@@ -144,6 +197,8 @@ wl_proxy_create_for_id(struct wl_proxy *factory,
 	proxy->object.implementation = NULL;
 	proxy->object.id = id;
 	proxy->display = display;
+	proxy->queue = factory->queue;
+	proxy->id_deleted = 0;
 
 	pthread_mutex_lock(&display->mutex);
 	wl_map_insert_at(&display->objects, id, proxy);
@@ -157,7 +212,9 @@ wl_proxy_destroy(struct wl_proxy *proxy)
 {
 	pthread_mutex_lock(&proxy->display->mutex);
 
-	if (proxy->object.id < WL_SERVER_ID_START)
+	if (proxy->id_deleted)
+		wl_map_remove(&proxy->display->objects, proxy->object.id);
+	else if (proxy->object.id < WL_SERVER_ID_START)
 		wl_map_insert_at(&proxy->display->objects,
 				 proxy->object.id, WL_ZOMBIE_OBJECT);
 	else
@@ -290,7 +347,7 @@ display_handle_delete_id(void *data, struct wl_display *display, uint32_t id)
 
 	proxy = wl_map_lookup(&display->objects, id);
 	if (proxy != WL_ZOMBIE_OBJECT)
-		fprintf(stderr, "server sent delete_id for live object\n");
+		proxy->id_deleted = 1;
 	else
 		wl_map_remove(&display->objects, id);
 
@@ -380,6 +437,7 @@ wl_display_connect_to_fd(int fd)
 	wl_map_init(&display->objects);
 	wl_list_init(&display->global_listener_list);
 	wl_list_init(&display->global_list);
+	wl_event_queue_init(&display->queue);
 
 	wl_map_insert_new(&display->objects, WL_MAP_CLIENT_SIDE, NULL);
 
@@ -390,6 +448,7 @@ wl_display_connect_to_fd(int fd)
 	display->proxy.display = display;
 	display->proxy.object.implementation = (void(**)(void)) &display_listener;
 	display->proxy.user_data = display;
+	display->proxy.queue = &display->queue;
 
 	display->connection = wl_connection_create(display->fd);
 	if (display->connection == NULL) {
@@ -440,6 +499,7 @@ wl_display_disconnect(struct wl_display *display)
 
 	wl_connection_destroy(display->connection);
 	wl_map_release(&display->objects);
+	wl_event_queue_release(&display->queue);
 	wl_list_for_each_safe(global, gnext,
 			      &display->global_list, link)
 		wl_global_destroy(global);
@@ -486,7 +546,7 @@ wl_display_roundtrip(struct wl_display *display)
 }
 
 static int
-create_proxies(struct wl_display *display, struct wl_closure *closure)
+create_proxies(struct wl_proxy *sender, struct wl_closure *closure)
 {
 	struct wl_proxy *proxy;
 	const char *signature;
@@ -506,7 +566,7 @@ create_proxies(struct wl_display *display, struct wl_closure *closure)
 				*(void **) closure->args[i] = NULL;
 				break;
 			}
-			proxy = wl_proxy_create_for_id(&display->proxy, id,
+			proxy = wl_proxy_create_for_id(sender, id,
 						       closure->message->types[i - 2]);
 			if (proxy == NULL)
 				return -1;
@@ -521,7 +581,7 @@ create_proxies(struct wl_display *display, struct wl_closure *closure)
 }
 
 static int
-queue_event(struct wl_display *display, int len, struct wl_list *list)
+queue_event(struct wl_display *display, int len)
 {
 	uint32_t p[2], id;
 	int opcode, size;
@@ -552,45 +612,50 @@ queue_event(struct wl_display *display, int len, struct wl_list *list)
 	if (wl_debug)
 		wl_closure_print(closure, &proxy->object, false);
 
-	if (closure == NULL || create_proxies(display, closure) < 0) {
+	if (closure == NULL || create_proxies(proxy, closure) < 0) {
 		fprintf(stderr, "Error demarshalling event\n");
 		abort();
 	}
 
-	wl_list_insert(list->prev, &closure->link);
+	if (wl_list_empty(&proxy->queue->event_list))
+		pthread_cond_signal(&proxy->queue->cond);
+	wl_list_insert(proxy->queue->event_list.prev, &closure->link);
 
 	return size;
 }
 
 static void
-dispatch_event(struct wl_display *display, struct wl_closure *closure)
+dispatch_event(struct wl_display *display, struct wl_event_queue *queue)
 {
+	struct wl_closure *closure;
 	struct wl_proxy *proxy;
 	uint32_t id;
 	int opcode;
 
+	closure = container_of(queue->event_list.next,
+			       struct wl_closure, link);
 	wl_list_remove(&closure->link);
 	id = closure->buffer[0];
 	opcode = closure->buffer[1] & 0xffff;
 
-	pthread_mutex_lock(&display->mutex);
 	proxy = wl_map_lookup(&display->objects, id);
-	pthread_mutex_unlock(&display->mutex);
 
-	if (proxy == WL_ZOMBIE_OBJECT)
-		goto skip;
+	pthread_mutex_unlock(&display->mutex);
 
-	wl_closure_invoke(closure, &proxy->object,
-			  proxy->object.implementation[opcode],
-			  proxy->user_data);
- skip:
+	if (proxy != WL_ZOMBIE_OBJECT)
+		wl_closure_invoke(closure, &proxy->object,
+				  proxy->object.implementation[opcode],
+				  proxy->user_data);
 	wl_closure_destroy(closure);
+
+	pthread_mutex_lock(&display->mutex);
 }
 
+
 WL_EXPORT int
-wl_display_dispatch(struct wl_display *display)
+wl_display_dispatch_queue(struct wl_display *display,
+			  struct wl_event_queue *queue)
 {
-	struct wl_list list;
 	struct wl_closure *closure;
 	int len, size;
 
@@ -599,27 +664,35 @@ wl_display_dispatch(struct wl_display *display)
 	/* FIXME: Handle flush errors, EAGAIN... */
 	wl_connection_flush(display->connection);
 
-	/* FIXME: Shouldn't always read here... */
-	len = wl_connection_read(display->connection);
-	if (len == -1)
-		return -1;
-
-	wl_list_init(&list);
-	while (len >= 8) {
-		size = queue_event(display, len, &list);
-		if (size == 0)
-			break;
-		len -= size;
+	if (wl_list_empty(&queue->event_list) &&
+	    pthread_equal(display->display_thread, pthread_self())) {
+		len = wl_connection_read(display->connection);
+		if (len == -1)
+			return -1;
+		while (len >= 8) {
+			size = queue_event(display, len);
+			if (size == 0)
+				break;
+			len -= size;
+		}
+	} else if (wl_list_empty(&queue->event_list)) {
+		pthread_cond_wait(&queue->cond, &display->mutex);
 	}
 
+	while (!wl_list_empty(&queue->event_list))
+		dispatch_event(display, queue);
+
 	pthread_mutex_unlock(&display->mutex);
 
-	while (!wl_list_empty(&list)) {
-		closure = container_of(list.next, struct wl_closure, link);
-		dispatch_event(display, closure);
-	}
+	return 0;
+}
+
+WL_EXPORT int
+wl_display_dispatch(struct wl_display *display)
+{
+	display->display_thread = pthread_self();
 
-	return len;
+	return wl_display_dispatch_queue(display, &display->queue);
 }
 
 WL_EXPORT int
@@ -685,6 +758,13 @@ wl_proxy_get_id(struct wl_proxy *proxy)
 	return proxy->object.id;
 }
 
+
+WL_EXPORT void
+wl_proxy_set_queue(struct wl_proxy *proxy, struct wl_event_queue *queue)
+{
+	proxy->queue = queue;
+}
+
 WL_EXPORT void
 wl_log_set_handler_client(wl_log_func_t handler)
 {
diff --git a/src/wayland-client.h b/src/wayland-client.h
index 5fcb86d..fbbee09 100644
--- a/src/wayland-client.h
+++ b/src/wayland-client.h
@@ -32,6 +32,9 @@ extern "C" {
 
 struct wl_proxy;
 struct wl_display;
+struct wl_event_queue;
+
+void wl_event_queue_destroy(struct wl_event_queue *queue);
 
 void wl_proxy_marshal(struct wl_proxy *p, uint32_t opcode, ...);
 struct wl_proxy *wl_proxy_create(struct wl_proxy *factory,
@@ -46,6 +49,7 @@ int wl_proxy_add_listener(struct wl_proxy *proxy,
 void wl_proxy_set_user_data(struct wl_proxy *proxy, void *user_data);
 void *wl_proxy_get_user_data(struct wl_proxy *proxy);
 uint32_t wl_proxy_get_id(struct wl_proxy *proxy);
+void wl_proxy_set_queue(struct wl_proxy *proxy, struct wl_event_queue *queue);
 
 void *wl_display_bind(struct wl_display *display,
 		      uint32_t name, const struct wl_interface *interface);
@@ -74,8 +78,12 @@ struct wl_display *wl_display_connect_to_fd(int fd);
 void wl_display_disconnect(struct wl_display *display);
 int wl_display_get_fd(struct wl_display *display);
 int wl_display_dispatch(struct wl_display *display);
+int wl_display_dispatch_queue(struct wl_display *display,
+			      struct wl_event_queue *queue);
+
 int wl_display_flush(struct wl_display *display);
 void wl_display_roundtrip(struct wl_display *display);
+struct wl_event_queue *wl_display_create_queue(struct wl_display *display);
 
 struct wl_global_listener;
 typedef void (*wl_display_global_func_t)(struct wl_display *display,
-- 
1.7.10.2



More information about the wayland-devel mailing list