[pulseaudio-discuss] [PATCH 06/11] memblock, pstream: Allow send/receive of remote writable memblocks

David Henningsson david.henningsson at canonical.com
Tue Apr 29 06:22:20 PDT 2014


The shared ringbuffer memblock must be writable by both sides.
This patch makes it possible to send such a memblock over a pstream without
the "both sides writable" information getting lost.

Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
 src/pulsecore/memblock.c | 40 +++++++++++++++++++++++++++++++++++-----
 src/pulsecore/memblock.h |  6 +++++-
 src/pulsecore/pstream.c  | 32 ++++++++++++++++++++++----------
 3 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 8da0fcd..5ef2aa9 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -97,6 +97,7 @@ struct pa_memimport_segment {
     pa_shm memory;
     pa_memtrap *trap;
     unsigned n_blocks;
+    bool writable;
 };
 
 /* A collection of multiple segments */
@@ -146,6 +147,7 @@ struct pa_mempool {
     pa_shm memory;
     size_t block_size;
     unsigned n_blocks;
+    bool is_remote_writable;
 
     pa_atomic_t n_init;
 
@@ -303,6 +305,19 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
 }
 
 /* No lock necessary */
+bool pa_mempool_is_remote_writable(pa_mempool *p) {
+    pa_assert(p);
+    return p->is_remote_writable; /* flag stored by pa_mempool_set_is_remote_writable() */
+}
+
+/* No lock necessary */
+void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable) {
+    pa_assert(p);
+    pa_assert(!writable || pa_mempool_is_shared(p)); /* writable=true is only accepted for a shared pool */
+    p->is_remote_writable = writable;
+}
+
+/* No lock necessary */
 pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
     pa_memblock *b = NULL;
     struct mempool_slot *slot;
@@ -416,6 +431,14 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free
 }
 
 /* No lock necessary */
+bool pa_memblock_is_ours(pa_memblock *b) {
+    pa_assert(b);
+    pa_assert(PA_REFCNT_VALUE(b) > 0);
+
+    return b->type != PA_MEMBLOCK_IMPORTED; /* "ours" = every block type except imported ones */
+}
+
+/* No lock necessary */
 bool pa_memblock_is_read_only(pa_memblock *b) {
     pa_assert(b);
     pa_assert(PA_REFCNT_VALUE(b) > 0);
@@ -905,7 +928,7 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
 
 /* Should be called locked */
-static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
     pa_memimport_segment* seg;
 
     if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
@@ -913,11 +936,12 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
 
     seg = pa_xnew0(pa_memimport_segment, 1);
 
-    if (pa_shm_attach(&seg->memory, shm_id, false) < 0) {
+    if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
         pa_xfree(seg);
         return NULL;
     }
 
+    seg->writable = writable;
     seg->import = i;
     seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size);
 
@@ -973,7 +997,8 @@ void pa_memimport_free(pa_memimport *i) {
 }
 
 /* Self-locked */
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+                              size_t offset, size_t size, bool writable) {
     pa_memblock *b = NULL;
     pa_memimport_segment *seg;
 
@@ -990,9 +1015,14 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
         goto finish;
 
     if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
-        if (!(seg = segment_attach(i, shm_id)))
+        if (!(seg = segment_attach(i, shm_id, writable)))
             goto finish;
 
+    if (writable != seg->writable) {
+        pa_log("Cannot open segment - writable status changed!");
+        goto finish;
+    }
+
     if (offset+size > seg->memory.size)
         goto finish;
 
@@ -1002,7 +1032,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
     PA_REFCNT_INIT(b);
     b->pool = i->pool;
     b->type = PA_MEMBLOCK_IMPORTED;
-    b->read_only = true;
+    b->read_only = !writable;
     b->is_silence = false;
     pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
     b->length = size;
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 502f207..d60f3c3 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -104,6 +104,7 @@ function is not multiple caller safe, i.e. needs to be locked
 manually if called from more than one thread at the same time. */
 void pa_memblock_unref_fixed(pa_memblock*b);
 
+bool pa_memblock_is_ours(pa_memblock *b);
 bool pa_memblock_is_read_only(pa_memblock *b);
 bool pa_memblock_is_silence(pa_memblock *b);
 bool pa_memblock_ref_is_one(pa_memblock *b);
@@ -125,12 +126,15 @@ const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
 void pa_mempool_vacuum(pa_mempool *p);
 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
 bool pa_mempool_is_shared(pa_mempool *p);
+bool pa_mempool_is_remote_writable(pa_mempool *p);
+void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable);
 size_t pa_mempool_block_size_max(pa_mempool *p);
 
 /* For receiving blocks from other nodes */
 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
 void pa_memimport_free(pa_memimport *i);
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+                              size_t offset, size_t size, bool writable);
 int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
 
 /* For sending blocks to other nodes */
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 22ea250..539c4a2 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -45,11 +45,12 @@
 #include "pstream.h"
 
 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
-#define PA_FLAG_SHMDATA    0x80000000LU
-#define PA_FLAG_SHMRELEASE 0x40000000LU
-#define PA_FLAG_SHMREVOKE  0xC0000000LU
-#define PA_FLAG_SHMMASK    0xFF000000LU
-#define PA_FLAG_SEEKMASK   0x000000FFLU
+#define PA_FLAG_SHMDATA     0x80000000LU
+#define PA_FLAG_SHMRELEASE  0x40000000LU
+#define PA_FLAG_SHMREVOKE   0xC0000000LU
+#define PA_FLAG_SHMMASK     0xFF000000LU
+#define PA_FLAG_SEEKMASK    0x000000FFLU
+#define PA_FLAG_SHMWRITABLE 0x00800000LU
 
 /* The sequence descriptor header consists of 5 32bit integers: */
 enum {
@@ -504,10 +505,15 @@ static void prepare_next_write_item(pa_pstream *p) {
             size_t offset, length;
             uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
             size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
+            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
+            pa_memexport *current_export;
 
-            pa_assert(p->export);
+            if (p->mempool == current_pool)
+                pa_assert_se(current_export = p->export);
+            else
+                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
 
-            if (pa_memexport_put(p->export,
+            if (pa_memexport_put(current_export,
                                  p->write.current->chunk.memblock,
                                  &block_id,
                                  &shm_id,
@@ -515,6 +521,8 @@ static void prepare_next_write_item(pa_pstream *p) {
                                  &length) >= 0) {
 
                 flags |= PA_FLAG_SHMDATA;
+                if (pa_mempool_is_remote_writable(current_pool))
+                    flags |= PA_FLAG_SHMWRITABLE;
                 send_payload = false;
 
                 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
@@ -527,6 +535,9 @@ static void prepare_next_write_item(pa_pstream *p) {
             }
 /*             else */
 /*                 pa_log_warn("Failed to export memory block."); */
+
+            if (current_export != p->export)
+                pa_memexport_free(current_export);
         }
 
         if (send_payload) {
@@ -824,8 +835,8 @@ static int do_read(pa_pstream *p) {
                 pa_packet_unref(p->read.packet);
             } else {
                 pa_memblock *b;
-
-                pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+                uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+                pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
 
                 pa_assert(p->import);
 
@@ -833,7 +844,8 @@ static int do_read(pa_pstream *p) {
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
-                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
+                                          !!(flags & PA_FLAG_SHMWRITABLE)))) {
 
                     if (pa_log_ratelimit(PA_LOG_DEBUG))
                         pa_log_debug("Failed to import memory block.");
-- 
1.9.1



More information about the pulseaudio-discuss mailing list