[PATCH 1/8] Change filedescriptor API to be thread safe

Kristian Høgsberg krh at bitplanet.net
Tue Oct 9 19:37:58 PDT 2012


The update callback for the file descriptors was always a bit awkward and
un-intuitive.  The idea was that whenever the protocol code needed to
write data to the fd it would call the 'update' function.  This function
would adjust the mainloop so that it polls for POLLOUT on the fd so we
can eventually flush the data to the socket.

The problem is that in multi-threaded applications, any thread can issue
a request, which writes data to the output buffer and thus triggers the
update callback.  As a result, the callback is invoked with the display
mutex held, and it may be invoked from any thread.

The solution is to eliminate the update callback and just require that
the application or server flushes all connection buffers before blocking.
This turns out to be a simpler API, although we now require clients to
deal with EAGAIN and non-blocking writes.  It also saves a few syscalls,
since the socket will be writable most of the time and most writes will
complete, so we avoid changing epoll to poll for POLLOUT, then write and
then change it back for each write.
---
 src/connection.c         |  109 +++++++++++++++++++---------------------------
 src/event-loop.c         |    4 ++
 src/wayland-client.c     |   62 ++++++--------------------
 src/wayland-client.h     |   10 ++---
 src/wayland-private.h    |   15 +++----
 src/wayland-server.c     |   75 ++++++++++++++++++-------------
 src/wayland-server.h     |    5 ++-
 tests/connection-test.c  |   82 +++++++++-------------------------
 tests/os-wrappers-test.c |   31 +++----------
 9 files changed, 143 insertions(+), 250 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index dbe0fa9..54de4f1 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -56,9 +56,7 @@ struct wl_connection {
 	struct wl_buffer in, out;
 	struct wl_buffer fds_in, fds_out;
 	int fd;
-	void *data;
-	wl_connection_update_func_t update;
-	int write_signalled;
+	int want_flush;
 };
 
 union wl_value {
@@ -156,9 +154,7 @@ wl_buffer_size(struct wl_buffer *b)
 }
 
 struct wl_connection *
-wl_connection_create(int fd,
-		     wl_connection_update_func_t update,
-		     void *data)
+wl_connection_create(int fd)
 {
 	struct wl_connection *connection;
 
@@ -167,12 +163,6 @@ wl_connection_create(int fd,
 		return NULL;
 	memset(connection, 0, sizeof *connection);
 	connection->fd = fd;
-	connection->update = update;
-	connection->data = data;
-
-	connection->update(connection,
-			   WL_CONNECTION_READABLE,
-			   connection->data);
 
 	return connection;
 }
@@ -249,14 +239,19 @@ decode_cmsg(struct wl_buffer *buffer, struct msghdr *msg)
 }
 
 int
-wl_connection_data(struct wl_connection *connection, uint32_t mask)
+wl_connection_flush(struct wl_connection *connection)
 {
 	struct iovec iov[2];
 	struct msghdr msg;
 	char cmsg[CLEN];
-	int len, count, clen;
+	int len = 0, count, clen;
+	uint32_t tail;
+
+	if (!connection->want_flush)
+		return 0;
 
-	if (mask & WL_CONNECTION_WRITABLE) {
+	tail = connection->out.tail;
+	while (connection->out.head - connection->out.tail > 0) {
 		wl_buffer_get_iov(&connection->out, iov, &count);
 
 		build_cmsg(&connection->fds_out, cmsg, &clen);
@@ -272,58 +267,49 @@ wl_connection_data(struct wl_connection *connection, uint32_t mask)
 		do {
 			len = sendmsg(connection->fd, &msg,
 				      MSG_NOSIGNAL | MSG_DONTWAIT);
-		} while (len < 0 && errno == EINTR);
+		} while (len == -1 && errno == EINTR);
 
-		if (len == -1 && errno == EPIPE) {
+		if (len == -1)
 			return -1;
-		} else if (len < 0) {
-			fprintf(stderr,
-				"write error for connection %p, fd %d: %m\n",
-				connection, connection->fd);
-			return -1;
-		}
 
 		close_fds(&connection->fds_out);
 
 		connection->out.tail += len;
-		if (connection->out.tail == connection->out.head &&
-		    connection->write_signalled) {
-			connection->update(connection,
-					   WL_CONNECTION_READABLE,
-					   connection->data);
-			connection->write_signalled = 0;
-		}
 	}
 
-	if (mask & WL_CONNECTION_READABLE) {
-		wl_buffer_put_iov(&connection->in, iov, &count);
+	connection->want_flush = 0;
 
-		msg.msg_name = NULL;
-		msg.msg_namelen = 0;
-		msg.msg_iov = iov;
-		msg.msg_iovlen = count;
-		msg.msg_control = cmsg;
-		msg.msg_controllen = sizeof cmsg;
-		msg.msg_flags = 0;
+	return connection->out.head - tail;
+}
 
-		do {
-			len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
-		} while (len < 0 && errno == EINTR);
+int
+wl_connection_read(struct wl_connection *connection)
+{
+	struct iovec iov[2];
+	struct msghdr msg;
+	char cmsg[CLEN];
+	int len, count;
 
-		if (len < 0) {
-			fprintf(stderr,
-				"read error from connection %p: %m (%d)\n",
-				connection, errno);
-			return -1;
-		} else if (len == 0) {
-			/* FIXME: Handle this better? */
-			return -1;
-		}
+	wl_buffer_put_iov(&connection->in, iov, &count);
+
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_iov = iov;
+	msg.msg_iovlen = count;
+	msg.msg_control = cmsg;
+	msg.msg_controllen = sizeof cmsg;
+	msg.msg_flags = 0;
+
+	do {
+		len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
+	} while (len < 0 && errno == EINTR);
 
-		decode_cmsg(&connection->fds_in, &msg);
+	if (len <= 0)
+		return len;
 
-		connection->in.head += len;
-	}	
+	decode_cmsg(&connection->fds_in, &msg);
+
+	connection->in.head += len;
 
 	return connection->in.head - connection->in.tail;
 }
@@ -334,18 +320,11 @@ wl_connection_write(struct wl_connection *connection,
 {
 	if (connection->out.head - connection->out.tail +
 	    count > ARRAY_LENGTH(connection->out.data))
-		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
+		if (wl_connection_flush(connection))
 			return -1;
 
 	wl_buffer_put(&connection->out, data, count);
-
-	if (!connection->write_signalled) {
-		connection->update(connection,
-				   WL_CONNECTION_READABLE |
-				   WL_CONNECTION_WRITABLE,
-				   connection->data);
-		connection->write_signalled = 1;
-	}
+	connection->want_flush = 1;
 
 	return 0;
 }
@@ -356,7 +335,7 @@ wl_connection_queue(struct wl_connection *connection,
 {
 	if (connection->out.head - connection->out.tail +
 	    count > ARRAY_LENGTH(connection->out.data))
-		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
+		if (wl_connection_flush(connection))
 			return -1;
 
 	wl_buffer_put(&connection->out, data, count);
@@ -395,7 +374,7 @@ static int
 wl_connection_put_fd(struct wl_connection *connection, int32_t fd)
 {
 	if (wl_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd)
-		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
+		if (wl_connection_flush(connection))
 			return -1;
 
 	wl_buffer_put(&connection->fds_out, &fd, sizeof fd);
diff --git a/src/event-loop.c b/src/event-loop.c
index 9339226..e383300 100644
--- a/src/event-loop.c
+++ b/src/event-loop.c
@@ -75,6 +75,10 @@ wl_event_source_fd_dispatch(struct wl_event_source *source,
 		mask |= WL_EVENT_READABLE;
 	if (ep->events & EPOLLOUT)
 		mask |= WL_EVENT_WRITABLE;
+	if (ep->events & EPOLLHUP)
+		mask |= WL_EVENT_HANGUP;
+	if (ep->events & EPOLLERR)
+		mask |= WL_EVENT_ERROR;
 
 	return fd_source->func(fd_source->fd, mask, source->data);
 }
diff --git a/src/wayland-client.c b/src/wayland-client.c
index 9cf144f..9283985 100644
--- a/src/wayland-client.c
+++ b/src/wayland-client.c
@@ -1,5 +1,6 @@
 /*
- * Copyright © 2008 Kristian Høgsberg
+ * Copyright © 2008-2012 Kristian Høgsberg
+ * Copyright © 2010-2012 Intel Corporation
  *
  * Permission to use, copy, modify, distribute, and sell this software and its
  * documentation for any purpose is hereby granted without fee, provided that
@@ -64,34 +65,16 @@ struct wl_display {
 	struct wl_connection *connection;
 	int fd;
 	int close_fd;
-	uint32_t mask;
 	struct wl_map objects;
 	struct wl_list global_listener_list;
 	struct wl_list global_list;
 
-	wl_display_update_func_t update;
-	void *update_data;
-
 	wl_display_global_func_t global_handler;
 	void *global_handler_data;
 };
 
 static int wl_debug = 0;
 
-static int
-connection_update(struct wl_connection *connection,
-		  uint32_t mask, void *data)
-{
-	struct wl_display *display = data;
-
-	display->mask = mask;
-	if (display->update)
-		return display->update(display->mask,
-				       display->update_data);
-
-	return 0;
-}
-
 WL_EXPORT struct wl_global_listener *
 wl_display_add_global_listener(struct wl_display *display,
 			       wl_display_global_func_t handler, void *data)
@@ -387,8 +370,7 @@ wl_display_connect_to_fd(int fd)
 	display->proxy.object.implementation = (void(**)(void)) &display_listener;
 	display->proxy.user_data = display;
 
-	display->connection = wl_connection_create(display->fd,
-						   connection_update, display);
+	display->connection = wl_connection_create(display->fd);
 	if (display->connection == NULL) {
 		wl_map_release(&display->objects);
 		close(display->fd);
@@ -451,16 +433,8 @@ wl_display_disconnect(struct wl_display *display)
 }
 
 WL_EXPORT int
-wl_display_get_fd(struct wl_display *display,
-		  wl_display_update_func_t update, void *data)
+wl_display_get_fd(struct wl_display *display)
 {
-	display->update = update;
-	display->update_data = data;
-
-	if (display->update)
-		display->update(display->mask,
-		                display->update_data);
-
 	return display->fd;
 }
 
@@ -486,9 +460,8 @@ wl_display_roundtrip(struct wl_display *display)
 	done = 0;
 	callback = wl_display_sync(display);
 	wl_callback_add_listener(callback, &sync_listener, &done);
-	wl_display_flush(display);
 	while (!done)
-		wl_display_iterate(display, WL_DISPLAY_READABLE);
+		wl_display_dispatch(display);
 }
 
 static int
@@ -563,20 +536,17 @@ handle_event(struct wl_display *display,
 	wl_closure_destroy(closure);
 }
 
-WL_EXPORT void
-wl_display_iterate(struct wl_display *display, uint32_t mask)
+WL_EXPORT int
+wl_display_dispatch(struct wl_display *display)
 {
 	uint32_t p[2], object;
 	int len, opcode, size;
 
-	mask &= display->mask;
-	if (mask == 0) {
-		fprintf(stderr,
-			"wl_display_iterate called with unsolicited flags\n");
-		return;
-	}
+	/* FIXME: Handle flush errors, EAGAIN... */
+	wl_display_flush(display);
 
-	len = wl_connection_data(display->connection, mask);
+	/* FIXME: Shouldn't always read here... */
+	len = wl_connection_read(display->connection);
 
 	while (len > 0) {
 		if ((size_t) len < sizeof p)
@@ -593,17 +563,13 @@ wl_display_iterate(struct wl_display *display, uint32_t mask)
 		len -= size;
 	}
 
-	if (len < 0) {
-		fprintf(stderr, "read error: %m\n");
-		exit(EXIT_FAILURE);
-	}
+	return len;
 }
 
-WL_EXPORT void
+WL_EXPORT int
 wl_display_flush(struct wl_display *display)
 {
-	while (display->mask & WL_DISPLAY_WRITABLE)
-		wl_display_iterate (display, WL_DISPLAY_WRITABLE);
+	return wl_connection_flush(display->connection);
 }
 
 WL_EXPORT void *
diff --git a/src/wayland-client.h b/src/wayland-client.h
index 5cec28b..5fcb86d 100644
--- a/src/wayland-client.h
+++ b/src/wayland-client.h
@@ -66,19 +66,15 @@ struct wl_callback *wl_display_sync(struct wl_display *display);
 
 #include "wayland-client-protocol.h"
 
-#define WL_DISPLAY_READABLE 0x01
-#define WL_DISPLAY_WRITABLE 0x02
-
 typedef int (*wl_display_update_func_t)(uint32_t mask, void *data);
 typedef void (*wl_callback_func_t)(void *data, uint32_t time);
 
 struct wl_display *wl_display_connect(const char *name);
 struct wl_display *wl_display_connect_to_fd(int fd);
 void wl_display_disconnect(struct wl_display *display);
-int wl_display_get_fd(struct wl_display *display,
-		      wl_display_update_func_t update, void *data);
-void wl_display_iterate(struct wl_display *display, uint32_t mask);
-void wl_display_flush(struct wl_display *display);
+int wl_display_get_fd(struct wl_display *display);
+int wl_display_dispatch(struct wl_display *display);
+int wl_display_flush(struct wl_display *display);
 void wl_display_roundtrip(struct wl_display *display);
 
 struct wl_global_listener;
diff --git a/src/wayland-private.h b/src/wayland-private.h
index f9fcc96..15e4bd0 100644
--- a/src/wayland-private.h
+++ b/src/wayland-private.h
@@ -54,19 +54,14 @@ void wl_map_for_each(struct wl_map *map, wl_iterator_func_t func, void *data);
 struct wl_connection;
 struct wl_closure;
 
-#define WL_CONNECTION_READABLE 0x01
-#define WL_CONNECTION_WRITABLE 0x02
-
-typedef int (*wl_connection_update_func_t)(struct wl_connection *connection,
-					   uint32_t mask, void *data);
-
-struct wl_connection *wl_connection_create(int fd,
-					   wl_connection_update_func_t update,
-					   void *data);
+struct wl_connection *wl_connection_create(int fd);
 void wl_connection_destroy(struct wl_connection *connection);
 void wl_connection_copy(struct wl_connection *connection, void *data, size_t size);
 void wl_connection_consume(struct wl_connection *connection, size_t size);
-int wl_connection_data(struct wl_connection *connection, uint32_t mask);
+
+int wl_connection_flush(struct wl_connection *connection);
+int wl_connection_read(struct wl_connection *connection);
+
 int wl_connection_write(struct wl_connection *connection, const void *data, size_t count);
 int wl_connection_queue(struct wl_connection *connection,
 			const void *data, size_t count);
diff --git a/src/wayland-server.c b/src/wayland-server.c
index 416cec1..41b515b 100644
--- a/src/wayland-server.c
+++ b/src/wayland-server.c
@@ -205,8 +205,6 @@ deref_new_objects(struct wl_closure *closure)
 	}
 }
 
-
-
 static int
 wl_client_connection_data(int fd, uint32_t mask, void *data)
 {
@@ -218,20 +216,33 @@ wl_client_connection_data(int fd, uint32_t mask, void *data)
 	const struct wl_message *message;
 	uint32_t p[2];
 	int opcode, size;
-	uint32_t cmask = 0;
 	int len;
 
-	if (mask & WL_EVENT_READABLE)
-		cmask |= WL_CONNECTION_READABLE;
-	if (mask & WL_EVENT_WRITABLE)
-		cmask |= WL_CONNECTION_WRITABLE;
-
-	len = wl_connection_data(connection, cmask);
-	if (len < 0) {
+	if (mask & (WL_EVENT_ERROR | WL_EVENT_HANGUP)) {
 		wl_client_destroy(client);
 		return 1;
 	}
 
+	if (mask & WL_EVENT_WRITABLE) {
+		len = wl_connection_flush(connection);
+		if (len < 0 && errno != EAGAIN) {
+			wl_client_destroy(client);
+			return 1;
+		} else if (len >= 0) {
+			wl_event_source_fd_update(client->source,
+						  WL_EVENT_READABLE);
+		}
+	}
+
+	len = 0;
+	if (mask & WL_EVENT_READABLE) {
+		len = wl_connection_read(connection);
+		if (len < 0 && errno != EAGAIN) {
+			wl_client_destroy(client);
+			return 1;
+		}
+	}
+
 	while ((size_t) len >= sizeof p) {
 		wl_connection_copy(connection, p, sizeof p);
 		opcode = p[1] & 0xffff;
@@ -296,27 +307,10 @@ wl_client_connection_data(int fd, uint32_t mask, void *data)
 	return 1;
 }
 
-static int
-wl_client_connection_update(struct wl_connection *connection,
-			    uint32_t mask, void *data)
-{
-	struct wl_client *client = data;
-	uint32_t emask = 0;
-
-	client->mask = mask;
-	if (mask & WL_CONNECTION_READABLE)
-		emask |= WL_EVENT_READABLE;
-	if (mask & WL_CONNECTION_WRITABLE)
-		emask |= WL_EVENT_WRITABLE;
-
-	return wl_event_source_fd_update(client->source, emask);
-}
-
 WL_EXPORT void
 wl_client_flush(struct wl_client *client)
 {
-	if (client->mask & WL_CONNECTION_WRITABLE)
-		wl_connection_data(client->connection, WL_CONNECTION_WRITABLE);
+	wl_connection_flush(client->connection);
 }
 
 WL_EXPORT struct wl_display *
@@ -352,8 +346,7 @@ wl_client_create(struct wl_display *display, int fd)
 		return NULL;
 	}
 
-	client->connection =
-		wl_connection_create(fd, wl_client_connection_update, client);
+	client->connection = wl_connection_create(fd);
 	if (client->connection == NULL) {
 		free(client);
 		return NULL;
@@ -1103,8 +1096,28 @@ wl_display_run(struct wl_display *display)
 {
 	display->run = 1;
 
-	while (display->run)
+	while (display->run) {
+		wl_display_flush_clients(display);
 		wl_event_loop_dispatch(display->loop, -1);
+	}
+}
+
+WL_EXPORT void
+wl_display_flush_clients(struct wl_display *display)
+{
+	struct wl_client *client, *next;
+	int ret;
+
+	wl_list_for_each_safe(client, next, &display->client_list, link) {
+		ret = wl_connection_flush(client->connection);
+		if (ret < 0 && errno == EAGAIN) {
+			wl_event_source_fd_update(client->source,
+						  WL_EVENT_WRITABLE |
+						  WL_EVENT_READABLE);
+		} else if (ret < 0) {
+			wl_client_destroy(client);
+		}
+	}
 }
 
 static int
diff --git a/src/wayland-server.h b/src/wayland-server.h
index 45cc61c..0a4dcc2 100644
--- a/src/wayland-server.h
+++ b/src/wayland-server.h
@@ -34,7 +34,9 @@ extern "C" {
 
 enum {
 	WL_EVENT_READABLE = 0x01,
-	WL_EVENT_WRITABLE = 0x02
+	WL_EVENT_WRITABLE = 0x02,
+	WL_EVENT_HANGUP   = 0x04,
+	WL_EVENT_ERROR    = 0x08
 };
 
 struct wl_event_loop;
@@ -88,6 +90,7 @@ struct wl_event_loop *wl_display_get_event_loop(struct wl_display *display);
 int wl_display_add_socket(struct wl_display *display, const char *name);
 void wl_display_terminate(struct wl_display *display);
 void wl_display_run(struct wl_display *display);
+void wl_display_flush_clients(struct wl_display *display);
 
 typedef void (*wl_global_bind_func_t)(struct wl_client *client, void *data,
 				      uint32_t version, uint32_t id);
diff --git a/tests/connection-test.c b/tests/connection-test.c
index b6bcde1..d0113f1 100644
--- a/tests/connection-test.c
+++ b/tests/connection-test.c
@@ -37,26 +37,15 @@
 
 static const char message[] = "Hello, world";
 
-static int
-update_func(struct wl_connection *connection, uint32_t mask, void *data)
-{
-	uint32_t *m = data;
-
-	*m = mask;
-
-	return 0;
-}
-
 static struct wl_connection *
-setup(int *s, uint32_t *mask)
+setup(int *s)
 {
 	struct wl_connection *connection;
 
 	assert(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, s) == 0);
 
-	connection = wl_connection_create(s[0], update_func, mask);
+	connection = wl_connection_create(s[0]);
 	assert(connection);
-	assert(*mask == WL_CONNECTION_READABLE);
 
 	return connection;
 }
@@ -65,9 +54,8 @@ TEST(connection_create)
 {
 	struct wl_connection *connection;
 	int s[2];
-	uint32_t mask;
 
-	connection = setup(s, &mask);
+	connection = setup(s);
 	wl_connection_destroy(connection);
 	close(s[1]);
 }
@@ -76,15 +64,12 @@ TEST(connection_write)
 {
 	struct wl_connection *connection;
 	int s[2];
-	uint32_t mask;
 	char buffer[64];
 
-	connection = setup(s, &mask);
+	connection = setup(s);
 
 	assert(wl_connection_write(connection, message, sizeof message) == 0);
-	assert(mask == (WL_CONNECTION_WRITABLE | WL_CONNECTION_READABLE));
-	assert(wl_connection_data(connection, WL_CONNECTION_WRITABLE) == 0);
-	assert(mask == WL_CONNECTION_READABLE);
+	assert(wl_connection_flush(connection) == sizeof message);
 	assert(read(s[1], buffer, sizeof buffer) == sizeof message);
 	assert(memcmp(message, buffer, sizeof message) == 0);
 
@@ -96,15 +81,12 @@ TEST(connection_data)
 {
 	struct wl_connection *connection;
 	int s[2];
-	uint32_t mask;
 	char buffer[64];
 
-	connection = setup(s, &mask);
+	connection = setup(s);
 
 	assert(write(s[1], message, sizeof message) == sizeof message);
-	assert(mask == WL_CONNECTION_READABLE);
-	assert(wl_connection_data(connection, WL_CONNECTION_READABLE) == 
-	       sizeof message);
+	assert(wl_connection_read(connection) == sizeof message);
 	wl_connection_copy(connection, buffer, sizeof message);
 	assert(memcmp(message, buffer, sizeof message) == 0);
 	wl_connection_consume(connection, sizeof message);
@@ -117,23 +99,19 @@ TEST(connection_queue)
 {
 	struct wl_connection *connection;
 	int s[2];
-	uint32_t mask;
 	char buffer[64];
 
-	connection = setup(s, &mask);
+	connection = setup(s);
 
 	/* Test that wl_connection_queue() puts data in the output
-	 * buffer without asking for WL_CONNECTION_WRITABLE.  Verify
-	 * that the data did get in the buffer by writing another
-	 * message and making sure that we receive the two messages on
-	 * the other fd. */
+	 * buffer without flush it.  Verify that the data did get in
+	 * the buffer by writing another message and making sure that
+	 * we receive the two messages on the other fd. */
 
 	assert(wl_connection_queue(connection, message, sizeof message) == 0);
-	assert(mask == WL_CONNECTION_READABLE);
+	assert(wl_connection_flush(connection) == 0);
 	assert(wl_connection_write(connection, message, sizeof message) == 0);
-	assert(mask == (WL_CONNECTION_WRITABLE | WL_CONNECTION_READABLE));
-	assert(wl_connection_data(connection, WL_CONNECTION_WRITABLE) == 0);
-	assert(mask == WL_CONNECTION_READABLE);
+	assert(wl_connection_flush(connection) == 2 * sizeof message);
 	assert(read(s[1], buffer, sizeof buffer) == 2 * sizeof message);
 	assert(memcmp(message, buffer, sizeof message) == 0);
 	assert(memcmp(message, buffer + sizeof message, sizeof message) == 0);
@@ -146,8 +124,6 @@ struct marshal_data {
 	struct wl_connection *read_connection;
 	struct wl_connection *write_connection;
 	int s[2];
-	uint32_t read_mask;
-	uint32_t write_mask;
 	uint32_t buffer[10];
 	union {
 		uint32_t u;
@@ -162,18 +138,10 @@ setup_marshal_data(struct marshal_data *data)
 {
 	assert(socketpair(AF_UNIX,
 			  SOCK_STREAM | SOCK_CLOEXEC, 0, data->s) == 0);
-
-	data->read_connection =
-		wl_connection_create(data->s[0],
-				     update_func, &data->read_mask);
+	data->read_connection = wl_connection_create(data->s[0]);
 	assert(data->read_connection);
-	assert(data->read_mask == WL_CONNECTION_READABLE);
-
-	data->write_connection =
-		wl_connection_create(data->s[1],
-				     update_func, &data->write_mask);
+	data->write_connection = wl_connection_create(data->s[1]);
 	assert(data->write_connection);
-	assert(data->write_mask == WL_CONNECTION_READABLE);
 }
 
 static void
@@ -199,11 +167,7 @@ marshal(struct marshal_data *data, const char *format, int size, ...)
 	assert(closure);
 	assert(wl_closure_send(closure, data->write_connection) == 0);
 	wl_closure_destroy(closure);
-	assert(data->write_mask ==
-	       (WL_CONNECTION_WRITABLE | WL_CONNECTION_READABLE));
-	assert(wl_connection_data(data->write_connection,
-				  WL_CONNECTION_WRITABLE) == 0);
-	assert(data->write_mask == WL_CONNECTION_READABLE);
+	assert(wl_connection_flush(data->write_connection) == size);
 	assert(read(data->s[0], data->buffer, sizeof data->buffer) == size);
 
 	assert(data->buffer[0] == sender.id);
@@ -367,8 +331,7 @@ demarshal(struct marshal_data *data, const char *format,
 	int size = msg[1];
 
 	assert(write(data->s[1], msg, size) == size);
-	assert(wl_connection_data(data->read_connection,
-				  WL_CONNECTION_READABLE) == size);
+	assert(wl_connection_read(data->read_connection) == size);
 
 	wl_map_init(&objects);
 	object.id = msg[0];
@@ -447,14 +410,9 @@ marshal_demarshal(struct marshal_data *data,
 	assert(closure);
 	assert(wl_closure_send(closure, data->write_connection) == 0);
 	wl_closure_destroy(closure);
-	assert(data->write_mask ==
-	       (WL_CONNECTION_WRITABLE | WL_CONNECTION_READABLE));
-	assert(wl_connection_data(data->write_connection,
-				  WL_CONNECTION_WRITABLE) == 0);
-	assert(data->write_mask == WL_CONNECTION_READABLE);
-
-	assert(wl_connection_data(data->read_connection,
-				  WL_CONNECTION_READABLE) == size);
+	assert(wl_connection_flush(data->write_connection) == size);
+
+	assert(wl_connection_read(data->read_connection) == size);
 
 	wl_map_init(&objects);
 	object.id = msg[0];
diff --git a/tests/os-wrappers-test.c b/tests/os-wrappers-test.c
index b9be2b4..515fd81 100644
--- a/tests/os-wrappers-test.c
+++ b/tests/os-wrappers-test.c
@@ -211,33 +211,17 @@ struct marshal_data {
 	int wrapped_calls;
 };
 
-static int
-update_func(struct wl_connection *connection, uint32_t mask, void *data)
-{
-	uint32_t *m = data;
-
-	*m = mask;
-
-	return 0;
-}
-
 static void
 setup_marshal_data(struct marshal_data *data)
 {
 	assert(socketpair(AF_UNIX,
 			  SOCK_STREAM | SOCK_CLOEXEC, 0, data->s) == 0);
 
-	data->read_connection =
-		wl_connection_create(data->s[0],
-				     update_func, &data->read_mask);
+	data->read_connection = wl_connection_create(data->s[0]);
 	assert(data->read_connection);
-	assert(data->read_mask == WL_CONNECTION_READABLE);
 
-	data->write_connection =
-		wl_connection_create(data->s[1],
-				     update_func, &data->write_mask);
+	data->write_connection = wl_connection_create(data->s[1]);
 	assert(data->write_connection);
-	assert(data->write_mask == WL_CONNECTION_READABLE);
 }
 
 static void
@@ -260,14 +244,9 @@ marshal_demarshal(struct marshal_data *data,
 	assert(closure);
 	assert(wl_closure_send(closure, data->write_connection) == 0);
 	wl_closure_destroy(closure);
-	assert(data->write_mask ==
-	       (WL_CONNECTION_WRITABLE | WL_CONNECTION_READABLE));
-	assert(wl_connection_data(data->write_connection,
-				  WL_CONNECTION_WRITABLE) == 0);
-	assert(data->write_mask == WL_CONNECTION_READABLE);
-
-	assert(wl_connection_data(data->read_connection,
-				  WL_CONNECTION_READABLE) == size);
+	assert(wl_connection_flush(data->write_connection) == size);
+
+	assert(wl_connection_read(data->read_connection) == size);
 
 	wl_map_init(&objects);
 	object.id = msg[0];
-- 
1.7.10.2



More information about the wayland-devel mailing list