[pulseaudio-discuss] [PATCH 02/21 v2] loopback: Initialize latency at startup and during source/sink changes

Tanu Kaskinen tanuk at iki.fi
Thu Feb 23 11:17:09 UTC 2017


On Sun, 2017-02-19 at 17:15 +0100, Georg Chini wrote:
> The current code does not make any attempt to initialize the end-to-end latency
> to a value near the desired latency. This leads to underruns at startup because
> the memblockq is initially empty and to very long adjustment times for long
> latencies because the end-to-end latency at startup is significantly shorter
> than the desired value.
> This patch initializes the memblockq at startup and during source or sink changes
> so that the end-to-end latency will be near the configured value. It also ensures
> that there are no underruns if the source is slow to start and that the latency
> does not grow too much when the sink is slow to start by adjusting the length of
> the memblockq until the source has called push for the first time and the sink
> has called pop for the second time. Waiting for the second pop is necessary
> because the sink has not been started when the first pop is called.

"the sink has not been started" -> "the sink has not necessarily been
started". The sink can already be running when the loopback is added to
it.

I don't think it's actually guaranteed that the sink is running yet
when pop is called for the second time. For alsa sinks that's probably
guaranteed, but I think the bluetooth sink will start rendering data as
soon as the socket is writable, and I don't know if the socket being
writable has anything to do with the full audio pipeline being up and
running. If the pipeline is guaranteed to be running when the socket is
writable, then there's no problem, but if there are no such guarantees,
then we may write multiple chunks of audio (implying multiple pop
calls) before the pipeline really starts running.

I don't know how other sink implementations work.

In any case, waiting for the second pop seems like the most sensible
solution given the available information.

> For clarity, variables have been separated into input, output and main thread
> variables.

Was the commit message supposed to have three paragraphs? Please use an
empty line between paragraphs.

The commit message doesn't say anything about the changes to the
corking logic. Are those changes necessary for this patch, or could
they be in a separate patch? No matter in which patch those changes are
made, the commit message should explain the problem with the old
corking logic.

> 
> ---
>  src/modules/module-loopback.c | 324 +++++++++++++++++++++++++++++++++++++-----
>  1 file changed, 290 insertions(+), 34 deletions(-)
> 
> diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c
> index 2907bbc..a5705d5 100644
> --- a/src/modules/module-loopback.c
> +++ b/src/modules/module-loopback.c
> @@ -61,6 +61,8 @@ PA_MODULE_USAGE(
>  
>  #define MEMBLOCKQ_MAXLENGTH (1024*1024*32)
>  
> +#define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC)
> +
>  #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC)
>  
>  struct userdata {
> @@ -78,14 +80,18 @@ struct userdata {
>      pa_time_event *time_event;
>      pa_usec_t adjust_time;
>  
> -    int64_t recv_counter;
> -    int64_t send_counter;
> -
>      size_t skip;
>      pa_usec_t latency;
>  
> -    bool in_pop;
> +    /* Latency boundaries and current values */
> +    pa_usec_t min_source_latency;
> +    pa_usec_t max_source_latency;
> +    pa_usec_t min_sink_latency;
> +    pa_usec_t max_sink_latency;
> +    pa_usec_t configured_sink_latency;
> +    pa_usec_t configured_source_latency;
>  
> +    /* Used for sink input and source output snapshots */
>      struct {
>          int64_t send_counter;
>          pa_usec_t source_latency;
> @@ -96,6 +102,22 @@ struct userdata {
>          pa_usec_t sink_latency;
>          pa_usec_t sink_timestamp;
>      } latency_snapshot;
> +
> +    /* Input thread variable */
> +    int64_t send_counter;
> +
> +    /* Output thread variables */
> +    struct {
> +        int64_t recv_counter;
> +        pa_usec_t effective_source_latency;
> +
> +        /* Various booleans */
> +        bool in_pop;
> +        bool pop_called;
> +        bool pop_adjust;
> +        bool first_pop_done;
> +        bool push_called;
> +    } output_thread_info;
>  };
>  
>  static const char* const valid_modargs[] = {
> @@ -118,11 +140,14 @@ static const char* const valid_modargs[] = {
>  enum {
>      SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
>      SINK_INPUT_MESSAGE_REWIND,
> -    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
> +    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
> +    SINK_INPUT_MESSAGE_SINK_CHANGED,

The SINK_CHANGED message seems unnecessary. It's sent from
sink_input_moving_cb, which is called when the sink input is not
attached to any sink. The message handler just sets a couple of output
thread variables. Since the sink input is not attached to any sink,
there's no output thread, so there are no concurrency problems if you
set the variables directly in sink_input_moving_cb.

> +    SINK_INPUT_MESSAGE_SOURCE_CHANGED,
> +    SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY
>  };
>  
>  enum {
> -    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
> +    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX
>  };
>  
>  static void enable_adjust_timer(struct userdata *u, bool enable);
> @@ -279,10 +304,70 @@ static void update_adjust_timer(struct userdata *u) {
>          enable_adjust_timer(u, true);
>  }
>  
> +/* Called from main thread
> + * Calculates minimum and maximum possible latency for source and sink */
> +static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) {
> +
> +    if (source) {
> +        /* Source latencies */
> +        if (source->flags & PA_SOURCE_DYNAMIC_LATENCY)
> +            pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
> +        else {
> +            u->min_source_latency = pa_source_get_fixed_latency(source);
> +            u->max_source_latency = u->min_source_latency;
> +        }

Sidenote: it seems silly that pa_source_get_latency_range() doesn't
work when fixed latency is used. The function could just return the
fixed latency as the min and max latency. If you feel like it, you
could add a patch that fixes this. (The same applies to the sink API.)

> +        /* Latencies below 2.5 ms cause problems, limit source latency if possible */
> +        if (u->max_source_latency >= MIN_DEVICE_LATENCY)
> +            u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
> +        else
> +           u->min_source_latency = u->max_source_latency;

Indentation is a bit off on this last line.

> +    }
> +
> +    if (sink) {
> +        /* Sink latencies */
> +        if (sink->flags & PA_SINK_DYNAMIC_LATENCY)
> +            pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
> +        else {
> +            u->min_sink_latency = pa_sink_get_fixed_latency(sink);
> +            u->max_sink_latency = u->min_sink_latency;
> +        }
> +        /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
> +        if (u->max_sink_latency >= MIN_DEVICE_LATENCY)
> +            u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
> +        else
> +           u->min_sink_latency = u->max_sink_latency;

Indentation is a bit off on this last line.

> +    }
> +}
> +
> +/* Called from output context
> + * Sets the memblockq to the configured latency corrected by latency_offset_usec */
> +static void memblockq_adjust(struct userdata *u, pa_usec_t latency_offset_usec, bool allow_push) {
> +    size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
> +    pa_usec_t requested_buffer_latency;
> +
> +    requested_buffer_latency = PA_CLIP_SUB(u->latency, latency_offset_usec);
> +    requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
> +    current_memblockq_length = pa_memblockq_get_length(u->memblockq);
> +
> +    if (current_memblockq_length > requested_memblockq_length) {
> +        /* Drop audio from queue */
> +        buffer_correction = current_memblockq_length - requested_memblockq_length;
> +        pa_log_info("Dropping %lu usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
> +        pa_memblockq_drop(u->memblockq, buffer_correction);
> +
> +    } else if (current_memblockq_length < requested_memblockq_length && allow_push) {
> +        /* Add silence to queue */
> +        buffer_correction = requested_memblockq_length - current_memblockq_length;
> +        pa_log_info("Adding %lu usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
> +        pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true);
> +    }
> +}
> +
>  /* Called from input thread context */
>  static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
>      struct userdata *u;
>      pa_memchunk copy;
> +    pa_usec_t push_time, current_source_latency;
>  
>      pa_source_output_assert_ref(o);
>      pa_source_output_assert_io_context(o);
> @@ -302,7 +387,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
>          chunk = &copy;
>      }
>  
> -    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, chunk, NULL);
> +    /* Send current source latency and timestamp with the message */
> +    push_time = pa_rtclock_now();
> +    current_source_latency = pa_source_get_latency_within_thread(u->source_output->source);
> +
> +    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_UINT_TO_PTR(current_source_latency), push_time, chunk, NULL);
>      u->send_counter += (int64_t) chunk->length;
>  }
>  
> @@ -342,6 +431,45 @@ static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data,
>      return pa_source_output_process_msg(obj, code, data, offset, chunk);
>  }
>  
> +/* Called from main thread.
> + * Get current effective latency of the source. If the source is in use with
> + * smaller latency than the configured latency, it will continue running with
> + * the smaller value when the source output is switched to the source. */
> +static void get_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) {

I think "update" would be a better verb for the function name. "get"
functions usually return something.

> +    pa_usec_t effective_source_latency;
> +
> +    effective_source_latency = u->configured_source_latency;
> +
> +    if (source) {
> +        effective_source_latency = pa_source_get_requested_latency(source);
> +        if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency)
> +            effective_source_latency = u->configured_source_latency;
> +    }

It looks like this code should take into account also the minimum
latency of the source. Or is it ok to set effective_source_latency to a
lower value than the minimum latency of the source?

> +
> +    /* If the sink is valid, send a message to the output thread, else set the variable directly */
> +    if (sink)
> +        pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL);
> +    else
> +       u->output_thread_info.effective_source_latency = effective_source_latency;
> +}
> +
> +/* Called from main thread.
> + * Set source output latency to one third of the overall latency if possible.
> + * The choice of one third is rather arbitrary somewhere between the minimum
> + * possible latency (which would cause a lot of CPU load) and half the configured
> + * latency (which would lead to an empty memblockq if the sink is configured
> + * likewise). */

The comment is a bit inaccurate: the other end of the possible scale
isn't half the configured latency, and using half the configured
latency wouldn't necessarily lead to an empty memblockq (and the
memblockq can never be empty all the time).

Minimizing the memblockq length could be achieved with the following
algorithm:

1) Configure both sink and source latency to half of u->latency.

2) If that fails because the max latency limit of the sink or source
doesn't allow setting such a high latency, set the latency to the maximum
supported by the sink or source (depending on which of those reached
the max latency limit).

3) If the sink max latency was the limiting factor in step 2, increase
the source latency until u->latency is reached, or until the source's
max latency limit is hit too. Conversely, if the source max latency was
the limiting factor in step 2, increase the sink latency.

If u->latency is fully allocated to the sink and source latencies,
leaving no slack to be allocated for the memblockq, that doesn't mean
that the memblockq will be empty, except possibly for very short
durations.

The sum of the sink latency, the memblockq length and the source
latency at any given moment is constant (assuming perfect clock drift
compensation). The configured sink and source latencies together never
exceed the target end-to-end latency. Therefore, the only situation
where the memblockq can get empty is when the sink latency is at its
peak (i.e. right after reading from the memblockq) and when the source
latency is at its peak (i.e. right before writing to the memblockq).
Since the reads and writes aren't synchronized, it's a relatively rare
event that we end up in a situation where the sink has just read data
and the source is just about to write data.

I believe minimizing the memblockq length would be a sensible strategy,
since it would minimize the CPU usage. I'm fine with the "configure
sink and source with a third of u->latency" approach, however. I'd just
like the comment to describe the options accurately.

> +static void set_source_output_latency(struct userdata *u, pa_source *source) {
> +    pa_usec_t latency, requested_latency;
> +
> +    requested_latency = u->latency / 3;
> +
> +    latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency);
> +    u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
> +    if (u->configured_source_latency != requested_latency)
> +        pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
> +}
> +
>  /* Called from input thread context */
>  static void source_output_attach_cb(pa_source_output *o) {
>      struct userdata *u;
> @@ -435,12 +563,26 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
>      if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
>          pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n);
>  
> -    if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED)
> -        pa_sink_input_cork(u->sink_input, true);
> -    else
> +    /* Set latency and calculate latency limits */
> +    update_latency_boundaries(u, dest, NULL);
> +    set_source_output_latency(u, dest);
> +    get_effective_source_latency(u, dest, u->sink_input->sink);
> +
> +    if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) {
> +        if (dest->suspend_cause != PA_SUSPEND_IDLE)
> +            pa_sink_input_cork(u->sink_input, true);
> +    } else
>          pa_sink_input_cork(u->sink_input, false);

Is the intention to cork the sink input if the source is suspended for
a non-idle reason, and uncork it otherwise? This code doesn't do
anything if the source is suspended due to being idle, so if the sink
input was corked before, it will stay corked, which doesn't make much
sense to me, but maybe I'm missing something, because the old code
didn't uncork the sink input in this situation either.

>  
>      update_adjust_timer(u);
> +
> +    /* Send a mesage to the output thread that the source has changed.
> +     * If the sink is invalid here during a profile switching situation
> +     * we can safely set push_called to false directly. */
> +    if (u->sink_input->sink)
> +        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
> +    else
> +        u->output_thread_info.push_called = false;
>  }
>  
>  /* Called from main thread */
> @@ -451,6 +593,18 @@ static void source_output_suspend_cb(pa_source_output *o, bool suspended) {
>      pa_assert_ctl_context();
>      pa_assert_se(u = o->userdata);
>  
> +    /* If the source output has been suspended, we need to handle this like

"source output" -> "source" (the same terminology mistake is in
sink_input_suspend_cb())

> +     * a source change when the source output is resumed */
> +    if (suspended) {
> +        if (u->sink_input->sink)
> +            pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
> +        else
> +            u->output_thread_info.push_called = false;
> +
> +    } else
> +        /* Get effective source latency on unsuspend */
> +        get_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
> +
>      pa_sink_input_cork(u->sink_input, suspended);
>  
>      update_adjust_timer(u);
> @@ -465,10 +619,22 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
>      pa_assert_se(u = i->userdata);
>      pa_assert(chunk);
>  
> -    u->in_pop = true;
> +    /* It seems necessary to handle outstanding push messages here, though it is not clear
> +     * why. Removing this part leads to underruns when low latencies are configured. */
> +    u->output_thread_info.in_pop = true;
>      while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
>          ;
> -    u->in_pop = false;
> +    u->output_thread_info.in_pop = false;
> +
> +    /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
> +     * enabled. Disable them on second pop and enable the final adjustment during the
> +     * next push. We are waiting for the second pop, because the first pop is called
> +     * before the sink is actually started. */

"is called" -> "may be called"

> +    if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
> +        u->output_thread_info.pop_adjust = true;

If I understood correctly, this "requests" the POST handler to adjust
the memblockq. Why is the adjustment delayed until the next POST? Why
not adjust the memblockq right away, if push_called is true? Ah, I
think I know: adjusting the memblockq requires knowledge of the current
source latency, and we don't have that information here. Maybe add a
comment about this?

> +        u->output_thread_info.pop_called = true;
> +    }
> +    u->output_thread_info.first_pop_done = true;
>  
>      if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
>          pa_log_info("Could not peek into queue");
> @@ -478,6 +644,12 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
>      chunk->length = PA_MIN(chunk->length, nbytes);
>      pa_memblockq_drop(u->memblockq, chunk->length);
>  
> +    /* If push has not been called yet, assume that the source will deliver one full latency
> +     * when it starts pushing. Adjust the memblockq accordingly and ensure that there is
> +     * enough data in the queue to avoid underruns. */
> +    if (!u->output_thread_info.push_called)
> +        memblockq_adjust(u, u->output_thread_info.effective_source_latency, true);

This seems unnecessary. Why can't we just let the pop calls drain the
memblockq, and add the necessary amount of silence to the memblockq
only when we receive the first chunk from the source?

> +
>      return 0;
>  }
>  
> @@ -496,13 +668,13 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
>  static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
>      struct userdata *u = PA_SINK_INPUT(obj)->userdata;
>  
> +    pa_sink_input_assert_io_context(u->sink_input);
> +
>      switch (code) {
>  
>          case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
>              pa_usec_t *r = data;
>  
> -            pa_sink_input_assert_io_context(u->sink_input);
> -
>              *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);
>  
>              /* Fall through, the default handler will add in the extra
> @@ -512,16 +684,41 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
>  
>          case SINK_INPUT_MESSAGE_POST:
>  
> -            pa_sink_input_assert_io_context(u->sink_input);
> +            pa_memblockq_push_align(u->memblockq, chunk);
> +
> +            /* If push has not been called yet, latency adjustments in sink_input_pop_cb()
> +             * are enabled. Disable them on first push and correct the memblockq. Do the
> +             * same if the pop_cb() requested the adjustment */
> +            if (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust) {
> +                pa_usec_t time_delta;

time_delta represents the sink latency plus the source latency, right?
Maybe rename it to "sink_and_source_latency"?

>  
> -            if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state))
> -                pa_memblockq_push_align(u->memblockq, chunk);
> -            else
> -                pa_memblockq_flush_write(u->memblockq, true);
> +                time_delta = PA_PTR_TO_UINT(data);
> +                time_delta += pa_rtclock_now() - offset;

Comments would be useful here. First time_delta is set to the source
latency that was measured when the message was sent, and then we add
the time between sending and receiving the message, because that's the
amount that the source latency has grown since the message was sent.

> +                time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink);
> +
> +                /* If the source has overrun, assume that the maximum it should have pushed is
> +                 * one full source latency. It may still be possible that the next push also
> +                 * contains too much data, then the resulting latency will be wrong. */
> +                if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
> +                    time_delta = PA_CLIP_SUB(time_delta, u->output_thread_info.effective_source_latency);
> +                else
> +                    time_delta = PA_CLIP_SUB(time_delta, pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec));

It's unclear to me what's happening here. I guess you're subtracting
chunk->length from time_delta, because when push_cb was called, the
chunk was still included in the source latency report (I'm a bit
worried that some sources might not do this), but the chunk has also
been pushed to the memblockq, so if we don't adjust time_delta, the
chunk will be counted twice, and the memblockq adjustment will be
wrong.

But I don't really get why you are concerned with "overruns", and how
your code mitigates any problems. If you subtract only a part of
chunk->length, it means that a part of the chunk will be counted twice,
resulting in wrong latency.

If we can avoid using effective_source_latency here, the variable can
be removed (it's also used in pop_cb, but that code can be removed
too).

> +
> +                memblockq_adjust(u, time_delta, true);
> +
> +                u->output_thread_info.pop_adjust = false;
> +                u->output_thread_info.push_called = true;
> +            }
> +
> +            /* If pop has not been called yet, make sure the latency does not grow too much.
> +             * Don't push any silence here, because we already have new data in the queue */
> +            if (!u->output_thread_info.pop_called)
> +                 memblockq_adjust(u, pa_sink_get_latency_within_thread(u->sink_input->sink), false);
>  
>              /* Is this the end of an underrun? Then let's start things
>               * right-away */
> -            if (!u->in_pop &&
> +            if (!u->output_thread_info.in_pop &&
> +                u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
>                  u->sink_input->thread_info.underrun_for > 0 &&
>                  pa_memblockq_is_readable(u->memblockq)) {
>  
> @@ -531,20 +728,15 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
>                                               false, true, false);
>              }
>  
> -            u->recv_counter += (int64_t) chunk->length;
> +            u->output_thread_info.recv_counter += (int64_t) chunk->length;
>  
>              return 0;
>  
>          case SINK_INPUT_MESSAGE_REWIND:
>  
> -            pa_sink_input_assert_io_context(u->sink_input);
> -
> -            if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state))
> -                pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
> -            else
> -                pa_memblockq_flush_write(u->memblockq, true);
> +            pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
>  
> -            u->recv_counter -= offset;
> +            u->output_thread_info.recv_counter -= offset;
>  
>              return 0;
>  
> @@ -553,7 +745,7 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
>  
>              length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
>  
> -            u->latency_snapshot.recv_counter = u->recv_counter;
> +            u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
>              u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
>              /* Add content of render memblockq to sink latency */
>              u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink) +
> @@ -562,10 +754,45 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
>  
>              return 0;
>          }
> +
> +        case SINK_INPUT_MESSAGE_SOURCE_CHANGED:
> +
> +            u->output_thread_info.push_called = false;
> +
> +            return 0;
> +
> +        case SINK_INPUT_MESSAGE_SINK_CHANGED:
> +
> +            u->output_thread_info.pop_called = false;
> +            u->output_thread_info.first_pop_done = false;
> +
> +            return 0;
> +
> +        case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:
> +
> +            u->output_thread_info.effective_source_latency = (pa_usec_t)offset;
> +
> +            return 0;
>      }
>  
>      return pa_sink_input_process_msg(obj, code, data, offset, chunk);
>  }
> +/* Called from main thread.
> + * Set sink input latency to one third of the overall latency if possible.
> + * The choice of one third is rather arbitrary somewhere between the minimum
> + * possible latency (which would cause a lot of CPU load) and half the configured
> + * latency (which would lead to an empty memblockq if the source is configured
> + * likewise). */
> +static void set_sink_input_latency(struct userdata *u, pa_sink *sink) {
> +     pa_usec_t latency, requested_latency;
> +
> +    requested_latency = u->latency / 3;
> +
> +    latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency);
> +    u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
> +    if (u->configured_sink_latency != requested_latency)
> +        pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
> +}
>  
>  /* Called from output thread context */
>  static void sink_input_attach_cb(pa_sink_input *i) {
> @@ -665,12 +892,21 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
>      if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
>          pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n);
>  
> -    if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED)
> -        pa_source_output_cork(u->source_output, true);
> -    else
> +    /* Set latency and calculate latency limits */
> +    update_latency_boundaries(u, NULL, dest);
> +    set_sink_input_latency(u, dest);
> +    get_effective_source_latency(u, u->source_output->source, dest);
> +
> +    if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) {
> +        if (dest->suspend_cause != PA_SUSPEND_IDLE)
> +            pa_source_output_cork(u->source_output, true);
> +    } else
>          pa_source_output_cork(u->source_output, false);
>  
>      update_adjust_timer(u);
> +
> +    /* Send a message to the output thread that the sink has changed */
> +    pa_asyncmsgq_send(dest->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SINK_CHANGED, NULL, 0, NULL);
>  }
>  
>  /* Called from main thread */
> @@ -695,6 +931,16 @@ static void sink_input_suspend_cb(pa_sink_input *i, bool suspended) {
>      pa_assert_ctl_context();
>      pa_assert_se(u = i->userdata);
>  
> +    /* If the sink input has been suspended, we need to handle this like
> +     * a sink change when the sink input is resumed. Because the sink input
> +     * is suspended, we can set the variables directly. */
> +    if (suspended) {
> +        u->output_thread_info.pop_called = false;
> +        u->output_thread_info.first_pop_done = false;
> +    } else
> +        /* Set effective source latency on unsuspend */
> +        get_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
> +
>      pa_source_output_cork(u->source_output, suspended);
>  
>      update_adjust_timer(u);
> @@ -798,6 +1044,9 @@ int pa__init(pa_module *m) {
>      u->core = m->core;
>      u->module = m;
>      u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
> +    u->output_thread_info.pop_called = false;
> +    u->output_thread_info.pop_adjust = false;
> +    u->output_thread_info.push_called = false;
>  
>      adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
>      if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
> @@ -876,7 +1125,8 @@ int pa__init(pa_module *m) {
>      u->sink_input->suspend = sink_input_suspend_cb;
>      u->sink_input->userdata = u;
>  
> -    pa_sink_input_set_requested_latency(u->sink_input, u->latency/3);
> +    update_latency_boundaries(u, NULL, u->sink_input->sink);
> +    set_sink_input_latency(u, u->sink_input->sink);
>  
>      pa_source_output_new_data_init(&source_output_data);
>      source_output_data.driver = __FILE__;
> @@ -927,7 +1177,8 @@ int pa__init(pa_module *m) {
>      u->source_output->suspend = source_output_suspend_cb;
>      u->source_output->userdata = u;
>  
> -    pa_source_output_set_requested_latency(u->source_output, u->latency/3);
> +    update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
> +    set_source_output_latency(u, u->source_output->source);

pa__init() calls update_latency_boundaries() twice. I think the first
call can be omitted if you move the set_sink_input_latency() call to
happen here together with set_source_output_latency().

>  
>      pa_sink_input_get_silence(u->sink_input, &silence);
>      u->memblockq = pa_memblockq_new(
> @@ -941,6 +1192,8 @@ int pa__init(pa_module *m) {
>              0,                      /* maxrewind */
>              &silence);              /* silence frame */
>      pa_memblock_unref(silence.memblock);
> +    /* Fill the memblockq with silence */
> +    pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);

This seems unnecessary.

-- 
Tanu

https://www.patreon.com/tanuk


More information about the pulseaudio-discuss mailing list