[farsight2/master] Add polling thread

Olivier Crête olivier.crete at collabora.co.uk
Tue Jul 14 09:50:30 PDT 2009


---
 gst/fsmsnconference/fs-msn-stream.c |  110 +++++++++++++++++++++++++++++------
 1 files changed, 92 insertions(+), 18 deletions(-)

diff --git a/gst/fsmsnconference/fs-msn-stream.c b/gst/fsmsnconference/fs-msn-stream.c
index c94c904..4b49ce7 100644
--- a/gst/fsmsnconference/fs-msn-stream.c
+++ b/gst/fsmsnconference/fs-msn-stream.c
@@ -66,6 +66,17 @@ enum
   PROP_PORT
 };
 
+
+typedef struct _FsMsnPollFD FsMsnPollFD;
+/* Element type of priv->pollfds, examined by connection_polling_thread() */
+struct _FsMsnPollFD {
+  GstPollFD pollfd;       /* descriptor queried against priv->poll */
+  gboolean want_read;     /* call next_step when the fd is readable */
+  gboolean want_write;    /* call next_step when the fd is writable */
+  void (*next_step) (FsMsnStream *self, FsMsnPollFD *fd); /* ready callback */
+};
+
+
 struct _FsMsnStreamPrivate
 {
   FsMsnSession *session;
@@ -81,12 +92,10 @@ struct _FsMsnStreamPrivate
   gint port;
   GIOChannel *connection;
 
+  GThread *polling_thread;
+  GstClockTime poll_timeout;
   GstPoll *poll;
-  GstPollFD  listening_socket;
-  GArray *outgoing_pollfds;
-  GArray *incoming_pollfds;
-
-  /* Protected by the session mutex */
+  GArray *pollfds;
 
   GError *construction_error;
 
@@ -139,6 +148,9 @@ static gboolean fs_msn_authenticate_outgoing (FsMsnStream *stream,
 static void fs_msn_open_listening_port (FsMsnStream *stream,
     guint16 port);
 
+static gpointer connection_polling_thread (gpointer data);
+
+
 static GObjectClass *parent_class = NULL;
 
 static void
@@ -249,10 +261,9 @@ fs_msn_stream_init (FsMsnStream *self)
   self->priv->direction = FS_DIRECTION_NONE;
   self->priv->fdlist = g_array_new (FALSE, FALSE, sizeof(GIOChannel *));
 
+  self->priv->poll_timeout = GST_CLOCK_TIME_NONE;
   self->priv->poll = gst_poll_new (TRUE);
-  gst_poll_fd_init (&self->priv->listening_socket);
-  self->priv->incoming_pollfds = g_array_new (TRUE, TRUE, sizeof(GstPollFD));
-  self->priv->outgoing_pollfds = g_array_new (TRUE, TRUE, sizeof(GstPollFD));
+  self->priv->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
 }
 
 static void
@@ -266,21 +277,28 @@ fs_msn_stream_dispose (GObject *object)
     return;
   }
 
+  if (self->priv->polling_thread)
+  {
+    gst_poll_set_flushing (self->priv->poll, TRUE);
+    g_thread_join (self->priv->polling_thread);
+    self->priv->polling_thread = NULL;
+  }
+
   if (self->priv->participant)
   {
     g_object_unref (self->priv->participant);
     self->priv->participant = NULL;
   }
 
-  /* Make sure dispose does not run twice. */
-  self->priv->disposed = TRUE;
-
   if (self->priv->session)
   {
     g_object_unref (self->priv->session);
     self->priv->session = NULL;
   }
 
+  /* Make sure dispose does not run twice. */
+  self->priv->disposed = TRUE;
+
   parent_class->dispose (object);
 }
 
@@ -292,12 +310,9 @@ fs_msn_stream_finalize (GObject *object)
 
   gst_poll_free (self->priv->poll);
 
-  for (i = 0; i < self->priv->incoming_pollfds->len; i++)
-    close (g_array_index(self->priv->incoming_pollfds, GstPollFD, i).fd);
-  g_array_free (self->priv->incoming_pollfds, TRUE);
-  for (i = 0; i < self->priv->outgoing_pollfds->len; i++)
-    close (g_array_index(self->priv->outgoing_pollfds, GstPollFD, i).fd);
-  g_array_free (self->priv->outgoing_pollfds, TRUE);
+  for (i = 0; i < self->priv->pollfds->len; i++)
+    close (g_array_index(self->priv->pollfds, FsMsnPollFD, i).pollfd.fd);
+  g_array_free (self->priv->pollfds, TRUE);
 
   parent_class->finalize (object);
 }
@@ -661,6 +676,9 @@ fs_msn_stream_constructed (GObject *object)
         "Direction must be sending OR receiving");
   }
 
+  self->priv->polling_thread = g_thread_create (connection_polling_thread,
+      self, TRUE, &self->priv->construction_error);
+
   GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
 }
 
@@ -704,7 +722,7 @@ static gboolean main_fd_closed_cb (GIOChannel *ch,
 
   g_message ("disconnection on video feed %p %p", ch, self->priv->connection);
   g_source_remove (self->priv->main_watch);
-  /* FIXME - How to handle the disconnection of the stream
+  /* FIXME - How to handle the disconnection of the stream
      Destroy the elements involved?
      Set the state to Null ?
   */
@@ -1186,3 +1204,59 @@ fs_msn_stream_new (FsMsnSession *session,
 
   return self;
 }
+
+
+/* Thread body: waits on priv->poll and dispatches ready fds to next_step.
+ * Exits when gst_poll_wait() returns a negative value. data is the
+ * FsMsnStream; the return value is always NULL. */
+static gpointer
+connection_polling_thread (gpointer data)
+{
+  FsMsnStream *self = data;
+  gint ret;
+  GstClockTime timeout;
+
+  GST_OBJECT_LOCK (self->priv->conference);
+  timeout = self->priv->poll_timeout;
+  GST_OBJECT_UNLOCK (self->priv->conference);
+
+  while ((ret = gst_poll_wait (self->priv->poll, timeout)) >= 0)
+  {
+    if (ret > 0)
+    {
+      guint i = 0;
+
+      while (i < self->priv->pollfds->len)
+      {
+        FsMsnPollFD *pollfd = &g_array_index(self->priv->pollfds,
+            FsMsnPollFD, i);
+
+        /* A failed or hung-up fd is dropped; close it in both cases so the
+         * descriptor is not leaked once it leaves the array. */
+        if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+            gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
+        {
+          gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
+          close (pollfd->pollfd.fd);
+          /* Moves the last entry into slot i, so i is not advanced */
+          g_array_remove_index_fast (self->priv->pollfds, i);
+          continue;
+        }
+
+        if ((pollfd->want_read &&
+                gst_poll_fd_can_read (self->priv->poll, &pollfd->pollfd)) ||
+            (pollfd->want_write &&
+                gst_poll_fd_can_write (self->priv->poll, &pollfd->pollfd)))
+          pollfd->next_step (self, pollfd);
+
+        i++;
+      }
+    }
+    /* Re-read the timeout under the lock before the next wait */
+    GST_OBJECT_LOCK (self->priv->conference);
+    timeout = self->priv->poll_timeout;
+    GST_OBJECT_UNLOCK (self->priv->conference);
+  }
+
+  return NULL;
+}
-- 
1.5.6.5




More information about the farsight-commits mailing list