[pulseaudio-discuss] [PATCH RFC 15/17] pstream: Split up do_read()
Peter Meerwald
pmeerw at pmeerw.net
Fri Oct 24 14:21:39 PDT 2014
From: Peter Meerwald <p.meerwald at bct-electronic.com>
No functional change; this just uses several functions to break up that monstrosity.
Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
---
src/pulsecore/pstream.c | 400 ++++++++++++++++++++++++------------------------
1 file changed, 204 insertions(+), 196 deletions(-)
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 400df42..8846d14 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -731,6 +731,205 @@ fail:
return -1;
}
+static int handle_descriptor(pa_pstream *p, struct pstream_read *re) {
+ uint32_t flags, length, channel;
+
+ pa_assert(p);
+ pa_assert(re);
+
+ flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+
+ if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
+ pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
+ return -1;
+ }
+
+ if (flags == PA_FLAG_SHMRELEASE) {
+ /* This is a SHM memblock release frame with no payload */
+ /* pa_log("Got release frame for %u", ntohl(*desc[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+ pa_assert(p->export);
+ pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+ goto frame_done;
+
+ } else if (flags == PA_FLAG_SHMREVOKE) {
+ /* This is a SHM memblock revoke frame with no payload */
+ /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+ pa_assert(p->import);
+ pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+ goto frame_done;
+ }
+
+ length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+
+ if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
+ pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
+ return -1;
+ }
+
+ pa_assert(!re->packet && !re->memblock);
+
+ channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+
+ if (channel == (uint32_t) -1) {
+ size_t plen;
+ if (flags != 0) {
+ pa_log_warn("Received packet frame with invalid flags value.");
+ return -1;
+ }
+
+ /* Frame is a packet frame */
+ re->packet = pa_packet_new(length);
+ re->data = (void *) pa_packet_data(re->packet, &plen);
+
+ } else {
+
+ if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
+ pa_log_warn("Received memblock frame with invalid seek mode.");
+ return -1;
+ }
+
+ if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+ if (length != sizeof(re->shm_info)) {
+ pa_log_warn("Received SHM memblock frame with invalid frame length.");
+ return -1;
+ }
+
+ /* Frame is a memblock frame referencing an SHM memblock */
+ re->data = re->shm_info;
+
+ } else if ((flags & PA_FLAG_SHMMASK) == 0) {
+
+ /* Frame is a memblock frame */
+ re->memblock = pa_memblock_new(p->mempool, length);
+ re->data = NULL; // XXX
+ } else {
+ pa_log_warn("Received memblock frame with invalid flags value.");
+ return -1;
+ }
+ }
+
+ return 1;
+
+frame_done:
+ re->index = 0;
+
+ return 0;
+}
+
+/* Put the read state back to "waiting for a new frame descriptor".
+ * The memblock and packet pointers are only cleared here; no reference is
+ * dropped by this function. */
+static void frame_done(pa_pstream *p, struct pstream_read *re) {
+#ifdef HAVE_CREDS
+    /* Ancillary data belongs to the frame that was just finished */
+    p->read_ancil.nfd = 0;
+    p->read_ancil.creds_valid = false;
+#endif
+
+    re->index = 0;
+    re->data = NULL;
+    re->packet = NULL;
+    re->memblock = NULL;
+}
+
+/* Finish a frame whose payload has been received completely: deliver it to
+ * the matching callback where applicable, drop the references held by the
+ * read state and reset that state via frame_done(). */
+static void frame_complete(pa_pstream *p, struct pstream_read *re) {
+    if (re->memblock) {
+
+        /* Memblock frame: only the reference held by the read state is
+         * dropped here */
+        pa_memblock_unref(re->memblock);
+
+    } else if (re->packet) {
+
+        /* Packet frame: deliver the whole packet, then drop our reference */
+        if (p->receive_packet_callback) {
+#ifdef HAVE_CREDS
+            p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata);
+#else
+            p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
+#endif
+        }
+
+        pa_packet_unref(re->packet);
+
+    } else {
+        /* Neither memblock nor packet: the payload was the SHM block info */
+        pa_memblock *imported;
+        uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+
+        pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+        pa_assert(p->import);
+
+        imported = pa_memimport_get(
+                p->import,
+                ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
+                ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
+                ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
+                ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
+                !!(flags & PA_FLAG_SHMWRITABLE));
+
+        if (!imported && pa_log_ratelimit(PA_LOG_DEBUG))
+            pa_log_debug("Failed to import memory block.");
+
+        if (p->receive_memblock_callback) {
+            pa_memchunk chunk;
+            uint64_t offset_hi, offset_lo;
+            int64_t offset;
+
+            /* The callback is invoked even if the import failed; the chunk
+             * then has no memblock and takes the length from the SHM info */
+            chunk.memblock = imported;
+            chunk.index = 0;
+            if (imported)
+                chunk.length = pa_memblock_get_length(imported);
+            else
+                chunk.length = ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
+
+            /* The 64 bit offset is split over two descriptor fields */
+            offset_hi = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]);
+            offset_lo = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]);
+            offset = (int64_t) ((offset_hi << 32) | offset_lo);
+
+            p->receive_memblock_callback(
+                    p,
+                    ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+                    offset,
+                    ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+                    &chunk,
+                    p->receive_memblock_callback_userdata);
+        }
+
+        if (imported)
+            pa_memblock_unref(imported);
+    }
+
+    frame_done(p, re);
+}
+
+/* Pass the memblock payload bytes that arrived with the last read of 'r'
+ * bytes on to the memblock callback. re->index already includes these
+ * bytes. The seek information in the descriptor is cleared afterwards so it
+ * is reported only once per frame. */
+static void handle_payload(pa_pstream *p, struct pstream_read *re, ssize_t r) {
+    size_t received = (size_t) r;
+    size_t payload_len;
+    pa_memchunk chunk;
+
+    /* If this read started inside the descriptor, only the bytes past the
+     * descriptor are payload; otherwise everything read is payload */
+    if ((re->index - received) < PA_PSTREAM_DESCRIPTOR_SIZE)
+        payload_len = (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
+    else
+        payload_len = received;
+
+    if (payload_len == 0)
+        return;
+
+    /* The chunk covers exactly the newly received part of the memblock */
+    chunk.memblock = re->memblock;
+    chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - payload_len;
+    chunk.length = payload_len;
+
+    if (p->receive_memblock_callback) {
+        uint64_t offset_hi, offset_lo;
+        int64_t offset;
+
+        /* The 64 bit offset is split over two descriptor fields */
+        offset_hi = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]);
+        offset_lo = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]);
+        offset = (int64_t) ((offset_hi << 32) | offset_lo);
+
+        p->receive_memblock_callback(
+                p,
+                ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+                offset,
+                ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+                &chunk,
+                p->receive_memblock_callback_userdata);
+    }
+
+    /* Drop seek info for following callbacks */
+    re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+    re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
+    re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
+}
+
static int do_read(pa_pstream *p, struct pstream_read *re) {
void *d;
size_t l;
@@ -793,210 +992,19 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
re->index += (size_t) r;
if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
- uint32_t flags, length, channel;
- /* Reading of frame descriptor complete */
-
- flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
-
- if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
- pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
- return -1;
- }
-
- if (flags == PA_FLAG_SHMRELEASE) {
-
- /* This is a SHM memblock release frame with no payload */
-
-/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
-
- pa_assert(p->export);
- pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
-
- goto frame_done;
-
- } else if (flags == PA_FLAG_SHMREVOKE) {
-
- /* This is a SHM memblock revoke frame with no payload */
-
-/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
-
- pa_assert(p->import);
- pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
-
- goto frame_done;
- }
-
- length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
-
- if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
- pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
- return -1;
- }
-
- pa_assert(!re->packet && !re->memblock);
-
- channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
-
- if (channel == (uint32_t) -1) {
- size_t plen;
-
- if (flags != 0) {
- pa_log_warn("Received packet frame with invalid flags value.");
- return -1;
- }
-
- /* Frame is a packet frame */
- re->packet = pa_packet_new(length);
- re->data = (void *) pa_packet_data(re->packet, &plen);
-
- } else {
-
- if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
- pa_log_warn("Received memblock frame with invalid seek mode.");
- return -1;
- }
-
- if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
-
- if (length != sizeof(re->shm_info)) {
- pa_log_warn("Received SHM memblock frame with invalid frame length.");
- return -1;
- }
-
- /* Frame is a memblock frame referencing an SHM memblock */
- re->data = re->shm_info;
-
- } else if ((flags & PA_FLAG_SHMMASK) == 0) {
-
- /* Frame is a memblock frame */
-
- re->memblock = pa_memblock_new(p->mempool, length);
- re->data = NULL;
- } else {
-
- pa_log_warn("Received memblock frame with invalid flags value.");
- return -1;
- }
- }
-
+ handle_descriptor(p, re);
} else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
/* Frame payload available */
-
- if (re->memblock && p->receive_memblock_callback) {
-
- /* Is this memblock data? Than pass it to the user */
- l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
-
- if (l > 0) {
- pa_memchunk chunk;
-
- chunk.memblock = re->memblock;
- chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
- chunk.length = l;
-
- if (p->receive_memblock_callback) {
- int64_t offset;
-
- offset = (int64_t) (
- (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
-
- p->receive_memblock_callback(
- p,
- ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
- offset,
- ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
- &chunk,
- p->receive_memblock_callback_userdata);
- }
-
- /* Drop seek info for following callbacks */
- re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
- re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
- re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
- }
- }
+ if (re->memblock && p->receive_memblock_callback)
+ handle_payload(p, re, r);
/* Frame complete */
- if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
-
- if (re->memblock) {
-
- /* This was a memblock frame. We can unref the memblock now */
- pa_memblock_unref(re->memblock);
-
- } else if (re->packet) {
-
- if (p->receive_packet_callback)
-#ifdef HAVE_CREDS
- p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata);
-#else
- p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
-#endif
-
- pa_packet_unref(re->packet);
- } else {
- pa_memblock *b;
- uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
- pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
-
- pa_assert(p->import);
-
- if (!(b = pa_memimport_get(p->import,
- ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
- !!(flags & PA_FLAG_SHMWRITABLE)))) {
-
- if (pa_log_ratelimit(PA_LOG_DEBUG))
- pa_log_debug("Failed to import memory block.");
- }
-
- if (p->receive_memblock_callback) {
- int64_t offset;
- pa_memchunk chunk;
-
- chunk.memblock = b;
- chunk.index = 0;
- chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
-
- offset = (int64_t) (
- (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
-
- p->receive_memblock_callback(
- p,
- ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
- offset,
- ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
- &chunk,
- p->receive_memblock_callback_userdata);
- }
-
- if (b)
- pa_memblock_unref(b);
- }
-
- goto frame_done;
- }
+ if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
+ frame_complete(p, re);
}
return 0;
-frame_done:
- re->memblock = NULL;
- re->packet = NULL;
- re->index = 0;
- re->data = NULL;
-
-#ifdef HAVE_CREDS
- p->read_ancil.creds_valid = false;
- p->read_ancil.nfd = 0;
-#endif
-
- return 0;
-
fail:
if (release_memblock)
pa_memblock_release(release_memblock);
--
1.9.1
More information about the pulseaudio-discuss
mailing list