[gst-cvs] CVS: gstreamer/gst gstpad.h,1.66,1.67 gstqueue.c,1.17,1.18 gstqueue.h,1.3,1.4 gstscheduler.c,1.34,1.35 gstthread.c,1.57,1.58
Erik Walthinsen
omegahacker at users.sourceforge.net
Sat Oct 27 13:29:44 PDT 2001
Update of /cvsroot/gstreamer/gstreamer/gst
In directory usw-pr-cvs1:/tmp/cvs-serv24873
Modified Files:
gstpad.h gstqueue.c gstqueue.h gstscheduler.c gstthread.c
Log Message:
added taaz's threading patch, including queue events
Index: gstpad.h
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstpad.h,v
retrieving revision 1.66
retrieving revision 1.67
diff -u -d -r1.66 -r1.67
--- gstpad.h 2001/10/21 18:00:31 1.66
+++ gstpad.h 2001/10/27 20:28:30 1.67
@@ -257,7 +257,7 @@
#define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad)
/* Generic */
-#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
+#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
#define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad))
Index: gstqueue.c
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstqueue.c,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- gstqueue.c 2001/09/14 22:16:47 1.17
+++ gstqueue.c 2001/10/27 20:28:30 1.18
@@ -81,6 +81,7 @@
static GstBuffer * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+static void gst_queue_locked_flush (GstQueue *queue);
static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
@@ -180,9 +181,12 @@
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
- queue->emptycond = g_cond_new ();
- queue->fullcond = g_cond_new ();
- GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
+ queue->qlock = g_mutex_new ();
+ queue->reader = FALSE;
+ queue->writer = FALSE;
+ queue->not_empty = g_cond_new ();
+ queue->not_full = g_cond_new ();
+ GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static GstBufferPool*
@@ -215,41 +219,18 @@
return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
}
-static gboolean
-gst_queue_handle_event (GstPad *pad)
-{
- GstQueue *queue;
-
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-
- GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
-
- GST_LOCK (queue);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
- queue->level_buffers);
-
- GST_FLAG_SET (pad, GST_PAD_EOS);
-
- g_cond_signal (queue->emptycond);
-
- GST_UNLOCK (queue);
-
- return TRUE;
-}
-
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
- GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
-
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
gst_buffer_unref (GST_BUFFER (data));
}
static void
-gst_queue_flush (GstQueue *queue)
+gst_queue_locked_flush (GstQueue *queue)
{
g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
- (char *) GST_ELEMENT_NAME (queue));
+ (gpointer) queue);
g_slist_free (queue->queue);
queue->queue = NULL;
@@ -258,39 +239,62 @@
}
static void
+gst_queue_flush (GstQueue *queue)
+{
+ g_mutex_lock (queue->qlock);
+ gst_queue_locked_flush (queue);
+ g_mutex_unlock (queue->qlock);
+}
+
+static void
gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
- const guchar *name;
+ gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- name = GST_ELEMENT_NAME (queue);
+
+ reader = FALSE;
/* we have to lock the queue since we span threads */
-// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
- GST_LOCK (queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+ g_mutex_lock (queue->qlock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
- gst_queue_flush (queue);
+ if (GST_IS_EVENT(buf)) {
+ GstEvent *event = GST_EVENT(buf);
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_FLUSH:
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
+ gst_queue_locked_flush (queue);
+ break;
+ default:
+ break;
+ }
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
- if (queue->level_buffers >= queue->size_buffers) {
+ if (queue->level_buffers == queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
+ // FIXME don't want to leak events!
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+ if (GST_IS_EVENT (buf))
+ fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+ GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+ GST_EVENT_TYPE(GST_EVENT(buf)));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
- GST_UNLOCK (queue);
+ g_mutex_unlock (queue->qlock);
return;
}
// otherwise we have to push a buffer off the other end
@@ -300,6 +304,10 @@
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
+ if (GST_IS_EVENT (leakbuf))
+ fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+ GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+ GST_EVENT_TYPE(GST_EVENT(leakbuf)));
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
@@ -308,7 +316,9 @@
}
}
- while (queue->level_buffers >= queue->size_buffers) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+ queue->level_buffers, queue->size_buffers);
+ while (queue->level_buffers == queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -316,36 +326,45 @@
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_VOID_PENDING)
{
- GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
- GST_UNLOCK(queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+ g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
- g_cond_signal (queue->emptycond);
- g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ if (queue->writer)
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
+ queue->writer = TRUE;
+ g_cond_wait (queue->not_full, queue->qlock);
+ queue->writer = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
+ queue->level_buffers, queue->size_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
-// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
+ GST_DEBUG_PAD_NAME(pad),
+ queue->level_buffers, queue->size_buffers);
- /* if we were empty, but aren't any more, signal a condition */
- if (queue->level_buffers == 1)
+ /* reader waiting on an empty queue */
+ reader = queue->reader;
+
+ g_mutex_unlock (queue->qlock);
+
+ if (reader)
{
- GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
- g_cond_signal (queue->emptycond);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
+ g_cond_signal (queue->not_empty);
}
-
- GST_UNLOCK (queue);
}
static GstBuffer *
@@ -354,7 +373,7 @@
GstQueue *queue;
GstBuffer *buf = NULL;
GSList *front;
- const guchar *name;
+ gboolean writer;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
@@ -362,22 +381,16 @@
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- name = GST_ELEMENT_NAME (queue);
- /* have to lock for thread-safety */
- GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
- GST_LOCK (queue);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
- GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
+ writer = FALSE;
- while (!queue->level_buffers) {
- if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
- GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
- GST_UNLOCK(queue);
- // this return NULL shouldn't hurt anything...
- return NULL;
- }
+ /* have to lock for thread-safety */
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+ g_mutex_lock (queue->qlock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ while (queue->level_buffers == 0) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -385,37 +398,60 @@
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_VOID_PENDING)
{
- GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
- GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
- GST_UNLOCK(queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+ g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
- g_cond_signal (queue->fullcond);
- g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+ if (queue->reader)
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
+ queue->reader = TRUE;
+ g_cond_wait (queue->not_empty, queue->qlock);
+ queue->reader = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
- GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
-// if (queue->level_buffers < queue->size_buffers)
- if (queue->level_buffers == queue->size_buffers)
- {
- GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
- g_cond_signal (queue->fullcond);
- }
-
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
- GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
+ GST_DEBUG_PAD_NAME(pad),
+ queue->level_buffers, queue->size_buffers);
- GST_UNLOCK(queue);
+ /* writer waiting on a full queue */
+ writer = queue->writer;
+
+ g_mutex_unlock (queue->qlock);
+
+ if (writer)
+ {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
+ g_cond_signal (queue->not_full);
+ }
+
+ // FIXME where should this be? locked?
+ if (GST_IS_EVENT(buf)) {
+ GstEvent *event = GST_EVENT(buf);
+ switch (GST_EVENT_TYPE(event)) {
+ case GST_EVENT_EOS:
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
+ gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
+ break;
+ default:
+ break;
+ }
+ }
return buf;
}
Index: gstqueue.h
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstqueue.h,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- gstqueue.h 2001/06/25 01:20:08 1.3
+++ gstqueue.h 2001/10/27 20:28:30 1.4
@@ -75,9 +75,12 @@
gint leaky; /* whether the queue is leaky, and if so at which end */
-// GMutex *lock; (optimization?)
- GCond *emptycond;
- GCond *fullcond;
+ GMutex *qlock; /* lock for queue (vs object lock) */
+ /* we are single reader and single writer queue */
+ gboolean reader; /* reader waiting on empty queue */
+ gboolean writer; /* writer waiting on full queue */
+ GCond *not_empty; /* signals buffers now available for reading */
+ GCond *not_full; /* signals space now available for writing */
GTimeVal *timeval; /* the timeout for the queue locking */
};
Index: gstscheduler.c
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstscheduler.c,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- gstscheduler.c 2001/10/21 18:00:31 1.34
+++ gstscheduler.c 2001/10/27 20:28:30 1.35
@@ -1490,6 +1490,7 @@
} else {
GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
+gst_schedule_show(sched);
//eos = TRUE;
}
} else {
Index: gstthread.c
===================================================================
RCS file: /cvsroot/gstreamer/gstreamer/gst/gstthread.c,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- gstthread.c 2001/10/20 23:15:29 1.57
+++ gstthread.c 2001/10/27 20:28:30 1.58
@@ -382,10 +382,10 @@
//
//FIXME also make this more efficient by keeping list of managed queues
THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
- GST_LOCK(e);
- g_cond_signal((GST_QUEUE(e)->emptycond));
- g_cond_signal((GST_QUEUE(e)->fullcond));
- GST_UNLOCK(e);
+ //GST_LOCK(e);
+ g_cond_signal((GST_QUEUE(e)->not_empty));
+ g_cond_signal((GST_QUEUE(e)->not_full));
+ //GST_UNLOCK(e);
}
else
{
@@ -417,10 +417,10 @@
if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
{
THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
- GST_LOCK(peerelement);
- g_cond_signal(GST_QUEUE(peerelement)->emptycond);
- g_cond_signal(GST_QUEUE(peerelement)->fullcond);
- GST_UNLOCK(peerelement);
+ //GST_LOCK(peerelement);
+ g_cond_signal(GST_QUEUE(peerelement)->not_empty);
+ g_cond_signal(GST_QUEUE(peerelement)->not_full);
+ //GST_UNLOCK(peerelement);
}
}
}
More information about the Gstreamer-commits mailing list