[pulseaudio-discuss] [PATCH RFC v3] Make module-loopback honor requested latency

Georg Chini georg at chini.tk
Sun Nov 30 10:26:39 PST 2014


This patch makes module loopback adjust to the requested latency if possible
by tweaking the rate adjustment. On startup the latency is initialized to a value
near the requested latency so that the stable state can be reached within a few
iterations of adjust_time.
In the case that the requested latency is smaller than source latency + sink
latency + 25ms, module loopback will try to adjust the buffer latency to 25ms.
(This value could also be made user configurable if necessary)

---
 src/modules/module-loopback.c | 150 +++++++++++++++++++++---------------------
 1 file changed, 74 insertions(+), 76 deletions(-)

diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c
index b3b9557..aff42ab 100644
--- a/src/modules/module-loopback.c
+++ b/src/modules/module-loopback.c
@@ -65,6 +65,8 @@ PA_MODULE_USAGE(
 
 #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC)
 
+#define MIN_BUFFER_USEC (25*PA_USEC_PER_MSEC)
+
 struct userdata {
     pa_core *core;
     pa_module *module;
@@ -87,7 +89,7 @@ struct userdata {
     pa_usec_t latency;
 
     bool in_pop;
-    size_t min_memblockq_length;
+    bool pop_called;
 
     struct {
         int64_t send_counter;
@@ -98,8 +100,7 @@ struct userdata {
         size_t sink_input_buffer;
         pa_usec_t sink_latency;
 
-        size_t min_memblockq_length;
-        size_t max_request;
+        pa_usec_t buffer_latency;
     } latency_snapshot;
 };
 
@@ -124,7 +125,6 @@ enum {
     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
     SINK_INPUT_MESSAGE_REWIND,
     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
-    SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED
 };
 
 enum {
@@ -171,9 +171,10 @@ static void teardown(struct userdata *u) {
 
 /* Called from main context */
 static void adjust_rates(struct userdata *u) {
-    size_t buffer, fs;
+    size_t buffer;
     uint32_t old_rate, base_rate, new_rate;
-    pa_usec_t buffer_latency;
+    pa_usec_t req_buffer_latency, final_latency;
+    double step_size;
 
     pa_assert(u);
     pa_assert_ctl_context();
@@ -188,40 +189,40 @@ static void adjust_rates(struct userdata *u) {
     if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
         buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
     else
-        buffer += PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
+        buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
 
-    buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
+    u->latency_snapshot.buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
 
     pa_log_debug("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
                 (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
-                (double) buffer_latency / PA_USEC_PER_MSEC,
+                (double) u->latency_snapshot.buffer_latency / PA_USEC_PER_MSEC,
                 (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
-                ((double) u->latency_snapshot.sink_latency + buffer_latency + u->latency_snapshot.source_latency) / PA_USEC_PER_MSEC);
-
-    pa_log_debug("Should buffer %zu bytes, buffered at minimum %zu bytes",
-                u->latency_snapshot.max_request*2,
-                u->latency_snapshot.min_memblockq_length);
+                ((double) u->latency_snapshot.sink_latency + u->latency_snapshot.buffer_latency + u->latency_snapshot.source_latency) / PA_USEC_PER_MSEC);
 
-    fs = pa_frame_size(&u->sink_input->sample_spec);
     old_rate = u->sink_input->sample_spec.rate;
     base_rate = u->source_output->sample_spec.rate;
 
-    if (u->latency_snapshot.min_memblockq_length < u->latency_snapshot.max_request*2)
-        new_rate = base_rate - (((u->latency_snapshot.max_request*2 - u->latency_snapshot.min_memblockq_length) / fs) *PA_USEC_PER_SEC)/u->adjust_time;
-    else
-        new_rate = base_rate + (((u->latency_snapshot.min_memblockq_length - u->latency_snapshot.max_request*2) / fs) *PA_USEC_PER_SEC)/u->adjust_time;
+    final_latency = PA_MAX(u->latency, u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency + MIN_BUFFER_USEC);
+    req_buffer_latency = final_latency - u->latency_snapshot.sink_latency - u->latency_snapshot.source_latency;
+    new_rate = base_rate * 1.01;
+    if ((int32_t)(u->adjust_time + req_buffer_latency - u->latency_snapshot.buffer_latency) > 0)
+        new_rate = base_rate * u->adjust_time/(u->adjust_time + req_buffer_latency - u->latency_snapshot.buffer_latency);
 
-    if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
-        pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
-        new_rate = base_rate;
-    } else {
-        if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
-          new_rate = base_rate;
-        /* Do the adjustment in small steps; 2‰ can be considered inaudible */
-        if (new_rate < (uint32_t) (old_rate*0.998) || new_rate > (uint32_t) (old_rate*1.002)) {
-            pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, old_rate);
-            new_rate = PA_CLAMP(new_rate, (uint32_t) (old_rate*0.998), (uint32_t) (old_rate*1.002));
-        }
+    if (base_rate < new_rate + 10 && new_rate < base_rate + 10)
+       new_rate = base_rate;
+
+    /* Do not deviate more than between 1% from the base sample rate */
+    if (new_rate < (uint32_t) (base_rate*0.99) || new_rate > (uint32_t) (base_rate*1.01)) {
+       pa_log_info("Sample rates too different (%u vs. %u), limiting rate adjustment.", base_rate, new_rate);
+       new_rate = PA_CLAMP(new_rate, (uint32_t) (base_rate*0.99), (uint32_t) (base_rate*1.01));
+    }
+    /* Do the adjustment in small steps; 2-2.5‰ can be considered inaudible */
+    step_size = 0.002;
+    if (abs((int32_t)(new_rate - base_rate)) < abs((int32_t)(old_rate - base_rate)))
+       step_size = 0.0025;
+    if (new_rate < (uint32_t) (old_rate*(1-step_size)) || new_rate > (uint32_t) (old_rate*(1+step_size))) {
+        pa_log_info("New rate of %u Hz not within %0.1f‰ of %u Hz, forcing smaller adjustment", new_rate, step_size * 1000, old_rate);
+        new_rate = PA_CLAMP(new_rate, (uint32_t) (old_rate*(1-step_size)), (uint32_t) (old_rate*(1+step_size)));
     }
 
     pa_sink_input_set_rate(u->sink_input, new_rate);
@@ -442,20 +443,6 @@ static void source_output_suspend_cb(pa_source_output *o, bool suspended) {
 }
 
 /* Called from output thread context */
-static void update_min_memblockq_length(struct userdata *u) {
-    size_t length;
-
-    pa_assert(u);
-    pa_sink_input_assert_io_context(u->sink_input);
-
-    length = pa_memblockq_get_length(u->memblockq);
-
-    if (u->min_memblockq_length == (size_t) -1 ||
-        length < u->min_memblockq_length)
-        u->min_memblockq_length = length;
-}
-
-/* Called from output thread context */
 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
     struct userdata *u;
 
@@ -464,6 +451,8 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert_se(u = i->userdata);
     pa_assert(chunk);
 
+    if (PA_SINK_IS_RUNNING(u->sink_input->sink->thread_info.state) && !u->pop_called)
+        u->pop_called = true;
     u->in_pop = true;
     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
         ;
@@ -477,8 +466,6 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     chunk->length = PA_MIN(chunk->length, nbytes);
     pa_memblockq_drop(u->memblockq, chunk->length);
 
-    update_min_memblockq_length(u);
-
     return 0;
 }
 
@@ -494,6 +481,28 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
 }
 
 /* Called from output thread context */
+static void memblockq_adjust(struct userdata *u, size_t length) {
+    int32_t latency_diff;
+    uint32_t latency;
+    size_t nbytes;
+
+    if (length) {
+       nbytes = PA_MIN(length, pa_memblockq_get_length(u->memblockq));
+       pa_log_debug("Dropping %u Bytes from queue", (uint32_t) nbytes);
+       pa_memblockq_drop(u->memblockq, nbytes);
+       return;
+    }
+    latency = PA_MAX(u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency + MIN_BUFFER_USEC, u->latency);
+    latency_diff = u->latency_snapshot.sink_latency + u->latency_snapshot.buffer_latency + u->latency_snapshot.source_latency - latency;
+    if (latency_diff > 0) {
+       nbytes = PA_MIN(pa_usec_to_bytes(latency_diff, &u->sink_input->sample_spec), pa_memblockq_get_length(u->memblockq));
+       pa_log_debug("Dropping %u Bytes from queue", (uint32_t) nbytes);
+       pa_memblockq_drop(u->memblockq, nbytes);
+/*       u->latency_snapshot.buffer_latency = latency - u->latency_snapshot.source_latency - u->latency_snapshot.sink_latency; */
+   }
+}
+
+/* Called from output thread context */
 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
 
@@ -515,12 +524,12 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
 
             pa_sink_input_assert_io_context(u->sink_input);
 
-            if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state))
+            if (PA_SINK_IS_RUNNING(u->sink_input->sink->thread_info.state) && u->pop_called)
                 pa_memblockq_push_align(u->memblockq, chunk);
-            else
-                pa_memblockq_flush_write(u->memblockq, true);
-
-            update_min_memblockq_length(u);
+            else {
+                pa_memblockq_push_align(u->memblockq, chunk);
+                memblockq_adjust(u, chunk->length);
+            }
 
             /* Is this the end of an underrun? Then let's start things
              * right-away */
@@ -542,22 +551,18 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
 
             pa_sink_input_assert_io_context(u->sink_input);
 
-            if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state))
+            if (PA_SINK_IS_RUNNING(u->sink_input->sink->thread_info.state) && u->pop_called)
                 pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
-            else
-                pa_memblockq_flush_write(u->memblockq, true);
+            else if (u->latency_snapshot.buffer_latency)
+                memblockq_adjust(u, 0);
 
             u->recv_counter -= offset;
 
-            update_min_memblockq_length(u);
-
             return 0;
 
         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
             size_t length;
 
-            update_min_memblockq_length(u);
-
             length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
 
             u->latency_snapshot.recv_counter = u->recv_counter;
@@ -566,25 +571,9 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in
                 (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, length) : length);
             u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
 
-            u->latency_snapshot.max_request = pa_sink_input_get_max_request(u->sink_input);
-
-            u->latency_snapshot.min_memblockq_length = u->min_memblockq_length;
-            u->min_memblockq_length = (size_t) -1;
-
             return 0;
         }
 
-        case SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED: {
-            /* This message is sent from the IO thread to the main
-             * thread! So don't be confused. All the user cases above
-             * are executed in thread context, but this one is not! */
-
-            pa_assert_ctl_context();
-
-            if (u->time_event)
-                adjust_rates(u);
-            return 0;
-        }
     }
 
     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -606,7 +595,7 @@ static void sink_input_attach_cb(pa_sink_input *i) {
     pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
     pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
 
-    u->min_memblockq_length = (size_t) -1;
+    u->pop_called = false;
 }
 
 /* Called from output thread context */
@@ -621,6 +610,7 @@ static void sink_input_detach_cb(pa_sink_input *i) {
         pa_rtpoll_item_free(u->rtpoll_item_read);
         u->rtpoll_item_read = NULL;
     }
+    memblockq_adjust(u, 0);
 }
 
 /* Called from output thread context */
@@ -644,7 +634,6 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
 
     pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
     pa_log_info("Max request changed");
-    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED, NULL, 0, NULL, NULL);
 }
 
 /* Called from main thread */
@@ -746,6 +735,7 @@ int pa__init(pa_module *m) {
     uint32_t adjust_time_sec;
     const char *n;
     bool remix = true;
+    pa_usec_t counter;
 
     pa_assert(m);
 
@@ -820,6 +810,7 @@ int pa__init(pa_module *m) {
     u->core = m->core;
     u->module = m;
     u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
+    u->pop_called = false;
 
     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
@@ -962,6 +953,13 @@ int pa__init(pa_module *m) {
             0,                      /* minreq */
             0,                      /* maxrewind */
             &silence);              /* silence frame */
+    silence.index = silence.length - pa_usec_to_bytes(PA_USEC_PER_MSEC * 100, &u->source_output->sample_spec);
+    silence.length = pa_usec_to_bytes(PA_USEC_PER_MSEC * 100, &u->source_output->sample_spec);
+    counter = 150 * PA_USEC_PER_MSEC;
+    while ( counter < u->latency) {
+        pa_memblockq_push_align(u->memblockq, &silence);
+        counter += 100 * PA_USEC_PER_MSEC;
+    }
     pa_memblock_unref(silence.memblock);
 
     u->asyncmsgq = pa_asyncmsgq_new(0);
-- 
2.1.3



More information about the pulseaudio-discuss mailing list