[pulseaudio-discuss] [PATCH] combine-sink: Fix unsafe message handling
Georg Chini
georg at chini.tk
Sat Jun 27 10:27:50 PDT 2015
On 27.06.2015 17:36, Tanu Kaskinen wrote:
> This fixes a crash. sink_input_pop_cb() drains the message queue that receives
> memchunks from the combine sink thread to avoid requesting more audio too soon.
> The same message queue received also SET_REQUESTED_LATENCY messages, which
> generate rewind requests. Rewind requests shouldn't be issued in the pop()
> callback, doing so results in an assertion error. Therefore, it was not safe to
> drain the message queue in the pop() callback, but usually the queue is empty,
> so this bug was not immediately detected.
>
> This patch splits the message queue into two queues: audio_inq and control_inq.
> audio_inq receives only messages containing memchunks, and control_inq receives
> only the SET_REQUESTED_LATENCY messages. The pop() callback only drains the
> audio queue, which avoids the rewind requests in the pop() callback.
>
> BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=90489
> ---
> src/modules/module-combine-sink.c | 108 ++++++++++++++++++++++++++++----------
> 1 file changed, 80 insertions(+), 28 deletions(-)
>
> diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
> index 602fa87..b6322c6 100644
> --- a/src/modules/module-combine-sink.c
> +++ b/src/modules/module-combine-sink.c
> @@ -90,9 +90,27 @@ struct output {
> pa_sink_input *sink_input;
> bool ignore_state_change;
>
> - pa_asyncmsgq *inq, /* Message queue from the sink thread to this sink input */
> - *outq; /* Message queue from this sink input to the sink thread */
> - pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
> + /* This message queue is only for POST messages, i.e. the messages that
> + * carry audio data from the sink thread to the output thread. The POST
> + * messages need to be handled in a separate queue, because the queue is
> + * processed not only in the output thread mainloop, but also inside the
> + * sink input pop() callback. Processing other messages (such as
> + * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
> + * one reason why it's not safe is that messages that generate rewind
> + * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
> + * in the pop() callback. */
> + pa_asyncmsgq *audio_inq;
> +
> + /* This message queue is for all other messages than POST from the sink
> + * thread to the output thread (currently "all other messages" means just
> + * the SET_REQUESTED_LATENCY message). */
> + pa_asyncmsgq *control_inq;
> +
> + /* Message queue from the output thread to the sink thread. */
> + pa_asyncmsgq *outq;
> +
> + pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
> + pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
> pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
>
> pa_memblockq *memblockq;
> @@ -352,7 +370,7 @@ finish:
> pa_log_debug("Thread shutting down");
> }
>
> -/* Called from I/O thread context */
> +/* Called from combine sink I/O thread context */
> static void render_memblock(struct userdata *u, struct output *o, size_t length) {
> pa_assert(u);
> pa_assert(o);
> @@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
>
> /* Maybe there's some data in the requesting output's queue
> * now? */
> - while (pa_asyncmsgq_process_one(o->inq) > 0)
> + while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
> ;
>
> /* Ok, now let's prepare some data if we really have to */
> @@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
> if (j == o)
> continue;
>
> - pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
> + pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
> }
>
> /* And place it directly into the requesting output's queue */
> @@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) {
> /* If another thread already prepared some data we received
> * the data over the asyncmsgq, hence let's first process
> * it. */
> - while (pa_asyncmsgq_process_one(o->inq) > 0)
> + while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
> ;
>
> /* Check whether we're now readable */
> @@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) {
> pa_assert_se(o = i->userdata);
>
> /* Set up the queue from the sink thread to us */
> - pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
> + pa_assert(!o->audio_inq_rtpoll_item_read);
> + pa_assert(!o->control_inq_rtpoll_item_read);
> + pa_assert(!o->outq_rtpoll_item_write);
>
> - o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> + o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> i->sink->thread_info.rtpoll,
> PA_RTPOLL_LATE, /* This one is not that important, since we check for data in _peek() anyway. */
> - o->inq);
> + o->audio_inq);
> +
> + o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> + i->sink->thread_info.rtpoll,
> + PA_RTPOLL_NORMAL,
> + o->control_inq);
>
> o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> i->sink->thread_info.rtpoll,
> @@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) {
> * pass any further data to this output */
> pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
>
> - if (o->inq_rtpoll_item_read) {
> - pa_rtpoll_item_free(o->inq_rtpoll_item_read);
> - o->inq_rtpoll_item_read = NULL;
> + if (o->audio_inq_rtpoll_item_read) {
> + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
> + o->audio_inq_rtpoll_item_read = NULL;
> + }
> +
> + if (o->control_inq_rtpoll_item_read) {
> + pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
> + o->control_inq_rtpoll_item_read = NULL;
> }
>
> if (o->outq_rtpoll_item_write) {
> @@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) {
>
> PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
>
> - pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
> + pa_assert(!o->outq_rtpoll_item_read);
> + pa_assert(!o->audio_inq_rtpoll_item_write);
> + pa_assert(!o->control_inq_rtpoll_item_write);
>
> o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> o->userdata->rtpoll,
> PA_RTPOLL_EARLY-1, /* This item is very important */
> o->outq);
> - o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> + o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> o->userdata->rtpoll,
> PA_RTPOLL_EARLY,
> - o->inq);
> + o->audio_inq);
> + o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> + o->userdata->rtpoll,
> + PA_RTPOLL_NORMAL,
> + o->control_inq);
> }
>
> /* Called from thread context of the io thread */
> @@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) {
> o->outq_rtpoll_item_read = NULL;
> }
>
> - if (o->inq_rtpoll_item_write) {
> - pa_rtpoll_item_free(o->inq_rtpoll_item_write);
> - o->inq_rtpoll_item_write = NULL;
> + if (o->audio_inq_rtpoll_item_write) {
> + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
> + o->audio_inq_rtpoll_item_write = NULL;
> + }
> +
> + if (o->control_inq_rtpoll_item_write) {
> + pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
> + o->control_inq_rtpoll_item_write = NULL;
> }
> }
>
> @@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) {
>
> /* Just hand this one over to all sink_inputs */
> PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
> - pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL);
> + pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
> + u->block_usec, NULL, NULL);
> }
> }
>
> @@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
>
> o = pa_xnew0(struct output, 1);
> o->userdata = u;
> - o->inq = pa_asyncmsgq_new(0);
> + o->audio_inq = pa_asyncmsgq_new(0);
> + o->control_inq = pa_asyncmsgq_new(0);
> o->outq = pa_asyncmsgq_new(0);
> o->sink = sink;
> o->memblockq = pa_memblockq_new(
> @@ -1004,18 +1047,26 @@ static void output_free(struct output *o) {
> output_disable(o);
> update_description(o->userdata);
>
> - if (o->inq_rtpoll_item_read)
> - pa_rtpoll_item_free(o->inq_rtpoll_item_read);
> - if (o->inq_rtpoll_item_write)
> - pa_rtpoll_item_free(o->inq_rtpoll_item_write);
> + if (o->audio_inq_rtpoll_item_read)
> + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
> + if (o->audio_inq_rtpoll_item_write)
> + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
> +
> + if (o->control_inq_rtpoll_item_read)
> + pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
> + if (o->control_inq_rtpoll_item_write)
> + pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
>
> if (o->outq_rtpoll_item_read)
> pa_rtpoll_item_free(o->outq_rtpoll_item_read);
> if (o->outq_rtpoll_item_write)
> pa_rtpoll_item_free(o->outq_rtpoll_item_write);
>
> - if (o->inq)
> - pa_asyncmsgq_unref(o->inq);
> + if (o->audio_inq)
> + pa_asyncmsgq_unref(o->audio_inq);
> +
> + if (o->control_inq)
> + pa_asyncmsgq_unref(o->control_inq);
>
> if (o->outq)
> pa_asyncmsgq_unref(o->outq);
> @@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) {
>
> /* Finally, drop all queued data */
> pa_memblockq_flush_write(o->memblockq, true);
> - pa_asyncmsgq_flush(o->inq, false);
> + pa_asyncmsgq_flush(o->audio_inq, false);
> + pa_asyncmsgq_flush(o->control_inq, false);
> pa_asyncmsgq_flush(o->outq, false);
> }
>
Hi Tanu,
couldn't you just replace the pa_asyncmsgq_post() with
pa_asyncmsgq_send() in
sink_update_requested_latency()? Then the latency updates would be done
synchronously and the situation you describe above should never happen.
Regards
Georg
More information about the pulseaudio-discuss
mailing list