[farsight2/master] Port msnstream to use GstPoll instead of GIOChannel

Youness Alaoui youness.alaoui at collabora.co.uk
Tue Jul 14 09:50:44 PDT 2009


---
 gst/fsmsnconference/fs-msn-stream.c |  290 +++++++++++++++++------------------
 1 files changed, 139 insertions(+), 151 deletions(-)

diff --git a/gst/fsmsnconference/fs-msn-stream.c b/gst/fsmsnconference/fs-msn-stream.c
index 4b49ce7..789d952 100644
--- a/gst/fsmsnconference/fs-msn-stream.c
+++ b/gst/fsmsnconference/fs-msn-stream.c
@@ -73,7 +73,7 @@ struct _FsMsnPollFD {
   GstPollFD pollfd;
   gboolean want_read;
   gboolean want_write;
-  void (*next_step) (FsMsnStream *self, FsMsnPollFD *fd);
+  void (*next_step) (FsMsnStream *self, FsMsnPollFD *pollfd);
 };
 
 
@@ -82,15 +82,12 @@ struct _FsMsnStreamPrivate
   FsMsnSession *session;
   FsMsnParticipant *participant;
   FsStreamDirection direction;
-  GArray *fdlist;
   FsMsnConference *conference;
   GstElement *media_fd_src,*media_fd_sink,*send_valve;
   GstPad *sink_pad,*src_pad;
-  guint in_watch, out_watch, main_watch;
   gint local_recipientid, local_sessionid;
   gint remote_recipientid, remote_sessionid;
   gint port;
-  GIOChannel *connection;
 
   GThread *polling_thread;
   GstClockTime poll_timeout;
@@ -130,13 +127,11 @@ static gboolean fs_msn_stream_set_remote_candidate  (FsMsnStream *stream,
     FsCandidate *candidate,
     GError **error);
 
-static gboolean main_fd_closed_cb (GIOChannel *ch,
-    GIOCondition cond,
-    gpointer data);
+static void main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *fd);
 
-static gboolean successfull_connection_cb (GIOChannel *ch,
-    GIOCondition cond,
-    gpointer data);
+static void successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
+
+static void fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
 
 static gboolean fs_msn_stream_attempt_connection (FsMsnStream *stream,
     gchar const *ip,
@@ -150,6 +145,7 @@ static void fs_msn_open_listening_port (FsMsnStream *stream,
 
 static gpointer connection_polling_thread (gpointer data);
 
+static void shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd);
 
 static GObjectClass *parent_class = NULL;
 
@@ -259,10 +255,10 @@ fs_msn_stream_init (FsMsnStream *self)
   self->priv->participant = NULL;
 
   self->priv->direction = FS_DIRECTION_NONE;
-  self->priv->fdlist = g_array_new (FALSE, FALSE, sizeof(GIOChannel *));
 
   self->priv->poll_timeout = GST_CLOCK_TIME_NONE;
   self->priv->poll = gst_poll_new (TRUE);
+  gst_poll_set_controllable (self->priv->poll, TRUE);
   self->priv->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
 }
 
@@ -714,43 +710,36 @@ fs_msn_stream_set_remote_candidates (FsStream *stream, GList *candidates,
   return TRUE;
 }
 
-static gboolean main_fd_closed_cb (GIOChannel *ch,
-                                   GIOCondition cond,
-                                   gpointer data)
+static void
+main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
 {
-  FsMsnStream *self = FS_MSN_STREAM (data);
-
-  g_message ("disconnection on video feed %p %p", ch, self->priv->connection);
-  g_source_remove (self->priv->main_watch);
+  g_message ("disconnection on video feed");
   /* IXME - How to handle the disconnection of the stream
      Destroy the elements involved?
      Set the state to Null ?
   */
-  return FALSE;
 }
 
-static gboolean successfull_connection_cb (GIOChannel *ch,
-    GIOCondition cond,
-    gpointer data)
+static void
+successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
 {
-  FsMsnStream *self = FS_MSN_STREAM (data);
   gint error;
   socklen_t option_len;
-  gint fd = g_io_channel_unix_get_fd (ch);
 
-  g_message ("handler called on fd %d", fd);
+  g_message ("handler called on fd %d", pollfd->pollfd.fd);
 
   errno = 0;
-  if (!((cond & G_IO_IN) || (cond & G_IO_OUT)))
+  if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+      gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
   {
-    g_message ("Condition received is %d", cond);
+    g_message ("connecton closed or error");
     goto error;
   }
 
   option_len = sizeof(error);
 
   /* Get the error option */
-  if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
+  if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
   {
     g_warning ("getsockopt() failed");
     goto error;
@@ -764,37 +753,35 @@ static gboolean successfull_connection_cb (GIOChannel *ch,
   }
 
   /* Remove NON BLOCKING MODE */
-  if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
+  if (fcntl(pollfd->pollfd.fd, F_SETFL,
+          fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
   {
     g_warning ("fcntl() failed");
     goto error;
   }
 
-  g_message ("Got connection on fd %d", fd);
+  g_message ("Got connection on fd %d", pollfd->pollfd.fd);
 
-  if (fs_msn_authenticate_outgoing (self,fd))
+  if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
   {
-    g_message ("Authenticated outgoing successfully fd %d", fd);
-    self->priv->connection = ch;
+    g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
 
     // success! we need to shutdown/close all other channels
     gint i;
-    for (i = 0; i < self->priv->fdlist->len; i++)
+    for (i = 0; i < self->priv->pollfds->len; i++)
     {
-      GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
-      if (chan != ch)
+      FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+      if (pollfd != pollfd2)
       {
-        g_message ("closing fd %d", g_io_channel_unix_get_fd (chan));
-        g_io_channel_shutdown (chan, TRUE, NULL);
-        g_io_channel_unref (chan);
-        g_array_remove_index (self->priv->fdlist, i);
+        g_message ("closing fd %d", pollfd2->pollfd.fd);
+        shutdown_fd (self, pollfd2);
+        i--;
       }
     }
 
-    g_source_remove (self->priv->out_watch);
     if (self->priv->direction == FS_DIRECTION_RECV)
     {
-      g_message("Setting media_fd_src on fd %d",fd);
+      g_message("Setting media_fd_src on fd %d", pollfd->pollfd.fd);
 
       GstState state;
       gst_element_get_state(self->priv->media_fd_src, &state, NULL,
@@ -805,13 +792,13 @@ static gboolean successfull_connection_cb (GIOChannel *ch,
         g_message("Error: fdsrc in state above ready");
         gst_element_set_state(self->priv->media_fd_src,GST_STATE_READY);
       }
-      g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", fd, NULL);
+      g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", pollfd->pollfd.fd, NULL);
       gst_element_set_locked_state(self->priv->media_fd_src,FALSE);
       gst_element_sync_state_with_parent(self->priv->media_fd_src);
     }
     else if (self->priv->direction == FS_DIRECTION_SEND)
     {
-      g_message("Setting media_fd_sink on fd %d",fd);
+      g_message("Setting media_fd_sink on fd %d", pollfd->pollfd.fd);
 
       GstState state;
       gst_element_get_state(self->priv->media_fd_sink, &state, NULL,
@@ -822,40 +809,41 @@ static gboolean successfull_connection_cb (GIOChannel *ch,
         g_message("Error: fdsrc in state above ready");
         gst_element_set_state(self->priv->media_fd_sink,GST_STATE_READY);
       }
-      g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", fd, NULL);
+      g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", pollfd->pollfd.fd, NULL);
       gst_element_set_locked_state(self->priv->media_fd_sink,FALSE);
       gst_element_sync_state_with_parent(self->priv->media_fd_sink);
       g_object_set (G_OBJECT (self->priv->send_valve), "drop", FALSE, NULL);
 
     }
-    // add a watch on this fd to when it disconnects
-    self->priv->main_watch = g_io_add_watch (ch,
-        (G_IO_ERR|G_IO_HUP|G_IO_NVAL),
-        main_fd_closed_cb, self);
-    return FALSE;
+    pollfd->want_read = FALSE;
+    pollfd->want_write = FALSE;
+    gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, FALSE);
+    gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
+    pollfd->next_step = main_fd_closed_cb;
+    return;
   }
   else
   {
-    g_message ("Authentification failed on fd %d", fd);
+    g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
   }
 
   /* Error */
  error:
-  g_message ("Got error from fd %d, closing", fd);
+  g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
   // find, shutdown and remove channel from fdlist
   gint i;
-  for (i = 0; i < self->priv->fdlist->len; i++)
+  for (i = 0; i < self->priv->pollfds->len; i++)
   {
-    GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
-    if (ch == chan)
+    FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+    if (pollfd == pollfd2)
     {
-      g_io_channel_shutdown (chan, TRUE, NULL);
-      g_io_channel_unref (chan);
-      g_array_remove_index (self->priv->fdlist, i);
+      g_message ("closing fd %d", pollfd2->pollfd.fd);
+      shutdown_fd (self, pollfd2);
+      i--;
     }
   }
 
-  return FALSE;
+  return;
 }
 
 static gboolean
@@ -864,8 +852,7 @@ fs_msn_stream_attempt_connection (FsMsnStream *stream,
     guint16 port)
 {
   FsMsnStream *self = FS_MSN_STREAM (stream);
-
-  GIOChannel *chan;
+  FsMsnPollFD *pollfd = NULL;
   gint fd = -1;
   struct sockaddr_in theiraddr;
   memset(&theiraddr, 0, sizeof(theiraddr));
@@ -877,10 +864,6 @@ fs_msn_stream_attempt_connection (FsMsnStream *stream,
     return FALSE;
   }
 
-  chan = g_io_channel_unix_new (fd);
-  g_io_channel_set_close_on_unref (chan, TRUE);
-  g_array_append_val (self->priv->fdlist, chan);
-
   // set non-blocking mode
   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
 
@@ -895,17 +878,22 @@ fs_msn_stream_attempt_connection (FsMsnStream *stream,
   {
     if (errno != EINPROGRESS)
     {
-      g_io_channel_shutdown (chan, TRUE, NULL);
-      g_io_channel_unref (chan);
+      close (fd);
       return FALSE;
     }
   }
   g_message("ret %d %d %s", ret, errno, strerror(errno));
 
-  // add a watch on that io for when it connects
-  self->priv->out_watch = g_io_add_watch (chan,
-      (G_IO_IN|G_IO_OUT|G_IO_PRI|G_IO_ERR|G_IO_HUP|G_IO_NVAL),
-      successfull_connection_cb, self);
+  pollfd = g_slice_new0 (FsMsnPollFD);
+  gst_poll_fd_init (&pollfd->pollfd);
+  pollfd->pollfd.fd = fd;
+  pollfd->want_read = TRUE;
+  pollfd->want_write = TRUE;
+  gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
+  gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, TRUE);
+  pollfd->next_step = successfull_connection_cb;
+  g_array_append_val (self->priv->pollfds, pollfd);
+
   return TRUE;
 }
 
@@ -963,47 +951,28 @@ fs_msn_authenticate_incoming (FsMsnStream *stream, gint fd)
   return FALSE;
 }
 
-static gboolean
-fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
+static void
+fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
 {
-  FsMsnStream *self = FS_MSN_STREAM (data);
   struct sockaddr_in in;
-  int fd;
-  GIOChannel *newchan = NULL;
+  int fd = -1;
   socklen_t n = sizeof (in);
+  FsMsnPollFD *newpollfd = NULL;
 
-  if (!(cond & G_IO_IN))
+  if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+      gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
   {
-    g_message ("Error in condition not G_IO_IN on %d see next error",
-        g_io_channel_unix_get_fd (ch));
-    if (cond & G_IO_OUT)
-      g_message("Error: Data can be written (without blocking) in the"
-          " accept_connection cb");
-    else if (cond & G_IO_PRI)
-      g_message("Error: There is urgent data to read in the accept_connection"
-          " cb");
-    else if (cond & G_IO_ERR)
-      g_message("Error: There is an error in the accept_connection cb");
-    else if (cond & G_IO_HUP)
-      g_message("Error: Hung up (the connection has been broken, usually for"
-          " pipes and sockets) in the accept_connection cb");
-    else if (cond & G_IO_NVAL)
-      g_message("Error: Invalid request. The file descriptor is not open in"
-          " the accept_connection cb");
-    return FALSE;
+    g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
+    goto error;
   }
 
-  if ((fd = accept(g_io_channel_unix_get_fd (ch),
+  if ((fd = accept(pollfd->pollfd.fd,
               (struct sockaddr*) &in, &n)) == -1)
   {
     g_message ("Error while running accept() %d", errno);
-    return FALSE;
+    return;
   }
 
-  // ok we got a connection, let's set it up
-  newchan = g_io_channel_unix_new (fd);
-  g_io_channel_set_close_on_unref (newchan, TRUE);
-
   /* Remove NON BLOCKING MODE */
   if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
   {
@@ -1015,21 +984,15 @@ fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
   if (fs_msn_authenticate_incoming(self,fd))
   {
     g_message ("Authenticated incoming successfully fd %d", fd);
-    self->priv->connection = newchan;
 
     // success! we need to shutdown/close all other channels
-    g_source_remove (self->priv->in_watch);
     gint i;
-    for (i = 0; i < self->priv->fdlist->len; i++)
+    for (i = 0; i < self->priv->pollfds->len; i++)
     {
-      GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
-      if (chan != newchan)
-      {
-        g_message ("closing fd %d", g_io_channel_unix_get_fd (chan));
-        g_io_channel_shutdown (chan, TRUE, NULL);
-        g_io_channel_unref (chan);
-        g_array_remove_index (self->priv->fdlist, i);
-      }
+      FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+      g_message ("closing fd %d", pollfd2->pollfd.fd);
+      shutdown_fd (self, pollfd2);
+      i--;
     }
     if (self->priv->direction == FS_DIRECTION_RECV)
     {
@@ -1051,11 +1014,17 @@ fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
     else if (self->priv->direction == FS_DIRECTION_SEND)
     {
     }
-    // add a watch on this fd to when it disconnects
-    self->priv->main_watch = g_io_add_watch (newchan,
-        (G_IO_ERR|G_IO_HUP|G_IO_NVAL),
-        main_fd_closed_cb, self);
-    return FALSE;
+
+    newpollfd = g_slice_new0 (FsMsnPollFD);
+    gst_poll_fd_init (&newpollfd->pollfd);
+    newpollfd->pollfd.fd = fd;
+    newpollfd->want_read = FALSE;
+    newpollfd->want_write = FALSE;
+    gst_poll_fd_ctl_read (self->priv->poll, &newpollfd->pollfd, FALSE);
+    gst_poll_fd_ctl_write (self->priv->poll, &newpollfd->pollfd, FALSE);
+    newpollfd->next_step = main_fd_closed_cb;
+    g_array_append_val (self->priv->pollfds, newpollfd);
+    return;
   }
 
   /* Error */
@@ -1063,18 +1032,21 @@ fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
   g_message ("Got error from fd %d, closing", fd);
   // find, shutdown and remove channel from fdlist
   gint i;
-  for (i = 0; i < self->priv->fdlist->len; i++)
+  for (i = 0; i < self->priv->pollfds->len; i++)
   {
-    GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
-    if (newchan == chan)
+    FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+    if (pollfd == pollfd2)
     {
-      g_io_channel_shutdown (chan, TRUE, NULL);
-      g_io_channel_unref (chan);
-      g_array_remove_index (self->priv->fdlist, i);
+      g_message ("closing fd %d", pollfd2->pollfd.fd);
+      shutdown_fd (self, pollfd2);
+      i--;
     }
   }
 
-  return FALSE;
+  if (fd > 0)
+    close (fd);
+
+  return;
 }
 
 static void
@@ -1082,12 +1054,12 @@ fs_msn_open_listening_port (FsMsnStream *stream,
                             guint16 port)
 {
   FsMsnStream *self = FS_MSN_STREAM (stream);
-  g_message ("Attempting to listen on port %d.....\n",port);
-
-  GIOChannel *chan;
   gint fd = -1;
-  struct sockaddr_in theiraddr;
-  memset(&theiraddr, 0, sizeof(theiraddr));
+  FsMsnPollFD *pollfd = NULL;
+  struct sockaddr_in myaddr;
+  memset(&myaddr, 0, sizeof(myaddr));
+
+  g_message ("Attempting to listen on port %d.....\n",port);
 
   if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
   {
@@ -1098,10 +1070,10 @@ fs_msn_open_listening_port (FsMsnStream *stream,
 
   // set non-blocking mode
   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
-  theiraddr.sin_family = AF_INET;
-  theiraddr.sin_port = htons (port);
+  myaddr.sin_family = AF_INET;
+  myaddr.sin_port = htons (port);
   // bind
-  if (bind(fd, (struct sockaddr *) &theiraddr, sizeof(theiraddr)) != 0)
+  if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
   {
     close (fd);
     return;
@@ -1114,14 +1086,17 @@ fs_msn_open_listening_port (FsMsnStream *stream,
     close (fd);
     return;
   }
-  chan = g_io_channel_unix_new (fd);
-  g_io_channel_set_close_on_unref (chan, TRUE);
-
-  g_array_append_val (self->priv->fdlist, chan);
+  pollfd = g_slice_new0 (FsMsnPollFD);
+  gst_poll_fd_init (&pollfd->pollfd);
+  pollfd->pollfd.fd = fd;
+  pollfd->want_read = TRUE;
+  pollfd->want_write = FALSE;
+  gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
+  gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
+  pollfd->next_step = fd_accept_connection_cb;
+
+  g_array_append_val (self->priv->pollfds, pollfd);
   g_message ("Listening on port %d\n",port);
-  self->priv->in_watch = g_io_add_watch(chan,
-      G_IO_IN | G_IO_ERR | G_IO_HUP | G_IO_NVAL,
-      fd_accept_connection_cb, self);
 
 }
 
@@ -1228,23 +1203,14 @@ connection_polling_thread (gpointer data)
         FsMsnPollFD *pollfd = &g_array_index(self->priv->pollfds,
             FsMsnPollFD, i);
 
-        if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd))
-        {
-          gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
-          close (pollfd->pollfd.fd);
-          g_array_remove_index_fast (self->priv->pollfds, i);
-          i--;
-          continue;
-        }
-
-        if (gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
+        if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+            gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
         {
-          gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
-          g_array_remove_index_fast (self->priv->pollfds, i);
+          pollfd->next_step (self, pollfd);
+          shutdown_fd (self, pollfd);
           i--;
           continue;
         }
-
         if ((pollfd->want_read &&
                 gst_poll_fd_can_read (self->priv->poll, &pollfd->pollfd)) ||
             (pollfd->want_write &&
@@ -1260,3 +1226,25 @@ connection_polling_thread (gpointer data)
 
   return NULL;
 }
+
+/* Stop polling pollfd, close its fd and drop its entry from priv->pollfds. */
+static void
+shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd)
+{
+  guint i;
+  gint fd = pollfd->pollfd.fd;
+
+  /* Unregister first so the poll set never holds an already-closed fd. */
+  gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
+  /* A hang-up reported by poll does not release our descriptor: always close. */
+  close (fd);
+  for (i = 0; i < self->priv->pollfds->len; i++)
+  {
+    if (pollfd == &g_array_index(self->priv->pollfds, FsMsnPollFD, i))
+    {
+      g_array_remove_index_fast (self->priv->pollfds, i);
+      break;
+    }
+  }
+  gst_poll_restart (self->priv->poll);
+}
-- 
1.5.6.5




More information about the farsight-commits mailing list