[pulseaudio-discuss] [PATCH] pulse: Fix hole handling in pa_stream_peek().

Tanu Kaskinen tanuk at iki.fi
Wed Nov 7 00:36:40 PST 2012


Previously, if there was a hole in a recording stream,
pa_stream_peek() would crash. Holes could be handled silently inside
pa_stream_peek() by generating silence (wouldn't work for compressed
streams, though) or by skipping any holes. However, I think it's
better to let the caller decide how the holes should be handled, so
in case of holes, pa_stream_peek() will return a NULL data pointer and
the length of the hole in the nbytes argument.

This change is technically an interface break, because previously the
documentation didn't mention the possibility of holes that need
special handling. However, since holes caused crashing anyway in the
past, it's not a regression if applications keep misbehaving due to
not handling holes properly.

When adapting pacat to this change, a dependency on sample-util was
added, and that required moving some code from libpulsecore to
libpulsecommon.

Some words about when holes can appear in recording streams: I think
it would be reasonable behavior if overruns due to the application
reading data too slowly would cause holes. Currently that's not the
case - overruns will just cause audio to be skipped. But the point is
that this might change some day. I'm not sure how holes can occur
with the current code, but as the linked bug shows, they can happen.
It's most likely due to recording from a monitor source where the
thing being monitored has holes in its playback stream.

BugLink: http://bugs.launchpad.net/bugs/1058200
---
 src/Makefile.am    |    8 ++++----
 src/pulse/simple.c |    7 ++++++-
 src/pulse/stream.c |   20 +++++++++++++++----
 src/pulse/stream.h |   20 ++++++++++++++-----
 src/utils/pacat.c  |   55 +++++++++++++++++++++++++++++++++++++++-------------
 src/utils/padsp.c  |   15 +++++++++++++-
 6 files changed, 97 insertions(+), 28 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index 155f908..f8a0230 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -586,6 +586,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/dynarray.c pulsecore/dynarray.h \
 		pulsecore/endianmacros.h \
 		pulsecore/flist.c pulsecore/flist.h \
+		pulsecore/g711.c pulsecore/g711.h \
 		pulsecore/hashmap.c pulsecore/hashmap.h \
 		pulsecore/i18n.c pulsecore/i18n.h \
 		pulsecore/idxset.c pulsecore/idxset.h \
@@ -617,6 +618,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/queue.c pulsecore/queue.h \
 		pulsecore/random.c pulsecore/random.h \
 		pulsecore/refcnt.h \
+		pulsecore/sample-util.c pulsecore/sample-util.h \
 		pulsecore/shm.c pulsecore/shm.h \
 		pulsecore/bitset.c pulsecore/bitset.h \
 		pulsecore/socket-client.c pulsecore/socket-client.h \
@@ -624,6 +626,8 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/socket-util.c pulsecore/socket-util.h \
 		pulsecore/strbuf.c pulsecore/strbuf.h \
 		pulsecore/strlist.c pulsecore/strlist.h \
+		pulsecore/svolume_c.c pulsecore/svolume_arm.c \
+		pulsecore/svolume_mmx.c pulsecore/svolume_sse.c \
 		pulsecore/tagstruct.c pulsecore/tagstruct.h \
 		pulsecore/time-smoother.c pulsecore/time-smoother.h \
 		pulsecore/tokenizer.c pulsecore/tokenizer.h \
@@ -838,7 +842,6 @@ libpulsecore_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/core-subscribe.c pulsecore/core-subscribe.h \
 		pulsecore/core.c pulsecore/core.h \
 		pulsecore/fdsem.c pulsecore/fdsem.h \
-		pulsecore/g711.c pulsecore/g711.h \
 		pulsecore/hook-list.c pulsecore/hook-list.h \
 		pulsecore/ltdl-helper.c pulsecore/ltdl-helper.h \
 		pulsecore/modargs.c pulsecore/modargs.h \
@@ -853,13 +856,10 @@ libpulsecore_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/remap_mmx.c pulsecore/remap_sse.c \
 		pulsecore/resampler.c pulsecore/resampler.h \
 		pulsecore/rtpoll.c pulsecore/rtpoll.h \
-		pulsecore/sample-util.c pulsecore/sample-util.h \
 		pulsecore/cpu.h \
 		pulsecore/cpu-arm.c pulsecore/cpu-arm.h \
 		pulsecore/cpu-x86.c pulsecore/cpu-x86.h \
 		pulsecore/cpu-orc.c pulsecore/cpu-orc.h \
-		pulsecore/svolume_c.c pulsecore/svolume_arm.c \
-		pulsecore/svolume_mmx.c pulsecore/svolume_sse.c \
 		pulsecore/sconv-s16be.c pulsecore/sconv-s16be.h \
 		pulsecore/sconv-s16le.c pulsecore/sconv-s16le.h \
 		pulsecore/sconv_sse.c \
diff --git a/src/pulse/simple.c b/src/pulse/simple.c
index 3524296..860cd18 100644
--- a/src/pulse/simple.c
+++ b/src/pulse/simple.c
@@ -331,9 +331,14 @@ int pa_simple_read(pa_simple *p, void*data, size_t length, int *rerror) {
             r = pa_stream_peek(p->stream, &p->read_data, &p->read_length);
             CHECK_SUCCESS_GOTO(p, rerror, r == 0, unlock_and_fail);
 
-            if (!p->read_data) {
+            if (p->read_length <= 0) {
                 pa_threaded_mainloop_wait(p->mainloop);
                 CHECK_DEAD_GOTO(p, rerror, unlock_and_fail);
+            } else if (!p->read_data) {
+                /* There's a hole in the stream, skip it. We could generate
+                 * silence, but that wouldn't work for compressed streams. */
+                r = pa_stream_drop(p->stream);
+                CHECK_SUCCESS_GOTO(p, rerror, r == 0, unlock_and_fail);
             } else
                 p->read_index = 0;
         }
diff --git a/src/pulse/stream.c b/src/pulse/stream.c
index 2b6d306..f692d37 100644
--- a/src/pulse/stream.c
+++ b/src/pulse/stream.c
@@ -1593,9 +1593,17 @@ int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
     if (!s->peek_memchunk.memblock) {
 
         if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
+            /* record_memblockq is empty. */
             *data = NULL;
             *length = 0;
             return 0;
+
+        } else if (!s->peek_memchunk.memblock) {
+            /* record_memblockq isn't empty, but it doesn't have any data at
+             * the current read index. */
+            *data = NULL;
+            *length = s->peek_memchunk.length;
+            return 0;
         }
 
         s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
@@ -1614,7 +1622,7 @@ int pa_stream_drop(pa_stream *s) {
     PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
     PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
     PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
-    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
+    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
 
     pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
 
@@ -1622,9 +1630,13 @@ int pa_stream_drop(pa_stream *s) {
     if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
         s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
 
-    pa_assert(s->peek_data);
-    pa_memblock_release(s->peek_memchunk.memblock);
-    pa_memblock_unref(s->peek_memchunk.memblock);
+    if (s->peek_memchunk.memblock) {
+        pa_assert(s->peek_data);
+        s->peek_data = NULL;
+        pa_memblock_release(s->peek_memchunk.memblock);
+        pa_memblock_unref(s->peek_memchunk.memblock);
+    }
+
     pa_memchunk_reset(&s->peek_memchunk);
 
     return 0;
diff --git a/src/pulse/stream.h b/src/pulse/stream.h
index b4464fa..a6785ec 100644
--- a/src/pulse/stream.h
+++ b/src/pulse/stream.h
@@ -534,11 +534,21 @@ int pa_stream_write(
         pa_seek_mode_t seek      /**< Seek mode, must be PA_SEEK_RELATIVE for upload streams */);
 
 /** Read the next fragment from the buffer (for recording streams).
- * \a data will point to the actual data and \a nbytes will contain the size
- * of the data in bytes (which can be less or more than a complete
- * fragment). Use pa_stream_drop() to actually remove the data from
- * the buffer. If no data is available this will return a NULL
- * pointer. */
+ * If there is data at the current read index, \a data will point to
+ * the actual data and \a nbytes will contain the size of the data in
+ * bytes (which can be less or more than a complete fragment).
+ *
+ * If there is no data at the current read index, it means that either
+ * the buffer is empty or it contains a hole (that is, the write index
+ * is ahead of the read index but there's no data where the read index
+ * points at). If the buffer is empty, \a data will be NULL and
+ * \a nbytes will be 0. If there is a hole, \a data will be NULL and
+ * \a nbytes will contain the length of the hole.
+ *
+ * Use pa_stream_drop() to actually remove the data from the buffer
+ * and move the read index forward. pa_stream_drop() should not be
+ * called if the buffer is empty, but it should be called if there is
+ * a hole. */
 int pa_stream_peek(
         pa_stream *p                 /**< The stream to use */,
         const void **data            /**< Pointer to pointer that will point to data */,
diff --git a/src/utils/pacat.c b/src/utils/pacat.c
index 2cd8aa5..293006b 100644
--- a/src/utils/pacat.c
+++ b/src/utils/pacat.c
@@ -45,6 +45,7 @@
 #include <pulsecore/log.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/sndfile-util.h>
+#include <pulsecore/sample-util.h>
 
 #define TIME_EVENT_USEC 50000
 
@@ -257,24 +258,36 @@ static void stream_read_callback(pa_stream *s, size_t length, void *userdata) {
                 return;
             }
 
-            pa_assert(data);
             pa_assert(length > 0);
 
-            if (buffer) {
-                buffer = pa_xrealloc(buffer, buffer_length + length);
-                memcpy((uint8_t*) buffer + buffer_length, data, length);
-                buffer_length += length;
-            } else {
-                buffer = pa_xmalloc(length);
-                memcpy(buffer, data, length);
-                buffer_length = length;
-                buffer_index = 0;
+            /* If there is a hole in the stream, we generate silence, except
+             * if it's a passthrough stream in which case we skip the hole. */
+            if (data || !(flags & PA_STREAM_PASSTHROUGH)) {
+                if (buffer) {
+                    buffer = pa_xrealloc(buffer, buffer_length + length);
+                    if (data)
+                        memcpy((uint8_t *) buffer + buffer_length, data, length);
+                    else
+                        pa_silence_memory((uint8_t *) buffer + buffer_length, length, &sample_spec);
+                    buffer_length += length;
+                } else {
+                    buffer = pa_xmalloc(length);
+                    if (data)
+                        memcpy(buffer, data, length);
+                    else
+                        pa_silence_memory(buffer, length, &sample_spec);
+                    buffer_length = length;
+                    buffer_index = 0;
+                }
             }
 
             pa_stream_drop(s);
         }
 
     } else {
+        void *silence_buffer = NULL;
+        size_t silence_buffer_length = 0;
+
         pa_assert(sndfile);
 
         while (pa_stream_readable_size(s) > 0) {
@@ -287,23 +300,39 @@ static void stream_read_callback(pa_stream *s, size_t length, void *userdata) {
                 return;
             }
 
-            pa_assert(data);
             pa_assert(length > 0);
 
+            if (!data && (flags & PA_STREAM_PASSTHROUGH)) {
+                pa_stream_drop(s);
+                continue;
+            }
+
+            if (!data && length > silence_buffer_length) {
+                if (silence_buffer)
+                    silence_buffer = pa_xrealloc(silence_buffer, length);
+                else
+                    silence_buffer = pa_xmalloc(length);
+
+                pa_silence_memory(silence_buffer, length, &sample_spec);
+                silence_buffer_length = length;
+            }
+
             if (writef_function) {
                 size_t k = pa_frame_size(&sample_spec);
 
-                if ((bytes = writef_function(sndfile, data, (sf_count_t) (length/k))) > 0)
+                if ((bytes = writef_function(sndfile, data ? data : silence_buffer, (sf_count_t) (length/k))) > 0)
                     bytes *= (sf_count_t) k;
 
             } else
-                bytes = sf_write_raw(sndfile, data, (sf_count_t) length);
+                bytes = sf_write_raw(sndfile, data ? data : silence_buffer, (sf_count_t) length);
 
             if (bytes < (sf_count_t) length)
                 quit(1);
 
             pa_stream_drop(s);
         }
+
+        pa_xfree(silence_buffer);
     }
 }
 
diff --git a/src/utils/padsp.c b/src/utils/padsp.c
index f6a3520..858cec8 100644
--- a/src/utils/padsp.c
+++ b/src/utils/padsp.c
@@ -922,9 +922,22 @@ static int fd_info_copy_data(fd_info *i, int force) {
                 return -1;
             }
 
-            if (!data)
+            if (len <= 0)
                 break;
 
+            if (!data) {
+                /* Maybe we should generate silence here, but I'm lazy and
+                 * I'll just skip any holes in the stream. */
+                if (pa_stream_drop(i->rec_stream) < 0) {
+                    debug(DEBUG_LEVEL_NORMAL, __FILE__": pa_stream_drop(): %s\n", pa_strerror(pa_context_errno(i->context)));
+                    return -1;
+                }
+
+                assert(n >= len);
+                n -= len;
+                continue;
+            }
+
             buf = (const char*)data + i->rec_offset;
 
             if ((r = write(i->thread_fd, buf, len - i->rec_offset)) <= 0) {
-- 
1.7.10.4



More information about the pulseaudio-discuss mailing list