[pulseaudio-discuss] [RFC PATCH] sink-input: Add history memblockq

Georg Chini georg at chini.tk
Sun Jun 16 10:26:46 UTC 2019


A new memblockq is added to the sink input code to keep some history of the
input data. The queue is kept in sync with the render memblockq. The old input
data will be used to prepare the resampler after a reset.

Also, the helper functions pa_convert_to_sink_length() and
pa_convert_to_sink_input_length() have been introduced to allow easy conversion
between sink and sink input sample spec. pa_convert_to_sink_input_length() does
not use pa_resampler_result() because this function always rounds up and returns
wrong values for the ffmpeg resampler. The helper functions have been designed
to round down where necessary.
---
 src/pulsecore/sink-input.c  | 88 +++++++++++++++++++++++++++++--------
 src/pulsecore/sink-input.h  |  4 ++
 src/pulsecore/stream-util.c | 27 ++++++++++++
 src/pulsecore/stream-util.h |  4 ++
 4 files changed, 105 insertions(+), 18 deletions(-)

diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index f2c289bed..36857d2f9 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -45,6 +45,7 @@
 
 #define MEMBLOCKQ_MAXLENGTH (32*1024*1024)
 #define CONVERT_BUFFER_LENGTH (pa_page_size())
+#define RESAMPLER_MAX_HISTORY 64
 
 PA_DEFINE_PUBLIC_CLASS(pa_sink_input, pa_msgobject);
 
@@ -293,6 +294,7 @@ int pa_sink_input_new(
     int r;
     char *pt;
     char *memblockq_name;
+    pa_memchunk silence;
 
     pa_assert(_i);
     pa_assert(core);
@@ -573,6 +575,21 @@ int pa_sink_input_new(
             &i->sink->silence);
     pa_xfree(memblockq_name);
 
+    memblockq_name = pa_sprintf_malloc("sink input history memblockq [%u]", i->index);
+    pa_sink_input_get_silence(i, &silence);
+    i->thread_info.history_memblockq = pa_memblockq_new(
+            memblockq_name,
+            0,
+            MEMBLOCKQ_MAXLENGTH,
+            0,
+            &i->sample_spec,
+            0,
+            1,
+            0,
+            &silence);
+    pa_xfree(memblockq_name);
+    pa_memblock_unref(silence.memblock);
+
     pt = pa_proplist_to_string_sep(i->proplist, "\n    ");
     pa_log_info("Created input %u \"%s\" on %s with sample spec %s and channel map %s\n    %s",
                 i->index,
@@ -756,6 +773,9 @@ static void sink_input_free(pa_object *o) {
     if (i->thread_info.render_memblockq)
         pa_memblockq_free(i->thread_info.render_memblockq);
 
+    if (i->thread_info.history_memblockq)
+        pa_memblockq_free(i->thread_info.history_memblockq);
+
     if (i->thread_info.resampler)
         pa_resampler_free(i->thread_info.resampler);
 
@@ -922,6 +942,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa
              * data, so let's just hand out silence */
 
             pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, true);
+            pa_memblockq_seek(i->thread_info.history_memblockq, (int64_t) ilength_full, PA_SEEK_RELATIVE, true);
             i->thread_info.playing_for = 0;
             if (i->thread_info.underrun_for != (uint64_t) -1) {
                 i->thread_info.underrun_for += ilength_full;
@@ -969,6 +990,9 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa
                     pa_volume_memchunk(&wchunk, &i->thread_info.sample_spec, &i->thread_info.soft_volume);
             }
 
+            /* Push chunk into history queue to retain some resampler input history. */
+            pa_memblockq_push(i->thread_info.history_memblockq, &wchunk);
+
             if (!i->thread_info.resampler) {
 
                 if (nvfs) {
@@ -1033,6 +1057,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa
 
 /* Called from thread context */
 void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
+    int64_t rbq, hbq;
 
     pa_sink_input_assert_ref(i);
     pa_sink_input_assert_io_context(i);
@@ -1045,6 +1070,21 @@ void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec *
 #endif
 
     pa_memblockq_drop(i->thread_info.render_memblockq, nbytes);
+
+    /* Keep memblockq's in sync. Using pa_convert_to_sink_input_length()
+     * on nbytes will not work here because of rounding. */
+    rbq = pa_memblockq_get_write_index(i->thread_info.render_memblockq);
+    rbq -= pa_memblockq_get_read_index(i->thread_info.render_memblockq);
+    hbq = pa_memblockq_get_length(i->thread_info.history_memblockq);
+    if (rbq >= 0)
+        rbq = pa_convert_to_sink_input_length(i, rbq);
+    else
+        rbq = - (int64_t) pa_convert_to_sink_input_length(i, - rbq);
+
+    if (hbq > rbq)
+        pa_memblockq_drop(i->thread_info.history_memblockq, hbq - rbq);
+    else if (rbq > hbq)
+        pa_memblockq_rewind(i->thread_info.history_memblockq, rbq - hbq);
 }
 
 /* Called from thread context */
@@ -1058,6 +1098,7 @@ bool pa_sink_input_process_underrun(pa_sink_input *i) {
     if (i->process_underrun && i->process_underrun(i)) {
         /* All valid data has been played back, so we can empty this queue. */
         pa_memblockq_silence(i->thread_info.render_memblockq);
+        pa_memblockq_silence(i->thread_info.history_memblockq);
         return true;
     }
     return false;
@@ -1067,6 +1108,7 @@ bool pa_sink_input_process_underrun(pa_sink_input *i) {
 void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
     size_t lbq;
     bool called = false;
+    size_t sink_input_nbytes;
 
     pa_sink_input_assert_ref(i);
     pa_sink_input_assert_io_context(i);
@@ -1078,10 +1120,12 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
 #endif
 
     lbq = pa_memblockq_get_length(i->thread_info.render_memblockq);
+    sink_input_nbytes = pa_convert_to_sink_input_length(i, nbytes);
 
     if (nbytes > 0 && !i->thread_info.dont_rewind_render) {
         pa_log_debug("Have to rewind %lu bytes on render memblockq.", (unsigned long) nbytes);
         pa_memblockq_rewind(i->thread_info.render_memblockq, nbytes);
+        pa_memblockq_rewind(i->thread_info.history_memblockq, sink_input_nbytes);
     }
 
     if (i->thread_info.rewrite_nbytes == (size_t) -1) {
@@ -1090,6 +1134,7 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
          * data from implementor the next time peek() is called */
 
         pa_memblockq_flush_write(i->thread_info.render_memblockq, true);
+        pa_memblockq_flush_write(i->thread_info.history_memblockq, true);
 
     } else if (i->thread_info.rewrite_nbytes > 0) {
         size_t max_rewrite, amount;
@@ -1105,8 +1150,7 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
             max_rewrite = PA_MIN(i->sink->thread_info.max_rewind, max_rewrite);
 
         /* Transform into local domain */
-        if (i->thread_info.resampler)
-            max_rewrite = pa_resampler_request(i->thread_info.resampler, max_rewrite);
+        max_rewrite = pa_convert_to_sink_input_length(i, max_rewrite);
 
         /* Calculate how much of the rewinded data should actually be rewritten */
         amount = PA_MIN(i->thread_info.rewrite_nbytes, max_rewrite);
@@ -1119,20 +1163,25 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
                 i->process_rewind(i, amount);
             called = true;
 
+            if (amount > 0)
+                /* Update the history write pointer */
+                pa_memblockq_seek(i->thread_info.history_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE, true);
+
             /* Convert back to sink domain */
-            if (i->thread_info.resampler)
-                amount = pa_resampler_result(i->thread_info.resampler, amount);
+            amount = pa_convert_to_sink_length(i, amount);
 
             if (amount > 0)
                 /* Ok, now update the write pointer */
                 pa_memblockq_seek(i->thread_info.render_memblockq, - ((int64_t) amount), PA_SEEK_RELATIVE, true);
 
-            if (i->thread_info.rewrite_flush)
-                pa_memblockq_silence(i->thread_info.render_memblockq);
-
             /* And rewind the resampler */
             if (i->thread_info.resampler)
                 pa_resampler_rewind(i->thread_info.resampler, amount);
+
+            if (i->thread_info.rewrite_flush) {
+                pa_memblockq_silence(i->thread_info.render_memblockq);
+                pa_memblockq_silence(i->thread_info.history_memblockq);
+            }
         }
     }
 
@@ -1150,7 +1199,7 @@ size_t pa_sink_input_get_max_rewind(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_sink_input_assert_io_context(i);
 
-    return i->thread_info.resampler ? pa_resampler_request(i->thread_info.resampler, i->sink->thread_info.max_rewind) : i->sink->thread_info.max_rewind;
+    return pa_convert_to_sink_input_length(i, i->sink->thread_info.max_rewind);
 }
 
 /* Called from thread context */
@@ -1161,11 +1210,13 @@ size_t pa_sink_input_get_max_request(pa_sink_input *i) {
     /* We're not verifying the status here, to allow this to be called
      * in the state change handler between _INIT and _RUNNING */
 
-    return i->thread_info.resampler ? pa_resampler_request(i->thread_info.resampler, i->sink->thread_info.max_request) : i->sink->thread_info.max_request;
+    return pa_convert_to_sink_input_length(i, i->sink->thread_info.max_request);
 }
 
 /* Called from thread context */
 void pa_sink_input_update_max_rewind(pa_sink_input *i, size_t nbytes  /* in the sink's sample spec */) {
+    size_t max_rewind;
+
     pa_sink_input_assert_ref(i);
     pa_sink_input_assert_io_context(i);
     pa_assert(PA_SINK_INPUT_IS_LINKED(i->thread_info.state));
@@ -1173,8 +1224,12 @@ void pa_sink_input_update_max_rewind(pa_sink_input *i, size_t nbytes  /* in the
 
     pa_memblockq_set_maxrewind(i->thread_info.render_memblockq, nbytes);
 
+    max_rewind = pa_convert_to_sink_input_length(i, nbytes);
+
+    pa_memblockq_set_maxrewind(i->thread_info.history_memblockq, max_rewind + RESAMPLER_MAX_HISTORY * pa_frame_size(&i->sample_spec));
+
     if (i->update_max_rewind)
-        i->update_max_rewind(i, i->thread_info.resampler ? pa_resampler_request(i->thread_info.resampler, nbytes) : nbytes);
+        i->update_max_rewind(i, max_rewind);
 }
 
 /* Called from thread context */
@@ -1185,7 +1240,7 @@ void pa_sink_input_update_max_request(pa_sink_input *i, size_t nbytes  /* in the
     pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec));
 
     if (i->update_max_request)
-        i->update_max_request(i, i->thread_info.resampler ? pa_resampler_request(i->thread_info.resampler, nbytes) : nbytes);
+        i->update_max_request(i, pa_convert_to_sink_input_length(i, nbytes));
 }
 
 /* Called from thread context */
@@ -2183,14 +2238,11 @@ void pa_sink_input_request_rewind(
         nbytes = i->sink->thread_info.max_rewind + lbq;
 
         /* Transform from sink domain */
-        if (i->thread_info.resampler)
-            nbytes = pa_resampler_request(i->thread_info.resampler, nbytes);
+        nbytes = pa_convert_to_sink_input_length(i, nbytes);
     }
 
     /* Transform max_rewind from sink domain */
-    max_rewind = i->sink->thread_info.max_rewind;
-    if (i->thread_info.resampler)
-        max_rewind = pa_resampler_request(i->thread_info.resampler, max_rewind);
+    max_rewind = pa_convert_to_sink_input_length(i, i->sink->thread_info.max_rewind);
 
     /* If the sink is virtual or has a virtual sink attached, hard limit rewinding
      * to max_rewind */
@@ -2220,8 +2272,7 @@ void pa_sink_input_request_rewind(
     if (nbytes != (size_t) -1) {
 
         /* Transform to sink domain */
-        if (i->thread_info.resampler)
-            nbytes = pa_resampler_result(i->thread_info.resampler, nbytes);
+        nbytes = pa_convert_to_sink_length(i, nbytes);
 
         if (nbytes > lbq)
             pa_sink_request_rewind(i->sink, nbytes - lbq);
@@ -2326,6 +2377,7 @@ int pa_sink_input_update_resampler(pa_sink_input *i) {
     i->thread_info.resampler = new_resampler;
 
     pa_memblockq_free(i->thread_info.render_memblockq);
+    pa_memblockq_flush_write(i->thread_info.history_memblockq, true);
 
     memblockq_name = pa_sprintf_malloc("sink input render_memblockq [%u]", i->index);
     i->thread_info.render_memblockq = pa_memblockq_new(
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index 129039007..83e81f6ad 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -254,6 +254,10 @@ struct pa_sink_input {
         /* We maintain a history of resampled audio data here. */
         pa_memblockq *render_memblockq;
 
+        /* This queue keeps the history before resampling and is used
+         * when rewinding the resampler. */
+        pa_memblockq *history_memblockq;
+
         pa_sink_input *sync_prev, *sync_next;
 
         /* The requested latency for the sink */
diff --git a/src/pulsecore/stream-util.c b/src/pulsecore/stream-util.c
index 257450154..2a31f7369 100644
--- a/src/pulsecore/stream-util.c
+++ b/src/pulsecore/stream-util.c
@@ -82,3 +82,30 @@ int pa_stream_get_volume_channel_map(const pa_cvolume *volume, const pa_channel_
 
     return -PA_ERR_INVALID;
 }
+
+/* Translates bytes from sink input sample spec to bytes in sink sample
+ * spec. The number of frames is always rounded down. */
+size_t pa_convert_to_sink_length(pa_sink_input *i, size_t length) {
+    /* Convert to frames; a trailing partial frame is discarded */
+    unsigned long long frames = length / pa_frame_size(&i->sample_spec);
+
+    /* Convert frames to sink sample rate. The product is formed in a type of
+     * at least 64 bits because frames * rate can exceed a 32 bit size_t. */
+    frames = frames * i->sink->sample_spec.rate / i->sample_spec.rate;
+    /* Convert to bytes */
+    return (size_t) frames * pa_frame_size(&i->sink->sample_spec);
+}
+
+/* Converts a byte count given in the sink sample spec into the matching
+ * byte count in the sink input sample spec. If the sink rate is higher than
+ * the sink input rate, one sink input frame is taken off a non-zero result
+ * to avoid producing too many samples on the sink side. */
+size_t pa_convert_to_sink_input_length(pa_sink_input *i, size_t length) {
+    size_t converted = length;
+
+    if (i->thread_info.resampler)
+        converted = pa_resampler_request(i->thread_info.resampler, length);
+    if (converted > 0 && i->sink->sample_spec.rate > i->sample_spec.rate)
+        converted -= pa_frame_size(&i->sample_spec);
+    return converted;
+}
diff --git a/src/pulsecore/stream-util.h b/src/pulsecore/stream-util.h
index b2e47fe4a..8bfc59ecc 100644
--- a/src/pulsecore/stream-util.h
+++ b/src/pulsecore/stream-util.h
@@ -22,6 +22,7 @@
 
 #include <pulse/format.h>
 #include <pulse/volume.h>
+#include <pulsecore/sink-input.h>
 
 /* This is a helper function that is called from pa_sink_input_new() and
  * pa_source_output_new(). The job of this function is to figure out what
@@ -45,4 +46,7 @@
 int pa_stream_get_volume_channel_map(const pa_cvolume *volume, const pa_channel_map *original_map, const pa_format_info *format,
                                      pa_channel_map *volume_map);
 
+size_t pa_convert_to_sink_length(pa_sink_input *i, size_t length);
+size_t pa_convert_to_sink_input_length(pa_sink_input *i, size_t length);
+
 #endif
-- 
2.20.1



More information about the pulseaudio-discuss mailing list