[gst-cvs] gst-plugins-good: pulsesink: optimize communication with PulseAudio using pa_stream_begin_write

Sebastian Dröge slomo at kemper.freedesktop.org
Wed Jun 2 05:07:02 PDT 2010


Module: gst-plugins-good
Branch: master
Commit: 66a76d1f65141ec1180809d2fc258b1e4ecb9768
URL:    http://cgit.freedesktop.org/gstreamer/gst-plugins-good/commit/?id=66a76d1f65141ec1180809d2fc258b1e4ecb9768

Author: Pierre-Louis Bossart <pierre-louis.bossart at intel.com>
Date:   Tue Jun  1 18:54:41 2010 -0500

pulsesink: optimize communication with PulseAudio using pa_stream_begin_write

---

 configure.ac          |    4 +
 ext/pulse/pulsesink.c |  167 ++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 170 insertions(+), 1 deletions(-)

diff --git a/configure.ac b/configure.ac
index d69031a..7782f16 100644
--- a/configure.ac
+++ b/configure.ac
@@ -803,6 +803,10 @@ AG_GST_CHECK_FEATURE(PULSE, [pulseaudio plug-in], pulseaudio, [
   if test x$HAVE_PULSE_0_9_15 = xyes; then
     AC_DEFINE(HAVE_PULSE_0_9_15, 1, [defined if pulseaudio >= 0.9.15 is available])
   fi
+  AG_GST_PKG_CHECK_MODULES(PULSE_0_9_16, libpulse >= 0.9.16)
+  if test x$HAVE_PULSE_0_9_16 = xyes; then
+    AC_DEFINE(HAVE_PULSE_0_9_16, 1, [defined if pulseaudio >= 0.9.16 is available])
+  fi
   AG_GST_PKG_CHECK_MODULES(PULSE_0_9_20, libpulse >= 0.9.20)
   if test x$HAVE_PULSE_0_9_20 = xyes; then
     AC_DEFINE(HAVE_PULSE_0_9_20, 1, [defined if pulseaudio >= 0.9.20 is available])
diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c
index 08a1d7e..b723c1c 100644
--- a/ext/pulse/pulsesink.c
+++ b/ext/pulse/pulsesink.c
@@ -112,11 +112,18 @@ struct _GstPulseRingBuffer
 
   pa_sample_spec sample_spec;
 
+#ifdef HAVE_PULSE_0_9_16
+  void *m_data;
+  size_t m_towrite;
+  size_t m_writable;
+  gint64 m_offset;
+  gint64 m_lastoffset;
+#endif
+
   gboolean corked:1;
   gboolean in_commit:1;
   gboolean paused:1;
 };
-
 struct _GstPulseRingBufferClass
 {
   GstRingBufferClass parent_class;
@@ -220,6 +227,14 @@ gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf,
   pbuf->sample_spec.channels = 0;
 #endif
 
+#ifdef HAVE_PULSE_0_9_16
+  pbuf->m_data = NULL;
+  pbuf->m_towrite = 0;
+  pbuf->m_writable = 0;
+  pbuf->m_offset = 0;
+  pbuf->m_lastoffset = 0;
+#endif
+
   pbuf->corked = TRUE;
   pbuf->in_commit = FALSE;
   pbuf->paused = FALSE;
@@ -229,6 +244,21 @@ static void
 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
 {
   if (pbuf->stream) {
+
+#ifdef HAVE_PULSE_0_9_16
+    if (pbuf->m_data) {
+      /* drop shm memory buffer */
+      pa_stream_cancel_write (pbuf->stream);
+
+      /* reset internal variables */
+      pbuf->m_data = NULL;
+      pbuf->m_towrite = 0;
+      pbuf->m_writable = 0;
+      pbuf->m_offset = 0;
+      pbuf->m_lastoffset = 0;
+    }
+#endif
+
     pa_stream_disconnect (pbuf->stream);
 
     /* Make sure we don't get any further callbacks */
@@ -1236,6 +1266,135 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
         offset);
 
+#ifdef HAVE_PULSE_0_9_16
+    if (offset != pbuf->m_lastoffset)
+      GST_LOG_OBJECT (psink,
+          "discontinuity, offset is %, last offset was %" G_GINT64_FORMAT,
+          offset, pbuf->m_lastoffset);
+
+    towrite = out_samples * bps;
+    if ((pbuf->m_writable < towrite) || (offset != pbuf->m_lastoffset)) {
+
+      /* if no room left or discontinuity in offset,
+         we need to flush data and get a new buffer */
+
+      /* flush the buffer if possible */
+      if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
+
+        GST_LOG_OBJECT (psink,
+            "flushing %d samples at offset %" G_GINT64_FORMAT,
+            pbuf->m_towrite / bps, pbuf->m_offset);
+
+        if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
+                pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
+          goto write_failed;
+        }
+      }
+      pbuf->m_towrite = 0;
+      pbuf->m_offset = offset;  /* keep track of current offset */
+
+      /* get a buffer to write in for now on */
+      for (;;) {
+        pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
+
+        if (pbuf->m_writable == (size_t) - 1)
+          goto writable_size_failed;
+
+        pbuf->m_writable /= bps;
+        pbuf->m_writable *= bps;        /* handle only complete samples */
+
+        if (pbuf->m_writable >= towrite)
+          break;
+
+        /* see if we need to uncork because we have no free space */
+        if (pbuf->corked) {
+          if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
+            goto uncork_failed;
+        }
+
+        /* we can't write a single byte, wait a bit */
+        GST_LOG_OBJECT (psink, "waiting for free space");
+        pa_threaded_mainloop_wait (psink->mainloop);
+
+        if (pbuf->paused)
+          goto was_paused;
+      }
+
+      if (pa_stream_begin_write (pbuf->stream, &pbuf->m_data,
+              &pbuf->m_writable) < 0) {
+        GST_LOG_OBJECT (psink, "pa_stream_begin_write() failed");
+        goto writable_size_failed;
+      }
+
+      /* make sure we only buffer up latency-time samples */
+      if (pbuf->m_writable > buf->spec.segsize) {
+        if (buf->spec.segsize < towrite) {
+          /* leave room for one frame */
+          pbuf->m_writable = towrite;
+        } else {
+          /* limit buffering to latency-time value
+           * note the amount of data passed to PA isn't going to be exactly
+           * latency-time, if there isn't enough room for towrite we flush and
+           * ask for a new buffer. Worst case the buffer passed will be
+           * segsize-towrite+1 bytes */
+
+          pbuf->m_writable = buf->spec.segsize;
+        }
+
+        GST_LOG_OBJECT (psink, "Limiting buffering to %" G_GSIZE_FORMAT,
+            pbuf->m_writable);
+      }
+    }
+
+    avail = out_samples;
+    GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
+        (guint) avail, offset);
+
+    if (G_LIKELY (inr == outr && !reverse)) {
+
+      /* no rate conversion, simply write out the samples */
+      /* copy the data into internal buffer */
+      assert (pbuf->m_writable >= towrite);
+
+      memcpy ((guint8 *) pbuf->m_data + pbuf->m_towrite, data, towrite);
+      pbuf->m_towrite += towrite;
+      pbuf->m_writable -= towrite;
+
+      data += towrite;
+      in_samples -= avail;
+      out_samples -= avail;
+    } else {
+      guint8 *dest, *d, *d_end;
+
+      /* write into the PulseAudio shm buffer */
+      dest = d = (guint8 *) pbuf->m_data + pbuf->m_towrite;
+      d_end = d + towrite;
+
+      if (!reverse) {
+        if (inr >= outr)
+          /* forward speed up */
+          FWD_UP_SAMPLES (data, data_end, d, d_end);
+        else
+          /* forward slow down */
+          FWD_DOWN_SAMPLES (data, data_end, d, d_end);
+      } else {
+        if (inr >= outr)
+          /* reverse speed up */
+          REV_UP_SAMPLES (data, data_end, d, d_end);
+        else
+          /* reverse slow down */
+          REV_DOWN_SAMPLES (data, data_end, d, d_end);
+      }
+      /* see what we have left to write */
+      towrite = (d - dest);
+      pbuf->m_towrite += towrite;
+      pbuf->m_writable -= towrite;
+
+      avail = towrite / bps;
+    }
+
+#else
+
     for (;;) {
       /* FIXME, this is not quite right */
       if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
@@ -1314,9 +1473,15 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
 
       avail = towrite / bps;
     }
+#endif /* HAVE_PULSE_0_9_16 */
+
     *sample += avail;
     offset += avail * bps;
 
+#ifdef HAVE_PULSE_0_9_16
+    pbuf->m_lastoffset = offset;
+#endif
+
     /* check if we need to uncork after writing the samples */
     if (pbuf->corked) {
       const pa_timing_info *info;





More information about the Gstreamer-commits mailing list