[pulseaudio-discuss] [PATCH v2 09/12] pulsecore: pstream: Support memfd blocks transport

Ahmed S. Darwish darwish.07 at gmail.com
Fri Feb 12 00:19:19 UTC 2016


Now that we have the necessary infrastructure for memexporting and
memimporting a memfd memblock, extend that support higher up in the
chain with pstreams.

A PulseAudio endpoint can now _transparently_ send a memfd memblock
to the other end by simply calling pa_pstream_send_memblock().

If the pipe does not support memfd transfers, we fall back to
sending the full block's data instead of just its reference.

# Further details:

A single pstream connection usually transfers blocks from multiple
pools including the server's srbchannel mempool, the client's audio
data mempool, and the server's shared core mempool.

If these mempools are memfd-backed, we now require registering them
with the pstream before sending any blocks they cover. This is done
to minimize fd passing overhead and the possibility of fd leaks.

Moreover, to support all these pools without hard-coding their number
(or nature) in the Pulse communication protocol, a new pstream packet
type is introduced. That special packet can be sent _anytime_ during
the pstream's lifetime and is used for creating on-demand SHM ID to
memfd mappings.

Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com>
---
 src/pulsecore/pstream.c | 257 +++++++++++++++++++++++++++++++++++++++++++-----
 src/pulsecore/pstream.h |   2 +
 2 files changed, 233 insertions(+), 26 deletions(-)

diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index ef2bbf9..11ea7f3 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -38,16 +38,20 @@
 #include <pulsecore/creds.h>
 #include <pulsecore/refcnt.h>
 #include <pulsecore/flist.h>
+#include <pulsecore/hashmap.h>
 #include <pulsecore/macro.h>
 
 #include "pstream.h"
 
 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
 #define PA_FLAG_SHMDATA     0x80000000LU
+#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
 #define PA_FLAG_SHMRELEASE  0x40000000LU
 #define PA_FLAG_SHMREVOKE   0xC0000000LU
 #define PA_FLAG_SHMMASK     0xFF000000LU
 #define PA_FLAG_SEEKMASK    0x000000FFLU
+#define PA_FLAG_PACKET_MASK 0x00000F00LU
+#define PA_FLAG_PACKET_SHMID_TO_MEMFD_FD    0x00000100LU
 #define PA_FLAG_SHMWRITABLE 0x00800000LU
 
 /* The sequence descriptor header consists of 5 32bit integers: */
@@ -92,6 +96,9 @@ struct item_info {
 
     /* packet info */
     pa_packet *packet;
+    bool shmid_to_memfd_packet:1;
+    bool close_memfd_fd_after_send:1;
+
 #ifdef HAVE_CREDS
     bool with_ancil_data;
     pa_cmsg_ancil_data ancil_data;
@@ -147,6 +154,10 @@ struct pa_pstream {
     pa_memimport *import;
     pa_memexport *export;
 
+    /* This pipe supports sending memfd fd as ancillary data */
+    bool use_memfd;
+    pa_hashmap *registered_memfd_ids;
+
     pa_pstream_packet_cb_t receive_packet_callback;
     void *receive_packet_callback_userdata;
 
@@ -331,7 +342,8 @@ static void pstream_free(pa_pstream *p) {
     pa_xfree(p);
 }
 
-void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data) {
+static int send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data,
+                        bool shmid_to_memfd_packet, bool close_fd_after_send) {
     struct item_info *i;
 
     pa_assert(p);
@@ -339,13 +351,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil
     pa_assert(packet);
 
     if (p->dead)
-        return;
+        return -1;
 
     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
         i = pa_xnew(struct item_info, 1);
 
     i->type = PA_PSTREAM_ITEM_PACKET;
     i->packet = pa_packet_ref(packet);
+    i->shmid_to_memfd_packet = shmid_to_memfd_packet;
+    i->close_memfd_fd_after_send = close_fd_after_send;
 
 #ifdef HAVE_CREDS
     if ((i->with_ancil_data = !!ancil_data)) {
@@ -360,6 +374,120 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil
     pa_queue_push(p->send_queue, i);
 
     p->mainloop->defer_enable(p->defer_event, 1);
+    return 0;
+}
+
+void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data) {
+    send_packet(p, packet, ancil_data, false, false);
+}
+
+static int send_shmid_to_memfd_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data,
+                                      bool close_fd_after_send) {
+    return send_packet(p, packet, ancil_data, true, close_fd_after_send);
+}
+
+/* Before sending any blocks from a certain memfd-backed pool over the
+ * pipe, we must call this method early on.
+ *
+ * We require this so we can transfer memfd blocks without passing their
+ * file descriptor every time, thus minimizing overhead and the
+ * possibility of fd leaks.
+ *
+ * Upon registration, a packet is sent with the memfd fd as ancil data
+ * to the other end. This packet has an ID that identifies the memfd
+ * region. Once the other end receives that packet, it creates a
+ * permanent mapping between that ID and the passed memfd region.
+ *
+ * Doing so, further transfers don't need to reference the memfd fd at
+ * all, just its ID. Thus, we can quickly close the file descriptor on
+ * both sides afterwards. */
+int pa_pstream_register_memfd_mempool(pa_pstream *p, pa_mempool *pool, const char **fail_reason) {
+#ifdef HAVE_CREDS
+    unsigned shm_id;
+    int memfd_fd, err = -1;
+    pa_packet *packet;
+    pa_cmsg_ancil_data ancil;
+    bool global_mempool;
+
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) > 0);
+    pa_assert(fail_reason);
+
+    *fail_reason = NULL;
+    global_mempool = pa_mempool_is_global(pool);
+
+    if (!pa_mempool_is_shared(pool)) {
+        *fail_reason = "mempool not shared";
+        return err;
+    }
+
+    if (!pa_mempool_is_memfd_backed(pool)) {
+        *fail_reason = "mempool is not memfd-backed";
+        return err;
+    }
+
+    if (!p->use_memfd) {
+        *fail_reason = "pipe does not support memfd transport";
+        return err;
+    }
+
+    if (pa_mempool_get_shm_id(pool, &shm_id)) {
+        *fail_reason = "could not extract pool SHM ID";
+        return err;
+    }
+
+    if (pa_hashmap_get(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id)) != NULL) {
+        *fail_reason = "previously registered memfd SHM ID";
+        return err;
+    }
+
+    memfd_fd = (global_mempool) ? pa_global_mempool_get_shm_memfd_fd(pool) :
+                                  pa_mempool_get_and_reset_shm_memfd_fd(pool);
+
+    /* Note! For per-client mempools we've taken ownership of the memfd
+     * fd, and we're thus the sole code path responsible for closing it.
+     * In case of any failure, it MUST be closed. */
+
+    pa_assert_se(packet = pa_packet_new_data(&shm_id, sizeof(shm_id)));
+    ancil.creds_valid = false;
+    ancil.nfd = 1;
+    ancil.fds[0] = memfd_fd;
+
+    if (send_shmid_to_memfd_packet(p, packet, &ancil, !global_mempool)) {
+        pa_packet_unref(packet);
+        if (!global_mempool)
+            pa_assert_se(pa_close(memfd_fd) == 0);
+        return err;
+    }
+
+    /* OK, packet scheduled to be sent. Upon actual sending, our cleanup
+     * hook `packet_sent_callback` will do the rest. */
+
+    pa_packet_unref(packet);
+    pa_hashmap_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), PA_UINT32_TO_PTR(1u));
+#endif
+    return 0;
+}
+
+/* Cleanup hook after successfully sending a packet. Mainly used to close
+ * the memfd file descriptor after sending special SHM-ID<->memfd packets.
+ *
+ * Check pa_pstream_register_memfd_mempool() above. */
+static void packet_sent_callback(struct item_info *item, struct pa_cmsg_ancil_data *sent_ancil_data) {
+    pa_assert(item);
+    pa_assert(item->type == PA_PSTREAM_ITEM_PACKET);
+
+    if (!item->shmid_to_memfd_packet)
+        return;
+
+    pa_assert(sent_ancil_data);
+    pa_assert(item->ancil_data.nfd == 1);
+    pa_assert(item->ancil_data.nfd == sent_ancil_data->nfd);
+    pa_assert(item->ancil_data.fds[0] != -1);
+    pa_assert(item->ancil_data.fds[0] == sent_ancil_data->fds[0]);
+
+    if (item->close_memfd_fd_after_send)
+        pa_assert_se(pa_close(sent_ancil_data->fds[0]) == 0);
 }
 
 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
@@ -507,6 +635,9 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);
 
+        if (p->write.current->shmid_to_memfd_packet)
+            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_PACKET_SHMID_TO_MEMFD_FD);
+
         if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
@@ -524,7 +655,7 @@ static void prepare_next_write_item(pa_pstream *p) {
 
     } else {
         uint32_t flags;
-        bool send_payload = true;
+        bool send_memblock_payload = true;
 
         pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
         pa_assert(p->write.current->chunk.memblock);
@@ -556,20 +687,35 @@ static void prepare_next_write_item(pa_pstream *p) {
                                  &shm_id,
                                  &offset,
                                  &length) >= 0) {
-                pa_assert(type == PA_MEM_TYPE_SHARED_POSIX);
 
-                flags |= PA_FLAG_SHMDATA;
-                if (pa_mempool_is_remote_writable(current_pool))
-                    flags |= PA_FLAG_SHMWRITABLE;
-                send_payload = false;
+                if (type == PA_MEM_TYPE_SHARED_POSIX)
+                    send_memblock_payload = false;
 
-                shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
-                shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
-                shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
-                shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
+                    if (pa_hashmap_get(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id))) {
+                        send_memblock_payload = false;
+                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
+                    } else {
+                        if (pa_log_ratelimit(PA_LOG_ERROR)) {
+                            pa_log("Cannot send memblock backed by an unknown memfd region (ID = %u)", shm_id);
+                            pa_log("Fallig back to fully sending memblock data over socket");
+                        }
+                    }
+                }
 
-                p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
-                p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
+                if (!send_memblock_payload) {
+                    flags |= PA_FLAG_SHMDATA;
+                    if (pa_mempool_is_remote_writable(current_pool))
+                        flags |= PA_FLAG_SHMWRITABLE;
+
+                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
+                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
+                }
             }
 /*             else */
 /*                 pa_log_warn("Failed to export memory block."); */
@@ -578,7 +724,7 @@ static void prepare_next_write_item(pa_pstream *p) {
                 pa_memexport_free(current_export);
         }
 
-        if (send_payload) {
+        if (send_memblock_payload) {
             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
             p->write.memchunk = p->write.current->chunk;
             pa_memblock_ref(p->write.memchunk.memblock);
@@ -672,6 +818,10 @@ static int do_write(pa_pstream *p) {
 
     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
         pa_assert(p->write.current);
+
+        if (p->write.current->type == PA_PSTREAM_ITEM_PACKET)
+            packet_sent_callback(p->write.current, &p->write_ancil_data);
+
         item_free(p->write.current);
         p->write.current = NULL;
 
@@ -718,6 +868,49 @@ static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
         p->receive_memblock_callback_userdata);
 }
 
+/* Finished reading a packet */
+static void packet_complete(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil, bool shmid_to_memfd_packet) {
+    int memfd_fd;
+    unsigned shm_id;
+    size_t packet_len;
+
+    if (shmid_to_memfd_packet) {
+        if (!p->use_memfd) {
+            pa_log("Received a memfd block reference over a pipe that does not support memfds");
+            return;
+        }
+
+        if (ancil->nfd != 1) {
+            pa_log("Expected 1 memfd fd to be received over pipe. got %d", ancil->nfd);
+            (ancil->nfd == 0) ? pa_log("Did we reach our open file descriptors limit?") : ((void)0);
+            return;
+        }
+
+        memfd_fd = ancil->fds[0];
+        if (memfd_fd == -1) {
+            pa_log("Received invalid memfd fd = -1");
+            return;
+        }
+
+        shm_id = *(unsigned *)pa_packet_data(packet, &packet_len);
+        pa_assert(packet_len == sizeof(shm_id));
+
+        if (pa_memimport_add_permanent_shmid_to_memfd_mapping(p->import, shm_id, memfd_fd, true)) {
+            pa_log("Failed to create permanent mapping for received memfd region, with ID = %u", shm_id);
+            pa_assert_se(pa_close(memfd_fd) == 0);
+            return;
+        }
+
+        pa_assert_se(pa_close(memfd_fd) == 0);
+    } else if (p->receive_packet_callback) {
+#ifdef HAVE_CREDS
+        p->receive_packet_callback(p, packet, ancil, p->receive_packet_callback_userdata);
+#else
+        p->receive_packet_callback(p, packet, NULL, p->receive_packet_callback_userdata);
+#endif
+    }
+}
+
 static int do_read(pa_pstream *p, struct pstream_read *re) {
     void *d;
     size_t l;
@@ -827,8 +1020,8 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
         if (channel == (uint32_t) -1) {
             size_t plen;
 
-            if (flags != 0) {
-                pa_log_warn("Received packet frame with invalid flags value.");
+            if ((flags & ~PA_FLAG_PACKET_MASK) != 0) {
+                pa_log_warn("Received packet frame with invalid flags value = 0x%xu", flags);
                 return -1;
             }
 
@@ -843,7 +1036,7 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
                 return -1;
             }
 
-            if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {
 
                 if (length != sizeof(re->shm_info)) {
                     pa_log_warn("Received SHM memblock frame with invalid frame length.");
@@ -876,24 +1069,24 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
             pa_memblock_unref(re->memblock);
 
         } else if (re->packet) {
+            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+            bool shmid_to_memfd_packet = (flags & PA_FLAG_PACKET_SHMID_TO_MEMFD_FD);
 
-            if (p->receive_packet_callback)
-#ifdef HAVE_CREDS
-                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
-#else
-                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
-#endif
+            packet_complete(p, re->packet, &p->read_ancil_data, shmid_to_memfd_packet);
 
             pa_packet_unref(re->packet);
         } else {
             pa_memblock *b;
+            pa_mem_type_t type;
             uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
-            pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
 
             pa_assert(p->import);
 
+            type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ? PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;
+
             if (!(b = pa_memimport_get(p->import,
-                                       PA_MEM_TYPE_SHARED_POSIX,
+                                       type,
                                        ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                        ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
                                        ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
@@ -1089,6 +1282,18 @@ void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
     }
 }
 
+void pa_pstream_enable_memfd(pa_pstream *p) {
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) > 0);
+    pa_assert(p->use_shm);
+
+    p->use_memfd = true;
+
+    if (!p->registered_memfd_ids) {
+        p->registered_memfd_ids = pa_hashmap_new(NULL, NULL);
+    }
+}
+
 bool pa_pstream_get_shm(pa_pstream *p) {
     pa_assert(p);
     pa_assert(PA_REFCNT_VALUE(p) > 0);
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index f4e1462..a49c40f 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -49,6 +49,7 @@ void pa_pstream_unref(pa_pstream*p);
 void pa_pstream_unlink(pa_pstream *p);
 
 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data);
+int pa_pstream_register_memfd_mempool(pa_pstream *p, pa_mempool *pool, const char **fail_reason);
 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk);
 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id);
 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id);
@@ -63,6 +64,7 @@ void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb,
 bool pa_pstream_is_pending(pa_pstream *p);
 
 void pa_pstream_enable_shm(pa_pstream *p, bool enable);
+void pa_pstream_enable_memfd(pa_pstream *p);
 bool pa_pstream_get_shm(pa_pstream *p);
 
 /* Enables shared ringbuffer channel. Note that the srbchannel is now owned by the pstream.

Regards,

-- 
Darwish
http://darwish.chasingpointers.com


More information about the pulseaudio-discuss mailing list