[pulseaudio-discuss] [PATCH v2] Second draft implementation of better drain reporting
David Henningsson
david.henningsson at canonical.com
Thu Mar 7 07:40:33 PST 2013
Ok, I went back to the calculations and found what was wrong: the few samples played
back during the actual fill-up were not properly handled.
The unix_write part is untested.
Otherwise fixed according to Tanuk's suggestions (hopefully I didn't miss anything).
So what do you think remains before this patch can go in? Anybody who wants to do some
testing?
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
src/modules/alsa/alsa-sink.c | 72 ++++++++++++++++++++++++++++++---------
src/pulsecore/protocol-native.c | 58 ++++++++++++++++++++++---------
src/pulsecore/sink-input.c | 28 +++++++++++++--
src/pulsecore/sink-input.h | 8 +++++
src/pulsecore/sink.c | 23 +++++++++++++
src/pulsecore/sink.h | 2 ++
6 files changed, 157 insertions(+), 34 deletions(-)
diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c
index 69d006e..44ffdaa 100644
--- a/src/modules/alsa/alsa-sink.c
+++ b/src/modules/alsa/alsa-sink.c
@@ -146,6 +146,8 @@ struct userdata {
uint64_t since_start;
pa_usec_t smoother_interval;
pa_usec_t last_smoother_update;
+ int64_t since_fill;
+ int64_t last_avail;
pa_idxset *formats;
@@ -508,7 +510,7 @@ static size_t check_left_to_play(struct userdata *u, size_t n_bytes, pa_bool_t o
static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) {
pa_bool_t work_done = FALSE;
pa_usec_t max_sleep_usec = 0, process_usec = 0;
- size_t left_to_play;
+ size_t left_to_play, input_underrun;
unsigned j = 0;
pa_assert(u);
@@ -535,11 +537,14 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
}
n_bytes = (size_t) n * u->frame_size;
+ u->since_fill += n_bytes - u->last_avail;
#ifdef DEBUG_TIMING
- pa_log_debug("avail: %lu", (unsigned long) n_bytes);
+ pa_log_debug("avail: %lu, since fill: %ld, advance: %ld", (unsigned long) n_bytes,
+ (long) u->since_fill, (long) (n_bytes - u->last_avail));
#endif
+ u->last_avail = n_bytes;
left_to_play = check_left_to_play(u, n_bytes, on_timeout);
on_timeout = FALSE;
@@ -600,6 +605,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
const snd_pcm_channel_area_t *areas;
snd_pcm_uframes_t offset, frames;
snd_pcm_sframes_t sframes;
+ size_t written;
frames = (snd_pcm_uframes_t) (n_bytes / u->frame_size);
/* pa_log_debug("%lu frames to write", (unsigned long) frames); */
@@ -635,12 +641,14 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
- chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
+ written = frames * u->frame_size;
+ chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, written, TRUE);
chunk.length = pa_memblock_get_length(chunk.memblock);
chunk.index = 0;
pa_sink_render_into_full(u->sink, &chunk);
pa_memblock_unref_fixed(chunk.memblock);
+ u->since_fill = 0;
if (PA_UNLIKELY((sframes = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {
@@ -655,21 +663,26 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
work_done = TRUE;
- u->write_count += frames * u->frame_size;
- u->since_start += frames * u->frame_size;
+ u->write_count += written;
+ u->since_start += written;
+ u->last_avail -= written;
#ifdef DEBUG_TIMING
- pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) (frames * u->frame_size), (unsigned long) n_bytes);
+ pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) written, (unsigned long) n_bytes);
#endif
- if ((size_t) frames * u->frame_size >= n_bytes)
+ if (written >= n_bytes)
break;
- n_bytes -= (size_t) frames * u->frame_size;
+ n_bytes -= written;
}
}
+ input_underrun = pa_sink_process_input_underruns(u->sink, u->since_fill, left_to_play);
+
if (u->use_tsched) {
+ pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec);
+
*sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec);
process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
@@ -677,6 +690,8 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
*sleep_usec -= process_usec;
else
*sleep_usec = 0;
+
+ *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep);
} else
*sleep_usec = 0;
@@ -686,7 +701,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) {
pa_bool_t work_done = FALSE;
pa_usec_t max_sleep_usec = 0, process_usec = 0;
- size_t left_to_play;
+ size_t left_to_play, input_underrun;
unsigned j = 0;
pa_assert(u);
@@ -710,6 +725,14 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
}
n_bytes = (size_t) n * u->frame_size;
+ u->since_fill += n_bytes - u->last_avail;
+
+#ifdef DEBUG_TIMING
+ pa_log_debug("avail: %lu, since fill: %ld, advance: %ld", (unsigned long) n_bytes,
+ (long) u->since_fill, (long) (n_bytes - u->last_avail));
+#endif
+
+ u->last_avail = n_bytes;
left_to_play = check_left_to_play(u, n_bytes, on_timeout);
on_timeout = FALSE;
@@ -754,11 +777,14 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
for (;;) {
snd_pcm_sframes_t frames;
void *p;
+ size_t written;
/* pa_log_debug("%lu frames to write", (unsigned long) frames); */
- if (u->memchunk.length <= 0)
+ if (u->memchunk.length <= 0) {
pa_sink_render(u->sink, n_bytes, &u->memchunk);
+ u->since_fill = 0;
+ }
pa_assert(u->memchunk.length > 0);
@@ -788,8 +814,9 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
pa_assert(frames > 0);
after_avail = FALSE;
- u->memchunk.index += (size_t) frames * u->frame_size;
- u->memchunk.length -= (size_t) frames * u->frame_size;
+ written = frames * u->frame_size;
+ u->memchunk.index += written;
+ u->memchunk.length -= written;
if (u->memchunk.length <= 0) {
pa_memblock_unref(u->memchunk.memblock);
@@ -798,19 +825,24 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
work_done = TRUE;
- u->write_count += frames * u->frame_size;
- u->since_start += frames * u->frame_size;
+ u->write_count += written;
+ u->since_start += written;
+ u->last_avail -= written;
/* pa_log_debug("wrote %lu frames", (unsigned long) frames); */
- if ((size_t) frames * u->frame_size >= n_bytes)
+ if (written >= n_bytes)
break;
- n_bytes -= (size_t) frames * u->frame_size;
+ n_bytes -= written;
}
}
+ input_underrun = pa_sink_process_input_underruns(u->sink, u->since_fill, left_to_play);
+
if (u->use_tsched) {
+ pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec);
+
*sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec);
process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
@@ -818,6 +850,8 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle
*sleep_usec -= process_usec;
else
*sleep_usec = 0;
+
+ *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep);
} else
*sleep_usec = 0;
@@ -1099,6 +1133,8 @@ static int unsuspend(struct userdata *u) {
pa_smoother_reset(u->smoother, pa_rtclock_now(), TRUE);
u->smoother_interval = SMOOTHER_MIN_INTERVAL;
u->last_smoother_update = 0;
+ u->since_fill = 0;
+ u->last_avail = u->hwbuf_size;
u->first = TRUE;
u->since_start = 0;
@@ -1663,6 +1699,8 @@ static int process_rewind(struct userdata *u) {
pa_log_info("Tried rewind, but was apparently not possible.");
else {
u->write_count -= rewind_nbytes;
+ u->since_fill -= rewind_nbytes;
+ u->last_avail += rewind_nbytes;
pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
pa_sink_process_rewind(u->sink, rewind_nbytes);
@@ -2311,6 +2349,8 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca
u->frame_size = frame_size;
u->fragment_size = frag_size = (size_t) (period_frames * frame_size);
u->hwbuf_size = buffer_size = (size_t) (buffer_frames * frame_size);
+ u->last_avail = u->hwbuf_size;
+
pa_cvolume_mute(&u->hardware_volume, u->sink->sample_spec.channels);
pa_log_info("Using %0.1f fragments of size %lu bytes (%0.2fms), buffer size is %lu bytes (%0.2fms)",
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index a4ee920..ae467f4 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -231,6 +231,7 @@ enum {
CONNECTION_MESSAGE_REVOKE
};
+static bool sink_input_process_underrun_cb(pa_sink_input *i);
static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
static void sink_input_kill_cb(pa_sink_input *i);
static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
@@ -1171,6 +1172,7 @@ static playback_stream* playback_stream_new(
s->sink_input->parent.process_msg = sink_input_process_msg;
s->sink_input->pop = sink_input_pop_cb;
+ s->sink_input->process_underrun = sink_input_process_underrun_cb;
s->sink_input->process_rewind = sink_input_process_rewind_cb;
s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
s->sink_input->update_max_request = sink_input_update_max_request_cb;
@@ -1573,6 +1575,44 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
}
+/* Shared underrun/drain handling for a playback stream.
+ *
+ * If the stream's memblockq is still readable, nothing is done and false is
+ * returned. Otherwise a pending drain request is acknowledged when either
+ * force is set or pa_sink_input_safe_to_remove() says so; failing that, an
+ * underflow message is posted, but only on the first call of an underrun
+ * period. In both cases the stream is then flagged as underrun, more bytes
+ * are requested from the client, and true is returned.
+ *
+ * force is true when called from the process_underrun callback and false
+ * when called from the pop callback. */
+static bool handle_input_underrun(playback_stream *s, bool force)
+{
+ bool send_drain;
+ /* Data is still queued, so this is not an underrun. */
+ if (pa_memblockq_is_readable(s->memblockq))
+ return false;
+ /* Log only on the transition into the underrun state. */
+ if (!s->is_underrun)
+ pa_log_debug("%s %s of '%s'", force ? "Actual" : "Implicit",
+ s->drain_request ? "drain" : "underrun", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
+ /* A drain is acknowledged when forced, or when the sink input may be removed. */
+ send_drain = s->drain_request && (force || pa_sink_input_safe_to_remove(s->sink_input));
+ /* Post either the drain ack or, once per underrun period, the underflow notice. */
+ if (send_drain) {
+ s->drain_request = false;
+ pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
+ pa_log_debug("Drain acknowledged of '%s'", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
+ } else if (!s->is_underrun) {
+ pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
+ }
+ s->is_underrun = true;
+ playback_stream_request_bytes(s);
+ return true;
+}
+
+/* Called from thread context.
+ *
+ * process_underrun callback of the sink input: resolves the playback stream
+ * stored in i->userdata and runs handle_input_underrun() with force set to
+ * true, returning its result unchanged. */
+static bool sink_input_process_underrun_cb(pa_sink_input *i) {
+ playback_stream *s;
+
+ pa_sink_input_assert_ref(i);
+ s = PLAYBACK_STREAM(i->userdata);
+ playback_stream_assert_ref(s);
+
+ return handle_input_underrun(s, true);
+}
+
+
/* Called from thread context */
static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
playback_stream *s;
@@ -1586,22 +1626,8 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
#endif
- if (pa_memblockq_is_readable(s->memblockq))
- s->is_underrun = FALSE;
- else {
- if (!s->is_underrun)
- pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
-
- if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
- s->drain_request = FALSE;
- pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
- } else if (!s->is_underrun)
- pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
-
- s->is_underrun = TRUE;
-
- playback_stream_request_bytes(s);
- }
+ if (!handle_input_underrun(s, false))
+ s->is_underrun = false;
/* This call will not fail with prebuf=0, hence we check for
underrun explicitly above */
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 519f47e..621e2a7 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -532,6 +532,7 @@ int pa_sink_input_new(
i->thread_info.rewrite_flush = FALSE;
i->thread_info.dont_rewind_render = FALSE;
i->thread_info.underrun_for = (uint64_t) -1;
+ i->thread_info.underrun_for_sink = 0;
i->thread_info.playing_for = 0;
i->thread_info.direct_outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
@@ -827,7 +828,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i, pa_usec_t *sink_latency) {
}
/* Called from thread context */
-void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa_memchunk *chunk, pa_cvolume *volume) {
+void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa_memchunk *chunk, pa_cvolume *volume) {
pa_bool_t do_volume_adj_here, need_volume_factor_sink;
pa_bool_t volume_is_norm;
size_t block_size_max_sink, block_size_max_sink_input;
@@ -896,8 +897,10 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE);
i->thread_info.playing_for = 0;
- if (i->thread_info.underrun_for != (uint64_t) -1)
+ if (i->thread_info.underrun_for != (uint64_t) -1) {
i->thread_info.underrun_for += ilength_full;
+ i->thread_info.underrun_for_sink += slength;
+ }
break;
}
@@ -907,6 +910,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p
pa_assert(tchunk.memblock);
i->thread_info.underrun_for = 0;
+ i->thread_info.underrun_for_sink = 0;
i->thread_info.playing_for += tchunk.length;
while (tchunk.length > 0) {
@@ -1020,6 +1024,25 @@ void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec *
}
/* Called from thread context */
+bool pa_sink_input_process_underrun(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */) {
+ pa_sink_input_assert_ref(i);
+ pa_sink_input_assert_io_context(i);
+ /* Returns true only if the process_underrun callback is set and returns
+ * true; the render queue is then silenced. Returns false early when
+ * nbytes is below the render queue's maxrewind or the render queue is
+ * still readable. */
+ if (nbytes < pa_memblockq_get_maxrewind(i->thread_info.render_memblockq))
+ return false;
+ if (pa_memblockq_is_readable(i->thread_info.render_memblockq))
+ return false;
+
+ if (i->process_underrun && i->process_underrun(i)) {
+ /* All valid data has been played back, so we can empty this queue. */
+ pa_memblockq_silence(i->thread_info.render_memblockq);
+ return true;
+ }
+ return false; /* no callback installed, or the callback returned false */
+}
+
+
+/* Called from thread context */
void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
size_t lbq;
pa_bool_t called = FALSE;
@@ -1903,6 +1926,7 @@ void pa_sink_input_set_state_within_thread(pa_sink_input *i, pa_sink_input_state
pa_log_debug("Requesting rewind due to uncorking");
i->thread_info.underrun_for = (uint64_t) -1;
+ i->thread_info.underrun_for_sink = 0;
i->thread_info.playing_for = 0;
/* Set the uncorked state *before* requesting rewind */
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index 8bd5912..5790589 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -135,6 +135,11 @@ struct pa_sink_input {
* the full block. */
int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); /* may NOT be NULL */
+ /* This is called when the playback buffer has actually played back
+ all available data. Return true unless there is more data to play back.
+ Called from IO context. */
+ bool (*process_underrun) (pa_sink_input *i);
+
/* Rewind the queue by the specified number of bytes. Called just
* before peek() if it is called at all. Only called if the sink
* input driver ever plans to call
@@ -232,6 +237,7 @@ struct pa_sink_input {
pa_bool_t rewrite_flush:1, dont_rewind_render:1;
size_t rewrite_nbytes;
uint64_t underrun_for, playing_for;
+ uint64_t underrun_for_sink; /* Like underrun_for, but in sink sample spec */
pa_sample_spec sample_spec;
@@ -407,6 +413,8 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t
pa_usec_t pa_sink_input_set_requested_latency_within_thread(pa_sink_input *i, pa_usec_t usec);
pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i);
+bool pa_sink_input_process_underrun(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */);
+
pa_memchunk* pa_sink_input_get_silence(pa_sink_input *i, pa_memchunk *ret);
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 6ebe956..1d9e296 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -915,6 +915,29 @@ void pa_sink_move_all_fail(pa_queue *q) {
pa_queue_free(q, NULL);
}
+/* Called from IO thread context.
+ *
+ * Walks every input of sink s whose underrun_for_sink counter is non-zero and
+ * calls pa_sink_input_process_underrun() on it with that counter plus
+ * since_fill. For inputs where that call returns false, the largest counter
+ * that is still smaller than left_to_play is remembered.
+ *
+ * Returns left_to_play minus the remembered counter, i.e. left_to_play
+ * itself when no such input was found. Both since_fill and left_to_play are
+ * byte counts in the sink's sample spec. */
+size_t pa_sink_process_input_underruns(pa_sink *s, size_t since_fill, size_t left_to_play) {
+    pa_sink_input *i;
+    void *state = NULL;
+    size_t result = 0;
+
+    pa_sink_assert_ref(s);
+    pa_sink_assert_io_context(s);
+
+    PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state) {
+        size_t uf = i->thread_info.underrun_for_sink;
+
+        /* Input is not in underrun. */
+        if (uf == 0)
+            continue;
+        /* Underrun was handled by the input; nothing left to schedule for it. */
+        if (pa_sink_input_process_underrun(i, uf + since_fill))
+            continue;
+        /* uf < left_to_play guarantees the subtraction below cannot wrap. */
+        if (uf < left_to_play && uf > result)
+            result = uf;
+    }
+
+    /* Cast the whole difference: casting only left_to_play would leave an
+     * unsigned value for the %ld conversion. */
+    if (result > 0)
+        pa_log_debug("Found underrun %ld bytes ago (%ld bytes ahead in playback buffer)", (long) result, (long) (left_to_play - result));
+    return left_to_play - result;
+}
+
/* Called from IO thread context */
void pa_sink_process_rewind(pa_sink *s, size_t nbytes) {
pa_sink_input *i;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index fcda5ef..0803216 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -492,6 +492,8 @@ void pa_sink_update_volume_and_mute(pa_sink *s);
pa_bool_t pa_sink_volume_change_apply(pa_sink *s, pa_usec_t *usec_to_next);
+size_t pa_sink_process_input_underruns(pa_sink *s, size_t since_fill, size_t left_to_play);
+
/*** To be called exclusively by sink input drivers, from IO context */
void pa_sink_request_rewind(pa_sink*s, size_t nbytes);
--
1.7.9.5
More information about the pulseaudio-discuss
mailing list