[pulseaudio-discuss] [PATCH v7] pipe-source: implement autosuspend option

Georg Chini georg at chini.tk
Tue Feb 20 12:11:36 UTC 2018


On 19.02.2018 16:01, Raman Shyshniou wrote:
> Currently the pipe-source will remain running even if no
> writer is connected and therefore no data is produced.
> This patch adds the autosuspend=<bool> option to prevent this.
> Source will stay suspended if no writer is connected.
> This option is enabled by default.
> ---
>   src/modules/module-pipe-source.c | 279 ++++++++++++++++++++++++++++++---------
>   1 file changed, 215 insertions(+), 64 deletions(-)
>
> diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c
> index f8284c1..c1a1e9c 100644
> --- a/src/modules/module-pipe-source.c
> +++ b/src/modules/module-pipe-source.c
> @@ -33,6 +33,7 @@
>   #include <sys/filio.h>
>   #endif
>   
> +#include <pulse/rtclock.h>
>   #include <pulse/xmalloc.h>
>   
>   #include <pulsecore/core-error.h>
> @@ -57,11 +58,24 @@ PA_MODULE_USAGE(
>           "format=<sample format> "
>           "rate=<sample rate> "
>           "channels=<number of channels> "
> -        "channel_map=<channel map>");
> +        "channel_map=<channel map> "
> +        "autosuspend=<boolean>");
>   
>   #define DEFAULT_FILE_NAME "/tmp/music.input"
>   #define DEFAULT_SOURCE_NAME "fifo_input"
>   
> +struct pipe_source_msg {
> +    pa_msgobject parent;
> +};
> +
> +typedef struct pipe_source_msg pipe_source_msg;
> +PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject);
> +
> +enum {
> +    PIPE_SOURCE_SUSPEND,
> +    PIPE_SOURCE_RESUME
> +};
> +
>   struct userdata {
>       pa_core *core;
>       pa_module *module;
> @@ -71,12 +85,14 @@ struct userdata {
>       pa_thread_mq thread_mq;
>       pa_rtpoll *rtpoll;
>   
> +    pipe_source_msg *msg;
> +    pa_usec_t timestamp;
> +    bool autosuspend;
> +    size_t pipe_size;
> +
>       char *filename;
> +    int corkfd;
>       int fd;
> -
> -    pa_memchunk memchunk;
> -
> -    pa_rtpoll_item *rtpoll_item;
>   };
>   
>   static const char* const valid_modargs[] = {
> @@ -87,9 +103,41 @@ static const char* const valid_modargs[] = {
>       "rate",
>       "channels",
>       "channel_map",
> +    "autosuspend",
>       NULL
>   };
>   
> +/* Called from main context */
> +static int pipe_source_process_msg(
> +        pa_msgobject *o,
> +        int code,
> +        void *data,
> +        int64_t offset,
> +        pa_memchunk *chunk) {
> +
> +    struct userdata *u = (struct userdata *) data;
> +
> +    pa_assert(u);
> +
> +    switch (code) {
> +        case PIPE_SOURCE_SUSPEND:
> +            pa_log_debug("Suspending source %s because no writers left", u->source->name);
> +            pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION);
> +            break;
> +
> +        case PIPE_SOURCE_RESUME:
> +            pa_log_debug("Resuming source %s", u->source->name);
> +            pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION);
> +            break;
> +
> +        default:
> +            pa_assert_not_reached();
> +    }
> +
> +    return 0;
> +}
> +
> +/* Called from thread context */
>   static int source_process_msg(
>           pa_msgobject *o,
>           int code,
> @@ -101,17 +149,30 @@ static int source_process_msg(
>   
>       switch (code) {
>   
> +        case PA_SOURCE_MESSAGE_SET_STATE:
> +
> +            if (u->source->thread_info.state == PA_SOURCE_SUSPENDED || u->source->thread_info.state == PA_SOURCE_INIT) {
> +                if (PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(data)))
> +                    u->timestamp = pa_rtclock_now();
> +            }
> +
> +            break;
> +
>           case PA_SOURCE_MESSAGE_GET_LATENCY: {
> -            size_t n = 0;
> +            int64_t latency;
> +
> +            latency = PA_CLIP_SUB((int64_t)pa_rtclock_now(), (int64_t)u->timestamp);
>   
>   #ifdef FIONREAD
> -            int l;
> +            if (u->corkfd < 0) {
> +                int l;
>   
> -            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
> -                n = (size_t) l;
> +                if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
> +                    latency += pa_bytes_to_usec((uint64_t)l, &u->source->sample_spec);
> +            }
>   #endif
>   

I don't think it is correct to add the data in the pipe to the latency.
Essentially, the pipe should always be empty, because you read
all data as soon as it is available. If a block of data is written to the
pipe, you should already have accounted for that block through
the time difference between now and the time stamp.

> -            *((int64_t*) data) = pa_bytes_to_usec(n, &u->source->sample_spec);
> +            *((int64_t*) data) = latency;
>               return 0;
>           }
>       }
> @@ -121,7 +182,11 @@ static int source_process_msg(
>   
>   static void thread_func(void *userdata) {
>       struct userdata *u = userdata;
> +    pa_rtpoll_item *rtpoll_item;
> +    struct pollfd *pollfd;
> +    pa_memchunk chunk;
>       int read_type = 0;
> +    size_t fs;
>   
>       pa_assert(u);
>   
> @@ -129,68 +194,135 @@ static void thread_func(void *userdata) {
>   
>       pa_thread_mq_install(&u->thread_mq);
>   
> +    fs = pa_frame_size(&u->source->sample_spec);
> +
> +    pa_memchunk_reset(&chunk);
> +    chunk.memblock = pa_memblock_new(u->core->mempool, u->pipe_size);

Further down, you might put some data in the memchunk before the
read if the previous data was not frame-aligned. Therefore the memblock
size must be u->pipe_size + fs.

> +
> +    rtpoll_item = NULL;
> +    pollfd = NULL;
> +
> +    u->timestamp = pa_rtclock_now();
> +
> +    /* Close our writer here to suspend source if no writers left */
> +    pa_assert_se(pa_close(u->corkfd) == 0);
> +    u->corkfd = -1;
> +
>       for (;;) {
>           int ret;
> -        struct pollfd *pollfd;
>   
> -        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> +        if (chunk.length) {
> +            /* We have a pending data, let's stop polling pipe.
> +             * Setting up pollfd->events = 0 is not enough to stop
> +             * POLLHUP spam if all writers are closed pipe.
> +             * We need to stop polling pipe completely */
> +            if (rtpoll_item) {
> +                pa_rtpoll_item_free(rtpoll_item);
> +                rtpoll_item = NULL;
> +            }
> +        } else {
> +            /* We have no pending data, let's start polling pipe */
> +            if (rtpoll_item == NULL) {
> +                rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
> +                pollfd = pa_rtpoll_item_get_pollfd(rtpoll_item, NULL);
> +                pollfd->events = POLLIN;
> +                pollfd->fd = u->fd;
> +            }
> +        }

Why do you need to do that? As in your previous patches, you
open the pipe for writing if all writers are disconnected, which
should stop the POLLHUPs.

>   
> -        /* Try to read some data and pass it on to the source driver */
> -        if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) {
> -            ssize_t l;
> -            void *p;
> +        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
> +            goto fail;
>   
> -            if (!u->memchunk.memblock) {
> -                u->memchunk.memblock = pa_memblock_new(u->core->mempool, pa_pipe_buf(u->fd));
> -                u->memchunk.index = u->memchunk.length = 0;
> -            }
> +        if (ret == 0)
> +            goto finish;
>   
> -            pa_assert(pa_memblock_get_length(u->memchunk.memblock) > u->memchunk.index);
> +        if (rtpoll_item) {
> +            pollfd = pa_rtpoll_item_get_pollfd(rtpoll_item, NULL);
>   
> -            p = pa_memblock_acquire(u->memchunk.memblock);
> -            l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type);
> -            pa_memblock_release(u->memchunk.memblock);
> +            if (pollfd->revents & ~(POLLIN | POLLHUP)) {
> +                pa_log("FIFO shutdown.");

You could be more verbose about what has happened - see for example
the thread function in module-bluez5-device.

> +                goto fail;
> +            }
> +        } else
> +            pollfd = NULL;
>   
> -            pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */
> +        /* Try to read some data if there are any events */
> +        if (pollfd && pollfd->revents) {
> +            ssize_t l;
> +            void *p;
>   
> -            if (l < 0) {
> +            pa_assert(chunk.index < fs);
>   
> -                if (errno == EINTR)
> -                    continue;
> -                else if (errno != EAGAIN) {
> -                    pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno));
> -                    goto fail;
> -                }
> +            p = pa_memblock_acquire(chunk.memblock);
> +            l = pa_read(u->fd, (uint8_t *) p + chunk.index, pa_memblock_get_length(chunk.memblock) - chunk.index, &read_type);
> +            pa_memblock_release(chunk.memblock);
>   
> -            } else {
> +            if (l > 0) {
> +                chunk.length = (size_t) l;
> +                u->timestamp = pa_rtclock_now();
>   
> -                u->memchunk.length = (size_t) l;
> -                pa_source_post(u->source, &u->memchunk);
> -                u->memchunk.index += (size_t) l;
> +                if (u->corkfd >= 0) {
> +                    if (u->autosuspend)
> +                        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_RESUME, u, 0, NULL, NULL);
>   
> -                if (u->memchunk.index >= pa_memblock_get_length(u->memchunk.memblock)) {
> -                    pa_memblock_unref(u->memchunk.memblock);
> -                    pa_memchunk_reset(&u->memchunk);
> +                    pa_assert_se(pa_close(u->corkfd) == 0);
> +                    u->corkfd = -1;
> +                }
> +            } else if (l == 0) {
> +                /* Writer was disconnected: discard unalligned data tail */
> +                chunk.index = 0;
> +
> +                if (u->corkfd < 0) {
> +                    if (u->autosuspend)
> +                        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL);
> +
> +                    if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) {
> +                        pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
> +                        goto fail;
> +                    }
>                   }
> +            } else {
> +                if (errno == EINTR || errno == EAGAIN)
> +                    continue;
>   
> -                pollfd->revents = 0;
> +                pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno));
> +                goto fail;
>               }
>           }
>   
> -        /* Hmm, nothing to do. Let's sleep */
> -        pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0);
> +        /* Post pending data.
> +         * Let's keep frame boundaries */
> +        if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
> +            size_t total_len = chunk.index + chunk.length;
> +            size_t frames = total_len / fs;
> +            size_t tail = total_len % fs;
>   
> -        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
> -            goto fail;
> +            if (frames > 0) {
> +                pa_memblock *memblock;
>   
> -        if (ret == 0)
> -            goto finish;
> +                memblock = pa_memblock_new(u->core->mempool, u->pipe_size);

Same comment about the memblock size as above.

> +                chunk.length = frames * fs;
> +                chunk.index = 0;
>   
> -        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> +                if (tail) {
> +                    void *src, *dst;
>   
> -        if (pollfd->revents & ~POLLIN) {
> -            pa_log("FIFO shutdown.");
> -            goto fail;
> +                    dst = pa_memblock_acquire(memblock);
> +                    src = pa_memblock_acquire(chunk.memblock);
> +
> +                    memcpy(dst, (uint8_t *) src + chunk.length, tail);
> +
> +                    pa_memblock_release(chunk.memblock);
> +                    pa_memblock_release(memblock);
> +                }
> +
> +                pa_source_post(u->source, &chunk);
> +                pa_memblock_unref(chunk.memblock);
> +                chunk.memblock = memblock;
> +            }
> +
> +            chunk.index = tail;
> +            chunk.length = 0;
>           }
>       }
>   
> @@ -201,6 +333,11 @@ fail:
>       pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
>   
>   finish:
> +    if (rtpoll_item)
> +        pa_rtpoll_item_free(rtpoll_item);
> +
> +    pa_memblock_unref(chunk.memblock);
> +
>       pa_log_debug("Thread shutting down");
>   }
>   
> @@ -210,7 +347,6 @@ int pa__init(pa_module *m) {
>       pa_sample_spec ss;
>       pa_channel_map map;
>       pa_modargs *ma;
> -    struct pollfd *pollfd;
>       pa_source_new_data data;
>   
>       pa_assert(m);
> @@ -230,7 +366,6 @@ int pa__init(pa_module *m) {
>       m->userdata = u = pa_xnew0(struct userdata, 1);
>       u->core = m->core;
>       u->module = m;
> -    pa_memchunk_reset(&u->memchunk);
>       u->rtpoll = pa_rtpoll_new();
>   
>       if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
> @@ -238,13 +373,31 @@ int pa__init(pa_module *m) {
>           goto fail;
>       }
>   
> +    if (!(u->msg = pa_msgobject_new(pipe_source_msg)))
> +        goto fail;
> +
> +    u->msg->parent.process_msg = pipe_source_process_msg;
> +
> +    u->autosuspend = true;
> +
> +    if (pa_modargs_get_value_boolean(ma, "autosuspend", &u->autosuspend) < 0) {
> +        pa_log("Failed to parse autosuspend argument.");
> +        goto fail;
> +    }
> +
>       u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
>   
>       if (mkfifo(u->filename, 0666) < 0) {
>           pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno));
>           goto fail;
>       }
> -    if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
> +
> +    if ((u->corkfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
> +        pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
> +        goto fail;
> +    }
> +
> +    if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) {
>           pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
>           goto fail;
>       }
> @@ -289,12 +442,10 @@ int pa__init(pa_module *m) {
>   
>       pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
>       pa_source_set_rtpoll(u->source, u->rtpoll);
> -    pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(pa_pipe_buf(u->fd), &u->source->sample_spec));
>   
> -    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
> -    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> -    pollfd->fd = u->fd;
> -    pollfd->events = pollfd->revents = 0;
> +    u->pipe_size = pa_pipe_buf(u->fd);
> +    u->pipe_size = PA_MIN(pa_mempool_block_size_max(u->core->mempool), u->pipe_size);
> +    pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(u->pipe_size, &u->source->sample_spec));
>   
>       if (!(u->thread = pa_thread_new("pipe-source", thread_func, u))) {
>           pa_log("Failed to create thread.");
> @@ -346,12 +497,6 @@ void pa__done(pa_module *m) {
>       if (u->source)
>           pa_source_unref(u->source);
>   
> -    if (u->memchunk.memblock)
> -        pa_memblock_unref(u->memchunk.memblock);
> -
> -    if (u->rtpoll_item)
> -        pa_rtpoll_item_free(u->rtpoll_item);
> -
>       if (u->rtpoll)
>           pa_rtpoll_free(u->rtpoll);
>   
> @@ -360,6 +505,12 @@ void pa__done(pa_module *m) {
>           pa_xfree(u->filename);
>       }
>   
> +    if (u->msg)
> +        pa_xfree(u->msg);
> +
> +    if (u->corkfd >= 0)
> +        pa_assert_se(pa_close(u->corkfd) == 0);
> +
>       if (u->fd >= 0)
>           pa_assert_se(pa_close(u->fd) == 0);
>   




More information about the pulseaudio-discuss mailing list