[pulseaudio-discuss] [PATCH RFCv3 15/51] pstream: Split up do_read()

David Henningsson david.henningsson at canonical.com
Fri Feb 27 02:03:40 PST 2015


I'd like to see this one pushed, as I was thinking of writing something
on top of it. But see my comments below.

On 2014-11-05 00:26, Peter Meerwald wrote:
> From: Peter Meerwald <p.meerwald at bct-electronic.com>
>
> no functional change, just using several funtion to break up that monstrosity

s/funtion/functions

>
> Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
> ---
>   src/pulsecore/pstream.c | 400 ++++++++++++++++++++++++------------------------
>   1 file changed, 204 insertions(+), 196 deletions(-)
>
> diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
> index 69f9abd..3926353 100644
> --- a/src/pulsecore/pstream.c
> +++ b/src/pulsecore/pstream.c
> @@ -731,6 +731,205 @@ fail:
>       return -1;
>   }
>
> +static int handle_descriptor(pa_pstream *p, struct pstream_read *re) {
> +    uint32_t flags, length, channel;
> +
> +    pa_assert(p);
> +    pa_assert(re);
> +
> +    flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
> +
> +    if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
> +        pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
> +        return -1;
> +    }
> +
> +    if (flags == PA_FLAG_SHMRELEASE) {
> +        /* This is a SHM memblock release frame with no payload */
> +        /* pa_log("Got release frame for %u", ntohl(*desc[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
> +
> +        pa_assert(p->export);
> +        pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
> +
> +        goto frame_done;
> +
> +    } else if (flags == PA_FLAG_SHMREVOKE) {
> +        /* This is a SHM memblock revoke frame with no payload */
> +        /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
> +
> +        pa_assert(p->import);
> +        pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
> +
> +        goto frame_done;
> +    }
> +
> +    length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
> +
> +    if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
> +        pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
> +        return -1;
> +    }
> +
> +    pa_assert(!re->packet && !re->memblock);
> +
> +    channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
> +
> +    if (channel == (uint32_t) -1) {
> +        size_t plen;
> +        if (flags != 0) {
> +            pa_log_warn("Received packet frame with invalid flags value.");
> +            return -1;
> +        }
> +
> +        /* Frame is a packet frame */
> +        re->packet = pa_packet_new(length);
> +        re->data = (void *) pa_packet_data(re->packet, &plen);
> +
> +    } else {
> +
> +        if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
> +            pa_log_warn("Received memblock frame with invalid seek mode.");
> +            return -1;
> +        }
> +
> +        if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
> +            if (length != sizeof(re->shm_info)) {
> +                pa_log_warn("Received SHM memblock frame with invalid frame length.");
> +                return -1;
> +            }
> +
> +            /* Frame is a memblock frame referencing an SHM memblock */
> +            re->data = re->shm_info;
> +
> +        } else if ((flags & PA_FLAG_SHMMASK) == 0) {
> +
> +            /* Frame is a memblock frame */
> +            re->memblock = pa_memblock_new(p->mempool, length);
> +            re->data = NULL; // XXX
> +        } else {
> +            pa_log_warn("Received memblock frame with invalid flags value.");
> +            return -1;
> +        }
> +    }
> +
> +    return 1;
> +
> +frame_done:
> +    re->index = 0;
> +
> +    return 0;
> +}

Did you also mean to call frame_done() here? handle_descriptor() returns
a value, but the call site in do_read() ignores it, so the -1 error
returns are no longer propagated the way they were before the split.

It is also confusing to have both a label and a function named frame_done.

> +
> +static void frame_done(pa_pstream *p, struct pstream_read *re) {
> +    re->memblock = NULL;
> +    re->packet = NULL;
> +    re->index = 0;
> +    re->data = NULL;
> +
> +#ifdef HAVE_CREDS
> +    p->read_ancil_data.creds_valid = false;
> +    p->read_ancil_data.nfd = 0;
> +#endif
> +}
> +
> +static void frame_complete(pa_pstream *p, struct pstream_read *re) {
> +    if (re->memblock) {
> +
> +        /* This was a memblock frame. We can unref the memblock now */
> +        pa_memblock_unref(re->memblock);
> +
> +    } else if (re->packet) {
> +
> +        if (p->receive_packet_callback)
> +#ifdef HAVE_CREDS
> +            p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
> +#else
> +            p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
> +#endif
> +
> +        pa_packet_unref(re->packet);
> +    } else {
> +        pa_memblock *b;
> +        uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
> +        pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
> +
> +        pa_assert(p->import);
> +
> +        if (!(b = pa_memimport_get(p->import,
> +                                  ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
> +                                  ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
> +                                  ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
> +                                  ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
> +                                  !!(flags & PA_FLAG_SHMWRITABLE)))) {
> +
> +            if (pa_log_ratelimit(PA_LOG_DEBUG))
> +                pa_log_debug("Failed to import memory block.");
> +        }
> +
> +        if (p->receive_memblock_callback) {
> +            int64_t offset;
> +            pa_memchunk chunk;
> +
> +            chunk.memblock = b;
> +            chunk.index = 0;
> +            chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
> +
> +            offset = (int64_t) (
> +                    (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
> +                    (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
> +
> +            p->receive_memblock_callback(
> +                    p,
> +                    ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
> +                    offset,
> +                    ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
> +                    &chunk,
> +                    p->receive_memblock_callback_userdata);
> +        }
> +
> +        if (b)
> +            pa_memblock_unref(b);
> +    }
> +
> +    frame_done(p, re);
> +}
> +
> +static void handle_payload(pa_pstream *p, struct pstream_read *re, ssize_t r) {
> +    size_t l;
> +
> +    /* Is this memblock data? Than pass it to the user */
> +    l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
> +
> +    if (l > 0) {
> +        pa_memchunk chunk;
> +
> +        chunk.memblock = re->memblock;
> +        chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
> +        chunk.length = l;
> +
> +        if (p->receive_memblock_callback) {
> +            int64_t offset;
> +
> +            offset = (int64_t) (
> +                    (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
> +                    (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
> +
> +            p->receive_memblock_callback(
> +                p,
> +                ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
> +                offset,
> +                ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
> +                &chunk,
> +                p->receive_memblock_callback_userdata);
> +        }
> +
> +        /* Drop seek info for following callbacks */
> +        re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
> +            re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
> +            re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
> +    }
> +}
> +
>   static int do_read(pa_pstream *p, struct pstream_read *re) {
>       void *d;
>       size_t l;
> @@ -793,210 +992,19 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
>       re->index += (size_t) r;
>
>       if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
> -        uint32_t flags, length, channel;
> -        /* Reading of frame descriptor complete */
> -
> -        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
> -
> -        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
> -            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
> -            return -1;
> -        }
> -
> -        if (flags == PA_FLAG_SHMRELEASE) {
> -
> -            /* This is a SHM memblock release frame with no payload */
> -
> -/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
> -
> -            pa_assert(p->export);
> -            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
> -
> -            goto frame_done;
> -
> -        } else if (flags == PA_FLAG_SHMREVOKE) {
> -
> -            /* This is a SHM memblock revoke frame with no payload */
> -
> -/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
> -
> -            pa_assert(p->import);
> -            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
> -
> -            goto frame_done;
> -        }
> -
> -        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
> -
> -        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
> -            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
> -            return -1;
> -        }
> -
> -        pa_assert(!re->packet && !re->memblock);
> -
> -        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
> -
> -        if (channel == (uint32_t) -1) {
> -            size_t plen;
> -
> -            if (flags != 0) {
> -                pa_log_warn("Received packet frame with invalid flags value.");
> -                return -1;
> -            }
> -
> -            /* Frame is a packet frame */
> -            re->packet = pa_packet_new(length);
> -            re->data = (void *) pa_packet_data(re->packet, &plen);
> -
> -        } else {
> -
> -            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
> -                pa_log_warn("Received memblock frame with invalid seek mode.");
> -                return -1;
> -            }
> -
> -            if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
> -
> -                if (length != sizeof(re->shm_info)) {
> -                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
> -                    return -1;
> -                }
> -
> -                /* Frame is a memblock frame referencing an SHM memblock */
> -                re->data = re->shm_info;
> -
> -            } else if ((flags & PA_FLAG_SHMMASK) == 0) {
> -
> -                /* Frame is a memblock frame */
> -
> -                re->memblock = pa_memblock_new(p->mempool, length);
> -                re->data = NULL;
> -            } else {
> -
> -                pa_log_warn("Received memblock frame with invalid flags value.");
> -                return -1;
> -            }
> -        }
> -
> +        handle_descriptor(p, re);
>       } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
>           /* Frame payload available */
> -
> -        if (re->memblock && p->receive_memblock_callback) {
> -
> -            /* Is this memblock data? Than pass it to the user */
> -            l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
> -
> -            if (l > 0) {
> -                pa_memchunk chunk;
> -
> -                chunk.memblock = re->memblock;
> -                chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
> -                chunk.length = l;
> -
> -                if (p->receive_memblock_callback) {
> -                    int64_t offset;
> -
> -                    offset = (int64_t) (
> -                            (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
> -                            (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
> -
> -                    p->receive_memblock_callback(
> -                        p,
> -                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
> -                        offset,
> -                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
> -                        &chunk,
> -                        p->receive_memblock_callback_userdata);
> -                }
> -
> -                /* Drop seek info for following callbacks */
> -                re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
> -                    re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
> -                    re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
> -            }
> -        }
> +        if (re->memblock && p->receive_memblock_callback)
> +            handle_payload(p, re, r);
>
>           /* Frame complete */
> -        if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
> -
> -            if (re->memblock) {
> -
> -                /* This was a memblock frame. We can unref the memblock now */
> -                pa_memblock_unref(re->memblock);
> -
> -            } else if (re->packet) {
> -
> -                if (p->receive_packet_callback)
> -#ifdef HAVE_CREDS
> -                    p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
> -#else
> -                    p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
> -#endif
> -
> -                pa_packet_unref(re->packet);
> -            } else {
> -                pa_memblock *b;
> -                uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
> -                pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
> -
> -                pa_assert(p->import);
> -
> -                if (!(b = pa_memimport_get(p->import,
> -                                          ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
> -                                          ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
> -                                          ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
> -                                          ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
> -                                          !!(flags & PA_FLAG_SHMWRITABLE)))) {
> -
> -                    if (pa_log_ratelimit(PA_LOG_DEBUG))
> -                        pa_log_debug("Failed to import memory block.");
> -                }
> -
> -                if (p->receive_memblock_callback) {
> -                    int64_t offset;
> -                    pa_memchunk chunk;
> -
> -                    chunk.memblock = b;
> -                    chunk.index = 0;
> -                    chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
> -
> -                    offset = (int64_t) (
> -                            (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
> -                            (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
> -
> -                    p->receive_memblock_callback(
> -                            p,
> -                            ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
> -                            offset,
> -                            ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
> -                            &chunk,
> -                            p->receive_memblock_callback_userdata);
> -                }
> -
> -                if (b)
> -                    pa_memblock_unref(b);
> -            }
> -
> -            goto frame_done;
> -        }
> +        if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
> +            frame_complete(p, re);
>       }
>
>       return 0;
>
> -frame_done:
> -    re->memblock = NULL;
> -    re->packet = NULL;
> -    re->index = 0;
> -    re->data = NULL;
> -
> -#ifdef HAVE_CREDS
> -    p->read_ancil_data.creds_valid = false;
> -    p->read_ancil_data.nfd = 0;
> -#endif
> -
> -    return 0;
> -
>   fail:
>       if (release_memblock)
>           pa_memblock_release(release_memblock);
>

-- 
David Henningsson, Canonical Ltd.
https://launchpad.net/~diwic


More information about the pulseaudio-discuss mailing list