[systemd-devel] [RFC][PATCH] sd-bus: split ref-counting of queues from ref-counting of the rest of the sd-bus object
Tom Gundersen
teg at jklm.no
Sun Mar 23 08:49:19 PDT 2014
Introduce a new ref-count, n_ref_queues, which only protects the {r,w}queue of
a bus, and introduce bus_{un,}ref(), which are only available internally, and
which do not protect these queues.
Make sure that sd_bus_message objects do not call sd_bus_ref(), but only the
internal bus_ref(). This is ok as the {r,w}queues should never be accessed via
the message object (as doing so would anyway not be thread-safe).
When the refcount on the queues reaches zero (even though the refcount on the
bus itself may not, due to references held by messages in the queues), the
queues are flushed and their messages unref'ed. This avoids problems due to
mutual references between busses and their queued messages. In particular we
don't get an sd_bus_unref() -> sd_bus_message_unref() -> sd_bus_unref()
call-chain.
Moreover, we can now enforce that sd_bus_{un,}ref() is only ever called from
the same thread that created the bus (whereas bus_unref() may be called from a
different thread, as part of unref'ing a message being handled in a worker
thread).
---
src/libsystemd/sd-bus/bus-internal.h | 10 ++++
src/libsystemd/sd-bus/bus-message.c | 6 +--
src/libsystemd/sd-bus/sd-bus.c | 93 +++++++++++++++---------------------
3 files changed, 51 insertions(+), 58 deletions(-)
diff --git a/src/libsystemd/sd-bus/bus-internal.h b/src/libsystemd/sd-bus/bus-internal.h
index 3dceb8a..c9b40af 100644
--- a/src/libsystemd/sd-bus/bus-internal.h
+++ b/src/libsystemd/sd-bus/bus-internal.h
@@ -145,6 +145,13 @@ struct sd_bus {
same time. */
RefCount n_ref;
+ /* The {r,w}queue may only be accessed from the original
+ thread, so no need for atomic ref counting. The queues
+ are ref-counted separately from the sd_bus object to
+ avoid problems caused by mutual references between
+ busses and their queued messages */
+ unsigned n_ref_queues;
+
enum bus_state state;
int input_fd, output_fd;
int message_version;
@@ -340,3 +347,6 @@ int bus_set_address_system(sd_bus *bus);
int bus_set_address_user(sd_bus *bus);
int bus_set_address_system_remote(sd_bus *b, const char *host);
int bus_set_address_system_container(sd_bus *b, const char *machine);
+
+sd_bus *bus_ref(sd_bus* bus);
+sd_bus *bus_unref(sd_bus *bus);
diff --git a/src/libsystemd/sd-bus/bus-message.c b/src/libsystemd/sd-bus/bus-message.c
index 4fcc693..e33c08a 100644
--- a/src/libsystemd/sd-bus/bus-message.c
+++ b/src/libsystemd/sd-bus/bus-message.c
@@ -137,7 +137,7 @@ static void message_free(sd_bus_message *m) {
}
if (m->bus)
- sd_bus_unref(m->bus);
+ bus_unref(m->bus);
if (m->free_fds) {
close_many(m->fds, m->n_fds);
@@ -427,7 +427,7 @@ int bus_message_from_header(
}
if (bus)
- m->bus = sd_bus_ref(bus);
+ m->bus = bus_ref(bus);
*ret = m;
return 0;
@@ -502,7 +502,7 @@ static sd_bus_message *message_new(sd_bus *bus, uint8_t type) {
m->root_container.need_offsets = BUS_MESSAGE_IS_GVARIANT(m);
if (bus)
- m->bus = sd_bus_ref(bus);
+ m->bus = bus_ref(bus);
return m;
}
diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c
index bbe61a6..b09326d 100644
--- a/src/libsystemd/sd-bus/sd-bus.c
+++ b/src/libsystemd/sd-bus/sd-bus.c
@@ -111,12 +111,6 @@ static void bus_node_destroy(sd_bus *b, struct node *n) {
static void bus_reset_queues(sd_bus *b) {
assert(b);
- /* NOTE: We _must_ decrement b->Xqueue_size before calling
- * sd_bus_message_unref() for _each_ message. Otherwise the
- * self-reference checks in sd_bus_unref() will fire for each message.
- * We would thus recurse into sd_bus_message_unref() and trigger the
- * assert(m->n_ref > 0) */
-
while (b->rqueue_size > 0)
sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
@@ -203,6 +197,7 @@ _public_ int sd_bus_new(sd_bus **ret) {
return -ENOMEM;
r->n_ref = REFCNT_INIT;
+ r->n_ref_queues = 1;
r->input_fd = r->output_fd = -1;
r->message_version = 1;
r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
@@ -1380,72 +1375,60 @@ static void bus_enter_closing(sd_bus *bus) {
bus->state = BUS_CLOSING;
}
-_public_ sd_bus *sd_bus_ref(sd_bus *bus) {
- assert_return(bus, NULL);
+/* Unlike sd_bus_{un,}ref(), bus_{un,}ref() do not protect the
+ {r,w}queue. On the other hand, these methods are thread-safe.
+ */
+sd_bus *bus_ref(sd_bus* bus) {
+ assert(bus);
assert_se(REFCNT_INC(bus->n_ref) >= 2);
return bus;
}
-_public_ sd_bus *sd_bus_unref(sd_bus *bus) {
+sd_bus *bus_unref(sd_bus *bus) {
unsigned i;
if (!bus)
return NULL;
- /* TODO/FIXME: It's naive to think REFCNT_GET() is thread-safe in any
- * way but exclusive REFCNT_DEC(). The current logic _must_ lock around
- * REFCNT_GET() until REFCNT_DEC() or two threads might end up in
- * parallel in bus_reset_queues(). But locking would totally break the
- * recursion we introduce by bus_reset_queues()...
- * (Imagine one thread in sd_bus_message_unref() setting n_ref to 0 and
- * thus calling into sd_bus_unref(). If at the same time the real
- * thread calls sd_bus_unref(), both end up with "q == true" and will
- * call into bus_reset_queues().
- * If we require the main bus to be alive until all dispatch threads
- * are done, there is no need to do ref-counts at all. So in both ways,
- * the REFCNT thing is humbug.)
- *
- * On a second note: messages are *not* required to have ->bus set nor
- * does it have to be _this_ bus that they're assigned to. This whole
- * ref-cnt checking breaks apart if a message is not assigned to us.
- * (which is _very_ easy to trigger with the current API). */
-
- if (REFCNT_GET(bus->n_ref) == bus->rqueue_size + bus->wqueue_size + 1) {
- bool q = true;
-
- for (i = 0; i < bus->rqueue_size; i++)
- if (bus->rqueue[i]->n_ref > 1) {
- q = false;
- break;
- }
+ i = REFCNT_DEC(bus->n_ref);
+ if (i > 0)
+ return NULL;
- if (q) {
- for (i = 0; i < bus->wqueue_size; i++)
- if (bus->wqueue[i]->n_ref > 1) {
- q = false;
- break;
- }
- }
+ bus_free(bus);
- /* We are the only holders on the messages, and the
- * messages are the only holders on us, so let's drop
- * the messages and thus implicitly also kill our own
- * last references.
- * bus_reset_queues() decrements the queue-size before
- * calling into sd_bus_message_unref(). Thus, it
- * protects us from recursion. */
+ return NULL;
+}
- if (q)
- bus_reset_queues(bus);
- }
+/* sd_bus_ref() should not be called from sd_bus_message objects
+ as these may be in the bus' queues, hence creating circular
+ references
+ */
+_public_ sd_bus *sd_bus_ref(sd_bus *bus) {
+ assert_return(bus, NULL);
+ assert_return(!bus_pid_changed(bus), NULL);
- i = REFCNT_DEC(bus->n_ref);
- if (i > 0)
+ bus_ref(bus);
+ assert_se(++(bus->n_ref_queues) >= 2);
+
+ return bus;
+}
+
+_public_ sd_bus *sd_bus_unref(sd_bus *bus) {
+ unsigned i;
+
+ if (!bus)
return NULL;
- bus_free(bus);
+ assert_return(!bus_pid_changed(bus), NULL);
+
+ i = bus->n_ref_queues --;
+ if (i <= 0)
+ bus_reset_queues(bus);
+
+ bus_unref(bus);
+
return NULL;
}
--
1.9.1
More information about the systemd-devel
mailing list