[pulseaudio-discuss] [PATCH 10/11] pulsecore: pstreams: Introduce memfd blocks support

Ahmed S. Darwish darwish.07 at gmail.com
Sun Sep 20 14:38:05 PDT 2015


Now that we have the necessary infrastructure for memexporting and
mempimporting a memfd memblock, extend that support higher up in the
chain with pstreams.

A PulseAudio endpoint can now transparently send a memfd memblock to
the other end by simply calling pa_pstream_send_memblock().

Some DRY refactorings are needed, but they will be done in their own
commits due to the complexity of the pstreams code.

Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com>
---
 src/pulsecore/memblock.c |   8 +-
 src/pulsecore/memblock.h |   1 +
 src/pulsecore/pstream.c  | 195 +++++++++++++++++++++++++++++++++++++----------
 3 files changed, 158 insertions(+), 46 deletions(-)

diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 04caf34..5f0dcbe 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -561,7 +561,7 @@ pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
     return b->pool;
 }
 
-static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) {
+pa_mem_type_t pa_memblock_get_mem_type(pa_memblock *b) {
     switch (b->type) {
     case PA_MEMBLOCK_IMPORTED:
         pa_assert(b->per_type.imported.segment);
@@ -582,7 +582,7 @@ static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) {
 };
 
 int pa_memblock_get_memfd_fd(pa_memblock *b) {
-    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+    pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
 
     switch (b->type) {
     case PA_MEMBLOCK_IMPORTED:
@@ -1460,7 +1460,7 @@ int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, ui
 {
     pa_shm *shm = NULL;
     pa_assert(shm_id);
-    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX);
+    pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX);
 
     if (memexport_put(e, b, block_id, (pa_mem **)&shm, offset, size) < 0)
             return -1;
@@ -1475,7 +1475,7 @@ int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id,
 {
     pa_memfd *memfd = NULL;
     pa_assert(memfd_fd);
-    pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+    pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
 
     if (memexport_put(e, b, block_id, (pa_mem **)&memfd, offset, size) < 0)
             return -1;
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index fc84c17..359669e 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -118,6 +118,7 @@ void pa_memblock_release(pa_memblock *b);
 
 size_t pa_memblock_get_length(pa_memblock *b);
 pa_mempool * pa_memblock_get_pool(pa_memblock *b);
+pa_mem_type_t pa_memblock_get_mem_type(pa_memblock *b);
 int pa_memblock_get_memfd_fd(pa_memblock *b);
 
 pa_memblock *pa_memblock_will_need(pa_memblock *b);
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 6e1963f..a65b0a6 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -46,6 +46,7 @@
 #define PA_FLAG_SHMDATA     0x80000000LU
 #define PA_FLAG_SHMRELEASE  0x40000000LU
 #define PA_FLAG_SHMREVOKE   0xC0000000LU
+#define PA_FLAG_MEMFD_FD    0x01000000LU
 #define PA_FLAG_SHMMASK     0xFF000000LU
 #define PA_FLAG_SEEKMASK    0x000000FFLU
 #define PA_FLAG_SHMWRITABLE 0x00800000LU
@@ -69,6 +70,14 @@ enum {
     PA_PSTREAM_SHM_MAX
 };
 
+/* If we have a memfd block, this info follows the descriptor */
+enum {
+    PA_PSTREAM_MEMFD_BLOCKID,
+    PA_PSTREAM_MEMFD_INDEX,
+    PA_PSTREAM_MEMFD_LENGTH,
+    PA_PSTREAM_MEMFD_MAX
+};
+
 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
 
 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
@@ -85,7 +94,9 @@ PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
 struct item_info {
     enum {
         PA_PSTREAM_ITEM_PACKET,
-        PA_PSTREAM_ITEM_MEMBLOCK,
+        PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX,
+        PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD,
+        PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE,
         PA_PSTREAM_ITEM_SHMRELEASE,
         PA_PSTREAM_ITEM_SHMREVOKE
     } type;
@@ -111,7 +122,10 @@ struct pstream_read {
     pa_pstream_descriptor descriptor;
     pa_memblock *memblock;
     pa_packet *packet;
-    uint32_t shm_info[PA_PSTREAM_SHM_MAX];
+    struct {
+        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
+        uint32_t memfd_info[PA_PSTREAM_MEMFD_MAX];
+    } per_type;
     void *data;
     size_t index;
 };
@@ -278,11 +292,17 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo
     return p;
 }
 
+static bool item_type_is_memblock(struct item_info *i) {
+    return i->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX ||
+           i->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD ||
+           i->type == PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE;
+}
+
 static void item_free(void *item) {
     struct item_info *i = item;
     pa_assert(i);
 
-    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
+    if (item_type_is_memblock(i)) {
         pa_assert(i->chunk.memblock);
         pa_memblock_unref(i->chunk.memblock);
     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
@@ -376,7 +396,6 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
 
         if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
             i = pa_xnew(struct item_info, 1);
-        i->type = PA_PSTREAM_ITEM_MEMBLOCK;
 
         n = PA_MIN(length, bsm);
         i->chunk.index = chunk->index + idx;
@@ -390,6 +409,27 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
         i->with_ancil_data = false;
 #endif
 
+        switch (pa_memblock_get_mem_type(chunk->memblock)) {
+        case PA_MEMORY_SHARED_POSIX:
+            i->type = PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX;
+            break;
+
+        case PA_MEMORY_SHARED_MEMFD:
+#ifdef HAVE_CREDS
+            i->type = PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD;
+#else
+            pa_assert_not_reached();
+#endif
+            break;
+
+        case PA_MEMORY_PRIVATE:
+            i->type = PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE;
+            break;
+
+        default:
+            pa_assert_not_reached();
+        }
+
         pa_queue_push(p->send_queue, i);
 
         idx += n;
@@ -516,8 +556,9 @@ static void prepare_next_write_item(pa_pstream *p) {
     } else {
         uint32_t flags;
         bool send_payload = true;
+        int memexport_result = -1;
 
-        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
+        pa_assert(item_type_is_memblock(p->write.current));
         pa_assert(p->write.current->chunk.memblock);
 
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
@@ -527,10 +568,6 @@ static void prepare_next_write_item(pa_pstream *p) {
         flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
 
         if (p->use_shm) {
-            uint32_t block_id, shm_id;
-            size_t offset, length;
-            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
-            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
             pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
             pa_memexport *current_export;
 
@@ -539,28 +576,70 @@ static void prepare_next_write_item(pa_pstream *p) {
             else
                 pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
 
-            if (pa_memexport_shm_put(current_export,
-                                     p->write.current->chunk.memblock,
-                                     &block_id,
-                                     &shm_id,
-                                     &offset,
-                                     &length) >= 0) {
+            if (p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX) {
+                uint32_t block_id, shm_id;
+                size_t offset, length;
+                uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
+                size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
+
+                memexport_result = pa_memexport_shm_put(current_export,
+                                         p->write.current->chunk.memblock,
+                                         &block_id,
+                                         &shm_id,
+                                         &offset,
+                                         &length);
+
+                if (memexport_result >= 0) {
+                    flags |= PA_FLAG_SHMDATA;
+
+                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
+                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
+                }
+
+            } else if (p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD) {
+                uint32_t block_id;
+                int memfd_fd = -1;
+                size_t offset, length;
+                uint32_t *memfd_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
+                size_t memfd_size = sizeof(uint32_t) * PA_PSTREAM_MEMFD_MAX;
+
+                memexport_result = pa_memexport_memfd_put(current_export,
+                                        p->write.current->chunk.memblock,
+                                        &block_id,
+                                        &memfd_fd,
+                                        &offset,
+                                        &length);
+
+                if (memexport_result >= 0) {
+                    flags |= PA_FLAG_MEMFD_FD;
+
+                    memfd_info[PA_PSTREAM_MEMFD_BLOCKID] = htonl(block_id);
+                    memfd_info[PA_PSTREAM_MEMFD_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+                    memfd_info[PA_PSTREAM_MEMFD_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+                    pa_assert(memfd_fd >= 0);
+                    p->write.current->with_ancil_data = true;
+                    p->write.current->ancil_data.creds_valid = false;
+                    p->write.current->ancil_data.nfd = 1;
+                    p->write.current->ancil_data.fds[0] = memfd_fd;
+
+                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(memfd_size);
+                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + memfd_size;
+                }
+            }
 
-                flags |= PA_FLAG_SHMDATA;
+            if (memexport_result >= 0) {
                 if (pa_mempool_is_remote_writable(current_pool))
                     flags |= PA_FLAG_SHMWRITABLE;
                 send_payload = false;
-
-                shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
-                shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
-                shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
-                shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
-
-                p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
-                p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
             }
-/*             else */
-/*                 pa_log_warn("Failed to export memory block."); */
+/*          else */
+/*              pa_log_warn("Failed to export memory block."); */
 
             if (current_export != p->export)
                 pa_memexport_free(current_export);
@@ -833,14 +912,22 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
 
             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
 
-                if (length != sizeof(re->shm_info)) {
+                if (length != sizeof(re->per_type.shm_info)) {
                     pa_log_warn("Received SHM memblock frame with invalid frame length.");
                     return -1;
                 }
 
-                /* Frame is a memblock frame referencing an SHM memblock */
-                re->data = re->shm_info;
+                /* Frame is a memblock frame referencing a posix SHM memblock */
+                re->data = re->per_type.shm_info;
+            } else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) {
+
+                if (length != sizeof(re->per_type.memfd_info)) {
+                    pa_log_warn("Received memfd memblock frame with invalid frame length.");
+                    return -1;
+                }
 
+                /* Frame is a memblock frame referencing a memfd memblock */
+                re->data = &re->per_type.memfd_info;
             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
 
                 /* Frame is a memblock frame */
@@ -874,30 +961,54 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
 
             pa_packet_unref(re->packet);
         } else {
-            pa_memblock *b;
+            pa_memblock *b = NULL;
             uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
-            pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
-
             pa_assert(p->import);
 
-            if (!(b = pa_memimport_shm_get(p->import,
-                                           ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
-                                           ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
-                                           ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
-                                           ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
-                                           !!(flags & PA_FLAG_SHMWRITABLE)))) {
-
-                if (pa_log_ratelimit(PA_LOG_DEBUG))
-                    pa_log_debug("Failed to import memory block.");
+            if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+                b = pa_memimport_shm_get(p->import,
+                                         ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_BLOCKID]),
+                                         ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_SHMID]),
+                                         ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_INDEX]),
+                                         ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_LENGTH]),
+                                         !!(flags & PA_FLAG_SHMWRITABLE));
+            } else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) {
+                int memfd_fd;
+                pa_assert(p->read_ancil_data.nfd == 1);
+
+                memfd_fd = p->read_ancil_data.fds[0];
+                pa_assert(memfd_fd >= 0);
+
+                b = pa_memimport_memfd_get(p->import,
+                                           ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_BLOCKID]),
+                                           memfd_fd,
+                                           ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_INDEX]),
+                                           ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_LENGTH]),
+                                           !!(flags & PA_FLAG_SHMWRITABLE));
+            } else {
+                pa_assert_not_reached();
             }
 
+            if (!b && pa_log_ratelimit(PA_LOG_DEBUG))
+                pa_log_debug("Failed to import memory block.");
+
             if (p->receive_memblock_callback) {
                 int64_t offset;
                 pa_memchunk chunk;
 
                 chunk.memblock = b;
                 chunk.index = 0;
-                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
+
+                if (b) {
+                    chunk.length = pa_memblock_get_length(b);
+                } else {
+                    if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA)
+                        chunk.length = ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_LENGTH]);
+                    else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) {
+                        chunk.length = ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_LENGTH]);
+                    } else
+                        pa_assert_not_reached();
+                }
 
                 offset = (int64_t) (
                         (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |

-- 
Darwish
http://darwish.chasingpointers.com


More information about the pulseaudio-discuss mailing list