[pulseaudio-discuss] [PATCH RFCv3 20/51] pstream: Peek into next item on send queue to see if it can be put into minibuffer together with current item

Peter Meerwald pmeerw at pmeerw.net
Tue Nov 4 15:26:15 PST 2014


From: Peter Meerwald <p.meerwald at bct-electronic.com>

Patch b4342845 ("pstream: Optimise write of smaller packages") copies the
descriptor and payload of certain small packets into a minibuffer so that
both can be sent with a single write().

This patch extends the idea by peeking at the next item on the send queue:
if that item's descriptor and payload still fit, they are appended to the
write minibuffer and sent together with the current item.

v2 (thanks David Henningsson):
* keep peeking ahead until the write minibuffer is full; the read minibuffer size is raised to 256 bytes

Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
---
 src/pulsecore/pstream.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 63 insertions(+), 3 deletions(-)

diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 7bd7760..e1b84c6 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -77,7 +77,7 @@ typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
 #define PA_PSTREAM_SHM_SIZE (PA_PSTREAM_SHM_MAX*sizeof(uint32_t))
 
 #define WRITE_MINIBUF_SIZE 256
-#define READ_MINIBUF_SIZE 80
+#define READ_MINIBUF_SIZE 256
 
 /* To allow uploading a single sample in one frame, this value should be the
  * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
@@ -153,7 +153,7 @@ struct pa_pstream {
         struct item_info* current;
         void *data;
         size_t index;
-        int minibuf_validsize;
+        unsigned minibuf_validsize;
         pa_memchunk memchunk;
     } write;
 
@@ -528,6 +528,55 @@ static void shm_descriptor(void *d, uint32_t flags, uint32_t block_id) {
     descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(block_id);
 }
 
+/* Peek at next item in send queue and copy descriptor/data into current
+ * write minibuffer if possible */
+static void peek_next_send_item(pa_pstream *p) {
+    unsigned minibuf_usedsize = p->write.minibuf_validsize;
+    const struct item_info *i;
+
+    /* Write minibuffer not used yet, but the descriptor is already in there */
+    if (!minibuf_usedsize)
+        minibuf_usedsize = PA_PSTREAM_DESCRIPTOR_SIZE;
+
+    while ((i = pa_queue_peek(p->send_queue)) != NULL) {
+        const void *data = NULL;
+        uint32_t flags = 0;
+        size_t len = 0;
+
+        /* Items with ancillary data are rare, no need to complicate things here */
+        if (i->with_ancil_data)
+            break;
+
+        if (i->type == PA_PSTREAM_ITEM_PACKET) {
+            data = pa_packet_data(i->per_type.packet, &len);
+        } else if (i->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
+            data = pa_tagstruct_data(i->per_type.tagstruct, &len);
+        } else if (i->type == PA_PSTREAM_ITEM_SHMRELEASE) {
+            flags = PA_FLAG_SHMRELEASE;
+        } else if (i->type == PA_PSTREAM_ITEM_SHMREVOKE) {
+            flags = PA_FLAG_SHMREVOKE;
+        } else
+            break;
+
+        /* Check if next item's descriptor and payload fit into write minibuffer;
+         * combine/copy next item with current item if possible */
+        if (PA_PSTREAM_DESCRIPTOR_SIZE + len <= WRITE_MINIBUF_SIZE - minibuf_usedsize) {
+            uint8_t *m = p->write.minibuf + minibuf_usedsize;
+
+            minibuf_usedsize = p->write.minibuf_validsize = minibuf_usedsize + PA_PSTREAM_DESCRIPTOR_SIZE + len;
+
+            reset_descriptor(m, len);
+            if (len)
+                memcpy(m + PA_PSTREAM_DESCRIPTOR_SIZE, data, len);
+            else
+                shm_descriptor(m, flags, i->per_type.block_id);
+
+            item_free(pa_queue_pop(p->send_queue));
+        } else
+            break;
+    }
+}
+
 static void prepare_next_write_item(pa_pstream *p) {
     pa_assert(p);
     pa_assert(PA_REFCNT_VALUE(p) > 0);
@@ -554,6 +603,8 @@ static void prepare_next_write_item(pa_pstream *p) {
         if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
+
+            peek_next_send_item(p);
         }
 
     } else if (p->write.current->type == PA_PSTREAM_ITEM_TAGSTRUCT) {
@@ -567,16 +618,22 @@ static void prepare_next_write_item(pa_pstream *p) {
         if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen;
+
+            peek_next_send_item(p);
         }
 
     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
 
         shm_descriptor(p->write.descriptor, PA_FLAG_SHMRELEASE, p->write.current->per_type.block_id);
 
+        peek_next_send_item(p);
+
     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
 
         shm_descriptor(p->write.descriptor, PA_FLAG_SHMREVOKE, p->write.current->per_type.block_id);
 
+        peek_next_send_item(p);
+
     } else {
         uint32_t flags;
         bool send_payload = true;
@@ -721,7 +778,10 @@ static int do_write(pa_pstream *p) {
 
     p->write.index += (size_t) r;
 
-    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
+    /* see if entire item or minibuffer has been sent */
+    if (p->write.index >=
+        (p->write.minibuf_validsize > 0 ? p->write.minibuf_validsize :
+        (PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])))) {
         pa_assert(p->write.current);
         item_free(p->write.current);
         p->write.current = NULL;
-- 
1.9.1



More information about the pulseaudio-discuss mailing list