[PATCH] Iteration wakeup

Fan Wu wufan9418 at gmail.com
Mon Aug 6 18:03:34 PDT 2007


Hi all,

This took longer than I expected because I got stuck on Windows. The
Windows patch will be posted later. Please review. Thanks!

Fan

[The bug]

There is a bug in DBUS when

- you are writing a DBUS *client*
- you have only one DBUS connection, shared among several threads
(one reads while the others do asynchronous sends)
- you have a thread looping on a blocking dbus_connection_read_write()
or dbus_connection_read_write_dispatch()
- you want to initiate outgoing messages from another thread, or
- you want to *gracefully* interrupt and finish the looping thread
before the application exits

The problem is that an outgoing message will not go out until an
incoming message arrives, because dbus_connection_read_write() and its
friends block on incoming traffic (assuming the timeout is set to -1).

Note that even in a multi-threaded environment the problem can be
bypassed with one of the following two measures (each with its own
drawbacks):

- use dbus_connection_read_write() with a short timeout (frequent
polling). This is not eco-friendly as it can quickly drain the battery
of small handheld devices. Using a longer timeout renders the
application less responsive.

- use multiple connections, each with its own dispatch/async-send
threads. This is viable; the only downside is implementation
complexity.
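
To make the cost of the first workaround concrete, here is a rough sketch in plain POSIX C (no DBUS calls). The names count_idle_wakeups() and idle_wakeups_demo() are made up for this illustration. It polls a silent descriptor in short slices and counts how often the thread wakes up with nothing to do; a real loop would use each of those wakeups to check for queued outgoing messages.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <time.h>
#include <unistd.h>

/* Milliseconds on a monotonic clock. */
static long now_ms(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec * 1000L + ts.tv_nsec / 1000000L;
}

/*
 * Hypothetical helper: poll `fd` in slices of `slice_ms` until `total_ms`
 * has passed or input arrives. Returns the number of polls that timed
 * out, i.e. wakeups that did no useful work.
 */
int count_idle_wakeups(int fd, int slice_ms, int total_ms)
{
    struct pollfd p = { .fd = fd, .events = POLLIN, .revents = 0 };
    long start = now_ms();
    int idle = 0;

    while (now_ms() - start < total_ms) {
        int n = poll(&p, 1, slice_ms);
        if (n > 0)
            break;      /* real traffic arrived */
        if (n == 0)
            idle++;     /* timed out: a real loop would check its send queue here */
        /* n < 0 (for example EINTR): just try again */
    }
    return idle;
}

/* Hypothetical demo: a pipe nobody writes to stands in for a quiet socket. */
int idle_wakeups_demo(void)
{
    int fds[2];
    int rc = pipe(fds);
    int idle;

    assert(rc == 0);
    idle = count_idle_wakeups(fds[0], 20, 200);
    close(fds[0]);
    close(fds[1]);
    return idle;
}
```

With a 20 ms slice the thread wakes up several times in 200 ms of silence, and a message queued right after a poll starts still waits for up to one slice. Making the slice longer reduces the wakeups but increases that delay.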

You don't have this problem if you use a mainloop.

[The fix]

The fix is to add a wakeup mechanism which is invoked when there are
outgoing messages and the looping thread is in the deep sleep of
poll/select(). This happens inside DBUS without user intervention.

A new API dbus_connection_wakeup_iteration() is added to allow other
threads to explicitly wake up the looping thread.

Here are some gory details. For Unix/Linux platforms an unnamed pipe is
created for this purpose. The read end of the pipe is polled by the
looping thread along with the socket; the write end of the pipe is
used by the wakeup mechanism. When the thread is blocked in the
poll/select call, a write to the write end of the pipe interrupts
the poll/select.
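
The pipe trick can be tried outside DBUS with a few lines of POSIX C. This is only a sketch of the idea, not the code from the patch: Looper, looper_main() and wakeup_demo() are names invented here, and a socketpair that never receives data stands in for the connection's socket.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>

/* All names below are hypothetical and exist only for this sketch. */
typedef struct {
    int sock_fd;   /* stands in for the connection's socket; stays silent */
    int wake_rd;   /* read end of the wakeup pipe */
    int woken;     /* set to 1 when the pipe, not the socket, ended poll() */
} Looper;

/* The "looping thread": blocks in poll() with no timeout. */
static void *looper_main(void *arg)
{
    Looper *l = arg;
    struct pollfd fds[2] = {
        { .fd = l->sock_fd, .events = POLLIN, .revents = 0 },
        { .fd = l->wake_rd, .events = POLLIN, .revents = 0 },
    };
    char buf[32];
    int n;

    do {
        n = poll(fds, 2, -1);
    } while (n < 0 && errno == EINTR);

    /* Drain the pipe so that a later poll() could block again. */
    if (n > 0 && (fds[1].revents & POLLIN) &&
        read(l->wake_rd, buf, sizeof buf) > 0)
        l->woken = 1;

    return NULL;
}

/* Returns 1 if a write to the pipe woke the looping thread. */
int wakeup_demo(void)
{
    int sv[2], pfd[2];
    pthread_t t;
    Looper l;
    ssize_t w;
    int rc;

    rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    assert(rc == 0);
    rc = pipe(pfd);
    assert(rc == 0);

    l.sock_fd = sv[0];
    l.wake_rd = pfd[0];
    l.woken = 0;
    rc = pthread_create(&t, NULL, looper_main, &l);
    assert(rc == 0);

    /* Another thread (here the caller) has something to send. */
    w = write(pfd[1], "w", 1);
    assert(w == 1);

    pthread_join(t, NULL);
    close(sv[0]);
    close(sv[1]);
    close(pfd[0]);
    close(pfd[1]);
    return l.woken;
}
```

It does not matter whether the byte is written before or after the looping thread enters poll(): the byte stays in the pipe until it is read, so poll() returns either way.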

Unfortunately the same trick can't be applied to the Windows DBUS code.
In Windows the poll/select mechanism can only be used on network
sockets, not pipes or other I/O handles. It would be way too expensive
to create a pair of TCP/UDP sockets for each DBUS connection just
to leverage the poll facility. The approach I take for Windows is APC
(asynchronous procedure call). In Windows every thread has an APC
queue, and a thread in an "alertable" wait state is woken up when an
APC is put in the queue. Note that the wait state has to be
"alertable", which means the "alertable" parameter is set to true when
WaitForMultipleObjectsEx() or another wait function is called. The
poll() in Windows is not alertable. The windbus team does offer
another version of poll which uses WSAWaitForMultipleEvents(), and this
function is alertable. The APC function is empty, as its whole purpose
is to get queued. The alertable version of _dbus_poll in the windbus
tree is not working at this time. I will merge the two fixes and submit
the patch soon.

The fix does not affect mainloops.

[The patch]

The patch is diffed against 1.1.2.

diff -rup dbus-1.1.2/dbus/dbus-connection.c dbus-1.1.2-fix/dbus/dbus-connection.c
--- dbus-1.1.2/dbus/dbus-connection.c	2007-07-24 09:39:08.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-connection.c	2007-08-03 14:42:02.000000000 -0600
@@ -1144,6 +1144,8 @@ _dbus_connection_do_iteration_unlocked (
 				    flags, timeout_milliseconds);
       _dbus_connection_release_io_path (connection);
     }
+   else if ( connection->n_outgoing > 0)
+      _dbus_transport_wakeup_iteration (connection->transport);

   HAVE_LOCK_CHECK (connection);

@@ -2894,6 +2896,31 @@ dbus_connection_get_server_id (DBusConne
 }

 /**
+ * This function is for multi-threaded environments. It wakes up the
+ * DBUS thread if it is sleeping in poll/select. It would normally be
+ * used when an application is about to exit.
+ *
+ * @param connection the connection.
+ * @return TRUE if the wakeup mechanism has been executed, otherwise FALSE.
+ */
+
+dbus_bool_t
+dbus_connection_wakeup_iteration (DBusConnection* connection)
+{
+    dbus_bool_t ret = FALSE;
+
+    if (NULL == connection)
+       return ret;
+
+    CONNECTION_LOCK (connection);
+    ret = _dbus_transport_wakeup_iteration (connection->transport);
+    CONNECTION_UNLOCK (connection);
+
+    return ret;
+}
+
+
+/**
  * Set whether _exit() should be called when the connection receives a
  * disconnect signal. The call to _exit() comes after any handlers for
  * the disconnect signal run; handlers can cancel the exit by calling
diff -rup dbus-1.1.2/dbus/dbus-connection.h dbus-1.1.2-fix/dbus/dbus-connection.h
--- dbus-1.1.2/dbus/dbus-connection.h	2007-07-24 09:39:08.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-connection.h	2007-08-05 09:37:34.000000000 -0600
@@ -259,6 +259,7 @@ void        dbus_connection_remove_filte


 /* Other */
+dbus_bool_t dbus_connection_wakeup_iteration   (DBusConnection* connection);
 dbus_bool_t dbus_connection_allocate_data_slot (dbus_int32_t     *slot_p);
 void        dbus_connection_free_data_slot     (dbus_int32_t     *slot_p);
 dbus_bool_t dbus_connection_set_data           (DBusConnection   *connection,
diff -rup dbus-1.1.2/dbus/dbus-sysdeps.h dbus-1.1.2-fix/dbus/dbus-sysdeps.h
--- dbus-1.1.2/dbus/dbus-sysdeps.h	2007-07-25 11:01:51.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-sysdeps.h	2007-08-05 09:43:36.000000000 -0600
@@ -447,6 +447,16 @@ dbus_pid_t    _dbus_getpid (void);

 void _dbus_flush_caches (void);

+/*
+ * Functions used to wakeup iteration
+ */
+dbus_bool_t    _dbus_iteration_wakeup_initialize  (void*       data);
+dbus_bool_t    _dbus_iteration_wakeup             (void*       data);
+void           _dbus_iteration_wakeup_free        (void*       data);
+void           _dbus_iteration_wakeup_reset       (void*       data);
+void           _dbus_iteration_wakeup_update      (void*       data,
+                                                   dbus_bool_t polling_now);
+
 /** @} */

 DBUS_END_DECLS
diff -rup dbus-1.1.2/dbus/dbus-sysdeps-unix.c dbus-1.1.2-fix/dbus/dbus-sysdeps-unix.c
--- dbus-1.1.2/dbus/dbus-sysdeps-unix.c	2007-07-25 11:01:51.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-sysdeps-unix.c	2007-08-03 15:23:59.000000000 -0600
@@ -3245,3 +3245,129 @@ _dbus_get_is_errno_eagain_or_ewouldblock
 }

 /* tests in dbus-sysdeps-util.c */
+
+
+/**
+ * Iteration wakeup functions. These functions shall be called with connection locked.
+ * For Unix/Linux platforms the transport (which is doing the polling) holds the read
+ * end of an unnamed pipe, and we save the write end of the pipe in DBusIterationWakeupData,
+ * and write to it to wakeup the transport.
+ *
+ */
+
+typedef struct
+{
+    int           poll_fd;
+    int           wakeup_fd;
+    dbus_bool_t   is_polling;
+
+}DBusIterationWakeupData;
+
+
+dbus_bool_t
+_dbus_iteration_wakeup_initialize (void* data)
+{
+   int fd[2];
+   DBusIterationWakeupData* p = (DBusIterationWakeupData*)malloc(sizeof(DBusIterationWakeupData));
+
+   if (NULL == p) return FALSE;
+
+   if (pipe(fd))
+   {
+       free(p);
+       return FALSE;
+   }
+
+   p->poll_fd    = fd[0]; /* for reading*/
+   p->wakeup_fd  = fd[1]; /* for writing*/
+   p->is_polling = FALSE;
+
+   *(DBusIterationWakeupData**)data = p;
+   return TRUE;
+}
+
+
+dbus_bool_t
+_dbus_iteration_wakeup (void* data)
+{
+   DBusIterationWakeupData* p = (DBusIterationWakeupData*)data;
+   fd_set wfds;
+   struct timeval tv;
+   int retval;
+
+   if (NULL == p || !p->is_polling)
+       return FALSE;
+
+   /* Still do a test before writing*/
+   FD_ZERO (&wfds);
+   FD_SET (p->wakeup_fd, &wfds);
+   tv.tv_sec  = 0;  /*immediate return*/
+   tv.tv_usec = 0;
+
+   retval = select (p->wakeup_fd+1, NULL, &wfds, NULL, &tv);
+
+   if (retval <= 0)
+       return FALSE;
+
+   /*write one char*/
+   if (write (p->wakeup_fd, "w", 1) >0 )
+       return TRUE;
+
+   return FALSE;
+}
+
+
+void
+_dbus_iteration_wakeup_free (void* data)
+{
+   DBusIterationWakeupData* p = *(DBusIterationWakeupData**)data;
+
+   if (NULL == p) return;
+
+   close (p->wakeup_fd);
+   close (p->poll_fd);
+
+   free (p);
+   *(DBusIterationWakeupData**)data = NULL;
+}
+
+
+void
+_dbus_iteration_wakeup_reset (void* data)
+{
+#define WAKEUP_BUF_SIZE 32
+   DBusIterationWakeupData* p = (DBusIterationWakeupData*)data;
+   char buf[WAKEUP_BUF_SIZE];
+   fd_set rfds;
+   struct timeval tv;
+   int retval;
+
+   if (NULL == p) return;
+
+   while ( 1 )
+   {
+       FD_ZERO (&rfds);
+       FD_SET (p->poll_fd, &rfds);
+       tv.tv_sec  = 0;  /*immediate return*/
+       tv.tv_usec = 0;
+
+       retval = select (p->poll_fd+1, &rfds, NULL, NULL, &tv);
+
+       if (retval <= 0)
+           return ;
+       if (read(p->poll_fd, buf, WAKEUP_BUF_SIZE) <= 0)
+           return;
+   }
+}
+
+void
+_dbus_iteration_wakeup_update (void* data, dbus_bool_t polling_now)
+{
+   DBusIterationWakeupData* p = (DBusIterationWakeupData*)data;
+
+   if (NULL == p) return;
+
+   p->is_polling = polling_now;
+}
+
+
diff -rup dbus-1.1.2/dbus/dbus-transport.c dbus-1.1.2-fix/dbus/dbus-transport.c
--- dbus-1.1.2/dbus/dbus-transport.c	2007-07-24 09:39:09.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-transport.c	2007-08-05 09:52:26.000000000 -0600
@@ -911,6 +911,35 @@ _dbus_transport_get_socket_fd (DBusTrans
 }

 /**
+ * Call wakeup_iteration in transport. Wakeup the transport if it's polling.
+ *
+ * @param transport the transport.
+ * @return TRUE if wakeup event is sent, otherwise FALSE.
+ */
+dbus_bool_t
+_dbus_transport_wakeup_iteration (DBusTransport  *transport)
+{
+  dbus_bool_t ret = FALSE;
+
+  _dbus_assert (transport->vtable->do_iteration != NULL);
+
+  _dbus_verbose ("Transport wakeup poll,  connected = %d\n",
+                 !transport->disconnected);
+
+  if (transport->disconnected)
+    return ret;
+
+  _dbus_transport_ref (transport);
+  ret  = (* transport->vtable->wakeup_iteration) (transport);
+  _dbus_transport_unref (transport);
+
+  _dbus_verbose ("%s end\n", _DBUS_FUNCTION_NAME);
+
+  return ret;
+}
+
+
+/**
  * Performs a single poll()/select() on the transport's file
  * descriptors and then reads/writes data as appropriate,
  * queueing incoming messages and sending outgoing messages.
diff -rup dbus-1.1.2/dbus/dbus-transport.h dbus-1.1.2-fix/dbus/dbus-transport.h
--- dbus-1.1.2/dbus/dbus-transport.h	2007-07-24 09:39:09.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-transport.h	2007-08-05 09:53:19.000000000 -0600
@@ -82,7 +82,7 @@ dbus_bool_t        _dbus_transport_set_a
                                                            const char                **mechanisms);
 void               _dbus_transport_set_allow_anonymous    (DBusTransport              *transport,
                                                            dbus_bool_t                 value);
-
+dbus_bool_t        _dbus_transport_wakeup_iteration       (DBusTransport              *transport);

 DBUS_END_DECLS

diff -rup dbus-1.1.2/dbus/dbus-transport-protected.h dbus-1.1.2-fix/dbus/dbus-transport-protected.h
--- dbus-1.1.2/dbus/dbus-transport-protected.h	2007-07-24 09:39:09.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-transport-protected.h	2007-08-05 09:58:22.000000000 -0600
@@ -69,6 +69,9 @@ struct DBusTransportVTable
   dbus_bool_t (* get_socket_fd) (DBusTransport *transport,
                                  int           *fd_p);
   /**< Get socket file descriptor */
+
+  dbus_bool_t (* wakeup_iteration)         (DBusTransport *transport);
+  /**< Called to interrupt a poll/select sleep */
 };

 /**
diff -rup dbus-1.1.2/dbus/dbus-transport-socket.c dbus-1.1.2-fix/dbus/dbus-transport-socket.c
--- dbus-1.1.2/dbus/dbus-transport-socket.c	2007-07-25 11:01:51.000000000 -0600
+++ dbus-1.1.2-fix/dbus/dbus-transport-socket.c	2007-08-03 14:32:47.000000000 -0600
@@ -65,6 +65,9 @@ struct DBusTransportSocket
   DBusString encoded_incoming;          /**< Encoded version of current
                                          *   incoming data.
                                          */
+  void*   iteration_wakeup_data;        /**<  Data used by platform dependent code
+                                         *    to interrupt the poll/select sleep
+                                         */
 };

 static void
@@ -103,6 +106,9 @@ socket_finalize (DBusTransport *transpor
   DBusTransportSocket *socket_transport = (DBusTransportSocket*) transport;

   _dbus_verbose ("%s\n", _DBUS_FUNCTION_NAME);
+
+  if (socket_transport->iteration_wakeup_data)
+       _dbus_iteration_wakeup_free ( & socket_transport->iteration_wakeup_data);

   free_watches (transport);

@@ -937,6 +943,20 @@ socket_connection_set (DBusTransport *tr
   return TRUE;
 }

+
+static dbus_bool_t
+socket_wakeup_iteration (DBusTransport *transport)
+{
+      DBusTransportSocket *socket_transport = (DBusTransportSocket*) transport;
+
+      if (!socket_transport->iteration_wakeup_data)
+          return FALSE;
+
+      return _dbus_iteration_wakeup(socket_transport->iteration_wakeup_data);
+}
+
+
+
 /**
  * @todo We need to have a way to wake up the select sleep if
  * a new iteration request comes in with a flag (read/write) that
@@ -950,7 +970,7 @@ socket_do_iteration (DBusTransport *tran
                    int            timeout_milliseconds)
 {
   DBusTransportSocket *socket_transport = (DBusTransportSocket*) transport;
-  DBusPollFD poll_fd;
+  DBusPollFD poll_fd[2];  /* 0 for data, 1 for wakeup*/
   int poll_res;
   int poll_timeout;

@@ -968,8 +988,15 @@ socket_do_iteration (DBusTransport *tran
    * we don't want to read any messages yet if not given DO_READING.
    */

-  poll_fd.fd = socket_transport->fd;
-  poll_fd.events = 0;
+  poll_fd[0].fd = socket_transport->fd;
+  poll_fd[0].events = 0;
+
+  if (socket_transport->iteration_wakeup_data)
+         poll_fd[1].fd = *((int*)socket_transport->iteration_wakeup_data);
+  else
+         poll_fd[1].fd = 0;
+
+  poll_fd[1].events = 0;

   if (_dbus_transport_get_is_authenticated (transport))
     {
@@ -998,11 +1025,14 @@ socket_do_iteration (DBusTransport *tran
       /* If we get here, we decided to do the poll() after all */
       _dbus_assert (socket_transport->read_watch);
       if (flags & DBUS_ITERATION_DO_READING)
-	poll_fd.events |= _DBUS_POLLIN;
+	poll_fd[0].events |= _DBUS_POLLIN;

       _dbus_assert (socket_transport->write_watch);
       if (flags & DBUS_ITERATION_DO_WRITING)
-        poll_fd.events |= _DBUS_POLLOUT;
+        poll_fd[0].events |= _DBUS_POLLOUT;
+
+      if ( poll_fd[1].fd )
+              poll_fd[1].events |= _DBUS_POLLIN;
     }
   else
     {
@@ -1012,14 +1042,14 @@ socket_do_iteration (DBusTransport *tran

       if (transport->receive_credentials_pending ||
           auth_state == DBUS_AUTH_STATE_WAITING_FOR_INPUT)
-	poll_fd.events |= _DBUS_POLLIN;
+	poll_fd[0].events |= _DBUS_POLLIN;

       if (transport->send_credentials_pending ||
           auth_state == DBUS_AUTH_STATE_HAVE_BYTES_TO_SEND)
-	poll_fd.events |= _DBUS_POLLOUT;
+	poll_fd[0].events |= _DBUS_POLLOUT;
     }

-  if (poll_fd.events)
+  if (poll_fd[0].events)
     {
       if (flags & DBUS_ITERATION_BLOCK)
 	poll_timeout = timeout_milliseconds;
@@ -1034,11 +1064,12 @@ socket_do_iteration (DBusTransport *tran
       if (flags & DBUS_ITERATION_BLOCK)
         {
           _dbus_verbose ("unlock %s pre poll\n", _DBUS_FUNCTION_NAME);
+          _dbus_iteration_wakeup_update (socket_transport->iteration_wakeup_data, TRUE);
           _dbus_connection_unlock (transport->connection);
         }

     again:
-      poll_res = _dbus_poll (&poll_fd, 1, poll_timeout);
+      poll_res = _dbus_poll (poll_fd, (poll_fd[1].fd?2:1), poll_timeout);

       if (poll_res < 0 && _dbus_get_is_errno_eintr ())
 	goto again;
@@ -1047,22 +1078,28 @@ socket_do_iteration (DBusTransport *tran
         {
           _dbus_verbose ("lock %s post poll\n", _DBUS_FUNCTION_NAME);
           _dbus_connection_lock (transport->connection);
+          _dbus_iteration_wakeup_update (socket_transport->iteration_wakeup_data,FALSE);
         }

       if (poll_res >= 0)
         {
           if (poll_res == 0)
-            poll_fd.revents = 0; /* some concern that posix does not guarantee this;
+            poll_fd[0].revents = 0; /* some concern that posix does not guarantee this;
                                   * valgrind flags it as an error. though it probably
                                   * is guaranteed on linux at least.
                                   */
+
+          if ( poll_fd[1].fd
+               && (poll_fd[1].events & _DBUS_POLLIN)
+               && (poll_fd[1].revents & _DBUS_POLLIN) )
+              _dbus_iteration_wakeup_reset (socket_transport->iteration_wakeup_data);

-          if (poll_fd.revents & _DBUS_POLLERR)
+          if (poll_fd[0].revents & _DBUS_POLLERR)
             do_io_error (transport);
           else
             {
-              dbus_bool_t need_read = (poll_fd.revents & _DBUS_POLLIN) > 0;
-              dbus_bool_t need_write = (poll_fd.revents & _DBUS_POLLOUT) > 0;
+              dbus_bool_t need_read = (poll_fd[0].revents & _DBUS_POLLIN) > 0;
+              dbus_bool_t need_write = (poll_fd[0].revents & _DBUS_POLLOUT) > 0;
 	      dbus_bool_t authentication_completed;

               _dbus_verbose ("in iteration, need_read=%d need_write=%d\n",
@@ -1130,7 +1167,8 @@ static const DBusTransportVTable socket_
   socket_connection_set,
   socket_do_iteration,
   socket_live_messages_changed,
-  socket_get_socket_fd
+  socket_get_socket_fd,
+  socket_wakeup_iteration
 };

 /**
@@ -1186,9 +1224,20 @@ _dbus_transport_new_for_socket (int
   /* These values should probably be tunable or something. */
   socket_transport->max_bytes_read_per_iteration = 2048;
   socket_transport->max_bytes_written_per_iteration = 2048;
+
+  /*add wakeup for client connections*/
+  socket_transport->iteration_wakeup_data = NULL;
+
+  if (NULL == server_guid)  /*client only*/
+  {
+     if(!_dbus_iteration_wakeup_initialize (&socket_transport->iteration_wakeup_data))
+         goto failed_5;
+  }

   return (DBusTransport*) socket_transport;

+ failed_5:
+   _dbus_transport_finalize_base (&socket_transport->base);
  failed_4:
   _dbus_watch_unref (socket_transport->read_watch);
  failed_3:

