[pulseaudio-discuss] [PATCH] "pipe-sink" module timing based upon "null-sink" module mechanics.

Tanu Kaskinen tanuk at iki.fi
Thu Nov 30 18:44:56 UTC 2017


On Tue, 2017-11-14 at 20:22 +0100, Samo Pogačnik wrote:
> Using 'cat /run/user/1000/pulse/fifo_output > some-file' upon the loaded
> "pipe-sink" module:
> ...
> load-module module-pipe-sink sink_name=pipe_sink format=s16le rate=44100 channels=2
> ...,
> caused audio players (Totem, Rhythmbox) to run right through played music
> files without proper timing. Also, resulting "raw" files weren't correct.
> 
> I tried to fix that. Initial attempts weren't that successful, until I
> merged some of the mechanics from the "null-sink" module.
> 
> So, here is the patch of my "pipe-sink" module modification.
> 
> Many thanks for considering the proposed change.
> 
> Samo

Thanks for the patch! Since this changes the pipe-sink behaviour rather
drastically, other people using this module might not like this. You
could add a new module argument: "use_system_clock_for_timing". If the
option is not set, the old behaviour would be used.

More comments below:

> ---
>  src/modules/module-pipe-sink.c | 170 ++++++++++++++++++++++++++---------------
>  1 file changed, 108 insertions(+), 62 deletions(-)
> 
> diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c
> index 8396a63..ca600e9 100644
> --- a/src/modules/module-pipe-sink.c
> +++ b/src/modules/module-pipe-sink.c
> @@ -34,6 +34,9 @@
>  #endif
>  
>  #include <pulse/xmalloc.h>
> +#include <pulse/timeval.h>
> +#include <pulse/util.h>
> +#include <pulse/rtclock.h>
>  
>  #include <pulsecore/core-error.h>
>  #include <pulsecore/sink.h>
> @@ -82,6 +85,8 @@ struct userdata {
>      pa_rtpoll_item *rtpoll_item;
>  
>      int write_type;
> +    pa_usec_t block_usec;
> +    pa_usec_t timestamp;
>  };
>  
>  static const char* const valid_modargs[] = {
> @@ -99,20 +104,21 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>      struct userdata *u = PA_SINK(o)->userdata;
>  
>      switch (code) {
> +        case PA_SINK_MESSAGE_SET_STATE:
>  
> -        case PA_SINK_MESSAGE_GET_LATENCY: {
> -            size_t n = 0;
> +            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
> +                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
> +                    u->timestamp = pa_rtclock_now();
> +            }
>  
> -#ifdef FIONREAD
> -            int l;
> +            break;
>  
> -            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
> -                n = (size_t) l;
> -#endif
> +        case PA_SINK_MESSAGE_GET_LATENCY: {
> +            pa_usec_t now;
>  
> -            n += u->memchunk.length;
> +            now = pa_rtclock_now();
> +            *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
>  
> -            *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
>              return 0;
>          }
>      }
> @@ -120,47 +126,93 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>      return pa_sink_process_msg(o, code, data, offset, chunk);
>  }
>  
> -static int process_render(struct userdata *u) {
> +static void sink_update_requested_latency_cb(pa_sink *s) {
> +    struct userdata *u;
> +    size_t nbytes;
> +
> +    pa_sink_assert_ref(s);
> +    pa_assert_se(u = s->userdata);
> +
> +    u->block_usec = pa_sink_get_requested_latency_within_thread(s);
> +
> +    if (u->block_usec == (pa_usec_t) -1)
> +        u->block_usec = s->thread_info.max_latency;
> +
> +    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
> +    pa_sink_set_max_rewind_within_thread(s, nbytes);
> +    pa_sink_set_max_request_within_thread(s, nbytes);
> +}
> +
> +static void process_rewind(struct userdata *u, pa_usec_t now) {
> +    size_t rewind_nbytes, in_buffer;
> +    pa_usec_t delay;
> +
>      pa_assert(u);
>  
> -    if (u->memchunk.length <= 0)
> -        pa_sink_render(u->sink, u->buffer_size, &u->memchunk);
> +    rewind_nbytes = u->sink->thread_info.rewind_nbytes;
>  
> -    pa_assert(u->memchunk.length > 0);
> +    if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) || rewind_nbytes <= 0)
> +        goto do_nothing;
>  
> -    for (;;) {
> +    pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes);
> +
> +    if (u->timestamp <= now)
> +        goto do_nothing;
> +
> +    delay = u->timestamp - now;
> +    in_buffer = pa_usec_to_bytes(delay, &u->sink->sample_spec);
> +
> +    if (in_buffer <= 0)
> +        goto do_nothing;
> +
> +    if (rewind_nbytes > in_buffer)
> +        rewind_nbytes = in_buffer;
> +
> +    pa_sink_process_rewind(u->sink, rewind_nbytes);
> +    u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec);
> +
> +    pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
> +    return;

Since this is copied from the null sink, this doesn't actually do any
real rewinding. Rewinding means discarding previously buffered audio.
If you don't have anything buffered that you could discard, then you
shouldn't pretend to be rewinding.

> +
> +do_nothing:
> +
> +    pa_sink_process_rewind(u->sink, 0);
> +}
> +
> +static void process_render(struct userdata *u, pa_usec_t now) {
> +    size_t ate = 0;
> +
> +    pa_assert(u);
> +
> +    /* This is the configured latency. Sink inputs connected to us
> +    might not have a single frame more than the maxrequest value
> +    queued. Hence: at maximum read this many bytes from the sink
> +    inputs. */
> +
> +    /* Fill the buffer up the latency size */
> +    while (u->timestamp < now + u->block_usec) {
>          ssize_t l;
>          void *p;
>  
> +        pa_sink_render(u->sink, u->sink->thread_info.max_request, &u->memchunk);
> +
> +        pa_assert(u->memchunk.length > 0);
> +
>          p = pa_memblock_acquire(u->memchunk.memblock);
>          l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &u->write_type);
>          pa_memblock_release(u->memchunk.memblock);
>  
>          pa_assert(l != 0);
>  
> -        if (l < 0) {
> -
> -            if (errno == EINTR)
> -                continue;
> -            else if (errno == EAGAIN)
> -                return 0;
> -            else {
> -                pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
> -                return -1;
> -            }

You shouldn't just remove error handling. Also, if pa_write() writes
only part of the rendered audio, you probably shouldn't just drop the
unwritten data, or at least the drop-outs should be logged.

> -
> -        } else {
> +        pa_memblock_unref(u->memchunk.memblock);
>  
> -            u->memchunk.index += (size_t) l;
> -            u->memchunk.length -= (size_t) l;
> +/*         pa_log_debug("Ate %lu bytes.", (unsigned long) chunk.length); */

This comment provides no value.

> +        u->timestamp += pa_bytes_to_usec(u->memchunk.length, &u->sink->sample_spec);
>  
> -            if (u->memchunk.length <= 0) {
> -                pa_memblock_unref(u->memchunk.memblock);
> -                pa_memchunk_reset(&u->memchunk);
> -            }
> -        }
> +        ate += u->memchunk.length;
>  
> -        return 0;
> +        if (ate >= u->sink->thread_info.max_request)
> +            break;
>      }
>  }
>  
> @@ -173,40 +225,33 @@ static void thread_func(void *userdata) {
>  
>      pa_thread_mq_install(&u->thread_mq);
>  
> +    u->timestamp = pa_rtclock_now();
> +
>      for (;;) {
> -        struct pollfd *pollfd;
> +        pa_usec_t now = 0;
>          int ret;
>  
> -        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
> +            now = pa_rtclock_now();
>  
>          if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> -            pa_sink_process_rewind(u->sink, 0);
> +            process_rewind(u, now);
>  
> -        /* Render some data and write it to the fifo */
> +        /* Render some data and drop it immediately */

This isn't the null sink; the data isn't dropped.

>          if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
> -            if (pollfd->revents) {
> -                if (process_render(u) < 0)
> -                    goto fail;
> +            if (u->timestamp <= now)
> +                process_render(u, now);
>  
> -                pollfd->revents = 0;
> -            }
> -        }
> +            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
> +        } else
> +            pa_rtpoll_set_timer_disabled(u->rtpoll);
>  
>          /* Hmm, nothing to do. Let's sleep */
> -        pollfd->events = (short) (u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0);
> -
>          if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
>              goto fail;
>  
>          if (ret == 0)
>              goto finish;
> -
> -        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> -
> -        if (pollfd->revents & ~POLLOUT) {
> -            pa_log("FIFO shutdown.");
> -            goto fail;
> -        }
>      }
>  
>  fail:
> @@ -248,19 +293,16 @@ int pa__init(pa_module *m) {
>      m->userdata = u;
>      pa_memchunk_reset(&u->memchunk);
>      u->rtpoll = pa_rtpoll_new();
> -
> -    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
> -        pa_log("pa_thread_mq_init() failed.");
> -        goto fail;
> -    }
> -
> +    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);

Why did you remove the error handling?

>      u->write_type = 0;
>  
>      u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
>  
>      if (mkfifo(u->filename, 0666) < 0) {
> +        int errno_save = errno;
>          pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno));
> -        goto fail;
> +        if (errno_save != EEXIST)
> +          goto fail;

This change is not related to the timing changes. If you want to allow
using an already existing file, that should be in a separate patch.

Also, please use 4 spaces for each level of indentation.

>      }
>      if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
>          pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
> @@ -294,7 +336,7 @@ int pa__init(pa_module *m) {
>          goto fail;
>      }
>  
> -    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
> +    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
>      pa_sink_new_data_done(&data);
>  
>      if (!u->sink) {
> @@ -303,14 +345,16 @@ int pa__init(pa_module *m) {
>      }
>  
>      u->sink->parent.process_msg = sink_process_msg;
> +    u->sink->update_requested_latency = sink_update_requested_latency_cb;
>      u->sink->userdata = u;
>  
>      pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
>      pa_sink_set_rtpoll(u->sink, u->rtpoll);
>  
>      u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink->sample_spec);
> +    u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);
> +    pa_sink_set_max_rewind(u->sink, u->buffer_size);
>      pa_sink_set_max_request(u->sink, u->buffer_size);
> -    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));
>  
>      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> @@ -322,6 +366,8 @@ int pa__init(pa_module *m) {
>          goto fail;
>      }
>  
> +    pa_sink_set_latency_range(u->sink, 0, u->block_usec);

Nitpicking: I think this would fit better where the
pa_sink_set_fixed_latency() call used to be.

-- 
Tanu

https://www.patreon.com/tanuk


More information about the pulseaudio-discuss mailing list