[PATCH 1/8] Change filedescriptor API to be thread safe

Pekka Paalanen ppaalanen at gmail.com
Thu Oct 11 00:02:58 PDT 2012


On Wed, 10 Oct 2012 20:58:34 -0400
Kristian Høgsberg <hoegsberg at gmail.com> wrote:

> On Wed, Oct 10, 2012 at 11:32:15AM +0300, Pekka Paalanen wrote:
> > On Tue,  9 Oct 2012 22:37:58 -0400
> > Kristian Høgsberg <krh at bitplanet.net> wrote:
> > 
> > > The update callback for the file descriptors was always a bit awkward and
> > > un-intuitive.  The idea was that whenever the protocol code needed to
> > > write data to the fd it would call the 'update' function.  This function
> > > would adjust the mainloop so that it polls for POLLOUT on the fd so we
> > > can eventually flush the data to the socket.
> > > 
> > > The problem is that in multi-threaded applications, any thread can issue
> > > a request, which writes data to the output buffer and thus triggers the
> > > update callback.  Thus, we'll be calling out with the display mutex
> > > held and may call from any thread.
> > > 
> > > The solution is to eliminate the udpate callback and just require that
> > > the application or server flushes all connection buffers before blocking.
> > > This turns out to be a simpler API, although we now require clients to
> > > deal with EAGAIN and non-blocking writes.  It also saves a few syscalls,
> > > since the socket will be writable most of the time and most writes will
> > > complete, so we avoid changing epoll to poll for POLLOUT, then write and
> > > then change it back for each write.
> > > ---
> > >  src/connection.c         |  109 +++++++++++++++++++---------------------------
> > >  src/event-loop.c         |    4 ++
> > >  src/wayland-client.c     |   62 ++++++--------------------
> > >  src/wayland-client.h     |   10 ++---
> > >  src/wayland-private.h    |   15 +++----
> > >  src/wayland-server.c     |   75 ++++++++++++++++++-------------
> > >  src/wayland-server.h     |    5 ++-
> > >  tests/connection-test.c  |   82 +++++++++-------------------------
> > >  tests/os-wrappers-test.c |   31 +++----------
> > >  9 files changed, 143 insertions(+), 250 deletions(-)
> > > 
> > > diff --git a/src/connection.c b/src/connection.c
> > > index dbe0fa9..54de4f1 100644
> > > --- a/src/connection.c
> > > +++ b/src/connection.c
> > > @@ -56,9 +56,7 @@ struct wl_connection {
> > >  	struct wl_buffer in, out;
> > >  	struct wl_buffer fds_in, fds_out;
> > >  	int fd;
> > > -	void *data;
> > > -	wl_connection_update_func_t update;
> > > -	int write_signalled;
> > > +	int want_flush;
> > >  };
> > >  
> > >  union wl_value {
...
> > > @@ -249,14 +239,19 @@ decode_cmsg(struct wl_buffer *buffer, struct msghdr *msg)
> > >  }
> > >  
> > >  int
> > > -wl_connection_data(struct wl_connection *connection, uint32_t mask)
> > > +wl_connection_flush(struct wl_connection *connection)
> > >  {
> > >  	struct iovec iov[2];
> > >  	struct msghdr msg;
> > >  	char cmsg[CLEN];
> > > -	int len, count, clen;
> > > +	int len = 0, count, clen;
> > > +	uint32_t tail;
> > > +
> > > +	if (!connection->want_flush)
> > > +		return 0;
> > >  
> > > -	if (mask & WL_CONNECTION_WRITABLE) {
> > > +	tail = connection->out.tail;
> > > +	while (connection->out.head - connection->out.tail > 0) {
> > >  		wl_buffer_get_iov(&connection->out, iov, &count);
> > >  
> > >  		build_cmsg(&connection->fds_out, cmsg, &clen);
> > > @@ -272,58 +267,49 @@ wl_connection_data(struct wl_connection *connection, uint32_t mask)
> > >  		do {
> > >  			len = sendmsg(connection->fd, &msg,
> > >  				      MSG_NOSIGNAL | MSG_DONTWAIT);
> > > -		} while (len < 0 && errno == EINTR);
> > > +		} while (len == -1 && errno == EINTR);
> > >  
> > > -		if (len == -1 && errno == EPIPE) {
> > > +		if (len == -1)
> > >  			return -1;
> > > -		} else if (len < 0) {
> > > -			fprintf(stderr,
> > > -				"write error for connection %p, fd %d: %m\n",
> > > -				connection, connection->fd);
> > > -			return -1;
> > > -		}
> > >  
> > >  		close_fds(&connection->fds_out);
> > >  
> > >  		connection->out.tail += len;
> > > -		if (connection->out.tail == connection->out.head &&
> > > -		    connection->write_signalled) {
> > > -			connection->update(connection,
> > > -					   WL_CONNECTION_READABLE,
> > > -					   connection->data);
> > > -			connection->write_signalled = 0;
> > > -		}
> > >  	}
> > >  
> > > -	if (mask & WL_CONNECTION_READABLE) {
> > > -		wl_buffer_put_iov(&connection->in, iov, &count);
> > > +	connection->want_flush = 0;
> > >  
> > > -		msg.msg_name = NULL;
> > > -		msg.msg_namelen = 0;
> > > -		msg.msg_iov = iov;
> > > -		msg.msg_iovlen = count;
> > > -		msg.msg_control = cmsg;
> > > -		msg.msg_controllen = sizeof cmsg;
> > > -		msg.msg_flags = 0;
> > > +	return connection->out.head - tail;
> > > +}
> > >  
> > > -		do {
> > > -			len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
> > > -		} while (len < 0 && errno == EINTR);
> > > +int
> > > +wl_connection_read(struct wl_connection *connection)
> > > +{
> > > +	struct iovec iov[2];
> > > +	struct msghdr msg;
> > > +	char cmsg[CLEN];
> > > +	int len, count;
> > >  
> > > -		if (len < 0) {
> > > -			fprintf(stderr,
> > > -				"read error from connection %p: %m (%d)\n",
> > > -				connection, errno);
> > > -			return -1;
> > > -		} else if (len == 0) {
> > > -			/* FIXME: Handle this better? */
> > > -			return -1;
> > > -		}
> > > +	wl_buffer_put_iov(&connection->in, iov, &count);
> > > +
> > > +	msg.msg_name = NULL;
> > > +	msg.msg_namelen = 0;
> > > +	msg.msg_iov = iov;
> > > +	msg.msg_iovlen = count;
> > > +	msg.msg_control = cmsg;
> > > +	msg.msg_controllen = sizeof cmsg;
> > > +	msg.msg_flags = 0;
> > > +
> > > +	do {
> > > +		len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0);
> > > +	} while (len < 0 && errno == EINTR);
> > >  
> > > -		decode_cmsg(&connection->fds_in, &msg);
> > > +	if (len <= 0)
> > > +		return len;
> > >  
> > > -		connection->in.head += len;
> > > -	}	
> > > +	decode_cmsg(&connection->fds_in, &msg);
> > > +
> > > +	connection->in.head += len;
> > >  
> > >  	return connection->in.head - connection->in.tail;
> > >  }
> > > @@ -334,18 +320,11 @@ wl_connection_write(struct wl_connection *connection,
> > >  {
> > >  	if (connection->out.head - connection->out.tail +
> > >  	    count > ARRAY_LENGTH(connection->out.data))
> > 
> > I think you need want_flush=1 forced here, otherwise the flush is a
> > no-op.
> 
> It could be, though it's probably unlikely that we fill up the output
> buffer without at least one wl_connection_write() in there to set
> want_flush=1.  But it could happen, and you're right.
> 
> > > -		if (wl_connection_data(connection, WL_CONNECTION_WRITABLE))
> > > +		if (wl_connection_flush(connection))
> > >  			return -1;
> > 
> > Doesn't wl_connection_flush() return the number of bytes sent, or
> > negative on error? If so, the above condition seems wrong. Should it
> > not be
> > 
> > +		if (wl_connection_flush(connection) < 0)
> >  			return -1;
> 
> Yes, fixed.
> 
> > Should we account for a short send, i.e. it sent some data but not all?
> > Looks like wl_connection_flush() turns a short send into an error case,
> > so I guess not.
> >
> > Is it ok to return an error, if a short send did release enough
> > buffer space to fit the new write, but did not send all data from the
> > buffer, due to e.g. EAGAIN?
> 
> A short send turns into a return value of -1 and errno=EAGAIN.
> wl_connection_flsh() keeps writing until it gets EAGAIN.  It's not
> clear what's a good behaviour in this case and it depends on client
> side vs server side.  On the client, it may make sense to just block
> if we can't send enough to make space for the new request, but
> currently we just die in abort().  It's really hard to do any other
> kind of recovery, I certainly don't see any good way to return to the
> mainloop, write the remaining data asynchronously and then recover...
> It seems like a choice between abort(), blocking, growing the buffer
> or setting a wl_display error state and return (David Herrmann is
> sending out patches that does this for ENOMEM errors in the marshal
> path).  The wl_display error should then be checked by the app (before
> blocking, perhaps), but it's not clear to me what kind of recovery the
> app can make.

I'm not sure if we are talking about the same thing here.

I meant the case, where wl_connection_flush() does return an error with
errno=EAGAIN, but still has managed to release enough space to make
this wl_connection_write() succeed. Should we not carry on with the
write and return success in that case? We can fail the next write,
then, if needed.

Though, I understand the benefits might be diminishing. If you're
fighting that hard to get messages through, you'll probably fail soon
anyway.

> On the server side, we currently just kill the client if it doesn't
> read fast enough.  We had a long discussion about a while ago (me,
> sroedal and jlind, mainly) and the "kill-the-client" approach breaks
> down if the server dumps a lot of data in short time (they were
> implementing a glyph/icon cache over wayland protocol).  X just keeps
> growing the output buffer until it hits OOM if a client doesn't read
> from the socket.  But those are the only two options I can think of on
> the server side.
> 
> Something to think about... I've applied the other fixes, thanks.
> 
> Kristian

Yup, thanks,
pq


More information about the wayland-devel mailing list