[pulseaudio-discuss] [PATCH] combine-sink: Fix unsafe message handling

Tanu Kaskinen tanuk at iki.fi
Sat Jun 27 11:14:59 PDT 2015


On Sat, 2015-06-27 at 19:27 +0200, Georg Chini wrote:
> On 27.06.2015 17:36, Tanu Kaskinen wrote:
> > This fixes a crash. sink_input_pop_cb() drains the message queue that receives
> > memchunks from the combine sink thread to avoid requesting more audio too soon.
> > The same message queue received also SET_REQUESTED_LATENCY messages, which
> > generate rewind requests. Rewind requests shouldn't be issued in the pop()
> > callback, doing so results in an assertion error. Therefore, it was not safe to
> > drain the message queue in the pop() callback, but usually the queue is empty,
> > so this bug was not immediately detected.
> >
> > This patch splits the message queue into two queues: audio_inq and control_inq.
> > audio_inq receives only messages containing memchunks, and control_inq receives
> > only the SET_REQUESTED_LATENCY messages. The pop() callback only drains the
> > audio queue, which avoids the rewind requests in the pop() callback.
> >
> > BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=90489
> > ---
> >   src/modules/module-combine-sink.c | 108 ++++++++++++++++++++++++++++----------
> >   1 file changed, 80 insertions(+), 28 deletions(-)
> >
> > diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
> > index 602fa87..b6322c6 100644
> > --- a/src/modules/module-combine-sink.c
> > +++ b/src/modules/module-combine-sink.c
> > @@ -90,9 +90,27 @@ struct output {
> >       pa_sink_input *sink_input;
> >       bool ignore_state_change;
> >   
> > -    pa_asyncmsgq *inq,    /* Message queue from the sink thread to this sink input */
> > -                 *outq;   /* Message queue from this sink input to the sink thread */
> > -    pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
> > +    /* This message queue is only for POST messages, i.e. the messages that
> > +     * carry audio data from the sink thread to the output thread. The POST
> > +     * messages need to be handled in a separate queue, because the queue is
> > +     * processed not only in the output thread mainloop, but also inside the
> > +     * sink input pop() callback. Processing other messages (such as
> > +     * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
> > +     * one reason why it's not safe is that messages that generate rewind
> > +     * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
> > +     * in the pop() callback. */
> > +    pa_asyncmsgq *audio_inq;
> > +
> > +    /* This message queue is for all other messages than POST from the sink
> > +     * thread to the output thread (currently "all other messages" means just
> > +     * the SET_REQUESTED_LATENCY message). */
> > +    pa_asyncmsgq *control_inq;
> > +
> > +    /* Message queue from the output thread to the sink thread. */
> > +    pa_asyncmsgq *outq;
> > +
> > +    pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
> > +    pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
> >       pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
> >   
> >       pa_memblockq *memblockq;
> > @@ -352,7 +370,7 @@ finish:
> >       pa_log_debug("Thread shutting down");
> >   }
> >   
> > -/* Called from I/O thread context */
> > +/* Called from combine sink I/O thread context */
> >   static void render_memblock(struct userdata *u, struct output *o, size_t length) {
> >       pa_assert(u);
> >       pa_assert(o);
> > @@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
> >   
> >       /* Maybe there's some data in the requesting output's queue
> >        * now? */
> > -    while (pa_asyncmsgq_process_one(o->inq) > 0)
> > +    while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
> >           ;
> >   
> >       /* Ok, now let's prepare some data if we really have to */
> > @@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
> >               if (j == o)
> >                   continue;
> >   
> > -            pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
> > +            pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
> >           }
> >   
> >           /* And place it directly into the requesting output's queue */
> > @@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) {
> >       /* If another thread already prepared some data we received
> >        * the data over the asyncmsgq, hence let's first process
> >        * it. */
> > -    while (pa_asyncmsgq_process_one(o->inq) > 0)
> > +    while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
> >           ;
> >   
> >       /* Check whether we're now readable */
> > @@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) {
> >       pa_assert_se(o = i->userdata);
> >   
> >       /* Set up the queue from the sink thread to us */
> > -    pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
> > +    pa_assert(!o->audio_inq_rtpoll_item_read);
> > +    pa_assert(!o->control_inq_rtpoll_item_read);
> > +    pa_assert(!o->outq_rtpoll_item_write);
> >   
> > -    o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> > +    o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> >               i->sink->thread_info.rtpoll,
> >               PA_RTPOLL_LATE,  /* This one is not that important, since we check for data in _peek() anyway. */
> > -            o->inq);
> > +            o->audio_inq);
> > +
> > +    o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> > +            i->sink->thread_info.rtpoll,
> > +            PA_RTPOLL_NORMAL,
> > +            o->control_inq);
> >   
> >       o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> >               i->sink->thread_info.rtpoll,
> > @@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) {
> >        * pass any further data to this output */
> >       pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
> >   
> > -    if (o->inq_rtpoll_item_read) {
> > -        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
> > -        o->inq_rtpoll_item_read = NULL;
> > +    if (o->audio_inq_rtpoll_item_read) {
> > +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
> > +        o->audio_inq_rtpoll_item_read = NULL;
> > +    }
> > +
> > +    if (o->control_inq_rtpoll_item_read) {
> > +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
> > +        o->control_inq_rtpoll_item_read = NULL;
> >       }
> >   
> >       if (o->outq_rtpoll_item_write) {
> > @@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) {
> >   
> >       PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
> >   
> > -    pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
> > +    pa_assert(!o->outq_rtpoll_item_read);
> > +    pa_assert(!o->audio_inq_rtpoll_item_write);
> > +    pa_assert(!o->control_inq_rtpoll_item_write);
> >   
> >       o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
> >               o->userdata->rtpoll,
> >               PA_RTPOLL_EARLY-1,  /* This item is very important */
> >               o->outq);
> > -    o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> > +    o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> >               o->userdata->rtpoll,
> >               PA_RTPOLL_EARLY,
> > -            o->inq);
> > +            o->audio_inq);
> > +    o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
> > +            o->userdata->rtpoll,
> > +            PA_RTPOLL_NORMAL,
> > +            o->control_inq);
> >   }
> >   
> >   /* Called from thread context of the io thread */
> > @@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) {
> >           o->outq_rtpoll_item_read = NULL;
> >       }
> >   
> > -    if (o->inq_rtpoll_item_write) {
> > -        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
> > -        o->inq_rtpoll_item_write = NULL;
> > +    if (o->audio_inq_rtpoll_item_write) {
> > +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
> > +        o->audio_inq_rtpoll_item_write = NULL;
> > +    }
> > +
> > +    if (o->control_inq_rtpoll_item_write) {
> > +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
> > +        o->control_inq_rtpoll_item_write = NULL;
> >       }
> >   }
> >   
> > @@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) {
> >   
> >       /* Just hand this one over to all sink_inputs */
> >       PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
> > -        pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL);
> > +        pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
> > +                          u->block_usec, NULL, NULL);
> >       }
> >   }
> >   
> > @@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
> >   
> >       o = pa_xnew0(struct output, 1);
> >       o->userdata = u;
> > -    o->inq = pa_asyncmsgq_new(0);
> > +    o->audio_inq = pa_asyncmsgq_new(0);
> > +    o->control_inq = pa_asyncmsgq_new(0);
> >       o->outq = pa_asyncmsgq_new(0);
> >       o->sink = sink;
> >       o->memblockq = pa_memblockq_new(
> > @@ -1004,18 +1047,26 @@ static void output_free(struct output *o) {
> >       output_disable(o);
> >       update_description(o->userdata);
> >   
> > -    if (o->inq_rtpoll_item_read)
> > -        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
> > -    if (o->inq_rtpoll_item_write)
> > -        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
> > +    if (o->audio_inq_rtpoll_item_read)
> > +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
> > +    if (o->audio_inq_rtpoll_item_write)
> > +        pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
> > +
> > +    if (o->control_inq_rtpoll_item_read)
> > +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
> > +    if (o->control_inq_rtpoll_item_write)
> > +        pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
> >   
> >       if (o->outq_rtpoll_item_read)
> >           pa_rtpoll_item_free(o->outq_rtpoll_item_read);
> >       if (o->outq_rtpoll_item_write)
> >           pa_rtpoll_item_free(o->outq_rtpoll_item_write);
> >   
> > -    if (o->inq)
> > -        pa_asyncmsgq_unref(o->inq);
> > +    if (o->audio_inq)
> > +        pa_asyncmsgq_unref(o->audio_inq);
> > +
> > +    if (o->control_inq)
> > +        pa_asyncmsgq_unref(o->control_inq);
> >   
> >       if (o->outq)
> >           pa_asyncmsgq_unref(o->outq);
> > @@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) {
> >   
> >       /* Finally, drop all queued data */
> >       pa_memblockq_flush_write(o->memblockq, true);
> > -    pa_asyncmsgq_flush(o->inq, false);
> > +    pa_asyncmsgq_flush(o->audio_inq, false);
> > +    pa_asyncmsgq_flush(o->control_inq, false);
> >       pa_asyncmsgq_flush(o->outq, false);
> >   }
> >   
> Hi Tanu,
> 
> couldn't you just replace the pa_asyncmsgq_post() with 
> pa_asyncmsgq_send() in
> sink_update_requested_latency()? Then the latency updates would be done
> synchronously and the situation you describe above should never happen.

I don't see how that would help. Let's say that the pop() callback of
one of the sink inputs is just about to be called. It's inevitably going
to drain the message queue, but that hasn't yet happened. Then, before
the pop() callback gets executed, sink_update_requested_latency() calls
pa_asyncmsgq_send() and starts waiting for an answer. The pop() callback
continues, and finds the latency update message in the queue, processes
it, and in doing so, causes a rewind request. A crash will happen in
short order.

Also, the sink input threads send messages to the combine sink thread
using pa_asyncmsgq_send(). Using pa_asyncmsgq_send() also when sending
messages to the other direction is asking for deadlocks.

-- 
Tanu



More information about the pulseaudio-discuss mailing list