[pulseaudio-discuss] [PATCH RFC 16/17] pstream: Use small minibuffer to combine several read()s if possible

Peter Meerwald pmeerw at pmeerw.net
Fri Oct 24 14:21:40 PDT 2014


From: Peter Meerwald <p.meerwald at bct-electronic.com>

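The idea is similar to commit b4342845d ("Optimize write of smaller packages"), but applied to the read path: read into a small minibuffer so that a descriptor and a short payload can often be fetched with a single read().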

Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
---
 src/pulsecore/pstream.c | 90 +++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 73 insertions(+), 17 deletions(-)

diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 8846d14..91868a7 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -76,7 +76,8 @@ typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
 #define PA_PSTREAM_SHM_SIZE (PA_PSTREAM_SHM_MAX*sizeof(uint32_t))
 
-#define MINIBUF_SIZE (256)
+#define WRITE_MINIBUF_SIZE 256
+#define READ_MINIBUF_SIZE 40
 
 /* To allow uploading a single sample in one frame, this value should be the
  * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
@@ -120,7 +121,10 @@ struct item_info {
 };
 
 struct pstream_read {
-    pa_pstream_descriptor descriptor;
+    union {
+        uint8_t minibuf[READ_MINIBUF_SIZE];
+        pa_pstream_descriptor descriptor;
+    };
     pa_memblock *memblock;
     pa_packet *packet;
     uint32_t shm_info[PA_PSTREAM_SHM_MAX];
@@ -143,7 +147,7 @@ struct pa_pstream {
 
     struct {
         union {
-            uint8_t minibuf[MINIBUF_SIZE];
+            uint8_t minibuf[WRITE_MINIBUF_SIZE];
             pa_pstream_descriptor descriptor;
         };
         struct item_info* current;
@@ -535,7 +539,7 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write.data = (void *) pa_packet_data(p->write.current->per_type.packet, &plen);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);
 
-        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
+        if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
         }
@@ -548,7 +552,7 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write.data = (void *) pa_tagstruct_data(p->write.current->per_type.tagstruct, &tlen);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) tlen);
 
-        if (tlen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
+        if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen);
             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen;
         }
@@ -938,7 +942,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
     pa_assert(p);
     pa_assert(PA_REFCNT_VALUE(p) > 0);
 
-    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
+    if (re->index == 0) {
+        /* special case: expecting a new descriptor but provide extra space;
+         * often we can save a read() */
+        d = (uint8_t*) re->minibuf;
+        l = READ_MINIBUF_SIZE;
+    } else if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
         d = (uint8_t*) re->descriptor + re->index;
         l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
     } else {
@@ -989,18 +998,65 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
     if (release_memblock)
         pa_memblock_release(release_memblock);
 
-    re->index += (size_t) r;
-
-    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
-        handle_descriptor(p, re);
-    } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
-        /* Frame payload available */
-        if (re->memblock && p->receive_memblock_callback)
-            handle_payload(p, re, r);
+    if (re->index == 0 && r > (int) PA_PSTREAM_DESCRIPTOR_SIZE) {
+        uint8_t *m = re->minibuf;
+
+        while (r > 0) {
+            int frame_remaining;
+            if (r >= (int) PA_PSTREAM_DESCRIPTOR_SIZE) {
+                /* Handle descriptor */
+                memmove(re->descriptor, m, PA_PSTREAM_DESCRIPTOR_SIZE);
+                frame_remaining = handle_descriptor(p, re);
+                r -= PA_PSTREAM_DESCRIPTOR_SIZE;
+                m += PA_PSTREAM_DESCRIPTOR_SIZE;
+            } else {
+                /* Move remaining descriptor part */
+                memmove(re->descriptor, m, r);
+                re->index = r;
+                break;
+            }
 
-        /* Frame complete */
-        if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
-            frame_complete(p, re);
+            if (frame_remaining > 0) {
+                /* Copy data from minibuf */
+                pa_assert(re->data || re->memblock);
+
+                l = PA_MIN(ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), r);
+                if (re->data)
+                    memcpy(re->data, m, l);
+                else {
+                    d = pa_memblock_acquire(re->memblock);
+                    memcpy(d, m, l);
+                    pa_memblock_release(re->memblock);
+                }
+                r -= l;
+                m += l;
+                re->index = PA_PSTREAM_DESCRIPTOR_SIZE + l;
+
+                /* Frame payload available */
+                if (re->memblock && p->receive_memblock_callback)
+                    handle_payload(p, re, l);
+
+                /* Frame complete */
+                if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
+                    frame_complete(p, re);
+            }
+            else
+                frame_done(p, re);
+        }
+    } else {
+        re->index += (size_t) r;
+
+        if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+            handle_descriptor(p, re);
+        } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
+            /* Frame payload available */
+            if (re->memblock && p->receive_memblock_callback)
+                handle_payload(p, re, r);
+
+            /* Frame complete */
+            if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE)
+                frame_complete(p, re);
+        }
     }
 
     return 0;
-- 
1.9.1



More information about the pulseaudio-discuss mailing list