[pulseaudio-commits] [Git][pulseaudio/pulseaudio][master] 6 commits: combine-sink: Fix latency calculations

PulseAudio Marge Bot (@pulseaudio-merge-bot) gitlab at gitlab.freedesktop.org
Wed Aug 25 15:35:28 UTC 2021



PulseAudio Marge Bot pushed to branch master at PulseAudio / pulseaudio


Commits:
ab78f8ed by Georg Chini at 2021-08-25T15:32:21+00:00
combine-sink: Fix latency calculations

Currently module-combine-sink uses only a rough estimate of the current
slave sink latencies to calculate the rate for the various sink inputs.
This leads to very inexact and unstable latency reports for the virtual
sink.

This patch fixes the issue by introducing latency snapshots like they
are used in module-loopback. It also changes the definition of the
target latency to ensure that there is always one sink which uses the
base rate.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/53>

- - - - -
54baa223 by Georg Chini at 2021-08-25T15:32:22+00:00
combine-sink: Add rate controller

This patch adds a rate controller similar to the one used in module-loopback
to limit step size and maximum deviation from the base rate. Rate changes
are handled more smoothly by the controller. The patch has not much impact
on the behavior of the module, except that there is less rate hunting.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/53>

- - - - -
9d1a43ae by Georg Chini at 2021-08-25T15:32:22+00:00
combine-sink: Improve initial latency reports

Currently, it takes one adjust time before the smoother is updated after an
unsuspend. Before the first update, the smoother will not be aware of the
slave sink latencies, leading to incorrect latency reports.

This patch moves the first smoother update to one latency time after the
sink was unsuspended, thereby improving initial latency reports. This
only partially resolves the problem because the smoother takes multiple
updates to adapt to the slave sink latencies.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/53>

- - - - -
45a5e6f0 by Georg Chini at 2021-08-25T15:32:22+00:00
combine-sink: Use configured resampler, reduce update time to 1s

Currently the combine-sink uses the trivial resampler by default.

This patch changes the default to the configured resampler.
Also the default update time is changed from 10s to 1s to achieve
faster convergence and higher precision.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/53>

- - - - -
4c180214 by Georg Chini at 2021-08-25T15:32:22+00:00
tunnel: Fix latency calculations

Currently module-tunnel uses only a rough estimate of the current stream
latency and reports wrong latencies in certain situations. This leads to
very inexact and unstable latency reports for the virtual sink.

This patch fixes the issue by introducing latency snapshots like they
are used in module-loopback. Because the latency reports are now correct,
the update interval for latency re-calculations can be reduced to 1s.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/53>

- - - - -
735eb05e by Georg Chini at 2021-08-25T15:32:22+00:00
tunnel: Make fixed latency configurable

Currently, module-tunnel always uses the default fixed latency of 250ms;
the value cannot be configured.

There is no reason for such a large latency. This patch adds a parameter
latency_msec to the module to set the fixed latency at load time of the
module. The parameter can range from 5 to 500 milliseconds. With this
patch, I was able to run a tunnel sink at 7ms latency without problems.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/53>

- - - - -


2 changed files:

- src/modules/module-combine-sink.c
- src/modules/module-tunnel.c


Changes:

=====================================
src/modules/module-combine-sink.c
=====================================
@@ -65,7 +65,7 @@ PA_MODULE_USAGE(
 
 #define MEMBLOCKQ_MAXLENGTH (1024*1024*16)
 
-#define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC)
+#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
 
 #define BLOCK_USEC (PA_USEC_PER_MSEC * 200)
 
@@ -116,6 +116,14 @@ struct output {
 
     /* For communication of the stream latencies to the main thread */
     pa_usec_t total_latency;
+    struct {
+        pa_usec_t timestamp;
+        pa_usec_t sink_latency;
+        size_t output_memblockq_size;
+        uint64_t receive_counter;
+    } latency_snapshot;
+
+    uint64_t receive_counter;
 
     /* For communication of the stream parameters to the sink thread */
     pa_atomic_t max_request;
@@ -159,21 +167,33 @@ struct userdata {
         bool in_null_mode;
         pa_smoother *smoother;
         uint64_t counter;
+
+        uint64_t snapshot_counter;
+        pa_usec_t snapshot_time;
+
+        pa_usec_t render_timestamp;
     } thread_info;
 };
 
+struct sink_snapshot {
+    pa_usec_t timestamp;
+    uint64_t send_counter;
+};
+
 enum {
     SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
     SINK_MESSAGE_REMOVE_OUTPUT,
     SINK_MESSAGE_NEED,
     SINK_MESSAGE_UPDATE_LATENCY,
     SINK_MESSAGE_UPDATE_MAX_REQUEST,
-    SINK_MESSAGE_UPDATE_LATENCY_RANGE
+    SINK_MESSAGE_UPDATE_LATENCY_RANGE,
+    SINK_MESSAGE_GET_SNAPSHOT
 };
 
 enum {
     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
-    SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY
+    SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY,
+    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
 };
 
 static void output_disable(struct output *o);
@@ -181,12 +201,48 @@ static void output_enable(struct output *o);
 static void output_free(struct output *o);
 static int output_create_sink_input(struct output *o);
 
+/* rate controller, called from main context
+ * - maximum deviation from base rate is less than 1%
+ * - controller step size is limited to 2.01‰
+ * - exhibits hunting with USB or Bluetooth devices
+ */
+static uint32_t rate_controller(
+                struct output *o,
+                uint32_t base_rate, uint32_t old_rate,
+                int32_t latency_difference_usec) {
+
+    double new_rate, new_rate_1, new_rate_2;
+    double min_cycles_1, min_cycles_2;
+
+    /* Calculate next rate that is not more than 2‰ away from the last rate */
+    min_cycles_1 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.002 + 1;
+    new_rate_1 = old_rate + base_rate * (double)latency_difference_usec / min_cycles_1 / o->userdata->adjust_time;
+
+    /* Calculate best rate to correct the current latency offset, limit at
+     * 1% difference from base_rate */
+    min_cycles_2 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.01 + 1;
+    new_rate_2 = (double)base_rate * (1.0 + (double)latency_difference_usec / min_cycles_2 / o->userdata->adjust_time);
+
+    /* Choose the rate that is nearer to base_rate */
+    new_rate = new_rate_2;
+    if (abs(new_rate_1 - base_rate) < abs(new_rate_2 - base_rate))
+        new_rate = new_rate_1;
+
+    return (uint32_t)(new_rate + 0.5);
+}
+
 static void adjust_rates(struct userdata *u) {
     struct output *o;
-    pa_usec_t max_sink_latency = 0, min_total_latency = (pa_usec_t) -1, target_latency, avg_total_latency = 0;
+    struct sink_snapshot rdata;
+    pa_usec_t avg_total_latency = 0;
+    pa_usec_t target_latency = 0;
+    pa_usec_t max_sink_latency = 0;
+    pa_usec_t min_total_latency = (pa_usec_t)-1;
     uint32_t base_rate;
     uint32_t idx;
     unsigned n = 0;
+    pa_usec_t now;
+    struct output *o_max;
 
     pa_assert(u);
     pa_sink_assert_ref(u->sink);
@@ -194,70 +250,97 @@ static void adjust_rates(struct userdata *u) {
     if (pa_idxset_size(u->outputs) <= 0)
         return;
 
-    if (!PA_SINK_IS_OPENED(u->sink->state))
+    if (u->sink->state != PA_SINK_RUNNING)
+        return;
+
+    /* Get sink snapshot */
+    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_SNAPSHOT, &rdata, 0, NULL);
+
+    /* The sink snapshot time is the time when the last data was rendered.
+     * Latency is calculated for that point in time. */
+    now = rdata.timestamp;
+
+    /* Sink snapshot is not yet valid. */
+    if (!now)
         return;
 
     PA_IDXSET_FOREACH(o, u->outputs, idx) {
-        pa_usec_t sink_latency;
+        pa_usec_t snapshot_latency;
+        int64_t time_difference;
 
         if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
             continue;
 
-        o->total_latency = pa_sink_input_get_latency(o->sink_input, &sink_latency);
-        o->total_latency += sink_latency;
-
-        if (sink_latency > max_sink_latency)
-            max_sink_latency = sink_latency;
-
-        if (min_total_latency == (pa_usec_t) -1 || o->total_latency < min_total_latency)
+        /* The difference may become negative, because it is probable, that the last
+         * render time was before the sink input snapshot. In this case, the sink
+         * had some more latency at the render time, so subtracting the value still
+         * gives the right result. */
+        time_difference = (int64_t)now - (int64_t)o->latency_snapshot.timestamp;
+
+        /* Latency at sink snapshot time is sink input snapshot latency minus time
+         * passed between the two snapshots. */
+        snapshot_latency = o->latency_snapshot.sink_latency
+                           + pa_bytes_to_usec(o->latency_snapshot.output_memblockq_size, &o->sink_input->sample_spec)
+                           - time_difference;
+
+        /* Add the data that was sent between taking the sink input snapshot
+         * and the sink snapshot. */
+        snapshot_latency += pa_bytes_to_usec(rdata.send_counter - o->latency_snapshot.receive_counter, &o->sink_input->sample_spec);
+
+        /* This is the current combined latency of the slave sink and the related
+         * memblockq at the time of the sink snapshot. */
+        o->total_latency = snapshot_latency;
+        avg_total_latency += snapshot_latency;
+
+        /* Get max_sink_latency and min_total_latency for target selection. */
+        if (min_total_latency == (pa_usec_t)-1 || o->total_latency < min_total_latency)
             min_total_latency = o->total_latency;
 
-        avg_total_latency += o->total_latency;
-        n++;
+        if (o->latency_snapshot.sink_latency > max_sink_latency) {
+            max_sink_latency = o->latency_snapshot.sink_latency;
+            o_max = o;
+        }
 
-        pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC);
+        /* Debug output */
+        pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC);
 
         if (o->total_latency > 10*PA_USEC_PER_SEC)
             pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
+
+        n++;
     }
 
+    /* If there is no valid output there is nothing to do. */
     if (min_total_latency == (pa_usec_t) -1)
         return;
 
     avg_total_latency /= n;
 
-    target_latency = PA_MAX(max_sink_latency, min_total_latency);
+    /* The target selection ensures, that at least one of the
+     * sinks will use the base rate and all other sinks are set
+     * relative to it. */
+    if (max_sink_latency > min_total_latency)
+        target_latency = o_max->total_latency;
+    else
+        target_latency = min_total_latency;
 
     pa_log_info("[%s] avg total latency is %0.2f msec.", u->sink->name, (double) avg_total_latency / PA_USEC_PER_MSEC);
-    pa_log_info("[%s] target latency is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC);
+    pa_log_info("[%s] target latency for all slaves is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC);
 
     base_rate = u->sink->sample_spec.rate;
 
+    /* Calculate and set rates for the sink inputs. */
     PA_IDXSET_FOREACH(o, u->outputs, idx) {
-        uint32_t new_rate = base_rate;
-        uint32_t current_rate;
+        uint32_t new_rate;
+        int32_t latency_difference;
 
         if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
             continue;
 
-        current_rate = o->sink_input->sample_spec.rate;
+        latency_difference = (int64_t)o->total_latency - (int64_t)target_latency;
+        new_rate = rate_controller(o, base_rate, o->sink_input->sample_spec.rate, latency_difference);
 
-        if (o->total_latency != target_latency)
-            new_rate += (uint32_t) (((double) o->total_latency - (double) target_latency) / (double) u->adjust_time * (double) new_rate);
-
-        if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
-            pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->sink->name, base_rate, new_rate);
-            new_rate = base_rate;
-        } else {
-            if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
-              new_rate = base_rate;
-            /* Do the adjustment in small steps; 2‰ can be considered inaudible */
-            if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
-                pa_log_info("[%s] new rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", o->sink_input->sink->name, new_rate, current_rate);
-                new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
-            }
-            pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.2f msec.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate, (double) o->total_latency / PA_USEC_PER_MSEC);
-        }
+        pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate);
         pa_sink_input_set_rate(o->sink_input, new_rate);
     }
 
@@ -271,13 +354,22 @@ static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct tim
     pa_assert(a);
     pa_assert(u->time_event == e);
 
-    adjust_rates(u);
-
     if (u->sink->state == PA_SINK_SUSPENDED) {
         u->core->mainloop->time_free(e);
         u->time_event = NULL;
-    } else
+    } else {
+        struct output *o;
+        uint32_t idx;
+
         pa_core_rttime_restart(u->core, e, pa_rtclock_now() + u->adjust_time);
+
+        /* Get latency snapshots */
+        PA_IDXSET_FOREACH(o, u->outputs, idx) {
+            pa_asyncmsgq_send(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
+        }
+
+    }
+    adjust_rates(u);
 }
 
 static void process_render_null(struct userdata *u, pa_usec_t now) {
@@ -387,7 +479,10 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
     while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
         ;
 
-    /* Ok, now let's prepare some data if we really have to */
+    /* Ok, now let's prepare some data if we really have to. Save the
+     * time for latency calculations. */
+    u->thread_info.render_timestamp = pa_rtclock_now();
+
     while (!pa_memblockq_is_readable(o->memblockq)) {
         struct output *j;
         pa_memchunk chunk;
@@ -396,6 +491,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
         pa_sink_render(u->sink, length, &chunk);
 
         u->thread_info.counter += chunk.length;
+        o->receive_counter += chunk.length;
 
         /* OK, let's send this data to the other threads */
         PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
@@ -630,9 +726,10 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
 
         case SINK_INPUT_MESSAGE_POST:
 
-            if (PA_SINK_IS_OPENED(o->sink_input->sink->thread_info.state))
+            if (o->sink_input->sink->thread_info.state == PA_SINK_RUNNING) {
                 pa_memblockq_push_align(o->memblockq, chunk);
-            else
+                o->receive_counter += chunk->length;
+            } else
                 pa_memblockq_flush_write(o->memblockq, true);
 
             return 0;
@@ -644,6 +741,24 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
 
             return 0;
         }
+
+        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
+            size_t length;
+
+            length = pa_memblockq_get_length(o->sink_input->thread_info.render_memblockq);
+
+            o->latency_snapshot.output_memblockq_size = pa_memblockq_get_length(o->memblockq);
+
+            /* Add content of memblockq's to sink latency */
+            o->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(o->sink, true) +
+                                               pa_bytes_to_usec(length, &o->sink->sample_spec);
+
+            o->latency_snapshot.timestamp = pa_rtclock_now();
+
+            o->latency_snapshot.receive_counter = o->receive_counter;
+
+            return 0;
+        }
     }
 
     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -674,9 +789,6 @@ static void unsuspend(struct userdata *u) {
     PA_IDXSET_FOREACH(o, u->outputs, idx)
         output_enable(o);
 
-    if (!u->time_event && u->adjust_time > 0)
-        u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
-
     pa_log_info("Resumed successfully...");
 }
 
@@ -708,6 +820,13 @@ static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state
             if (u->sink->state == PA_SINK_SUSPENDED)
                 unsuspend(u);
 
+            /* The first smoother update should be done early, otherwise the smoother will
+             * not be aware of the slave sink latencies and report far too small values.
+             * This is especially important if after an unsuspend the sink runs on a different
+             * latency than before. */
+            if (state == PA_SINK_RUNNING && !u->time_event && u->adjust_time > 0)
+                u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + pa_sink_get_requested_latency(u->sink), time_callback, u);
+
             break;
 
         case PA_SINK_UNLINKED:
@@ -735,9 +854,10 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state,
     running = new_state == PA_SINK_RUNNING;
     pa_atomic_store(&u->thread_info.running, running);
 
-    if (running)
+    if (running) {
+        u->thread_info.render_timestamp = 0;
         pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
-    else
+    } else
         pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
 
     return 0;
@@ -830,6 +950,7 @@ static void output_add_within_thread(struct output *o) {
             o->userdata->rtpoll,
             PA_RTPOLL_NORMAL,
             o->control_inq);
+    o->receive_counter = o->userdata->thread_info.counter;
 }
 
 /* Called from thread context of the io thread */
@@ -917,8 +1038,11 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
         case SINK_MESSAGE_UPDATE_LATENCY: {
             pa_usec_t x, y, latency = (pa_usec_t) offset;
 
-            x = pa_rtclock_now();
-            y = pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec);
+            /* It may be possible that thread_info.counter has been increased
+             * since we took the snapshot. Therefore we have to use the snapshot
+             * time and counter instead of the current values. */
+            x = u->thread_info.snapshot_time;
+            y = pa_bytes_to_usec(u->thread_info.snapshot_counter, &u->sink->sample_spec);
 
             if (y > latency)
                 y -= latency;
@@ -929,6 +1053,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             return 0;
         }
 
+        case SINK_MESSAGE_GET_SNAPSHOT: {
+            struct sink_snapshot *rdata = data;
+
+            rdata->timestamp = u->thread_info.render_timestamp;
+            rdata->send_counter = u->thread_info.counter;
+            u->thread_info.snapshot_counter = u->thread_info.counter;
+            u->thread_info.snapshot_time = u->thread_info.render_timestamp;
+
+            return 0;
+        }
+
         case SINK_MESSAGE_UPDATE_MAX_REQUEST:
             update_max_request(u);
             break;
@@ -1288,7 +1423,7 @@ int pa__init(pa_module*m) {
     struct userdata *u;
     pa_modargs *ma = NULL;
     const char *slaves, *rm;
-    int resample_method = PA_RESAMPLER_TRIVIAL;
+    int resample_method;
     pa_sample_spec ss;
     pa_channel_map map;
     struct output *o;
@@ -1304,6 +1439,7 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
+    resample_method = m->core->resample_method;
     if ((rm = pa_modargs_get_value(ma, "resample_method", NULL))) {
         if ((resample_method = pa_parse_resample_method(rm)) < 0) {
             pa_log("invalid resample method '%s'", rm);
@@ -1506,6 +1642,8 @@ int pa__init(pa_module*m) {
     u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_EARLY, (pa_hook_cb_t) sink_unlink_hook_cb, u);
     u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], PA_HOOK_NORMAL, (pa_hook_cb_t) sink_state_changed_hook_cb, u);
 
+    u->thread_info.render_timestamp = 0;
+
     if (!(u->thread = pa_thread_new("combine", thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;


=====================================
src/modules/module-tunnel.c
=====================================
@@ -79,6 +79,7 @@ PA_MODULE_USAGE(
         "format=<sample format> "
         "channels=<number of channels> "
         "rate=<sample rate> "
+        "latency_msec=<fixed latency in ms> "
         "channel_map=<channel map>");
 #else
 PA_MODULE_DESCRIPTION("Tunnel module for sources");
@@ -92,6 +93,7 @@ PA_MODULE_USAGE(
         "format=<sample format> "
         "channels=<number of channels> "
         "rate=<sample rate> "
+        "latency_msec=<fixed latency in ms> "
         "channel_map=<channel map>");
 #endif
 
@@ -106,6 +108,7 @@ static const char* const valid_modargs[] = {
     "format",
     "channels",
     "rate",
+    "latency_msec",
 #ifdef TUNNEL_SINK
     "sink_name",
     "sink_properties",
@@ -121,7 +124,7 @@ static const char* const valid_modargs[] = {
 
 #define DEFAULT_TIMEOUT 5
 
-#define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
+#define LATENCY_INTERVAL (1*PA_USEC_PER_SEC)
 
 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
 
@@ -131,21 +134,22 @@ enum {
     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
     SINK_MESSAGE_REMOTE_SUSPEND,
     SINK_MESSAGE_UPDATE_LATENCY,
+    SINK_MESSAGE_GET_LATENCY_SNAPSHOT,
     SINK_MESSAGE_POST
 };
 
-#define DEFAULT_TLENGTH_MSEC 150
-#define DEFAULT_MINREQ_MSEC 25
+#define DEFAULT_LATENCY_MSEC 100
 
 #else
 
 enum {
     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
     SOURCE_MESSAGE_REMOTE_SUSPEND,
-    SOURCE_MESSAGE_UPDATE_LATENCY
+    SOURCE_MESSAGE_UPDATE_LATENCY,
+    SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT
 };
 
-#define DEFAULT_FRAGSIZE_MSEC 25
+#define DEFAULT_LATENCY_MSEC 25
 
 #endif
 
@@ -211,8 +215,11 @@ struct userdata {
     uint32_t ctag;
     uint32_t device_index;
     uint32_t channel;
+    uint32_t latency;
 
-    int64_t counter, counter_delta;
+    int64_t counter;
+    uint64_t receive_counter;
+    uint64_t receive_snapshot;
 
     bool remote_corked:1;
     bool remote_suspended:1;
@@ -517,6 +524,13 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             return 0;
         }
 
+        case SINK_MESSAGE_GET_LATENCY_SNAPSHOT: {
+            int64_t *send_counter = data;
+
+            *send_counter = u->counter;
+            return 0;
+        }
+
         case SINK_MESSAGE_REQUEST:
 
             pa_assert(offset > 0);
@@ -559,7 +573,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
 
-            u->counter_delta += (int64_t) chunk->length;
+            u->receive_counter += chunk->length;
 
             return 0;
     }
@@ -628,6 +642,13 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             return 0;
         }
 
+        case SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT: {
+            int64_t *send_counter = data;
+
+            *send_counter = u->counter;
+            return 0;
+        }
+
         case SOURCE_MESSAGE_POST: {
             pa_memchunk c;
 
@@ -779,6 +800,9 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint
     struct timeval local, remote, now;
     pa_sample_spec *ss;
     int64_t delay;
+#ifdef TUNNEL_SINK
+    uint64_t send_counter;
+#endif
 
     pa_assert(pd);
     pa_assert(u);
@@ -826,7 +850,7 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint
     pa_gettimeofday(&now);
 
     /* Calculate transport usec */
-    if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
+    if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now) < 0) {
         /* local and remote seem to have synchronized clocks */
 #ifdef TUNNEL_SINK
         u->transport_usec = pa_timeval_diff(&remote, &local);
@@ -859,11 +883,12 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint
     delay += (int64_t) u->transport_usec;
 #endif
 
-    /* Now correct by what we have have read/written since we requested the update */
+    /* Now correct by what we have written since we requested the update. This
+     * is not necessary for the source, because if data is received between request
+     * and reply, it was already posted before we requested the source latency. */
 #ifdef TUNNEL_SINK
-    delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
-#else
-    delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
+    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_LATENCY_SNAPSHOT, &send_counter, 0, NULL);
+    delay += (int64_t) pa_bytes_to_usec(send_counter - u->receive_snapshot, ss);
 #endif
 
 #ifdef TUNNEL_SINK
@@ -901,7 +926,7 @@ static void request_latency(struct userdata *u) {
     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
 
     u->ignore_latency_before = tag;
-    u->counter_delta = 0;
+    u->receive_snapshot = u->receive_counter;
 }
 
 /* Called from main context */
@@ -1658,11 +1683,11 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
         u->maxlength = 4*1024*1024;
 
 #ifdef TUNNEL_SINK
-    u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
-    u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
+    u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->sink->sample_spec);
+    u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency / 4, &u->sink->sample_spec);
     u->prebuf = u->tlength;
 #else
-    u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
+    u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->source->sample_spec);
 #endif
 
 #ifdef TUNNEL_SINK
@@ -1823,7 +1848,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
 
     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
 
-    u->counter_delta += (int64_t) chunk->length;
+    u->receive_counter += chunk->length;
 }
 #endif
 
@@ -1932,6 +1957,7 @@ int pa__init(pa_module*m) {
     pa_sample_spec ss;
     pa_channel_map map;
     char *dn = NULL;
+    uint32_t latency_msec;
 #ifdef TUNNEL_SINK
     pa_sink_new_data data;
 #else
@@ -1979,7 +2005,9 @@ int pa__init(pa_module*m) {
     u->ignore_latency_before = 0;
     u->transport_usec = u->thread_transport_usec = 0;
     u->remote_suspended = u->remote_corked = false;
-    u->counter = u->counter_delta = 0;
+    u->counter = 0;
+    u->receive_snapshot = 0;
+    u->receive_counter = 0;
 
     u->rtpoll = pa_rtpoll_new();
 
@@ -1993,6 +2021,15 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
+    /* Allow latencies between 5ms and 500ms */
+    latency_msec = DEFAULT_LATENCY_MSEC;
+    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 5 || latency_msec > 500) {
+        pa_log("Invalid latency specification");
+        goto fail;
+    }
+
+    u->latency = latency_msec;
+
     cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
     server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));
 
@@ -2174,6 +2211,7 @@ int pa__init(pa_module*m) {
 
     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
     pa_sink_set_rtpoll(u->sink, u->rtpoll);
+    pa_sink_set_fixed_latency(u->sink, latency_msec * PA_USEC_PER_MSEC);
 
 #else
 
@@ -2214,6 +2252,7 @@ int pa__init(pa_module*m) {
 
     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
     pa_source_set_rtpoll(u->source, u->rtpoll);
+    pa_source_set_fixed_latency(u->source, latency_msec * PA_USEC_PER_MSEC);
 
     u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
 #endif



View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/compare/b1057f1a37c10ba23855b2a264491cc4a3b45eca...735eb05e64cb4fe73b4a1f7cff9afa81a41449bb

-- 
View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/compare/b1057f1a37c10ba23855b2a264491cc4a3b45eca...735eb05e64cb4fe73b4a1f7cff9afa81a41449bb
You're receiving this email because of your account on gitlab.freedesktop.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/pulseaudio-commits/attachments/20210825/2f76dfec/attachment-0001.htm>


More information about the pulseaudio-commits mailing list