[pulseaudio-discuss] [PATCH 09/12] pstream: Allow reading/writing through srbchannel
David Henningsson
david.henningsson at canonical.com
Fri May 30 04:59:28 PDT 2014
For writing, we prefer the srbchannel whenever one is available and there is
no ancillary data to send; otherwise we write through the iochannel as before.
For reading, we support reading from both the iochannel and the srbchannel in
parallel. This meant replicating the struct that holds the read state, so much
of this patch is just a search/replace in do_read to make it use the state of
the channel it is reading from.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
src/pulsecore/pstream.c | 227 ++++++++++++++++++++++++++++++++----------------
src/pulsecore/pstream.h | 5 ++
2 files changed, 155 insertions(+), 77 deletions(-)
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 539c4a2..505a5e0 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -109,12 +109,23 @@ struct item_info {
uint32_t block_id;
};
+struct pstream_read { /* State of the frame currently being read; pa_pstream keeps one per channel (readio, readsr) */
+ pa_pstream_descriptor descriptor; /* frame descriptor; do_read() fills it before any payload */
+ pa_memblock *memblock; /* set for plain memblock frames; payload is read into it */
+ pa_packet *packet; /* set for packet frames; data then points at packet->data */
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX]; /* payload buffer for SHM memblock frames */
+ void *data; /* payload destination; when NULL, do_read() acquires memblock instead */
+ size_t index; /* bytes of the current frame read so far, descriptor included; 0 after a frame completes */
+};
+
struct pa_pstream {
PA_REFCNT_DECLARE;
pa_mainloop_api *mainloop;
pa_defer_event *defer_event;
pa_iochannel *io;
+ pa_srbchannel *sr, *srpending;
+ bool is_srpending;
pa_queue *send_queue;
@@ -132,14 +143,7 @@ struct pa_pstream {
pa_memchunk memchunk;
} write;
- struct {
- pa_pstream_descriptor descriptor;
- pa_memblock *memblock;
- pa_packet *packet;
- uint32_t shm_info[PA_PSTREAM_SHM_MAX];
- void *data;
- size_t index;
- } read;
+ struct pstream_read readio, readsr;
bool use_shm;
pa_memimport *import;
@@ -172,7 +176,7 @@ struct pa_pstream {
};
static int do_write(pa_pstream *p);
-static int do_read(pa_pstream *p);
+static int do_read(pa_pstream *p, struct pstream_read *re);
static void do_pstream_read_write(pa_pstream *p) {
pa_assert(p);
@@ -182,8 +186,13 @@ static void do_pstream_read_write(pa_pstream *p) {
p->mainloop->defer_enable(p->defer_event, 0);
+ if (!p->dead && p->sr) {
+ do_write(p);
+ while (!p->dead && do_read(p, &p->readsr) == 0);
+ }
+
if (!p->dead && pa_iochannel_is_readable(p->io)) {
- if (do_read(p) < 0)
+ if (do_read(p, &p->readio) < 0)
goto fail;
} else if (!p->dead && pa_iochannel_is_hungup(p->io))
goto fail;
@@ -208,6 +217,17 @@ fail:
pa_pstream_unref(p);
}
+static bool sr_callback(pa_srbchannel *sr, void *userdata) { /* srbchannel callback installed by check_srpending(); userdata is the owning pa_pstream */
+ pa_pstream *p = userdata;
+
+ pa_assert(p);
+ pa_assert(PA_REFCNT_VALUE(p) > 0);
+ pa_assert(p->sr == sr); /* only the currently active channel may call back */
+
+ do_pstream_read_write(p); /* its do_write() call may replace or clear p->sr through check_srpending() */
+ return p->sr != NULL; /* NOTE(review): presumably tells the srbchannel whether to keep calling back - confirm in srbchannel.h */
+}
+
static void io_callback(pa_iochannel*io, void *userdata) {
pa_pstream *p = userdata;
@@ -289,11 +309,17 @@ static void pstream_free(pa_pstream *p) {
if (p->write.memchunk.memblock)
pa_memblock_unref(p->write.memchunk.memblock);
- if (p->read.memblock)
- pa_memblock_unref(p->read.memblock);
+ if (p->readsr.memblock)
+ pa_memblock_unref(p->readsr.memblock);
- if (p->read.packet)
- pa_packet_unref(p->read.packet);
+ if (p->readsr.packet)
+ pa_packet_unref(p->readsr.packet);
+
+ if (p->readio.memblock)
+ pa_memblock_unref(p->readio.memblock);
+
+ if (p->readio.packet)
+ pa_packet_unref(p->readio.packet);
pa_xfree(p);
}
@@ -556,6 +582,20 @@ static void prepare_next_write_item(pa_pstream *p) {
#endif
}
+static void check_srpending(pa_pstream *p) { /* Apply the channel switch queued by pa_pstream_set_srbchannel(), if one is pending */
+ if (!p->is_srpending) /* no switch queued */
+ return;
+
+ if (p->sr) /* free the channel that is being replaced */
+ pa_srbchannel_free(p->sr);
+
+ p->sr = p->srpending; /* may be NULL, leaving the pstream without an srbchannel; srpending itself is not cleared */
+ p->is_srpending = false;
+
+ if (p->sr) /* hook the new channel up to this pstream */
+ pa_srbchannel_set_callback(p->sr, sr_callback, p);
+}
+
static int do_write(pa_pstream *p) {
void *d;
size_t l;
@@ -568,8 +608,11 @@ static int do_write(pa_pstream *p) {
if (!p->write.current)
prepare_next_write_item(p);
- if (!p->write.current)
+ if (!p->write.current) {
+ /* The out queue is empty, so switching channels is safe */
+ check_srpending(p);
return 0;
+ }
if (p->write.minibuf_validsize > 0) {
d = p->write.minibuf + p->write.index;
@@ -606,8 +649,9 @@ static int do_write(pa_pstream *p) {
p->send_ancil_now = false;
} else
#endif
-
- if ((r = pa_iochannel_write(p->io, d, l)) < 0)
+ if (p->sr)
+ r = pa_srbchannel_write(p->sr, d, l);
+ else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
goto fail;
if (release_memblock)
@@ -639,7 +683,7 @@ fail:
return -1;
}
-static int do_read(pa_pstream *p) {
+static int do_read(pa_pstream *p, struct pstream_read *re) {
void *d;
size_t l;
ssize_t r;
@@ -647,23 +691,29 @@ static int do_read(pa_pstream *p) {
pa_assert(p);
pa_assert(PA_REFCNT_VALUE(p) > 0);
- if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
- d = (uint8_t*) p->read.descriptor + p->read.index;
- l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
+ if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
+ d = (uint8_t*) re->descriptor + re->index;
+ l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
} else {
- pa_assert(p->read.data || p->read.memblock);
+ pa_assert(re->data || re->memblock);
- if (p->read.data)
- d = p->read.data;
+ if (re->data)
+ d = re->data;
else {
- d = pa_memblock_acquire(p->read.memblock);
- release_memblock = p->read.memblock;
+ d = pa_memblock_acquire(re->memblock);
+ release_memblock = re->memblock;
}
- d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
- l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
+ d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
+ l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
+ if (re == &p->readsr) {
+ r = pa_srbchannel_read(p->sr, d, l);
+ if (r == 0)
+ return 1;
+ }
+ else
#ifdef HAVE_CREDS
{
pa_ancil b;
@@ -689,13 +739,13 @@ static int do_read(pa_pstream *p) {
if (release_memblock)
pa_memblock_release(release_memblock);
- p->read.index += (size_t) r;
+ re->index += (size_t) r;
- if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+ if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
uint32_t flags, length, channel;
/* Reading of frame descriptor complete */
- flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
@@ -706,10 +756,10 @@ static int do_read(pa_pstream *p) {
/* This is a SHM memblock release frame with no payload */
-/* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
pa_assert(p->export);
- pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+ pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
goto frame_done;
@@ -717,24 +767,24 @@ static int do_read(pa_pstream *p) {
/* This is a SHM memblock revoke frame with no payload */
-/* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
pa_assert(p->import);
- pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+ pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
goto frame_done;
}
- length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+ length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
return -1;
}
- pa_assert(!p->read.packet && !p->read.memblock);
+ pa_assert(!re->packet && !re->memblock);
- channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+ channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
if (channel == (uint32_t) -1) {
@@ -744,8 +794,8 @@ static int do_read(pa_pstream *p) {
}
/* Frame is a packet frame */
- p->read.packet = pa_packet_new(length);
- p->read.data = p->read.packet->data;
+ re->packet = pa_packet_new(length);
+ re->data = re->packet->data;
} else {
@@ -756,20 +806,20 @@ static int do_read(pa_pstream *p) {
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
- if (length != sizeof(p->read.shm_info)) {
+ if (length != sizeof(re->shm_info)) {
pa_log_warn("Received SHM memblock frame with invalid frame length.");
return -1;
}
/* Frame is a memblock frame referencing an SHM memblock */
- p->read.data = p->read.shm_info;
+ re->data = re->shm_info;
} else if ((flags & PA_FLAG_SHMMASK) == 0) {
/* Frame is a memblock frame */
- p->read.memblock = pa_memblock_new(p->mempool, length);
- p->read.data = NULL;
+ re->memblock = pa_memblock_new(p->mempool, length);
+ re->data = NULL;
} else {
pa_log_warn("Received memblock frame with invalid flags value.");
@@ -777,74 +827,74 @@ static int do_read(pa_pstream *p) {
}
}
- } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
+ } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
/* Frame payload available */
- if (p->read.memblock && p->receive_memblock_callback) {
+ if (re->memblock && p->receive_memblock_callback) {
/* Is this memblock data? Than pass it to the user */
- l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
+ l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
if (l > 0) {
pa_memchunk chunk;
- chunk.memblock = p->read.memblock;
- chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
+ chunk.memblock = re->memblock;
+ chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
chunk.length = l;
if (p->receive_memblock_callback) {
int64_t offset;
offset = (int64_t) (
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
p->receive_memblock_callback(
p,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->receive_memblock_callback_userdata);
}
/* Drop seek info for following callbacks */
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
}
}
/* Frame complete */
- if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+ if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
- if (p->read.memblock) {
+ if (re->memblock) {
/* This was a memblock frame. We can unref the memblock now */
- pa_memblock_unref(p->read.memblock);
+ pa_memblock_unref(re->memblock);
- } else if (p->read.packet) {
+ } else if (re->packet) {
if (p->receive_packet_callback)
#ifdef HAVE_CREDS
- p->receive_packet_callback(p, p->read.packet, &p->read_ancil, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata);
#else
- p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif
- pa_packet_unref(p->read.packet);
+ pa_packet_unref(re->packet);
} else {
pa_memblock *b;
- uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
pa_assert(p->import);
if (!(b = pa_memimport_get(p->import,
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
!!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
@@ -857,17 +907,17 @@ static int do_read(pa_pstream *p) {
chunk.memblock = b;
chunk.index = 0;
- chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
+ chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
offset = (int64_t) (
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
p->receive_memblock_callback(
p,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->receive_memblock_callback_userdata);
}
@@ -883,10 +933,10 @@ static int do_read(pa_pstream *p) {
return 0;
frame_done:
- p->read.memblock = NULL;
- p->read.packet = NULL;
- p->read.index = 0;
- p->read.data = NULL;
+ re->memblock = NULL;
+ re->packet = NULL;
+ re->index = 0;
+ re->data = NULL;
#ifdef HAVE_CREDS
p->read_ancil.creds_valid = false;
@@ -988,6 +1038,9 @@ void pa_pstream_unlink(pa_pstream *p) {
p->dead = true;
+ while (p->sr || p->is_srpending) /* In theory there could be one active and one pending */
+ pa_pstream_set_srbchannel(p, NULL);
+
if (p->import) {
pa_memimport_free(p->import);
p->import = NULL;
@@ -1040,3 +1093,23 @@ bool pa_pstream_get_shm(pa_pstream *p) {
return p->use_shm;
}
+
+void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *sr) { /* Queue sr (NULL for none) as the pstream's srbchannel; check_srpending() performs the actual switch */
+ pa_assert(p);
+ pa_assert(PA_REFCNT_VALUE(p) > 0 || sr == NULL); /* a NULL sr is accepted regardless of the refcount */
+
+ if (sr == p->sr) /* already the active channel; a pending switch, if any, is left untouched */
+ return;
+
+ /* We can't handle quick switches between srbchannels. */
+ pa_assert(!p->is_srpending); /* a second request while one is still pending aborts here */
+
+ p->srpending = sr;
+ p->is_srpending = true;
+
+ /* Switch immediately, if possible. */
+ if (p->dead)
+ check_srpending(p); /* dead pstream: apply the switch directly instead of going through do_write() */
+ else
+ do_write(p); /* do_write() calls check_srpending() once it has no current item to send; its result is ignored here */
+}
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index 4961570..73dba02 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -31,6 +31,7 @@
#include <pulsecore/packet.h>
#include <pulsecore/memblock.h>
#include <pulsecore/iochannel.h>
+#include <pulsecore/srbchannel.h>
#include <pulsecore/memchunk.h>
#include <pulsecore/creds.h>
#include <pulsecore/macro.h>
@@ -66,4 +67,8 @@ bool pa_pstream_is_pending(pa_pstream *p);
void pa_pstream_enable_shm(pa_pstream *p, bool enable);
bool pa_pstream_get_shm(pa_pstream *p);
+/* Enables shared ringbuffer channel. Note that the srbchannel is now owned by the pstream.
+ Setting sr to NULL will free any existing srbchannel. */
+void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *sr);
+
#endif
--
1.9.1
More information about the pulseaudio-discuss mailing list