[pulseaudio-discuss] [PATCH 10/11] pulsecore: pstreams: Introduce memfd blocks support
Ahmed S. Darwish
darwish.07 at gmail.com
Sun Sep 20 14:38:05 PDT 2015
Now that we have the necessary infrastructure for memexporting and
memimporting a memfd memblock, extend that support higher up in the
chain with pstreams.
A PulseAudio endpoint can now transparently send a memfd memblock to
the other end by simply calling pa_pstream_send_memblock().
Some DRY refactorings are needed, but they will be done in their own
commits due to the complexity of the pstreams code.
Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com>
---
src/pulsecore/memblock.c | 8 +-
src/pulsecore/memblock.h | 1 +
src/pulsecore/pstream.c | 195 +++++++++++++++++++++++++++++++++++++----------
3 files changed, 158 insertions(+), 46 deletions(-)
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 04caf34..5f0dcbe 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -561,7 +561,7 @@ pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
return b->pool;
}
-static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) {
+pa_mem_type_t pa_memblock_get_mem_type(pa_memblock *b) {
switch (b->type) {
case PA_MEMBLOCK_IMPORTED:
pa_assert(b->per_type.imported.segment);
@@ -582,7 +582,7 @@ static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) {
};
int pa_memblock_get_memfd_fd(pa_memblock *b) {
- pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+ pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
switch (b->type) {
case PA_MEMBLOCK_IMPORTED:
@@ -1460,7 +1460,7 @@ int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, ui
{
pa_shm *shm = NULL;
pa_assert(shm_id);
- pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX);
+ pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX);
if (memexport_put(e, b, block_id, (pa_mem **)&shm, offset, size) < 0)
return -1;
@@ -1475,7 +1475,7 @@ int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id,
{
pa_memfd *memfd = NULL;
pa_assert(memfd_fd);
- pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
+ pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD);
if (memexport_put(e, b, block_id, (pa_mem **)&memfd, offset, size) < 0)
return -1;
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index fc84c17..359669e 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -118,6 +118,7 @@ void pa_memblock_release(pa_memblock *b);
size_t pa_memblock_get_length(pa_memblock *b);
pa_mempool * pa_memblock_get_pool(pa_memblock *b);
+pa_mem_type_t pa_memblock_get_mem_type(pa_memblock *b);
int pa_memblock_get_memfd_fd(pa_memblock *b);
pa_memblock *pa_memblock_will_need(pa_memblock *b);
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 6e1963f..a65b0a6 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -46,6 +46,7 @@
#define PA_FLAG_SHMDATA 0x80000000LU
#define PA_FLAG_SHMRELEASE 0x40000000LU
#define PA_FLAG_SHMREVOKE 0xC0000000LU
+#define PA_FLAG_MEMFD_FD 0x01000000LU
#define PA_FLAG_SHMMASK 0xFF000000LU
#define PA_FLAG_SEEKMASK 0x000000FFLU
#define PA_FLAG_SHMWRITABLE 0x00800000LU
@@ -69,6 +70,14 @@ enum {
PA_PSTREAM_SHM_MAX
};
+/* If we have a memfd block, this info follows the descriptor */
+enum {
+ PA_PSTREAM_MEMFD_BLOCKID,
+ PA_PSTREAM_MEMFD_INDEX,
+ PA_PSTREAM_MEMFD_LENGTH,
+ PA_PSTREAM_MEMFD_MAX
+};
+
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
@@ -85,7 +94,9 @@ PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
struct item_info {
enum {
PA_PSTREAM_ITEM_PACKET,
- PA_PSTREAM_ITEM_MEMBLOCK,
+ PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX,
+ PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD,
+ PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE,
PA_PSTREAM_ITEM_SHMRELEASE,
PA_PSTREAM_ITEM_SHMREVOKE
} type;
@@ -111,7 +122,10 @@ struct pstream_read {
pa_pstream_descriptor descriptor;
pa_memblock *memblock;
pa_packet *packet;
- uint32_t shm_info[PA_PSTREAM_SHM_MAX];
+ struct {
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX];
+ uint32_t memfd_info[PA_PSTREAM_MEMFD_MAX];
+ } per_type;
void *data;
size_t index;
};
@@ -278,11 +292,17 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo
return p;
}
+static bool item_type_is_memblock(struct item_info *i) {
+ return i->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX ||
+ i->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD ||
+ i->type == PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE;
+}
+
static void item_free(void *item) {
struct item_info *i = item;
pa_assert(i);
- if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
+ if (item_type_is_memblock(i)) {
pa_assert(i->chunk.memblock);
pa_memblock_unref(i->chunk.memblock);
} else if (i->type == PA_PSTREAM_ITEM_PACKET) {
@@ -376,7 +396,6 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
i = pa_xnew(struct item_info, 1);
- i->type = PA_PSTREAM_ITEM_MEMBLOCK;
n = PA_MIN(length, bsm);
i->chunk.index = chunk->index + idx;
@@ -390,6 +409,27 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
i->with_ancil_data = false;
#endif
+ switch (pa_memblock_get_mem_type(chunk->memblock)) {
+ case PA_MEMORY_SHARED_POSIX:
+ i->type = PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX;
+ break;
+
+ case PA_MEMORY_SHARED_MEMFD:
+#ifdef HAVE_CREDS
+ i->type = PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD;
+#else
+ pa_assert_not_reached();
+#endif
+ break;
+
+ case PA_MEMORY_PRIVATE:
+ i->type = PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE;
+ break;
+
+ default:
+ pa_assert_not_reached();
+ }
+
pa_queue_push(p->send_queue, i);
idx += n;
@@ -516,8 +556,9 @@ static void prepare_next_write_item(pa_pstream *p) {
} else {
uint32_t flags;
bool send_payload = true;
+ int memexport_result = -1;
- pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
+ pa_assert(item_type_is_memblock(p->write.current));
pa_assert(p->write.current->chunk.memblock);
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
@@ -527,10 +568,6 @@ static void prepare_next_write_item(pa_pstream *p) {
flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
if (p->use_shm) {
- uint32_t block_id, shm_id;
- size_t offset, length;
- uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
- size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
pa_memexport *current_export;
@@ -539,28 +576,70 @@ static void prepare_next_write_item(pa_pstream *p) {
else
pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
- if (pa_memexport_shm_put(current_export,
- p->write.current->chunk.memblock,
- &block_id,
- &shm_id,
- &offset,
- &length) >= 0) {
+ if (p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX) {
+ uint32_t block_id, shm_id;
+ size_t offset, length;
+ uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
+ size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
+
+ memexport_result = pa_memexport_shm_put(current_export,
+ p->write.current->chunk.memblock,
+ &block_id,
+ &shm_id,
+ &offset,
+ &length);
+
+ if (memexport_result >= 0) {
+ flags |= PA_FLAG_SHMDATA;
+
+ shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+ shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+ shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+ shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
+ p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
+ }
+
+ } else if (p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD) {
+ uint32_t block_id;
+ int memfd_fd = -1;
+ size_t offset, length;
+ uint32_t *memfd_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
+ size_t memfd_size = sizeof(uint32_t) * PA_PSTREAM_MEMFD_MAX;
+
+ memexport_result = pa_memexport_memfd_put(current_export,
+ p->write.current->chunk.memblock,
+ &block_id,
+ &memfd_fd,
+ &offset,
+ &length);
+
+ if (memexport_result >= 0) {
+ flags |= PA_FLAG_MEMFD_FD;
+
+ memfd_info[PA_PSTREAM_MEMFD_BLOCKID] = htonl(block_id);
+ memfd_info[PA_PSTREAM_MEMFD_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+ memfd_info[PA_PSTREAM_MEMFD_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+ pa_assert(memfd_fd >= 0);
+ p->write.current->with_ancil_data = true;
+ p->write.current->ancil_data.creds_valid = false;
+ p->write.current->ancil_data.nfd = 1;
+ p->write.current->ancil_data.fds[0] = memfd_fd;
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(memfd_size);
+ p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + memfd_size;
+ }
+ }
- flags |= PA_FLAG_SHMDATA;
+ if (memexport_result >= 0) {
if (pa_mempool_is_remote_writable(current_pool))
flags |= PA_FLAG_SHMWRITABLE;
send_payload = false;
-
- shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
- shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
- shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
- shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
-
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
- p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
}
-/* else */
-/* pa_log_warn("Failed to export memory block."); */
+/* else */
+/* pa_log_warn("Failed to export memory block."); */
if (current_export != p->export)
pa_memexport_free(current_export);
@@ -833,14 +912,22 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
- if (length != sizeof(re->shm_info)) {
+ if (length != sizeof(re->per_type.shm_info)) {
pa_log_warn("Received SHM memblock frame with invalid frame length.");
return -1;
}
- /* Frame is a memblock frame referencing an SHM memblock */
- re->data = re->shm_info;
+ /* Frame is a memblock frame referencing a posix SHM memblock */
+ re->data = re->per_type.shm_info;
+ } else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) {
+
+ if (length != sizeof(re->per_type.memfd_info)) {
+ pa_log_warn("Received memfd memblock frame with invalid frame length.");
+ return -1;
+ }
+ /* Frame is a memblock frame referencing a memfd memblock */
+ re->data = &re->per_type.memfd_info;
} else if ((flags & PA_FLAG_SHMMASK) == 0) {
/* Frame is a memblock frame */
@@ -874,30 +961,54 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
pa_packet_unref(re->packet);
} else {
- pa_memblock *b;
+ pa_memblock *b = NULL;
uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
- pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
-
pa_assert(p->import);
- if (!(b = pa_memimport_shm_get(p->import,
- ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
- !!(flags & PA_FLAG_SHMWRITABLE)))) {
-
- if (pa_log_ratelimit(PA_LOG_DEBUG))
- pa_log_debug("Failed to import memory block.");
+ if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+ b = pa_memimport_shm_get(p->import,
+ ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_LENGTH]),
+ !!(flags & PA_FLAG_SHMWRITABLE));
+ } else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) {
+ int memfd_fd;
+ pa_assert(p->read_ancil_data.nfd == 1);
+
+ memfd_fd = p->read_ancil_data.fds[0];
+ pa_assert(memfd_fd >= 0);
+
+ b = pa_memimport_memfd_get(p->import,
+ ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_BLOCKID]),
+ memfd_fd,
+ ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_INDEX]),
+ ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_LENGTH]),
+ !!(flags & PA_FLAG_SHMWRITABLE));
+ } else {
+ pa_assert_not_reached();
}
+ if (!b && pa_log_ratelimit(PA_LOG_DEBUG))
+ pa_log_debug("Failed to import memory block.");
+
if (p->receive_memblock_callback) {
int64_t offset;
pa_memchunk chunk;
chunk.memblock = b;
chunk.index = 0;
- chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
+
+ if (b) {
+ chunk.length = pa_memblock_get_length(b);
+ } else {
+ if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA)
+ chunk.length = ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_LENGTH]);
+ else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) {
+ chunk.length = ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_LENGTH]);
+ } else
+ pa_assert_not_reached();
+ }
offset = (int64_t) (
(((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
--
Darwish
http://darwish.chasingpointers.com
More information about the pulseaudio-discuss
mailing list