[pulseaudio-discuss] [PATCH RFCv2 20/27] pstream: Peek into next item on send queue to see if it can be put into minibuffer together with current item

Peter Meerwald pmeerw at pmeerw.net
Tue Oct 28 12:46:33 PDT 2014


From: Peter Meerwald <p.meerwald at bct-electronic.com>

Patch b4342845 "pstream: Optimise write of smaller packages" combines (copies)
the descriptor and payload of certain (small) packets into a minibuffer
in order to send the data with a single write().

This patch extends the idea by looking at the next item on the send queue
to see if it can be combined with the current item and appended to the
minibuffer.

Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
---
 src/pulsecore/pstream.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 2 deletions(-)

diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 1c966de..fbd85f9 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -153,7 +153,7 @@ struct pa_pstream {
         struct item_info* current;
         void *data;
         size_t index;
-        int minibuf_validsize;
+        unsigned minibuf_validsize;
         pa_memchunk memchunk;
     } write;
 
@@ -528,6 +528,55 @@ static void shm_descriptor(void *d, uint32_t flags, uint32_t block_id) {
     descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(block_id);
 }
 
+/* Peek at the next item on the send queue; if it carries no ancillary data and its
+ * descriptor plus payload fit into the write minibuffer, append it there and dequeue it */
+static void peek_next_send_item(pa_pstream *p) {
+    const struct item_info *i;
+    const void *data = NULL;
+    uint32_t flags = 0;
+    size_t len = 0;
+    unsigned minibuf_usedsize = p->write.minibuf_validsize;
+
+    i = pa_queue_peek(p->send_queue);
+    if (!i)
+        return;
+
+    /* Items with ancillary data are rare, no need to complicate things here */
+    if (i->with_ancil)
+        return;
+
+    /* Write minibuffer not used yet, but the descriptor is already in there */
+    if (!minibuf_usedsize)
+        minibuf_usedsize = PA_PSTREAM_DESCRIPTOR_SIZE;
+
+    if (i->type == PA_PSTREAM_ITEM_PACKET) {
+        data = pa_packet_data(i->per_type.packet, &len);
+    } else if (i->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
+        data = pa_tagstruct_data(i->per_type.tagstruct, &len);
+    } else if (i->type == PA_PSTREAM_ITEM_SHMRELEASE) {
+        flags = PA_FLAG_SHMRELEASE;
+    } else if (i->type == PA_PSTREAM_ITEM_SHMREVOKE) {
+        flags = PA_FLAG_SHMREVOKE;
+    } else
+        return; /* any other item type is left on the queue untouched */
+
+    /* Check if next item's descriptor and payload fit into write minibuffer;
+     * combine/copy next item with current item if possible */
+    if (PA_PSTREAM_DESCRIPTOR_SIZE + len <= WRITE_MINIBUF_SIZE - minibuf_usedsize) {
+        uint8_t *m = p->write.minibuf + minibuf_usedsize;
+
+        p->write.minibuf_validsize = minibuf_usedsize + PA_PSTREAM_DESCRIPTOR_SIZE + len;
+
+        reset_descriptor(m, len);
+        if (len) /* len is left at 0 for SHMRELEASE/SHMREVOKE items, which have no payload */
+            memcpy(m + PA_PSTREAM_DESCRIPTOR_SIZE, data, len);
+        else
+            shm_descriptor(m, flags, i->per_type.block_id);
+
+        item_free(pa_queue_pop(p->send_queue)); /* item now lives in the minibuffer; drop it from the queue */
+    }
+}
+
 static void prepare_next_write_item(pa_pstream *p) {
     pa_assert(p);
     pa_assert(PA_REFCNT_VALUE(p) > 0);
@@ -554,6 +603,8 @@ static void prepare_next_write_item(pa_pstream *p) {
         if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
+
+            peek_next_send_item(p);
         }
 
     } else if (p->write.current->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
@@ -567,16 +618,22 @@ static void prepare_next_write_item(pa_pstream *p) {
         if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen;
+
+            peek_next_send_item(p);
         }
 
     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
 
         shm_descriptor(p->write.descriptor, PA_FLAG_SHMRELEASE, p->write.current->per_type.block_id);
 
+        peek_next_send_item(p);
+
     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
 
         shm_descriptor(p->write.descriptor, PA_FLAG_SHMREVOKE, p->write.current->per_type.block_id);
 
+        peek_next_send_item(p);
+
     } else {
         uint32_t flags;
         bool send_payload = true;
@@ -721,7 +778,10 @@ static int do_write(pa_pstream *p) {
 
     p->write.index += (size_t) r;
 
-    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
+    /* see if entire item or minibuffer has been sent */
+    if (p->write.index >=
+        (p->write.minibuf_validsize > 0 ? p->write.minibuf_validsize :
+        (PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])))) {
         pa_assert(p->write.current);
         item_free(p->write.current);
         p->write.current = NULL;
-- 
1.9.1



More information about the pulseaudio-discuss mailing list