[pulseaudio-discuss] [PATCH] "pipe-sink" added option "use_system_clock_for_timing"

Tanu Kaskinen tanuk at iki.fi
Tue Dec 12 00:04:34 UTC 2017


On Sat, 2017-12-09 at 20:09 +0100, Samo Poga─Źnik wrote:
> ---
>  src/modules/module-pipe-sink.c | 236 ++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 221 insertions(+), 15 deletions(-)
> 
> diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c
> index 8396a63..e2bf949 100644
> --- a/src/modules/module-pipe-sink.c
> +++ b/src/modules/module-pipe-sink.c
> @@ -33,7 +33,9 @@
>  #include <sys/filio.h>
>  #endif
>  
> -#include <pulse/xmalloc.h>
> +#include <pulse/timeval.h>
> +#include <pulse/util.h>
> +#include <pulse/rtclock.h>
>  
>  #include <pulsecore/core-error.h>
>  #include <pulsecore/sink.h>
> @@ -59,7 +61,9 @@ PA_MODULE_USAGE(
>          "format=<sample format> "
>          "rate=<sample rate> "
>          "channels=<number of channels> "
> -        "channel_map=<channel map>");
> +        "channel_map=<channel map> "
> +        "use_system_clock_for_timing=<yes or no> "
> +);
>  
>  #define DEFAULT_FILE_NAME "fifo_output"
>  #define DEFAULT_SINK_NAME "fifo_output"
> @@ -76,12 +80,19 @@ struct userdata {
>      char *filename;
>      int fd;
>      size_t buffer_size;
> +    size_t bytes_dropped;
> +    bool log_dropped;
> +    bool pipe_sink_open;
>  
>      pa_memchunk memchunk;
>  
>      pa_rtpoll_item *rtpoll_item;
>  
>      int write_type;
> +    pa_usec_t block_usec;
> +    pa_usec_t timestamp;
> +
> +    bool use_system_clock_for_timing;
>  };
>  
>  static const char* const valid_modargs[] = {
> @@ -92,6 +103,7 @@ static const char* const valid_modargs[] = {
>      "rate",
>      "channels",
>      "channel_map",
> +    "use_system_clock_for_timing",
>      NULL
>  };
>  
> @@ -99,27 +111,148 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>      struct userdata *u = PA_SINK(o)->userdata;
>  
>      switch (code) {
> +        case PA_SINK_MESSAGE_SET_STATE:
> +            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
> +                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
> +                    u->timestamp = pa_rtclock_now();
> +            }
> +            break;
>  
> -        case PA_SINK_MESSAGE_GET_LATENCY: {
> -            size_t n = 0;
> +        case PA_SINK_MESSAGE_GET_LATENCY:
> +            if (u->use_system_clock_for_timing) {
> +                pa_usec_t now;
> +                now = pa_rtclock_now();
> +                *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
> +            } else {
> +                size_t n = 0;
>  
>  #ifdef FIONREAD
> -            int l;
> +                int l;
>  
> -            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
> -                n = (size_t) l;
> +                if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
> +                    n = (size_t) l;
>  #endif
>  
> -            n += u->memchunk.length;
> +                n += u->memchunk.length;
>  
> -            *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
> +                *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
> +            }
>              return 0;
> -        }
>      }
>  
>      return pa_sink_process_msg(o, code, data, offset, chunk);
>  }
>  
> +static void sink_update_requested_latency_cb(pa_sink *s) {
> +    struct userdata *u;
> +    size_t nbytes;
> +
> +    pa_sink_assert_ref(s);
> +    pa_assert_se(u = s->userdata);
> +
> +    u->block_usec = pa_sink_get_requested_latency_within_thread(s);
> +
> +    if (u->block_usec == (pa_usec_t) -1)
> +        u->block_usec = s->thread_info.max_latency;
> +
> +    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
> +    pa_sink_set_max_rewind_within_thread(s, nbytes);

The sink doesn't support rewinding, so you shouldn't set max_rewind.

> +    pa_sink_set_max_request_within_thread(s, nbytes);
> +}
> +
> +static size_t pipe_sink_write(struct userdata *u, void *p) {

p comes from u->memchunk, so it seems weird to me to pass it as a
parameter when it's available via the userdata. You probably did this
way because you call pa_memblock_acquire() before calling this
function, but I'd prefer doing the pa_memblock_acquire() and
pa_memblock_release() calls in this function.

I would also like to use a stack-allocated pa_memchunk instead of
u->memchunk, because using u->memchunk gives the impression that
u->memchunk carries some state that needs to be preserved, while in
reality u->memchunk is reset every time process_render_use_timing() is
called and it's not used outside process_render_use_timing() or
pipe_sink_write().

> +    ssize_t l;
> +    size_t index, length;
> +
> +    pa_assert(u);
> +
> +    index = u->memchunk.index;
> +    length = u->memchunk.length;
> +
> +    for (;;) {
> +        l = pa_write(u->fd, (uint8_t*) p + index, length, &u->write_type);
> +
> +        pa_assert(l != 0);
> +
> +        if (l < 0) {
> +            if (errno == EAGAIN)
> +                break;
> +            else if (errno != EINTR) {
> +                pa_log("Failed to write data to FIFO (%d): %s", errno, pa_cstrerror(errno));
> +                break;

The caller is not notified about the error, so if there's some problem
with the FIFO, we'll keep try writing to it and the log will be spammed
with error messages.

The module should at least stop writing until it's suspended and
started again, or maybe it should even unload itself.

> +            }
> +        } else {
> +            index += (size_t) l;
> +            length -= (size_t) l;
> +
> +            if (length <= 0) {
> +                break;
> +            }
> +        }
> +    }
> +
> +    return index;

You seem to assume that u->memchunk.index is always zero. The memchunk
returned by pa_sink_render() may contain any index.

> +}
> +
> +static void process_render_use_timing(struct userdata *u, pa_usec_t now) {
> +    size_t dropped = 0;
> +    size_t consumed = 0;
> +
> +    pa_assert(u);
> +
> +    if (!u->pipe_sink_open) {
> +        /* Clear bytes_dropped statistics. */
> +        pa_log_debug("pipe-sink cleared dropped bytes statistics (sum: %zu -> 0 dropped bytes)", u->bytes_dropped);
> +        u->bytes_dropped = 0;
> +        u->log_dropped = true;
> +        u->pipe_sink_open = true;

I think this would fit better in the PA_SINK_MESSAGE_SET_STATE handler.
u->pipe_sink_open won't be needed then.

> +    }
> +
> +    /* This is the configured latency. Sink inputs connected to us
> +    might not have a single frame more than the maxrequest value
> +    queued. Hence: at maximum read this many bytes from the sink
> +    inputs. */
> +
> +    /* Fill the buffer up the latency size */
> +    while (u->timestamp < now + u->block_usec) {
> +        void *p;
> +        size_t written = 0;
> +
> +        pa_sink_render(u->sink, u->sink->thread_info.max_request, &u->memchunk);
> +
> +        pa_assert(u->memchunk.length > 0);
> +
> +        p = pa_memblock_acquire(u->memchunk.memblock);
> +
> +        written = pipe_sink_write(u, p);
> +
> +        pa_memblock_release(u->memchunk.memblock);
> +
> +        pa_memblock_unref(u->memchunk.memblock);
> +
> +        u->timestamp += pa_bytes_to_usec(u->memchunk.length, &u->sink->sample_spec);
> +
> +        dropped += (u->memchunk.length - written);
> +        consumed += u->memchunk.length;
> +
> +        if (consumed >= u->sink->thread_info.max_request)
> +            break;
> +    }
> +
> +    u->bytes_dropped += dropped;
> +
> +    if (dropped == 0) {
> +        if (!u->log_dropped)
> +            pa_log_debug("pipe-sink stopped dropping (sum: %zu dropped bytes)", u->bytes_dropped);
> +        u->log_dropped = true;
> +    }
> +
> +    if ((dropped != 0) && u->log_dropped) {
> +        pa_log_debug("pipe-sink just dropped %zu bytes (sum: %zu dropped bytes)", dropped, u->bytes_dropped);
> +        u->log_dropped = false;
> +    }

I think this logging code belongs inside the loop where
pipe_sink_write() is called. Consider the (unlikely) case where two
subsequent pipe_sink_write() calls write the audio partially: in that
case there are two separate drops, but your current code treats them as
one continuous drop.

You seem to aggregate all drops in u->bytes_dropped for the duration
when the sink is open, but I think it would be more useful to count
only the bytes of one continuos drop. Now it's not known how long the
individual holes in the stream are.

> +}
> +
>  static int process_render(struct userdata *u) {
>      pa_assert(u);
>  
> @@ -164,6 +297,56 @@ static int process_render(struct userdata *u) {
>      }
>  }
>  
> +static void thread_func_use_timing(void *userdata) {
> +    struct userdata *u = userdata;
> +
> +    pa_assert(u);
> +
> +    pa_log_debug("Thread (use timing) starting up");
> +
> +    pa_thread_mq_install(&u->thread_mq);
> +
> +    u->timestamp = pa_rtclock_now();
> +
> +    for (;;) {
> +        pa_usec_t now = 0;
> +        int ret;
> +
> +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
> +            now = pa_rtclock_now();
> +
> +        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> +            pa_sink_process_rewind(u->sink, 0);
> +
> +        /* Render some data and write it to the fifo */
> +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
> +            if (u->timestamp <= now)
> +                process_render_use_timing(u, now);
> +
> +            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
> +        } else {
> +            pa_rtpoll_set_timer_disabled(u->rtpoll);
> +            u->pipe_sink_open = false;
> +        }
> +
> +        /* Hmm, nothing to do. Let's sleep */
> +        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
> +            goto fail;
> +
> +        if (ret == 0)
> +            goto finish;
> +    }
> +
> +fail:
> +    /* If this was no regular exit from the loop we have to continue
> +     * processing messages until we received PA_MESSAGE_SHUTDOWN */
> +    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
> +    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
> +
> +finish:
> +    pa_log_debug("Thread (use timing) shutting down");
> +}
> +
>  static void thread_func(void *userdata) {
>      struct userdata *u = userdata;
>  
> @@ -227,6 +410,7 @@ int pa__init(pa_module *m) {
>      pa_modargs *ma;
>      struct pollfd *pollfd;
>      pa_sink_new_data data;
> +    pa_thread_func_t thread_routine;
>  
>      pa_assert(m);
>  
> @@ -249,6 +433,11 @@ int pa__init(pa_module *m) {
>      pa_memchunk_reset(&u->memchunk);
>      u->rtpoll = pa_rtpoll_new();
>  
> +    if (pa_modargs_get_value_boolean(ma, "use_system_clock_for_timing", &u->use_system_clock_for_timing) < 0) {
> +        pa_log("Failed to parse use_system_clock_for_timing argument.");
> +        goto fail;
> +    }
> +
>      if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
>          pa_log("pa_thread_mq_init() failed.");
>          goto fail;
> @@ -294,7 +483,10 @@ int pa__init(pa_module *m) {
>          goto fail;
>      }
>  
> -    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
> +    if (u->use_system_clock_for_timing)
> +        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
> +    else
> +        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
>      pa_sink_new_data_done(&data);
>  
>      if (!u->sink) {
> @@ -303,21 +495,34 @@ int pa__init(pa_module *m) {
>      }
>  
>      u->sink->parent.process_msg = sink_process_msg;
> +    if (u->use_system_clock_for_timing)
> +        u->sink->update_requested_latency = sink_update_requested_latency_cb;
>      u->sink->userdata = u;
>  
>      pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
>      pa_sink_set_rtpoll(u->sink, u->rtpoll);
>  
> +    u->bytes_dropped = 0;
> +    u->log_dropped = true;
> +    u->pipe_sink_open = false;
>      u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink->sample_spec);
> +    if (u->use_system_clock_for_timing) {
> +        u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);
> +        pa_sink_set_latency_range(u->sink, 0, u->block_usec);
> +        pa_sink_set_max_rewind(u->sink, u->buffer_size);

max_rewind shouldn't be set.

> +        thread_routine = thread_func_use_timing;
> +    } else {
> +        pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));
> +        thread_routine = thread_func;
> +    }
>      pa_sink_set_max_request(u->sink, u->buffer_size);
> -    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));
>  
>      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
>      pollfd->fd = u->fd;
>      pollfd->events = pollfd->revents = 0;
>  
> -    if (!(u->thread = pa_thread_new("pipe-sink", thread_func, u))) {
> +    if (!(u->thread = pa_thread_new("pipe-sink", thread_routine, u))) {
>          pa_log("Failed to create thread.");
>          goto fail;
>      }
> @@ -367,8 +572,9 @@ void pa__done(pa_module *m) {
>      if (u->sink)
>          pa_sink_unref(u->sink);
>  
> -    if (u->memchunk.memblock)
> -        pa_memblock_unref(u->memchunk.memblock);
> +    if (!u->use_system_clock_for_timing)
> +        if (u->memchunk.memblock)
> +            pa_memblock_unref(u->memchunk.memblock);
>  
>      if (u->rtpoll_item)
>          pa_rtpoll_item_free(u->rtpoll_item);
-- 
Tanu

https://www.patreon.com/tanuk


More information about the pulseaudio-discuss mailing list