[pulseaudio-discuss] [PATCH RFCv2 20/27] pstream: Peek into next item on send queue to see if it can be put into minibuffer together with current item

David Henningsson david.henningsson at canonical.com
Mon Nov 3 00:45:50 PST 2014


Good idea, but the implementation seems to have some room for improvement?

I get the feeling that we should have a while loop instead of just 
poking one extra item here, it could give the benefit of less copy-paste 
(e g the "if i->type" rows look like copy-pasted from somewhere), and 
also this would give the opportunity of putting more than two messages 
into the same minibuffer?

But maybe that would require additional refactorings, or become too 
complex...?

On 2014-10-28 20:46, Peter Meerwald wrote:
> From: Peter Meerwald <p.meerwald at bct-electronic.com>
>
> patch b4342845 "pstream: Optimise write of smaller packages" combines (copies)
> the descriptor and payload of certain (small) packages into a minibuf
> to send the data with a single write()
>
> this patch extends the idea by looking at the next item on the send queue
> to see if it can be combined with the current package and appended to the
> minibuffer
>
> Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
> ---
>   src/pulsecore/pstream.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++--
>   1 file changed, 62 insertions(+), 2 deletions(-)
>
> diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
> index 1c966de..fbd85f9 100644
> --- a/src/pulsecore/pstream.c
> +++ b/src/pulsecore/pstream.c
> @@ -153,7 +153,7 @@ struct pa_pstream {
>           struct item_info* current;
>           void *data;
>           size_t index;
> -        int minibuf_validsize;
> +        unsigned minibuf_validsize;
>           pa_memchunk memchunk;
>       } write;
>
> @@ -528,6 +528,55 @@ static void shm_descriptor(void *d, uint32_t flags, uint32_t block_id) {
>       descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(block_id);
>   }
>
> +/* Peek at next item in send queue and copy descriptor/data into current
> + * write minibuffer if possible */
> +static void peek_next_send_item(pa_pstream *p) {
> +    const struct item_info *i;
> +    const void *data = NULL;
> +    uint32_t flags = 0;
> +    size_t len = 0;
> +    unsigned minibuf_usedsize = p->write.minibuf_validsize;
> +
> +    i = pa_queue_peek(p->send_queue);
> +    if (!i)
> +        return;
> +
> +    /* Items with anicllary data are rare, no need to complicate things here */
> +    if (i->with_ancil)
> +        return;
> +
> +    /* Write minibuffer not used yet, but the descriptor is already in there */
> +    if (!minibuf_usedsize)
> +        minibuf_usedsize = PA_PSTREAM_DESCRIPTOR_SIZE;
> +
> +    if (i->type == PA_PSTREAM_ITEM_PACKET) {
> +        data = pa_packet_data(i->per_type.packet, &len);
> +    } else if (i->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
> +        data = pa_tagstruct_data(i->per_type.tagstruct, &len);
> +    } else if (i->type == PA_PSTREAM_ITEM_SHMRELEASE) {
> +        flags = PA_FLAG_SHMRELEASE;
> +    } else if (i->type == PA_PSTREAM_ITEM_SHMREVOKE) {
> +        flags = PA_FLAG_SHMREVOKE;
> +    } else
> +        return;
> +
> +    /* Check if next item's descriptor and payload fit into write minibuffer;
> +     * combine/copy next item with current item if possible */
> +    if (PA_PSTREAM_DESCRIPTOR_SIZE + len <= WRITE_MINIBUF_SIZE - minibuf_usedsize) {
> +        uint8_t *m = p->write.minibuf + minibuf_usedsize;
> +
> +        p->write.minibuf_validsize = minibuf_usedsize + PA_PSTREAM_DESCRIPTOR_SIZE + len;
> +
> +        reset_descriptor(m, len);
> +        if (len)
> +            memcpy(m + PA_PSTREAM_DESCRIPTOR_SIZE, data, len);
> +        else
> +            shm_descriptor(m, flags, i->per_type.block_id);
> +
> +        item_free(pa_queue_pop(p->send_queue));
> +    }
> +}
> +
>   static void prepare_next_write_item(pa_pstream *p) {
>       pa_assert(p);
>       pa_assert(PA_REFCNT_VALUE(p) > 0);
> @@ -554,6 +603,8 @@ static void prepare_next_write_item(pa_pstream *p) {
>           if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
>               memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
>               p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
> +
> +            peek_next_send_item(p);
>           }
>
>       } else if (p->write.current->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
> @@ -567,16 +618,22 @@ static void prepare_next_write_item(pa_pstream *p) {
>           if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
>               memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen);
>               p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen;
> +
> +            peek_next_send_item(p);
>           }
>
>       } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
>
>           shm_descriptor(p->write.descriptor, PA_FLAG_SHMRELEASE, p->write.current->per_type.block_id);
>
> +        peek_next_send_item(p);
> +
>       } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
>
>           shm_descriptor(p->write.descriptor, PA_FLAG_SHMREVOKE, p->write.current->per_type.block_id);
>
> +        peek_next_send_item(p);
> +
>       } else {
>           uint32_t flags;
>           bool send_payload = true;
> @@ -721,7 +778,10 @@ static int do_write(pa_pstream *p) {
>
>       p->write.index += (size_t) r;
>
> -    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
> +    /* see if entire item or minibuffer has been sent */
> +    if (p->write.index >=
> +        (p->write.minibuf_validsize > 0 ? p->write.minibuf_validsize :
> +        (PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])))) {
>           pa_assert(p->write.current);
>           item_free(p->write.current);
>           p->write.current = NULL;
>

-- 
David Henningsson, Canonical Ltd.
https://launchpad.net/~diwic


More information about the pulseaudio-discuss mailing list