[pulseaudio-discuss] [PATCH 11/21 v2] loopback: Use new feature of pa_{source, sink}_get_latency_within_thread()

Georg Chini georg at chini.tk
Sun Feb 19 16:15:19 UTC 2017


Using the new feature of pa_{source,sink}_get_latency_within_thread() leads
to improved end to end latency estimation and to correct handling of port
latency offsets.

---
 src/modules/module-loopback.c | 105 ++++++++++++++++++++++++++++++++----------
 1 file changed, 81 insertions(+), 24 deletions(-)

diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c
index ffeebbb..fd8afdb 100644
--- a/src/modules/module-loopback.c
+++ b/src/modules/module-loopback.c
@@ -110,12 +110,12 @@ struct userdata {
     /* Used for sink input and source output snapshots */
     struct {
         int64_t send_counter;
-        pa_usec_t source_latency;
+        int64_t source_latency;
         pa_usec_t source_timestamp;
 
         int64_t recv_counter;
         size_t loopback_memblockq_length;
-        pa_usec_t sink_latency;
+        int64_t sink_latency;
         pa_usec_t sink_timestamp;
     } latency_snapshot;
 
@@ -125,9 +125,11 @@ struct userdata {
     /* Output thread variables */
     struct {
         int64_t recv_counter;
+        pa_usec_t effective_source_latency;
 
         /* Copied from main thread */
-        pa_usec_t effective_source_latency;
+        int64_t source_latency_offset;
+        int64_t sink_latency_offset;
         pa_usec_t minimum_latency;
 
         /* Various booleans */
@@ -171,7 +173,9 @@ enum {
     SINK_INPUT_MESSAGE_SINK_CHANGED,
     SINK_INPUT_MESSAGE_SOURCE_CHANGED,
     SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
-    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY
+    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
+    SINK_INPUT_MESSAGE_SOURCE_LATENCY_OFFSET_CHANGED,
+    SINK_INPUT_MESSAGE_SINK_LATENCY_OFFSET_CHANGED
 };
 
 enum {
@@ -314,7 +318,8 @@ static void adjust_rates(struct userdata *u) {
     size_t buffer;
     uint32_t old_rate, base_rate, new_rate, run_hours;
     int32_t latency_difference;
-    pa_usec_t current_buffer_latency, snapshot_delay, current_source_sink_latency, current_latency, latency_at_optimum_rate;
+    pa_usec_t current_buffer_latency, snapshot_delay;
+    int64_t current_source_sink_latency, current_latency, latency_at_optimum_rate;
     pa_usec_t final_latency;
 
     pa_assert(u);
@@ -361,7 +366,7 @@ static void adjust_rates(struct userdata *u) {
     latency_at_optimum_rate = current_source_sink_latency + current_buffer_latency * old_rate / base_rate;
 
     final_latency = PA_MAX(u->latency, u->minimum_latency);
-    latency_difference = (int32_t)((int64_t)latency_at_optimum_rate - final_latency);
+    latency_difference = (int32_t)(latency_at_optimum_rate - final_latency);
 
     pa_log_debug("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
                 (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
@@ -474,12 +479,23 @@ static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_
 
 /* Called from output context
  * Sets the memblockq to the configured latency corrected by latency_offset_usec */
-static void memblockq_adjust(struct userdata *u, pa_usec_t latency_offset_usec, bool allow_push) {
+static void memblockq_adjust(struct userdata *u, int64_t latency_offset_usec, bool allow_push) {
     size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
-    pa_usec_t requested_buffer_latency, final_latency;
+    int64_t requested_buffer_latency;
+    pa_usec_t final_latency, requested_sink_latency;
 
     final_latency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
-    requested_buffer_latency = PA_CLIP_SUB(final_latency, latency_offset_usec);
+
+    /* If source or sink have some large negative latency offset, we might want to
+     * hold more than final_latency in the memblockq */
+    requested_buffer_latency = (int64_t)final_latency - latency_offset_usec;
+
+    /* Keep at least one sink latency in the queue to make sure that the sink
+     * never underruns initially */
+    requested_sink_latency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
+    if (requested_buffer_latency < (int64_t)requested_sink_latency)
+        requested_buffer_latency = requested_sink_latency;
+
     requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
     current_memblockq_length = pa_memblockq_get_length(u->memblockq);
 
@@ -500,7 +516,8 @@ static void memblockq_adjust(struct userdata *u, pa_usec_t latency_offset_usec,
 /* Called from input thread context */
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
     struct userdata *u;
-    pa_usec_t push_time, current_source_latency;
+    pa_usec_t push_time;
+    int64_t current_source_latency;
 
     pa_source_output_assert_ref(o);
     pa_source_output_assert_io_context(o);
@@ -508,9 +525,9 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
 
     /* Send current source latency and timestamp with the message */
     push_time = pa_rtclock_now();
-    current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, false);
+    current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, true);
 
-    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_UINT_TO_PTR(current_source_latency), push_time, chunk, NULL);
+    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_INT_TO_PTR(current_source_latency), push_time, chunk, NULL);
     u->send_counter += (int64_t) chunk->length;
 }
 
@@ -539,7 +556,7 @@ static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data,
 
             u->latency_snapshot.send_counter = u->send_counter;
             /* Add content of delay memblockq to the source latency */
-            u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, false) +
+            u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
                                                  pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
             u->latency_snapshot.source_timestamp = pa_rtclock_now();
 
@@ -682,7 +699,7 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
     u->iteration_counter = 0;
     u->underrun_counter = 0;
 
-    /* Send a mesage to the output thread that the source has changed.
+    /* Send a mesage to the output thread that the source and offset have changed.
      * If the sink is invalid here during a profile switching situation
      * we can safely set push_called to false directly.  Also, the current
      * sampling rate may be far away from the default rate if we are still
@@ -691,9 +708,11 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
      * update the sink input sample spec.*/
     if (u->sink_input->sink) {
         pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
+        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_LATENCY_OFFSET_CHANGED, NULL, u->source_latency_offset, NULL);
         pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
     } else {
         u->output_thread_info.push_called = false;
+        u->output_thread_info.source_latency_offset = u->source_latency_offset;
         u->sink_input->sample_spec.rate = u->source_output->sample_spec.rate;
     }
 }
@@ -773,7 +792,7 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
      * when it starts pushing. Adjust the memblockq accordingly and ensure that there is
      * enough data in the queue to avoid underruns. */
     if (!u->output_thread_info.push_called)
-        memblockq_adjust(u, u->output_thread_info.effective_source_latency, true);
+        memblockq_adjust(u, (int64_t)u->output_thread_info.effective_source_latency + u->output_thread_info.source_latency_offset, true);
 
     return 0;
 }
@@ -815,19 +834,33 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
              * are enabled. Disable them on first push and correct the memblockq. Do the
              * same if the pop_cb() requested the adjustment */
             if (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust) {
-                pa_usec_t time_delta;
+                int64_t time_delta, tmp_latency;
 
-                time_delta = PA_PTR_TO_UINT(data);
+                /* If the source has not been running, the reported latency will reflect the time between
+                 * start of the source and the first push. If the source has been starting slow, this may
+                 * be more than the actual source latency, so clip at the requested source latency value */
+                time_delta = PA_PTR_TO_INT(data);
+                if (time_delta > u->output_thread_info.source_latency_offset + (int64_t)u->output_thread_info.effective_source_latency)
+                    time_delta = u->output_thread_info.source_latency_offset + u->output_thread_info.effective_source_latency;
+
+                /* Add the time between the push and the POST message */
                 time_delta += pa_rtclock_now() - offset;
-                time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, false);
+
+                /* If the sink has not been running, the reported latency will be the amount written minus the
+                 * time since the start and therefore might become negative if the sink has been starting slow.
+                 * Clip to latency offset */
+                tmp_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true);
+                if (tmp_latency < u->output_thread_info.sink_latency_offset)
+                    tmp_latency = u->output_thread_info.sink_latency_offset;
+                time_delta += tmp_latency;
 
                 /* If the source has overrun, assume that the maximum it should have pushed is
                  * one full source latency. It may still be possible that the next push also
                  * contains too much data, then the resulting latency will be wrong. */
                 if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
-                    time_delta = PA_CLIP_SUB(time_delta, u->output_thread_info.effective_source_latency);
+                    time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
                 else
-                    time_delta = PA_CLIP_SUB(time_delta, pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec));
+                    time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);
 
                 memblockq_adjust(u, time_delta, true);
 
@@ -838,7 +871,7 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
             /* If pop has not been called yet, make sure the latency does not grow too much.
              * Don't push any silence here, because we already have new data in the queue */
             if (!u->output_thread_info.pop_called)
-                 memblockq_adjust(u, pa_sink_get_latency_within_thread(u->sink_input->sink, false), false);
+                 memblockq_adjust(u, pa_sink_get_latency_within_thread(u->sink_input->sink, true), false);
 
             /* Is this the end of an underrun? Then let's start things
              * right-away */
@@ -876,7 +909,7 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
             u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
             u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
             /* Add content of render memblockq to sink latency */
-            u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, false) +
+            u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
                                                pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
             u->latency_snapshot.sink_timestamp = pa_rtclock_now();
 
@@ -907,6 +940,18 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
             u->output_thread_info.minimum_latency = (pa_usec_t)offset;
 
             return 0;
+
+        case SINK_INPUT_MESSAGE_SOURCE_LATENCY_OFFSET_CHANGED:
+
+            u->output_thread_info.source_latency_offset = offset;
+
+            return 0;
+
+        case SINK_INPUT_MESSAGE_SINK_LATENCY_OFFSET_CHANGED:
+
+            u->output_thread_info.sink_latency_offset = offset;
+
+            return 0;
     }
 
     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -1044,8 +1089,9 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
     u->iteration_counter = 0;
     u->underrun_counter = 0;
 
-    /* Send a message to the output thread that the sink has changed */
+    /* Send a message to the output thread that the sink and offset have changed */
     pa_asyncmsgq_send(dest->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SINK_CHANGED, NULL, 0, NULL);
+    pa_asyncmsgq_send(dest->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SINK_LATENCY_OFFSET_CHANGED, NULL, u->sink_latency_offset, NULL);
 
     /* Sampling rate may be far away from the default rate if we are still
      * recovering from a previous source or sink change, so reset rate to
@@ -1172,6 +1218,9 @@ static pa_hook_result_t sink_port_latency_offset_changed_cb(pa_core *core, pa_si
     u->sink_latency_offset = sink->port_latency_offset;
     update_minimum_latency(u, sink, true);
 
+    /* Update variable in output thread */
+    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SINK_LATENCY_OFFSET_CHANGED, NULL, u->sink_latency_offset, NULL);
+
     return PA_HOOK_OK;
 }
 
@@ -1186,6 +1235,12 @@ static pa_hook_result_t source_port_latency_offset_changed_cb(pa_core *core, pa_
     u->source_latency_offset = source->port_latency_offset;
     update_minimum_latency(u, u->sink_input->sink, true);
 
+    /* Update variable in output thread */
+    if (u->sink_input->sink)
+        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_LATENCY_OFFSET_CHANGED, NULL, u->source_latency_offset, NULL);
+    else
+        u->output_thread_info.source_latency_offset = u->source_latency_offset;
+
     return PA_HOOK_OK;
 }
 
@@ -1477,8 +1532,10 @@ int pa__init(pa_module *m) {
     u->msg->parent.process_msg = loopback_process_msg_cb;
     u->msg->userdata = u;
 
-    /* The output thread is not yet running, set effective_source_latency directly */
+    /* The output thread is not yet running, set effective_source_latency and offsets directly */
     get_effective_source_latency(u, u->source_output->source, NULL);
+    u->output_thread_info.source_latency_offset = u->source_latency_offset;
+    u->output_thread_info.sink_latency_offset = u->sink_latency_offset;
 
     pa_sink_input_put(u->sink_input);
     pa_source_output_put(u->source_output);
-- 
2.10.1



More information about the pulseaudio-discuss mailing list