[pulseaudio-discuss] [PATCH v3 07/11] memimport: Support memfd blocks

Ahmed S. Darwish darwish.07 at gmail.com
Sat Mar 12 23:07:27 UTC 2016


To transfer memfd-backed blocks without passing their fd every time,
thus minimizing overhead and avoiding fd leaks, a command is sent
with the memfd fd as ancil data very early on.

This command has an ID that uniquely identifies the memfd region.
Further memfd block references are then exclusively done using this
ID.

This commit implements the details of such 'permanent' mappings on
the receiving end, using memimport segments.

Suggested-by: David Henningsson <david.henningsson at canonical.com>
Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com>
---
 src/pulsecore/memblock.c  | 105 ++++++++++++++++++++++++++++++++++++++++++----
 src/pulsecore/memblock.h  |   9 ++--
 src/pulsecore/pstream.c   |   4 ++
 src/pulsecore/shm.c       |   7 +---
 src/pulsecore/shm.h       |   2 +-
 src/tests/memblock-test.c |   9 ++--
 6 files changed, 113 insertions(+), 23 deletions(-)

diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 49665ee..cdbc75a 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -100,6 +100,28 @@ struct pa_memimport_segment {
     bool writable;
 };
 
+/*
+ * If true, this segment's lifetime will not be limited by the
+ * number of active blocks (seg->n_blocks) using its shared memory.
+ * Rather, it will exist for the full lifetime of the memimport it
+ * is attached to.
+ *
+ * This is done to support memfd blocks transport.
+ *
+ * To transfer memfd-backed blocks without passing their fd every
+ * time, thus minimizing overhead and avoiding fd leaks, a command
+ * is sent with the memfd fd as ancil data very early on.
+ *
+ * This command has an ID that identifies the memfd region. Further
+ * block references are then exclusively done using this ID. On the
+ * receiving end, such logic is enabled by the memimport's segment
+ * hash and 'permanent' segments below.
+ */
+static bool segment_is_permanent(pa_memimport_segment *seg) {
+    pa_assert(seg);
+    return seg->memory.type == PA_MEM_TYPE_SHARED_MEMFD;
+}
+
 /* A collection of multiple segments */
 struct pa_memimport {
     pa_mutex *mutex;
@@ -930,6 +952,13 @@ bool pa_mempool_is_shared(pa_mempool *p) {
 }
 
 /* No lock necessary */
+bool pa_mempool_is_memfd_backed(const pa_mempool *p) {
+    pa_assert(p);
+
+    return (p->memory.type == PA_MEM_TYPE_SHARED_MEMFD);
+}
+
+/* No lock necessary */
 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
     pa_assert(p);
 
@@ -983,15 +1012,17 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
 
 /* Should be called locked */
-static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
+static pa_memimport_segment* segment_attach(pa_memimport *i, pa_mem_type_t type, uint32_t shm_id,
+                                            int memfd_fd, bool writable) {
     pa_memimport_segment* seg;
+    pa_assert(pa_mem_type_is_shared(type));
 
     if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
         return NULL;
 
     seg = pa_xnew0(pa_memimport_segment, 1);
 
-    if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
+    if (pa_shm_attach(&seg->memory, type, shm_id, memfd_fd, writable) < 0) {
         pa_xfree(seg);
         return NULL;
     }
@@ -1007,6 +1038,7 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bo
 /* Should be called locked */
 static void segment_detach(pa_memimport_segment *seg) {
     pa_assert(seg);
+    pa_assert(seg->n_blocks == (segment_is_permanent(seg) ? 1 : 0));
 
     pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
     pa_shm_free(&seg->memory);
@@ -1021,6 +1053,8 @@ static void segment_detach(pa_memimport_segment *seg) {
 void pa_memimport_free(pa_memimport *i) {
     pa_memexport *e;
     pa_memblock *b;
+    pa_memimport_segment *seg;
+    void *state = NULL;
 
     pa_assert(i);
 
@@ -1029,6 +1063,15 @@ void pa_memimport_free(pa_memimport *i) {
     while ((b = pa_hashmap_first(i->blocks)))
         memblock_replace_import(b);
 
+    /* Permanent segments exist for the lifetime of the memimport. Now
+     * that we're freeing the memimport itself, clear them all up.
+     *
+     * Careful! segment_detach() internally removes itself from the
+     * memimport's hash; the same hash we're now using for iteration. */
+    PA_HASHMAP_FOREACH(seg, i->segments, state) {
+        if (segment_is_permanent(seg))
+            segment_detach(seg);
+    }
     pa_assert(pa_hashmap_size(i->segments) == 0);
 
     pa_mutex_unlock(i->mutex);
@@ -1052,13 +1095,45 @@ void pa_memimport_free(pa_memimport *i) {
     pa_xfree(i);
 }
 
+/* Create a new memimport's memfd segment entry, with passed SHM ID
+ * as key and the newly-created segment (with its mmap()-ed memfd
+ * memory region) as its value.
+ *
+ * Note! check comments at 'pa_shm->fd', 'segment_is_permanent()',
+ * and 'pa_pstream_register_memfd_mempool()' for further details. */
+int pa_memimport_attach_memfd(pa_memimport *i, uint32_t shm_id, int memfd_fd, bool writable) {
+    pa_memimport_segment *seg;
+    int ret = -1;
+
+    pa_assert(i);
+    pa_assert(memfd_fd != -1);
+
+    pa_mutex_lock(i->mutex);
+
+    if (!(seg = segment_attach(i, PA_MEM_TYPE_SHARED_MEMFD, shm_id, memfd_fd, writable)))
+        goto finish;
+
+    /* n_blocks acts as a segment reference count. To avoid the segment
+     * being deleted when receiving silent memchunks, etc., mark our
+     * permanent presence by incrementing that refcount. */
+    seg->n_blocks++;
+
+    pa_assert(segment_is_permanent(seg));
+    ret = 0;
+
+finish:
+    pa_mutex_unlock(i->mutex);
+    return ret;
+}
+
 /* Self-locked */
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+pa_memblock* pa_memimport_get(pa_memimport *i, pa_mem_type_t type, uint32_t block_id, uint32_t shm_id,
                               size_t offset, size_t size, bool writable) {
     pa_memblock *b = NULL;
     pa_memimport_segment *seg;
 
     pa_assert(i);
+    pa_assert(pa_mem_type_is_shared(type));
 
     pa_mutex_lock(i->mutex);
 
@@ -1070,12 +1145,20 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
     if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
         goto finish;
 
-    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
-        if (!(seg = segment_attach(i, shm_id, writable)))
+    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id)))) {
+        if (type == PA_MEM_TYPE_SHARED_MEMFD) {
+            pa_log("Bailing out! No cached memimport segment for memfd ID %u", shm_id);
+            pa_log("Did the other PA endpoint forget registering its memfd pool?");
             goto finish;
+        }
+
+        pa_assert(type == PA_MEM_TYPE_SHARED_POSIX);
+        if (!(seg = segment_attach(i, type, shm_id, -1, writable)))
+            goto finish;
+    }
 
-    if (writable != seg->writable) {
-        pa_log("Cannot open segment - writable status changed!");
+    if (writable && !seg->writable) {
+        pa_log("Cannot import cached segment in write mode - previously mapped as read-only");
         goto finish;
     }
 
@@ -1268,13 +1351,15 @@ static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
 }
 
 /* Self-locked */
-int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
-    pa_shm *memory;
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, pa_mem_type_t *type, uint32_t *block_id,
+                     uint32_t *shm_id, size_t *offset, size_t * size) {
+    pa_shm  *memory;
     struct memexport_slot *slot;
     void *data;
 
     pa_assert(e);
     pa_assert(b);
+    pa_assert(type);
     pa_assert(block_id);
     pa_assert(shm_id);
     pa_assert(offset);
@@ -1312,12 +1397,14 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
     } else {
         pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
         pa_assert(b->pool);
+        pa_assert(pa_mempool_is_shared(b->pool));
         memory = &b->pool->memory;
     }
 
     pa_assert(data >= memory->ptr);
     pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);
 
+    *type = memory->type;
     *shm_id = memory->id;
     *offset = (size_t) ((uint8_t*) data - (uint8_t*) memory->ptr);
     *size = b->length;
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 718235f..de93e3d 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -131,6 +131,7 @@ const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
 void pa_mempool_vacuum(pa_mempool *p);
 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
 bool pa_mempool_is_shared(pa_mempool *p);
+bool pa_mempool_is_memfd_backed(const pa_mempool *p);
 bool pa_mempool_is_remote_writable(pa_mempool *p);
 void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable);
 size_t pa_mempool_block_size_max(pa_mempool *p);
@@ -138,14 +139,16 @@ size_t pa_mempool_block_size_max(pa_mempool *p);
 /* For receiving blocks from other nodes */
 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
 void pa_memimport_free(pa_memimport *i);
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
-                              size_t offset, size_t size, bool writable);
+int pa_memimport_attach_memfd(pa_memimport *i, uint32_t shm_id, int memfd_fd, bool writable);
+pa_memblock* pa_memimport_get(pa_memimport *i, pa_mem_type_t type, uint32_t block_id,
+                              uint32_t shm_id, size_t offset, size_t size, bool writable);
 int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
 
 /* For sending blocks to other nodes */
 pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
 void pa_memexport_free(pa_memexport *e);
-int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, pa_mem_type_t *type, uint32_t *block_id,
+                     uint32_t *shm_id, size_t *offset, size_t * size);
 int pa_memexport_process_release(pa_memexport *e, uint32_t id);
 
 #endif
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index fb43a1b..0fb37a0 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -536,6 +536,7 @@ static void prepare_next_write_item(pa_pstream *p) {
         flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
 
         if (p->use_shm) {
+            pa_mem_type_t type;
             uint32_t block_id, shm_id;
             size_t offset, length;
             uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
@@ -550,10 +551,12 @@ static void prepare_next_write_item(pa_pstream *p) {
 
             if (pa_memexport_put(current_export,
                                  p->write.current->chunk.memblock,
+                                 &type,
                                  &block_id,
                                  &shm_id,
                                  &offset,
                                  &length) >= 0) {
+                pa_assert(type == PA_MEM_TYPE_SHARED_POSIX);
 
                 flags |= PA_FLAG_SHMDATA;
                 if (pa_mempool_is_remote_writable(current_pool))
@@ -891,6 +894,7 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
             pa_assert(p->import);
 
             if (!(b = pa_memimport_get(p->import,
+                                       PA_MEM_TYPE_SHARED_POSIX,
                                        ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                        ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
                                        ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
diff --git a/src/pulsecore/shm.c b/src/pulsecore/shm.c
index c2cf8ff..bcf7182 100644
--- a/src/pulsecore/shm.c
+++ b/src/pulsecore/shm.c
@@ -417,15 +417,10 @@ fail:
 }
 
 /* Caller owns passed @memfd_fd and must close it down when appropriate. */
-static int NEW_API_pa_shm_attach(pa_shm *m, pa_mem_type_t type, unsigned id, int memfd_fd, bool writable) {
+int pa_shm_attach(pa_shm *m, pa_mem_type_t type, unsigned id, int memfd_fd, bool writable) {
     return shm_attach(m, type, id, memfd_fd, writable, false);
 }
 
-/* Compatibility version until the new API is used in external sources */
-int pa_shm_attach(pa_shm *m, unsigned id, bool writable) {
-    return NEW_API_pa_shm_attach(m, PA_MEM_TYPE_SHARED_POSIX, id, -1, writable);
-}
-
 int pa_shm_cleanup(void) {
 
 #ifdef HAVE_SHM_OPEN
diff --git a/src/pulsecore/shm.h b/src/pulsecore/shm.h
index 10df9c1..e8bfa56 100644
--- a/src/pulsecore/shm.h
+++ b/src/pulsecore/shm.h
@@ -46,7 +46,7 @@ typedef struct pa_shm {
 } pa_shm;
 
 int pa_shm_create_rw(pa_shm *m, pa_mem_type_t type, size_t size, mode_t mode);
-int pa_shm_attach(pa_shm *m, unsigned id, bool writable);
+int pa_shm_attach(pa_shm *m, pa_mem_type_t type, unsigned id, int memfd_fd, bool writable);
 
 void pa_shm_punch(pa_shm *m, size_t offset, size_t size);
 
diff --git a/src/tests/memblock-test.c b/src/tests/memblock-test.c
index 089648f..04eb377 100644
--- a/src/tests/memblock-test.c
+++ b/src/tests/memblock-test.c
@@ -74,6 +74,7 @@ START_TEST (memblock_test) {
     pa_memblock *mb_a, *mb_b, *mb_c;
     int r, i;
     pa_memblock* blocks[5];
+    pa_mem_type_t mem_type;
     uint32_t id, shm_id;
     size_t offset, size;
     char *x;
@@ -122,22 +123,22 @@ START_TEST (memblock_test) {
         import_c = pa_memimport_new(pool_c, release_cb, (void*) "C");
         fail_unless(import_b != NULL);
 
-        r = pa_memexport_put(export_a, mb_a, &id, &shm_id, &offset, &size);
+        r = pa_memexport_put(export_a, mb_a, &mem_type, &id, &shm_id, &offset, &size);
         fail_unless(r >= 0);
         fail_unless(shm_id == id_a);
 
         pa_log("A: Memory block exported as %u", id);
 
-        mb_b = pa_memimport_get(import_b, id, shm_id, offset, size, false);
+        mb_b = pa_memimport_get(import_b, PA_MEM_TYPE_SHARED_POSIX, id, shm_id, offset, size, false);
         fail_unless(mb_b != NULL);
-        r = pa_memexport_put(export_b, mb_b, &id, &shm_id, &offset, &size);
+        r = pa_memexport_put(export_b, mb_b, &mem_type, &id, &shm_id, &offset, &size);
         fail_unless(r >= 0);
         fail_unless(shm_id == id_a || shm_id == id_b);
         pa_memblock_unref(mb_b);
 
         pa_log("B: Memory block exported as %u", id);
 
-        mb_c = pa_memimport_get(import_c, id, shm_id, offset, size, false);
+        mb_c = pa_memimport_get(import_c, PA_MEM_TYPE_SHARED_POSIX, id, shm_id, offset, size, false);
         fail_unless(mb_c != NULL);
         x = pa_memblock_acquire(mb_c);
         pa_log_debug("1 data=%s", x);
-- 
2.7.2



More information about the pulseaudio-discuss mailing list