[gst-devel] last minute! check out this code...

Matt Howell matth at ridgerun.com
Tue Mar 27 22:22:28 CEST 2001


erik -
attached is a program that demonstrates scheduling and interruptible
"blocking" i/o.  it's called iolib.c (needs -lpthread).  also attached is a
test program to spit out data: iosrc.c.

$ ./iosrc 0 &
$ ./iosrc 1 &
$ ./iosrc 2 &
$ ./iolib

press 'p' to put it into pause state, then press 'p' to toggle between playing
and paused.

hopefully you'll get this before leaving for the plane.  otherwise, we'll take
a look at it when you get in.

matt.


-------------- next part --------------
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// iosrc: test data source.
//
// usage: iosrc <index>
//
// Opens (creating or truncating) the file "fu_input<index>" in the current
// directory, then once per second appends the 11-byte pattern
// "0123456789A" and flushes it, until the process is killed or a write
// fails.
int main(int argc, char *argv[])
{
  char filename[80];
  FILE *fp;

  // the original dereferenced argv[1] unconditionally
  if (argc < 2)
  {
    fprintf(stderr, "usage: %s <index>\n", argc > 0 ? argv[0] : "iosrc");
    return EXIT_FAILURE;
  }

  snprintf(filename, sizeof filename, "fu_input%d", atoi(argv[1]));
  fp = fopen(filename, "w");
  if (!fp)
  {
    perror("fopen"); exit(-1);
  }
  while (1)
  {
    // 11 bytes per write (ten digits plus 'A')
    if (fprintf(fp, "0123456789A") < 0 || fflush(fp) == EOF)
    {
      perror("write");
      break;
    }
    sleep(1);
  }
  fclose(fp);
  return EXIT_FAILURE; // only reached after a write error
}
-------------- next part --------------
// matth at ridgerun.com march 26, 2001
// interruptable i/o lib - see email Wed Mar 21 to gst-devel titled: 
//   "Re: [gst-devel] Multi-threaded queueing locking issues"

#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

extern int errno;

// add to gst_element some persistent storage for file descriptors and
// associated flags... basically, only need to know if element opened for
// blocking i/o

//////////////////////////////////////////////////////////////////////////////
// kludge structs and defines to test...

// One slot of an element's per-descriptor table.
typedef struct {
  int fd;    // not written by any code in this file; the table is indexed by fd
  int flags; // accessed through GST_FD_FLAGS()
} flags;

// extra kludge: this could certainly be made smaller!
// this should be global?  will more than one element ever open the same
// resource a different way?
#define MAXFD 1024

// Stand-in for the real gst_element: just the fields this test needs.
typedef struct {
  int state;          // current state, one of STATE_* (see GST_STATE)
  int pending;        // requested state, one of STATE_* (see GST_STATE_PENDING)
  flags flags[MAXFD]; // assume no more than MAXFD total filehandles
  char readname[256];
  char writename[256];
  int tid; // not pthread tid -- just an integer distinguishing them
} gst_element;

// lvalue for the flags recorded for descriptor fd of element e.
// Both arguments are parenthesised so that expressions such as
// GST_FD_FLAGS(&elem, fd) or GST_FD_FLAGS(p + 1, fd) expand correctly.
// No bounds check is done: fd must be in [0, MAXFD).
#define GST_FD_FLAGS(e,fd) (((e)->flags)[(fd)].flags)

// NONE => READY <=> PAUSED <=> PLAYING
#define STATE_NONE 0
#define STATE_READY 1
#define STATE_PAUSED 2
#define STATE_PLAYING 3
#define GST_STATE(e) ((e)->state)
#define GST_STATE_PENDING(e) ((e)->pending)

//
//////////////////////////////////////////////////////////////////////////////

// thread stuff
// Mutex passed to every pthread_cond_wait() in this file.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Broadcast by main() after it sets new pending states; thread_src() waits
// on it while READY or PAUSED.
pthread_cond_t pending_state = PTHREAD_COND_INITIALIZER; // parent -> child
// Broadcast by thread_src() when it reaches a state; main() waits on it.
pthread_cond_t changed_state = PTHREAD_COND_INITIALIZER; // child -> parent
// Predicate for changed_state (not itself a condition variable): the number
// of requested state changes not yet carried out.  main() sets/increments
// it, each thread decrements it, main() waits until it drops to 0.
int pending_state_changes = 0; // conditional variable



// Open pathname on behalf of element e.
//
// The descriptor is always opened with O_NONBLOCK added and O_SYNC removed;
// the flags the caller asked for are recorded in e's per-descriptor table so
// that gst_read()/gst_write() can later decide how to treat the descriptor.
//
// Returns the new descriptor, or -1 with errno set.  A descriptor that does
// not fit in e's table is closed again and reported as EMFILE, because
// recording its flags would write past the end of the table.
int gst_open(gst_element *e, const char *pathname, int flags, mode_t mode)
{
  int fd = open(pathname, (flags & ~O_SYNC) | O_NONBLOCK, mode);

  if (fd == -1)
    return -1;

  if ((size_t)fd >= sizeof e->flags / sizeof e->flags[0])
  {
    close(fd);
    errno = EMFILE;
    return -1;
  }

  GST_FD_FLAGS(e,fd) = flags; // for now, record it always
  return fd;
}


// Read count bytes from fd into buf on behalf of element e, emulating a
// blocking read that a pending state change can interrupt.
//
// If the flags recorded for fd contain O_SYNC, or contain neither
// O_NONBLOCK nor O_NDELAY, the descriptor is treated as "blocking": the
// function loops, sleeping one second whenever no data is available, until
// all count bytes have arrived.  Before every read attempt it compares the
// element's pending state with its current state; if they differ it seeks
// the descriptor back over whatever was already consumed, sets errno to
// EINTR and returns -1.
//
// Otherwise (caller asked for non-blocking i/o, or fd is outside e's table)
// the call is a plain read().
//
// Returns count on success.  Errors are reported as -1 converted to the
// (unsigned) return type, i.e. (size_t)-1, with errno set; callers that
// store the result in an int see -1.
//
// what if partial read happens?
// + can't always "re-get" data
// + can't always "un-get" data
// (why would you do a blocking read on a "live" device?  lazy, i guess ;-)
//
// regardless, if you do and there's a state change, the data is lost IMO.
// if it's "live" (you can't re- or un- get it) the data was time-sensitive.
// assume it's lost and gone forever.
//
// requirement on user (filter writer):
// 1. write your filters so you can handle a "blocking" read that
//    returns -1 (EINTR)
// 2. be able to read from a live source at any point.  i.e., if you get
//    paused in the middle of reading a frame (or whatever) -- EVEN IF
//    YOU DID A BLOCKING READ -- you need to be able to handle starting
//    up again at an arbitrary location.  (this is obvious)
//
// NOTE(review): state and pending are read here without holding any lock.
size_t gst_read(gst_element *e, int fd, void *buf, size_t count)
{
  char *dst = buf; // byte cursor; arithmetic on void * is not standard C
  size_t remaining = count;
  int fdflags;

  // no recorded flags for descriptors outside the table
  if (fd < 0 || (size_t)fd >= sizeof e->flags / sizeof e->flags[0])
    return read(fd, buf, count);

  fdflags = GST_FD_FLAGS(e,fd);
  // the original wrote "!flags & mask", which is always 0
  if (!(fdflags & O_SYNC) && (fdflags & (O_NONBLOCK | O_NDELAY)))
    return read(fd, buf, count);

  while (remaining > 0)
  {
    ssize_t actual;

    // what should this condition be?
    if (GST_STATE_PENDING(e) != GST_STATE(e))
    {
      size_t consumed = count - remaining;

      fprintf(stderr, "   IO_%d READ INTERRUPT: fd=%d count=%zu read=%zu\n",
              e->tid, fd, count, consumed);
      if (consumed > 0)
      {
        long back = -(long)consumed; // signed, so the offset is really negative

        fprintf(stderr, "   IO_%d READ: lseek(%ld)\n", e->tid, back);
        if (lseek(fd, (off_t)back, SEEK_CUR) == (off_t)-1) // does this matter?
          perror("lseek");
      }
      errno = EINTR;
      return (size_t)-1;
    }

    actual = read(fd, dst, remaining);
    if (actual < 0)
    {
      // the descriptor was opened O_NONBLOCK, so "nothing available yet"
      // shows up as EAGAIN; treat it (and a signal) as "no data", not error
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
        actual = 0;
      else
      {
        fprintf(stderr, "   IO_%d READ ERR: fd=%d count=%zu read=%zu\n",
                e->tid, fd, count, count - remaining);
        return (size_t)-1; // some other error
      }
    }

    if (actual > 0)
    {
      remaining -= (size_t)actual;
      dst += actual;
      fprintf(stderr, "   IO_%d READ INFO: fd=%d count=%zu read=%zu\n",
              e->tid, fd, count, count - remaining);
    }
    else
    {
      fprintf(stderr, "   IO_%d READ SLEEP: fd=%d count=%zu read=%zu\n",
              e->tid, fd, count, count - remaining);
      // some timeout ... *how* should this sleep??
      sleep(1);
    }
  }

  return count;
}


// Write count bytes from buf to fd on behalf of element e, emulating a
// blocking write that a pending state change can interrupt.
//
// If the flags recorded for fd contain O_SYNC, or contain neither
// O_NONBLOCK nor O_NDELAY, the descriptor is treated as "blocking": the
// function loops, sleeping one second whenever nothing could be written,
// until all count bytes are out.  Before every write attempt it compares
// the element's pending state with its current state; if they differ it
// seeks the descriptor back over whatever was already written, sets errno
// to EINTR and returns -1.
//
// Otherwise (caller asked for non-blocking i/o, or fd is outside e's table)
// the call is a plain write().
//
// Returns count on success, -1 with errno set on error.
//
// NOTE(review): state and pending are read here without holding any lock.
int gst_write(gst_element *e, int fd, const void *buf, size_t count)
{
  const char *src = buf; // byte cursor; arithmetic on void * is not standard C
  size_t remaining = count;
  int fdflags;

  // no recorded flags for descriptors outside the table
  if (fd < 0 || (size_t)fd >= sizeof e->flags / sizeof e->flags[0])
    return write(fd, buf, count);

  fdflags = GST_FD_FLAGS(e,fd);
  // the original wrote "!flags & mask", which is always 0
  if (!(fdflags & O_SYNC) && (fdflags & (O_NONBLOCK | O_NDELAY)))
    return write(fd, buf, count);

  while (remaining > 0)
  {
    ssize_t actual;

    // what should this condition be?
    if (GST_STATE_PENDING(e) != GST_STATE(e))
    {
      size_t done = count - remaining;

      fprintf(stderr, "   IO_%d WRITE INTERRUPT: fd=%d count=%zu written=%zu\n",
              e->tid, fd, count, done);
      if (done > 0)
      {
        long back = -(long)done; // signed, so the offset is really negative

        fprintf(stderr, "   IO_%d WRITE: lseek(%ld)\n", e->tid, back);
        if (lseek(fd, (off_t)back, SEEK_CUR) == (off_t)-1) // does this matter?
          perror("lseek");
      }
      errno = EINTR;
      return -1;
    }

    actual = write(fd, src, remaining);
    if (actual < 0)
    {
      // the descriptor was opened O_NONBLOCK, so "cannot accept data yet"
      // shows up as EAGAIN; treat it (and a signal) as "nothing written"
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
        actual = 0;
      else
      {
        fprintf(stderr, "   IO_%d WRITE ERR: fd=%d count=%zu written=%zu\n",
                e->tid, fd, count, count - remaining);
        return -1; // some other error
      }
    }

    if (actual > 0)
    {
      remaining -= (size_t)actual;
      src += actual;
      fprintf(stderr, "   IO_%d WRITE INFO: fd=%d count=%zu written=%zu\n",
              e->tid, fd, count, count - remaining);
    }
    else
    {
      fprintf(stderr, "   IO_%d WRITE SLEEP: fd=%d count=%zu written=%zu\n",
              e->tid, fd, count, count - remaining);
      // some timeout ... *how* should this sleep??
      sleep(1);
    }
  }

  return (int)count;
}


// Forget the flags recorded for fd in element e and close the descriptor.
//
// The table entry is cleared only when fd is a valid index; an out-of-range
// fd (for example -1 from a failed open) is passed straight to close(),
// which reports the error.  Returns whatever close() returns.
int gst_close(gst_element *e, int fd)
{
  if (fd >= 0 && (size_t)fd < sizeof e->flags / sizeof e->flags[0])
    GST_FD_FLAGS(e,fd) = 0;
  return close(fd);
}


// Worker thread body.  arg points to the gst_element this thread drives.
//
// The thread opens e->readname and e->writename, then loops forever:
//   - READY / PAUSED: wake the parent (changed_state) and sleep on
//     pending_state until the parent requests a different state;
//   - PLAYING: copy 16 bytes from the input to the output with
//     gst_read()/gst_write(), both of which return -1/EINTR when a state
//     change is pending;
//   - afterwards, if a state change is pending, perform it and decrement
//     pending_state_changes.
//
// Mutex discipline: the mutex is held in every state except PLAYING.  It is
// taken at start-up, released on the PAUSED -> PLAYING transition and
// re-taken on the way out of PLAYING.  have_mutex tracks this so that the
// exit path can release it.
//
// Returns NULL.  The thread only returns after an open failure, a hard i/o
// error, or the "state==pending" sanity check firing.
//
// NOTE(review): a thread that exits does not acknowledge an outstanding
// state request, so main() can wait on changed_state forever afterwards.
void *thread_src(void *arg)
{
  int fdread = -1, fdwrite = -1; // -1 == not opened (checked at exit)
  int have_mutex = 0;
  unsigned char buf[17];
  gst_element *e = (gst_element *)arg;
  buf[16] = 0; // null terminate

  pthread_mutex_lock(&mutex);
  have_mutex = 1;
  fprintf(stderr, "THREAD%d: start\n", e->tid);

  // open files
  if ((fdread = gst_open(e, e->readname, O_SYNC, 0)) == -1)
  {
    perror("THREAD: err: open readname"); goto thread_src_exit; 
  }
  if ((fdwrite = gst_open(e, e->writename,
                          O_CREAT|O_WRONLY|O_TRUNC|O_SYNC, 00644)) == -1)
  {
    perror("THREAD: err: open writename"); goto thread_src_exit; 
  }

  while (1) // begin i/o loop
  {
    if (GST_STATE(e) == STATE_NONE)
    {
      // do nothing -- we should have a pending state
      // this should only happen once NONE => PAUSED
    }
    else if (GST_STATE(e) == STATE_READY)
    {
      fprintf(stderr, "THREAD%d: ready - sleeping...\n", e->tid);
      pthread_cond_broadcast(&changed_state); // wake parent up
      GST_STATE_PENDING(e) = STATE_READY;

      // loop while ready, waiting for a (new) pending_state
      while (GST_STATE_PENDING(e) == STATE_READY)
        pthread_cond_wait(&pending_state, &mutex);

      fprintf(stderr, "THREAD%d: ready - awake: new state pending\n", e->tid);
    }
    else if (GST_STATE(e) == STATE_PAUSED)
    {
      fprintf(stderr, "THREAD%d: paused - sleeping...\n", e->tid);
      pthread_cond_broadcast(&changed_state); // wake parent up
      GST_STATE_PENDING(e) = STATE_PAUSED;

      // loop while paused, waiting for a (new) pending_state
      while (GST_STATE_PENDING(e) == STATE_PAUSED)
        pthread_cond_wait(&pending_state, &mutex);

      fprintf(stderr, "THREAD%d: paused: awake: new state pending\n", e->tid);
    }
    else if (GST_STATE(e) == STATE_PLAYING)
    {
      int ret = 0;
      // do read
      fprintf(stderr, "THREAD%d: playing - read 16 bytes from: %d\n",
              e->tid, fdread);
      ret = gst_read(e, fdread, buf, sizeof(buf)-1);
      if (ret == sizeof(buf)-1)
      {
        // read successful
        fprintf(stderr, "THREAD%d: read '%s'\n", e->tid, buf);
      }
      else if (ret == -1 && errno == EINTR)
      {
        perror("THREAD: read: i/o interrupted!  continuing");
        if (GST_STATE(e) == GST_STATE_PENDING(e)) // sanity check
        {
          fprintf(stderr, "THREAD%d SOFTERR!  state==pending\n", e->tid);
          goto thread_src_exit;
        }
      }
      else if (ret == -1)
      {
        perror("THREAD: i/o err: read"); goto thread_src_exit;
      }

      if (ret != -1)
      {
        // do write
        fprintf(stderr, "THREAD%d: playing - write 16 bytes to: %d\n",
                e->tid, fdwrite);
        ret = gst_write(e, fdwrite, buf, sizeof(buf)-1);
        if (ret == sizeof(buf)-1)
        {
          // write successful
        }
        else if (ret == -1 && errno == EINTR)
        {
          perror("THREAD: write: i/o interrupted!  continuing");
          if (GST_STATE(e) == GST_STATE_PENDING(e)) // sanity check
          {
            fprintf(stderr, "THREAD%d SOFTERR!  state==pending\n", e->tid);
            goto thread_src_exit;
          }
        }
        else if (ret == -1)
        {
          perror("THREAD: i/o err: write"); goto thread_src_exit;
        }
      }
    }

    //
    // handle state transitions (if there's an outstanding one)
    //
    if (GST_STATE_PENDING(e) != GST_STATE(e))
    {
      if (GST_STATE_PENDING(e) == STATE_READY)
      {
        // we already have mutex in both NONE -> READY and PAUSED -> READY;
        // coming from PLAYING we would not, and the READY branch above must
        // not call pthread_cond_wait() without it
        if (!have_mutex)
        {
          pthread_mutex_lock(&mutex);
          have_mutex = 1;
        }

        GST_STATE(e) = STATE_READY;
        pending_state_changes--;
        // we still own mutex...
        // we'll release it when we do cond_wait in GST_STATE(e)==READY
      }
      else if (GST_STATE_PENDING(e) == STATE_PAUSED)
      {
        // only when we go from PLAYING -> PAUSED will we not own the mutex
        // so lock it
        if (!have_mutex)
        {
          pthread_mutex_lock(&mutex);
          have_mutex = 1;
        }

        GST_STATE(e) = STATE_PAUSED;
        pending_state_changes--;
        if (!pending_state_changes && errno == EINTR)
          errno = 0;

        // we still own mutex...
        // we'll release it when we do cond_wait in GST_STATE(e)==PAUSED
      }
      else if (GST_STATE_PENDING(e) == STATE_PLAYING)
      {
        // we already have mutex in PAUSED -> PLAYING

        GST_STATE(e) = STATE_PLAYING;
        pending_state_changes--;
        pthread_cond_broadcast(&changed_state); // wake parent up
        GST_STATE_PENDING(e) = STATE_PLAYING;
        // for all practical purposes, we're "playing" -- even though we
        // haven't gotten to GST_STATE(e)==PLAYING... unlock
        pthread_mutex_unlock(&mutex);
        have_mutex = 0;
      }
    } // end handling state transition

  } // end i/o while loop

thread_src_exit:
  // close only what was actually opened
  if (fdread != -1)
    gst_close(e, fdread);
  if (fdwrite != -1)
    gst_close(e, fdwrite);
  // don't die holding the lock the other threads need
  if (have_mutex)
    pthread_mutex_unlock(&mutex);
  return NULL;
}


main(int argc, char *argv[])
{
#define NUM_CHILDREN 3
  char s[256];
  pthread_t tid[NUM_CHILDREN];
  gst_element e[NUM_CHILDREN];
  int i, state = STATE_READY;

#define INPUTNAME "fu_input"
#define OUTPUTNAME "fu_output"

  // setup and spawn threads
  pthread_mutex_lock(&mutex);
  fprintf(stderr, "MAIN: creating threads...\n");
  for (i = 0; i < NUM_CHILDREN; i++)
  {
    GST_STATE(&(e[i])) = STATE_NONE;
    GST_STATE_PENDING(&(e[i])) = STATE_READY;
    pending_state_changes++;
    sprintf(e[i].readname, "%s%d", INPUTNAME, i); // vary this?
    sprintf(e[i].writename, "%s%d", OUTPUTNAME, i); // vary this?
    e[i].tid = i;
    pthread_create(&(tid[i]), NULL, thread_src, (void *)(&(e[i])));
  }

  fprintf(stderr, "MAIN: waiting for all threads to start...\n");
  while (pending_state_changes > 0)
    pthread_cond_wait(&changed_state, &mutex);
  fprintf(stderr, "MAIN: threads all started\n");
  pthread_mutex_unlock(&mutex);

  while (1)
  {
    scanf("%s", &s);
    fprintf(stderr, "MAIN: read '%s'\n", s);
    if (!strcmp(s, "p"))
    {
      // toggle state playing <-> paused
      if (state == STATE_READY || state == STATE_PLAYING)
      {
        pthread_mutex_lock(&mutex);
        fprintf(stderr, "MAIN: setting threads to PAUSED...\n");
        pending_state_changes = NUM_CHILDREN;
        for (i = 0; i < NUM_CHILDREN; i++)
          GST_STATE_PENDING(&(e[i])) = STATE_PAUSED;
        pthread_cond_broadcast(&pending_state); // wake children up
        while (pending_state_changes > 0)
          pthread_cond_wait(&changed_state, &mutex);
        fprintf(stderr, "MAIN: all threads are PAUSED\n");
        state = STATE_PAUSED;
        pthread_mutex_unlock(&mutex);
      }
      else if (state == STATE_PAUSED)
      {
        pthread_mutex_lock(&mutex);
        fprintf(stderr, "MAIN: setting threads to PLAYING...\n");
        pending_state_changes = NUM_CHILDREN;
        for (i = 0; i < NUM_CHILDREN; i++)
          GST_STATE_PENDING(&(e[i])) = STATE_PLAYING;
        pthread_cond_broadcast(&pending_state); // wake children up
        while (pending_state_changes > 0)
          pthread_cond_wait(&changed_state, &mutex);
        fprintf(stderr, "MAIN: all threads are PLAYING\n");
        state = STATE_PLAYING;
        pthread_mutex_unlock(&mutex);
      }
    }
  }

}



More information about the gstreamer-devel mailing list