Dbus in Multi-threaded application

Abhi Arora abhiarora4 at live.com
Sat Sep 9 17:45:04 UTC 2017


Hello Community,


I have developed a C++ application (for Bluetooth Low Energy) using D-Bus on Linux.


Earlier, I was calling dbus_connection_* functions from different threads and was observing unexpected behaviour.

I have changed the way I manage the single D-Bus connection across different threads (as suggested by Thiago Macieira; thanks to him).


I am now calling dbus_connection_* from a single thread (Thread A). The other threads (Thread B, C, ...) in my application prepare DBusMessages, queue them in a thread-safe queue and then block waiting for replies (using a mutex and condition variables).


Thread A periodically dequeues messages from the thread-safe queue, sends them, sets the pending-call callbacks and then returns. Thread A runs the glib event loop, and the callbacks are delivered on Thread A by that loop. My glib integration is essentially the same as the "bluetoothctl" event loop implementation:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/gdbus/mainloop.c . The watch and timeout implementation and the dispatching of D-Bus messages are handled by the glib event loop.
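
To make the hand-off described above concrete, here is a minimal sketch of the same queue-and-block pattern with no libdbus or glib dependency. It is only an illustration, not my actual code: Request, Dispatcher and demo_round_trip are hypothetical names, a std::string stands in for a DBusMessage, and a polling loop stands in for the periodic glib callback on Thread A.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for cb_data_s: one per in-flight request.
struct Request {
    std::mutex mutex;
    std::condition_variable cond;
    bool done = false;
    std::string reply;
};

class Dispatcher {
public:
    // Called from worker threads (Thread B, C, ...): queue the message,
    // then block until the dispatch thread has filled in the reply.
    std::string queue_and_block(const std::string &message) {
        Request req;
        {
            std::lock_guard<std::mutex> guard(queue_mutex_);
            queue_.emplace_back(message, &req);
        }
        std::unique_lock<std::mutex> lock(req.mutex);
        req.cond.wait(lock, [&] { return req.done; });
        return req.reply;
    }

    // Called only from the dispatch thread (Thread A). Drains the queue and
    // completes each request; returns how many requests were handled.
    std::size_t drain() {
        std::deque<std::pair<std::string, Request *>> batch;
        {
            // Hold the queue lock only long enough to take the pending items.
            std::lock_guard<std::mutex> guard(queue_mutex_);
            batch.swap(queue_);
        }
        for (auto &item : batch) {
            Request *req = item.second;
            // Stand-in for the pending-call callback: publish the reply
            // under the request's mutex, then wake the waiting thread.
            std::lock_guard<std::mutex> lock(req->mutex);
            req->reply = "reply to " + item.first;
            req->done = true;
            req->cond.notify_all();
        }
        return batch.size();
    }

private:
    std::mutex queue_mutex_;
    std::deque<std::pair<std::string, Request *>> queue_;
};

// One worker thread sends a request; the calling thread plays Thread A.
std::string demo_round_trip() {
    Dispatcher dispatcher;
    std::string reply;
    std::thread worker([&] { reply = dispatcher.queue_and_block("ping"); });
    // Poll until the worker's request has been queued and handled.
    while (dispatcher.drain() == 0) {
        std::this_thread::yield();
    }
    worker.join();
    return reply;
}
```

In this sketch the queue lock is released before any request is completed, so worker threads can keep queuing while the dispatch thread works through a batch.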


I have tested it and run it under valgrind, which didn't show anything unexpected. Everything appears to work, but I still want to be sure this is the right way to handle it. Do I need to call dbus_threads_init()?


From what I have read of the dbus source code, I think the dbus_message_* API should be thread-safe and re-entrant. Please help.


Below are a few of the functions:


// Called in Thread other than Thread A

DBusMessage *DBusClient::queue_message_and_block(DBusMessage *p_message)
{
    DBusMessage *p_reply;
    dbus_message_ref(p_message);
    struct cb_data_s *p_data = new struct cb_data_s;
    if (p_data == nullptr)
        return nullptr;

    {
        std::unique_lock<std::mutex> reply_guard(p_data->mutex);

        p_data->status = false;
        p_data->p_reply = nullptr;

        {
            std::lock_guard<std::mutex> guard(_queue_mutex);
            _queue.push_back(std::make_pair(p_message, p_data));
        }
        p_data->cond.wait(reply_guard, [&] { return p_data->status.load(); });

        p_reply = p_data->p_reply;
        dbus_message_unref(p_message);
    }
    delete p_data;

    return p_reply;
}

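// Called periodically by glib in Thread A.
gboolean DBusClient::periodic_dbus_message_dispatch(gpointer user_data)
{
    std::lock_guard<std::mutex> guard(_queue_mutex);
    while (!_queue.empty()) {
        std::pair<DBusMessage *, struct cb_data_s *> pair = _queue.front();
        _queue.pop_front();

        DBusMessage *p_msg = pair.first;
        struct cb_data_s *p_data = pair.second;
        DBusPendingCall *p_call = nullptr;

        // dbus_connection_send_with_reply() returns FALSE on out-of-memory, and
        // leaves p_call set to NULL if the connection is disconnected.
        if (!dbus_connection_send_with_reply(_p_conn, p_msg, &p_call, METHOD_CALL_TIMEOUT) || p_call == nullptr) {
            // Wake the waiting thread with a null reply instead of leaving it blocked.
            std::lock_guard<std::mutex> reply_guard(p_data->mutex);
            p_data->p_reply = nullptr;
            p_data->status = true;
            p_data->cond.notify_all();
        } else {
            dbus_pending_call_set_notify(p_call, pending_callback, reinterpret_cast<void *>(p_data), nullptr);
            dbus_pending_call_unref(p_call);
        }
        dbus_message_unref(p_msg);
    }

    return TRUE;
}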

// This callback should be dispatched in thread A.
void DBusClient::pending_callback(DBusPendingCall *p_call, void *data)
{
    struct cb_data_s *cb_data = reinterpret_cast<struct cb_data_s *>(data);
    if (cb_data == nullptr)
        return;

    std::lock_guard<std::mutex> guard(cb_data->mutex);

    DBusMessage *p_reply = dbus_pending_call_steal_reply(p_call);
    DBusError error;

    dbus_error_init(&error);
    if (dbus_set_error_from_message(&error, p_reply) == TRUE) {
        dbus_error_free(&error);
        dbus_message_unref(p_reply);
        p_reply = nullptr;
    }

    cb_data->p_reply = p_reply;
    cb_data->status = true;
    cb_data->cond.notify_all();
}
