[pulseaudio-discuss] [PATCH 06/11] memblock, pstream: Allow send/receive of remote writable memblocks
David Henningsson
david.henningsson at canonical.com
Tue Apr 29 06:22:20 PDT 2014
The shared ringbuffer memblock must be writable by both sides.
This makes it possible to send such a memblock over a pstream without
the "both sides writable" information getting lost.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
src/pulsecore/memblock.c | 40 +++++++++++++++++++++++++++++++++++-----
src/pulsecore/memblock.h | 6 +++++-
src/pulsecore/pstream.c | 32 ++++++++++++++++++++++----------
3 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 8da0fcd..5ef2aa9 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -97,6 +97,7 @@ struct pa_memimport_segment {
pa_shm memory;
pa_memtrap *trap;
unsigned n_blocks;
+ bool writable;
};
/* A collection of multiple segments */
@@ -146,6 +147,7 @@ struct pa_mempool {
pa_shm memory;
size_t block_size;
unsigned n_blocks;
+ bool is_remote_writable;
pa_atomic_t n_init;
@@ -303,6 +305,19 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
}
/* No lock necessary */
+bool pa_mempool_is_remote_writable(pa_mempool *p) {
+ pa_assert(p);
+ return p->is_remote_writable;
+}
+
+/* No lock necessary */
+void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable) {
+ pa_assert(p);
+ pa_assert(!writable || pa_mempool_is_shared(p));
+ p->is_remote_writable = writable;
+}
+
+/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
pa_memblock *b = NULL;
struct mempool_slot *slot;
@@ -416,6 +431,14 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free
}
/* No lock necessary */
+bool pa_memblock_is_ours(pa_memblock *b) {
+ pa_assert(b);
+ pa_assert(PA_REFCNT_VALUE(b) > 0);
+
+ return b->type != PA_MEMBLOCK_IMPORTED;
+}
+
+/* No lock necessary */
bool pa_memblock_is_read_only(pa_memblock *b) {
pa_assert(b);
pa_assert(PA_REFCNT_VALUE(b) > 0);
@@ -905,7 +928,7 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
/* Should be called locked */
-static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
pa_memimport_segment* seg;
if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
@@ -913,11 +936,12 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
seg = pa_xnew0(pa_memimport_segment, 1);
- if (pa_shm_attach(&seg->memory, shm_id, false) < 0) {
+ if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
pa_xfree(seg);
return NULL;
}
+ seg->writable = writable;
seg->import = i;
seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size);
@@ -973,7 +997,8 @@ void pa_memimport_free(pa_memimport *i) {
}
/* Self-locked */
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+ size_t offset, size_t size, bool writable) {
pa_memblock *b = NULL;
pa_memimport_segment *seg;
@@ -990,9 +1015,14 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
goto finish;
if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
- if (!(seg = segment_attach(i, shm_id)))
+ if (!(seg = segment_attach(i, shm_id, writable)))
goto finish;
+ if (writable != seg->writable) {
+ pa_log("Cannot open segment - writable status changed!");
+ goto finish;
+ }
+
if (offset+size > seg->memory.size)
goto finish;
@@ -1002,7 +1032,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
PA_REFCNT_INIT(b);
b->pool = i->pool;
b->type = PA_MEMBLOCK_IMPORTED;
- b->read_only = true;
+ b->read_only = !writable;
b->is_silence = false;
pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
b->length = size;
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 502f207..d60f3c3 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -104,6 +104,7 @@ function is not multiple caller safe, i.e. needs to be locked
manually if called from more than one thread at the same time. */
void pa_memblock_unref_fixed(pa_memblock*b);
+bool pa_memblock_is_ours(pa_memblock *b);
bool pa_memblock_is_read_only(pa_memblock *b);
bool pa_memblock_is_silence(pa_memblock *b);
bool pa_memblock_ref_is_one(pa_memblock *b);
@@ -125,12 +126,15 @@ const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
void pa_mempool_vacuum(pa_mempool *p);
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
bool pa_mempool_is_shared(pa_mempool *p);
+bool pa_mempool_is_remote_writable(pa_mempool *p);
+void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable);
size_t pa_mempool_block_size_max(pa_mempool *p);
/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
void pa_memimport_free(pa_memimport *i);
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+ size_t offset, size_t size, bool writable);
int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
/* For sending blocks to other nodes */
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 22ea250..539c4a2 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -45,11 +45,12 @@
#include "pstream.h"
/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
-#define PA_FLAG_SHMDATA 0x80000000LU
-#define PA_FLAG_SHMRELEASE 0x40000000LU
-#define PA_FLAG_SHMREVOKE 0xC0000000LU
-#define PA_FLAG_SHMMASK 0xFF000000LU
-#define PA_FLAG_SEEKMASK 0x000000FFLU
+#define PA_FLAG_SHMDATA 0x80000000LU
+#define PA_FLAG_SHMRELEASE 0x40000000LU
+#define PA_FLAG_SHMREVOKE 0xC0000000LU
+#define PA_FLAG_SHMMASK 0xFF000000LU
+#define PA_FLAG_SEEKMASK 0x000000FFLU
+#define PA_FLAG_SHMWRITABLE 0x00800000LU
/* The sequence descriptor header consists of 5 32bit integers: */
enum {
@@ -504,10 +505,15 @@ static void prepare_next_write_item(pa_pstream *p) {
size_t offset, length;
uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
+ pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
+ pa_memexport *current_export;
- pa_assert(p->export);
+ if (p->mempool == current_pool)
+ pa_assert_se(current_export = p->export);
+ else
+ pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
- if (pa_memexport_put(p->export,
+ if (pa_memexport_put(current_export,
p->write.current->chunk.memblock,
&block_id,
&shm_id,
@@ -515,6 +521,8 @@ static void prepare_next_write_item(pa_pstream *p) {
&length) >= 0) {
flags |= PA_FLAG_SHMDATA;
+ if (pa_mempool_is_remote_writable(current_pool))
+ flags |= PA_FLAG_SHMWRITABLE;
send_payload = false;
shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
@@ -527,6 +535,9 @@ static void prepare_next_write_item(pa_pstream *p) {
}
/* else */
/* pa_log_warn("Failed to export memory block."); */
+
+ if (current_export != p->export)
+ pa_memexport_free(current_export);
}
if (send_payload) {
@@ -824,8 +835,8 @@ static int do_read(pa_pstream *p) {
pa_packet_unref(p->read.packet);
} else {
pa_memblock *b;
-
- pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+ uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
pa_assert(p->import);
@@ -833,7 +844,8 @@ static int do_read(pa_pstream *p) {
ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
+ !!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
pa_log_debug("Failed to import memory block.");
--
1.9.1
More information about the pulseaudio-discuss
mailing list