[farsight2/master] Making the msn-connection use a state machine per socket and not use blocking sockets. Fixed GstPoll and tested it, it works!
Youness Alaoui
youness.alaoui at collabora.co.uk
Tue Jul 14 09:50:32 PDT 2009
---
gst/fsmsnconference/fs-msn-connection.c | 803 +++++++++++++++++++------------
gst/fsmsnconference/fs-msn-connection.h | 8 +-
gst/fsmsnconference/fs-msn-session.c | 4 +-
3 files changed, 500 insertions(+), 315 deletions(-)
diff --git a/gst/fsmsnconference/fs-msn-connection.c b/gst/fsmsnconference/fs-msn-connection.c
index ffb43b2..553ac9a 100644
--- a/gst/fsmsnconference/fs-msn-connection.c
+++ b/gst/fsmsnconference/fs-msn-connection.c
@@ -5,6 +5,7 @@
* Copyright 2007 Nokia Corp.
* Copyright 2007 Collabora Ltd.
* @author: Olivier Crete <olivier.crete at collabora.co.uk>
+ * @author: Youness Alaoui <youness.alaoui at collabora.co.uk>
*
* fs-msn-connection.c - A MSN Connection gobject
*
@@ -33,6 +34,7 @@
#include <netinet/in.h>
#include <fcntl.h>
+#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
@@ -44,6 +46,7 @@ enum
LAST_SIGNAL
};
+
/* props */
enum
{
@@ -51,42 +54,48 @@ enum
};
+typedef enum {
+ FS_MSN_STATUS_AUTH,
+ FS_MSN_STATUS_CONNECTED,
+ FS_MSN_STATUS_CONNECTED2,
+ FS_MSN_STATUS_SEND_RECEIVE,
+ FS_MSN_STATUS_PAUSED,
+} FsMsnStatus;
+
typedef struct _FsMsnPollFD FsMsnPollFD;
+typedef void (*PollFdCallback) (FsMsnConnection *self, FsMsnPollFD *pollfd);
struct _FsMsnPollFD {
GstPollFD pollfd;
+ FsMsnStatus status;
+ gboolean server;
gboolean want_read;
gboolean want_write;
- void (*next_step) (FsMsnConnection *self, FsMsnPollFD *pollfd);
+ PollFdCallback callback;
};
G_DEFINE_TYPE(FsMsnConnection, fs_msn_connection, G_TYPE_OBJECT);
-static void fs_msn_connection_dispose (GObject *object);
-static void fs_msn_connection_finalize (GObject *object);
-
-static void main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *fd);
-
-static void successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static GObjectClass *parent_class = NULL;
-static void fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static void fs_msn_connection_dispose (GObject *object);
-static gboolean fs_msn_connection_attempt_connection (FsMsnConnection *connection,
- gchar const *ip,
- guint16 port);
-static gboolean fs_msn_authenticate_outgoing (FsMsnConnection *connection,
- gint fd);
+static gboolean fs_msn_connection_attempt_connection (
+ FsMsnConnection *connection,
+ FsCandidate *candidate);
+static gboolean fs_msn_open_listening_port (FsMsnConnection *connection,
+ guint16 port, NewLocalCandidateCB cb, gpointer data);
-static void fs_msn_open_listening_port (FsMsnConnection *connection,
- guint16 port);
+static void successful_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static void accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static void connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
static gpointer connection_polling_thread (gpointer data);
-
static void shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd);
-
-static GObjectClass *parent_class = NULL;
+static FsMsnPollFD * add_pollfd (FsMsnConnection *self, int fd,
+ PollFdCallback callback, gboolean read, gboolean write);
static void
fs_msn_connection_class_init (FsMsnConnectionClass *klass)
@@ -96,7 +105,6 @@ fs_msn_connection_class_init (FsMsnConnectionClass *klass)
parent_class = g_type_class_peek_parent (klass);
gobject_class->dispose = fs_msn_connection_dispose;
- gobject_class->finalize = fs_msn_connection_finalize;
}
static void
@@ -108,18 +116,26 @@ fs_msn_connection_init (FsMsnConnection *self)
self->poll_timeout = GST_CLOCK_TIME_NONE;
self->poll = gst_poll_new (TRUE);
- gst_poll_set_controllable (self->poll, TRUE);
- self->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
+ gst_poll_set_flushing (self->poll, FALSE);
+ self->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD *));
+
+ g_static_rec_mutex_init (&self->mutex);
}
static void
fs_msn_connection_dispose (GObject *object)
{
FsMsnConnection *self = FS_MSN_CONNECTION (object);
+ gint i;
+
+ g_static_rec_mutex_lock (&self->mutex);
/* If dispose did already run, return. */
if (self->disposed)
+ {
+ g_static_rec_mutex_unlock (&self->mutex);
return;
+ }
if (self->polling_thread)
{
@@ -133,26 +149,19 @@ fs_msn_connection_dispose (GObject *object)
if (self->remote_recipient_id)
g_free (self->remote_recipient_id);
- /* Make sure dispose does not run twice. */
- self->disposed = TRUE;
-
- parent_class->dispose (object);
-}
-
-static void
-fs_msn_connection_finalize (GObject *object)
-{
- FsMsnConnection *self = FS_MSN_CONNECTION (object);
- guint i;
-
- /* TODO : why not in dispose */
gst_poll_free (self->poll);
for (i = 0; i < self->pollfds->len; i++)
- close (g_array_index(self->pollfds, FsMsnPollFD, i).pollfd.fd);
+ close (g_array_index(self->pollfds, FsMsnPollFD *, i)->pollfd.fd);
g_array_free (self->pollfds, TRUE);
- parent_class->finalize (object);
+
+ /* Make sure dispose does not run twice. */
+ self->disposed = TRUE;
+
+ parent_class->dispose (object);
+
+ g_static_rec_mutex_unlock (&self->mutex);
}
/**
@@ -181,17 +190,23 @@ fs_msn_connection_new (guint session_id, guint initial_port)
}
gboolean
-fs_msn_connection_gather_local_candidates (FsMsnConnection *self)
+fs_msn_connection_gather_local_candidates (FsMsnConnection *self,
+ NewLocalCandidateCB cb, gpointer data)
{
-
- fs_msn_open_listening_port (self, self->initial_port);
+ gboolean ret = FALSE;
+ g_static_rec_mutex_lock (&self->mutex);
self->polling_thread = g_thread_create (connection_polling_thread,
self, TRUE, NULL);
- return self->polling_thread != NULL;
+ if (self->polling_thread)
+ ret = fs_msn_open_listening_port (self, self->initial_port, cb, data);
+
+ g_static_rec_mutex_unlock (&self->mutex);
+ return ret;
}
+
/**
* fs_msn_connection_set_remote_candidate:
*/
@@ -200,6 +215,12 @@ fs_msn_connection_set_remote_candidates (FsMsnConnection *self,
GList *candidates, GError **error)
{
GList *item = NULL;
+ gchar *recipient_id = NULL;
+ gboolean ret = FALSE;
+
+ g_static_rec_mutex_lock (&self->mutex);
+
+ recipient_id = self->remote_recipient_id;
for (item = candidates; item; item = g_list_next (item))
{
@@ -209,148 +230,129 @@ fs_msn_connection_set_remote_candidates (FsMsnConnection *self,
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidate passed does not contain a valid ip or port");
- return FALSE;
+ goto out;
}
if (!candidate->foundation)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidate passed does not have a foundation (MSN recipient ID)");
- return FALSE;
+ goto out;
}
- if (self->remote_recipient_id) {
- if (g_strcmp0 (candidate->foundation, self->remote_recipient_id) != 0)
+ if (recipient_id)
+ {
+ if (g_strcmp0 (candidate->foundation, recipient_id) != 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidates do not have the same recipient ID");
- return FALSE;
+ goto out;
}
- } else {
- self->remote_recipient_id = g_strdup (candidate->foundation);
}
- fs_msn_connection_attempt_connection(self, candidate->ip, candidate->port);
+ else
+ {
+ recipient_id = candidate->foundation;
+ }
}
- return TRUE;
-}
-
-
-static void
-main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
-{
- g_message ("disconnection on video feed");
- /* IXME - How to handle the disconnection of the connection
- Destroy the elements involved?
- Set the state to Null ?
- */
-}
-
-static void
-successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
-{
- gint error;
- socklen_t option_len;
-
- g_message ("handler called on fd %d", pollfd->pollfd.fd);
-
- errno = 0;
- if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
- gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ self->remote_recipient_id = g_strdup (recipient_id);
+ for (item = candidates; item; item = g_list_next (item))
{
- g_message ("connecton closed or error");
- goto error;
+ FsCandidate *candidate = item->data;
+ fs_msn_connection_attempt_connection(self, candidate);
}
- option_len = sizeof(error);
+ ret = TRUE;
+ out:
+ g_static_rec_mutex_unlock (&self->mutex);
+ return ret;
+}
- /* Get the error option */
- if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
- {
- g_warning ("getsockopt() failed");
- goto error;
- }
- /* Check if there is an error */
- if (error)
- {
- g_message ("getsockopt gave an error : %d", error);
- goto error;
- }
+static gboolean
+fs_msn_open_listening_port (FsMsnConnection *self,
+ guint16 port, NewLocalCandidateCB cb, gpointer data)
+{
+ gint fd = -1;
+ struct sockaddr_in myaddr;
+ memset(&myaddr, 0, sizeof(myaddr));
- /* Remove NON BLOCKING MODE */
- if (fcntl(pollfd->pollfd.fd, F_SETFL,
- fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
+ g_debug ("Attempting to listen on port %d.....",port);
+
+ if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
{
- g_warning ("fcntl() failed");
- goto error;
+ // show error
+ g_debug ("could not create socket!");
+ return FALSE;
}
- g_message ("Got connection on fd %d", pollfd->pollfd.fd);
-
- if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
+ // set non-blocking mode
+ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
+ myaddr.sin_family = AF_INET;
+ do
{
- g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
-
- // success! we need to shutdown/close all other channels
- gint i;
- for (i = 0; i < self->pollfds->len; i++)
+ g_debug ("Attempting to listen on port %d.....",port);
+ myaddr.sin_port = htons (port);
+ // bind
+ if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
{
- FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
- if (pollfd != pollfd2)
+ if (port != 0 && errno == EADDRINUSE)
{
- g_message ("closing fd %d", pollfd2->pollfd.fd);
- shutdown_fd (self, pollfd2);
- i--;
+ port++;
+ }
+ else
+ {
+ perror ("bind");
+ close (fd);
+ return FALSE;
+ }
+ } else {
+ /* Listen */
+ if (listen(fd, 3) != 0)
+ {
+ if (port != 0 && errno == EADDRINUSE)
+ {
+ port++;
+ }
+ else
+ {
+ perror ("listen");
+ close (fd);
+ return FALSE;
+ }
}
}
+ } while (errno == EADDRINUSE);
- /* TODO : callback */
+ add_pollfd (self, fd, accept_connection_cb, TRUE, TRUE);
- pollfd->want_read = FALSE;
- pollfd->want_write = FALSE;
- gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, FALSE);
- gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
- pollfd->next_step = main_fd_closed_cb;
- return;
- }
- else
- {
- g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
- }
+ g_debug ("Listening on port %d", port);
- /* Error */
- error:
- g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
- // find, shutdown and remove channel from fdlist
- gint i;
- for (i = 0; i < self->pollfds->len; i++)
- {
- FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
- if (pollfd == pollfd2)
- {
- g_message ("closing fd %d", pollfd2->pollfd.fd);
- shutdown_fd (self, pollfd2);
- i--;
- }
- }
+ FsCandidate * cand = fs_candidate_new ("123", 1, FS_CANDIDATE_TYPE_HOST,
+ FS_NETWORK_PROTOCOL_TCP, "127.0.0.1", port);
+ g_static_rec_mutex_unlock (&self->mutex);
+ cb (cand, data);
+ g_static_rec_mutex_lock (&self->mutex);
+ fs_candidate_destroy (cand);
- return;
+ self->local_recipient_id = g_strdup ("123");
+
+ return TRUE;
}
static gboolean
fs_msn_connection_attempt_connection (FsMsnConnection *connection,
- const gchar *ip,
- guint16 port)
+ FsCandidate *candidate)
{
FsMsnConnection *self = FS_MSN_CONNECTION (connection);
- FsMsnPollFD *pollfd = NULL;
+ FsMsnPollFD *pollfd;
gint fd = -1;
struct sockaddr_in theiraddr;
memset(&theiraddr, 0, sizeof(theiraddr));
+
if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
{
// show error
- g_message ("could not create socket!");
+ g_debug ("could not create socket!");
return FALSE;
}
@@ -358,79 +360,33 @@ fs_msn_connection_attempt_connection (FsMsnConnection *connection,
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
theiraddr.sin_family = AF_INET;
- theiraddr.sin_addr.s_addr = inet_addr (ip);
- theiraddr.sin_port = htons (port);
+ theiraddr.sin_addr.s_addr = inet_addr (candidate->ip);
+ theiraddr.sin_port = htons (candidate->port);
- g_message ("Attempting connection to %s %d on socket %d", ip, port, fd);
+ g_debug ("Attempting connection to %s %d on socket %d", candidate->ip,
+ candidate->port, fd);
// this is non blocking, the return value isn't too usefull
gint ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
if (ret < 0)
{
if (errno != EINPROGRESS)
{
+ g_debug("ret %d %d %s", ret, errno, strerror(errno));
close (fd);
return FALSE;
}
}
- g_message("ret %d %d %s", ret, errno, strerror(errno));
+ g_debug("ret %d %d %s", ret, errno, strerror(errno));
- pollfd = g_slice_new0 (FsMsnPollFD);
- gst_poll_fd_init (&pollfd->pollfd);
- pollfd->pollfd.fd = fd;
- pollfd->want_read = TRUE;
- pollfd->want_write = TRUE;
- gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, TRUE);
- gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
- pollfd->next_step = successfull_connection_cb;
- g_array_append_val (self->pollfds, pollfd);
- return TRUE;
-}
-
-static gboolean
-fs_msn_authenticate_incoming (FsMsnConnection *connection, gint fd)
-{
- FsMsnConnection *self = FS_MSN_CONNECTION (connection);
- if (fd != 0)
- {
- gchar str[400];
- gchar check[400];
+ pollfd = add_pollfd (self, fd, successful_connection_cb, TRUE, TRUE);
+ pollfd->server = FALSE;
- memset(str, 0, sizeof(str));
- if (recv(fd, str, sizeof(str), 0) != -1)
- {
- g_message ("Got %s, checking if it's auth", str);
- sprintf(str, "recipientid=%s&sessionid=%d\r\n\r\n",
- self->local_recipient_id, self->session_id);
- if (strcmp (str, check) != 0)
- {
- // send our connected message also
- memset(str, 0, sizeof(str));
- sprintf(str, "connected\r\n\r\n");
- send(fd, str, strlen(str), 0);
-
- // now we get connected
- memset(str, 0, sizeof(str));
- if (recv(fd, str, sizeof(str), 0) != -1)
- {
- if (strcmp (str, "connected\r\n\r\n") == 0)
- {
- g_message ("Authentication successfull");
- return TRUE;
- }
- }
- }
- }
- else
- {
- perror("auth");
- }
- }
- return FALSE;
+ return TRUE;
}
static void
-fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
{
struct sockaddr_in in;
int fd = -1;
@@ -440,168 +396,336 @@ fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
{
- g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
+ g_debug ("Error in accept socket : %d", pollfd->pollfd.fd);
goto error;
}
if ((fd = accept(pollfd->pollfd.fd,
(struct sockaddr*) &in, &n)) == -1)
{
- g_message ("Error while running accept() %d", errno);
+ g_debug ("Error while running accept() %d", errno);
return;
}
- /* Remove NON BLOCKING MODE */
- if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
- {
- g_warning ("fcntl() failed");
- goto error;
- }
+ newpollfd = add_pollfd (self, fd, connection_cb, TRUE, FALSE);
+ newpollfd->server = TRUE;
- // now we try to auth
- if (fs_msn_authenticate_incoming(self,fd))
- {
- g_message ("Authenticated incoming successfully fd %d", fd);
+ return;
- // success! we need to shutdown/close all other channels
- gint i;
- for (i = 0; i < self->pollfds->len; i++)
+ /* Error */
+ error:
+ g_debug ("Got error from fd %d, closing", fd);
+ // find, shutdown and remove channel from fdlist
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
+ if (pollfd == pollfd2)
{
- FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
- g_message ("closing fd %d", pollfd2->pollfd.fd);
+ g_debug ("closing fd %d", pollfd2->pollfd.fd);
shutdown_fd (self, pollfd2);
i--;
}
- /* TODO callback */
-
- newpollfd = g_slice_new0 (FsMsnPollFD);
- gst_poll_fd_init (&newpollfd->pollfd);
- newpollfd->pollfd.fd = fd;
- newpollfd->want_read = FALSE;
- newpollfd->want_write = FALSE;
- gst_poll_fd_ctl_read (self->poll, &newpollfd->pollfd, FALSE);
- gst_poll_fd_ctl_write (self->poll, &newpollfd->pollfd, FALSE);
- newpollfd->next_step = main_fd_closed_cb;
- g_array_append_val (self->pollfds, newpollfd);
- return;
}
+ return;
+}
+
+
+static void
+successful_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+ gint error;
+ socklen_t option_len;
+
+ g_debug ("handler called on fd %d", pollfd->pollfd.fd);
+
+ errno = 0;
+ if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ {
+ g_debug ("connecton closed or error");
+ goto error;
+ }
+
+ option_len = sizeof(error);
+
+ /* Get the error option */
+ if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
+ {
+ g_warning ("getsockopt() failed");
+ goto error;
+ }
+
+ /* Check if there is an error */
+ if (error)
+ {
+ g_debug ("getsockopt gave an error : %d", error);
+ goto error;
+ }
+
+ pollfd->callback = connection_cb;
+
+ g_debug ("connection succeeded on socket %p", pollfd);
+ return;
+
/* Error */
error:
- g_message ("Got error from fd %d, closing", fd);
+ g_debug ("Got error from fd %d, closing", pollfd->pollfd.fd);
// find, shutdown and remove channel from fdlist
gint i;
for (i = 0; i < self->pollfds->len; i++)
{
- FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
if (pollfd == pollfd2)
{
- g_message ("closing fd %d", pollfd2->pollfd.fd);
+ g_debug ("closing fd %d", pollfd2->pollfd.fd);
shutdown_fd (self, pollfd2);
i--;
}
}
- if (fd > 0)
- close (fd);
-
return;
}
+
static void
-fs_msn_open_listening_port (FsMsnConnection *connection,
- guint16 port)
+connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
{
- FsMsnConnection *self = FS_MSN_CONNECTION (connection);
- gint fd = -1;
- FsMsnPollFD *pollfd = NULL;
- struct sockaddr_in myaddr;
- memset(&myaddr, 0, sizeof(myaddr));
+ gboolean success = FALSE;
- g_message ("Attempting to listen on port %d.....\n",port);
+ g_debug ("handler called on fd %d. %d %d %d %d", pollfd->pollfd.fd,
+ pollfd->server, pollfd->status,
+ gst_poll_fd_can_read (self->poll, &pollfd->pollfd),
+ gst_poll_fd_can_write (self->poll, &pollfd->pollfd));
- if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
+ if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
{
- // show error
- g_message ("could not create socket!");
- return;
+ g_debug ("connecton closed or error");
+ goto error;
}
- // set non-blocking mode
- fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
- myaddr.sin_family = AF_INET;
- myaddr.sin_port = htons (port);
- // bind
- if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
+ if (gst_poll_fd_can_read (self->poll, &pollfd->pollfd))
{
- close (fd);
- return;
- }
+ switch (pollfd->status)
+ {
+ case FS_MSN_STATUS_AUTH:
+ if (pollfd->server)
+ {
+ gchar str[35] = {0};
+ gchar check[35] = {0};
+
+ if (recv(pollfd->pollfd.fd, str, 34, 0) != -1)
+ {
+ g_debug ("Got %s, checking if it's auth", str);
+ sprintf(check, "recipientid=%s&sessionid=%d\r\n\r\n",
+ self->local_recipient_id, self->session_id);
+ if (strcmp (str, check) == 0)
+ {
+ g_debug ("Authentication successful");
+ pollfd->status = FS_MSN_STATUS_CONNECTED;
+ pollfd->want_write = TRUE;
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
+ }
+ else
+ {
+ g_debug ("Authentication failed");
+ goto error;
+ }
+ }
+ else
+ {
+ perror ("auth");
+ goto error;
+ }
+
+ } else {
+ g_debug ("shouldn't receive data when client on AUTH state");
+ goto error;
+ }
+ break;
+ case FS_MSN_STATUS_CONNECTED:
+ if (!pollfd->server)
+ {
+ gchar str[14] = {0};
+
+ if (recv(pollfd->pollfd.fd, str, 13, 0) != -1)
+ {
+ g_debug ("Got %s, checking if it's connected", str);
+ if (strcmp (str, "connected\r\n\r\n") == 0)
+ {
+ g_debug ("connection successful");
+ pollfd->status = FS_MSN_STATUS_CONNECTED2;
+ pollfd->want_write = TRUE;
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
+ }
+ else
+ {
+ g_debug ("connected failed");
+ goto error;
+ }
+ }
+ else
+ {
+ perror ("connected");
+ goto error;
+ }
+ } else {
+ g_debug ("shouldn't receive data when server on CONNECTED state");
+ goto error;
+ }
+ break;
+ case FS_MSN_STATUS_CONNECTED2:
+ if (pollfd->server)
+ {
+ gchar str[14] = {0};
+ if (recv(pollfd->pollfd.fd, str, 13, 0) != -1)
+ {
+ g_debug ("Got %s, checking if it's connected", str);
+ if (strcmp (str, "connected\r\n\r\n") == 0)
+ {
+ g_debug ("connection successful");
+ pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
+ success = TRUE;
+ }
+ else
+ {
+ g_debug ("connected failed");
+ goto error;
+ }
+ }
+ else
+ {
+ perror ("connected");
+ goto error;
+ }
+
+ } else {
+ g_debug ("shouldn't receive data when client on CONNECTED2 state");
+ goto error;
+ }
+ break;
+ default:
+ g_debug ("Invalid status %d", pollfd->status);
+ goto error;
+ break;
- /* Listen */
- if (listen(fd, 3) != 0)
+ }
+ } else if (gst_poll_fd_can_write (self->poll, &pollfd->pollfd))
{
- close (fd);
- return;
- }
- pollfd = g_slice_new0 (FsMsnPollFD);
- gst_poll_fd_init (&pollfd->pollfd);
- pollfd->pollfd.fd = fd;
- pollfd->want_read = TRUE;
- pollfd->want_write = FALSE;
- gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, TRUE);
- gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
- pollfd->next_step = fd_accept_connection_cb;
+ pollfd->want_write = FALSE;
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
+ switch (pollfd->status)
+ {
+ case FS_MSN_STATUS_AUTH:
+ if (!pollfd->server)
+ {
+ gchar *str = g_strdup_printf("recipientid=%s&sessionid=%d\r\n\r\n",
+ self->remote_recipient_id, self->session_id);
+ if (send(pollfd->pollfd.fd, str, strlen (str), 0) != -1)
+ {
+ g_debug ("Sent %s", str);
+ pollfd->status = FS_MSN_STATUS_CONNECTED;
+ g_free (str);
+ }
+ else
+ {
+ g_free (str);
+ perror ("auth");
+ goto error;
+ }
- g_array_append_val (self->pollfds, pollfd);
- g_message ("Listening on port %d\n",port);
+ }
+ break;
+ case FS_MSN_STATUS_CONNECTED:
+ if (pollfd->server)
+ {
-}
+ if (send(pollfd->pollfd.fd, "connected\r\n\r\n", 13, 0) != -1)
+ {
+ g_debug ("sent connected");
+ pollfd->status = FS_MSN_STATUS_CONNECTED2;
+ }
+ else
+ {
+ perror ("sending connected");
+ goto error;
+ }
+ } else {
+ g_debug ("shouldn't receive data when server on CONNECTED state");
+ goto error;
+ }
+ break;
+ case FS_MSN_STATUS_CONNECTED2:
+ if (!pollfd->server)
+ {
-// Authenticate ourselves when connecting out
-static gboolean
-fs_msn_authenticate_outgoing (FsMsnConnection *connection, gint fd)
-{
- FsMsnConnection *self = FS_MSN_CONNECTION (connection);
- gchar str[400];
- memset(str, 0, sizeof(str));
- if (fd != 0)
- {
- g_message ("Authenticating connection on %d...", fd);
- g_message ("sending : recipientid=%s&sessionid=%d\r\n\r\n",
- self->remote_recipient_id, self->session_id);
- sprintf(str, "recipientid=%s&sessionid=%d\r\n\r\n",
- self->remote_recipient_id, self->session_id);
- if (send(fd, str, strlen(str), 0) == -1)
- {
- g_message("sending failed");
- perror("auth");
+ if (send(pollfd->pollfd.fd, "connected\r\n\r\n", 13, 0) != -1)
+ {
+ g_debug ("sent connected");
+ pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
+ success = TRUE;
+ }
+ else
+ {
+ perror ("sending connected");
+ goto error;
+ }
+ } else {
+ g_debug ("shouldn't receive data when client on CONNECTED2 state");
+ goto error;
+ }
+ break;
+ default:
+ g_debug ("Invalid status %d", pollfd->status);
+ goto error;
+ break;
}
+ }
- memset(str, 0, sizeof(str));
- if (recv(fd, str, 13, 0) != -1)
+ if (success) {
+ // success! we need to shutdown/close all other channels
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
{
- g_message ("Got %s, checking if it's auth", str);
- // we should get a connected message now
- if (strcmp (str, "connected\r\n\r\n") == 0)
+ FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
+ if (pollfd != pollfd2)
{
- // send our connected message also
- memset(str, 0, sizeof(str));
- sprintf(str, "connected\r\n\r\n");
- send(fd, str, strlen(str), 0);
- g_message ("Authentication successfull");
- return TRUE;
+ g_debug ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
}
}
- else
+
+ /* TODO : callback */
+
+ pollfd->want_read = FALSE;
+ pollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, FALSE);
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
+ }
+
+ return;
+ error:
+ /* Error */
+ g_debug ("Got error from fd %d, closing", pollfd->pollfd.fd);
+ // find, shutdown and remove channel from fdlist
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
+ if (pollfd == pollfd2)
{
- perror("auth");
+ g_debug ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
}
}
- return FALSE;
-}
+ return;
+}
static gpointer
connection_polling_thread (gpointer data)
@@ -609,54 +733,79 @@ connection_polling_thread (gpointer data)
FsMsnConnection *self = data;
gint ret;
GstClockTime timeout;
+ GstPoll * poll;
+ g_static_rec_mutex_lock (&self->mutex);
timeout = self->poll_timeout;
+ poll = self->poll;
+ g_debug ("poll waiting %d", self->pollfds->len);
+ g_static_rec_mutex_unlock (&self->mutex);
- while ((ret = gst_poll_wait (self->poll, timeout)) >= 0)
+ while ((ret = gst_poll_wait (poll, timeout)) >= 0)
{
+ g_debug ("gst_poll_wait returned : %d", ret);
+ g_static_rec_mutex_lock (&self->mutex);
if (ret > 0)
{
gint i;
for (i = 0; i < self->pollfds->len; i++)
{
- FsMsnPollFD *pollfd = &g_array_index(self->pollfds,
- FsMsnPollFD, i);
+ FsMsnPollFD *pollfd = NULL;
- if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
- gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ pollfd = g_array_index(self->pollfds, FsMsnPollFD *, i);
+
+ g_debug ("ret %d - i = %d, len = %d", ret, i, self->pollfds->len);
+
+ g_debug ("%p - error %d, close %d, read %d-%d, write %d-%d",
+ pollfd,
+ gst_poll_fd_has_error (poll, &pollfd->pollfd),
+ gst_poll_fd_has_closed (poll, &pollfd->pollfd),
+ pollfd->want_read,
+ gst_poll_fd_can_read (poll, &pollfd->pollfd),
+ pollfd->want_write,
+ gst_poll_fd_can_write (poll, &pollfd->pollfd));
+
+ if (gst_poll_fd_has_error (poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (poll, &pollfd->pollfd))
{
- pollfd->next_step (self, pollfd);
+ pollfd->callback (self, pollfd);
shutdown_fd (self, pollfd);
i--;
continue;
}
if ((pollfd->want_read &&
- gst_poll_fd_can_read (self->poll, &pollfd->pollfd)) ||
+ gst_poll_fd_can_read (poll, &pollfd->pollfd)) ||
(pollfd->want_write &&
- gst_poll_fd_can_write (self->poll, &pollfd->pollfd)))
- pollfd->next_step (self, pollfd);
- }
+ gst_poll_fd_can_write (poll, &pollfd->pollfd)))
+ {
+ pollfd->callback (self, pollfd);
+ }
+ }
}
timeout = self->poll_timeout;
+ g_static_rec_mutex_unlock (&self->mutex);
}
return NULL;
}
+
static void
shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd)
{
gint i;
+ g_debug ("Shutting down pollfd %p", pollfd);
if (!gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
close (pollfd->pollfd.fd);
- gst_poll_remove_fd (self->poll, &pollfd->pollfd);
+ g_debug ("gst poll remove : %d",
+ gst_poll_remove_fd (self->poll, &pollfd->pollfd));
for (i = 0; i < self->pollfds->len; i++)
{
- FsMsnPollFD *p = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ FsMsnPollFD *p = g_array_index(self->pollfds, FsMsnPollFD *, i);
if (p == pollfd)
{
g_array_remove_index_fast (self->pollfds, i);
@@ -666,3 +815,35 @@ shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd)
}
gst_poll_restart (self->poll);
}
+
+static FsMsnPollFD *
+add_pollfd (FsMsnConnection *self, int fd, PollFdCallback callback,
+ gboolean read, gboolean write)
+{
+ FsMsnPollFD *pollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&pollfd->pollfd);
+ pollfd->pollfd.fd = fd;
+ pollfd->want_read = read;
+ pollfd->want_write = write;
+ pollfd->status = FS_MSN_STATUS_AUTH;
+
+ gst_poll_add_fd (self->poll, &pollfd->pollfd);
+
+ gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, read);
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, write);
+ pollfd->callback = callback;
+
+ g_debug ("ADD_POLLFD %p (%p) - error %d, close %d, read %d-%d, write %d-%d",
+ self->pollfds, pollfd,
+ gst_poll_fd_has_error (self->poll, &pollfd->pollfd),
+ gst_poll_fd_has_closed (self->poll, &pollfd->pollfd),
+ pollfd->want_read,
+ gst_poll_fd_can_read (self->poll, &pollfd->pollfd),
+ pollfd->want_write,
+ gst_poll_fd_can_write (self->poll, &pollfd->pollfd));
+
+ g_array_append_val (self->pollfds, pollfd);
+ gst_poll_restart (self->poll);
+ return pollfd;
+}
+// (gdb) p ((FsMsnPollFD **) ((FsMsnConnection *)data)->pollfds->data)[1]
diff --git a/gst/fsmsnconference/fs-msn-connection.h b/gst/fsmsnconference/fs-msn-connection.h
index 4ee235d..a9cc39c 100644
--- a/gst/fsmsnconference/fs-msn-connection.h
+++ b/gst/fsmsnconference/fs-msn-connection.h
@@ -74,19 +74,23 @@ struct _FsMsnConnection
GstClockTime poll_timeout;
GstPoll *poll;
GArray *pollfds;
-
gboolean disposed;
+ GStaticRecMutex mutex;
};
+typedef void (*NewLocalCandidateCB) (FsCandidate *candidate, gpointer data);
+
GType fs_msn_connection_get_type (void);
FsMsnConnection *fs_msn_connection_new (guint session_id, guint initial_port);
-gboolean fs_msn_connection_gather_local_candidates (FsMsnConnection *connection);
+gboolean fs_msn_connection_gather_local_candidates (FsMsnConnection *connection,
+ NewLocalCandidateCB cb, gpointer data);
gboolean fs_msn_connection_set_remote_candidates (FsMsnConnection *connection,
GList *candidates, GError **error);
+
G_END_DECLS
#endif /* __FS_MSN_CONNECTION_H__ */
diff --git a/gst/fsmsnconference/fs-msn-session.c b/gst/fsmsnconference/fs-msn-session.c
index af5f566..e567c2e 100644
--- a/gst/fsmsnconference/fs-msn-session.c
+++ b/gst/fsmsnconference/fs-msn-session.c
@@ -434,8 +434,8 @@ fs_msn_session_new (FsMediaType media_type,
if (!session)
{
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not create object");
+ *error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not create object");
}
else if (session->priv->construction_error)
{
--
1.5.6.5
More information about the farsight-commits
mailing list