[pulseaudio-discuss] [PATCH] combine-sinks: add suport for DYNAMIC_LATENCY

Wim Taymans wim.taymans at gmail.com
Tue Apr 15 06:21:27 PDT 2014


Mark the sink as DYNAMIC_LATENCY and implement update_sink_latency_range
on its sink-input to collect the combined latency range of all sinks.

Implement update_requested_latency on the sink to configure the final
latency by combining the sink-input requested latencies. This makes us
honour the client latency request.

We don't need to call update_max_request() and update_fixed_latency()
when adding and removing outputs, this is already done when we attach
the sink-inputs.

Also add more debug log.

Fixes https://bugs.freedesktop.org/show_bug.cgi?id=47899
---
 src/modules/module-combine-sink.c | 135 +++++++++++++++++++++++++++-----------
 1 file changed, 98 insertions(+), 37 deletions(-)

diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
index deabceb..d705542 100644
--- a/src/modules/module-combine-sink.c
+++ b/src/modules/module-combine-sink.c
@@ -104,7 +104,8 @@ struct output {
 
     /* For communication of the stream parameters to the sink thread */
     pa_atomic_t max_request;
-    pa_atomic_t requested_latency;
+    pa_atomic_t max_latency;
+    pa_atomic_t min_latency;
 
     PA_LLIST_FIELDS(struct output);
 };
@@ -150,11 +151,12 @@ enum {
     SINK_MESSAGE_NEED,
     SINK_MESSAGE_UPDATE_LATENCY,
     SINK_MESSAGE_UPDATE_MAX_REQUEST,
-    SINK_MESSAGE_UPDATE_REQUESTED_LATENCY
+    SINK_MESSAGE_UPDATE_LATENCY_RANGE
 };
 
 enum {
     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
+    SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY
 };
 
 static void output_disable(struct output *o);
@@ -469,35 +471,44 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
         return;
 
     pa_atomic_store(&o->max_request, (int) nbytes);
+    pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes);
     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
 }
 
 /* Called from thread context */
-static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
+static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
     struct output *o;
-    pa_usec_t c;
+    pa_usec_t min, max, fix;
 
     pa_assert(i);
 
     pa_sink_input_assert_ref(i);
     pa_assert_se(o = i->userdata);
 
-    c = pa_sink_get_requested_latency_within_thread(i->sink);
-
-    if (c == (pa_usec_t) -1)
-        c = i->sink->thread_info.max_latency;
+    fix = i->sink->thread_info.fixed_latency;
+    if (fix > 0) {
+      min = fix;
+      max = fix;
+    } else {
+      min = i->sink->thread_info.min_latency;
+      max = i->sink->thread_info.max_latency;
+    }
 
-    if (pa_atomic_load(&o->requested_latency) == (int) c)
+    if ((pa_atomic_load(&o->min_latency) == (int) min) &&
+        (pa_atomic_load(&o->max_latency) == (int) max))
         return;
 
-    pa_atomic_store(&o->requested_latency, (int) c);
-    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
+    pa_atomic_store(&o->min_latency, (int) min);
+    pa_atomic_store(&o->max_latency, (int) max);
+    pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max);
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
 }
 
 /* Called from I/O thread context */
 static void sink_input_attach_cb(pa_sink_input *i) {
     struct output *o;
-    pa_usec_t c;
+    pa_usec_t fix, min, max;
+    size_t nbytes;
 
     pa_sink_input_assert_ref(i);
     pa_assert_se(o = i->userdata);
@@ -517,13 +528,23 @@ static void sink_input_attach_cb(pa_sink_input *i) {
 
     pa_sink_input_request_rewind(i, 0, false, true, true);
 
-    pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i));
+    nbytes = pa_sink_input_get_max_request(i);
+    pa_atomic_store(&o->max_request, (int) nbytes);
+    pa_log_debug("attach max request %lu", (unsigned long) nbytes);
 
-    c = pa_sink_get_requested_latency_within_thread(i->sink);
-    pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c));
+    fix = i->sink->thread_info.fixed_latency;
+    if (fix > 0) {
+      min = max = fix;
+    } else {
+      min = i->sink->thread_info.min_latency;
+      max = i->sink->thread_info.max_latency;
+    }
+    pa_atomic_store(&o->min_latency, (int) min);
+    pa_atomic_store(&o->max_latency, (int) max);
+    pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max);
 
     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
-    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
 }
 
 /* Called from I/O thread context */
@@ -580,6 +601,14 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
                 pa_memblockq_flush_write(o->memblockq, true);
 
             return 0;
+
+        case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: {
+            pa_usec_t latency = (pa_usec_t) offset;
+
+            pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency);
+
+            return 0;
+        }
     }
 
     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -671,31 +700,37 @@ static void update_max_request(struct userdata *u) {
     if (max_request <= 0)
         max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
 
+    pa_log_debug("Sink update max request %lu", (unsigned long) max_request);
     pa_sink_set_max_request_within_thread(u->sink, max_request);
 }
 
 /* Called from IO context */
-static void update_fixed_latency(struct userdata *u) {
-    pa_usec_t fixed_latency = 0;
+static void update_latency_range(struct userdata *u) {
+    pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1;
     struct output *o;
 
     pa_assert(u);
     pa_sink_assert_io_context(u->sink);
 
-    /* Collects the requested_latency values of all streams and sets
-     * the largest one as fixed_latency locally */
-
+    /* Collects the latency_range values of all streams and sets
+     * the max of min and min of max locally */
     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
-        pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency);
+        pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency);
+        pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency);
 
-        if (rl > fixed_latency)
-            fixed_latency = rl;
+        if (min > min_latency)
+            min_latency = min;
+        if (max_latency == (pa_usec_t) -1 || max < max_latency)
+            max_latency = max;
     }
-
-    if (fixed_latency <= 0)
-        fixed_latency = u->block_usec;
-
-    pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency);
+    if (max_latency == (pa_usec_t) -1)
+      /* no outputs, use block size */
+      min_latency = max_latency = u->block_usec;
+    else if (max_latency < min_latency)
+      max_latency = min_latency;
+
+    pa_log_debug("Sink update latency range %lu %lu", min_latency, max_latency);
+    pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency);
 }
 
 /* Called from thread context of the io thread */
@@ -735,6 +770,28 @@ static void output_remove_within_thread(struct output *o) {
     }
 }
 
+/* Called from sink I/O thread context */
+static void sink_update_requested_latency(pa_sink *s) {
+    struct userdata *u;
+    struct output *o;
+    pa_usec_t latency;
+
+    pa_sink_assert_ref(s);
+    pa_assert_se(u = s->userdata);
+
+    latency = pa_sink_get_requested_latency_within_thread(s);
+    if (latency == (pa_usec_t) -1)
+      return;
+
+    pa_log_debug("Sink update requested latency %0.2f", (double) latency / PA_USEC_PER_MSEC);
+
+    /* Just hand this one over to all sink_inputs */
+    PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
+        pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, latency, NULL, NULL);
+    }
+}
+
+
 /* Called from thread context of the io thread */
 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
@@ -772,14 +829,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
         case SINK_MESSAGE_ADD_OUTPUT:
             output_add_within_thread(data);
-            update_max_request(u);
-            update_fixed_latency(u);
             return 0;
 
         case SINK_MESSAGE_REMOVE_OUTPUT:
             output_remove_within_thread(data);
-            update_max_request(u);
-            update_fixed_latency(u);
             return 0;
 
         case SINK_MESSAGE_NEED:
@@ -805,9 +858,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             update_max_request(u);
             break;
 
-        case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY:
-            update_fixed_latency(u);
+        case SINK_MESSAGE_UPDATE_LATENCY_RANGE:
+            update_latency_range(u);
             break;
+
 }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -879,7 +933,7 @@ static int output_create_sink_input(struct output *o) {
     o->sink_input->process_rewind = sink_input_process_rewind_cb;
     o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
     o->sink_input->update_max_request = sink_input_update_max_request_cb;
-    o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
+    o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
     o->sink_input->attach = sink_input_attach_cb;
     o->sink_input->detach = sink_input_detach_cb;
     o->sink_input->kill = sink_input_kill_cb;
@@ -997,6 +1051,11 @@ static void output_disable(struct output *o) {
      * pass any further data to this output */
     pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
 
+    /* now that this sink is removed from the list, update the combined properties
+     * of the remaining sinks */
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
+
     /* Now deallocate the stream */
     pa_sink_input_unref(o->sink_input);
     o->sink_input = NULL;
@@ -1271,7 +1330,7 @@ int pa__init(pa_module*m) {
         pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
     }
 
-    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
+    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
     pa_sink_new_data_done(&data);
 
     if (!u->sink) {
@@ -1281,6 +1340,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg;
     u->sink->set_state = sink_set_state;
+    u->sink->update_requested_latency = sink_update_requested_latency;
     u->sink->userdata = u;
 
     pa_sink_set_rtpoll(u->sink, u->rtpoll);
@@ -1288,6 +1348,7 @@ int pa__init(pa_module*m) {
 
     u->block_usec = BLOCK_USEC;
     pa_sink_set_max_request(u->sink, pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec));
+    pa_sink_set_latency_range_within_thread(u->sink, u->block_usec, u->block_usec);
 
     if (!u->automatic) {
         const char*split_state;
-- 
1.9.0



More information about the pulseaudio-discuss mailing list