[pulseaudio-discuss] [PATCH 09/11] pulsecore: memexport/memimport: Introduce memfd blocks support

Ahmed S. Darwish darwish.07 at gmail.com
Sun Sep 20 14:36:07 PDT 2015


Introduce support for marshalling and umarshalling a memfd memblock
between different PulseAudio endpoints.

Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com>
---
 src/pulsecore/memblock.c  | 220 ++++++++++++++++++++++++++++++++++++++++------
 src/pulsecore/memblock.h  |   8 +-
 src/pulsecore/pstream.c   |  24 ++---
 src/tests/memblock-test.c |   8 +-
 4 files changed, 214 insertions(+), 46 deletions(-)

diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 6047ead..04caf34 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -97,7 +97,16 @@ struct pa_memblock {
 
 struct pa_memimport_segment {
     pa_memimport *import;
-    pa_shm memory;
+
+    pa_mem_type_t mem_type;
+    union {
+        pa_mem mem;
+        union {
+            pa_shm shm;
+            pa_memfd memfd;
+        } per_type;
+    };
+
     pa_memtrap *trap;
     unsigned n_blocks;
     bool writable;
@@ -108,7 +117,22 @@ struct pa_memimport {
     pa_mutex *mutex;
 
     pa_mempool *pool;
-    pa_hashmap *segments;
+    pa_hashmap *shm_segments;
+
+    /* Unlike what is done with Posix SHM segments above, we cannot track
+     * memfd-based memimport segments using our file-descriptor ID as key.
+     * File descriptors are recyclable by nature: the same fd number could
+     * map different memory regions at different points of time.
+     *
+     * Moreover, even if the other endpoint sent us the _very same_ fd
+     * twice over a unix domain socket, the kernel will pass them to us
+     * as different fd numbers.
+     *
+     * Thus only count the number of memfd segments allocated to this
+     * memmimport and assure that this counter is zero upon release.
+     */
+    int n_memfd_segments;
+
     pa_hashmap *blocks;
 
     /* Called whenever an imported memory block is no longer
@@ -537,6 +561,46 @@ pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
     return b->pool;
 }
 
+static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) {
+    switch (b->type) {
+    case PA_MEMBLOCK_IMPORTED:
+        pa_assert(b->per_type.imported.segment);
+        return b->per_type.imported.segment->mem_type;
+
+    case PA_MEMBLOCK_POOL:
+    case PA_MEMBLOCK_POOL_EXTERNAL:
+        return b->pool->mem_type;
+
+    case PA_MEMBLOCK_APPENDED:
+    case PA_MEMBLOCK_FIXED:
+    case PA_MEMBLOCK_USER:
+        return PA_MEMORY_PRIVATE;
+
+    default:
+        pa_assert_not_reached();
+    }
+};
+
+int pa_memblock_get_memfd_fd(pa_memblock *b) {
+    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+
+    switch (b->type) {
+    case PA_MEMBLOCK_IMPORTED:
+        pa_assert(b->per_type.imported.segment);
+        return b->per_type.imported.segment->per_type.memfd.fd;
+
+    case PA_MEMBLOCK_POOL:
+    case PA_MEMBLOCK_POOL_EXTERNAL:
+        return b->pool->per_type.memfd.fd;
+
+    case PA_MEMBLOCK_APPENDED:
+    case PA_MEMBLOCK_FIXED:
+    case PA_MEMBLOCK_USER:
+    default:
+        pa_assert_not_reached();
+    }
+}
+
 /* No lock necessary */
 pa_memblock* pa_memblock_ref(pa_memblock*b) {
     pa_assert(b);
@@ -964,7 +1028,8 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
     i = pa_xnew(pa_memimport, 1);
     i->mutex = pa_mutex_new(true, true);
     i->pool = p;
-    i->segments = pa_hashmap_new(NULL, NULL);
+    i->shm_segments = pa_hashmap_new(NULL, NULL);
+    i->n_memfd_segments = 0;
     i->blocks = pa_hashmap_new(NULL, NULL);
     i->release_cb = cb;
     i->userdata = userdata;
@@ -978,25 +1043,59 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
 
 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
 
+static void segment_late_init(pa_memimport_segment *seg, pa_memimport *i, pa_mem_type_t mem_type, bool writable) {
+    seg->writable = writable;
+    seg->import = i;
+    seg->mem_type = mem_type;
+
+    pa_assert(seg->mem.ptr != NULL);
+    seg->trap = pa_memtrap_add(seg->mem.ptr, seg->mem.size);
+}
+
 /* Should be called locked */
-static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
+static pa_memimport_segment* segment_shm_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
     pa_memimport_segment* seg;
 
-    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
+    if ((seg = pa_hashmap_get(i->shm_segments, PA_UINT32_TO_PTR(shm_id))))
+        return seg;
+
+    if (pa_hashmap_size(i->shm_segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
         return NULL;
 
     seg = pa_xnew0(pa_memimport_segment, 1);
 
-    if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
+    if (pa_shm_attach(&seg->per_type.shm, shm_id, writable) < 0) {
         pa_xfree(seg);
         return NULL;
     }
 
-    seg->writable = writable;
-    seg->import = i;
-    seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size);
+    segment_late_init(seg, i, PA_MEMORY_SHARED_POSIX, writable);
+
+    pa_hashmap_put(i->shm_segments, PA_UINT32_TO_PTR(seg->per_type.shm.id), seg);
+    return seg;
+}
+
+/* Should be called locked */
+static pa_memimport_segment* segment_memfd_attach(pa_memimport *i, int fd, bool writable) {
+    pa_memimport_segment* seg;
 
-    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(seg->memory.id), seg);
+    /* FIXME: Introduce a proper memfd-tracking mechanism. We receive
+     * different fd numbers even for the very same fds passed by the
+     * other endpoint. This can lead to an _unbounded_ increase in
+     * the number of `pa_memimport_segment' allocations created here. */
+/*  if (i->n_memfd_segments >= PA_MEMIMPORT_SEGMENTS_MAX)
+        return NULL; */
+
+    seg = pa_xnew0(pa_memimport_segment, 1);
+
+    if (pa_memfd_attach(&seg->per_type.memfd, fd, writable) < 0) {
+        pa_xfree(seg);
+        return NULL;
+    }
+
+    segment_late_init(seg, i, PA_MEMORY_SHARED_MEMFD, writable);
+
+    ++ i->n_memfd_segments;
     return seg;
 }
 
@@ -1004,8 +1103,19 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bo
 static void segment_detach(pa_memimport_segment *seg) {
     pa_assert(seg);
 
-    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
-    pa_shm_free(&seg->memory);
+    switch (seg->mem_type) {
+    case PA_MEMORY_SHARED_POSIX:
+        pa_hashmap_remove(seg->import->shm_segments, PA_UINT32_TO_PTR(seg->per_type.shm.id));
+        pa_shm_free(&seg->per_type.shm);
+        break;
+    case PA_MEMORY_SHARED_MEMFD:
+        pa_assert(seg->import->n_memfd_segments >= 1);
+        -- seg->import->n_memfd_segments;
+        pa_memfd_free(&seg->per_type.memfd);
+        break;
+    default:
+        pa_assert_not_reached();
+    }
 
     if (seg->trap)
         pa_memtrap_remove(seg->trap);
@@ -1025,7 +1135,8 @@ void pa_memimport_free(pa_memimport *i) {
     while ((b = pa_hashmap_first(i->blocks)))
         memblock_replace_import(b);
 
-    pa_assert(pa_hashmap_size(i->segments) == 0);
+    pa_assert(pa_hashmap_size(i->shm_segments) == 0);
+    pa_assert(i->n_memfd_segments == 0);
 
     pa_mutex_unlock(i->mutex);
 
@@ -1040,7 +1151,7 @@ void pa_memimport_free(pa_memimport *i) {
     pa_mutex_unlock(i->pool->mutex);
 
     pa_hashmap_free(i->blocks);
-    pa_hashmap_free(i->segments);
+    pa_hashmap_free(i->shm_segments);
 
     pa_mutex_free(i->mutex);
 
@@ -1048,10 +1159,10 @@ void pa_memimport_free(pa_memimport *i) {
 }
 
 /* Self-locked */
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
-                              size_t offset, size_t size, bool writable) {
+static pa_memblock* pa_memimport_get(pa_memimport *i, pa_mem_type_t type, uint32_t block_id,
+                                     uint32_t shm_id, int memfd_fd, size_t offset, size_t size, bool writable) {
     pa_memblock *b = NULL;
-    pa_memimport_segment *seg;
+    pa_memimport_segment *seg = NULL;
 
     pa_assert(i);
 
@@ -1065,16 +1176,29 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
     if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
         goto finish;
 
-    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
-        if (!(seg = segment_attach(i, shm_id, writable)))
-            goto finish;
+    switch (type) {
+    case PA_MEMORY_SHARED_POSIX:
+        pa_assert(memfd_fd == -1);
+        seg = segment_shm_attach(i, shm_id, writable);
+        break;
+    case PA_MEMORY_SHARED_MEMFD:
+        pa_assert(shm_id == (uint32_t)-1);
+        seg = segment_memfd_attach(i, memfd_fd, writable);
+        break;
+    case PA_MEMORY_PRIVATE:
+    default:
+        pa_assert_not_reached();
+    }
+
+    if (!seg)
+        goto finish;
 
     if (writable != seg->writable) {
         pa_log("Cannot open segment - writable status changed!");
         goto finish;
     }
 
-    if (offset+size > seg->memory.size)
+    if (offset+size > seg->mem.size)
         goto finish;
 
     if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
@@ -1085,7 +1209,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
     b->type = PA_MEMBLOCK_IMPORTED;
     b->read_only = !writable;
     b->is_silence = false;
-    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
+    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->mem.ptr + offset);
     b->length = size;
     pa_atomic_store(&b->n_acquired, 0);
     pa_atomic_store(&b->please_signal, 0);
@@ -1104,6 +1228,16 @@ finish:
     return b;
 }
 
+pa_memblock *pa_memimport_shm_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+                                  size_t offset, size_t size, bool writable) {
+    return pa_memimport_get(i, PA_MEMORY_SHARED_POSIX, block_id, shm_id, -1, offset, size, writable);
+}
+
+pa_memblock *pa_memimport_memfd_get(pa_memimport *i, uint32_t block_id, int fd,
+                                    size_t offset, size_t size, bool writable) {
+    return pa_memimport_get(i, PA_MEMORY_SHARED_MEMFD, block_id, -1, fd, offset, size, writable);
+}
+
 int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
     pa_memblock *b;
     int ret = 0;
@@ -1260,15 +1394,15 @@ static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
 }
 
 /* Self-locked */
-int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
-    pa_shm *memory;
+static int memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, pa_mem **memory_ptr, size_t *offset, size_t *size) {
     struct memexport_slot *slot;
+    pa_mem *memory;
     void *data;
 
     pa_assert(e);
     pa_assert(b);
     pa_assert(block_id);
-    pa_assert(shm_id);
+    pa_assert(memory_ptr);
     pa_assert(offset);
     pa_assert(size);
     pa_assert(b->pool == e->pool);
@@ -1300,17 +1434,17 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
 
     if (b->type == PA_MEMBLOCK_IMPORTED) {
         pa_assert(b->per_type.imported.segment);
-        memory = &b->per_type.imported.segment->memory;
+        *memory_ptr = &b->per_type.imported.segment->mem;
     } else {
         pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
         pa_assert(b->pool);
-        memory = &b->pool->per_type.shm;
+        *memory_ptr = &b->pool->mem;
     }
 
+    memory = *memory_ptr;
     pa_assert(data >= memory->ptr);
     pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);
 
-    *shm_id = memory->id;
     *offset = (size_t) ((uint8_t*) data - (uint8_t*) memory->ptr);
     *size = b->length;
 
@@ -1321,3 +1455,33 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
 
     return 0;
 }
+
+int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size)
+{
+    pa_shm *shm = NULL;
+    pa_assert(shm_id);
+    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX);
+
+    if (memexport_put(e, b, block_id, (pa_mem **)&shm, offset, size) < 0)
+            return -1;
+
+    pa_assert(shm);
+    *shm_id = shm->id;
+
+    return 0;
+}
+
+int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, int *memfd_fd, size_t *offset, size_t * size)
+{
+    pa_memfd *memfd = NULL;
+    pa_assert(memfd_fd);
+    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+
+    if (memexport_put(e, b, block_id, (pa_mem **)&memfd, offset, size) < 0)
+            return -1;
+
+    pa_assert(memfd);
+    *memfd_fd = memfd->fd;
+
+    return 0;
+}
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 184ba55..fc84c17 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -118,6 +118,7 @@ void pa_memblock_release(pa_memblock *b);
 
 size_t pa_memblock_get_length(pa_memblock *b);
 pa_mempool * pa_memblock_get_pool(pa_memblock *b);
+int pa_memblock_get_memfd_fd(pa_memblock *b);
 
 pa_memblock *pa_memblock_will_need(pa_memblock *b);
 
@@ -135,14 +136,17 @@ size_t pa_mempool_block_size_max(pa_mempool *p);
 /* For receiving blocks from other nodes */
 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
 void pa_memimport_free(pa_memimport *i);
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+pa_memblock *pa_memimport_shm_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
                               size_t offset, size_t size, bool writable);
+pa_memblock *pa_memimport_memfd_get(pa_memimport *i, uint32_t block_id, int fd, size_t offset,
+        size_t size, bool writable);
 int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
 
 /* For sending blocks to other nodes */
 pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
 void pa_memexport_free(pa_memexport *e);
-int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size);
+int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, int *memfd_fd, size_t *offset, size_t * size);
 int pa_memexport_process_release(pa_memexport *e, uint32_t id);
 
 #endif
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 8c14fbb..6e1963f 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -539,12 +539,12 @@ static void prepare_next_write_item(pa_pstream *p) {
             else
                 pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
 
-            if (pa_memexport_put(current_export,
-                                 p->write.current->chunk.memblock,
-                                 &block_id,
-                                 &shm_id,
-                                 &offset,
-                                 &length) >= 0) {
+            if (pa_memexport_shm_put(current_export,
+                                     p->write.current->chunk.memblock,
+                                     &block_id,
+                                     &shm_id,
+                                     &offset,
+                                     &length) >= 0) {
 
                 flags |= PA_FLAG_SHMDATA;
                 if (pa_mempool_is_remote_writable(current_pool))
@@ -880,12 +880,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
 
             pa_assert(p->import);
 
-            if (!(b = pa_memimport_get(p->import,
-                                       ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
-                                       ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
-                                       ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
-                                       ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
-                                       !!(flags & PA_FLAG_SHMWRITABLE)))) {
+            if (!(b = pa_memimport_shm_get(p->import,
+                                           ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
+                                           ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
+                                           ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
+                                           ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
+                                           !!(flags & PA_FLAG_SHMWRITABLE)))) {
 
                 if (pa_log_ratelimit(PA_LOG_DEBUG))
                     pa_log_debug("Failed to import memory block.");
diff --git a/src/tests/memblock-test.c b/src/tests/memblock-test.c
index e4c9d0a..d1cda87 100644
--- a/src/tests/memblock-test.c
+++ b/src/tests/memblock-test.c
@@ -122,22 +122,22 @@ START_TEST (memblock_test) {
         import_c = pa_memimport_new(pool_c, release_cb, (void*) "C");
         fail_unless(import_b != NULL);
 
-        r = pa_memexport_put(export_a, mb_a, &id, &shm_id, &offset, &size);
+        r = pa_memexport_shm_put(export_a, mb_a, &id, &shm_id, &offset, &size);
         fail_unless(r >= 0);
         fail_unless(shm_id == id_a);
 
         pa_log("A: Memory block exported as %u", id);
 
-        mb_b = pa_memimport_get(import_b, id, shm_id, offset, size, false);
+        mb_b = pa_memimport_shm_get(import_b, id, shm_id, offset, size, false);
         fail_unless(mb_b != NULL);
-        r = pa_memexport_put(export_b, mb_b, &id, &shm_id, &offset, &size);
+        r = pa_memexport_shm_put(export_b, mb_b, &id, &shm_id, &offset, &size);
         fail_unless(r >= 0);
         fail_unless(shm_id == id_a || shm_id == id_b);
         pa_memblock_unref(mb_b);
 
         pa_log("B: Memory block exported as %u", id);
 
-        mb_c = pa_memimport_get(import_c, id, shm_id, offset, size, false);
+        mb_c = pa_memimport_shm_get(import_c, id, shm_id, offset, size, false);
         fail_unless(mb_c != NULL);
         x = pa_memblock_acquire(mb_c);
         pa_log_debug("1 data=%s", x);

-- 
Darwish
http://darwish.chasingpointers.com


More information about the pulseaudio-discuss mailing list