[0.11] gstreamer: bufferpool: Refactor stopping of the pool

Wim Taymans wtay at kemper.freedesktop.org
Fri Mar 4 07:18:51 PST 2011


Module: gstreamer
Branch: 0.11
Commit: 0986f3a64ad50ad5c4e2e791f13d22a1d26b655a
URL:    http://cgit.freedesktop.org/gstreamer/gstreamer/commit/?id=0986f3a64ad50ad5c4e2e791f13d22a1d26b655a

Author: Wim Taymans <wim.taymans at collabora.co.uk>
Date:   Mon Feb 21 18:43:19 2011 +0100

bufferpool: Refactor stopping of the pool

Move some methods around.
Make sure we check for config parsing errors.
Increment the outstanding buffers before calling acquire so that we can be sure
that set_active() doesn't free the pool from under us.

---

 gst/gstbufferpool.c |  165 +++++++++++++++++++++++++++++----------------------
 1 files changed, 93 insertions(+), 72 deletions(-)

diff --git a/gst/gstbufferpool.c b/gst/gstbufferpool.c
index 7ab221a..8ac4cb6 100644
--- a/gst/gstbufferpool.c
+++ b/gst/gstbufferpool.c
@@ -49,9 +49,9 @@
 struct _GstBufferPoolPrivate
 {
   GStaticRecMutex rec_lock;
+  guint size;
   guint min_buffers;
   guint max_buffers;
-  guint size;
   guint prefix;
   guint postfix;
   guint align;
@@ -149,6 +149,30 @@ gst_buffer_pool_new (void)
   return result;
 }
 
+static GstFlowReturn
+default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
+    GstBufferPoolParams * params)
+{
+  guint size, align;
+  GstBufferPoolPrivate *priv = pool->priv;
+
+  *buffer = gst_buffer_new ();
+
+  align = priv->align - 1;
+  size = priv->prefix + priv->postfix + priv->size + align;
+  if (size > 0) {
+    guint8 *memptr;
+
+    memptr = g_malloc (size);
+    GST_BUFFER_MALLOCDATA (*buffer) = memptr;
+    memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
+    GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
+    GST_BUFFER_SIZE (*buffer) = priv->size;
+  }
+
+  return GST_FLOW_OK;
+}
+
 /* the default implementation for preallocating the buffers
  * in the pool */
 static gboolean
@@ -190,12 +214,34 @@ alloc_failed:
   }
 }
 
+/* must be called with the lock */
+static gboolean
+do_start (GstBufferPool * pool)
+{
+  if (!pool->started) {
+    GstBufferPoolClass *pclass;
+
+    pclass = GST_BUFFER_POOL_GET_CLASS (pool);
+
+    /* start the pool, subclasses should allocate buffers and put them
+     * in the queue */
+    if (G_LIKELY (pclass->start)) {
+      if (!pclass->start (pool))
+        return FALSE;
+    }
+    pool->started = TRUE;
+  }
+  return TRUE;
+}
+
+
 static void
 default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
 {
   gst_buffer_unref (buffer);
 }
 
+/* must be called with the lock */
 static gboolean
 default_stop (GstBufferPool * pool)
 {
@@ -214,26 +260,7 @@ default_stop (GstBufferPool * pool)
   return TRUE;
 }
 
-static gboolean
-do_start (GstBufferPool * pool)
-{
-
-  if (!pool->started) {
-    GstBufferPoolClass *pclass;
-
-    pclass = GST_BUFFER_POOL_GET_CLASS (pool);
-
-    /* start the pool, subclasses should allocate buffers and put them
-     * in the queue */
-    if (G_LIKELY (pclass->start)) {
-      if (!pclass->start (pool))
-        return FALSE;
-    }
-    pool->started = TRUE;
-  }
-  return TRUE;
-}
-
+/* must be called with the lock */
 static gboolean
 do_stop (GstBufferPool * pool)
 {
@@ -335,12 +362,28 @@ static gboolean
 default_set_config (GstBufferPool * pool, GstStructure * config)
 {
   GstBufferPoolPrivate *priv = pool->priv;
+  guint size, min_buffers, max_buffers;
+  guint prefix, postfix, align;
 
   /* parse the config and keep around */
-  gst_buffer_pool_config_get (config, &priv->size, &priv->min_buffers,
-      &priv->max_buffers, &priv->prefix, &priv->postfix, &priv->align);
+  if (!gst_buffer_pool_config_get (config, &size, &min_buffers,
+          &max_buffers, &prefix, &postfix, &align))
+    goto wrong_config;
+
+  priv->size = size;
+  priv->min_buffers = min_buffers;
+  priv->max_buffers = max_buffers;
+  priv->prefix = prefix;
+  priv->postfix = postfix;
+  priv->align = align;
 
   return TRUE;
+
+wrong_config:
+  {
+    GST_WARNING_OBJECT (pool, "invalid config");
+    return FALSE;
+  }
 }
 
 /**
@@ -493,30 +536,6 @@ gst_buffer_pool_config_get (GstStructure * config, guint * size,
 }
 
 static GstFlowReturn
-default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
-    GstBufferPoolParams * params)
-{
-  guint size, align;
-  GstBufferPoolPrivate *priv = pool->priv;
-
-  *buffer = gst_buffer_new ();
-
-  align = priv->align - 1;
-  size = priv->prefix + priv->postfix + priv->size + align;
-  if (size > 0) {
-    guint8 *memptr;
-
-    memptr = g_malloc (size);
-    GST_BUFFER_MALLOCDATA (*buffer) = memptr;
-    memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
-    GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
-    GST_BUFFER_SIZE (*buffer) = priv->size;
-  }
-
-  return GST_FLOW_OK;
-}
-
-static GstFlowReturn
 default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
     GstBufferPoolParams * params)
 {
@@ -527,12 +546,12 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
   while (TRUE) {
-    if (g_atomic_int_get (&pool->flushing))
+    if (G_UNLIKELY (g_atomic_int_get (&pool->flushing)))
       return GST_FLOW_WRONG_STATE;
 
     /* try to get a buffer from the queue */
     *buffer = gst_atomic_queue_pop (pool->queue);
-    if (*buffer) {
+    if (G_LIKELY (*buffer)) {
       gst_poll_read_control (pool->poll);
       result = GST_FLOW_OK;
       break;
@@ -561,6 +580,24 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   return result;
 }
 
+static inline void
+dec_outstanding (GstBufferPool * pool)
+{
+  if (g_atomic_int_dec_and_test (&pool->outstanding)) {
+    /* all buffers are returned to the pool, see if we need to free them */
+    if (g_atomic_int_get (&pool->flushing)) {
+      /* take the lock so that set_active is not run concurrently */
+      GST_BUFFER_POOL_LOCK (pool);
+      /* recheck the flushing state in the lock, the pool could have been
+       * set to active again */
+      if (g_atomic_int_get (&pool->flushing))
+        do_stop (pool);
+
+      GST_BUFFER_POOL_UNLOCK (pool);
+    }
+  }
+}
+
 /**
  * gst_buffer_pool_acquire_buffer:
  * @pool: a #GstBufferPool
@@ -587,13 +624,17 @@ gst_buffer_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
 
   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
+  /* assume we'll have one more outstanding buffer we need to do that so
+   * that concurrent set_active doesn't clear the buffers */
+  g_atomic_int_inc (&pool->outstanding);
+
   if (G_LIKELY (pclass->acquire_buffer))
     result = pclass->acquire_buffer (pool, buffer, params);
   else
     result = GST_FLOW_NOT_SUPPORTED;
 
-  if (G_LIKELY (result == GST_FLOW_OK && *buffer))
-    g_atomic_int_inc (&pool->outstanding);
+  if (G_UNLIKELY (result != GST_FLOW_OK))
+    dec_outstanding (pool);
 
   return result;
 }
@@ -630,25 +671,5 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
   if (G_LIKELY (pclass->release_buffer))
     pclass->release_buffer (pool, buffer);
 
-  if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
-    if (g_atomic_int_dec_and_test (&pool->outstanding)) {
-      /* take the lock so that set_active is not run concurrently */
-      GST_BUFFER_POOL_LOCK (pool);
-      /* recheck the flushing state in the lock, the pool could have been
-       * set to active again */
-      if (g_atomic_int_get (&pool->flushing)) {
-        if (!do_stop (pool))
-          goto stop_failed;
-      }
-      GST_BUFFER_POOL_UNLOCK (pool);
-    }
-  }
-  return;
-
-stop_failed:
-  {
-    GST_WARNING_OBJECT (pool, "stop failed");
-    GST_BUFFER_POOL_UNLOCK (pool);
-    return;
-  }
+  dec_outstanding (pool);
 }



More information about the gstreamer-commits mailing list