[pulseaudio-discuss] [PATCH RFC 16/17] pstream: Use small minibuffer to combine several read()s if possible

David Henningsson david.henningsson at canonical.com
Mon Oct 27 02:49:21 PDT 2014



On 2014-10-24 23:21, Peter Meerwald wrote:
> From: Peter Meerwald <p.meerwald at bct-electronic.com>
>
> idea is similar to b4342845d, Optimize write of smaller packages, but for read

IIRC, I tried this too, but never got it right, so if I posted a patch 
here for that, then I withdrew it later.

Is there any reason that READ_MINIBUF_SIZE is less than WRITE_MINIBUF_SIZE? 
Is reading 256 bytes too much?

Also, the code isn't documented much; could you explain, or write a 
comment describing, what happens in the case of a very short packet, so 
that the next descriptor ends up in the read minibuf?

>
> Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
> ---
>   src/pulsecore/pstream.c | 90 +++++++++++++++++++++++++++++++++++++++----------
>   1 file changed, 73 insertions(+), 17 deletions(-)
>
> diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
> index 8846d14..91868a7 100644
> --- a/src/pulsecore/pstream.c
> +++ b/src/pulsecore/pstream.c
> @@ -76,7 +76,8 @@ typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
>   #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
>   #define PA_PSTREAM_SHM_SIZE (PA_PSTREAM_SHM_MAX*sizeof(uint32_t))
>
> -#define MINIBUF_SIZE (256)
> +#define WRITE_MINIBUF_SIZE 256
> +#define READ_MINIBUF_SIZE 40
>
>   /* To allow uploading a single sample in one frame, this value should be the
>    * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
> @@ -120,7 +121,10 @@ struct item_info {
>   };
>
>   struct pstream_read {
> -    pa_pstream_descriptor descriptor;
> +    union {
> +        uint8_t minibuf[READ_MINIBUF_SIZE];
> +        pa_pstream_descriptor descriptor;
> +    };
>       pa_memblock *memblock;
>       pa_packet *packet;
>       uint32_t shm_info[PA_PSTREAM_SHM_MAX];
> @@ -143,7 +147,7 @@ struct pa_pstream {
>
>       struct {
>           union {
> -            uint8_t minibuf[MINIBUF_SIZE];
> +            uint8_t minibuf[WRITE_MINIBUF_SIZE];
>               pa_pstream_descriptor descriptor;
>           };
>           struct item_info* current;
> @@ -535,7 +539,7 @@ static void prepare_next_write_item(pa_pstream *p) {
>           p->write.data = (void *) pa_packet_data(p->write.current->per_type.packet, &plen);
>           p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);
>
> -        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
> +        if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
>               memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
>               p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
>           }
> @@ -548,7 +552,7 @@ static void prepare_next_write_item(pa_pstream *p) {
>           p->write.data = (void *) pa_tagstruct_data(p->write.current->per_type.tagstruct, &tlen);
>           p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) tlen);
>
> -        if (tlen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
> +        if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
>               memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen);
>               p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen;
>           }
> @@ -938,7 +942,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
>       pa_assert(p);
>       pa_assert(PA_REFCNT_VALUE(p) > 0);
>
> -    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
> +    if (re->index == 0) {
> +        /* special case: expecting a new descriptor but provide extra space;
> +         * often we can save a read() */
> +        d = (uint8_t*) re->minibuf;
> +        l = READ_MINIBUF_SIZE;
> +    } else if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
>           d = (uint8_t*) re->descriptor + re->index;
>           l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
>       } else {
> @@ -989,18 +998,65 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
>       if (release_memblock)
>           pa_memblock_release(release_memblock);
>
> -    re->index += (size_t) r;
> -
> -    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
> -        handle_descriptor(p, re);
> -    } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
> -        /* Frame payload available */
> -        if (re->memblock && p->receive_memblock_callback)
> -            handle_payload(p, re, r);
> +    if (re->index == 0 && r > (int) PA_PSTREAM_DESCRIPTOR_SIZE) {
> +        uint8_t *m = re->minibuf;
> +
> +        while (r > 0) {
> +            int frame_remaining;
> +            if (r >= (int) PA_PSTREAM_DESCRIPTOR_SIZE) {
> +                /* Handle descriptor */
> +                memmove(re->descriptor, m, PA_PSTREAM_DESCRIPTOR_SIZE);
> +                frame_remaining = handle_descriptor(p, re);
> +                r -= PA_PSTREAM_DESCRIPTOR_SIZE;
> +                m += PA_PSTREAM_DESCRIPTOR_SIZE;
> +            } else {
> +                /* Move remaining descriptor part */
> +                memmove(re->descriptor, m, r);
> +                re->index = r;
> +                break;
> +            }
>
> -        /* Frame complete */
> -        if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
> -            frame_complete(p, re);
> +            if (frame_remaining > 0) {
> +                /* Copy data from minibuf */
> +                pa_assert(re->data || re->memblock);
> +
> +                l = PA_MIN(ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), r);
> +                if (re->data)
> +                    memcpy(re->data, m, l);
> +                else {
> +                    d = pa_memblock_acquire(re->memblock);
> +                    memcpy(d, m, l);
> +                    pa_memblock_release(re->memblock);
> +                }
> +                r -= l;
> +                m += l;
> +                re->index = PA_PSTREAM_DESCRIPTOR_SIZE + l;
> +
> +                /* Frame payload available */
> +                if (re->memblock && p->receive_memblock_callback)
> +                    handle_payload(p, re, l);
> +
> +                /* Frame complete */
> +                if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
> +                    frame_complete(p, re);
> +            }
> +            else
> +                frame_done(p, re);
> +        }
> +    } else {
> +        re->index += (size_t) r;
> +
> +        if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
> +            handle_descriptor(p, re);
> +        } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
> +            /* Frame payload available */
> +            if (re->memblock && p->receive_memblock_callback)
> +                handle_payload(p, re, r);
> +
> +            /* Frame complete */
> +            if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
> +                frame_complete(p, re);
> +        }
>       }
>
>       return 0;
>

-- 
David Henningsson, Canonical Ltd.
https://launchpad.net/~diwic


More information about the pulseaudio-discuss mailing list