[pulseaudio-discuss] [PATCH] Third draft implementation of better drain reporting

David Henningsson david.henningsson at canonical.com
Mon Mar 18 05:32:24 PDT 2013


Ok, so using sink_input->render_memblockq.max_rewind looked like a good idea, but in
fact it does not work with early requests, which alsa-plugins uses.
I haven't found any drawback to skipping this check, and it actually makes the patch simpler, too.

I've now tested it with both aplay and paplay (and both in parallel!), and it seems to work.
Would anybody else like to give it a test run?

Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
 src/modules/alsa/alsa-sink.c    |   51 ++++++++++++++++++++++++----------
 src/pulsecore/protocol-native.c |   58 ++++++++++++++++++++++++++++-----------
 src/pulsecore/sink-input.c      |   26 ++++++++++++++++--
 src/pulsecore/sink-input.h      |    8 ++++++
 src/pulsecore/sink.c            |   26 ++++++++++++++++++
 src/pulsecore/sink.h            |    2 ++
 6 files changed, 139 insertions(+), 32 deletions(-)

diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c
index 69d006e..ee0a487 100644
--- a/src/modules/alsa/alsa-sink.c
+++ b/src/modules/alsa/alsa-sink.c
@@ -508,7 +508,7 @@ static size_t check_left_to_play(struct userdata *u, size_t n_bytes, pa_bool_t o
 static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) {
     pa_bool_t work_done = FALSE;
     pa_usec_t max_sleep_usec = 0, process_usec = 0;
-    size_t left_to_play;
+    size_t left_to_play, input_underrun;
     unsigned j = 0;
 
     pa_assert(u);
@@ -600,6 +600,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             const snd_pcm_channel_area_t *areas;
             snd_pcm_uframes_t offset, frames;
             snd_pcm_sframes_t sframes;
+            size_t written;
 
             frames = (snd_pcm_uframes_t) (n_bytes / u->frame_size);
 /*             pa_log_debug("%lu frames to write", (unsigned long) frames); */
@@ -635,7 +636,8 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 
             p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
 
-            chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
+            written = frames * u->frame_size;
+            chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, written, TRUE);
             chunk.length = pa_memblock_get_length(chunk.memblock);
             chunk.index = 0;
 
@@ -655,21 +657,25 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 
             work_done = TRUE;
 
-            u->write_count += frames * u->frame_size;
-            u->since_start += frames * u->frame_size;
+            u->write_count += written;
+            u->since_start += written;
 
 #ifdef DEBUG_TIMING
-            pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) (frames * u->frame_size), (unsigned long) n_bytes);
+            pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) written, (unsigned long) n_bytes);
 #endif
 
-            if ((size_t) frames * u->frame_size >= n_bytes)
+            if (written >= n_bytes)
                 break;
 
-            n_bytes -= (size_t) frames * u->frame_size;
+            n_bytes -= written;
         }
     }
 
+    input_underrun = pa_sink_process_input_underruns(u->sink, left_to_play);
+
     if (u->use_tsched) {
+        pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec);
+
         *sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec);
         process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
 
@@ -677,6 +683,8 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             *sleep_usec -= process_usec;
         else
             *sleep_usec = 0;
+
+        *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep);
     } else
         *sleep_usec = 0;
 
@@ -686,7 +694,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) {
     pa_bool_t work_done = FALSE;
     pa_usec_t max_sleep_usec = 0, process_usec = 0;
-    size_t left_to_play;
+    size_t left_to_play, input_underrun;
     unsigned j = 0;
 
     pa_assert(u);
@@ -710,6 +718,12 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
         }
 
         n_bytes = (size_t) n * u->frame_size;
+
+
+#ifdef DEBUG_TIMING
+        pa_log_debug("avail: %lu", (unsigned long) n_bytes);
+#endif
+
         left_to_play = check_left_to_play(u, n_bytes, on_timeout);
         on_timeout = FALSE;
 
@@ -754,6 +768,7 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
         for (;;) {
             snd_pcm_sframes_t frames;
             void *p;
+            size_t written;
 
 /*         pa_log_debug("%lu frames to write", (unsigned long) frames); */
 
@@ -788,8 +803,9 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             pa_assert(frames > 0);
             after_avail = FALSE;
 
-            u->memchunk.index += (size_t) frames * u->frame_size;
-            u->memchunk.length -= (size_t) frames * u->frame_size;
+            written = frames * u->frame_size;
+            u->memchunk.index += written;
+            u->memchunk.length -= written;
 
             if (u->memchunk.length <= 0) {
                 pa_memblock_unref(u->memchunk.memblock);
@@ -798,19 +814,23 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
 
             work_done = TRUE;
 
-            u->write_count += frames * u->frame_size;
-            u->since_start += frames * u->frame_size;
+            u->write_count += written;
+            u->since_start += written;
 
 /*         pa_log_debug("wrote %lu frames", (unsigned long) frames); */
 
-            if ((size_t) frames * u->frame_size >= n_bytes)
+            if (written >= n_bytes)
                 break;
 
-            n_bytes -= (size_t) frames * u->frame_size;
+            n_bytes -= written;
         }
     }
 
+    input_underrun = pa_sink_process_input_underruns(u->sink, left_to_play);
+
     if (u->use_tsched) {
+        pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec);
+
         *sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec);
         process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
 
@@ -818,6 +838,8 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
             *sleep_usec -= process_usec;
         else
             *sleep_usec = 0;
+
+        *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep);
     } else
         *sleep_usec = 0;
 
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index a4ee920..ae467f4 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -231,6 +231,7 @@ enum {
     CONNECTION_MESSAGE_REVOKE
 };
 
+static bool sink_input_process_underrun_cb(pa_sink_input *i);
 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
 static void sink_input_kill_cb(pa_sink_input *i);
 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
@@ -1171,6 +1172,7 @@ static playback_stream* playback_stream_new(
 
     s->sink_input->parent.process_msg = sink_input_process_msg;
     s->sink_input->pop = sink_input_pop_cb;
+    s->sink_input->process_underrun = sink_input_process_underrun_cb;
     s->sink_input->process_rewind = sink_input_process_rewind_cb;
     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
     s->sink_input->update_max_request = sink_input_update_max_request_cb;
@@ -1573,6 +1575,44 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
 }
 
+
+/* Called from thread context. Posts an UNDERFLOW or DRAIN_ACK message to the thread's out queue.
+ * force is true when called from the process_underrun callback and false when called from pop().
+ * Returns false, without doing anything, while the memblockq still has readable data. */
+static bool handle_input_underrun(playback_stream *s, bool force) {
+    if (pa_memblockq_is_readable(s->memblockq))
+        return false;
+
+    if (!s->is_underrun)
+        pa_log_debug("%s %s of '%s'", force ? "Actual" : "Implicit",
+            s->drain_request ? "drain" : "underrun", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
+
+    /* When not forced, a pending drain is acknowledged only once the input is safe to remove */
+    if (s->drain_request && (force || pa_sink_input_safe_to_remove(s->sink_input))) {
+        s->drain_request = false;
+        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
+        pa_log_debug("Drain acknowledged of '%s'", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
+    } else if (!s->is_underrun)
+        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
+
+    /* Remember the underrun so that the UNDERFLOW message is posted only once per underrun */
+    s->is_underrun = true;
+    playback_stream_request_bytes(s);
+    return true;
+}
+
+/* Called from thread context; installed as the sink input's process_underrun callback */
+static bool sink_input_process_underrun_cb(pa_sink_input *i) {
+    playback_stream *stream;
+
+    pa_sink_input_assert_ref(i);
+    stream = PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(stream);
+    /* Passing true forces the handling: a pending drain is acknowledged without further checks */
+    return handle_input_underrun(stream, true);
+}
+
+
 /* Called from thread context */
 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
     playback_stream *s;
@@ -1586,22 +1626,8 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
 #endif
 
-    if (pa_memblockq_is_readable(s->memblockq))
-        s->is_underrun = FALSE;
-    else {
-        if (!s->is_underrun)
-            pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
-
-        if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
-            s->drain_request = FALSE;
-            pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
-        } else if (!s->is_underrun)
-            pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
-
-        s->is_underrun = TRUE;
-
-        playback_stream_request_bytes(s);
-    }
+    if (!handle_input_underrun(s, false))
+        s->is_underrun = false;
 
     /* This call will not fail with prebuf=0, hence we check for
        underrun explicitly above */
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 519f47e..cefa645 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -532,6 +532,7 @@ int pa_sink_input_new(
     i->thread_info.rewrite_flush = FALSE;
     i->thread_info.dont_rewind_render = FALSE;
     i->thread_info.underrun_for = (uint64_t) -1;
+    i->thread_info.underrun_for_sink = 0;
     i->thread_info.playing_for = 0;
     i->thread_info.direct_outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
 
@@ -827,7 +828,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i, pa_usec_t *sink_latency) {
 }
 
 /* Called from thread context */
-void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa_memchunk *chunk, pa_cvolume *volume) {
+void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa_memchunk *chunk, pa_cvolume *volume) {
     pa_bool_t do_volume_adj_here, need_volume_factor_sink;
     pa_bool_t volume_is_norm;
     size_t block_size_max_sink, block_size_max_sink_input;
@@ -896,8 +897,10 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
 
             pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE);
             i->thread_info.playing_for = 0;
-            if (i->thread_info.underrun_for != (uint64_t) -1)
+            if (i->thread_info.underrun_for != (uint64_t) -1) {
                 i->thread_info.underrun_for += ilength_full;
+                i->thread_info.underrun_for_sink += slength;
+            }
             break;
         }
 
@@ -907,6 +910,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
         pa_assert(tchunk.memblock);
 
         i->thread_info.underrun_for = 0;
+        i->thread_info.underrun_for_sink = 0;
         i->thread_info.playing_for += tchunk.length;
 
         while (tchunk.length > 0) {
@@ -1020,6 +1024,23 @@ void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec *
 }
 
 /* Called from thread context */
+bool pa_sink_input_process_underrun(pa_sink_input *i) {
+    pa_sink_input_assert_ref(i);
+    pa_sink_input_assert_io_context(i);
+
+    if (pa_memblockq_is_readable(i->thread_info.render_memblockq))
+        return false;
+
+    if (i->process_underrun && i->process_underrun(i)) {
+        /* All valid data has been played back, so we can empty this queue. */
+        pa_memblockq_silence(i->thread_info.render_memblockq);
+        return true;
+    }
+    return false;
+}
+
+
+/* Called from thread context */
 void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
     size_t lbq;
     pa_bool_t called = FALSE;
@@ -1903,6 +1924,7 @@ void pa_sink_input_set_state_within_thread(pa_sink_input *i, pa_sink_input_state
         pa_log_debug("Requesting rewind due to uncorking");
 
         i->thread_info.underrun_for = (uint64_t) -1;
+        i->thread_info.underrun_for_sink = 0;
         i->thread_info.playing_for = 0;
 
         /* Set the uncorked state *before* requesting rewind */
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index 8bd5912..47bdaed 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -135,6 +135,11 @@ struct pa_sink_input {
      * the full block. */
     int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); /* may NOT be NULL */
 
+    /* This is called when the playback buffer has actually played back
+       all available data. Return true unless there is more data to play back.
+       Called from IO context. May be NULL. */
+    bool (*process_underrun) (pa_sink_input *i);
+
     /* Rewind the queue by the specified number of bytes. Called just
      * before peek() if it is called at all. Only called if the sink
      * input driver ever plans to call
@@ -232,6 +237,7 @@ struct pa_sink_input {
         pa_bool_t rewrite_flush:1, dont_rewind_render:1;
         size_t rewrite_nbytes;
         uint64_t underrun_for, playing_for;
+        uint64_t underrun_for_sink; /* Like underrun_for, but in sink sample spec */
 
         pa_sample_spec sample_spec;
 
@@ -407,6 +413,8 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t
 pa_usec_t pa_sink_input_set_requested_latency_within_thread(pa_sink_input *i, pa_usec_t usec);
 
 pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i);
+bool pa_sink_input_process_underrun(pa_sink_input *i);
+
 
 pa_memchunk* pa_sink_input_get_silence(pa_sink_input *i, pa_memchunk *ret);
 
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 6ebe956..718d392 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -915,6 +915,32 @@ void pa_sink_move_all_fail(pa_queue *q) {
     pa_queue_free(q, NULL);
 }
 
+/* Called from IO thread context. Runs underrun processing for every input whose underrun has lasted for at least
+ * left_to_play bytes. Returns left_to_play minus the longest underrun that is still shorter than left_to_play. */
+size_t pa_sink_process_input_underruns(pa_sink *s, size_t left_to_play) {
+    pa_sink_input *i;
+    void *state = NULL;
+    size_t result = 0;
+
+    pa_sink_assert_ref(s);
+    pa_sink_assert_io_context(s);
+
+    PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state) {
+        /* Keep the full 64 bit width: a size_t would truncate the counter on 32 bit platforms */
+        uint64_t uf = i->thread_info.underrun_for_sink;
+        if (uf == 0)
+            continue;
+        if (uf >= left_to_play)
+            pa_sink_input_process_underrun(i); /* return value unused: either way this input does not affect result */
+        else if (uf > result)
+            result = (size_t) uf; /* uf < left_to_play here, so it fits into a size_t */
+    }
+
+    if (result > 0)
+        pa_log_debug("Found underrun %ld bytes ago (%ld bytes ahead in playback buffer)", (long) result, (long) (left_to_play - result));
+    return left_to_play - result;
+}
+
 /* Called from IO thread context */
 void pa_sink_process_rewind(pa_sink *s, size_t nbytes) {
     pa_sink_input *i;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index fcda5ef..3c0fc39 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -492,6 +492,8 @@ void pa_sink_update_volume_and_mute(pa_sink *s);
 
 pa_bool_t pa_sink_volume_change_apply(pa_sink *s, pa_usec_t *usec_to_next);
 
+size_t pa_sink_process_input_underruns(pa_sink *s, size_t left_to_play);
+
 /*** To be called exclusively by sink input drivers, from IO context */
 
 void pa_sink_request_rewind(pa_sink*s, size_t nbytes);
-- 
1.7.9.5



More information about the pulseaudio-discuss mailing list