[PATCH 1/8] Change filedescriptor API to be thread safe

Pekka Paalanen ppaalanen at gmail.com
Wed Oct 10 01:32:15 PDT 2012


On Tue,  9 Oct 2012 22:37:58 -0400
Kristian Høgsberg <krh at bitplanet.net> wrote:

> The update callback for the file descriptors was always a bit awkward and
> un-intuitive.  The idea was that whenever the protocol code needed to
> write data to the fd it would call the 'update' function.  This function
> would adjust the mainloop so that it polls for POLLOUT on the fd so we
> can eventually flush the data to the socket.
> 
> The problem is that in multi-threaded applications, any thread can issue
> a request, which writes data to the output buffer and thus triggers the
> update callback.  Thus, we'll be calling out with the display mutex
> held and may call from any thread.
> 
> The solution is to eliminate the udpate callback and just require that
> the application or server flushes all connection buffers before blocking.
> This turns out to be a simpler API, although we now require clients to
> deal with EAGAIN and non-blocking writes.  It also saves a few syscalls,
> since the socket will be writable most of the time and most writes will
> complete, so we avoid changing epoll to poll for POLLOUT, then write and
> then change it back for each write.
> ---
>  src/connection.c         |  109 +++++++++++++++++++---------------------------
>  src/event-loop.c         |    4 ++
>  src/wayland-client.c     |   62 ++++++--------------------
>  src/wayland-client.h     |   10 ++---
>  src/wayland-private.h    |   15 +++----
>  src/wayland-server.c     |   75 ++++++++++++++++++-------------
>  src/wayland-server.h     |    5 ++-
>  tests/connection-test.c  |   82 +++++++++-------------------------
>  tests/os-wrappers-test.c |   31 +++----------
>  9 files changed, 143 insertions(+), 250 deletions(-)
> 
> diff --git a/src/connection.c b/src/connection.c
> index dbe0fa9..54de4f1 100644
> --- a/src/connection.c
> +++ b/src/connection.c
> @@ -56,9 +56,7 @@ struct wl_connection {
>  	struct wl_buffer in, out;
>  	struct wl_buffer fds_in, fds_out;
>  	int fd;
> -	void *data;
> -	wl_connection_update_func_t update;
> -	int write_signalled;
> +	int want_flush;
>  };
>  
>  union wl_value {
> @@ -156,9 +154,7 @@ wl_buffer_size(struct wl_buffer *b)
>  }
>  
>  struct wl_connection *
> -wl_connection_create(int fd,
> -		     wl_connection_update_func_t update,
> -		     void *data)
> +wl_connection_create(int fd)
>  {
>  	struct wl_connection *connection;
>  
> @@ -167,12 +163,6 @@ wl_connection_create(int fd,
>  		return NULL;
>  	memset(connection, 0, sizeof *connection);
>  	connection->fd = fd;
> -	connection->update = update;
> -	connection->data = data;
> -
> -	connection->update(connection,
> -			   WL_CONNECTION_READABLE,
> -			   connection->data);
>  
>  	return connection;
>  }
> @@ -249,14 +239,19 @@ decode_cmsg(struct wl_buffer *buffer, struct msghdr *msg)
>  }
>  
>  int
> -wl_connection_data(struct wl_connection *connection, uint32_t mask)
> +wl_connection_flush(struct wl_connection *connection)
>  {
>  	struct iovec iov[2];
>  	struct msghdr msg;
>  	char cmsg[CLEN];
> -	int len, count, clen;
> +	int len = 0, count, clen;
> +	uint32_t tail;
> +
> +	if (!connection->want_flush)
> +		return 0;
>  
> -	if (mask & WL_CONNECTION_WRITABLE) {
> +	tail = connection->out.tail;
> +	while (connection->out.head - connection->out.tail > 0) {
>  		wl_buffer_get_iov(&connection->out, iov, &count);
>  
>  		build_cmsg(&connection->fds_out, cmsg, &clen);
> @@ -272,58 +267,49 @@ wl_connection_data(struct wl_connection *connection, uint32_t mask)
>  		do {
>  			len = sendmsg(connection->fd, &msg,
>  				      MSG_NOSIGNAL | MSG_DONTWAIT);
> -		} while (len < 0 && errno == EINTR);
> +		} while (len == -1 && errno == EINTR);
>  
> -		if (len == -1 && errno == EPIPE) {
> +		if (len == -1)
>  			return -1;
> -		} else if (len < 0) {
> -			fprintf(stderr,
> -				"write error for connection %p, fd %d: %m\n",
> -				connection, connection->fd);
> -			return -1;
> -		}
>  
>  		close_fds(&connection->fds_out);
>  
>  		connection->out.tail += len;
> -		if (connection->out.tail == connection->out.head &&
> -		    connection->write_signalled) {
> -			connection->update(connection,
> -					   WL_CONNECTION_READABLE,
> -					   connection->data);
> -			connection->write_signalled = 0;
> -		}
>  	}
>  
> -	if (mask & WL_CONNECTION_READABLE) {
> -		wl_buffer_put_iov(&connection->in, iov, &count);
> +	connection->want_flush = 0;
>  
> -		msg.msg_name = NULL;
> -		msg.msg_namelen = 0;
> -		msg.msg_iov = iov;
> -		msg.msg_iovlen = count;
> -		msg.msg_control = cmsg;
> -		msg.msg_controllen = sizeof cmsg;
> -		msg.msg_flags = 0;
> +	return connection->out.head - tail;
> +}
>  
> -		do {
> -			len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
> -		} while (len < 0 && errno == EINTR);
> +int
> +wl_connection_read(struct wl_connection *connection)
> +{
> +	struct iovec iov[2];
> +	struct msghdr msg;
> +	char cmsg[CLEN];
> +	int len, count;
>  
> -		if (len < 0) {
> -			fprintf(stderr,
> -				"read error from connection %p: %m (%d)\n",
> -				connection, errno);
> -			return -1;
> -		} else if (len == 0) {
> -			/* FIXME: Handle this better? */
> -			return -1;
> -		}
> +	wl_buffer_put_iov(&connection->in, iov, &count);
> +
> +	msg.msg_name = NULL;
> +	msg.msg_namelen = 0;
> +	msg.msg_iov = iov;
> +	msg.msg_iovlen = count;
> +	msg.msg_control = cmsg;
> +	msg.msg_controllen = sizeof cmsg;
> +	msg.msg_flags = 0;
> +
> +	do {
> +		len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
> +	} while (len < 0 && errno == EINTR);
>  
> -		decode_cmsg(&connection->fds_in, &msg);
> +	if (len <= 0)
> +		return len;
>  
> -		connection->in.head += len;
> -	}	
> +	decode_cmsg(&connection->fds_in, &msg);
> +
> +	connection->in.head += len;
>  
>  	return connection->in.head - connection->in.tail;
>  }
> @@ -334,18 +320,11 @@ wl_connection_write(struct wl_connection *connection,
>  {
>  	if (connection->out.head - connection->out.tail +
>  	    count > ARRAY_LENGTH(connection->out.data))

I think you need want_flush=1 forced here, otherwise the flush is a
no-op.

> -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> +		if (wl_connection_flush(connection))
>  			return -1;

Doesn't wl_connection_flush() return the number of bytes sent, or
negative on error? If so, the above condition seems wrong. Should it
not be

+		if (wl_connection_flush(connection) < 0)
 			return -1;

Should we account for a short send, i.e. it sent some data but not all?
Looks like wl_connection_flush() turns a short send into an error case,
so I guess not.

Is it ok to return an error, if a short send did release enough
buffer space to fit the new write, but did not send all data from the
buffer, due to e.g. EAGAIN?

>  
>  	wl_buffer_put(&connection->out, data, count);
> -
> -	if (!connection->write_signalled) {
> -		connection->update(connection,
> -				   WL_CONNECTION_READABLE |
> -				   WL_CONNECTION_WRITABLE,
> -				   connection->data);
> -		connection->write_signalled = 1;
> -	}
> +	connection->want_flush = 1;
>  
>  	return 0;
>  }
> @@ -356,7 +335,7 @@ wl_connection_queue(struct wl_connection *connection,
>  {
>  	if (connection->out.head - connection->out.tail +
>  	    count > ARRAY_LENGTH(connection->out.data))

want_flush=1 needed here, too.

> -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> +		if (wl_connection_flush(connection))
>  			return -1;
>  
>  	wl_buffer_put(&connection->out, data, count);
> @@ -395,7 +374,7 @@ static int
>  wl_connection_put_fd(struct wl_connection *connection, int32_t fd)
>  {
>  	if (wl_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd)

want_flush=1?

> -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> +		if (wl_connection_flush(connection))
>  			return -1;
>  
>  	wl_buffer_put(&connection->fds_out, &fd, sizeof fd);
...

The rest seemed like it would work well enough, though I didn't read
too carefully.


Thanks,
pq


More information about the wayland-devel mailing list