[pulseaudio-discuss] [PATCH] combine-sink: Fix unsafe message handling

Georg Chini georg at chini.tk
Sat Jun 27 10:27:50 PDT 2015


On 27.06.2015 17:36, Tanu Kaskinen wrote:
> This fixes a crash. sink_input_pop_cb() drains the message queue that receives
> memchunks from the combine sink thread to avoid requesting more audio too soon.
> The same message queue also received SET_REQUESTED_LATENCY messages, which
> generate rewind requests. Rewind requests shouldn't be issued in the pop()
> callback; doing so results in an assertion error. Therefore, it was not safe to
> drain the message queue in the pop() callback, but usually the queue is empty,
> so this bug was not immediately detected.
>
> This patch splits the message queue into two queues: audio_inq and control_inq.
> audio_inq receives only messages containing memchunks, and control_inq receives
> only the SET_REQUESTED_LATENCY messages. The pop() callback only drains the
> audio queue, which avoids the rewind requests in the pop() callback.
>
> BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=90489
> ---
>   src/modules/module-combine-sink.c | 108 ++++++++++++++++++++++++++++----------
>   1 file changed, 80 insertions(+), 28 deletions(-)
>
> diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
> index 602fa87..b6322c6 100644
> --- a/src/modules/module-combine-sink.c
> +++ b/src/modules/module-combine-sink.c
> @@ -90,9 +90,27 @@ struct output {
>       pa_sink_input *sink_input;
>       bool ignore_state_change;
>   
> -    pa_asyncmsgq *inq,    /* Message queue from the sink thread to this sink input */
> -                 *outq;   /* Message queue from this sink input to the sink thread */
> -    pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
> +    /* This message queue is only for POST messages, i.e. the messages that
> +     * carry audio data from the sink thread to the output thread. The POST
> +     * messages need to be handled in a separate queue, because the queue is
> +     * processed not only in the output thread mainloop, but also inside the
> +     * sink input pop() callback. Processing other messages (such as
> +     * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
> +     * one reason why it's not safe is that messages that generate rewind
> +     * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
> +     * in the pop() callback. */
> +    pa_asyncmsgq *audio_inq;
> +
> +    /* This message queue is for all other messages than POST from the sink
> +     * thread to the output thread (currently "all other messages" means just
> +     * the SET_REQUESTED_LATENCY message). */
> +    pa_asyncmsgq *control_inq;
> +
> +    /* Message queue from the output thread to the sink thread. */
> +    pa_asyncmsgq *outq;
> +
> +    pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
> +    pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
>       pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
>   
>       pa_memblockq *memblockq;
> @@ -352,7 +370,7 @@ finish:
>       pa_log_debug("Thread shutting down");
>   }
>   
> -/* Called from I/O thread context */
> +/* Called from combine sink I/O thread context */
>   static void render_memblock(struct userdata *u, struct output *o, size_t length) {
>       pa_assert(u);
>       pa_assert(o);
> @@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
>   
>       /* Maybe there's some data in the requesting output's queue
>        * now? */
> -    while (pa_asyncmsgq_process_one(o->inq) > 0)
> +    while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
>           ;
>   
>       /* Ok, now let's prepare some data if we really have to */
> @@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
>               if (j == o)
>                   continue;
>   
> -            pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
> +            pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
>           }
>   
>           /* And place it directly into the requesting output's queue */
> @@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) {
>       /* If another thread already prepared some data we received
>        * the data over the asyncmsgq, hence let's first process
>        * it. */
> -    while (pa_asyncmsgq_process_one(o->inq) > 0)
> +    while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
>           ;
>   
>       /* Check whether we're now readable */
> @@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) {
>       pa_assert_se(o = i->userdata);
>   
>       /* Set up the queue from the sink thread to us */
> -    pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
> +    pa_assert(!o->audio_inq_rtpoll_item_read);
> +    pa_assert(!o->control_inq_rtpoll_item_read);
> +    pa_assert(!o->outq_rtpoll_item_write);
>   
> -    o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> +    o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
>               i->sink->thread_info.rtpoll,
>               PA_RTPOLL_LATE,  /* This one is not that important, since we check for data in _peek() anyway. */
> -            o->inq);
> +            o->audio_inq);
> +
> +    o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> +            i->sink->thread_info.rtpoll,
> +            PA_RTPOLL_NORMAL,
> +            o->control_inq);
>   
>       o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
>               i->sink->thread_info.rtpoll,
> @@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) {
>        * pass any further data to this output */
>       pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
>   
> -    if (o->inq_rtpoll_item_read) {
> -        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
> -        o->inq_rtpoll_item_read = NULL;
> +    if (o->audio_inq_rtpoll_item_read) {
> +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
> +        o->audio_inq_rtpoll_item_read = NULL;
> +    }
> +
> +    if (o->control_inq_rtpoll_item_read) {
> +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
> +        o->control_inq_rtpoll_item_read = NULL;
>       }
>   
>       if (o->outq_rtpoll_item_write) {
> @@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) {
>   
>       PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
>   
> -    pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
> +    pa_assert(!o->outq_rtpoll_item_read);
> +    pa_assert(!o->audio_inq_rtpoll_item_write);
> +    pa_assert(!o->control_inq_rtpoll_item_write);
>   
>       o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
>               o->userdata->rtpoll,
>               PA_RTPOLL_EARLY-1,  /* This item is very important */
>               o->outq);
> -    o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> +    o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
>               o->userdata->rtpoll,
>               PA_RTPOLL_EARLY,
> -            o->inq);
> +            o->audio_inq);
> +    o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> +            o->userdata->rtpoll,
> +            PA_RTPOLL_NORMAL,
> +            o->control_inq);
>   }
>   
>   /* Called from thread context of the io thread */
> @@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) {
>           o->outq_rtpoll_item_read = NULL;
>       }
>   
> -    if (o->inq_rtpoll_item_write) {
> -        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
> -        o->inq_rtpoll_item_write = NULL;
> +    if (o->audio_inq_rtpoll_item_write) {
> +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
> +        o->audio_inq_rtpoll_item_write = NULL;
> +    }
> +
> +    if (o->control_inq_rtpoll_item_write) {
> +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
> +        o->control_inq_rtpoll_item_write = NULL;
>       }
>   }
>   
> @@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) {
>   
>       /* Just hand this one over to all sink_inputs */
>       PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
> -        pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL);
> +        pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
> +                          u->block_usec, NULL, NULL);
>       }
>   }
>   
> @@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
>   
>       o = pa_xnew0(struct output, 1);
>       o->userdata = u;
> -    o->inq = pa_asyncmsgq_new(0);
> +    o->audio_inq = pa_asyncmsgq_new(0);
> +    o->control_inq = pa_asyncmsgq_new(0);
>       o->outq = pa_asyncmsgq_new(0);
>       o->sink = sink;
>       o->memblockq = pa_memblockq_new(
> @@ -1004,18 +1047,26 @@ static void output_free(struct output *o) {
>       output_disable(o);
>       update_description(o->userdata);
>   
> -    if (o->inq_rtpoll_item_read)
> -        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
> -    if (o->inq_rtpoll_item_write)
> -        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
> +    if (o->audio_inq_rtpoll_item_read)
> +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
> +    if (o->audio_inq_rtpoll_item_write)
> +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
> +
> +    if (o->control_inq_rtpoll_item_read)
> +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
> +    if (o->control_inq_rtpoll_item_write)
> +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
>   
>       if (o->outq_rtpoll_item_read)
>           pa_rtpoll_item_free(o->outq_rtpoll_item_read);
>       if (o->outq_rtpoll_item_write)
>           pa_rtpoll_item_free(o->outq_rtpoll_item_write);
>   
> -    if (o->inq)
> -        pa_asyncmsgq_unref(o->inq);
> +    if (o->audio_inq)
> +        pa_asyncmsgq_unref(o->audio_inq);
> +
> +    if (o->control_inq)
> +        pa_asyncmsgq_unref(o->control_inq);
>   
>       if (o->outq)
>           pa_asyncmsgq_unref(o->outq);
> @@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) {
>   
>       /* Finally, drop all queued data */
>       pa_memblockq_flush_write(o->memblockq, true);
> -    pa_asyncmsgq_flush(o->inq, false);
> +    pa_asyncmsgq_flush(o->audio_inq, false);
> +    pa_asyncmsgq_flush(o->control_inq, false);
>       pa_asyncmsgq_flush(o->outq, false);
>   }
>   
Hi Tanu,

Couldn't you just replace the pa_asyncmsgq_post() with pa_asyncmsgq_send() in
sink_update_requested_latency()? Then the latency updates would be done
synchronously, and the situation you describe above should never happen.

Regards
              Georg


More information about the pulseaudio-discuss mailing list