[gst-cvs] CVS: gstreamer/gst gstpad.h,1.66,1.67 gstqueue.c,1.17,1.18 gstqueue.h,1.3,1.4 gstscheduler.c,1.34,1.35 gstthread.c,1.57,1.58

Erik Walthinsen omegahacker at users.sourceforge.net
Sat Oct 27 13:29:44 PDT 2001


Update of /cvsroot/gstreamer/gstreamer/gst
In directory usw-pr-cvs1:/tmp/cvs-serv24873

Modified Files:
	gstpad.h gstqueue.c gstqueue.h gstscheduler.c gstthread.c 
Log Message:
added taaz's threading patch, including queue events

Index: gstpad.h
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstpad.h,v
retrieving revision 1.66
retrieving revision 1.67
diff -u -d -r1.66 -r1.67
--- gstpad.h	2001/10/21 18:00:31	1.66
+++ gstpad.h	2001/10/27 20:28:30	1.67
@@ -257,7 +257,7 @@
 #define GST_GPAD_REALPAD(pad)		(((GstGhostPad *)(pad))->realpad)
 
 /* Generic */
-#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
+#define GST_PAD_REALIZE(pad)	(GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
 #define GST_PAD_DIRECTION(pad)		GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
 #define GST_PAD_CAPS(pad)		GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
 #define GST_PAD_PEER(pad)		GST_RPAD_PEER(GST_PAD_REALIZE(pad))

Index: gstqueue.c
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstqueue.c,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- gstqueue.c	2001/09/14 22:16:47	1.17
+++ gstqueue.c	2001/10/27 20:28:30	1.18
@@ -81,6 +81,7 @@
 static GstBuffer *		gst_queue_get			(GstPad *pad);
 static GstBufferPool* 		gst_queue_get_bufferpool 	(GstPad *pad);
 	
+static void			gst_queue_locked_flush			(GstQueue *queue);
 static void			gst_queue_flush			(GstQueue *queue);
 
 static GstElementStateReturn	gst_queue_change_state		(GstElement *element);
@@ -180,9 +181,12 @@
   queue->size_bytes = 100 * 1024;	// 100KB
   queue->size_time = 1000000000LL;	// 1sec
 
-  queue->emptycond = g_cond_new ();
-  queue->fullcond = g_cond_new ();
-  GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
+  queue->qlock = g_mutex_new ();
+  queue->reader = FALSE;
+  queue->writer = FALSE;
+  queue->not_empty = g_cond_new ();
+  queue->not_full = g_cond_new ();
+  GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
 }
 
 static GstBufferPool*
@@ -215,41 +219,18 @@
   return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
 }
 
-static gboolean
-gst_queue_handle_event (GstPad *pad)
-{
-  GstQueue *queue;
-
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
-
-  GST_LOCK (queue);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
-		  queue->level_buffers);
-
-  GST_FLAG_SET (pad, GST_PAD_EOS);
-
-  g_cond_signal (queue->emptycond);
-
-  GST_UNLOCK (queue);
-
-  return TRUE;
-}
-
 static void
 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
 {
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
-
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
   gst_buffer_unref (GST_BUFFER (data));
 }
 
 static void
-gst_queue_flush (GstQueue *queue)
+gst_queue_locked_flush (GstQueue *queue)
 {
   g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
-		  (char *) GST_ELEMENT_NAME (queue));
+		  (gpointer) queue);
   g_slist_free (queue->queue);
 
   queue->queue = NULL;
@@ -258,39 +239,62 @@
 }
 
 static void
+gst_queue_flush (GstQueue *queue)
+{
+  g_mutex_lock (queue->qlock);
+  gst_queue_locked_flush (queue);
+  g_mutex_unlock (queue->qlock);
+}
+
+static void
 gst_queue_chain (GstPad *pad, GstBuffer *buf)
 {
   GstQueue *queue;
-  const guchar *name;
+  gboolean reader;
 
   g_return_if_fail (pad != NULL);
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (buf != NULL);
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-  name = GST_ELEMENT_NAME (queue);
+
+  reader = FALSE;
 
   /* we have to lock the queue since we span threads */
 
-//  GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
-  GST_LOCK (queue);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+  g_mutex_lock (queue->qlock);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
 
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
-    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
-    gst_queue_flush (queue);
+  if (GST_IS_EVENT(buf)) {
+    GstEvent *event = GST_EVENT(buf);
+    switch (GST_EVENT_TYPE(event)) {
+      case GST_EVENT_FLUSH:
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
+        gst_queue_locked_flush (queue);
+        break;
+      default:
+        break;
+    }
   }
 
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
 
-  if (queue->level_buffers >= queue->size_buffers) {
+  if (queue->level_buffers == queue->size_buffers) {
     // if this is a leaky queue...
     if (queue->leaky) {
+      // FIXME don't want to leak events!
       // if we leak on the upstream side, drop the current buffer
       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+        if (GST_IS_EVENT (buf))
+          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+              GST_EVENT_TYPE(GST_EVENT(buf)));
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
         gst_buffer_unref(buf);
         // now we have to clean up and exit right away
-        GST_UNLOCK (queue);
+        g_mutex_unlock (queue->qlock);
         return;
       }
       // otherwise we have to push a buffer off the other end
@@ -300,6 +304,10 @@
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
         front = queue->queue;
         leakbuf = (GstBuffer *)(front->data);
+        if (GST_IS_EVENT (leakbuf))
+          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+              GST_EVENT_TYPE(GST_EVENT(leakbuf)));
         queue->level_buffers--;
         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
         gst_buffer_unref(leakbuf);
@@ -308,7 +316,9 @@
       }
     }
 
-    while (queue->level_buffers >= queue->size_buffers) {
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+        queue->level_buffers, queue->size_buffers);
+    while (queue->level_buffers == queue->size_buffers) {
       // if there's a pending state change for this queue or its manager, switch
       // back to iterator so bottom half of state change executes
       if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -316,36 +326,45 @@
 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != 
 GST_STATE_VOID_PENDING)
       {
-        GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
-          GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
         if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
-          GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
-        GST_UNLOCK(queue);
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+        g_mutex_unlock (queue->qlock);
         cothread_switch(cothread_current_main());
       }
 
-      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
-      g_cond_signal (queue->emptycond);
-      g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
-      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+      if (queue->writer)
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
+      queue->writer = TRUE;
+      g_cond_wait (queue->not_full, queue->qlock);
+      queue->writer = FALSE;
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
     }
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
+        queue->level_buffers, queue->size_buffers);
   }
 
   /* put the buffer on the tail of the list */
   queue->queue = g_slist_append (queue->queue, buf);
   queue->level_buffers++;
   queue->level_bytes += GST_BUFFER_SIZE(buf);
-//  GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
+      GST_DEBUG_PAD_NAME(pad),
+      queue->level_buffers, queue->size_buffers);
 
-  /* if we were empty, but aren't any more, signal a condition */
-  if (queue->level_buffers == 1)
+  /* reader waiting on an empty queue */
+  reader = queue->reader;
+
+  g_mutex_unlock (queue->qlock);
+
+  if (reader)
   {
-    GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
-    g_cond_signal (queue->emptycond);
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
+    g_cond_signal (queue->not_empty);
   }
-
-  GST_UNLOCK (queue);
 }
 
 static GstBuffer *
@@ -354,7 +373,7 @@
   GstQueue *queue;
   GstBuffer *buf = NULL;
   GSList *front;
-  const guchar *name;
+  gboolean writer;
 
   g_assert(pad != NULL);
   g_assert(GST_IS_PAD(pad));
@@ -362,22 +381,16 @@
   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-  name = GST_ELEMENT_NAME (queue);
 
-  /* have to lock for thread-safety */
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
-  GST_LOCK (queue);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
+  writer = FALSE;
 
-  while (!queue->level_buffers) {
-    if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
-      GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
-      GST_UNLOCK(queue);
-      // this return NULL shouldn't hurt anything...
-      return NULL;
-    }
+  /* have to lock for thread-safety */
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+  g_mutex_lock (queue->qlock);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
 
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+  while (queue->level_buffers == 0) {
     // if there's a pending state change for this queue or its manager, switch
     // back to iterator so bottom half of state change executes
     if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -385,37 +398,60 @@
 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != 
 GST_STATE_VOID_PENDING)
     {
-      GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
-        GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
       if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
-        GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
-      GST_UNLOCK(queue);
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+      g_mutex_unlock (queue->qlock);
       cothread_switch(cothread_current_main());
     }
 
-    g_cond_signal (queue->fullcond);
-    g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+    if (queue->reader)
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
+    queue->reader = TRUE;
+    g_cond_wait (queue->not_empty, queue->qlock);
+    queue->reader = FALSE;
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
   }
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
 
   front = queue->queue;
   buf = (GstBuffer *)(front->data);
-  GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
   queue->queue = g_slist_remove_link (queue->queue, front);
   g_slist_free (front);
 
-//  if (queue->level_buffers < queue->size_buffers)
-  if (queue->level_buffers == queue->size_buffers)
-  {
-    GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
-    g_cond_signal (queue->fullcond);
-  }
-
   queue->level_buffers--;
   queue->level_bytes -= GST_BUFFER_SIZE(buf);
-  GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
+      GST_DEBUG_PAD_NAME(pad),
+      queue->level_buffers, queue->size_buffers);
 
-  GST_UNLOCK(queue);
+  /* writer waiting on a full queue */
+  writer = queue->writer;
+
+  g_mutex_unlock (queue->qlock);
+
+  if (writer)
+  {
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
+    g_cond_signal (queue->not_full);
+  }
+
+  // FIXME where should this be? locked?
+  if (GST_IS_EVENT(buf)) {
+    GstEvent *event = GST_EVENT(buf);
+    switch (GST_EVENT_TYPE(event)) {
+      case GST_EVENT_EOS:
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
+        gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
+        break;
+      default:
+        break;
+    }
+  }
 
   return buf;
 }

Index: gstqueue.h
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstqueue.h,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- gstqueue.h	2001/06/25 01:20:08	1.3
+++ gstqueue.h	2001/10/27 20:28:30	1.4
@@ -75,9 +75,12 @@
 
   gint leaky;		/* whether the queue is leaky, and if so at which end */
 
-//  GMutex *lock;	(optimization?)
-  GCond *emptycond;
-  GCond *fullcond;
+  GMutex *qlock;	/* lock for queue (vs object lock) */
+  /* we are single reader and single writer queue */
+  gboolean reader;	/* reader waiting on empty queue */
+  gboolean writer;	/* writer waiting on full queue */
+  GCond *not_empty;	/* signals buffers now available for reading */
+  GCond *not_full;	/* signals space now available for writing */
 
   GTimeVal *timeval;	/* the timeout for the queue locking */
 };

Index: gstscheduler.c
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstscheduler.c,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- gstscheduler.c	2001/10/21 18:00:31	1.34
+++ gstscheduler.c	2001/10/27 20:28:30	1.35
@@ -1490,6 +1490,7 @@
 
         } else {
           GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
+gst_schedule_show(sched);
 	  //eos = TRUE;
         }
       } else {

Index: gstthread.c
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstthread.c,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- gstthread.c	2001/10/20 23:15:29	1.57
+++ gstthread.c	2001/10/27 20:28:30	1.58
@@ -382,10 +382,10 @@
             //
             //FIXME also make this more efficient by keeping list of managed queues
             THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
-            GST_LOCK(e);
-            g_cond_signal((GST_QUEUE(e)->emptycond));
-            g_cond_signal((GST_QUEUE(e)->fullcond));
-            GST_UNLOCK(e);
+            //GST_LOCK(e);
+            g_cond_signal((GST_QUEUE(e)->not_empty));
+            g_cond_signal((GST_QUEUE(e)->not_full));
+            //GST_UNLOCK(e);
           }
           else
           {
@@ -417,10 +417,10 @@
               if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
               {
                 THR_DEBUG("  element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
-                GST_LOCK(peerelement);
-                g_cond_signal(GST_QUEUE(peerelement)->emptycond);
-                g_cond_signal(GST_QUEUE(peerelement)->fullcond);
-                GST_UNLOCK(peerelement);
+                //GST_LOCK(peerelement);
+                g_cond_signal(GST_QUEUE(peerelement)->not_empty);
+                g_cond_signal(GST_QUEUE(peerelement)->not_full);
+                //GST_UNLOCK(peerelement);
               }
             }
           }





More information about the Gstreamer-commits mailing list