[pulseaudio-discuss] [PATCH RFCv2 20/27] pstream: Peek into next item on send queue to see if it can be put into minibuffer together with current item

Peter Meerwald pmeerw at pmeerw.net
Mon Nov 3 00:54:32 PST 2014


Hello David,

thanks for looking at the patch!

> Good idea, but the implementation seems to have some room for improvement?

certainly

> I get the feeling that we should have a while loop instead of just 
> poking one extra item here, it could give the benefit of less copy-paste 
> (e g the "if i->type" rows look like copy-pasted from somewhere), and 
> also this would give the opportunity of putting more than two messages 
> into the same minibuffer?

can certainly be done; let me do some statistics how often we improve when 
using a peek loop

the i->type are the items that can be merged; no harm if not all are 
caught

> But maybe that would require additional refactorings, or become too 
> complex...?

I wrote the code with that loop in mind, so complexity added should just 
be the additional loop (hopefully :)

p.
 
> 
> On 2014-10-28 20:46, Peter Meerwald wrote:
> > From: Peter Meerwald <p.meerwald at bct-electronic.com>
> >
> > patch b4342845 "pstream: Optimise write of smaller packages" combines
> (copies)
> > the descriptor and payload of certain (small) packages into a minibuf
> > to send the data with a single write()
> >
> > this patch extends the idea by looking at the next item on the send queue
> > to see if it can be combined with the current package and appended to the
> > minibuffer
> >
> > Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
> > ---
> >   src/pulsecore/pstream.c | 64
> +++++++++++++++++++++++++++++++++++++++++++++++--
> >   1 file changed, 62 insertions(+), 2 deletions(-)
> >
> > diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
> > index 1c966de..fbd85f9 100644
> > --- a/src/pulsecore/pstream.c
> > +++ b/src/pulsecore/pstream.c
> > @@ -153,7 +153,7 @@ struct pa_pstream {
> >           struct item_info* current;
> >           void *data;
> >           size_t index;
> > -        int minibuf_validsize;
> > +        unsigned minibuf_validsize;
> >           pa_memchunk memchunk;
> >       } write;
> >
> > @@ -528,6 +528,55 @@ static void shm_descriptor(void *d, uint32_t flags,
> uint32_t block_id) {
> >       descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(block_id);
> >   }
> >
> > +/* Peek at next item in send queue and copy descriptor/data into current
> > + * write minibuffer if possible */
> > +static void peek_next_send_item(pa_pstream *p) {
> > +    const struct item_info *i;
> > +    const void *data = NULL;
> > +    uint32_t flags = 0;
> > +    size_t len = 0;
> > +    unsigned minibuf_usedsize = p->write.minibuf_validsize;
> > +
> > +    i = pa_queue_peek(p->send_queue);
> > +    if (!i)
> > +        return;
> > +
> > +    /* Items with anicllary data are rare, no need to complicate things
> here */
> > +    if (i->with_ancil)
> > +        return;
> > +
> > +    /* Write minibuffer not used yet, but the descriptor is already in
> there */
> > +    if (!minibuf_usedsize)
> > +        minibuf_usedsize = PA_PSTREAM_DESCRIPTOR_SIZE;
> > +
> > +    if (i->type == PA_PSTREAM_ITEM_PACKET) {
> > +        data = pa_packet_data(i->per_type.packet, &len);
> > +    } else if (i->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
> > +        data = pa_tagstruct_data(i->per_type.tagstruct, &len);
> > +    } else if (i->type == PA_PSTREAM_ITEM_SHMRELEASE) {
> > +        flags = PA_FLAG_SHMRELEASE;
> > +    } else if (i->type == PA_PSTREAM_ITEM_SHMREVOKE) {
> > +        flags = PA_FLAG_SHMREVOKE;
> > +    } else
> > +        return;
> > +
> > +    /* Check if next item's descriptor and payload fit into write
> minibuffer;
> > +     * combine/copy next item with current item if possible */
> > +    if (PA_PSTREAM_DESCRIPTOR_SIZE + len <= WRITE_MINIBUF_SIZE -
> minibuf_usedsize) {
> > +        uint8_t *m = p->write.minibuf + minibuf_usedsize;
> > +
> > +        p->write.minibuf_validsize = minibuf_usedsize +
> PA_PSTREAM_DESCRIPTOR_SIZE + len;
> > +
> > +        reset_descriptor(m, len);
> > +        if (len)
> > +            memcpy(m + PA_PSTREAM_DESCRIPTOR_SIZE, data, len);
> > +        else
> > +            shm_descriptor(m, flags, i->per_type.block_id);
> > +
> > +        item_free(pa_queue_pop(p->send_queue));
> > +    }
> > +}
> > +
> >   static void prepare_next_write_item(pa_pstream *p) {
> >       pa_assert(p);
> >       pa_assert(PA_REFCNT_VALUE(p) > 0);
> > @@ -554,6 +603,8 @@ static void prepare_next_write_item(pa_pstream *p) {
> >           if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
> >               memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE],
> p->write.data, plen);
> >               p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE +
> plen;
> > +
> > +            peek_next_send_item(p);
> >           }
> >
> >       } else if (p->write.current->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
> > @@ -567,16 +618,22 @@ static void prepare_next_write_item(pa_pstream *p) {
> >           if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
> >               memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE],
> p->write.data, tlen);
> >               p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE +
> tlen;
> > +
> > +            peek_next_send_item(p);
> >           }
> >
> >       } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
> >
> >           shm_descriptor(p->write.descriptor, PA_FLAG_SHMRELEASE,
> p->write.current->per_type.block_id);
> >
> > +        peek_next_send_item(p);
> > +
> >       } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
> >
> >           shm_descriptor(p->write.descriptor, PA_FLAG_SHMREVOKE,
> p->write.current->per_type.block_id);
> >
> > +        peek_next_send_item(p);
> > +
> >       } else {
> >           uint32_t flags;
> >           bool send_payload = true;
> > @@ -721,7 +778,10 @@ static int do_write(pa_pstream *p) {
> >
> >       p->write.index += (size_t) r;
> >
> > -    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE +
> ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
> > +    /* see if entire item or minibuffer has been sent */
> > +    if (p->write.index >=
> > +        (p->write.minibuf_validsize > 0 ? p->write.minibuf_validsize :
> > +        (PA_PSTREAM_DESCRIPTOR_SIZE +
> ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])))) {
> >           pa_assert(p->write.current);
> >           item_free(p->write.current);
> >           p->write.current = NULL;
> >
> 
> 

-- 

Peter Meerwald
+43-664-2444418 (mobile)


More information about the pulseaudio-discuss mailing list