[farsight2/master] Port msnstream to use GstPoll instead of GIOChannel
Youness Alaoui
youness.alaoui at collabora.co.uk
Tue Jul 14 09:50:44 PDT 2009
---
gst/fsmsnconference/fs-msn-stream.c | 290 +++++++++++++++++------------------
1 files changed, 139 insertions(+), 151 deletions(-)
diff --git a/gst/fsmsnconference/fs-msn-stream.c b/gst/fsmsnconference/fs-msn-stream.c
index 4b49ce7..789d952 100644
--- a/gst/fsmsnconference/fs-msn-stream.c
+++ b/gst/fsmsnconference/fs-msn-stream.c
@@ -73,7 +73,7 @@ struct _FsMsnPollFD {
GstPollFD pollfd;
gboolean want_read;
gboolean want_write;
- void (*next_step) (FsMsnStream *self, FsMsnPollFD *fd);
+ void (*next_step) (FsMsnStream *self, FsMsnPollFD *pollfd);
};
@@ -82,15 +82,12 @@ struct _FsMsnStreamPrivate
FsMsnSession *session;
FsMsnParticipant *participant;
FsStreamDirection direction;
- GArray *fdlist;
FsMsnConference *conference;
GstElement *media_fd_src,*media_fd_sink,*send_valve;
GstPad *sink_pad,*src_pad;
- guint in_watch, out_watch, main_watch;
gint local_recipientid, local_sessionid;
gint remote_recipientid, remote_sessionid;
gint port;
- GIOChannel *connection;
GThread *polling_thread;
GstClockTime poll_timeout;
@@ -130,13 +127,11 @@ static gboolean fs_msn_stream_set_remote_candidate (FsMsnStream *stream,
FsCandidate *candidate,
GError **error);
-static gboolean main_fd_closed_cb (GIOChannel *ch,
- GIOCondition cond,
- gpointer data);
+static void main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *fd);
-static gboolean successfull_connection_cb (GIOChannel *ch,
- GIOCondition cond,
- gpointer data);
+static void successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
+
+static void fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
static gboolean fs_msn_stream_attempt_connection (FsMsnStream *stream,
gchar const *ip,
@@ -150,6 +145,7 @@ static void fs_msn_open_listening_port (FsMsnStream *stream,
static gpointer connection_polling_thread (gpointer data);
+static void shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd);
static GObjectClass *parent_class = NULL;
@@ -259,10 +255,10 @@ fs_msn_stream_init (FsMsnStream *self)
self->priv->participant = NULL;
self->priv->direction = FS_DIRECTION_NONE;
- self->priv->fdlist = g_array_new (FALSE, FALSE, sizeof(GIOChannel *));
self->priv->poll_timeout = GST_CLOCK_TIME_NONE;
self->priv->poll = gst_poll_new (TRUE);
+ gst_poll_set_controllable (self->priv->poll, TRUE);
self->priv->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
}
@@ -714,43 +710,36 @@ fs_msn_stream_set_remote_candidates (FsStream *stream, GList *candidates,
return TRUE;
}
-static gboolean main_fd_closed_cb (GIOChannel *ch,
- GIOCondition cond,
- gpointer data)
+static void
+main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
{
- FsMsnStream *self = FS_MSN_STREAM (data);
-
- g_message ("disconnection on video feed %p %p", ch, self->priv->connection);
- g_source_remove (self->priv->main_watch);
+ g_message ("disconnection on video feed");
/* FIXME - How to handle the disconnection of the stream
Destroy the elements involved?
Set the state to Null ?
*/
- return FALSE;
}
-static gboolean successfull_connection_cb (GIOChannel *ch,
- GIOCondition cond,
- gpointer data)
+static void
+successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
{
- FsMsnStream *self = FS_MSN_STREAM (data);
gint error;
socklen_t option_len;
- gint fd = g_io_channel_unix_get_fd (ch);
- g_message ("handler called on fd %d", fd);
+ g_message ("handler called on fd %d", pollfd->pollfd.fd);
errno = 0;
- if (!((cond & G_IO_IN) || (cond & G_IO_OUT)))
+ if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
{
- g_message ("Condition received is %d", cond);
+ g_message ("connecton closed or error");
goto error;
}
option_len = sizeof(error);
/* Get the error option */
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
+ if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
{
g_warning ("getsockopt() failed");
goto error;
@@ -764,37 +753,35 @@ static gboolean successfull_connection_cb (GIOChannel *ch,
}
/* Remove NON BLOCKING MODE */
- if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
+ if (fcntl(pollfd->pollfd.fd, F_SETFL,
+ fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
{
g_warning ("fcntl() failed");
goto error;
}
- g_message ("Got connection on fd %d", fd);
+ g_message ("Got connection on fd %d", pollfd->pollfd.fd);
- if (fs_msn_authenticate_outgoing (self,fd))
+ if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
{
- g_message ("Authenticated outgoing successfully fd %d", fd);
- self->priv->connection = ch;
+ g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
// success! we need to shutdown/close all other channels
gint i;
- for (i = 0; i < self->priv->fdlist->len; i++)
+ for (i = 0; i < self->priv->pollfds->len; i++)
{
- GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
- if (chan != ch)
+ FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+ if (pollfd != pollfd2)
{
- g_message ("closing fd %d", g_io_channel_unix_get_fd (chan));
- g_io_channel_shutdown (chan, TRUE, NULL);
- g_io_channel_unref (chan);
- g_array_remove_index (self->priv->fdlist, i);
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
}
}
- g_source_remove (self->priv->out_watch);
if (self->priv->direction == FS_DIRECTION_RECV)
{
- g_message("Setting media_fd_src on fd %d",fd);
+ g_message("Setting media_fd_src on fd %d", pollfd->pollfd.fd);
GstState state;
gst_element_get_state(self->priv->media_fd_src, &state, NULL,
@@ -805,13 +792,13 @@ static gboolean successfull_connection_cb (GIOChannel *ch,
g_message("Error: fdsrc in state above ready");
gst_element_set_state(self->priv->media_fd_src,GST_STATE_READY);
}
- g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", fd, NULL);
+ g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", pollfd->pollfd.fd, NULL);
gst_element_set_locked_state(self->priv->media_fd_src,FALSE);
gst_element_sync_state_with_parent(self->priv->media_fd_src);
}
else if (self->priv->direction == FS_DIRECTION_SEND)
{
- g_message("Setting media_fd_sink on fd %d",fd);
+ g_message("Setting media_fd_sink on fd %d", pollfd->pollfd.fd);
GstState state;
gst_element_get_state(self->priv->media_fd_sink, &state, NULL,
@@ -822,40 +809,41 @@ static gboolean successfull_connection_cb (GIOChannel *ch,
g_message("Error: fdsrc in state above ready");
gst_element_set_state(self->priv->media_fd_sink,GST_STATE_READY);
}
- g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", fd, NULL);
+ g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", pollfd->pollfd.fd, NULL);
gst_element_set_locked_state(self->priv->media_fd_sink,FALSE);
gst_element_sync_state_with_parent(self->priv->media_fd_sink);
g_object_set (G_OBJECT (self->priv->send_valve), "drop", FALSE, NULL);
}
- // add a watch on this fd to when it disconnects
- self->priv->main_watch = g_io_add_watch (ch,
- (G_IO_ERR|G_IO_HUP|G_IO_NVAL),
- main_fd_closed_cb, self);
- return FALSE;
+ pollfd->want_read = FALSE;
+ pollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, FALSE);
+ gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
+ pollfd->next_step = main_fd_closed_cb;
+ return;
}
else
{
- g_message ("Authentification failed on fd %d", fd);
+ g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
}
/* Error */
error:
- g_message ("Got error from fd %d, closing", fd);
+ g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
// find, shutdown and remove channel from fdlist
gint i;
- for (i = 0; i < self->priv->fdlist->len; i++)
+ for (i = 0; i < self->priv->pollfds->len; i++)
{
- GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
- if (ch == chan)
+ FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+ if (pollfd == pollfd2)
{
- g_io_channel_shutdown (chan, TRUE, NULL);
- g_io_channel_unref (chan);
- g_array_remove_index (self->priv->fdlist, i);
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
}
}
- return FALSE;
+ return;
}
static gboolean
@@ -864,8 +852,7 @@ fs_msn_stream_attempt_connection (FsMsnStream *stream,
guint16 port)
{
FsMsnStream *self = FS_MSN_STREAM (stream);
-
- GIOChannel *chan;
+ FsMsnPollFD *pollfd = NULL;
gint fd = -1;
struct sockaddr_in theiraddr;
memset(&theiraddr, 0, sizeof(theiraddr));
@@ -877,10 +864,6 @@ fs_msn_stream_attempt_connection (FsMsnStream *stream,
return FALSE;
}
- chan = g_io_channel_unix_new (fd);
- g_io_channel_set_close_on_unref (chan, TRUE);
- g_array_append_val (self->priv->fdlist, chan);
-
// set non-blocking mode
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
@@ -895,17 +878,22 @@ fs_msn_stream_attempt_connection (FsMsnStream *stream,
{
if (errno != EINPROGRESS)
{
- g_io_channel_shutdown (chan, TRUE, NULL);
- g_io_channel_unref (chan);
+ close (fd);
return FALSE;
}
}
g_message("ret %d %d %s", ret, errno, strerror(errno));
- // add a watch on that io for when it connects
- self->priv->out_watch = g_io_add_watch (chan,
- (G_IO_IN|G_IO_OUT|G_IO_PRI|G_IO_ERR|G_IO_HUP|G_IO_NVAL),
- successfull_connection_cb, self);
+ pollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&pollfd->pollfd);
+ pollfd->pollfd.fd = fd;
+ pollfd->want_read = TRUE;
+ pollfd->want_write = TRUE;
+ gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
+ gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, TRUE);
+ pollfd->next_step = successfull_connection_cb;
+ g_array_append_val (self->priv->pollfds, pollfd);
+
return TRUE;
}
@@ -963,47 +951,28 @@ fs_msn_authenticate_incoming (FsMsnStream *stream, gint fd)
return FALSE;
}
-static gboolean
-fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
+static void
+fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
{
- FsMsnStream *self = FS_MSN_STREAM (data);
struct sockaddr_in in;
- int fd;
- GIOChannel *newchan = NULL;
+ int fd = -1;
socklen_t n = sizeof (in);
+ FsMsnPollFD *newpollfd = NULL;
- if (!(cond & G_IO_IN))
+ if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
{
- g_message ("Error in condition not G_IO_IN on %d see next error",
- g_io_channel_unix_get_fd (ch));
- if (cond & G_IO_OUT)
- g_message("Error: Data can be written (without blocking) in the"
- " accept_connection cb");
- else if (cond & G_IO_PRI)
- g_message("Error: There is urgent data to read in the accept_connection"
- " cb");
- else if (cond & G_IO_ERR)
- g_message("Error: There is an error in the accept_connection cb");
- else if (cond & G_IO_HUP)
- g_message("Error: Hung up (the connection has been broken, usually for"
- " pipes and sockets) in the accept_connection cb");
- else if (cond & G_IO_NVAL)
- g_message("Error: Invalid request. The file descriptor is not open in"
- " the accept_connection cb");
- return FALSE;
+ g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
+ goto error;
}
- if ((fd = accept(g_io_channel_unix_get_fd (ch),
+ if ((fd = accept(pollfd->pollfd.fd,
(struct sockaddr*) &in, &n)) == -1)
{
g_message ("Error while running accept() %d", errno);
- return FALSE;
+ return;
}
- // ok we got a connection, let's set it up
- newchan = g_io_channel_unix_new (fd);
- g_io_channel_set_close_on_unref (newchan, TRUE);
-
/* Remove NON BLOCKING MODE */
if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
{
@@ -1015,21 +984,15 @@ fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
if (fs_msn_authenticate_incoming(self,fd))
{
g_message ("Authenticated incoming successfully fd %d", fd);
- self->priv->connection = newchan;
// success! we need to shutdown/close all other channels
- g_source_remove (self->priv->in_watch);
gint i;
- for (i = 0; i < self->priv->fdlist->len; i++)
+ for (i = 0; i < self->priv->pollfds->len; i++)
{
- GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
- if (chan != newchan)
- {
- g_message ("closing fd %d", g_io_channel_unix_get_fd (chan));
- g_io_channel_shutdown (chan, TRUE, NULL);
- g_io_channel_unref (chan);
- g_array_remove_index (self->priv->fdlist, i);
- }
+ FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
}
if (self->priv->direction == FS_DIRECTION_RECV)
{
@@ -1051,11 +1014,17 @@ fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
else if (self->priv->direction == FS_DIRECTION_SEND)
{
}
- // add a watch on this fd to when it disconnects
- self->priv->main_watch = g_io_add_watch (newchan,
- (G_IO_ERR|G_IO_HUP|G_IO_NVAL),
- main_fd_closed_cb, self);
- return FALSE;
+
+ newpollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&newpollfd->pollfd);
+ newpollfd->pollfd.fd = fd;
+ newpollfd->want_read = FALSE;
+ newpollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->priv->poll, &newpollfd->pollfd, FALSE);
+ gst_poll_fd_ctl_write (self->priv->poll, &newpollfd->pollfd, FALSE);
+ newpollfd->next_step = main_fd_closed_cb;
+ g_array_append_val (self->priv->pollfds, newpollfd);
+ return;
}
/* Error */
@@ -1063,18 +1032,21 @@ fd_accept_connection_cb (GIOChannel *ch, GIOCondition cond, gpointer data)
g_message ("Got error from fd %d, closing", fd);
// find, shutdown and remove channel from fdlist
gint i;
- for (i = 0; i < self->priv->fdlist->len; i++)
+ for (i = 0; i < self->priv->pollfds->len; i++)
{
- GIOChannel *chan = g_array_index(self->priv->fdlist, GIOChannel*, i);
- if (newchan == chan)
+ FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+ if (pollfd == pollfd2)
{
- g_io_channel_shutdown (chan, TRUE, NULL);
- g_io_channel_unref (chan);
- g_array_remove_index (self->priv->fdlist, i);
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
}
}
- return FALSE;
+ if (fd > 0)
+ close (fd);
+
+ return;
}
static void
@@ -1082,12 +1054,12 @@ fs_msn_open_listening_port (FsMsnStream *stream,
guint16 port)
{
FsMsnStream *self = FS_MSN_STREAM (stream);
- g_message ("Attempting to listen on port %d.....\n",port);
-
- GIOChannel *chan;
gint fd = -1;
- struct sockaddr_in theiraddr;
- memset(&theiraddr, 0, sizeof(theiraddr));
+ FsMsnPollFD *pollfd = NULL;
+ struct sockaddr_in myaddr;
+ memset(&myaddr, 0, sizeof(myaddr));
+
+ g_message ("Attempting to listen on port %d.....\n",port);
if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
{
@@ -1098,10 +1070,10 @@ fs_msn_open_listening_port (FsMsnStream *stream,
// set non-blocking mode
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
- theiraddr.sin_family = AF_INET;
- theiraddr.sin_port = htons (port);
+ myaddr.sin_family = AF_INET;
+ myaddr.sin_port = htons (port);
// bind
- if (bind(fd, (struct sockaddr *) &theiraddr, sizeof(theiraddr)) != 0)
+ if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
{
close (fd);
return;
@@ -1114,14 +1086,17 @@ fs_msn_open_listening_port (FsMsnStream *stream,
close (fd);
return;
}
- chan = g_io_channel_unix_new (fd);
- g_io_channel_set_close_on_unref (chan, TRUE);
-
- g_array_append_val (self->priv->fdlist, chan);
+ pollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&pollfd->pollfd);
+ pollfd->pollfd.fd = fd;
+ pollfd->want_read = TRUE;
+ pollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
+ gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
+ pollfd->next_step = fd_accept_connection_cb;
+
+ g_array_append_val (self->priv->pollfds, pollfd);
g_message ("Listening on port %d\n",port);
- self->priv->in_watch = g_io_add_watch(chan,
- G_IO_IN | G_IO_ERR | G_IO_HUP | G_IO_NVAL,
- fd_accept_connection_cb, self);
}
@@ -1228,23 +1203,14 @@ connection_polling_thread (gpointer data)
FsMsnPollFD *pollfd = &g_array_index(self->priv->pollfds,
FsMsnPollFD, i);
- if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd))
- {
- gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
- close (pollfd->pollfd.fd);
- g_array_remove_index_fast (self->priv->pollfds, i);
- i--;
- continue;
- }
-
- if (gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
+ if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
{
- gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
- g_array_remove_index_fast (self->priv->pollfds, i);
+ pollfd->next_step (self, pollfd);
+ shutdown_fd (self, pollfd);
i--;
continue;
}
-
if ((pollfd->want_read &&
gst_poll_fd_can_read (self->priv->poll, &pollfd->pollfd)) ||
(pollfd->want_write &&
@@ -1260,3 +1226,25 @@ connection_polling_thread (gpointer data)
return NULL;
}
+
+static void
+shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd)
+{
+ gint i;
+ /* Tear down one polled descriptor: close it, stop polling it, and drop */
+ /* its entry from self->priv->pollfds (matched by element address). */
+ if (!gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd)) /* NOTE(review): close() is skipped when the poll reports the fd closed; confirm the local descriptor is not leaked in that case */
+ close (pollfd->pollfd.fd);
+ gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd); /* stop watching this fd in the poll set */
+ for (i = 0; i < self->priv->pollfds->len; i++) /* linear search for the array slot that pollfd points at */
+ {
+ FsMsnPollFD *p = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
+ if (p == pollfd)
+ {
+ g_array_remove_index_fast (self->priv->pollfds, i); /* fills slot i with the last element: order is not preserved and element pointers held by callers can go stale */
+ break;
+ }
+
+ }
+ gst_poll_restart (self->priv->poll); /* restart any gst_poll_wait() in progress so it picks up the changed fd set */
+}
--
1.5.6.5
More information about the farsight-commits
mailing list