[pulseaudio-discuss] [PATCH] "pipe-sink" module timig based upon "null-sink" module mechanics.

Samo Pogačnik samo_pogacnik at t-2.net
Fri Dec 1 18:22:38 UTC 2017


Dne 30.11.2017 (čet) ob 20:44 +0200 je Tanu Kaskinen napisal(a):
> On Tue, 2017-11-14 at 20:22 +0100, Samo Pogačnik wrote:
> > 
> > Using 'cat /run/user/1000/pulse/fifo_output > some-file' upon the
> > loaded
> > "pipe-sink" module:
> > ...
> > load-module module-pipe-sink sink_name=pipe_sink format=s16le
> > rate=44100 channels=2
> > ...,
> > caused audio players (totem, rhytmbox) to run right through played
> > music
> > files without proper timing. Also, resulting "raw" files weren't
> > correct.
> > 
> > I tried to fix that. Initial attempts weren't that successfull,
> > unUsing 'cat /run/user/1000/pulse/fifo_output > some-file' upon
> > the
> > loaded
> > "pipe-sink" module:
> > ...
> > load-module module-pipe-sink sink_name=pipe_sink format=s16le
> > rate=44100 channels=2
> > ...,
> > caused audio players (totem, rhytmbox) to run right through played
> > music
> > files without proper timing. Also, resulting "raw" files weren't
> > correct.
> > 
> > I tried to fix that. Initial attempts weren't that successfull,
> > until i
> > merged some of the mechanics from the "null-sink" module.
> > 
> > So, here is the patch of my "pipe-sink" module modification.
> > 
> > Many thanks for considering the proposed change.
> > 
> > Samo
> Thanks for the patch! Since this changes the pipe-sink behaviour
> rather
> drastically, other people using this module might not like this. You
> could add a new module argument: "use_system_clock_for_timing". If
> the
> option is not set, the old behaviour would be used.
> 
> More comments below:
> 
Thank you for your comments.

Regarding your initial observation about the change, would it be
sensible to make a separate "pipe-sink" module (i.e. pipe-timed-sink)
or do you prefer a single module with additional option?

Anyway i'll try to follow your comments about the code as i can and
provide another patch in some time.

I have some additional remarks below:
> > 
> > ---
> >  src/modules/module-pipe-sink.c | 170 ++++++++++++++++++++++++++---
> > ------------
> >  1 file changed, 108 insertions(+), 62 deletions(-)
> > 
> > diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-
> > pipe-sink.c
> > index 8396a63..ca600e9 100644
> > --- a/src/modules/module-pipe-sink.c
> > +++ b/src/modules/module-pipe-sink.c
> > @@ -34,6 +34,9 @@
> >  #endif
> >  
> >  #include <pulse/xmalloc.h>
> > +#include <pulse/timeval.h>
> > +#include <pulse/util.h>
> > +#include <pulse/rtclock.h>
> >  
> >  #include <pulsecore/core-error.h>
> >  #include <pulsecore/sink.h>
> > @@ -82,6 +85,8 @@ struct userdata {
> >      pa_rtpoll_item *rtpoll_item;
> >  
> >      int write_type;
> > +    pa_usec_t block_usec;
> > +    pa_usec_t timestamp;
> >  };
> >  
> >  static const char* const valid_modargs[] = {
> > @@ -99,20 +104,21 @@ static int sink_process_msg(pa_msgobject *o,
> > int code, void *data, int64_t offse
> >      struct userdata *u = PA_SINK(o)->userdata;
> >  
> >      switch (code) {
> > +        case PA_SINK_MESSAGE_SET_STATE:
> >  
> > -        case PA_SINK_MESSAGE_GET_LATENCY: {
> > -            size_t n = 0;
> > +            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED ||
> > pa_sink_get_state(u->sink) == PA_SINK_INIT) {
> > +                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING ||
> > PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
> > +                    u->timestamp = pa_rtclock_now();
> > +            }
> >  
> > -#ifdef FIONREAD
> > -            int l;
> > +            break;
> >  
> > -            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
> > -                n = (size_t) l;
> > -#endif
> > +        case PA_SINK_MESSAGE_GET_LATENCY: {
> > +            pa_usec_t now;
> >  
> > -            n += u->memchunk.length;
> > +            now = pa_rtclock_now();
> > +            *((int64_t*) data) = (int64_t)u->timestamp -
> > (int64_t)now;
> >  
> > -            *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink-
> > >sample_spec);
> >              return 0;
> >          }
> >      }
> > @@ -120,47 +126,93 @@ static int sink_process_msg(pa_msgobject *o,
> > int code, void *data, int64_t offse
> >      return pa_sink_process_msg(o, code, data, offset, chunk);
> >  }
> >  
> > -static int process_render(struct userdata *u) {
> > +static void sink_update_requested_latency_cb(pa_sink *s) {
> > +    struct userdata *u;
> > +    size_t nbytes;
> > +
> > +    pa_sink_assert_ref(s);
> > +    pa_assert_se(u = s->userdata);
> > +
> > +    u->block_usec =
> > pa_sink_get_requested_latency_within_thread(s);
> > +
> > +    if (u->block_usec == (pa_usec_t) -1)
> > +        u->block_usec = s->thread_info.max_latency;
> > +
> > +    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
> > +    pa_sink_set_max_rewind_within_thread(s, nbytes);
> > +    pa_sink_set_max_request_within_thread(s, nbytes);
> > +}
> > +
> > +static void process_rewind(struct userdata *u, pa_usec_t now) {
> > +    size_t rewind_nbytes, in_buffer;
> > +    pa_usec_t delay;
> > +
> >      pa_assert(u);
> >  
> > -    if (u->memchunk.length <= 0)
> > -        pa_sink_render(u->sink, u->buffer_size, &u->memchunk);
> > +    rewind_nbytes = u->sink->thread_info.rewind_nbytes;
> >  
> > -    pa_assert(u->memchunk.length > 0);
> > +    if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) ||
> > rewind_nbytes <= 0)
> > +        goto do_nothing;
> >  
> > -    for (;;) {
> > +    pa_log_debug("Requested to rewind %lu bytes.", (unsigned long)
> > rewind_nbytes);
> > +
> > +    if (u->timestamp <= now)
> > +        goto do_nothing;
> > +
> > +    delay = u->timestamp - now;
> > +    in_buffer = pa_usec_to_bytes(delay, &u->sink->sample_spec);
> > +
> > +    if (in_buffer <= 0)
> > +        goto do_nothing;
> > +
> > +    if (rewind_nbytes > in_buffer)
> > +        rewind_nbytes = in_buffer;
> > +
> > +    pa_sink_process_rewind(u->sink, rewind_nbytes);
> > +    u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink-
> > >sample_spec);
> > +
> > +    pa_log_debug("Rewound %lu bytes.", (unsigned long)
> > rewind_nbytes);
> > +    return;
> Since this is copied from the null sink, this doesn't actually do any
> real rewinding. Rewinding means discarding previously buffered audio.
> If you don't have anything buffered that you could discard, then you
> shouldn't pretend to be rewinding.
> 
Ok, i'll have to better understand this "null-sink" mechanics.
> > 
> > +
> > +do_nothing:
> > +
> > +    pa_sink_process_rewind(u->sink, 0);
> > +}
> > +
> > +static void process_render(struct userdata *u, pa_usec_t now) {
> > +    size_t ate = 0;
> > +
> > +    pa_assert(u);
> > +
> > +    /* This is the configured latency. Sink inputs connected to us
> > +    might not have a single frame more than the maxrequest value
> > +    queued. Hence: at maximum read this many bytes from the sink
> > +    inputs. */
> > +
> > +    /* Fill the buffer up the latency size */
> > +    while (u->timestamp < now + u->block_usec) {
> >          ssize_t l;
> >          void *p;
> >  
> > +        pa_sink_render(u->sink, u->sink->thread_info.max_request,
> > &u->memchunk);
> > +
> > +        pa_assert(u->memchunk.length > 0);
> > +
> >          p = pa_memblock_acquire(u->memchunk.memblock);
> >          l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u-
> > >memchunk.length, &u->write_type);
> >          pa_memblock_release(u->memchunk.memblock);
> >  
> >          pa_assert(l != 0);
> >  
> > -        if (l < 0) {
> > -
> > -            if (errno == EINTR)
> > -                continue;
> > -            else if (errno == EAGAIN)
> > -                return 0;
> > -            else {
> > -                pa_log("Faied to write data to FIFO: %s",
> > pa_cstrerror(errno));
> > -                return -1;
> > -            }
> You shouldn't just remove error handling. Also, if pa_write() writes
> only part of the rendered audio, you probably shouldn't just drop the
> unwritten data, or at least the drop-outs should be logged.
> 
I agree about logging about drop-outs in some limited way, since that
may be a constant situation when the pipe is full and there is no
reader.
Regarding this topic, i also have a test implementation of self-
draining the pipe before each first write into the pipe after entering
the "running" state. If there is no real reader, each "start play"
flushes all potentially old data first. What is your opinion about
that?
> > 
> > -
> > -        } else {
> > +        pa_memblock_unref(u->memchunk.memblock);
> >  
> > -            u->memchunk.index += (size_t) l;
> > -            u->memchunk.length -= (size_t) l;
> > +/*         pa_log_debug("Ate %lu bytes.", (unsigned long)
> > chunk.length); */
> This comment provides no value.
> 
Ok.
> > 
> > +        u->timestamp += pa_bytes_to_usec(u->memchunk.length, &u-
> > >sink->sample_spec);
> >  
> > -            if (u->memchunk.length <= 0) {
> > -                pa_memblock_unref(u->memchunk.memblock);
> > -                pa_memchunk_reset(&u->memchunk);
> > -            }
> > -        }
> > +        ate += u->memchunk.length;
> >  
> > -        return 0;
> > +        if (ate >= u->sink->thread_info.max_request)
> > +            break;
> >      }
> >  }
> >  
> > @@ -173,40 +225,33 @@ static void thread_func(void *userdata) {
> >  
> >      pa_thread_mq_install(&u->thread_mq);
> >  
> > +    u->timestamp = pa_rtclock_now();
> > +
> >      for (;;) {
> > -        struct pollfd *pollfd;
> > +        pa_usec_t now = 0;
> >          int ret;
> >  
> > -        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> > +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
> > +            now = pa_rtclock_now();
> >  
> >          if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> > -            pa_sink_process_rewind(u->sink, 0);
> > +            process_rewind(u, now);
> >  
> > -        /* Render some data and write it to the fifo */
> > +        /* Render some data and drop it immediately */
> This isn't the null sink, the data isn't dropped.
> 
Yp.
> > 
> >          if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
> > -            if (pollfd->revents) {
> > -                if (process_render(u) < 0)
> > -                    goto fail;
> > +            if (u->timestamp <= now)
> > +                process_render(u, now);
> >  
> > -                pollfd->revents = 0;
> > -            }
> > -        }
> > +            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
> > +        } else
> > +            pa_rtpoll_set_timer_disabled(u->rtpoll);
> >  
> >          /* Hmm, nothing to do. Let's sleep */
> > -        pollfd->events = (short) (u->sink->thread_info.state ==
> > PA_SINK_RUNNING ? POLLOUT : 0);
> > -
> >          if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
> >              goto fail;
> >  
> >          if (ret == 0)
> >              goto finish;
> > -
> > -        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> > -
> > -        if (pollfd->revents & ~POLLOUT) {
> > -            pa_log("FIFO shutdown.");
> > -            goto fail;
> > -        }
> >      }
> >  
> >  fail:
> > @@ -248,19 +293,16 @@ int pa__init(pa_module *m) {
> >      m->userdata = u;
> >      pa_memchunk_reset(&u->memchunk);
> >      u->rtpoll = pa_rtpoll_new();
> > -
> > -    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u-
> > >rtpoll) < 0) {
> > -        pa_log("pa_thread_mq_init() failed.");
> > -        goto fail;
> > -    }
> > -
> > +    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u-
> > >rtpoll);
> Why did you remove the error handling?
> 
I removed this check by mistake, while testing on an older version:(
> > 
> >      u->write_type = 0;
> >  
> >      u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file",
> > DEFAULT_FILE_NAME));
> >  
> >      if (mkfifo(u->filename, 0666) < 0) {
> > +        int errno_save = errno;
> >          pa_log("mkfifo('%s'): %s", u->filename,
> > pa_cstrerror(errno));
> > -        goto fail;
> > +        if (errno_save != EEXIST)
> > +          goto fail;
> This change is not related to the timing changes. If you want to
> allow
> using an already existing file, that should be in a separate patch.
> 
Ok.
> Also, please use 4 spaces for each level of indentation.
> 
Sure.
> > 
> >      }
> >      if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
> >          pa_log("open('%s'): %s", u->filename,
> > pa_cstrerror(errno));
> > @@ -294,7 +336,7 @@ int pa__init(pa_module *m) {
> >          goto fail;
> >      }
> >  
> > -    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
> > +    u->sink = pa_sink_new(m->core, &data,
> > PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
> >      pa_sink_new_data_done(&data);
> >  
> >      if (!u->sink) {
> > @@ -303,14 +345,16 @@ int pa__init(pa_module *m) {
> >      }
> >  
> >      u->sink->parent.process_msg = sink_process_msg;
> > +    u->sink->update_requested_latency =
> > sink_update_requested_latency_cb;
> >      u->sink->userdata = u;
> >  
> >      pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
> >      pa_sink_set_rtpoll(u->sink, u->rtpoll);
> >  
> >      u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink-
> > >sample_spec);
> > +    u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink-
> > >sample_spec);
> > +    pa_sink_set_max_rewind(u->sink, u->buffer_size);
> >      pa_sink_set_max_request(u->sink, u->buffer_size);
> > -    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u-
> > >buffer_size, &u->sink->sample_spec));
> >  
> >      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll,
> > PA_RTPOLL_NEVER, 1);
> >      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> > @@ -322,6 +366,8 @@ int pa__init(pa_module *m) {
> >          goto fail;
> >      }
> >  
> > +    pa_sink_set_latency_range(u->sink, 0, u->block_usec);
> Nitpicking: I think this would fit better where the
> pa_sink_set_fixed_latency() call used to be.
> 
Ok.

--
regards, Samo



More information about the pulseaudio-discuss mailing list