[pulseaudio-discuss] [PATCH 04/12] srbchannel: Add the shared ringbuffer object

David Henningsson david.henningsson at canonical.com
Fri May 30 04:59:23 PDT 2014


An shm ringbuffer that is used for low overhead server-client communication.
Signalling is done through eventfd semaphores - it's based on pa_fdsem to avoid
syscalls if nothing is waiting on the other side.

Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
 src/Makefile.am            |   3 +-
 src/pulsecore/srbchannel.c | 305 +++++++++++++++++++++++++++++++++++++++++++++
 src/pulsecore/srbchannel.h |  62 +++++++++
 3 files changed, 369 insertions(+), 1 deletion(-)
 create mode 100644 src/pulsecore/srbchannel.c
 create mode 100644 src/pulsecore/srbchannel.h

diff --git a/src/Makefile.am b/src/Makefile.am
index 1ac8a16..b9a660e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -618,6 +618,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/creds.h \
 		pulsecore/dynarray.c pulsecore/dynarray.h \
 		pulsecore/endianmacros.h \
+		pulsecore/fdsem.c pulsecore/fdsem.h \
 		pulsecore/flist.c pulsecore/flist.h \
 		pulsecore/g711.c pulsecore/g711.h \
 		pulsecore/hashmap.c pulsecore/hashmap.h \
@@ -651,6 +652,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/queue.c pulsecore/queue.h \
 		pulsecore/random.c pulsecore/random.h \
 		pulsecore/refcnt.h \
+		pulsecore/srbchannel.c pulsecore/srbchannel.h \
 		pulsecore/sample-util.c pulsecore/sample-util.h \
 		pulsecore/shm.c pulsecore/shm.h \
 		pulsecore/bitset.c pulsecore/bitset.h \
@@ -880,7 +882,6 @@ libpulsecore_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/core-scache.c pulsecore/core-scache.h \
 		pulsecore/core-subscribe.c pulsecore/core-subscribe.h \
 		pulsecore/core.c pulsecore/core.h \
-		pulsecore/fdsem.c pulsecore/fdsem.h \
 		pulsecore/hook-list.c pulsecore/hook-list.h \
 		pulsecore/ltdl-helper.c pulsecore/ltdl-helper.h \
 		pulsecore/modargs.c pulsecore/modargs.h \
diff --git a/src/pulsecore/srbchannel.c b/src/pulsecore/srbchannel.c
new file mode 100644
index 0000000..5fe2220
--- /dev/null
+++ b/src/pulsecore/srbchannel.c
@@ -0,0 +1,305 @@
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2014 David Henningsson, Canonical Ltd.
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as
+  published by the Free Software Foundation; either version 2.1 of the
+  License, or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with PulseAudio; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "srbchannel.h"
+
+#include <pulsecore/atomic.h>
+#include <pulse/xmalloc.h>
+
+/* #define DEBUG_SRBCHANNEL */
+
+/* This ringbuffer might be useful in other contexts too, but
+   right now it's only used inside the srbchannel, so let's keep it here
+   for the time being. */
+typedef struct pa_ringbuffer pa_ringbuffer;
+/* Bookkeeping for one direction of a byte ringbuffer. The fill level is kept
+   behind a pointer to an atomic and is only touched through pa_atomic_*(),
+   while the read and write positions are plain ints stored in this struct. */
+struct pa_ringbuffer {
+    pa_atomic_t *count;  /* number of bytes currently stored in the buffer */
+    int capacity;  /* size of the data area in bytes; indices wrap modulo this */
+    uint8_t *memory;  /* start of the data area */
+    int readindex, writeindex;  /* offsets into memory of the next byte to read / to write */
+};
+
+/* Returns a pointer to the next readable byte and stores in *count how many
+   bytes can be read contiguously from there, i.e. without wrapping past the
+   end of the data area. Nothing is consumed. */
+static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
+    int available = pa_atomic_load(r->count);
+    int until_end = r->capacity - r->readindex;
+
+    *count = available > until_end ? until_end : available;
+    return r->memory + r->readindex;
+}
+
+/* Consumes count bytes from the read side and advances the read position,
+   wrapping at capacity. Returns true only if the buffer was completely full
+   before the drop. */
+static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
+    /* The result of pa_atomic_sub() is what gets compared against capacity;
+       presumably it is the fill level from before the subtraction. */
+    int prev = pa_atomic_sub(r->count, count);
+
+    r->readindex = (r->readindex + count) % r->capacity;
+    return prev >= r->capacity;
+}
+
+/* Returns a pointer to the current write position and stores in *count the
+   number of bytes that may be written contiguously from there. The amount is
+   limited both by the distance to the end of the data area and by the free
+   space left in the buffer. Nothing is committed until
+   pa_ringbuffer_end_write() is called. */
+static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
+    int used = pa_atomic_load(r->count);
+    int until_end = r->capacity - r->writeindex;
+    int unused = r->capacity - used;
+
+    *count = PA_MIN(until_end, unused);
+    return r->memory + r->writeindex;
+}
+
+/* Commits count freshly written bytes: raises the atomic fill level and
+   advances the write position, wrapping at capacity. */
+static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
+    pa_atomic_add(r->count, count);
+    r->writeindex = (r->writeindex + count) % r->capacity;
+}
+
+/* One endpoint of a shared ringbuffer channel: a pair of ringbuffers (one per
+   direction), a pair of fd semaphores, and the memblock that backs them. */
+struct pa_srbchannel {
+    pa_ringbuffer rb_read, rb_write;  /* buffer we read from / buffer we write to */
+    pa_fdsem *sem_read, *sem_write;  /* semaphore we listen on / semaphore we post */
+    pa_memblock *memblock;  /* holds the srbheader and both data areas; stays acquired until pa_srbchannel_free() */
+    void *cb_userdata;  /* passed unchanged as the second argument of callback */
+    pa_srbchannel_cb_t callback;  /* invoked from srbchannel_rwloop(); may be NULL */
+    pa_io_event *read_event;  /* io event watching the fd of sem_read */
+    pa_mainloop_api *mainloop;  /* mainloop that owns read_event */
+};
+
+/* We always listen to sem_read, and always signal on sem_write.
+
+   This means we signal the same semaphore for two scenarios:
+   1) We have written something to our send buffer, and want the other
+      side to read it
+   2) We have read something from our receive buffer that was previously
+      completely full, and want the other side to continue writing
+*/
+
+/* Copies up to l bytes from data into the outgoing ringbuffer, stopping early
+   once the ringbuffer has no room left. sem_write is posted before returning,
+   regardless of how many bytes were actually copied.
+
+   Returns the number of bytes copied, which may be less than l. */
+size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
+    const uint8_t *src = data;
+    size_t written = 0;
+
+    while (written < l) {
+        int chunk;
+        void *dst = pa_ringbuffer_begin_write(&sr->rb_write, &chunk);
+        size_t remaining = l - written;
+
+        /* Never copy more than the caller asked for. */
+        if ((size_t) chunk > remaining)
+            chunk = remaining;
+
+        if (chunk == 0) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("srbchannel output buffer full");
+#endif
+            break;
+        }
+
+        memcpy(dst, src + written, chunk);
+        pa_ringbuffer_end_write(&sr->rb_write, chunk);
+        written += chunk;
+    }
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
+#endif
+    pa_fdsem_post(sr->sem_write);
+
+    return written;
+}
+
+/* Copies up to l bytes from the incoming ringbuffer into data, stopping early
+   once the ringbuffer is empty. Whenever a chunk is taken out of a ringbuffer
+   that was completely full, sem_write is posted so that the other side can
+   continue writing.
+
+   Returns the number of bytes copied, which may be less than l. */
+size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
+    uint8_t *dst = data;
+    size_t isread = 0;
+
+    while (isread < l) {
+        int chunk;
+        void *src = pa_ringbuffer_peek(&sr->rb_read, &chunk);
+        size_t wanted = l - isread;
+
+        /* Never copy more than the caller has room for. */
+        if ((size_t) chunk > wanted)
+            chunk = wanted;
+
+        if (chunk == 0)
+            break;
+
+        memcpy(dst + isread, src, chunk);
+
+        if (pa_ringbuffer_drop(&sr->rb_read, chunk)) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("read from full output buffer, signalling fdsem");
+#endif
+            pa_fdsem_post(sr->sem_write);
+        }
+
+        isread += chunk;
+    }
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Read %d bytes from srbchannel", (int) isread);
+#endif
+
+    return isread;
+}
+
+/* This is the memory layout of the ringbuffer shm block. It is followed by
+   read and write ringbuffer memory.
+
+   "read" and "write" are named from the point of view of the side that
+   created the block with pa_srbchannel_new(); the side that attaches with
+   pa_srbchannel_new_from_template() swaps the two after mapping them. */
+struct srbheader {
+    pa_atomic_t read_count;  /* fill level of the read ringbuffer */
+    pa_atomic_t write_count;  /* fill level of the write ringbuffer */
+    pa_fdsem_data read_semdata;  /* shared state of the read fdsem */
+    pa_fdsem_data write_semdata;  /* shared state of the write fdsem */
+    int capacity;  /* size in bytes of each of the two data areas */
+    int readbuf_offset;  /* byte offset of the read data area from the start of this header */
+    int writebuf_offset;  /* byte offset of the write data area from the start of this header */
+    /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
+};
+
+/* Invokes the user callback (if one is set), then repeats for as long as
+   pa_fdsem_before_poll() on sem_read returns a negative value. Returns
+   immediately, without touching sr again, as soon as the callback returns
+   false. */
+static void srbchannel_rwloop(pa_srbchannel* sr) {
+    for (;;) {
+#ifdef DEBUG_SRBCHANNEL
+        int q;
+        pa_ringbuffer_peek(&sr->rb_read, &q);
+        pa_log("In rw loop from srbchannel, before callback, count = %d", q);
+#endif
+
+        if (sr->callback && !sr->callback(sr, sr->cb_userdata)) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("Aborting read loop from srbchannel");
+#endif
+            return;
+        }
+
+#ifdef DEBUG_SRBCHANNEL
+        pa_ringbuffer_peek(&sr->rb_read, &q);
+        pa_log("In rw loop from srbchannel, after callback, count = %d", q);
+#endif
+
+        /* A non-negative result ends the loop. */
+        if (pa_fdsem_before_poll(sr->sem_read) >= 0)
+            break;
+    }
+}
+
+/* Mainloop io callback registered on the fd of sem_read. userdata is the
+   pa_srbchannel; the remaining arguments are not used. */
+static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
+    pa_srbchannel *channel = (pa_srbchannel *) userdata;
+
+    pa_fdsem_after_poll(channel->sem_read);
+    srbchannel_rwloop(channel);
+}
+
+/* Creates the originating end of a channel.
+
+   A memblock is taken from pool p and laid out as: struct srbheader, read
+   data area, write data area. Two fd semaphores are created on top of the
+   header and an io event for the read semaphore is registered with mainloop m.
+
+   Returns the new channel, or NULL if the memblock or one of the semaphores
+   could not be created. On failure everything acquired so far is released
+   again. */
+pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
+    int capacity;
+    int readfd;
+    struct srbheader *srh;
+
+    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
+    sr->mainloop = m;
+    sr->memblock = pa_memblock_new_pool(p, -1);
+    if (!sr->memblock)
+        goto fail;
+
+    srh = pa_memblock_acquire(sr->memblock);
+    pa_zero(*srh);
+
+    /* The read area starts right after the (aligned) header. */
+    sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
+    srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
+
+    /* Split what is left of the block evenly between the two directions. */
+    capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
+
+    sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
+    srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
+
+    /* If aligning moved the start of the write area, shrink the capacity so
+       that the read area cannot run into it. */
+    capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
+
+    pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
+        (int) pa_memblock_get_length(sr->memblock), capacity);
+
+    srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
+    sr->rb_read.count = &srh->read_count;
+    sr->rb_write.count = &srh->write_count;
+
+    sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata);
+    if (!sr->sem_read)
+        goto fail;
+
+    sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata);
+    if (!sr->sem_write)
+        goto fail;
+
+    readfd = pa_fdsem_get(sr->sem_read);
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Enabling io event on fd %d", readfd);
+#endif
+    sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
+    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
+
+    return sr;
+
+fail:
+    /* pa_srbchannel_free() skips every member that is still NULL, so it is
+       safe to call on a partially constructed channel. */
+    pa_srbchannel_free(sr);
+    return NULL;
+}
+
+/* Exchanges the read and write roles of a channel: both the semaphores and
+   the ringbuffers trade places. All other members are left untouched. */
+static void pa_srbchannel_swap(pa_srbchannel *sr) {
+    pa_fdsem *old_sem_read = sr->sem_read;
+    pa_ringbuffer old_rb_read = sr->rb_read;
+
+    sr->sem_read = sr->sem_write;
+    sr->sem_write = old_sem_read;
+
+    sr->rb_read = sr->rb_write;
+    sr->rb_write = old_rb_read;
+}
+
+/* Checks that a data area of capacity bytes starting offset bytes into a
+   block of length bytes lies behind the header and inside the block. */
+static bool srbheader_area_is_valid(int offset, int capacity, size_t length) {
+    if (offset < 0 || (size_t) offset < sizeof(struct srbheader))
+        return false;
+    if ((size_t) offset > length)
+        return false;
+    return (size_t) capacity <= length - (size_t) offset;
+}
+
+/* Creates the peer end of a channel from a template produced by
+   pa_srbchannel_export() on the other side.
+
+   The template's memblock is referenced and acquired, the header inside it is
+   validated, and the semaphores are opened on t->readfd / t->writefd. Read and
+   write are then swapped, both in the new channel and in the template itself
+   (t->readfd and t->writefd trade places), and an io event for the resulting
+   read fd is registered with mainloop m.
+
+   Returns the new channel, or NULL if the header in the memblock is not
+   plausible or a semaphore could not be opened. On failure t is left
+   unmodified and the memblock reference taken here is dropped again. */
+pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t)
+{
+    int temp;
+    int capacity, readbuf_offset, writebuf_offset;
+    size_t length;
+    struct srbheader *srh;
+    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
+
+    sr->mainloop = m;
+    sr->memblock = t->memblock;
+    pa_memblock_ref(sr->memblock);
+    srh = pa_memblock_acquire(sr->memblock);
+    length = pa_memblock_get_length(sr->memblock);
+
+    if (length < sizeof(*srh)) {
+        pa_log("srbchannel memblock is too small to hold a header");
+        goto fail;
+    }
+
+    /* The header was written by the other side. Snapshot the values once and
+       validate the snapshot, so that the pointers built below are based on
+       exactly what was checked. */
+    capacity = srh->capacity;
+    readbuf_offset = srh->readbuf_offset;
+    writebuf_offset = srh->writebuf_offset;
+
+    /* A capacity of zero would also lead to a modulo by zero later on. */
+    if (capacity <= 0 ||
+        !srbheader_area_is_valid(readbuf_offset, capacity, length) ||
+        !srbheader_area_is_valid(writebuf_offset, capacity, length)) {
+        pa_log("srbchannel header is invalid");
+        goto fail;
+    }
+
+    sr->rb_read.capacity = sr->rb_write.capacity = capacity;
+    sr->rb_read.count = &srh->read_count;
+    sr->rb_write.count = &srh->write_count;
+    sr->rb_read.memory = (uint8_t*) srh + readbuf_offset;
+    sr->rb_write.memory = (uint8_t*) srh + writebuf_offset;
+
+    sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
+    if (!sr->sem_read)
+        goto fail;
+
+    sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
+    if (!sr->sem_write)
+        goto fail;
+
+    /* Up to here everything was set up from the creator's point of view;
+       now flip it to ours. */
+    pa_srbchannel_swap(sr);
+    temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Enabling io event on fd %d", t->readfd);
+#endif
+    sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
+    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
+
+    return sr;
+
+fail:
+    /* pa_srbchannel_free() skips every member that is still NULL, and
+       releases and unrefs the memblock acquired above. */
+    pa_srbchannel_free(sr);
+    return NULL;
+}
+
+/* Fills in template t with what the other side needs to attach to this
+   channel: the fds of both semaphores and the backing memblock. The memblock
+   pointer is copied as is; no additional reference is taken here. */
+void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) {
+    int rfd = pa_fdsem_get(sr->sem_read);
+    int wfd = pa_fdsem_get(sr->sem_write);
+
+    t->readfd = rfd;
+    t->writefd = wfd;
+    t->memblock = sr->memblock;
+}
+
+/* Installs (or, with callback == NULL, removes) the user callback.
+
+   If a callback was already installed, pa_fdsem_after_poll() is called on
+   sem_read first. A newly installed callback is run right away through
+   srbchannel_rwloop(), so that data already waiting in the buffer gets
+   processed. */
+void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
+    bool had_callback = sr->callback != NULL;
+
+    if (had_callback)
+        pa_fdsem_after_poll(sr->sem_read);
+
+    sr->cb_userdata = userdata;
+    sr->callback = callback;
+
+    if (!callback)
+        return;
+
+    /* Maybe deferred event? */
+    srbchannel_rwloop(sr);
+}
+
+/* Tears down a channel: frees the io event, both semaphores, releases and
+   unrefs the memblock, and finally frees the channel struct itself. Members
+   that are NULL are skipped. sr must not be NULL. */
+void pa_srbchannel_free(pa_srbchannel *sr)
+{
+    pa_memblock *block;
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Freeing srbchannel");
+#endif
+    pa_assert(sr);
+
+    /* Stop watching the fd before the semaphore that owns it goes away. */
+    if (sr->read_event) {
+        sr->mainloop->io_free(sr->read_event);
+    }
+
+    if (sr->sem_read) {
+        pa_fdsem_free(sr->sem_read);
+    }
+
+    if (sr->sem_write) {
+        pa_fdsem_free(sr->sem_write);
+    }
+
+    block = sr->memblock;
+    if (block) {
+        pa_memblock_release(block);
+        pa_memblock_unref(block);
+    }
+
+    pa_xfree(sr);
+}
diff --git a/src/pulsecore/srbchannel.h b/src/pulsecore/srbchannel.h
new file mode 100644
index 0000000..843bf96
--- /dev/null
+++ b/src/pulsecore/srbchannel.h
@@ -0,0 +1,62 @@
+#ifndef foopulsesrbchannelhfoo
+#define foopulsesrbchannelhfoo
+
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2014 David Henningsson, Canonical Ltd.
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as
+  published by the Free Software Foundation; either version 2.1 of the
+  License, or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with PulseAudio; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+***/
+
+#include <pulse/mainloop-api.h>
+#include <pulsecore/fdsem.h>
+#include <pulsecore/memblock.h>
+
+/* An shm ringbuffer that is used for low overhead server-client communication.
+   Signaling is done through eventfd semaphores (pa_fdsem). */
+
+/* Opaque handle for one end of a channel. */
+typedef struct pa_srbchannel pa_srbchannel;
+
+/* Everything the other side needs in order to attach to an existing channel:
+   the fds of the two semaphores and the memblock backing the ringbuffers.
+   Filled in by pa_srbchannel_export(). */
+typedef struct pa_srbchannel_template {
+    int readfd, writefd;
+    pa_memblock *memblock;
+} pa_srbchannel_template;
+
+/* Create a new srbchannel whose shared memory is allocated from pool p and
+   whose read semaphore is watched through mainloop m. */
+pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p);
+/* Note: this creates a srbchannel with swapped read and write.
+   As a side effect, t->readfd and t->writefd are swapped as well. */
+pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t);
+
+/* Free the srbchannel together with its io event and semaphores, and drop
+   its reference to the memblock. */
+void pa_srbchannel_free(pa_srbchannel *sr);
+
+/* Fill in t from sr. No extra reference to the memblock is taken. */
+void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t);
+
+/* Both return the number of bytes actually transferred, which is less than l
+   if the output buffer is full (write) or the input buffer runs empty (read). */
+size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l);
+size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l);
+
+/* Set the callback function that is called whenever data becomes available for reading.
+   It can also be called if the output buffer was full and can now be written to.
+
+   Return false to abort all processing (e.g. if the srbchannel has been freed during the callback).
+   Otherwise return true.
+
+   Note that the callback will be called immediately, to be able to process stuff that
+   might already be in the buffer.
+*/
+typedef bool (*pa_srbchannel_cb_t)(pa_srbchannel *sr, void *userdata);
+void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata);
+
+#endif
-- 
1.9.1



More information about the pulseaudio-discuss mailing list