[pulseaudio-discuss] [PATCH] combine-sink: Fix unsafe message handling

Tanu Kaskinen tanuk at iki.fi
Sat Jun 27 08:36:35 PDT 2015


This fixes a crash. sink_input_pop_cb() drains the message queue that receives
memchunks from the combine sink thread to avoid requesting more audio too soon.
The same message queue received also SET_REQUESTED_LATENCY messages, which
generate rewind requests. Rewind requests shouldn't be issued in the pop()
callback, doing so results in an assertion error. Therefore, it was not safe to
drain the message queue in the pop() callback, but usually the queue is empty,
so this bug was not immediately detected.

This patch splits the message queue into two queues: audio_inq and control_inq.
audio_inq receives only messages containing memchunks, and control_inq receives
only the SET_REQUESTED_LATENCY messages. The pop() callback only drains the
audio queue, which avoids the rewind requests in the pop() callback.

BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=90489
---
 src/modules/module-combine-sink.c | 108 ++++++++++++++++++++++++++++----------
 1 file changed, 80 insertions(+), 28 deletions(-)

diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
index 602fa87..b6322c6 100644
--- a/src/modules/module-combine-sink.c
+++ b/src/modules/module-combine-sink.c
@@ -90,9 +90,27 @@ struct output {
     pa_sink_input *sink_input;
     bool ignore_state_change;
 
-    pa_asyncmsgq *inq,    /* Message queue from the sink thread to this sink input */
-                 *outq;   /* Message queue from this sink input to the sink thread */
-    pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
+    /* This message queue is only for POST messages, i.e. the messages that
+     * carry audio data from the sink thread to the output thread. The POST
+     * messages need to be handled in a separate queue, because the queue is
+     * processed not only in the output thread mainloop, but also inside the
+     * sink input pop() callback. Processing other messages (such as
+     * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
+     * one reason why it's not safe is that messages that generate rewind
+     * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
+     * in the pop() callback. */
+    pa_asyncmsgq *audio_inq;
+
+    /* This message queue is for all other messages than POST from the sink
+     * thread to the output thread (currently "all other messages" means just
+     * the SET_REQUESTED_LATENCY message). */
+    pa_asyncmsgq *control_inq;
+
+    /* Message queue from the output thread to the sink thread. */
+    pa_asyncmsgq *outq;
+
+    pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
+    pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
     pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
 
     pa_memblockq *memblockq;
@@ -352,7 +370,7 @@ finish:
     pa_log_debug("Thread shutting down");
 }
 
-/* Called from I/O thread context */
+/* Called from combine sink I/O thread context */
 static void render_memblock(struct userdata *u, struct output *o, size_t length) {
     pa_assert(u);
     pa_assert(o);
@@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
 
     /* Maybe there's some data in the requesting output's queue
      * now? */
-    while (pa_asyncmsgq_process_one(o->inq) > 0)
+    while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
         ;
 
     /* Ok, now let's prepare some data if we really have to */
@@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
             if (j == o)
                 continue;
 
-            pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+            pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
         }
 
         /* And place it directly into the requesting output's queue */
@@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) {
     /* If another thread already prepared some data we received
      * the data over the asyncmsgq, hence let's first process
      * it. */
-    while (pa_asyncmsgq_process_one(o->inq) > 0)
+    while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
         ;
 
     /* Check whether we're now readable */
@@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) {
     pa_assert_se(o = i->userdata);
 
     /* Set up the queue from the sink thread to us */
-    pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
+    pa_assert(!o->audio_inq_rtpoll_item_read);
+    pa_assert(!o->control_inq_rtpoll_item_read);
+    pa_assert(!o->outq_rtpoll_item_write);
 
-    o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+    o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
             i->sink->thread_info.rtpoll,
             PA_RTPOLL_LATE,  /* This one is not that important, since we check for data in _peek() anyway. */
-            o->inq);
+            o->audio_inq);
+
+    o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+            i->sink->thread_info.rtpoll,
+            PA_RTPOLL_NORMAL,
+            o->control_inq);
 
     o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
             i->sink->thread_info.rtpoll,
@@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) {
      * pass any further data to this output */
     pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
 
-    if (o->inq_rtpoll_item_read) {
-        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
-        o->inq_rtpoll_item_read = NULL;
+    if (o->audio_inq_rtpoll_item_read) {
+        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
+        o->audio_inq_rtpoll_item_read = NULL;
+    }
+
+    if (o->control_inq_rtpoll_item_read) {
+        pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
+        o->control_inq_rtpoll_item_read = NULL;
     }
 
     if (o->outq_rtpoll_item_write) {
@@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) {
 
     PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
 
-    pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
+    pa_assert(!o->outq_rtpoll_item_read);
+    pa_assert(!o->audio_inq_rtpoll_item_write);
+    pa_assert(!o->control_inq_rtpoll_item_write);
 
     o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
             o->userdata->rtpoll,
             PA_RTPOLL_EARLY-1,  /* This item is very important */
             o->outq);
-    o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+    o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
             o->userdata->rtpoll,
             PA_RTPOLL_EARLY,
-            o->inq);
+            o->audio_inq);
+    o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+            o->userdata->rtpoll,
+            PA_RTPOLL_NORMAL,
+            o->control_inq);
 }
 
 /* Called from thread context of the io thread */
@@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) {
         o->outq_rtpoll_item_read = NULL;
     }
 
-    if (o->inq_rtpoll_item_write) {
-        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
-        o->inq_rtpoll_item_write = NULL;
+    if (o->audio_inq_rtpoll_item_write) {
+        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
+        o->audio_inq_rtpoll_item_write = NULL;
+    }
+
+    if (o->control_inq_rtpoll_item_write) {
+        pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
+        o->control_inq_rtpoll_item_write = NULL;
     }
 }
 
@@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) {
 
     /* Just hand this one over to all sink_inputs */
     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
-        pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL);
+        pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
+                          u->block_usec, NULL, NULL);
     }
 }
 
@@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
 
     o = pa_xnew0(struct output, 1);
     o->userdata = u;
-    o->inq = pa_asyncmsgq_new(0);
+    o->audio_inq = pa_asyncmsgq_new(0);
+    o->control_inq = pa_asyncmsgq_new(0);
     o->outq = pa_asyncmsgq_new(0);
     o->sink = sink;
     o->memblockq = pa_memblockq_new(
@@ -1004,18 +1047,26 @@ static void output_free(struct output *o) {
     output_disable(o);
     update_description(o->userdata);
 
-    if (o->inq_rtpoll_item_read)
-        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
-    if (o->inq_rtpoll_item_write)
-        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
+    if (o->audio_inq_rtpoll_item_read)
+        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
+    if (o->audio_inq_rtpoll_item_write)
+        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
+
+    if (o->control_inq_rtpoll_item_read)
+        pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
+    if (o->control_inq_rtpoll_item_write)
+        pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
 
     if (o->outq_rtpoll_item_read)
         pa_rtpoll_item_free(o->outq_rtpoll_item_read);
     if (o->outq_rtpoll_item_write)
         pa_rtpoll_item_free(o->outq_rtpoll_item_write);
 
-    if (o->inq)
-        pa_asyncmsgq_unref(o->inq);
+    if (o->audio_inq)
+        pa_asyncmsgq_unref(o->audio_inq);
+
+    if (o->control_inq)
+        pa_asyncmsgq_unref(o->control_inq);
 
     if (o->outq)
         pa_asyncmsgq_unref(o->outq);
@@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) {
 
     /* Finally, drop all queued data */
     pa_memblockq_flush_write(o->memblockq, true);
-    pa_asyncmsgq_flush(o->inq, false);
+    pa_asyncmsgq_flush(o->audio_inq, false);
+    pa_asyncmsgq_flush(o->control_inq, false);
     pa_asyncmsgq_flush(o->outq, false);
 }
 
-- 
2.1.4



More information about the pulseaudio-discuss mailing list