[pulseaudio-discuss] [PATCH] combine-sinks: add suport for DYNAMIC_LATENCY

Tanu Kaskinen tanu.kaskinen at linux.intel.com
Sun May 25 02:09:41 PDT 2014


On Tue, 2014-04-15 at 15:21 +0200, Wim Taymans wrote:
> Mark the sink as DYNAMIC_LATENCY and implement update_sink_latency_range
> on its sink-input to collect the combined latency range of all sinks.
> 
> Implement update_requested_latency on the sink to configure the final
> latency by combining the sink-input requested latencies. This makes us
> honour the client latency request.
> 
> We don't need to call update_max_request() and update_fixed_latency()
> when adding and removing outputs, this is already done when we attach
> the sink-inputs.

This paragraph could be worded better. We do need to call those when
adding and removing outputs, it's just that the place of calling changes
from the ADD_OUTPUT and REMOVE_OUTPUT message handler to the attach
callback and to the disable_output() function.

> 
> Also add more debug log.
> 
> Fixes https://bugs.freedesktop.org/show_bug.cgi?id=47899
> ---
>  src/modules/module-combine-sink.c | 135 +++++++++++++++++++++++++++-----------
>  1 file changed, 98 insertions(+), 37 deletions(-)
> 
> diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
> index deabceb..d705542 100644
> --- a/src/modules/module-combine-sink.c
> +++ b/src/modules/module-combine-sink.c
> @@ -104,7 +104,8 @@ struct output {
>  
>      /* For communication of the stream parameters to the sink thread */
>      pa_atomic_t max_request;
> -    pa_atomic_t requested_latency;
> +    pa_atomic_t max_latency;
> +    pa_atomic_t min_latency;
>  
>      PA_LLIST_FIELDS(struct output);
>  };
> @@ -150,11 +151,12 @@ enum {
>      SINK_MESSAGE_NEED,
>      SINK_MESSAGE_UPDATE_LATENCY,
>      SINK_MESSAGE_UPDATE_MAX_REQUEST,
> -    SINK_MESSAGE_UPDATE_REQUESTED_LATENCY
> +    SINK_MESSAGE_UPDATE_LATENCY_RANGE
>  };
>  
>  enum {
>      SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
> +    SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY
>  };
>  
>  static void output_disable(struct output *o);
> @@ -469,35 +471,44 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
>          return;
>  
>      pa_atomic_store(&o->max_request, (int) nbytes);
> +    pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes);
>      pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
>  }
>  
>  /* Called from thread context */
> -static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
> +static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
>      struct output *o;
> -    pa_usec_t c;
> +    pa_usec_t min, max, fix;
>  
>      pa_assert(i);
>  
>      pa_sink_input_assert_ref(i);
>      pa_assert_se(o = i->userdata);
>  
> -    c = pa_sink_get_requested_latency_within_thread(i->sink);
> -
> -    if (c == (pa_usec_t) -1)
> -        c = i->sink->thread_info.max_latency;
> +    fix = i->sink->thread_info.fixed_latency;
> +    if (fix > 0) {
> +      min = fix;
> +      max = fix;
> +    } else {
> +      min = i->sink->thread_info.min_latency;
> +      max = i->sink->thread_info.max_latency;

Too little indentation.

> +    }
>  
> -    if (pa_atomic_load(&o->requested_latency) == (int) c)
> +    if ((pa_atomic_load(&o->min_latency) == (int) min) &&
> +        (pa_atomic_load(&o->max_latency) == (int) max))
>          return;
>  
> -    pa_atomic_store(&o->requested_latency, (int) c);
> -    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
> +    pa_atomic_store(&o->min_latency, (int) min);
> +    pa_atomic_store(&o->max_latency, (int) max);
> +    pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max);
> +    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
>  }
>  
>  /* Called from I/O thread context */
>  static void sink_input_attach_cb(pa_sink_input *i) {
>      struct output *o;
> -    pa_usec_t c;
> +    pa_usec_t fix, min, max;
> +    size_t nbytes;
>  
>      pa_sink_input_assert_ref(i);
>      pa_assert_se(o = i->userdata);
> @@ -517,13 +528,23 @@ static void sink_input_attach_cb(pa_sink_input *i) {
>  
>      pa_sink_input_request_rewind(i, 0, false, true, true);
>  
> -    pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i));
> +    nbytes = pa_sink_input_get_max_request(i);
> +    pa_atomic_store(&o->max_request, (int) nbytes);
> +    pa_log_debug("attach max request %lu", (unsigned long) nbytes);
>  
> -    c = pa_sink_get_requested_latency_within_thread(i->sink);
> -    pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c));
> +    fix = i->sink->thread_info.fixed_latency;
> +    if (fix > 0) {
> +      min = max = fix;
> +    } else {
> +      min = i->sink->thread_info.min_latency;
> +      max = i->sink->thread_info.max_latency;

Too little indentation.

> +    }
> +    pa_atomic_store(&o->min_latency, (int) min);
> +    pa_atomic_store(&o->max_latency, (int) max);
> +    pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max);
>  
>      pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
> -    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
> +    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
>  }
>  
>  /* Called from I/O thread context */
> @@ -580,6 +601,14 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
>                  pa_memblockq_flush_write(o->memblockq, true);
>  
>              return 0;
> +
> +        case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: {
> +            pa_usec_t latency = (pa_usec_t) offset;
> +
> +            pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency);
> +
> +            return 0;
> +        }
>      }
>  
>      return pa_sink_input_process_msg(obj, code, data, offset, chunk);
> @@ -671,31 +700,37 @@ static void update_max_request(struct userdata *u) {
>      if (max_request <= 0)
>          max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
>  
> +    pa_log_debug("Sink update max request %lu", (unsigned long) max_request);
>      pa_sink_set_max_request_within_thread(u->sink, max_request);
>  }
>  
>  /* Called from IO context */
> -static void update_fixed_latency(struct userdata *u) {
> -    pa_usec_t fixed_latency = 0;
> +static void update_latency_range(struct userdata *u) {
> +    pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1;
>      struct output *o;
>  
>      pa_assert(u);
>      pa_sink_assert_io_context(u->sink);
>  
> -    /* Collects the requested_latency values of all streams and sets
> -     * the largest one as fixed_latency locally */
> -
> +    /* Collects the latency_range values of all streams and sets
> +     * the max of min and min of max locally */
>      PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
> -        pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency);
> +        pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency);
> +        pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency);
>  
> -        if (rl > fixed_latency)
> -            fixed_latency = rl;
> +        if (min > min_latency)
> +            min_latency = min;
> +        if (max_latency == (pa_usec_t) -1 || max < max_latency)
> +            max_latency = max;
>      }
> -
> -    if (fixed_latency <= 0)
> -        fixed_latency = u->block_usec;
> -
> -    pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency);
> +    if (max_latency == (pa_usec_t) -1)
> +      /* no outputs, use block size */
> +      min_latency = max_latency = u->block_usec;
> +    else if (max_latency < min_latency)
> +      max_latency = min_latency;

Too little indentation.

> +
> +    pa_log_debug("Sink update latency range %lu %lu", min_latency, max_latency);
> +    pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency);
>  }
>  
>  /* Called from thread context of the io thread */
> @@ -735,6 +770,28 @@ static void output_remove_within_thread(struct output *o) {
>      }
>  }
>  
> +/* Called from sink I/O thread context */
> +static void sink_update_requested_latency(pa_sink *s) {
> +    struct userdata *u;
> +    struct output *o;
> +    pa_usec_t latency;
> +
> +    pa_sink_assert_ref(s);
> +    pa_assert_se(u = s->userdata);
> +
> +    latency = pa_sink_get_requested_latency_within_thread(s);
> +    if (latency == (pa_usec_t) -1)
> +      return;

Why do you return here? If the sink requested latency gets relaxed to
-1, i.e. "no particular latency requested", we should relax the
downstream sink input latency requests too.

> +
> +    pa_log_debug("Sink update requested latency %0.2f", (double) latency / PA_USEC_PER_MSEC);
> +
> +    /* Just hand this one over to all sink_inputs */
> +    PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
> +        pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, latency, NULL, NULL);
> +    }
> +}
> +
> +
>  /* Called from thread context of the io thread */
>  static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
>      struct userdata *u = PA_SINK(o)->userdata;
> @@ -772,14 +829,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>  
>          case SINK_MESSAGE_ADD_OUTPUT:
>              output_add_within_thread(data);
> -            update_max_request(u);
> -            update_fixed_latency(u);
>              return 0;
>  
>          case SINK_MESSAGE_REMOVE_OUTPUT:
>              output_remove_within_thread(data);
> -            update_max_request(u);
> -            update_fixed_latency(u);
>              return 0;
>  
>          case SINK_MESSAGE_NEED:
> @@ -805,9 +858,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>              update_max_request(u);
>              break;
>  
> -        case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY:
> -            update_fixed_latency(u);
> +        case SINK_MESSAGE_UPDATE_LATENCY_RANGE:
> +            update_latency_range(u);
>              break;
> +
>  }
>  
>      return pa_sink_process_msg(o, code, data, offset, chunk);
> @@ -879,7 +933,7 @@ static int output_create_sink_input(struct output *o) {
>      o->sink_input->process_rewind = sink_input_process_rewind_cb;
>      o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
>      o->sink_input->update_max_request = sink_input_update_max_request_cb;
> -    o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
> +    o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
>      o->sink_input->attach = sink_input_attach_cb;
>      o->sink_input->detach = sink_input_detach_cb;
>      o->sink_input->kill = sink_input_kill_cb;
> @@ -997,6 +1051,11 @@ static void output_disable(struct output *o) {
>       * pass any further data to this output */
>      pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
>  
> +    /* now that this sink is removed from the list, update the combined properties
> +     * of the remaining sinks */
> +    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
> +    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);

We're in the main thread, and o->outq is meant for messages from the
sink input thread to the sink thread. This probably doesn't matter in
practice, because the target thread is the same and the message queue is
safe for multiple writers (and the sink input is unlinked anyway so even
the multiple writer safeness doesn't really matter), but
o->userdata->sink->asyncmsgq would be more logical choice.

A bigger problem is that you use post() instead of send(). post() is
asynchronous, send() is synchronous. If the sink thread doesn't process
the messages before we call pa_asyncmsgq_flush() a few lines down, the
messages are lost. You should use send() here. Or actually, sending
three messages to the same target thread is pretty redundant thing to
do, you could just add update_max_request() and update_latency_range()
calls to the SINK_MESSAGE_REMOVE_OUTPUT handler. Then these two messages
wouldn't need to be sent at all.

Thanks for the updated patch again. It seems that there's nothing
drastic to change in the logic any more.

-- 
Tanu



More information about the pulseaudio-discuss mailing list