[pulseaudio-discuss] [PATCH 04/11] srchannel: Add the shared ringbuffer object

Peter Meerwald pmeerw at pmeerw.net
Tue May 6 07:01:33 PDT 2014


Hello,

> An shm ringbuffer that is used for low overhead server-client communication.
> Signalling is done through eventfd semaphores - it's based on pa_fdsem to avoid
> syscalls if nothing is waiting on the other side.

comment inline

> ---
>  src/Makefile.am           |   3 +-
>  src/pulsecore/srchannel.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++
>  src/pulsecore/srchannel.h |  62 ++++++++++
>  3 files changed, 361 insertions(+), 1 deletion(-)
>  create mode 100644 src/pulsecore/srchannel.c
>  create mode 100644 src/pulsecore/srchannel.h
> 
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 5c2d5bc..9e72982 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -618,6 +618,7 @@ libpulsecommon_ at PA_MAJORMINOR@_la_SOURCES = \
>  		pulsecore/creds.h \
>  		pulsecore/dynarray.c pulsecore/dynarray.h \
>  		pulsecore/endianmacros.h \
> +		pulsecore/fdsem.c pulsecore/fdsem.h \
>  		pulsecore/flist.c pulsecore/flist.h \
>  		pulsecore/g711.c pulsecore/g711.h \
>  		pulsecore/hashmap.c pulsecore/hashmap.h \
> @@ -651,6 +652,7 @@ libpulsecommon_ at PA_MAJORMINOR@_la_SOURCES = \
>  		pulsecore/queue.c pulsecore/queue.h \
>  		pulsecore/random.c pulsecore/random.h \
>  		pulsecore/refcnt.h \
> +		pulsecore/srchannel.c pulsecore/srchannel.h \
>  		pulsecore/sample-util.c pulsecore/sample-util.h \
>  		pulsecore/shm.c pulsecore/shm.h \
>  		pulsecore/bitset.c pulsecore/bitset.h \
> @@ -880,7 +882,6 @@ libpulsecore_ at PA_MAJORMINOR@_la_SOURCES = \
>  		pulsecore/core-scache.c pulsecore/core-scache.h \
>  		pulsecore/core-subscribe.c pulsecore/core-subscribe.h \
>  		pulsecore/core.c pulsecore/core.h \
> -		pulsecore/fdsem.c pulsecore/fdsem.h \
>  		pulsecore/hook-list.c pulsecore/hook-list.h \
>  		pulsecore/ltdl-helper.c pulsecore/ltdl-helper.h \
>  		pulsecore/modargs.c pulsecore/modargs.h \
> diff --git a/src/pulsecore/srchannel.c b/src/pulsecore/srchannel.c
> new file mode 100644
> index 0000000..15485b2
> --- /dev/null
> +++ b/src/pulsecore/srchannel.c
> @@ -0,0 +1,297 @@
> +/***
> +  This file is part of PulseAudio.
> +
> +  Copyright 2014 David Henningsson, Canonical Ltd.
> +
> +  PulseAudio is free software; you can redistribute it and/or modify
> +  it under the terms of the GNU Lesser General Public License as
> +  published by the Free Software Foundation; either version 2.1 of the
> +  License, or (at your option) any later version.
> +
> +  PulseAudio is distributed in the hope that it will be useful, but
> +  WITHOUT ANY WARRANTY; without even the implied warranty of
> +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +  Lesser General Public License for more details.
> +
> +  You should have received a copy of the GNU Lesser General Public
> +  License along with PulseAudio; if not, write to the Free Software
> +  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
> +  USA.
> +***/
> +
> +#ifdef HAVE_CONFIG_H
> +#include <config.h>
> +#endif
> +
> +#include "srchannel.h"
> +
> +#include <pulsecore/atomic.h>
> +#include <pulse/xmalloc.h>
> +
> +/* #define DEBUG_SRCHANNEL */
> +
> +/* This ringbuffer might be useful in other contexts too, but
> +   right now it's only used inside the srchannel, so let's keep it here
> +   for the time being. */

count is atomic, but readindex, writeindex are not

the assumption is that there is exactly one reader and 
exactly one writer?

> +typedef struct pa_ringbuffer pa_ringbuffer;
> +struct pa_ringbuffer {
> +    pa_atomic_t *count;
> +    int capacity;
> +    uint8_t *memory;
> +    int readindex, writeindex;
> +};
> +
> +static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
> +    int c = pa_atomic_load(r->count);
> +    if (r->readindex + c > r->capacity)
> +        *count = r->capacity - r->readindex;
> +    else
> +        *count = c;
> +    return r->memory + r->readindex;
> +}
> +
> +/* Returns true only if the buffer was completely full before the drop. */
> +static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
> +    bool b = pa_atomic_sub(r->count, count) >= r->capacity;
> +    r->readindex += count;
> +    r->readindex %= r->capacity;
> +    return b;
> +}
> +
> +static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
> +    int c = pa_atomic_load(r->count);
> +    *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
> +    return r->memory + r->writeindex;
> +}
> +
> +static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
> +    pa_atomic_add(r->count, count);
> +    r->writeindex += count;
> +    r->writeindex %= r->capacity;
> +}
> +
> +struct pa_srchannel {
> +    pa_ringbuffer rb_read, rb_write;
> +    pa_fdsem *sem_read, *sem_write;
> +    pa_memblock *memblock;
> +    void *cb_userdata;
> +    pa_srchannel_cb_t callback;
> +    pa_io_event *read_event;
> +    pa_mainloop_api *mainloop;
> +};
> +
> +ssize_t pa_srchannel_write(pa_srchannel *sr, const void *data, size_t l) {
> +    ssize_t written = 0;
> +    while (l > 0) {
> +        int towrite;
> +        void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
> +        if ((size_t) towrite > l)
> +            towrite = l;
> +        if (towrite == 0) {
> +#ifdef DEBUG_SRCHANNEL
> +            pa_log("srchannel output buffer full");
> +#endif
> +            break;
> +        }
> +        memcpy(ptr, data, towrite);
> +        pa_ringbuffer_end_write(&sr->rb_write, towrite);
> +        written += towrite;
> +        data = (uint8_t*) data + towrite;
> +        l -= towrite;
> +    }
> +#ifdef DEBUG_SRCHANNEL
> +    pa_log("Wrote %d bytes to srchannel, signalling fdsem", (int) written);
> +#endif
> +    pa_fdsem_post(sr->sem_write);
> +    return written;
> +}
> +
> +ssize_t pa_srchannel_read(pa_srchannel *sr, void *data, size_t l) {
> +    ssize_t isread = 0;
> +    while (l > 0) {
> +        int toread;
> +        void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
> +        if ((size_t) toread > l)
> +            toread = l;
> +        if (toread == 0)
> +            break;
> +        memcpy(data, ptr, toread);
> +        if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
> +#ifdef DEBUG_SRCHANNEL
> +            pa_log("read from full output buffer, signalling fdsem");
> +#endif
> +            pa_fdsem_post(sr->sem_write);
> +        }
> +
> +        isread += toread;
> +        data = (uint8_t*) data + toread;
> +        l -= toread;
> +    }
> +#ifdef DEBUG_SRCHANNEL
> +    pa_log("Read %d bytes from srchannel", (int) isread);
> +#endif
> +    return isread;
> +}
> +
> +/* This is the memory layout of the ringbuffer shm block. It is followed by
> +   read and write ringbuffer memory. */
> +struct srheader {
> +    pa_atomic_t read_count;
> +    pa_atomic_t write_count;
> +    pa_fdsem_data read_semdata;
> +    pa_fdsem_data write_semdata;
> +    int capacity;
> +    int readbuf_offset;
> +    int writebuf_offset;
> +    /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
> +};
> +
> +static void read_loop(pa_srchannel* sr) {
> +    do {
> +        int q;
> +        pa_ringbuffer_peek(&sr->rb_read, &q);
> +#ifdef DEBUG_SRCHANNEL
> +        pa_log("In read loop from srchannel, before callback, count = %d", q);
> +#endif
> +
> +        if (q > 0 && sr->callback)
> +            if (!sr->callback(sr, sr->cb_userdata)) {
> +#ifdef DEBUG_SRCHANNEL
> +                pa_log("Aborting read loop from srchannel");
> +#endif
> +                return;
> +            }
> +
> +#ifdef DEBUG_SRCHANNEL
> +        pa_ringbuffer_peek(&sr->rb_read, &q);
> +        pa_log("In read loop from srchannel, after callback, count = %d", q);
> +#endif
> +
> +  } while (pa_fdsem_before_poll(sr->sem_read) < 0);
> +}
> +
> +static void read_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
> +    pa_srchannel* sr = userdata;
> +
> +    pa_fdsem_after_poll(sr->sem_read);
> +    read_loop(sr);
> +}
> +
> +pa_srchannel* pa_srchannel_new(pa_mainloop_api *m, pa_mempool *p) {
> +    int capacity;
> +    int readfd;
> +    struct srheader *srh;
> +
> +    pa_srchannel* sr = pa_xmalloc0(sizeof(pa_srchannel));
> +    sr->mainloop = m;
> +    sr->memblock = pa_memblock_new_pool(p, -1);
> +    srh = pa_memblock_acquire(sr->memblock);
> +    pa_zero(*srh);
> +
> +    sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
> +    srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
> +    capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
> +    sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
> +    srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
> +    capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
> +    pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
> +        (int) pa_memblock_get_length(sr->memblock), capacity);
> +
> +    srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
> +    sr->rb_read.count = &srh->read_count;
> +    sr->rb_write.count = &srh->write_count;
> +
> +    sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata);
> +    sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata);
> +
> +    readfd = pa_fdsem_get(sr->sem_read);
> +#ifdef DEBUG_SRCHANNEL
> +    pa_log("Enabling io event on fd %d", readfd);
> +#endif
> +    sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, read_cb, sr);
> +    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
> +
> +    return sr;
> +}
> +
> +static void pa_srchannel_swap(pa_srchannel *sr) {
> +    pa_srchannel temp = *sr;
> +    sr->sem_read = temp.sem_write;
> +    sr->sem_write = temp.sem_read;
> +    sr->rb_read = temp.rb_write;
> +    sr->rb_write = temp.rb_read;
> +}
> +
> +pa_srchannel* pa_srchannel_new_from_template(pa_mainloop_api *m, pa_srchannel_template *t)
> +{
> +    int temp;
> +    struct srheader *srh;
> +    pa_srchannel* sr = pa_xmalloc0(sizeof(pa_srchannel));
> +
> +    sr->mainloop = m;
> +    sr->memblock = t->memblock;
> +    pa_memblock_ref(sr->memblock);
> +    srh = pa_memblock_acquire(sr->memblock);
> +
> +    sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
> +    sr->rb_read.count = &srh->read_count;
> +    sr->rb_write.count = &srh->write_count;
> +    sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
> +    sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
> +
> +    sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
> +    sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
> +
> +    pa_srchannel_swap(sr);
> +    temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
> +
> +#ifdef DEBUG_SRCHANNEL
> +    pa_log("Enabling io event on fd %d", t->readfd);
> +#endif
> +    sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, read_cb, sr);
> +    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
> +
> +    return sr;
> +}
> +
> +void pa_srchannel_export(pa_srchannel *sr, pa_srchannel_template *t) {
> +    t->memblock = sr->memblock;
> +    t->readfd = pa_fdsem_get(sr->sem_read);
> +    t->writefd = pa_fdsem_get(sr->sem_write);
> +}
> +
> +void pa_srchannel_set_callback(pa_srchannel *sr, pa_srchannel_cb_t callback, void *userdata) {
> +    if (sr->callback)
> +        pa_fdsem_after_poll(sr->sem_read);
> +
> +    sr->callback = callback;
> +    sr->cb_userdata = userdata;
> +
> +    if (sr->callback)
> +        /* Maybe deferred event? */
> +        read_loop(sr);
> +}
> +
> +void pa_srchannel_free(pa_srchannel *sr)
> +{
> +#ifdef DEBUG_SRCHANNEL
> +    pa_log("Freeing srchannel");
> +#endif
> +    if (!sr)
> +        return;
> +
> +    if (sr->read_event)
> +        sr->mainloop->io_free(sr->read_event);
> +
> +    if (sr->sem_read)
> +        pa_fdsem_free(sr->sem_read);
> +    if (sr->sem_write)
> +        pa_fdsem_free(sr->sem_write);
> +
> +    if (sr->memblock) {
> +        pa_memblock_release(sr->memblock);
> +        pa_memblock_unref(sr->memblock);
> +    }
> +
> +    pa_xfree(sr);
> +}
> diff --git a/src/pulsecore/srchannel.h b/src/pulsecore/srchannel.h
> new file mode 100644
> index 0000000..8eac77f
> --- /dev/null
> +++ b/src/pulsecore/srchannel.h
> @@ -0,0 +1,62 @@
> +#ifndef foopulsesrchannelhfoo
> +#define foopulsesrchannelhfoo
> +
> +/***
> +  This file is part of PulseAudio.
> +
> +  Copyright 2014 David Henningsson, Canonical Ltd.
> +
> +  PulseAudio is free software; you can redistribute it and/or modify
> +  it under the terms of the GNU Lesser General Public License as
> +  published by the Free Software Foundation; either version 2.1 of the
> +  License, or (at your option) any later version.
> +
> +  PulseAudio is distributed in the hope that it will be useful, but
> +  WITHOUT ANY WARRANTY; without even the implied warranty of
> +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +  Lesser General Public License for more details.
> +
> +  You should have received a copy of the GNU Lesser General Public
> +  License along with PulseAudio; if not, write to the Free Software
> +  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
> +  USA.
> +***/
> +
> +#include <pulse/mainloop-api.h>
> +#include <pulsecore/fdsem.h>
> +#include <pulsecore/memblock.h>
> +
> +/* An shm ringbuffer that is used for low overhead server-client communication.
> +   Signaling is done through eventfd semaphores (pa_fdsem). */
> +
> +typedef struct pa_srchannel pa_srchannel;
> +
> +typedef struct pa_srchannel_template {
> +    int readfd, writefd;
> +    pa_memblock *memblock;
> +} pa_srchannel_template;
> +
> +pa_srchannel* pa_srchannel_new(pa_mainloop_api *m, pa_mempool *p);
> +/* Note: this creates a srchannel with swapped read and write. */
> +pa_srchannel* pa_srchannel_new_from_template(pa_mainloop_api *m, pa_srchannel_template *t);
> +
> +void pa_srchannel_free(pa_srchannel *sr);
> +
> +void pa_srchannel_export(pa_srchannel *sr, pa_srchannel_template *t);
> +
> +ssize_t pa_srchannel_write(pa_srchannel *sr, const void *data, size_t l);
> +ssize_t pa_srchannel_read(pa_srchannel *sr, void *data, size_t l);
> +
> +/* Set the callback function that is called whenever data becomes available for reading.
> +   It can also be called if the output buffer was full and can now be written to.
> +
> +   Return false to abort all processing (e g if the srchannel has been freed during the callback).
> +   Otherwise return true.
> +
> +   Note that if the ringbuffer is not empty when the callback is set up, the callback
> +   will be called immediately.
> +*/
> +typedef bool (*pa_srchannel_cb_t)(pa_srchannel *sr, void *userdata);
> +void pa_srchannel_set_callback(pa_srchannel *sr, pa_srchannel_cb_t callback, void *userdata);
> +
> +#endif
> 

-- 

Peter Meerwald
+43-664-2444418 (mobile)


More information about the pulseaudio-discuss mailing list