[pulseaudio-discuss] [PATCH v2] Second draft implementation of better drain reporting

David Henningsson david.henningsson at canonical.com
Thu Mar 7 07:40:33 PST 2013


Ok, I went back to the calculations and found what was wrong; the few samples played
back during the time of the actual fillup was not properly handled.

The unix_write part is untested.

Otherwise fixed according to Tanuk's suggestions (hopefully I didn't miss anything).

So what do you think remains before this patch can go in? Anybody who wants to do some
testing?

Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
 src/modules/alsa/alsa-sink.c    |   72 ++++++++++++++++++++++++++++++---------
 src/pulsecore/protocol-native.c |   58 ++++++++++++++++++++++---------
 src/pulsecore/sink-input.c      |   28 +++++++++++++--
 src/pulsecore/sink-input.h      |    8 +++++
 src/pulsecore/sink.c            |   23 +++++++++++++
 src/pulsecore/sink.h            |    2 ++
 6 files changed, 157 insertions(+), 34 deletions(-)

diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c
index 69d006e..44ffdaa 100644
--- a/src/modules/alsa/alsa-sink.c
+++ b/src/modules/alsa/alsa-sink.c
@@ -146,6 +146,8 @@ struct userdata {
     uint64_t since_start;
     pa_usec_t smoother_interval;
     pa_usec_t last_smoother_update;
+    int64_t since_fill;
+    int64_t last_avail;
 
     pa_idxset *formats;
 
@@ -508,7 +510,7 @@ static size_t check_left_to_play(struct userdata *u, size_t n_bytes, pa_bool_t o
 static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) {
     pa_bool_t work_done = FALSE;
     pa_usec_t max_sleep_usec = 0, process_usec = 0;
-    size_t left_to_play;
+    size_t left_to_play, input_underrun;
     unsigned j = 0;
 
     pa_assert(u);
@@ -535,11 +537,14 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
         }
 
         n_bytes = (size_t) n * u->frame_size;
+        u->since_fill += n_bytes - u->last_avail;
 
 #ifdef DEBUG_TIMING
-        pa_log_debug("avail: %lu", (unsigned long) n_bytes);
+        pa_log_debug("avail: %lu, since fill: %ld, advance: %ld", (unsigned long) n_bytes,
+                     (long) u->since_fill, (long) (n_bytes - u->last_avail));
 #endif
 
+        u->last_avail = n_bytes;
         left_to_play = check_left_to_play(u, n_bytes, on_timeout);
         on_timeout = FALSE;
 
@@ -600,6 +605,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             const snd_pcm_channel_area_t *areas;
             snd_pcm_uframes_t offset, frames;
             snd_pcm_sframes_t sframes;
+            size_t written;
 
             frames = (snd_pcm_uframes_t) (n_bytes / u->frame_size);
 /*             pa_log_debug("%lu frames to write", (unsigned long) frames); */
@@ -635,12 +641,14 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 
             p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
 
-            chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
+            written = frames * u->frame_size;
+            chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, written, TRUE);
             chunk.length = pa_memblock_get_length(chunk.memblock);
             chunk.index = 0;
 
             pa_sink_render_into_full(u->sink, &chunk);
             pa_memblock_unref_fixed(chunk.memblock);
+            u->since_fill = 0;
 
             if (PA_UNLIKELY((sframes = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {
 
@@ -655,21 +663,26 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 
             work_done = TRUE;
 
-            u->write_count += frames * u->frame_size;
-            u->since_start += frames * u->frame_size;
+            u->write_count += written;
+            u->since_start += written;
+            u->last_avail -= written;
 
 #ifdef DEBUG_TIMING
-            pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) (frames * u->frame_size), (unsigned long) n_bytes);
+            pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) written, (unsigned long) n_bytes);
 #endif
 
-            if ((size_t) frames * u->frame_size >= n_bytes)
+            if (written >= n_bytes)
                 break;
 
-            n_bytes -= (size_t) frames * u->frame_size;
+            n_bytes -= written;
         }
     }
 
+    input_underrun = pa_sink_process_input_underruns(u->sink, u->since_fill, left_to_play);
+
     if (u->use_tsched) {
+        pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec);
+
         *sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec);
         process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
 
@@ -677,6 +690,8 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             *sleep_usec -= process_usec;
         else
             *sleep_usec = 0;
+
+        *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep);
     } else
         *sleep_usec = 0;
 
@@ -686,7 +701,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) {
     pa_bool_t work_done = FALSE;
     pa_usec_t max_sleep_usec = 0, process_usec = 0;
-    size_t left_to_play;
+    size_t left_to_play, input_underrun;
     unsigned j = 0;
 
     pa_assert(u);
@@ -710,6 +725,14 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
         }
 
         n_bytes = (size_t) n * u->frame_size;
+        u->since_fill += n_bytes - u->last_avail;
+
+#ifdef DEBUG_TIMING
+        pa_log_debug("avail: %lu, since fill: %ld, advance: %ld", (unsigned long) n_bytes,
+                     (long) u->since_fill, (long) (n_bytes - u->last_avail));
+#endif
+
+        u->last_avail = n_bytes;
         left_to_play = check_left_to_play(u, n_bytes, on_timeout);
         on_timeout = FALSE;
 
@@ -754,11 +777,14 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
         for (;;) {
             snd_pcm_sframes_t frames;
             void *p;
+            size_t written;
 
 /*         pa_log_debug("%lu frames to write", (unsigned long) frames); */
 
-            if (u->memchunk.length <= 0)
+            if (u->memchunk.length <= 0) {
                 pa_sink_render(u->sink, n_bytes, &u->memchunk);
+                u->since_fill = 0;
+            }
 
             pa_assert(u->memchunk.length > 0);
 
@@ -788,8 +814,9 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             pa_assert(frames > 0);
             after_avail = FALSE;
 
-            u->memchunk.index += (size_t) frames * u->frame_size;
-            u->memchunk.length -= (size_t) frames * u->frame_size;
+            written = frames * u->frame_size;
+            u->memchunk.index += written;
+            u->memchunk.length -= written;
 
             if (u->memchunk.length <= 0) {
                 pa_memblock_unref(u->memchunk.memblock);
@@ -798,19 +825,24 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 
             work_done = TRUE;
 
-            u->write_count += frames * u->frame_size;
-            u->since_start += frames * u->frame_size;
+            u->write_count += written;
+            u->since_start += written;
+            u->last_avail -= written;
 
 /*         pa_log_debug("wrote %lu frames", (unsigned long) frames); */
 
-            if ((size_t) frames * u->frame_size >= n_bytes)
+            if (written >= n_bytes)
                 break;
 
-            n_bytes -= (size_t) frames * u->frame_size;
+            n_bytes -= written;
         }
     }
 
+    input_underrun = pa_sink_process_input_underruns(u->sink, u->since_fill, left_to_play);
+
     if (u->use_tsched) {
+        pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec);
+
         *sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec);
         process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
 
@@ -818,6 +850,8 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             *sleep_usec -= process_usec;
         else
             *sleep_usec = 0;
+
+        *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep);
     } else
         *sleep_usec = 0;
 
@@ -1099,6 +1133,8 @@ static int unsuspend(struct userdata *u) {
     pa_smoother_reset(u->smoother, pa_rtclock_now(), TRUE);
     u->smoother_interval = SMOOTHER_MIN_INTERVAL;
     u->last_smoother_update = 0;
+    u->since_fill = 0;
+    u->last_avail = u->hwbuf_size;
 
     u->first = TRUE;
     u->since_start = 0;
@@ -1663,6 +1699,8 @@ static int process_rewind(struct userdata *u) {
             pa_log_info("Tried rewind, but was apparently not possible.");
         else {
             u->write_count -= rewind_nbytes;
+            u->since_fill -= rewind_nbytes;
+            u->last_avail += rewind_nbytes;
             pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
             pa_sink_process_rewind(u->sink, rewind_nbytes);
 
@@ -2311,6 +2349,8 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca
     u->frame_size = frame_size;
     u->fragment_size = frag_size = (size_t) (period_frames * frame_size);
     u->hwbuf_size = buffer_size = (size_t) (buffer_frames * frame_size);
+    u->last_avail = u->hwbuf_size;
+
     pa_cvolume_mute(&u->hardware_volume, u->sink->sample_spec.channels);
 
     pa_log_info("Using %0.1f fragments of size %lu bytes (%0.2fms), buffer size is %lu bytes (%0.2fms)",
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index a4ee920..ae467f4 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -231,6 +231,7 @@ enum {
     CONNECTION_MESSAGE_REVOKE
 };
 
+static bool sink_input_process_underrun_cb(pa_sink_input *i);
 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
 static void sink_input_kill_cb(pa_sink_input *i);
 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
@@ -1171,6 +1172,7 @@ static playback_stream* playback_stream_new(
 
     s->sink_input->parent.process_msg = sink_input_process_msg;
     s->sink_input->pop = sink_input_pop_cb;
+    s->sink_input->process_underrun = sink_input_process_underrun_cb;
     s->sink_input->process_rewind = sink_input_process_rewind_cb;
     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
     s->sink_input->update_max_request = sink_input_update_max_request_cb;
@@ -1573,6 +1575,44 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
 }
 
+
+static bool handle_input_underrun(playback_stream *s, bool force)
+{
+    bool send_drain;
+
+    if (pa_memblockq_is_readable(s->memblockq))
+        return false;
+
+    if (!s->is_underrun)
+        pa_log_debug("%s %s of '%s'", force ? "Actual" : "Implicit",
+            s->drain_request ? "drain" : "underrun", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
+
+    send_drain = s->drain_request && (force || pa_sink_input_safe_to_remove(s->sink_input));
+
+    if (send_drain) {
+         s->drain_request = false;
+         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
+         pa_log_debug("Drain acknowledged of '%s'", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
+    } else if (!s->is_underrun) {
+         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
+    }
+    s->is_underrun = true;
+    playback_stream_request_bytes(s);
+    return true;
+}
+
+/* Called from thread context */
+static bool sink_input_process_underrun_cb(pa_sink_input *i) {
+    playback_stream *s;
+
+    pa_sink_input_assert_ref(i);
+    s = PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+
+    return handle_input_underrun(s, true);
+}
+
+
 /* Called from thread context */
 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
     playback_stream *s;
@@ -1586,22 +1626,8 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
 #endif
 
-    if (pa_memblockq_is_readable(s->memblockq))
-        s->is_underrun = FALSE;
-    else {
-        if (!s->is_underrun)
-            pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
-
-        if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
-            s->drain_request = FALSE;
-            pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
-        } else if (!s->is_underrun)
-            pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
-
-        s->is_underrun = TRUE;
-
-        playback_stream_request_bytes(s);
-    }
+    if (!handle_input_underrun(s, false))
+        s->is_underrun = false;
 
     /* This call will not fail with prebuf=0, hence we check for
        underrun explicitly above */
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 519f47e..621e2a7 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -532,6 +532,7 @@ int pa_sink_input_new(
     i->thread_info.rewrite_flush = FALSE;
     i->thread_info.dont_rewind_render = FALSE;
     i->thread_info.underrun_for = (uint64_t) -1;
+    i->thread_info.underrun_for_sink = 0;
     i->thread_info.playing_for = 0;
     i->thread_info.direct_outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
 
@@ -827,7 +828,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i, pa_usec_t *sink_latency) {
 }
 
 /* Called from thread context */
-void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa_memchunk *chunk, pa_cvolume *volume) {
+void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa_memchunk *chunk, pa_cvolume *volume) {
     pa_bool_t do_volume_adj_here, need_volume_factor_sink;
     pa_bool_t volume_is_norm;
     size_t block_size_max_sink, block_size_max_sink_input;
@@ -896,8 +897,10 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
 
             pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE);
             i->thread_info.playing_for = 0;
-            if (i->thread_info.underrun_for != (uint64_t) -1)
+            if (i->thread_info.underrun_for != (uint64_t) -1) {
                 i->thread_info.underrun_for += ilength_full;
+                i->thread_info.underrun_for_sink += slength;
+            }
             break;
         }
 
@@ -907,6 +910,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
         pa_assert(tchunk.memblock);
 
         i->thread_info.underrun_for = 0;
+        i->thread_info.underrun_for_sink = 0;
         i->thread_info.playing_for += tchunk.length;
 
         while (tchunk.length > 0) {
@@ -1020,6 +1024,25 @@ void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec *
 }
 
 /* Called from thread context */
+bool pa_sink_input_process_underrun(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */) {
+    pa_sink_input_assert_ref(i);
+    pa_sink_input_assert_io_context(i);
+
+    if (nbytes < pa_memblockq_get_maxrewind(i->thread_info.render_memblockq))
+        return false;
+    if (pa_memblockq_is_readable(i->thread_info.render_memblockq))
+        return false;
+
+    if (i->process_underrun && i->process_underrun(i)) {
+        /* All valid data has been played back, so we can empty this queue. */
+        pa_memblockq_silence(i->thread_info.render_memblockq);
+        return true;
+    }
+    return false;
+}
+
+
+/* Called from thread context */
 void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
     size_t lbq;
     pa_bool_t called = FALSE;
@@ -1903,6 +1926,7 @@ void pa_sink_input_set_state_within_thread(pa_sink_input *i, pa_sink_input_state
         pa_log_debug("Requesting rewind due to uncorking");
 
         i->thread_info.underrun_for = (uint64_t) -1;
+        i->thread_info.underrun_for_sink = 0;
         i->thread_info.playing_for = 0;
 
         /* Set the uncorked state *before* requesting rewind */
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index 8bd5912..5790589 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -135,6 +135,11 @@ struct pa_sink_input {
      * the full block. */
     int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); /* may NOT be NULL */
 
+    /* This is called when the playback buffer has actually played back
+       all available data. Return true unless there is more data to play back.
+       Called from IO context. */
+    bool (*process_underrun) (pa_sink_input *i);
+
     /* Rewind the queue by the specified number of bytes. Called just
      * before peek() if it is called at all. Only called if the sink
      * input driver ever plans to call
@@ -232,6 +237,7 @@ struct pa_sink_input {
         pa_bool_t rewrite_flush:1, dont_rewind_render:1;
         size_t rewrite_nbytes;
         uint64_t underrun_for, playing_for;
+        uint64_t underrun_for_sink; /* Like underrun_for, but in sink sample spec */
 
         pa_sample_spec sample_spec;
 
@@ -407,6 +413,8 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t
 pa_usec_t pa_sink_input_set_requested_latency_within_thread(pa_sink_input *i, pa_usec_t usec);
 
 pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i);
+bool pa_sink_input_process_underrun(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */);
+
 
 pa_memchunk* pa_sink_input_get_silence(pa_sink_input *i, pa_memchunk *ret);
 
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 6ebe956..1d9e296 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -915,6 +915,29 @@ void pa_sink_move_all_fail(pa_queue *q) {
     pa_queue_free(q, NULL);
 }
 
+ /* Called from IO thread context */
+size_t pa_sink_process_input_underruns(pa_sink *s, size_t since_fill, size_t left_to_play) {
+    pa_sink_input *i;
+    void *state = NULL;
+    size_t result = 0;
+
+    pa_sink_assert_ref(s);
+    pa_sink_assert_io_context(s);
+    PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state) {
+        size_t uf = i->thread_info.underrun_for_sink;
+        if (uf == 0)
+            continue;
+        if (pa_sink_input_process_underrun(i, uf + since_fill))
+            continue;
+        if (uf < left_to_play && uf > result)
+            result = uf;
+    }
+
+    if (result > 0)
+        pa_log_debug("Found underrun %ld bytes ago (%ld bytes ahead in playback buffer)", (long) result, (long) left_to_play - result);
+    return left_to_play - result;
+}
+
 /* Called from IO thread context */
 void pa_sink_process_rewind(pa_sink *s, size_t nbytes) {
     pa_sink_input *i;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index fcda5ef..0803216 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -492,6 +492,8 @@ void pa_sink_update_volume_and_mute(pa_sink *s);
 
 pa_bool_t pa_sink_volume_change_apply(pa_sink *s, pa_usec_t *usec_to_next);
 
+size_t pa_sink_process_input_underruns(pa_sink *s, size_t since_fill, size_t left_to_play);
+
 /*** To be called exclusively by sink input drivers, from IO context */
 
 void pa_sink_request_rewind(pa_sink*s, size_t nbytes);
-- 
1.7.9.5



More information about the pulseaudio-discuss mailing list