[PATCH] Add thread affinity to wayland clients

Jørgen Lind jorgen.lind at nokia.com
Mon Mar 5 03:44:37 PST 2012


This makes it possible to marshal requests from more than one thread in
Wayland clients. However, it is not possible to run wl_display_iterate
from any thread other than the one that created the wl_display.
---
 src/Makefile.am      |    2 +
 src/wayland-client.c |  102 +++++++++++++++++++++++++++++++++++++++++++++++--
 src/wayland-client.h |    5 ++
 3 files changed, 104 insertions(+), 5 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index f356b54..9aab9de 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -24,6 +24,8 @@ libwayland_server_la_SOURCES =			\
 	event-loop.c
 
 libwayland_client_la_LIBADD = $(FFI_LIBS) libwayland-util.la -lrt
+libwayland_client_la_LDFLAGS = -pthread
+libwayland_client_la_CFLAGS = -pthread
 libwayland_client_la_SOURCES =			\
 	wayland-protocol.c			\
 	wayland-client.c
diff --git a/src/wayland-client.c b/src/wayland-client.c
index 498a429..b5a2853 100644
--- a/src/wayland-client.c
+++ b/src/wayland-client.c
@@ -34,6 +34,8 @@
 #include <assert.h>
 #include <fcntl.h>
 #include <sys/poll.h>
+#include <pthread.h>
+#include <sys/eventfd.h>
 
 #include "wayland-util.h"
 #include "wayland-client.h"
@@ -62,6 +64,9 @@ struct wl_display {
 	struct wl_proxy proxy;
 	struct wl_connection *connection;
 	int fd;
+	int write_notification_event_fd;
+	pthread_t thread_id;
+	pthread_mutex_t marshalling_mutex;
 	uint32_t mask;
 	struct wl_map objects;
 	struct wl_list global_listener_list;
@@ -191,7 +196,10 @@ wl_proxy_marshal(struct wl_proxy *proxy, uint32_t opcode, ...)
 {
 	struct wl_closure *closure;
 	va_list ap;
+	uint64_t write_notification_value;
+	ssize_t success;
 
+	pthread_mutex_lock(&proxy->display->marshalling_mutex);
 	va_start(ap, opcode);
 	closure = wl_connection_vmarshal(proxy->display->connection,
 					 &proxy->object, opcode, ap,
@@ -212,6 +220,14 @@ wl_proxy_marshal(struct wl_proxy *proxy, uint32_t opcode, ...)
 		wl_closure_print(closure, &proxy->object, true);
 
 	wl_closure_destroy(closure);
+
+	write_notification_value = 1;
+	success = write(proxy->display->write_notification_event_fd,&write_notification_value,8);
+	if (success < 0) {
+		fprintf(stderr, "Error writing to eventfd\n");
+	}
+
+	pthread_mutex_unlock(&proxy->display->marshalling_mutex);
 }
 
 /* Can't do this, there may be more than one instance of an
@@ -347,6 +363,7 @@ wl_display_connect(const char *name)
 	const char *debug;
 	char *connection, *end;
 	int flags;
+	int success;
 
 	debug = getenv("WAYLAND_DEBUG");
 	if (debug)
@@ -396,6 +413,21 @@ wl_display_connect(const char *name)
 		return NULL;
 	}
 
+	display->write_notification_event_fd = eventfd(0, EFD_CLOEXEC);
+        if (display->write_notification_event_fd < 0) {
+            fprintf(stderr, "Failed to create eventfd\n");
+        }
+
+	display->thread_id = pthread_self();
+	pthread_mutexattr_t mutex_attr;
+	success = pthread_mutexattr_init(&mutex_attr);
+	success += pthread_mutexattr_settype(&mutex_attr,PTHREAD_MUTEX_RECURSIVE);
+	success += pthread_mutex_init(&display->marshalling_mutex, &mutex_attr);
+	success += pthread_mutexattr_destroy(&mutex_attr);
+
+	if (success)
+		fprintf(stderr, "Threading setup was not successfull\n");
+
 	return display;
 }
 
@@ -432,6 +464,18 @@ wl_display_get_fd(struct wl_display *display,
 	return display->fd;
 }
 
+WL_EXPORT int
+wl_display_get_write_notification_fd(struct wl_display *display)
+{
+	return display->write_notification_event_fd;
+}
+
+WL_EXPORT pthread_t
+wl_display_thread(struct wl_display *display)
+{
+	return display->thread_id;
+}
+
 static void
 sync_callback(void *data, struct wl_callback *callback, uint32_t time)
 {
@@ -445,18 +489,46 @@ static const struct wl_callback_listener sync_listener = {
 	sync_callback
 };
 
+static void
+threaded_sync_callback(void *data, struct wl_callback *callback, uint32_t time)
+{
+	fprintf(stderr, "threaded_sync_callback\n");
+	pthread_cond_t *wait_condition = data;
+
+	pthread_cond_broadcast(wait_condition);
+	wl_callback_destroy(callback);
+}
+
+static const struct wl_callback_listener threaded_sync_listener = {
+	threaded_sync_callback
+};
+
 WL_EXPORT void
 wl_display_roundtrip(struct wl_display *display)
 {
 	struct wl_callback *callback;
 	int done;
+	pthread_cond_t wait_cond;
+	pthread_mutex_t wait_mutex;
 
-	done = 0;
 	callback = wl_display_sync(display);
-	wl_callback_add_listener(callback, &sync_listener, &done);
-	wl_display_flush(display);
-	while (!done)
-		wl_display_iterate(display, WL_DISPLAY_READABLE);
+
+	if (wl_display_thread(display) == pthread_self()) {
+		done = 0;
+		wl_callback_add_listener(callback, &sync_listener, &done);
+		wl_display_flush(display);
+		while (!done)
+			wl_display_iterate(display, WL_DISPLAY_READABLE);
+	} else {
+		pthread_mutex_init(&wait_mutex,NULL);
+		pthread_cond_init(&wait_cond, NULL);
+		pthread_mutex_lock(&wait_mutex);
+
+		wl_callback_add_listener(callback, &threaded_sync_listener, &wait_cond);
+		pthread_cond_wait(&wait_cond,&wait_mutex);
+		pthread_cond_destroy(&wait_cond);
+		pthread_mutex_destroy(&wait_mutex);
+	}
 }
 
 static void
@@ -500,7 +572,17 @@ WL_EXPORT void
 wl_display_iterate(struct wl_display *display, uint32_t mask)
 {
 	uint32_t p[2], object, opcode, size;
+	uint64_t write_notification_value;
 	int len;
+	ssize_t success;
+
+	if (pthread_self() != display->thread_id)
+		fprintf(stderr,
+			"wl_display_iterate called from another thread than "
+			"the thead that created wl_display. This will result "
+			"in undefined behavior\n");
+
+	pthread_mutex_lock(&display->marshalling_mutex);
 
 	mask &= display->mask;
 	if (mask == 0) {
@@ -509,6 +591,14 @@ wl_display_iterate(struct wl_display *display, uint32_t mask)
 		return;
 	}
 
+	if (mask & WL_DISPLAY_WRITABLE) {
+		success = read(display->write_notification_event_fd,
+			&write_notification_value, 8);
+		if (success < 0)
+			fprintf(stderr,
+				"wl_display_iterate eventfd error at read\n");
+	}
+
 	len = wl_connection_data(display->connection, mask);
 
 	while (len > 0) {
@@ -526,6 +616,8 @@ wl_display_iterate(struct wl_display *display, uint32_t mask)
 		len -= size;
 	}
 
+	pthread_mutex_unlock(&display->marshalling_mutex);
+
 	if (len < 0) {
 		fprintf(stderr, "read error: %m\n");
 		exit(EXIT_FAILURE);
diff --git a/src/wayland-client.h b/src/wayland-client.h
index b04a7ef..f5c722b 100644
--- a/src/wayland-client.h
+++ b/src/wayland-client.h
@@ -25,6 +25,8 @@
 
 #include "wayland-util.h"
 
+#include <pthread.h>
+
 #ifdef  __cplusplus
 extern "C" {
 #endif
@@ -94,6 +96,9 @@ uint32_t
 wl_display_get_global(struct wl_display *display,
 		      const char *interface, uint32_t version);
 
+int wl_display_get_write_notification_fd(struct wl_display *display);
+pthread_t wl_display_thread(struct wl_display *display);
+
 #ifdef  __cplusplus
 }
 #endif
-- 
1.7.5.4


--45Z9DzgjV8m4Oswq--


More information about the wayland-devel mailing list