[pulseaudio-discuss] [PATCH 09/11] pulsecore: memexport/memimport: Introduce memfd blocks support
Ahmed S. Darwish
darwish.07 at gmail.com
Sun Sep 20 14:36:07 PDT 2015
Introduce support for marshalling and unmarshalling a memfd memblock
between different PulseAudio endpoints.
Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com>
---
src/pulsecore/memblock.c | 220 ++++++++++++++++++++++++++++++++++++++++------
src/pulsecore/memblock.h | 8 +-
src/pulsecore/pstream.c | 24 ++---
src/tests/memblock-test.c | 8 +-
4 files changed, 214 insertions(+), 46 deletions(-)
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 6047ead..04caf34 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -97,7 +97,16 @@ struct pa_memblock {
struct pa_memimport_segment {
pa_memimport *import;
- pa_shm memory;
+
+ pa_mem_type_t mem_type;
+ union {
+ pa_mem mem;
+ union {
+ pa_shm shm;
+ pa_memfd memfd;
+ } per_type;
+ };
+
pa_memtrap *trap;
unsigned n_blocks;
bool writable;
@@ -108,7 +117,22 @@ struct pa_memimport {
pa_mutex *mutex;
pa_mempool *pool;
- pa_hashmap *segments;
+ pa_hashmap *shm_segments;
+
+ /* Unlike what is done with Posix SHM segments above, we cannot track
+ * memfd-based memimport segments using our file-descriptor ID as key.
+ * File descriptors are recyclable by nature: the same fd number could
+ * map different memory regions at different points of time.
+ *
+ * Moreover, even if the other endpoint sent us the _very same_ fd
+ * twice over a unix domain socket, the kernel will pass them to us
+ * as different fd numbers.
+ *
+ * Thus only count the number of memfd segments allocated to this
+ * memimport and ensure that this counter is zero upon release.
+ */
+ int n_memfd_segments;
+
pa_hashmap *blocks;
/* Called whenever an imported memory block is no longer
@@ -537,6 +561,46 @@ pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
return b->pool;
}
+/* Return the kind of memory backing block b: the owning import segment's
+ * type for imported blocks, the pool's type for pool-allocated blocks, and
+ * PA_MEMORY_PRIVATE for appended, fixed and user blocks. Any other block
+ * type ends in pa_assert_not_reached(). */
+static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) {
+    pa_assert(b);
+
+    switch (b->type) {
+        case PA_MEMBLOCK_IMPORTED:
+            pa_assert(b->per_type.imported.segment);
+            return b->per_type.imported.segment->mem_type;
+
+        case PA_MEMBLOCK_POOL:
+        case PA_MEMBLOCK_POOL_EXTERNAL:
+            pa_assert(b->pool);
+            return b->pool->mem_type;
+
+        case PA_MEMBLOCK_APPENDED:
+        case PA_MEMBLOCK_FIXED:
+        case PA_MEMBLOCK_USER:
+            return PA_MEMORY_PRIVATE;
+
+        default:
+            pa_assert_not_reached();
+    }
+}
+
+/* Return the memfd file descriptor behind block b. The block must be
+ * memfd-backed (asserted): imported blocks report their segment's fd,
+ * pool blocks report the pool's fd. */
+int pa_memblock_get_memfd_fd(pa_memblock *b) {
+    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        pa_assert(b->per_type.imported.segment);
+        return b->per_type.imported.segment->per_type.memfd.fd;
+    }
+
+    if (b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL)
+        return b->pool->per_type.memfd.fd;
+
+    /* Appended, fixed and user blocks are reported as private memory by
+     * memblock_get_mem_type(), so they cannot pass the assertion above. */
+    pa_assert_not_reached();
+}
+
/* No lock necessary */
pa_memblock* pa_memblock_ref(pa_memblock*b) {
pa_assert(b);
@@ -964,7 +1028,8 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
i = pa_xnew(pa_memimport, 1);
i->mutex = pa_mutex_new(true, true);
i->pool = p;
- i->segments = pa_hashmap_new(NULL, NULL);
+ i->shm_segments = pa_hashmap_new(NULL, NULL);
+ i->n_memfd_segments = 0;
i->blocks = pa_hashmap_new(NULL, NULL);
i->release_cb = cb;
i->userdata = userdata;
@@ -978,25 +1043,59 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
+/* Fill in the bookkeeping fields common to every segment kind and register
+ * a memtrap over the mapping. Reads seg->mem.ptr and seg->mem.size, so the
+ * type-specific attach call must have populated them beforehand. */
+static void segment_late_init(pa_memimport_segment *seg, pa_memimport *i, pa_mem_type_t mem_type, bool writable) {
+    seg->mem_type = mem_type;
+    seg->import = i;
+    seg->writable = writable;
+
+    pa_assert(seg->mem.ptr != NULL);
+    seg->trap = pa_memtrap_add(seg->mem.ptr, seg->mem.size);
+}
+
/* Should be called locked */
-static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
+static pa_memimport_segment* segment_shm_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
pa_memimport_segment* seg;
- if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
+ if ((seg = pa_hashmap_get(i->shm_segments, PA_UINT32_TO_PTR(shm_id))))
+ return seg;
+
+ if (pa_hashmap_size(i->shm_segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
return NULL;
seg = pa_xnew0(pa_memimport_segment, 1);
- if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
+ if (pa_shm_attach(&seg->per_type.shm, shm_id, writable) < 0) {
pa_xfree(seg);
return NULL;
}
- seg->writable = writable;
- seg->import = i;
- seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size);
+ segment_late_init(seg, i, PA_MEMORY_SHARED_POSIX, writable);
+
+ pa_hashmap_put(i->shm_segments, PA_UINT32_TO_PTR(seg->per_type.shm.id), seg);
+ return seg;
+}
+
+/* Should be called locked */
+static pa_memimport_segment* segment_memfd_attach(pa_memimport *i, int fd, bool writable) {
+ pa_memimport_segment* seg;
- pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(seg->memory.id), seg);
+ /* FIXME: Introduce a proper memfd-tracking mechanism. We receive
+ * different fd numbers even for the very same fds passed by the
+ * other endpoint. This can lead to an _unbounded_ increase in
+ * the number of `pa_memimport_segment' allocations created here. */
+/* if (i->n_memfd_segments >= PA_MEMIMPORT_SEGMENTS_MAX)
+ return NULL; */
+
+ seg = pa_xnew0(pa_memimport_segment, 1);
+
+ if (pa_memfd_attach(&seg->per_type.memfd, fd, writable) < 0) {
+ pa_xfree(seg);
+ return NULL;
+ }
+
+ segment_late_init(seg, i, PA_MEMORY_SHARED_MEMFD, writable);
+
+ ++ i->n_memfd_segments;
return seg;
}
@@ -1004,8 +1103,19 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bo
static void segment_detach(pa_memimport_segment *seg) {
pa_assert(seg);
- pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
- pa_shm_free(&seg->memory);
+ switch (seg->mem_type) {
+ case PA_MEMORY_SHARED_POSIX:
+ pa_hashmap_remove(seg->import->shm_segments, PA_UINT32_TO_PTR(seg->per_type.shm.id));
+ pa_shm_free(&seg->per_type.shm);
+ break;
+ case PA_MEMORY_SHARED_MEMFD:
+ pa_assert(seg->import->n_memfd_segments >= 1);
+ -- seg->import->n_memfd_segments;
+ pa_memfd_free(&seg->per_type.memfd);
+ break;
+ default:
+ pa_assert_not_reached();
+ }
if (seg->trap)
pa_memtrap_remove(seg->trap);
@@ -1025,7 +1135,8 @@ void pa_memimport_free(pa_memimport *i) {
while ((b = pa_hashmap_first(i->blocks)))
memblock_replace_import(b);
- pa_assert(pa_hashmap_size(i->segments) == 0);
+ pa_assert(pa_hashmap_size(i->shm_segments) == 0);
+ pa_assert(i->n_memfd_segments == 0);
pa_mutex_unlock(i->mutex);
@@ -1040,7 +1151,7 @@ void pa_memimport_free(pa_memimport *i) {
pa_mutex_unlock(i->pool->mutex);
pa_hashmap_free(i->blocks);
- pa_hashmap_free(i->segments);
+ pa_hashmap_free(i->shm_segments);
pa_mutex_free(i->mutex);
@@ -1048,10 +1159,10 @@ void pa_memimport_free(pa_memimport *i) {
}
/* Self-locked */
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
- size_t offset, size_t size, bool writable) {
+static pa_memblock* pa_memimport_get(pa_memimport *i, pa_mem_type_t type, uint32_t block_id,
+ uint32_t shm_id, int memfd_fd, size_t offset, size_t size, bool writable) {
pa_memblock *b = NULL;
- pa_memimport_segment *seg;
+ pa_memimport_segment *seg = NULL;
pa_assert(i);
@@ -1065,16 +1176,29 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
goto finish;
- if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
- if (!(seg = segment_attach(i, shm_id, writable)))
- goto finish;
+ switch (type) {
+ case PA_MEMORY_SHARED_POSIX:
+ pa_assert(memfd_fd == -1);
+ seg = segment_shm_attach(i, shm_id, writable);
+ break;
+ case PA_MEMORY_SHARED_MEMFD:
+ pa_assert(shm_id == (uint32_t)-1);
+ seg = segment_memfd_attach(i, memfd_fd, writable);
+ break;
+ case PA_MEMORY_PRIVATE:
+ default:
+ pa_assert_not_reached();
+ }
+
+ if (!seg)
+ goto finish;
if (writable != seg->writable) {
pa_log("Cannot open segment - writable status changed!");
goto finish;
}
- if (offset+size > seg->memory.size)
+ if (offset+size > seg->mem.size)
goto finish;
if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
@@ -1085,7 +1209,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
b->type = PA_MEMBLOCK_IMPORTED;
b->read_only = !writable;
b->is_silence = false;
- pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
+ pa_atomic_ptr_store(&b->data, (uint8_t*) seg->mem.ptr + offset);
b->length = size;
pa_atomic_store(&b->n_acquired, 0);
pa_atomic_store(&b->please_signal, 0);
@@ -1104,6 +1228,16 @@ finish:
return b;
}
+/* Import a block that lives in the POSIX SHM segment identified by shm_id.
+ * Thin wrapper around pa_memimport_get(); no memfd applies here, so -1 is
+ * passed in the fd slot. */
+pa_memblock *pa_memimport_shm_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+                                  size_t offset, size_t size, bool writable) {
+    const int no_memfd = -1;
+
+    return pa_memimport_get(i, PA_MEMORY_SHARED_POSIX, block_id, shm_id, no_memfd, offset, size, writable);
+}
+
+/* Import a block that lives in the memfd region referred to by fd.
+ * Thin wrapper around pa_memimport_get(); no SHM id applies here, so the
+ * all-ones id is passed in that slot. */
+pa_memblock *pa_memimport_memfd_get(pa_memimport *i, uint32_t block_id, int fd,
+                                    size_t offset, size_t size, bool writable) {
+    const uint32_t no_shm_id = (uint32_t) -1;
+
+    return pa_memimport_get(i, PA_MEMORY_SHARED_MEMFD, block_id, no_shm_id, fd, offset, size, writable);
+}
+
int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
pa_memblock *b;
int ret = 0;
@@ -1260,15 +1394,15 @@ static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
}
/* Self-locked */
-int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
- pa_shm *memory;
+static int memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, pa_mem **memory_ptr, size_t *offset, size_t *size) {
struct memexport_slot *slot;
+ pa_mem *memory;
void *data;
pa_assert(e);
pa_assert(b);
pa_assert(block_id);
- pa_assert(shm_id);
+ pa_assert(memory_ptr);
pa_assert(offset);
pa_assert(size);
pa_assert(b->pool == e->pool);
@@ -1300,17 +1434,17 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
if (b->type == PA_MEMBLOCK_IMPORTED) {
pa_assert(b->per_type.imported.segment);
- memory = &b->per_type.imported.segment->memory;
+ *memory_ptr = &b->per_type.imported.segment->mem;
} else {
pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
pa_assert(b->pool);
- memory = &b->pool->per_type.shm;
+ *memory_ptr = &b->pool->mem;
}
+ memory = *memory_ptr;
pa_assert(data >= memory->ptr);
pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);
- *shm_id = memory->id;
*offset = (size_t) ((uint8_t*) data - (uint8_t*) memory->ptr);
*size = b->length;
@@ -1321,3 +1455,33 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
return 0;
}
+
+/* Export block b, which must live in POSIX shared memory (asserted), so
+ * that a peer can import it. On success returns 0 and fills in *block_id,
+ * *shm_id, *offset and *size; returns -1 if memexport_put() fails. */
+int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
+    pa_mem *mem = NULL;
+
+    /* memblock_get_mem_type() dereferences b, so check it first */
+    pa_assert(b);
+    pa_assert(shm_id);
+    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX);
+
+    if (memexport_put(e, b, block_id, &mem, offset, size) < 0)
+        return -1;
+
+    pa_assert(mem);
+
+    /* Convert the returned pointer itself instead of letting the helper
+     * write through a punned pa_mem **. NOTE(review): like the original
+     * cast, this assumes pa_shm starts with the pa_mem fields -- confirm
+     * against the pa_shm definition. */
+    *shm_id = ((pa_shm *) mem)->id;
+
+    return 0;
+}
+
+/* Export block b, which must live in memfd-backed memory (asserted), so
+ * that a peer can import it. On success returns 0 and fills in *block_id,
+ * *memfd_fd, *offset and *size; returns -1 if memexport_put() fails. */
+int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, int *memfd_fd, size_t *offset, size_t * size) {
+    pa_mem *mem = NULL;
+
+    /* memblock_get_mem_type() dereferences b, so check it first */
+    pa_assert(b);
+    pa_assert(memfd_fd);
+    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+
+    if (memexport_put(e, b, block_id, &mem, offset, size) < 0)
+        return -1;
+
+    pa_assert(mem);
+
+    /* Convert the returned pointer itself instead of letting the helper
+     * write through a punned pa_mem **. NOTE(review): like the original
+     * cast, this assumes pa_memfd starts with the pa_mem fields -- confirm
+     * against the pa_memfd definition. */
+    *memfd_fd = ((pa_memfd *) mem)->fd;
+
+    return 0;
+}
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 184ba55..fc84c17 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -118,6 +118,7 @@ void pa_memblock_release(pa_memblock *b);
size_t pa_memblock_get_length(pa_memblock *b);
pa_mempool * pa_memblock_get_pool(pa_memblock *b);
+int pa_memblock_get_memfd_fd(pa_memblock *b);
pa_memblock *pa_memblock_will_need(pa_memblock *b);
@@ -135,14 +136,17 @@ size_t pa_mempool_block_size_max(pa_mempool *p);
/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
void pa_memimport_free(pa_memimport *i);
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+pa_memblock *pa_memimport_shm_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
size_t offset, size_t size, bool writable);
+pa_memblock *pa_memimport_memfd_get(pa_memimport *i, uint32_t block_id, int fd, size_t offset,
+ size_t size, bool writable);
int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
/* For sending blocks to other nodes */
pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
void pa_memexport_free(pa_memexport *e);
-int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size);
+int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, int *memfd_fd, size_t *offset, size_t * size);
int pa_memexport_process_release(pa_memexport *e, uint32_t id);
#endif
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 8c14fbb..6e1963f 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -539,12 +539,12 @@ static void prepare_next_write_item(pa_pstream *p) {
else
pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
- if (pa_memexport_put(current_export,
- p->write.current->chunk.memblock,
- &block_id,
- &shm_id,
- &offset,
- &length) >= 0) {
+ if (pa_memexport_shm_put(current_export,
+ p->write.current->chunk.memblock,
+ &block_id,
+ &shm_id,
+ &offset,
+ &length) >= 0) {
flags |= PA_FLAG_SHMDATA;
if (pa_mempool_is_remote_writable(current_pool))
@@ -880,12 +880,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
pa_assert(p->import);
- if (!(b = pa_memimport_get(p->import,
- ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
- !!(flags & PA_FLAG_SHMWRITABLE)))) {
+ if (!(b = pa_memimport_shm_get(p->import,
+ ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
+ !!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
pa_log_debug("Failed to import memory block.");
diff --git a/src/tests/memblock-test.c b/src/tests/memblock-test.c
index e4c9d0a..d1cda87 100644
--- a/src/tests/memblock-test.c
+++ b/src/tests/memblock-test.c
@@ -122,22 +122,22 @@ START_TEST (memblock_test) {
import_c = pa_memimport_new(pool_c, release_cb, (void*) "C");
fail_unless(import_b != NULL);
- r = pa_memexport_put(export_a, mb_a, &id, &shm_id, &offset, &size);
+ r = pa_memexport_shm_put(export_a, mb_a, &id, &shm_id, &offset, &size);
fail_unless(r >= 0);
fail_unless(shm_id == id_a);
pa_log("A: Memory block exported as %u", id);
- mb_b = pa_memimport_get(import_b, id, shm_id, offset, size, false);
+ mb_b = pa_memimport_shm_get(import_b, id, shm_id, offset, size, false);
fail_unless(mb_b != NULL);
- r = pa_memexport_put(export_b, mb_b, &id, &shm_id, &offset, &size);
+ r = pa_memexport_shm_put(export_b, mb_b, &id, &shm_id, &offset, &size);
fail_unless(r >= 0);
fail_unless(shm_id == id_a || shm_id == id_b);
pa_memblock_unref(mb_b);
pa_log("B: Memory block exported as %u", id);
- mb_c = pa_memimport_get(import_c, id, shm_id, offset, size, false);
+ mb_c = pa_memimport_shm_get(import_c, id, shm_id, offset, size, false);
fail_unless(mb_c != NULL);
x = pa_memblock_acquire(mb_c);
pa_log_debug("1 data=%s", x);
--
Darwish
http://darwish.chasingpointers.com
More information about the pulseaudio-discuss
mailing list