[farsight2/master] Add polling thread
Olivier Crête
olivier.crete at collabora.co.uk
Tue Jul 14 09:50:30 PDT 2009
---
gst/fsmsnconference/fs-msn-stream.c | 110 +++++++++++++++++++++++++++++------
1 files changed, 92 insertions(+), 18 deletions(-)
diff --git a/gst/fsmsnconference/fs-msn-stream.c b/gst/fsmsnconference/fs-msn-stream.c
index c94c904..4b49ce7 100644
--- a/gst/fsmsnconference/fs-msn-stream.c
+++ b/gst/fsmsnconference/fs-msn-stream.c
@@ -66,6 +66,17 @@ enum
PROP_PORT
};
+
+typedef struct _FsMsnPollFD FsMsnPollFD;
+
+/*
+ * FsMsnPollFD: one file descriptor tracked in the stream's "pollfds" array
+ * and serviced by connection_polling_thread().
+ *
+ * pollfd:     handle given to the gst_poll_fd_* / gst_poll_remove_fd calls;
+ *             its .fd member is the descriptor that gets close()d.
+ * want_read:  when TRUE, next_step is invoked once the fd is readable.
+ * want_write: when TRUE, next_step is invoked once the fd is writable.
+ * next_step:  callback run by the polling thread with the owning stream and
+ *             this entry when one of the wanted conditions is ready.
+ */
+struct _FsMsnPollFD {
+ GstPollFD pollfd;
+ gboolean want_read;
+ gboolean want_write;
+ void (*next_step) (FsMsnStream *self, FsMsnPollFD *fd);
+};
+
+
struct _FsMsnStreamPrivate
{
FsMsnSession *session;
@@ -81,12 +92,10 @@ struct _FsMsnStreamPrivate
gint port;
GIOChannel *connection;
+ GThread *polling_thread;
+ GstClockTime poll_timeout;
GstPoll *poll;
- GstPollFD listening_socket;
- GArray *outgoing_pollfds;
- GArray *incoming_pollfds;
-
- /* Protected by the session mutex */
+ GArray *pollfds;
GError *construction_error;
@@ -139,6 +148,9 @@ static gboolean fs_msn_authenticate_outgoing (FsMsnStream *stream,
static void fs_msn_open_listening_port (FsMsnStream *stream,
guint16 port);
+static gpointer connection_polling_thread (gpointer data);
+
+
static GObjectClass *parent_class = NULL;
static void
@@ -249,10 +261,9 @@ fs_msn_stream_init (FsMsnStream *self)
self->priv->direction = FS_DIRECTION_NONE;
self->priv->fdlist = g_array_new (FALSE, FALSE, sizeof(GIOChannel *));
+ self->priv->poll_timeout = GST_CLOCK_TIME_NONE;
self->priv->poll = gst_poll_new (TRUE);
- gst_poll_fd_init (&self->priv->listening_socket);
- self->priv->incoming_pollfds = g_array_new (TRUE, TRUE, sizeof(GstPollFD));
- self->priv->outgoing_pollfds = g_array_new (TRUE, TRUE, sizeof(GstPollFD));
+ self->priv->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
}
static void
@@ -266,21 +277,28 @@ fs_msn_stream_dispose (GObject *object)
return;
}
+ if (self->priv->polling_thread)
+ {
+ gst_poll_set_flushing (self->priv->poll, TRUE);
+ g_thread_join (self->priv->polling_thread);
+ self->priv->polling_thread = NULL;
+ }
+
if (self->priv->participant)
{
g_object_unref (self->priv->participant);
self->priv->participant = NULL;
}
- /* Make sure dispose does not run twice. */
- self->priv->disposed = TRUE;
-
if (self->priv->session)
{
g_object_unref (self->priv->session);
self->priv->session = NULL;
}
+ /* Make sure dispose does not run twice. */
+ self->priv->disposed = TRUE;
+
parent_class->dispose (object);
}
@@ -292,12 +310,9 @@ fs_msn_stream_finalize (GObject *object)
gst_poll_free (self->priv->poll);
- for (i = 0; i < self->priv->incoming_pollfds->len; i++)
- close (g_array_index(self->priv->incoming_pollfds, GstPollFD, i).fd);
- g_array_free (self->priv->incoming_pollfds, TRUE);
- for (i = 0; i < self->priv->outgoing_pollfds->len; i++)
- close (g_array_index(self->priv->outgoing_pollfds, GstPollFD, i).fd);
- g_array_free (self->priv->outgoing_pollfds, TRUE);
+ for (i = 0; i < self->priv->pollfds->len; i++)
+ close (g_array_index(self->priv->pollfds, FsMsnPollFD, i).pollfd.fd);
+ g_array_free (self->priv->pollfds, TRUE);
parent_class->finalize (object);
}
@@ -661,6 +676,9 @@ fs_msn_stream_constructed (GObject *object)
"Direction must be sending OR receiving");
}
+ self->priv->polling_thread = g_thread_create (connection_polling_thread,
+ self, TRUE, &self->priv->construction_error);
+
GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}
@@ -704,7 +722,7 @@ static gboolean main_fd_closed_cb (GIOChannel *ch,
g_message ("disconnection on video feed %p %p", ch, self->priv->connection);
g_source_remove (self->priv->main_watch);
- /* FIXME - How to handle the disconnection of the stream
+ /* FIXME - How to handle the disconnection of the stream
Destroy the elements involved?
Set the state to Null ?
*/
@@ -1186,3 +1204,59 @@ fs_msn_stream_new (FsMsnSession *session,
return self;
}
+
+
+/*
+ * connection_polling_thread:
+ * @data: the FsMsnStream whose poll set is serviced (passed by
+ *        g_thread_create() in fs_msn_stream_constructed())
+ *
+ * Thread body.  Repeatedly waits on self->priv->poll and then walks
+ * self->priv->pollfds: entries that report an error or a hang-up are removed
+ * from the poll set, closed and dropped from the array; entries whose wanted
+ * condition (read and/or write) is ready get their next_step callback called.
+ * The loop ends as soon as gst_poll_wait() returns a negative value.
+ *
+ * NOTE(review): the pollfds array is accessed here without any lock; confirm
+ * that nothing else modifies it while this thread is running.
+ *
+ * Returns: always NULL
+ */
+static gpointer
+connection_polling_thread (gpointer data)
+{
+  FsMsnStream *self = data;
+
+  for (;;)
+    {
+      GstClockTime timeout;
+      gint ret;
+      guint i;
+
+      /* The timeout is re-read before every wait, under the conference lock */
+      GST_OBJECT_LOCK (self->priv->conference);
+      timeout = self->priv->poll_timeout;
+      GST_OBJECT_UNLOCK (self->priv->conference);
+
+      ret = gst_poll_wait (self->priv->poll, timeout);
+      if (ret < 0)
+        break;
+      if (ret == 0)
+        continue;
+
+      i = 0;
+      while (i < self->priv->pollfds->len)
+        {
+          FsMsnPollFD *pollfd = &g_array_index (self->priv->pollfds,
+              FsMsnPollFD, i);
+
+          if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+              gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
+            {
+              /* A hang-up must release the descriptor just like an error
+               * does, otherwise the fd is leaked */
+              gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
+              close (pollfd->pollfd.fd);
+              g_array_remove_index_fast (self->priv->pollfds, i);
+              /* The last element now sits in slot i: examine it next
+               * without advancing the index */
+              continue;
+            }
+
+          if ((pollfd->want_read &&
+                  gst_poll_fd_can_read (self->priv->poll, &pollfd->pollfd)) ||
+              (pollfd->want_write &&
+                  gst_poll_fd_can_write (self->priv->poll, &pollfd->pollfd)))
+            pollfd->next_step (self, pollfd);
+
+          i++;
+        }
+    }
+
+  return NULL;
+}
--
1.5.6.5
More information about the farsight-commits
mailing list