[gst-devel] last minute! check out this code...
Matt Howell
matth at ridgerun.com
Tue Mar 27 22:22:28 CEST 2001
erik -
attached is a program that demonstrates scheduling and interruptible
"blocking" i/o. it's called iolib.c (needs -lpthread). also attached is a
test program to spit out data: iosrc.c.
$ ./iosrc 0 &
$ ./iosrc 1 &
$ ./iosrc 2 &
$ ./iolib
type 'p' and press enter to put it into the paused state, then enter 'p' again
to toggle between playing and paused.
hopefully you'll get this before leaving for the plane. otherwise, we'll take
a look at it when you get in.
matt.
-------------- next part --------------
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// iosrc: test data source for iolib.
//
// usage: iosrc <index>
//
// Creates (or truncates) the file "fu_input<index>" in the current directory
// and appends the 11-byte pattern "0123456789A" to it once per second,
// flushing after every write so a concurrent reader sees the data right away.
// Runs until a write fails or the process is killed.
//
// Returns EXIT_FAILURE on a bad command line, if the file cannot be opened,
// or if writing fails.
int main(int argc, char *argv[])
{
    char filename[80];
    char *end;
    long index;
    int len;
    int status = EXIT_SUCCESS;
    FILE *fp;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <index>\n", argc > 0 ? argv[0] : "iosrc");
        return EXIT_FAILURE;
    }

    // strtol instead of atoi so that garbage on the command line is reported
    // rather than silently turned into index 0.
    errno = 0;
    index = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE) {
        fprintf(stderr, "%s: invalid index '%s'\n", argv[0], argv[1]);
        return EXIT_FAILURE;
    }

    len = snprintf(filename, sizeof filename, "fu_input%ld", index);
    if (len < 0 || (size_t)len >= sizeof filename) {
        fprintf(stderr, "%s: file name too long\n", argv[0]);
        return EXIT_FAILURE;
    }

    fp = fopen(filename, "w");
    if (!fp) {
        perror("fopen");
        return EXIT_FAILURE;
    }

    for (;;) {
        // 11 bytes per tick
        if (fputs("0123456789A", fp) == EOF || fflush(fp) == EOF) {
            perror("write");
            status = EXIT_FAILURE;
            break;
        }
        sleep(1);
    }

    if (fclose(fp) == EOF) {
        perror("fclose");
        status = EXIT_FAILURE;
    }
    return status;
}
-------------- next part --------------
// matth at ridgerun.com march 26, 2001
// interruptable i/o lib - see email Wed Mar 21 to gst-devel titled:
// "Re: [gst-devel] Multi-threaded queueing locking issues"
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
// errno comes from <errno.h> (included above). It must not be redeclared by
// hand: in a threaded program it is a per-thread lvalue, usually a macro.
// add to gst_element some persistent storage for file descriptors and
// associated flags... basically, only need to know if element opened for
// blocking i/o
//////////////////////////////////////////////////////////////////////////////
// kludge structs and defines to test...
// Per-descriptor record; gst_element holds an array of these that is indexed
// directly by the descriptor number.
typedef struct {
int fd; // not read or written anywhere in the code shown here
int flags; // open() flags recorded by gst_open(); reset to 0 by gst_close()
} flags;
// extra kludge: this could certainly be made smaller!
// this should be global? will more than one element ever open the same
// resource a different way?
// Size of the per-element descriptor table. The table is indexed by the raw
// descriptor value, so only descriptors in [0, MAXFD) can be tracked.
#define MAXFD 1024
// Test stand-in for a gst_element: the state machine fields plus the
// per-descriptor flag table and the file names used by thread_src().
typedef struct {
int state; // current state (a STATE_* value); accessed via GST_STATE()
int pending; // requested state (a STATE_* value); accessed via GST_STATE_PENDING()
flags flags[MAXFD]; // assume no more than MAXFD total filehandles
char readname[256]; // path thread_src() opens as its input
char writename[256]; // path thread_src() opens as its output
int tid; // not pthread tid -- just an integer distinguishing them
} gst_element;
// GST_FD_FLAGS(e, fd): lvalue holding the open() flags recorded for descriptor
// fd in element e. fd is used as a direct array index and is not range-checked
// here. Both arguments are parenthesized so that expressions such as
// `elems + 1` expand correctly.
#define GST_FD_FLAGS(e,fd) (((e)->flags)[(fd)].flags)
// NONE => READY <=> PAUSED <=> PLAYING
#define STATE_NONE 0
#define STATE_READY 1
#define STATE_PAUSED 2
#define STATE_PLAYING 3
// current and requested state of element e (both are assignable lvalues)
#define GST_STATE(e) ((e)->state)
#define GST_STATE_PENDING(e) ((e)->pending)
//
//////////////////////////////////////////////////////////////////////////////
// thread stuff
// The one lock shared by main() and every thread_src(); both condition
// variables below are waited on with this mutex.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t pending_state = PTHREAD_COND_INITIALIZER; // parent -> child: main() broadcasts after changing the elements' pending state
pthread_cond_t changed_state = PTHREAD_COND_INITIALIZER; // child -> parent: thread_src() broadcasts, main() waits
// Not a condition variable itself but the predicate main() waits on: the
// number of requested state changes not yet applied. main() increments/sets it
// and waits on changed_state until it reaches 0; thread_src() decrements it
// once for every transition it applies.
int pending_state_changes = 0;
// Open pathname on behalf of element e and remember how the caller asked for
// it to be opened.
//
// The descriptor is always opened with O_NONBLOCK and with O_SYNC stripped;
// the caller's original flags are recorded in e's descriptor table, where
// gst_read()/gst_write() look for O_SYNC / absence of O_NONBLOCK to decide
// whether to emulate a blocking transfer.
//
// Returns the new descriptor, or -1 with errno set. A descriptor that does not
// fit in the table (>= MAXFD) is closed again and reported as EMFILE instead
// of being written past the end of the table.
int gst_open(gst_element *e, const char *pathname, int flags, mode_t mode)
{
    int fd = open(pathname, (flags & ~O_SYNC) | O_NONBLOCK, mode);

    if (fd == -1)
        return -1;

    if (fd >= MAXFD) {
        close(fd);
        errno = EMFILE;
        return -1;
    }

    // for now, record the flags for every descriptor, blocking or not
    GST_FD_FLAGS(e, fd) = flags;
    return fd;
}
// Read count bytes from fd into buf on behalf of element e.
//
// If fd was opened through gst_open() for blocking use (O_SYNC given, or
// neither O_NONBLOCK nor O_NDELAY given), a blocking read is emulated on the
// non-blocking descriptor: the function polls, sleeping one second whenever no
// data is available (read() returned 0 or failed with EAGAIN), until all count
// bytes have arrived. Before every attempt it checks whether a state change is
// pending for e; if so it seeks back over whatever was consumed so far, sets
// errno to EINTR and fails.
//
// Otherwise the call is a plain read().
//
// Returns count on a complete emulated-blocking read, read()'s result
// (converted to size_t) on the pass-through path, and (size_t)-1 with errno
// set on failure: EINTR for a pending state change, EBADF for a descriptor
// outside the table, otherwise read()'s errno.
//
// Requirements on the caller (filter writer):
//  1. handle a "blocking" read that fails with EINTR;
//  2. be able to resume reading a live source at an arbitrary position. If the
//     source cannot be re-read or un-read (the seek back fails), the partially
//     read data is lost.
size_t gst_read(gst_element *e, int fd, void *buf, size_t count)
{
    char *cursor = buf; // void * arithmetic is not standard C
    size_t remaining = count;
    int recorded;

    if (fd < 0 || fd >= MAXFD) {
        errno = EBADF;
        return (size_t)-1;
    }

    recorded = GST_FD_FLAGS(e, fd);
    // note the parentheses: `!flags & mask` would negate before masking
    if (!((recorded & O_SYNC) || !(recorded & (O_NONBLOCK | O_NDELAY))))
        return (size_t)read(fd, buf, count);

    while (remaining > 0) {
        ssize_t got;

        // what should this condition be?
        if (GST_STATE_PENDING(e) != GST_STATE(e)) {
            size_t consumed = count - remaining;
            off_t back = -(off_t)consumed;

            fprintf(stderr, " IO_%d READ INTERRUPT: fd=%d count=%zu read=%zu\n",
                    e->tid, fd, count, consumed);
            fprintf(stderr, " IO_%d READ: lseek(%lld)\n", e->tid, (long long)back);
            // does this matter? fails harmlessly on non-seekable sources
            if (lseek(fd, back, SEEK_CUR) == (off_t)-1)
                perror("lseek");
            errno = EINTR;
            return (size_t)-1;
        }

        got = read(fd, cursor, remaining);
        if (got < 0) {
            if (errno == EINTR)
                continue; // a real signal: just retry
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                int saved = errno;

                fprintf(stderr, " IO_%d READ ERR: fd=%d count=%zu read=%zu\n",
                        e->tid, fd, count, count - remaining);
                errno = saved;
                return (size_t)-1; // some other error
            }
            got = 0; // descriptor is non-blocking and has nothing yet
        }

        if (got > 0) {
            remaining -= (size_t)got;
            cursor += got;
            fprintf(stderr, " IO_%d READ INFO: fd=%d count=%zu read=%zu\n",
                    e->tid, fd, count, count - remaining);
        } else {
            fprintf(stderr, " IO_%d READ SLEEP: fd=%d count=%zu read=%zu\n",
                    e->tid, fd, count, count - remaining);
            // some timeout ... *how* should this sleep??
            sleep(1);
        }
    }
    return count;
}
// Write count bytes from buf to fd on behalf of element e.
//
// If fd was opened through gst_open() for blocking use (O_SYNC given, or
// neither O_NONBLOCK nor O_NDELAY given), a blocking write is emulated on the
// non-blocking descriptor: the function keeps writing, sleeping one second
// whenever nothing could be written (write() returned 0 or failed with
// EAGAIN), until all count bytes are out. Before every attempt it checks
// whether a state change is pending for e; if so it seeks back over whatever
// was written so far, sets errno to EINTR and fails.
//
// Otherwise the call is a plain write().
//
// Returns count on a complete emulated-blocking write, write()'s result on the
// pass-through path, and -1 with errno set on failure: EINTR for a pending
// state change, EBADF for a descriptor outside the table, otherwise write()'s
// errno.
int gst_write(gst_element *e, int fd, const void *buf, size_t count)
{
    const char *cursor = buf; // void * arithmetic is not standard C
    size_t remaining = count;
    int recorded;

    if (fd < 0 || fd >= MAXFD) {
        errno = EBADF;
        return -1;
    }

    recorded = GST_FD_FLAGS(e, fd);
    // note the parentheses: `!flags & mask` would negate before masking
    if (!((recorded & O_SYNC) || !(recorded & (O_NONBLOCK | O_NDELAY))))
        return (int)write(fd, buf, count);

    while (remaining > 0) {
        ssize_t put;

        // what should this condition be?
        if (GST_STATE_PENDING(e) != GST_STATE(e)) {
            size_t done = count - remaining;
            off_t back = -(off_t)done;

            fprintf(stderr, " IO_%d WRITE INTERRUPT: fd=%d count=%zu written=%zu\n",
                    e->tid, fd, count, done);
            fprintf(stderr, " IO_%d WRITE: lseek(%lld)\n", e->tid, (long long)back);
            // does this matter? fails harmlessly on non-seekable sinks
            if (lseek(fd, back, SEEK_CUR) == (off_t)-1)
                perror("lseek");
            errno = EINTR;
            return -1;
        }

        put = write(fd, cursor, remaining);
        if (put < 0) {
            if (errno == EINTR)
                continue; // a real signal: just retry
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                int saved = errno;

                fprintf(stderr, " IO_%d WRITE ERR: fd=%d count=%zu written=%zu\n",
                        e->tid, fd, count, count - remaining);
                errno = saved;
                return -1; // some other error
            }
            put = 0; // descriptor is non-blocking and cannot take data yet
        }

        if (put > 0) {
            remaining -= (size_t)put;
            cursor += put;
            fprintf(stderr, " IO_%d WRITE INFO: fd=%d count=%zu written=%zu\n",
                    e->tid, fd, count, count - remaining);
        } else {
            fprintf(stderr, " IO_%d WRITE SLEEP: fd=%d count=%zu written=%zu\n",
                    e->tid, fd, count, count - remaining);
            // some timeout ... *how* should this sleep??
            sleep(1);
        }
    }
    return (int)count;
}
// Forget the flags recorded for fd in element e and close the descriptor.
// A descriptor outside the table (negative or >= MAXFD) has no record to
// clear; it is still handed to close() so the caller gets close()'s error.
// Returns close()'s result.
int gst_close(gst_element *e, int fd)
{
    if (fd >= 0 && fd < MAXFD)
        GST_FD_FLAGS(e, fd) = 0;
    return close(fd);
}
// Worker thread: copies 16-byte chunks from e->readname to e->writename while
// the element is PLAYING, and sleeps on pending_state while READY or PAUSED.
//
// arg is the gst_element this thread drives. Always returns NULL.
//
// Locking: the thread holds `mutex` in every state except PLAYING (it is
// dropped on the transition to PLAYING and re-taken on PLAYING -> PAUSED).
// `have_mutex` mirrors that so the exit path can release the lock instead of
// leaving it held forever.
//
// NOTE(review): a thread that exits early is still counted by main() in
// pending_state_changes, so main() will wait for it forever; fixing that needs
// a live-thread count shared with main().
void *thread_src(void *arg)
{
    gst_element *e = (gst_element *)arg;
    unsigned char buf[17];
    int fdread = -1, fdwrite = -1; // -1 == not open, so exit can skip them
    int have_mutex = 0;

    buf[16] = 0; // null terminate
    pthread_mutex_lock(&mutex);
    have_mutex = 1;
    fprintf(stderr, "THREAD%d: start\n", e->tid);

    // open files
    if ((fdread = gst_open(e, e->readname, O_SYNC, 0)) == -1) {
        perror("THREAD: err: open readname");
        goto thread_src_exit;
    }
    if ((fdwrite = gst_open(e, e->writename,
                            O_CREAT|O_WRONLY|O_TRUNC|O_SYNC, 00644)) == -1) {
        perror("THREAD: err: open writename");
        goto thread_src_exit;
    }

    while (1) { // begin i/o loop
        if (GST_STATE(e) == STATE_NONE) {
            // do nothing -- we should have a pending state
            // this should only happen once NONE => READY
        } else if (GST_STATE(e) == STATE_READY) {
            fprintf(stderr, "THREAD%d: ready - sleeping...\n", e->tid);
            pthread_cond_broadcast(&changed_state); // wake parent up
            GST_STATE_PENDING(e) = STATE_READY;
            // loop while ready, waiting for a (new) pending_state
            while (GST_STATE_PENDING(e) == STATE_READY)
                pthread_cond_wait(&pending_state, &mutex);
            fprintf(stderr, "THREAD%d: ready - awake: new state pending\n", e->tid);
        } else if (GST_STATE(e) == STATE_PAUSED) {
            fprintf(stderr, "THREAD%d: paused - sleeping...\n", e->tid);
            pthread_cond_broadcast(&changed_state); // wake parent up
            GST_STATE_PENDING(e) = STATE_PAUSED;
            // loop while paused, waiting for a (new) pending_state
            while (GST_STATE_PENDING(e) == STATE_PAUSED)
                pthread_cond_wait(&pending_state, &mutex);
            fprintf(stderr, "THREAD%d: paused: awake: new state pending\n", e->tid);
        } else if (GST_STATE(e) == STATE_PLAYING) {
            int ret = 0;

            // do read
            fprintf(stderr, "THREAD%d: playing - read 16 bytes from: %d\n",
                    e->tid, fdread);
            ret = gst_read(e, fdread, buf, sizeof(buf)-1);
            if (ret == (int)(sizeof(buf)-1)) {
                // read successful
                fprintf(stderr, "THREAD%d: read '%s'\n", e->tid, (const char *)buf);
            } else if (ret == -1 && errno == EINTR) {
                perror("THREAD: read: i/o interrupted! continuing");
                if (GST_STATE(e) == GST_STATE_PENDING(e)) { // sanity check
                    fprintf(stderr, "THREAD%d SOFTERR! state==pending\n", e->tid);
                    goto thread_src_exit;
                }
            } else if (ret == -1) {
                perror("THREAD: i/o err: read");
                goto thread_src_exit;
            }

            if (ret != -1) {
                // do write
                fprintf(stderr, "THREAD%d: playing - write 16 bytes to: %d\n",
                        e->tid, fdwrite);
                ret = gst_write(e, fdwrite, buf, sizeof(buf)-1);
                if (ret == (int)(sizeof(buf)-1)) {
                    // write successful
                } else if (ret == -1 && errno == EINTR) {
                    perror("THREAD: write: i/o interrupted! continuing");
                    if (GST_STATE(e) == GST_STATE_PENDING(e)) { // sanity check
                        fprintf(stderr, "THREAD%d SOFTERR! state==pending\n", e->tid);
                        goto thread_src_exit;
                    }
                } else if (ret == -1) {
                    perror("THREAD: i/o err: write");
                    goto thread_src_exit;
                }
            }
        }

        //
        // handle state transitions (if there's an outstanding one)
        //
        if (GST_STATE_PENDING(e) != GST_STATE(e)) {
            if (GST_STATE_PENDING(e) == STATE_READY) {
                // we already have mutex in both NONE -> READY and PAUSED -> READY
                GST_STATE(e) = STATE_READY;
                pending_state_changes--;
                // we still own mutex...
                // we'll release it when we do cond_wait in GST_STATE(e)==READY
            } else if (GST_STATE_PENDING(e) == STATE_PAUSED) {
                // only when we go from PLAYING -> PAUSED will we not own the mutex
                // so lock it
                if (GST_STATE(e) == STATE_PLAYING) {
                    pthread_mutex_lock(&mutex);
                    have_mutex = 1;
                }
                GST_STATE(e) = STATE_PAUSED;
                pending_state_changes--;
                if (!pending_state_changes && errno == EINTR)
                    errno = 0;
                // we still own mutex...
                // we'll release it when we do cond_wait in GST_STATE(e)==PAUSED
            } else if (GST_STATE_PENDING(e) == STATE_PLAYING) {
                // we already have mutex in PAUSED -> PLAYING
                GST_STATE(e) = STATE_PLAYING;
                pending_state_changes--;
                pthread_cond_broadcast(&changed_state); // wake parent up
                GST_STATE_PENDING(e) = STATE_PLAYING;
                // for all practical purposes, we're "playing" -- even though we
                // haven't gotten to GST_STATE(e)==PLAYING... unlock
                pthread_mutex_unlock(&mutex);
                have_mutex = 0;
            }
        } // end handling state transition
    } // end i/o while loop

thread_src_exit:
    // only close what was actually opened
    if (fdread != -1)
        gst_close(e, fdread);
    if (fdwrite != -1)
        gst_close(e, fdwrite);
    // never leave the shared lock held by a dead thread
    if (have_mutex)
        pthread_mutex_unlock(&mutex);
    return NULL;
}
#define NUM_CHILDREN 3
#define INPUTNAME "fu_input"
#define OUTPUTNAME "fu_output"

// Ask all n elements in e[] to move to state `target` and block until every
// thread has applied the change (pending_state_changes is back to 0).
// `name` is the state's name as it should appear in the log output.
static void main_request_state(gst_element *e, int n, int target, const char *name)
{
    int i;

    pthread_mutex_lock(&mutex);
    fprintf(stderr, "MAIN: setting threads to %s...\n", name);
    pending_state_changes = n;
    for (i = 0; i < n; i++)
        GST_STATE_PENDING(&(e[i])) = target;
    pthread_cond_broadcast(&pending_state); // wake children up
    while (pending_state_changes > 0)
        pthread_cond_wait(&changed_state, &mutex);
    fprintf(stderr, "MAIN: all threads are %s\n", name);
    pthread_mutex_unlock(&mutex);
}

// Test driver: starts NUM_CHILDREN thread_src() workers (worker i copies
// fu_input<i> to fu_output<i>), waits until all of them reach READY, then
// reads whitespace-delimited words from stdin. Each word "p" toggles the
// workers: READY/PLAYING -> PAUSED, PAUSED -> PLAYING. Other words are only
// echoed.
//
// Returns 0 when stdin reaches end-of-file (the workers are not joined; they
// end with the process) and 1 if a worker thread cannot be created.
int main(int argc, char *argv[])
{
    // static: zero-initialised, so every descriptor-table entry starts at 0
    static gst_element e[NUM_CHILDREN];
    char s[256];
    pthread_t tid[NUM_CHILDREN];
    int i, state = STATE_READY;

    (void)argc;
    (void)argv;

    // setup and spawn threads
    pthread_mutex_lock(&mutex);
    fprintf(stderr, "MAIN: creating threads...\n");
    for (i = 0; i < NUM_CHILDREN; i++) {
        int err;

        GST_STATE(&(e[i])) = STATE_NONE;
        GST_STATE_PENDING(&(e[i])) = STATE_READY;
        snprintf(e[i].readname, sizeof e[i].readname, "%s%d", INPUTNAME, i);
        snprintf(e[i].writename, sizeof e[i].writename, "%s%d", OUTPUTNAME, i);
        e[i].tid = i;

        pending_state_changes++;
        // pthread_create reports failure through its return value, not errno
        err = pthread_create(&(tid[i]), NULL, thread_src, (void *)(&(e[i])));
        if (err != 0) {
            fprintf(stderr, "MAIN: pthread_create: %s\n", strerror(err));
            return 1;
        }
    }

    fprintf(stderr, "MAIN: waiting for all threads to start...\n");
    while (pending_state_changes > 0)
        pthread_cond_wait(&changed_state, &mutex);
    fprintf(stderr, "MAIN: threads all started\n");
    pthread_mutex_unlock(&mutex);

    // bounded read into s; stop on EOF instead of spinning on stale data
    while (scanf("%255s", s) == 1) {
        fprintf(stderr, "MAIN: read '%s'\n", s);
        if (strcmp(s, "p") != 0)
            continue;

        // toggle state playing <-> paused
        if (state == STATE_READY || state == STATE_PLAYING) {
            main_request_state(e, NUM_CHILDREN, STATE_PAUSED, "PAUSED");
            state = STATE_PAUSED;
        } else if (state == STATE_PAUSED) {
            main_request_state(e, NUM_CHILDREN, STATE_PLAYING, "PLAYING");
            state = STATE_PLAYING;
        }
    }
    return 0;
}
More information about the gstreamer-devel
mailing list