[pulseaudio-discuss] [PATCH RFCv3 16/51] pstream: Use small minibuffer to combine several read()s if possible

Peter Meerwald pmeerw at pmeerw.net
Tue Nov 4 15:26:11 PST 2014


From: Peter Meerwald <p.meerwald at bct-electronic.com>

idea is similar to b4342845d, Optimize write of smaller packages, but for read

use a READ_MINIBUF_SIZE of 40 bytes chosen so that a descriptor (20 bytes)
followed by shminfo (16 bytes) or by another descriptors (20 bytes) can be read() at once

only make use of this optimization when pstream is using SHM (so no
partial audio data has to be copied)

v3:
* use minibuffer size of 80 bytes; the clients often receives a descriptor +
  tagstruct (20 bytes each) and a shmrelease descriptor (20 bytes), so it makes
  sense to process at least 60 bytes in one go

v2: (thanks David Henningson)
* better documentation
* only use for SHM

Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
---
 src/pulsecore/pstream.c | 95 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 78 insertions(+), 17 deletions(-)

diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 3926353..a53a074 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -76,7 +76,8 @@ typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
 #define PA_PSTREAM_SHM_SIZE (PA_PSTREAM_SHM_MAX*sizeof(uint32_t))
 
-#define MINIBUF_SIZE (256)
+#define WRITE_MINIBUF_SIZE 256
+#define READ_MINIBUF_SIZE 80
 
 /* To allow uploading a single sample in one frame, this value should be the
  * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
@@ -120,7 +121,10 @@ struct item_info {
 };
 
 struct pstream_read {
-    pa_pstream_descriptor descriptor;
+    union {
+        uint8_t minibuf[READ_MINIBUF_SIZE];
+        pa_pstream_descriptor descriptor;
+    };
     pa_memblock *memblock;
     pa_packet *packet;
     uint32_t shm_info[PA_PSTREAM_SHM_MAX];
@@ -143,7 +147,7 @@ struct pa_pstream {
 
     struct {
         union {
-            uint8_t minibuf[MINIBUF_SIZE];
+            uint8_t minibuf[WRITE_MINIBUF_SIZE];
             pa_pstream_descriptor descriptor;
         };
         struct item_info* current;
@@ -535,7 +539,7 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write.data = (void *) pa_packet_data(p->write.current->per_type.packet, &plen);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);
 
-        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
+        if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
         }
@@ -548,7 +552,7 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write.data = (void *) pa_tagstruct_data(p->write.current->per_type.tagstruct, &tlen);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) tlen);
 
-        if (tlen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
+        if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen;
         }
@@ -731,6 +735,9 @@ fail:
     return -1;
 }
 
+/* Prepare processing of payload depending on descriptor by setting up
+ * appropriate buffer; returns 0 if if there is no payload, 1 if payload
+ * is expected, -1 on error */
 static int handle_descriptor(pa_pstream *p, struct pstream_read *re) {
     uint32_t flags, length, channel;
 
@@ -938,7 +945,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
     pa_assert(p);
     pa_assert(PA_REFCNT_VALUE(p) > 0);
 
-    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
+    if (p->use_shm && re->index == 0) {
+        /* Special case: expecting a new descriptor but provide extra space;
+         * often we can save a read() */
+        d = (uint8_t*) re->minibuf;
+        l = READ_MINIBUF_SIZE;
+    } else if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
         d = (uint8_t*) re->descriptor + re->index;
         l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
     } else {
@@ -989,18 +1001,67 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
     if (release_memblock)
         pa_memblock_release(release_memblock);
 
-    re->index += (size_t) r;
-
-    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
-        handle_descriptor(p, re);
-    } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
-        /* Frame payload available */
-        if (re->memblock && p->receive_memblock_callback)
-            handle_payload(p, re, r);
+    if (p->use_shm && re->index == 0 && r > (int) PA_PSTREAM_DESCRIPTOR_SIZE) {
+        uint8_t *m = re->minibuf;
+
+        /* Special case: minibuffer contains descriptor plus some extra data */
+        while (r > 0) {
+            int frame_remaining;
+            if (r >= (int) PA_PSTREAM_DESCRIPTOR_SIZE) {
+                /* Put descriptor data into place and handle it */
+                memmove(re->descriptor, m, PA_PSTREAM_DESCRIPTOR_SIZE);
+                frame_remaining = handle_descriptor(p, re);
+                r -= PA_PSTREAM_DESCRIPTOR_SIZE;
+                m += PA_PSTREAM_DESCRIPTOR_SIZE;
+            } else {
+                /* Move remaining partial descriptor data into place */
+                memmove(re->descriptor, m, r);
+                re->index = r;
+                break;
+            }
 
-        /* Frame complete */
-        if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
-            frame_complete(p, re);
+            /* Descriptor indicated that frame has payload */
+            if (frame_remaining > 0) {
+                pa_assert(re->data || re->memblock);
+
+                /* Copy data from minibuffer to buffer prepared by handle_descriptor() */
+                l = PA_MIN(ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), r);
+                if (re->data)
+                    memcpy(re->data, m, l);
+                else {
+                    d = pa_memblock_acquire(re->memblock);
+                    memcpy(d, m, l);
+                    pa_memblock_release(re->memblock);
+                }
+                r -= l;
+                m += l;
+                re->index = PA_PSTREAM_DESCRIPTOR_SIZE + l;
+
+                /* Frame payload available */
+                if (re->memblock && p->receive_memblock_callback)
+                    handle_payload(p, re, l);
+
+                /* Frame complete */
+                if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
+                    frame_complete(p, re);
+            }
+            else
+                frame_done(p, re);
+        }
+    } else {
+        re->index += (size_t) r;
+
+        if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+            handle_descriptor(p, re);
+        } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
+            /* Frame payload available */
+            if (re->memblock && p->receive_memblock_callback)
+                handle_payload(p, re, r);
+
+            /* Frame complete */
+            if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
+                frame_complete(p, re);
+        }
     }
 
     return 0;
-- 
1.9.1



More information about the pulseaudio-discuss mailing list