[pulseaudio-discuss] [PATCH] "pipe-sink" added option "use_system_clock_for_timing"

Samo Pogačnik samo_pogacnik at t-2.net
Sat Dec 16 15:57:40 UTC 2017


Dne 15.12.2017 (pet) ob 18:22 +0100 je Samo Pogačnik napisal(a):
> Dne 15.12.2017 (pet) ob 04:01 +0200 je Tanu Kaskinen napisal(a):
> > 
> > On Wed, 2017-12-13 at 22:14 +0100, Samo Pogačnik wrote:
> > > 
> > > 
> > > Thank you for the remarks. I tried to correct all issues and
> > > below is a new patch.
> > Thanks. I recommend you send patches with "git send-email" in the
> > future. That way the formatting will always be right (I had some
> > trouble getting git accept the patch), the patch will have a commit
> > message and the author and date will be correct.
> Sorry for the trouble, i will improve my git skills.

Below is another patch.

> 
> > 
> > 
> > > 
> > > 
> > > +static int process_render_use_timing(struct userdata *u,
> > > pa_usec_t
> > > now) {
> > > +    int ret = 0;
> > > +    size_t dropped = 0;
> > > +    size_t consumed = 0;
> > > +
> > > +    pa_assert(u);
> > > +
> > > +    /* This is the configured latency. Sink inputs connected to
> > > us
> > > +    might not have a single frame more than the maxrequest value
> > > +    queued. Hence: at maximum read this many bytes from the sink
> > > +    inputs. */
> > I could have complained about this earlier: this comment is a bit
> > hard
> > to understand. What does "this" refer to? (Apparently to
> > max_request
> > or
> > block_usec.) Either remove the comment or rewrite it so that it's
> > easy
> > to understand.
> I though this must be good comment, since it comes from a sample
> module. However, i fully accept your remark and i would just remove
> the
> comment for now. Do you agree? 

The comment is removed.

> 
> > 
> > 
> > > 
> > > 
> > > +
> > > +    /* Fill the buffer up the latency size */
> > > +    while (u->timestamp < now + u->block_usec) {
> > > +        ssize_t written = 0;
> > > +        pa_memchunk chunk;
> > > +
> > > +        pa_sink_render(u->sink, u->sink-
> > > >thread_info.max_request,
> > > &chunk);
> > > +
> > > +        pa_assert(chunk.length > 0);
> > > +
> > > +        if ((written = pipe_sink_write(u, &chunk)) < 0) {
> > > +            written = -1 - written;
> > > +            ret = -1;
> > > +        }
> > Why don't you return immediately here? It doesn't make sense to me
> > to
> > continue after writing has failed.
> > 
> > If you return immediately, pipe_sink_write() can then simply return
> > -1.
> Here i tried to be as accurate as possible regarding the drops
> counter,
> but if we agree, that drop counting becomes irrelevant upon real fifo
> error, then i would not bother returning number of potentially
> written
> bytes within pipe_sink_write() before error condition occurred. What
> do
> you say?

In this patch i left current handling of written bytes even in error
condition and added "fifo_error" flag to prevent log polution during
error condition and to prevent immediate unloading of the module upon
fifo error.
Counting of dropped bytes continues normally during error condition.
On entering suspended state, error flag is reset to get another error
message upon leaving suspend afterwards, if error condition persists.

>  
> > 
> > 
> > > 
> > > 
> > > +
> > > +        pa_memblock_unref(chunk.memblock);
> > > +
> > > +        u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink-
> > > > 
> > > > sample_spec);
> > > +
> > > +        dropped = chunk.length - written;
> > > +
> > > +        if (dropped != 0) {
> > > +            if (u->bytes_dropped == 0) {
> > > +                pa_log_debug("Pipe-sink just dropped %zu bytes",
> > > dropped);
> > > +                u->bytes_dropped = dropped;
> > > +            } else
> > > +                u->bytes_dropped += dropped;
> > > +        } else {
> > > +            if (u->bytes_dropped != 0) {
> > > +                pa_log_debug("Pipe-sink continuously dropped %zu
> > > bytes", u->bytes_dropped);
> > > +                u->bytes_dropped = 0;
> > > +            }
> > > +        }
> > This still doesn't handle the case of two subsequent partial writes
> > correctly. The code treats that case as one continuous drop, but
> > since
> > the second write writes something, it's not a continuous drop.
> > 
> > I think the logic should be something like this:
> > 
> > if (u->bytes_dropped != 0 && dropped != chunk.length) {
> >     pa_log_debug("Pipe-sink continuously dropped %zu bytes", u-
> > > 
> > > bytes_dropped);
> >     u->bytes_dropped = 0;
> > }
> > 
> > if (u->bytes_dropped == 0 && dropped != 0)
> >     pa_log_debug("Pipe-sink just dropped %zu bytes", dropped);
> > 
> > u->bytes_dropped += dropped;
> > 
> Silly me. Thank you for the suggestion.

Used your suggestion, thanks.

regards, Samo
---
 src/modules/module-pipe-sink.c | 236 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 224 insertions(+), 12 deletions(-)

diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c
index 64ef807..733edde 100644
--- a/src/modules/module-pipe-sink.c
+++ b/src/modules/module-pipe-sink.c
@@ -34,6 +34,9 @@
 #endif
 
 #include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
+#include <pulse/util.h>
+#include <pulse/rtclock.h>
 
 #include <pulsecore/core-error.h>
 #include <pulsecore/sink.h>
@@ -57,7 +60,9 @@ PA_MODULE_USAGE(
         "format=<sample format> "
         "rate=<sample rate> "
         "channels=<number of channels> "
-        "channel_map=<channel map>");
+        "channel_map=<channel map> "
+        "use_system_clock_for_timing=<yes or no> "
+);
 
 #define DEFAULT_FILE_NAME "fifo_output"
 #define DEFAULT_SINK_NAME "fifo_output"
@@ -74,12 +79,18 @@ struct userdata {
     char *filename;
     int fd;
     size_t buffer_size;
+    size_t bytes_dropped;
+    bool fifo_error;
 
     pa_memchunk memchunk;
 
     pa_rtpoll_item *rtpoll_item;
 
     int write_type;
+    pa_usec_t block_usec;
+    pa_usec_t timestamp;
+
+    bool use_system_clock_for_timing;
 };
 
 static const char* const valid_modargs[] = {
@@ -90,6 +101,7 @@ static const char* const valid_modargs[] = {
     "rate",
     "channels",
     "channel_map",
+    "use_system_clock_for_timing",
     NULL
 };
 
@@ -97,27 +109,158 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
+        case PA_SINK_MESSAGE_SET_STATE:
+            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
+                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
+                    u->timestamp = pa_rtclock_now();
+            } else if (pa_sink_get_state(u->sink) == PA_SINK_RUNNING || pa_sink_get_state(u->sink) == PA_SINK_IDLE) {
+                if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) {
+                    /* Clear potential FIFO error flag */
+                    u->fifo_error = false;
+
+                    /* Continuously dropping data (clear counter on entering suspended state. */
+                    if (u->bytes_dropped != 0) {
+                        pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped);
+                        u->bytes_dropped = 0;
+                    }
+                }
+            }
+            break;
 
-        case PA_SINK_MESSAGE_GET_LATENCY: {
-            size_t n = 0;
+        case PA_SINK_MESSAGE_GET_LATENCY:
+            if (u->use_system_clock_for_timing) {
+                pa_usec_t now;
+                now = pa_rtclock_now();
+                *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
+            } else {
+                size_t n = 0;
 
 #ifdef FIONREAD
-            int l;
+                int l;
 
-            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
-                n = (size_t) l;
+                if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
+                    n = (size_t) l;
 #endif
 
-            n += u->memchunk.length;
+                n += u->memchunk.length;
 
-            *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
+                *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
+            }
             return 0;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
+static void sink_update_requested_latency_cb(pa_sink *s) {
+    struct userdata *u;
+    size_t nbytes;
+
+    pa_sink_assert_ref(s);
+    pa_assert_se(u = s->userdata);
+
+    u->block_usec = pa_sink_get_requested_latency_within_thread(s);
+
+    if (u->block_usec == (pa_usec_t) -1)
+        u->block_usec = s->thread_info.max_latency;
+
+    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
+    pa_sink_set_max_request_within_thread(s, nbytes);
+}
+
+static ssize_t pipe_sink_write(struct userdata *u, pa_memchunk *pchunk) {
+    size_t index, length;
+    ssize_t count = 0;
+    void *p;
+
+    pa_assert(u);
+    pa_assert(pchunk);
+
+    index = pchunk->index;
+    length = pchunk->length;
+    p = pa_memblock_acquire(pchunk->memblock);
+
+    for (;;) {
+        ssize_t l;
+
+        l = pa_write(u->fd, (uint8_t*) p + index, length, &u->write_type);
+
+        pa_assert(l != 0);
+
+        if (l < 0) {
+            if (errno == EAGAIN)
+                break;
+            else if (errno != EINTR) {
+                if (!u->fifo_error) {
+                    pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
+                    u->fifo_error = true;
+                }
+                count = l - count;
+                break;
+            }
+        } else {
+            count += l;
+            index += l;
+            length -= l;
+
+            if (length <= 0) {
+                break;
+            }
+        }
+    }
+
+    pa_memblock_release(pchunk->memblock);
+
+    if (u->fifo_error && count >= 0) {
+        pa_log("Recovered from FIFO error");
+        u->fifo_error = false;
+    }
+    return count;
+}
+
+static int process_render_use_timing(struct userdata *u, pa_usec_t now) {
+    size_t dropped = 0;
+    size_t consumed = 0;
+
+    pa_assert(u);
+
+    /* Fill the buffer up the latency size */
+    while (u->timestamp < now + u->block_usec) {
+        ssize_t written = 0;
+        pa_memchunk chunk;
+
+        pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk);
+
+        pa_assert(chunk.length > 0);
+
+        if ((written = pipe_sink_write(u, &chunk)) < 0)
+            written = -1 - written;
+
+        pa_memblock_unref(chunk.memblock);
+
+        u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
+
+        dropped = chunk.length - written;
+
+        if (u->bytes_dropped != 0 && dropped != chunk.length) {
+            pa_log_debug("Pipe-sink continuously dropped %zu bytes", u->bytes_dropped);
+            u->bytes_dropped = 0;
+        }
+
+        if (u->bytes_dropped == 0 && dropped != 0)
+            pa_log_debug("Pipe-sink just dropped %zu bytes", dropped);
+
+        u->bytes_dropped += dropped;
+
+        consumed += chunk.length;
+
+        if (consumed >= u->sink->thread_info.max_request)
+            break;
+    }
+
+    return 0;
+}
+
 static int process_render(struct userdata *u) {
     pa_assert(u);
 
@@ -162,6 +305,55 @@ static int process_render(struct userdata *u) {
     }
 }
 
+static void thread_func_use_timing(void *userdata) {
+    struct userdata *u = userdata;
+
+    pa_assert(u);
+
+    pa_log_debug("Thread (use timing) starting up");
+
+    pa_thread_mq_install(&u->thread_mq);
+
+    u->timestamp = pa_rtclock_now();
+
+    for (;;) {
+        pa_usec_t now = 0;
+        int ret;
+
+        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
+            now = pa_rtclock_now();
+
+        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+            pa_sink_process_rewind(u->sink, 0);
+
+        /* Render some data and write it to the fifo */
+        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
+            if (u->timestamp <= now)
+                if (process_render_use_timing(u, now) < 0)
+                    goto fail;
+
+            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
+        } else
+            pa_rtpoll_set_timer_disabled(u->rtpoll);
+
+        /* Hmm, nothing to do. Let's sleep */
+        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
+            goto fail;
+
+        if (ret == 0)
+            goto finish;
+    }
+
+fail:
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
+    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
+    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
+
+finish:
+    pa_log_debug("Thread (use timing) shutting down");
+}
+
 static void thread_func(void *userdata) {
     struct userdata *u = userdata;
 
@@ -225,6 +417,7 @@ int pa__init(pa_module *m) {
     pa_modargs *ma;
     struct pollfd *pollfd;
     pa_sink_new_data data;
+    pa_thread_func_t thread_routine;
 
     pa_assert(m);
 
@@ -247,6 +440,11 @@ int pa__init(pa_module *m) {
     pa_memchunk_reset(&u->memchunk);
     u->rtpoll = pa_rtpoll_new();
 
+    if (pa_modargs_get_value_boolean(ma, "use_system_clock_for_timing", &u->use_system_clock_for_timing) < 0) {
+        pa_log("Failed to parse use_system_clock_for_timing argument.");
+        goto fail;
+    }
+
     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
         pa_log("pa_thread_mq_init() failed.");
         goto fail;
@@ -292,7 +490,10 @@ int pa__init(pa_module *m) {
         goto fail;
     }
 
-    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
+    if (u->use_system_clock_for_timing)
+        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
+    else
+        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
     pa_sink_new_data_done(&data);
 
     if (!u->sink) {
@@ -301,21 +502,32 @@ int pa__init(pa_module *m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
+    if (u->use_system_clock_for_timing)
+        u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->userdata = u;
 
     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
     pa_sink_set_rtpoll(u->sink, u->rtpoll);
 
+    u->bytes_dropped = 0;
+    u->fifo_error = false;
     u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink->sample_spec);
+    if (u->use_system_clock_for_timing) {
+        u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);
+        pa_sink_set_latency_range(u->sink, 0, u->block_usec);
+        thread_routine = thread_func_use_timing;
+    } else {
+        pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));
+        thread_routine = thread_func;
+    }
     pa_sink_set_max_request(u->sink, u->buffer_size);
-    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));
 
     u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
     pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
     pollfd->fd = u->fd;
     pollfd->events = pollfd->revents = 0;
 
-    if (!(u->thread = pa_thread_new("pipe-sink", thread_func, u))) {
+    if (!(u->thread = pa_thread_new("pipe-sink", thread_routine, u))) {
         pa_log("Failed to create thread.");
         goto fail;
     }
-- 
2.7.4



More information about the pulseaudio-discuss mailing list