[PATCH 1/8] Change filedescriptor API to be thread safe

Kristian Høgsberg hoegsberg at gmail.com
Wed Oct 10 17:58:34 PDT 2012


On Wed, Oct 10, 2012 at 11:32:15AM +0300, Pekka Paalanen wrote:
> On Tue,  9 Oct 2012 22:37:58 -0400
> Kristian Høgsberg <krh at bitplanet.net> wrote:
> 
> > The update callback for the file descriptors was always a bit awkward and
> > un-intuitive.  The idea was that whenever the protocol code needed to
> > write data to the fd it would call the 'update' function.  This function
> > would adjust the mainloop so that it polls for POLLOUT on the fd so we
> > can eventually flush the data to the socket.
> > 
> > The problem is that in multi-threaded applications, any thread can issue
> > a request, which writes data to the output buffer and thus triggers the
> > update callback.  Thus, we'll be calling out with the display mutex
> > held and may call from any thread.
> > 
> > The solution is to eliminate the udpate callback and just require that
> > the application or server flushes all connection buffers before blocking.
> > This turns out to be a simpler API, although we now require clients to
> > deal with EAGAIN and non-blocking writes.  It also saves a few syscalls,
> > since the socket will be writable most of the time and most writes will
> > complete, so we avoid changing epoll to poll for POLLOUT, then write and
> > then change it back for each write.
> > ---
> >  src/connection.c         |  109 +++++++++++++++++++---------------------------
> >  src/event-loop.c         |    4 ++
> >  src/wayland-client.c     |   62 ++++++--------------------
> >  src/wayland-client.h     |   10 ++---
> >  src/wayland-private.h    |   15 +++----
> >  src/wayland-server.c     |   75 ++++++++++++++++++-------------
> >  src/wayland-server.h     |    5 ++-
> >  tests/connection-test.c  |   82 +++++++++-------------------------
> >  tests/os-wrappers-test.c |   31 +++----------
> >  9 files changed, 143 insertions(+), 250 deletions(-)
> > 
> > diff --git a/src/connection.c b/src/connection.c
> > index dbe0fa9..54de4f1 100644
> > --- a/src/connection.c
> > +++ b/src/connection.c
> > @@ -56,9 +56,7 @@ struct wl_connection {
> >  	struct wl_buffer in, out;
> >  	struct wl_buffer fds_in, fds_out;
> >  	int fd;
> > -	void *data;
> > -	wl_connection_update_func_t update;
> > -	int write_signalled;
> > +	int want_flush;
> >  };
> >  
> >  union wl_value {
> > @@ -156,9 +154,7 @@ wl_buffer_size(struct wl_buffer *b)
> >  }
> >  
> >  struct wl_connection *
> > -wl_connection_create(int fd,
> > -		     wl_connection_update_func_t update,
> > -		     void *data)
> > +wl_connection_create(int fd)
> >  {
> >  	struct wl_connection *connection;
> >  
> > @@ -167,12 +163,6 @@ wl_connection_create(int fd,
> >  		return NULL;
> >  	memset(connection, 0, sizeof *connection);
> >  	connection->fd = fd;
> > -	connection->update = update;
> > -	connection->data = data;
> > -
> > -	connection->update(connection,
> > -			   WL_CONNECTION_READABLE,
> > -			   connection->data);
> >  
> >  	return connection;
> >  }
> > @@ -249,14 +239,19 @@ decode_cmsg(struct wl_buffer *buffer, struct msghdr *msg)
> >  }
> >  
> >  int
> > -wl_connection_data(struct wl_connection *connection, uint32_t mask)
> > +wl_connection_flush(struct wl_connection *connection)
> >  {
> >  	struct iovec iov[2];
> >  	struct msghdr msg;
> >  	char cmsg[CLEN];
> > -	int len, count, clen;
> > +	int len = 0, count, clen;
> > +	uint32_t tail;
> > +
> > +	if (!connection->want_flush)
> > +		return 0;
> >  
> > -	if (mask & WL_CONNECTION_WRITABLE) {
> > +	tail = connection->out.tail;
> > +	while (connection->out.head - connection->out.tail > 0) {
> >  		wl_buffer_get_iov(&connection->out, iov, &count);
> >  
> >  		build_cmsg(&connection->fds_out, cmsg, &clen);
> > @@ -272,58 +267,49 @@ wl_connection_data(struct wl_connection *connection, uint32_t mask)
> >  		do {
> >  			len = sendmsg(connection->fd, &msg,
> >  				      MSG_NOSIGNAL | MSG_DONTWAIT);
> > -		} while (len < 0 && errno == EINTR);
> > +		} while (len == -1 && errno == EINTR);
> >  
> > -		if (len == -1 && errno == EPIPE) {
> > +		if (len == -1)
> >  			return -1;
> > -		} else if (len < 0) {
> > -			fprintf(stderr,
> > -				"write error for connection %p, fd %d: %m\n",
> > -				connection, connection->fd);
> > -			return -1;
> > -		}
> >  
> >  		close_fds(&connection->fds_out);
> >  
> >  		connection->out.tail += len;
> > -		if (connection->out.tail == connection->out.head &&
> > -		    connection->write_signalled) {
> > -			connection->update(connection,
> > -					   WL_CONNECTION_READABLE,
> > -					   connection->data);
> > -			connection->write_signalled = 0;
> > -		}
> >  	}
> >  
> > -	if (mask & WL_CONNECTION_READABLE) {
> > -		wl_buffer_put_iov(&connection->in, iov, &count);
> > +	connection->want_flush = 0;
> >  
> > -		msg.msg_name = NULL;
> > -		msg.msg_namelen = 0;
> > -		msg.msg_iov = iov;
> > -		msg.msg_iovlen = count;
> > -		msg.msg_control = cmsg;
> > -		msg.msg_controllen = sizeof cmsg;
> > -		msg.msg_flags = 0;
> > +	return connection->out.head - tail;
> > +}
> >  
> > -		do {
> > -			len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
> > -		} while (len < 0 && errno == EINTR);
> > +int
> > +wl_connection_read(struct wl_connection *connection)
> > +{
> > +	struct iovec iov[2];
> > +	struct msghdr msg;
> > +	char cmsg[CLEN];
> > +	int len, count;
> >  
> > -		if (len < 0) {
> > -			fprintf(stderr,
> > -				"read error from connection %p: %m (%d)\n",
> > -				connection, errno);
> > -			return -1;
> > -		} else if (len == 0) {
> > -			/* FIXME: Handle this better? */
> > -			return -1;
> > -		}
> > +	wl_buffer_put_iov(&connection->in, iov, &count);
> > +
> > +	msg.msg_name = NULL;
> > +	msg.msg_namelen = 0;
> > +	msg.msg_iov = iov;
> > +	msg.msg_iovlen = count;
> > +	msg.msg_control = cmsg;
> > +	msg.msg_controllen = sizeof cmsg;
> > +	msg.msg_flags = 0;
> > +
> > +	do {
> > +		len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
> > +	} while (len < 0 && errno == EINTR);
> >  
> > -		decode_cmsg(&connection->fds_in, &msg);
> > +	if (len <= 0)
> > +		return len;
> >  
> > -		connection->in.head += len;
> > -	}	
> > +	decode_cmsg(&connection->fds_in, &msg);
> > +
> > +	connection->in.head += len;
> >  
> >  	return connection->in.head - connection->in.tail;
> >  }
> > @@ -334,18 +320,11 @@ wl_connection_write(struct wl_connection *connection,
> >  {
> >  	if (connection->out.head - connection->out.tail +
> >  	    count > ARRAY_LENGTH(connection->out.data))
> 
> I think you need want_flush=1 forced here, otherwise the flush is a
> no-op.

It could be, though it's probably unlikely that we fill up the output
buffer without at least one wl_connection_write() in there to set
want_flush=1.  But it could happen, and you're right.

> > -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> > +		if (wl_connection_flush(connection))
> >  			return -1;
> 
> Doesn't wl_connection_flush() return the number of bytes sent, or
> negative on error? If so, the above condition seems wrong. Should it
> not be
> 
> +		if (wl_connection_flush(connection) < 0)
>  			return -1;

Yes, fixed.

> Should we account for a short send, i.e. it sent some data but not all?
> Looks like wl_connection_flush() turns a short send into an error case,
> so I guess not.
>
> Is it ok to return an error, if a short send did release enough
> buffer space to fit the new write, but did not send all data from the
> buffer, due to e.g. EAGAIN?

A short send turns into a return value of -1 and errno=EAGAIN.
wl_connection_flsh() keeps writing until it gets EAGAIN.  It's not
clear what's a good behaviour in this case and it depends on client
side vs server side.  On the client, it may make sense to just block
if we can't send enough to make space for the new request, but
currently we just die in abort().  It's really hard to do any other
kind of recovery, I certainly don't see any good way to return to the
mainloop, write the remaining data asynchronously and then recover...
It seems like a choice between abort(), blocking, growing the buffer
or setting a wl_display error state and return (David Herrmann is
sending out patches that does this for ENOMEM errors in the marshal
path).  The wl_display error should then be checked by the app (before
blocking, perhaps), but it's not clear to me what kind of recovery the
app can make.

On the server side, we currently just kill the client if it doesn't
read fast enough.  We had a long discussion about a while ago (me,
sroedal and jlind, mainly) and the "kill-the-client" approach breaks
down if the server dumps a lot of data in short time (they were
implementing a glyph/icon cache over wayland protocol).  X just keeps
growing the output buffer until it hits OOM if a client doesn't read
from the socket.  But those are the only two options I can think of on
the server side.

Something to think about... I've applied the other fixes, thanks.

Kristian

> >  
> >  	wl_buffer_put(&connection->out, data, count);
> > -
> > -	if (!connection->write_signalled) {
> > -		connection->update(connection,
> > -				   WL_CONNECTION_READABLE |
> > -				   WL_CONNECTION_WRITABLE,
> > -				   connection->data);
> > -		connection->write_signalled = 1;
> > -	}
> > +	connection->want_flush = 1;
> >  
> >  	return 0;
> >  }
> > @@ -356,7 +335,7 @@ wl_connection_queue(struct wl_connection *connection,
> >  {
> >  	if (connection->out.head - connection->out.tail +
> >  	    count > ARRAY_LENGTH(connection->out.data))
> 
> want_flush=1 needed here, too.

Yup.

> > -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> > +		if (wl_connection_flush(connection))
> >  			return -1;
> >  
> >  	wl_buffer_put(&connection->out, data, count);
> > @@ -395,7 +374,7 @@ static int
> >  wl_connection_put_fd(struct wl_connection *connection, int32_t fd)
> >  {
> >  	if (wl_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd)
> 
> want_flush=1?

Yeah.

> > -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> > +		if (wl_connection_flush(connection))
> >  			return -1;
> >  
> >  	wl_buffer_put(&connection->fds_out, &fd, sizeof fd);
> ...
> 
> The rest seemed like it would work well enough, though I didn't read
> too carefully.
> 
> 
> Thanks,
> pq


More information about the wayland-devel mailing list