[farsight2/master] Move all connection related stuff into its own object

Youness Alaoui youness.alaoui at collabora.co.uk
Tue Jul 14 09:50:32 PDT 2009


---
 gst/fsmsnconference/Makefile.am         |    2 +
 gst/fsmsnconference/fs-msn-conference.c |   28 +-
 gst/fsmsnconference/fs-msn-connection.c |  668 ++++++++++++++++++++++++++++++
 gst/fsmsnconference/fs-msn-connection.h |   92 +++++
 gst/fsmsnconference/fs-msn-session.c    |   53 +++-
 gst/fsmsnconference/fs-msn-stream.c     |  671 +------------------------------
 6 files changed, 826 insertions(+), 688 deletions(-)
 create mode 100644 gst/fsmsnconference/fs-msn-connection.c
 create mode 100644 gst/fsmsnconference/fs-msn-connection.h

diff --git a/gst/fsmsnconference/Makefile.am b/gst/fsmsnconference/Makefile.am
index 7579f4d..5aca5db 100644
--- a/gst/fsmsnconference/Makefile.am
+++ b/gst/fsmsnconference/Makefile.am
@@ -4,6 +4,7 @@ libfsmsnconference_la_SOURCES = \
 	fs-msn-conference.c \
 	fs-msn-participant.c \
 	fs-msn-session.c \
+	fs-msn-connection.c \
 	fs-msn-stream.c 
 
 BUILT_SOURCES =  
@@ -12,6 +13,7 @@ noinst_HEADERS = \
 	fs-msn-conference.h \
 	fs-msn-participant.h \
 	fs-msn-session.h \
+	fs-msn-connection.h  \
 	fs-msn-stream.h 
 
 EXTRA_libfsmsnconference_la_SOURCES = 
diff --git a/gst/fsmsnconference/fs-msn-conference.c b/gst/fsmsnconference/fs-msn-conference.c
index 2a7d45d..d3a3c27 100644
--- a/gst/fsmsnconference/fs-msn-conference.c
+++ b/gst/fsmsnconference/fs-msn-conference.c
@@ -171,7 +171,6 @@ static void
 fs_msn_conference_class_init (FsMsnConferenceClass * klass)
 {
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
-  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
   FsBaseConferenceClass *baseconf_class = FS_BASE_CONFERENCE_CLASS (klass);
 
   g_type_class_add_private (klass, sizeof (FsMsnConferencePrivate));
@@ -189,14 +188,6 @@ fs_msn_conference_class_init (FsMsnConferenceClass * klass)
     GST_DEBUG_FUNCPTR (fs_msn_conference_set_property);
   gobject_class->get_property =
     GST_DEBUG_FUNCPTR (fs_msn_conference_get_property);
-
-  gst_element_class_set_details (gstelement_class, &fs_msn_conference_details);
-
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&fs_msn_conference_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&fs_msn_conference_src_template));
-
   g_object_class_install_property (gobject_class,PROP_LOCAL_MSNADD,
       g_param_spec_string ("local_address", "Msn Address",
           "The local contact address for the MSN sessions",
@@ -206,6 +197,14 @@ fs_msn_conference_class_init (FsMsnConferenceClass * klass)
 static void
 fs_msn_conference_base_init (gpointer g_class)
 {
+  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&fs_msn_conference_sink_template));
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&fs_msn_conference_src_template));
+
+  gst_element_class_set_details (gstelement_class, &fs_msn_conference_details);
 }
 
 static void
@@ -229,7 +228,7 @@ fs_msn_conference_get_property (GObject *object,
   {
     case PROP_LOCAL_MSNADD:
       GST_OBJECT_LOCK (self);
-      g_value_set_string (value,self->priv->local_address);
+      g_value_set_string (value, self->priv->local_address);
       GST_OBJECT_UNLOCK (self);
       break;
     default:
@@ -296,6 +295,7 @@ fs_msn_conference_new_session (FsBaseConference *conf,
   {
     g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
         "Only video supported for msn webcam");
+    return NULL;
   }
 
   GST_OBJECT_LOCK (self);
@@ -311,13 +311,13 @@ fs_msn_conference_new_session (FsBaseConference *conf,
 
   new_session = fs_msn_session_new (media_type, self, 1, error);
 
-  GST_OBJECT_LOCK (self);
   if (new_session)
   {
+    GST_OBJECT_LOCK (self);
     self->priv->session = new_session;
     g_object_weak_ref (G_OBJECT (new_session), _remove_session, self);
+    GST_OBJECT_UNLOCK (self);
   }
-  GST_OBJECT_UNLOCK (self);
 
   return FS_SESSION (new_session);
 }
@@ -344,13 +344,13 @@ fs_msn_conference_new_participant (FsBaseConference *conf,
 
   new_participant = fs_msn_participant_new (cname);
 
-  GST_OBJECT_LOCK (self);
   if (new_participant)
   {
+    GST_OBJECT_LOCK (self);
     self->priv->participant = new_participant;
     g_object_weak_ref (G_OBJECT (new_participant), _remove_participant, self);
+    GST_OBJECT_UNLOCK (self);
   }
-  GST_OBJECT_UNLOCK (self);
 
   return FS_PARTICIPANT (new_participant);
 
diff --git a/gst/fsmsnconference/fs-msn-connection.c b/gst/fsmsnconference/fs-msn-connection.c
new file mode 100644
index 0000000..ffb43b2
--- /dev/null
+++ b/gst/fsmsnconference/fs-msn-connection.c
@@ -0,0 +1,668 @@
+/*
+ * Farsight2 - Farsight MSN Connection
+ *
+ * Copyright 2008 Richard Spiers <richard.spiers at gmail.com>
+ * Copyright 2007 Nokia Corp.
+ * Copyright 2007 Collabora Ltd.
+ *  @author: Olivier Crete <olivier.crete at collabora.co.uk>
+ *
+ * fs-msn-connection.c - A MSN Connection gobject
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "fs-msn-connection.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+
+#include <string.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <gst/gst.h>
+
+/* Signals: none are defined yet, LAST_SIGNAL is the only enumerator */
+enum
+{
+  LAST_SIGNAL
+};
+
+/* Properties: none are defined yet, PROP_0 is the only enumerator */
+enum
+{
+  PROP_0,
+};
+
+
+typedef struct _FsMsnPollFD FsMsnPollFD;
+/* One entry of FsMsnConnection.pollfds: a polled descriptor and its handler */
+struct _FsMsnPollFD {
+  GstPollFD pollfd;      /* descriptor handed to the GstPoll calls */
+  gboolean want_read;    /* polling thread runs next_step when readable */
+  gboolean want_write;   /* polling thread runs next_step when writable */
+  void (*next_step) (FsMsnConnection *self, FsMsnPollFD *pollfd); /* handler */
+};
+
+
+G_DEFINE_TYPE(FsMsnConnection, fs_msn_connection, G_TYPE_OBJECT);
+/* GObject destruction handlers, installed in class_init */
+static void fs_msn_connection_dispose (GObject *object);
+static void fs_msn_connection_finalize (GObject *object);
+/* Handlers stored in FsMsnPollFD.next_step */
+static void main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+
+static void successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+
+static void fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+/* Socket setup and authentication helpers */
+static gboolean fs_msn_connection_attempt_connection (FsMsnConnection *connection,
+    gchar const *ip,
+    guint16 port);
+
+static gboolean fs_msn_authenticate_outgoing (FsMsnConnection *connection,
+    gint fd);
+
+static void fs_msn_open_listening_port (FsMsnConnection *connection,
+    guint16 port);
+/* Body of the thread started by fs_msn_connection_gather_local_candidates */
+static gpointer connection_polling_thread (gpointer data);
+/* Closes a descriptor and drops it from the poll set and the pollfds array */
+static void shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd);
+/* Parent class, looked up in fs_msn_connection_class_init */
+static GObjectClass *parent_class = NULL;
+
+static void
+fs_msn_connection_class_init (FsMsnConnectionClass *klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+  /* Keep the parent class for chaining up, then install our own
+   * dispose and finalize handlers. */
+  parent_class = g_type_class_peek_parent (klass);
+  object_class->finalize = fs_msn_connection_finalize;
+  object_class->dispose = fs_msn_connection_dispose;
+}
+
+static void
+fs_msn_connection_init (FsMsnConnection *self)
+{
+  /* Instance setup: an empty, zero-terminated array of FsMsnPollFD entries
+   * and a controllable GstPoll; poll_timeout starts as GST_CLOCK_TIME_NONE. */
+  self->pollfds = g_array_new (TRUE, TRUE, sizeof (FsMsnPollFD));
+
+  self->poll = gst_poll_new (TRUE);
+  gst_poll_set_controllable (self->poll, TRUE);
+  self->poll_timeout = GST_CLOCK_TIME_NONE;
+  self->disposed = FALSE;
+}
+
+static void
+fs_msn_connection_dispose (GObject *object)
+{
+  FsMsnConnection *self = FS_MSN_CONNECTION (object);
+
+  /* Dispose may be invoked more than once; only the first call does work. */
+  if (self->disposed)
+    return;
+
+  /* Stop the polling thread first: flushing makes its gst_poll_wait() loop
+   * exit, and joining guarantees it no longer touches this object. */
+  if (self->polling_thread)
+  {
+    gst_poll_set_flushing (self->poll, TRUE);
+    g_thread_join (self->polling_thread);
+    self->polling_thread = NULL;
+  }
+
+  /* g_free() accepts NULL; clear the fields so no dangling pointer remains. */
+  g_free (self->local_recipient_id);
+  self->local_recipient_id = NULL;
+  g_free (self->remote_recipient_id);
+  self->remote_recipient_id = NULL;
+  self->disposed = TRUE;
+  parent_class->dispose (object);
+}
+
+static void
+fs_msn_connection_finalize (GObject *object)
+{
+  FsMsnConnection *self = FS_MSN_CONNECTION (object);
+  guint idx;
+  /* Free the poll set, close what pollfds still lists, then chain up. */
+  gst_poll_free (self->poll); /* TODO : why not in dispose */
+  for (idx = 0; idx != self->pollfds->len; idx++)
+  {
+    FsMsnPollFD *entry = &g_array_index (self->pollfds, FsMsnPollFD, idx);
+    close (entry->pollfd.fd);
+  }
+  g_array_free (self->pollfds, TRUE);
+  parent_class->finalize (object);
+}
+
+/**
+ * fs_msn_connection_new:
+ * @session_id: the MSN session id, stored in the new connection
+ * @initial_port: the port stored as the connection's initial port
+ *
+ * Creates a new #FsMsnConnection and records both arguments in it.
+ * No socket is opened here.
+ *
+ * Returns: the newly created #FsMsnConnection, or NULL if it could not
+ * be created
+ */
+
+FsMsnConnection *
+fs_msn_connection_new (guint session_id, guint initial_port)
+{
+  FsMsnConnection *connection = g_object_new (FS_TYPE_MSN_CONNECTION, NULL);
+
+  if (connection == NULL)
+    return NULL;
+
+  connection->initial_port = initial_port;
+  connection->session_id = session_id;
+  return connection;
+}
+
+gboolean
+fs_msn_connection_gather_local_candidates (FsMsnConnection *self)
+{
+  /* Try to listen on the initial port, then start the joinable thread that
+   * services the poll set.  Returns TRUE when the thread was created. */
+  GThread *poller;
+  fs_msn_open_listening_port (self, self->initial_port);
+  poller = g_thread_create (connection_polling_thread, self, TRUE, NULL);
+  self->polling_thread = poller;
+  return poller != NULL;
+}
+
+/**
+ * fs_msn_connection_set_remote_candidates:
+ * @candidates: list of #FsCandidate to validate and connect to
+ * @error: set when a candidate is rejected
+ *
+ * The first foundation seen becomes the remote recipient id; every later
+ * candidate must carry the same one.  A connection attempt is started for
+ * each accepted candidate.  Returns: FALSE on the first rejection.
+ */
+gboolean
+fs_msn_connection_set_remote_candidates (FsMsnConnection *self,
+    GList *candidates, GError **error)
+{
+  GList *walk;
+
+  for (walk = candidates; walk != NULL; walk = g_list_next (walk))
+  {
+    FsCandidate *cand = walk->data;
+    const gchar *reason = NULL;
+
+    if (!cand->ip || !cand->port)
+      reason = "The candidate passed does not contain a valid ip or port";
+    else if (!cand->foundation)
+      reason =
+          "The candidate passed does not have a foundation (MSN recipient ID)";
+    else if (self->remote_recipient_id == NULL)
+      self->remote_recipient_id = g_strdup (cand->foundation);
+    else if (g_strcmp0 (cand->foundation, self->remote_recipient_id) != 0)
+      reason = "The candidates do not have the same recipient ID";
+
+    if (reason != NULL)
+    {
+      g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "%s", reason);
+      return FALSE;
+    }
+    fs_msn_connection_attempt_connection (self, cand->ip, cand->port);
+  }
+
+  return TRUE;
+}
+
+
+static void
+main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+  g_message ("disconnection on video feed");
+  /* Handler installed as next_step on the authenticated descriptor; for now
+     it only logs.  FIXME - How to handle the disconnection of the connection?
+     Destroy the elements involved?  Set the state to Null ?
+  */
+}
+
+static void
+successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+  /* next_step handler for an outgoing socket: once the non-blocking connect()
+   * has finished, authenticate on it.  On success every other descriptor is
+   * shut down and this one is kept (no longer polled for read/write) with
+   * main_fd_closed_cb as handler; on failure only this one is shut down. */
+  const gint fd = pollfd->pollfd.fd;
+  gint error = 0;
+  socklen_t option_len = sizeof (error);
+  guint i;
+
+  g_message ("handler called on fd %d", fd);
+
+  if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+      gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+  {
+    /* The polling thread only calls us in this state from its own error
+     * branch and shuts the entry down right afterwards; doing it here too
+     * would shut down a second, unrelated entry. */
+    g_message ("connecton closed or error");
+    return;
+  }
+
+  /* SO_ERROR carries the outcome of the asynchronous connect() */
+  if (getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *) &error, &option_len) < 0)
+  {
+    g_warning ("getsockopt() failed");
+    goto error;
+  }
+  if (error)
+  {
+    g_message ("getsockopt gave an error : %d", error);
+    goto error;
+  }
+
+  /* The authentication handshake below uses blocking send()/recv() */
+  if (fcntl (fd, F_SETFL, fcntl (fd, F_GETFL) & ~O_NONBLOCK) != 0)
+  {
+    g_warning ("fcntl() failed");
+    goto error;
+  }
+
+  g_message ("Got connection on fd %d", fd);
+
+  if (!fs_msn_authenticate_outgoing (self, fd))
+  {
+    g_message ("Authentification failed on fd %d", fd);
+    goto error;
+  }
+  g_message ("Authenticated outgoing successfully fd %d", fd);
+
+  /* Success: shut down every other channel.  shutdown_fd() fills the freed
+   * slot with the last entry, so walk backwards and identify the survivor by
+   * descriptor value; its address may change while the others are removed. */
+  for (i = self->pollfds->len; i > 0; i--)
+  {
+    FsMsnPollFD *other = &g_array_index (self->pollfds, FsMsnPollFD, i - 1);
+
+    if (other->pollfd.fd != fd)
+    {
+      g_message ("closing fd %d", other->pollfd.fd);
+      shutdown_fd (self, other);
+    }
+  }
+
+  /* TODO : callback */
+
+  /* Look the surviving entry up again before touching it */
+  for (i = 0; i < self->pollfds->len; i++)
+  {
+    FsMsnPollFD *kept = &g_array_index (self->pollfds, FsMsnPollFD, i);
+
+    if (kept->pollfd.fd != fd)
+      continue;
+    kept->want_read = FALSE;
+    kept->want_write = FALSE;
+    gst_poll_fd_ctl_read (self->poll, &kept->pollfd, FALSE);
+    gst_poll_fd_ctl_write (self->poll, &kept->pollfd, FALSE);
+    kept->next_step = main_fd_closed_cb;
+    break;
+  }
+  return;
+
+ error:
+  /* Shut this entry down exactly once: after the removal its slot holds a
+   * different entry, which must not be closed as well. */
+  g_message ("Got error from fd %d, closing", fd);
+  g_message ("closing fd %d", fd);
+  shutdown_fd (self, pollfd);
+}
+
+static gboolean
+fs_msn_connection_attempt_connection (FsMsnConnection *connection,
+    const gchar *ip,
+    guint16 port)
+{
+  /* Starts a non-blocking TCP connect() to ip:port and registers the socket
+   * in the poll set with successfull_connection_cb as handler.  Returns FALSE
+   * when the attempt could not even be started. */
+  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+  FsMsnPollFD pollfd;
+  struct sockaddr_in theiraddr;
+  gint fd, ret;
+
+  if ((fd = socket (PF_INET, SOCK_STREAM, 0)) == -1)
+  {
+    g_message ("could not create socket!");
+    return FALSE;
+  }
+
+  /* Non-blocking, so that connect() returns at once with EINPROGRESS */
+  fcntl (fd, F_SETFL, fcntl (fd, F_GETFL) | O_NONBLOCK);
+
+  memset (&theiraddr, 0, sizeof (theiraddr));
+  theiraddr.sin_family = AF_INET;
+  theiraddr.sin_addr.s_addr = inet_addr (ip);
+  theiraddr.sin_port = htons (port);
+
+  g_message ("Attempting connection to %s %d on socket %d", ip, port, fd);
+  ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
+  if (ret < 0 && errno != EINPROGRESS)
+  {
+    g_message ("connect failed: %s", strerror (errno));
+    close (fd);
+    return FALSE;
+  }
+
+  /* The array stores FsMsnPollFD by value, so append the struct itself (not
+   * a pointer to it), and add the fd to the set before the ctl calls. */
+  memset (&pollfd, 0, sizeof (pollfd));
+  gst_poll_fd_init (&pollfd.pollfd);
+  pollfd.pollfd.fd = fd;
+  pollfd.want_read = pollfd.want_write = TRUE;
+  pollfd.next_step = successfull_connection_cb;
+  gst_poll_add_fd (self->poll, &pollfd.pollfd);
+  gst_poll_fd_ctl_read (self->poll, &pollfd.pollfd, TRUE);
+  gst_poll_fd_ctl_write (self->poll, &pollfd.pollfd, TRUE);
+  g_array_append_val (self->pollfds, pollfd);
+
+  return TRUE;
+}
+
+static gboolean
+fs_msn_authenticate_incoming (FsMsnConnection *connection, gint fd)
+{
+  /* Server side of the handshake on an accepted socket: the peer must send
+   * "recipientid=<local id>&sessionid=<id>\r\n\r\n"; we then answer
+   * "connected\r\n\r\n" and expect the same text back.  Returns TRUE only when
+   * the whole exchange matched. */
+  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+  static const gchar connected[] = "connected\r\n\r\n";
+  gchar str[400];
+  gchar check[400];
+
+  if (fd == 0)
+    return FALSE;
+
+  /* Read at most sizeof - 1 bytes so the zeroed buffer stays terminated */
+  memset (str, 0, sizeof (str));
+  if (recv (fd, str, sizeof (str) - 1, 0) == -1)
+  {
+    perror ("auth");
+    return FALSE;
+  }
+  g_message ("Got %s, checking if it's auth", str);
+
+  /* Build the expected request in check (not over the received data) and
+   * only go on when the peer sent exactly that */
+  g_snprintf (check, sizeof (check), "recipientid=%s&sessionid=%d\r\n\r\n",
+      self->local_recipient_id, self->session_id);
+  if (strcmp (str, check) != 0)
+    return FALSE;
+
+  /* send our connected message also */
+  if (send (fd, connected, strlen (connected), 0) == -1)
+    return FALSE;
+  /* now we get connected */
+  memset (str, 0, sizeof (str));
+  if (recv (fd, str, sizeof (str) - 1, 0) == -1 || strcmp (str, connected) != 0)
+    return FALSE;
+  g_message ("Authentication successfull");
+  return TRUE;
+}
+
+static void
+fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+  /* next_step handler of the listening socket: accept one connection and run
+   * the incoming handshake on it.  On success every polled descriptor (the
+   * listener included) is shut down and the accepted socket becomes the only
+   * entry, with main_fd_closed_cb as handler. */
+  struct sockaddr_in in;
+  socklen_t n = sizeof (in);
+  FsMsnPollFD newpollfd;
+  int fd;
+
+  if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+      gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+  {
+    /* The polling thread only calls us in this state from its own error
+     * branch and shuts the entry down right afterwards; doing it here too
+     * would shut down a second, unrelated entry. */
+    g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
+    return;
+  }
+
+  fd = accept (pollfd->pollfd.fd, (struct sockaddr *) &in, &n);
+  if (fd == -1)
+  {
+    g_message ("Error while running accept() %d", errno);
+    return;
+  }
+
+  /* Remove NON BLOCKING MODE: the handshake uses blocking send()/recv() */
+  if (fcntl (fd, F_SETFL, fcntl (fd, F_GETFL) & ~O_NONBLOCK) != 0)
+  {
+    g_warning ("fcntl() failed");
+    goto error;
+  }
+
+  /* now we try to auth */
+  if (!fs_msn_authenticate_incoming (self, fd))
+    goto error;
+
+  g_message ("Authenticated incoming successfully fd %d", fd);
+
+  /* success! we need to shutdown/close all other channels; shutdown_fd()
+   * removes the entry, so keep taking the first one until none is left */
+  while (self->pollfds->len > 0)
+  {
+    FsMsnPollFD *other = &g_array_index (self->pollfds, FsMsnPollFD, 0);
+
+    g_message ("closing fd %d", other->pollfd.fd);
+    shutdown_fd (self, other);
+  }
+
+  /* TODO callback */
+
+  /* The array stores FsMsnPollFD by value: append the struct itself, and add
+   * the fd to the poll set before changing its read/write flags */
+  memset (&newpollfd, 0, sizeof (newpollfd));
+  gst_poll_fd_init (&newpollfd.pollfd);
+  newpollfd.pollfd.fd = fd;
+  newpollfd.want_read = FALSE;
+  newpollfd.want_write = FALSE;
+  newpollfd.next_step = main_fd_closed_cb;
+  gst_poll_add_fd (self->poll, &newpollfd.pollfd);
+  gst_poll_fd_ctl_read (self->poll, &newpollfd.pollfd, FALSE);
+  gst_poll_fd_ctl_write (self->poll, &newpollfd.pollfd, FALSE);
+  g_array_append_val (self->pollfds, newpollfd);
+
+  return;
+
+ error:
+  /* A failed attempt retires the listening socket as before, but exactly
+   * once: after the removal its slot holds a different entry. */
+  g_message ("Got error from fd %d, closing", fd);
+  g_message ("closing fd %d", pollfd->pollfd.fd);
+  shutdown_fd (self, pollfd);
+
+  close (fd);
+}
+
+static void
+fs_msn_open_listening_port (FsMsnConnection *connection,
+                            guint16 port)
+{
+  /* Opens a non-blocking TCP socket listening on port (any local address)
+   * and registers it in the poll set with fd_accept_connection_cb as handler.
+   * Failures are logged and leave the connection without a listener. */
+  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+  FsMsnPollFD pollfd;
+  struct sockaddr_in myaddr;
+  gint fd;
+
+  g_message ("Attempting to listen on port %d.....\n",port);
+
+  if ((fd = socket (PF_INET, SOCK_STREAM, 0)) == -1)
+  {
+    g_message ("could not create socket!");
+    return;
+  }
+
+  /* set non-blocking mode */
+  fcntl (fd, F_SETFL, fcntl (fd, F_GETFL) | O_NONBLOCK);
+
+  /* Zeroed address: sin_addr stays 0, i.e. INADDR_ANY */
+  memset (&myaddr, 0, sizeof (myaddr));
+  myaddr.sin_family = AF_INET;
+  myaddr.sin_port = htons (port);
+  if (bind (fd, (struct sockaddr *) &myaddr, sizeof (myaddr)) != 0 ||
+      listen (fd, 3) != 0)
+  {
+    g_message ("could not bind or listen on port %d", port);
+    close (fd);
+    return;
+  }
+
+  /* The array stores FsMsnPollFD by value: append the struct itself, and add
+   * the fd to the poll set before changing its read/write flags */
+  memset (&pollfd, 0, sizeof (pollfd));
+  gst_poll_fd_init (&pollfd.pollfd);
+  pollfd.pollfd.fd = fd;
+  pollfd.want_read = TRUE;
+  pollfd.want_write = FALSE;
+  pollfd.next_step = fd_accept_connection_cb;
+  gst_poll_add_fd (self->poll, &pollfd.pollfd);
+  gst_poll_fd_ctl_read (self->poll, &pollfd.pollfd, TRUE);
+  gst_poll_fd_ctl_write (self->poll, &pollfd.pollfd, FALSE);
+  g_array_append_val (self->pollfds, pollfd);
+
+  g_message ("Listening on port %d\n",port);
+}
+
+/* Authenticate ourselves when connecting out: send our request, expect
+ * "connected\r\n\r\n" back and echo it.  Returns TRUE on success. */
+static gboolean
+fs_msn_authenticate_outgoing (FsMsnConnection *connection, gint fd)
+{
+  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+  static const gchar connected[] = "connected\r\n\r\n";
+  gchar str[400];
+
+  if (fd == 0)
+    return FALSE;
+
+  /* remote_recipient_id comes from a remote candidate, so bound the write */
+  g_snprintf (str, sizeof (str), "recipientid=%s&sessionid=%d\r\n\r\n",
+      self->remote_recipient_id, self->session_id);
+  g_message ("Authenticating connection on %d...", fd);
+  g_message ("sending : %s", str);
+  if (send (fd, str, strlen (str), 0) == -1)
+  {
+    g_message ("sending failed");
+    perror ("auth");
+    return FALSE;
+  }
+
+  /* Read exactly the length of the expected reply into a zeroed buffer */
+  memset (str, 0, sizeof (str));
+  if (recv (fd, str, strlen (connected), 0) == -1)
+  {
+    perror ("auth");
+    return FALSE;
+  }
+  g_message ("Got %s, checking if it's auth", str);
+
+  /* we should get a connected message now */
+  if (strcmp (str, connected) != 0)
+    return FALSE;
+  /* send our connected message also */
+  if (send (fd, connected, strlen (connected), 0) == -1)
+    return FALSE;
+  g_message ("Authentication successfull");
+  return TRUE;
+}
+
+
+static gpointer
+connection_polling_thread (gpointer data)
+{
+  /* Thread body: waits on the poll set and runs each entry's next_step
+   * handler until gst_poll_wait() reports a negative result. */
+  FsMsnConnection *self = data;
+  gint ret;
+
+  while ((ret = gst_poll_wait (self->poll, self->poll_timeout)) >= 0)
+  {
+    guint i;
+
+    if (ret == 0)
+      continue;
+    for (i = 0; i < self->pollfds->len; i++)
+    {
+      FsMsnPollFD *pollfd = &g_array_index (self->pollfds, FsMsnPollFD, i);
+      gint fd = pollfd->pollfd.fd;
+
+      if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+          gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+      {
+        pollfd->next_step (self, pollfd);
+        /* Handlers may shut entries down themselves, which moves another
+         * entry into this slot; only remove it if it is still the one that
+         * failed, and look at slot i again afterwards. */
+        if (i < self->pollfds->len && pollfd->pollfd.fd == fd)
+          shutdown_fd (self, pollfd);
+        i--;
+        continue;
+      }
+      if ((pollfd->want_read &&
+              gst_poll_fd_can_read (self->poll, &pollfd->pollfd)) ||
+          (pollfd->want_write &&
+              gst_poll_fd_can_write (self->poll, &pollfd->pollfd)))
+        pollfd->next_step (self, pollfd);
+    }
+  }
+
+  return NULL;
+}
+
+static void
+shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+  /* Closes the descriptor of pollfd, removes it from the poll set and drops
+   * its entry from self->pollfds.  The last array entry is moved into the
+   * freed slot, so FsMsnPollFD pointers taken before the call may refer to a
+   * different entry afterwards. */
+  guint i;
+
+  /* "has closed" reports that the peer hung up; our own descriptor is still
+   * open then and has to be closed as well, otherwise it leaks. */
+  gst_poll_remove_fd (self->poll, &pollfd->pollfd);
+  close (pollfd->pollfd.fd);
+  for (i = 0; i < self->pollfds->len; i++)
+    if (&g_array_index (self->pollfds, FsMsnPollFD, i) == pollfd)
+    {
+      g_array_remove_index_fast (self->pollfds, i);
+      break;
+    }
+  gst_poll_restart (self->poll);
+}
diff --git a/gst/fsmsnconference/fs-msn-connection.h b/gst/fsmsnconference/fs-msn-connection.h
new file mode 100644
index 0000000..4ee235d
--- /dev/null
+++ b/gst/fsmsnconference/fs-msn-connection.h
@@ -0,0 +1,92 @@
+/*
+ * Farsight2 - Farsight MSN Connection
+ *
+ * Copyright 2008 Richard Spiers <richard.spiers at gmail.com>
+ * Copyright 2007 Nokia Corp.
+ * Copyright 2007 Collabora Ltd.
+ *  @author: Olivier Crete <olivier.crete at collabora.co.uk>
+ *  @author: Youness Alaoui <youness.alaoui at collabora.co.uk>
+ *
+ * fs-msn-connection.h - An MSN Connection class
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+#ifndef __FS_MSN_CONNECTION_H__
+#define __FS_MSN_CONNECTION_H__
+
+#include "fs-msn-participant.h"
+#include "fs-msn-session.h"
+
+G_BEGIN_DECLS
+
+/* GObject type, cast and type-check macros for FsMsnConnection */
+#define FS_TYPE_MSN_CONNECTION \
+  (fs_msn_connection_get_type ())
+#define FS_MSN_CONNECTION(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), FS_TYPE_MSN_CONNECTION, FsMsnConnection))
+#define FS_MSN_CONNECTION_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), FS_TYPE_MSN_CONNECTION, FsMsnConnectionClass))
+#define FS_IS_MSN_CONNECTION(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), FS_TYPE_MSN_CONNECTION))
+#define FS_IS_MSN_CONNECTION_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), FS_TYPE_MSN_CONNECTION))
+#define FS_MSN_CONNECTION_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj), FS_TYPE_MSN_CONNECTION, FsMsnConnectionClass))
+#define FS_MSN_CONNECTION_CAST(obj) ((FsMsnConnection*) (obj))
+
+typedef struct _FsMsnConnection FsMsnConnection;
+typedef struct _FsMsnConnectionClass FsMsnConnectionClass;
+typedef struct _FsMsnConnectionPrivate FsMsnConnectionPrivate;
+
+/* Class structure: it only embeds the parent GObjectClass */
+struct _FsMsnConnectionClass
+{
+  GObjectClass parent_class;
+};
+
+/**
+ * FsMsnConnection:
+ * State of one MSN connection: recipient/session ids plus the polled sockets.
+ */
+struct _FsMsnConnection
+{
+  GObject parent;
+
+  gchar *local_recipient_id;   /* used in the incoming authentication string */
+  gchar *remote_recipient_id;  /* foundation of the remote candidates */
+  gint session_id;             /* part of both authentication strings */
+  gint initial_port;           /* port the listening socket is opened on */
+
+  GThread *polling_thread;     /* runs connection_polling_thread, or NULL */
+  GstClockTime poll_timeout;   /* timeout passed to gst_poll_wait() */
+  GstPoll *poll;               /* poll set for the descriptors in pollfds */
+  GArray *pollfds;             /* array of FsMsnPollFD entries */
+
+  gboolean disposed;           /* TRUE once dispose has run */
+};
+
+GType fs_msn_connection_get_type (void);
+/* Creates a connection that records session_id and initial_port */
+FsMsnConnection *fs_msn_connection_new (guint session_id, guint initial_port);
+/* Opens the listening port and starts the polling thread */
+gboolean fs_msn_connection_gather_local_candidates (FsMsnConnection *connection);
+/* Validates the candidates and starts a connection attempt to each */
+gboolean fs_msn_connection_set_remote_candidates (FsMsnConnection *connection,
+    GList *candidates, GError **error);
+
+G_END_DECLS
+
+#endif /* __FS_MSN_CONNECTION_H__ */
diff --git a/gst/fsmsnconference/fs-msn-session.c b/gst/fsmsnconference/fs-msn-session.c
index eb790eb..2d4ddbe 100644
--- a/gst/fsmsnconference/fs-msn-session.c
+++ b/gst/fsmsnconference/fs-msn-session.c
@@ -56,7 +56,12 @@ enum
   PROP_0,
   PROP_MEDIA_TYPE,
   PROP_ID,
+  PROP_SINK_PAD,
   PROP_CODEC_PREFERENCES,
+  PROP_CODECS,
+  PROP_CODECS_WITHOUT_CONFIG,
+  PROP_CURRENT_SEND_CODEC,
+  PROP_CODECS_READY,
   PROP_CONFERENCE
 };
 
@@ -129,6 +134,19 @@ fs_msn_session_class_init (FsMsnSessionClass *klass)
       PROP_MEDIA_TYPE, "media-type");
   g_object_class_override_property (gobject_class,
       PROP_ID, "id");
+  g_object_class_override_property (gobject_class,
+      PROP_SINK_PAD, "sink-pad");
+
+  g_object_class_override_property (gobject_class,
+    PROP_CODEC_PREFERENCES, "codec-preferences");
+  g_object_class_override_property (gobject_class,
+    PROP_CODECS, "codecs");
+  g_object_class_override_property (gobject_class,
+    PROP_CODECS_WITHOUT_CONFIG, "codecs-without-config");
+  g_object_class_override_property (gobject_class,
+    PROP_CURRENT_SEND_CODEC, "current-send-codec");
+  g_object_class_override_property (gobject_class,
+    PROP_CODECS_READY, "codecs-ready");
 
   g_object_class_install_property (gobject_class,
       PROP_CONFERENCE,
@@ -223,6 +241,30 @@ fs_msn_session_get_property (GObject *object,
     case PROP_CONFERENCE:
       g_value_set_object (value, self->priv->conference);
       break;
+    case PROP_SINK_PAD:
+      g_value_set_object (value, self->priv->media_sink_pad);
+      break;
+    case PROP_CODECS_READY:
+      g_value_set_boolean (value, TRUE);
+      break;
+    case PROP_CODEC_PREFERENCES:
+    case PROP_CODECS:
+    case PROP_CODECS_WITHOUT_CONFIG:
+      {
+        GList *codecs = NULL;
+        FsCodec *mimic_codec = fs_codec_new (FS_CODEC_ID_ANY, "mimic",
+          FS_MEDIA_TYPE_VIDEO, 0);
+        codecs = g_list_append (codecs, mimic_codec);
+        g_value_take_boxed (value, codecs);
+      }
+      break;
+    case PROP_CURRENT_SEND_CODEC:
+      {
+        FsCodec *send_codec = fs_codec_new (FS_CODEC_ID_ANY, "mimic",
+            FS_MEDIA_TYPE_VIDEO, 0);
+        g_value_take_boxed (value, send_codec);
+        break;
+      }
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -366,11 +408,14 @@ fs_msn_session_new_stream (FsSession *session,
   new_stream = FS_STREAM_CAST (fs_msn_stream_new (self, msnparticipant,
           direction,self->priv->conference,error));
 
-  g_object_weak_ref (G_OBJECT (new_stream), _remove_stream, self);
+  if (new_stream)
+  {
+    GST_OBJECT_LOCK (self);
+    self->priv->stream = (FsMsnStream *) new_stream;
+    g_object_weak_ref (G_OBJECT (new_stream), _remove_stream, self);
+    GST_OBJECT_UNLOCK (self);
+  }
 
-  FS_MSN_SESSION_LOCK (self);
-  self->priv->stream = (FsMsnStream *) new_stream;
-  FS_MSN_SESSION_UNLOCK (self);
 
   return new_stream;
 }
diff --git a/gst/fsmsnconference/fs-msn-stream.c b/gst/fsmsnconference/fs-msn-stream.c
index 970c293..a4ed6d0 100644
--- a/gst/fsmsnconference/fs-msn-stream.c
+++ b/gst/fsmsnconference/fs-msn-stream.c
@@ -56,26 +56,10 @@ enum
   PROP_DIRECTION,
   PROP_PARTICIPANT,
   PROP_SESSION,
-  PROP_SINK_PAD,
-  PROP_SRC_PAD,
-  PROP_CONFERENCE,
-  PROP_L_RID,
-  PROP_L_SID,
-  PROP_R_RID,
-  PROP_R_SID,
-  PROP_PORT
+  PROP_CONFERENCE
 };
 
 
-typedef struct _FsMsnPollFD FsMsnPollFD;
-
-struct _FsMsnPollFD {
-  GstPollFD pollfd;
-  gboolean want_read;
-  gboolean want_write;
-  void (*next_step) (FsMsnStream *self, FsMsnPollFD *pollfd);
-};
-
 
 struct _FsMsnStreamPrivate
 {
@@ -85,14 +69,6 @@ struct _FsMsnStreamPrivate
   FsMsnConference *conference;
   GstElement *media_fd_src,*media_fd_sink,*send_valve;
   GstPad *sink_pad,*src_pad;
-  gint local_recipientid, local_sessionid;
-  gint remote_recipientid, remote_sessionid;
-  gint port;
-
-  GThread *polling_thread;
-  GstClockTime poll_timeout;
-  GstPoll *poll;
-  GArray *pollfds;
 
   GError *construction_error;
 
@@ -127,26 +103,6 @@ static gboolean fs_msn_stream_set_remote_candidate  (FsMsnStream *stream,
     FsCandidate *candidate,
     GError **error);
 
-static void main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *fd);
-
-static void successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
-
-static void fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
-
-static gboolean fs_msn_stream_attempt_connection (FsMsnStream *stream,
-    gchar const *ip,
-    guint16 port);
-
-static gboolean fs_msn_authenticate_outgoing (FsMsnStream *stream,
-    gint fd);
-
-static void fs_msn_open_listening_port (FsMsnStream *stream,
-    guint16 port);
-
-static gpointer connection_polling_thread (gpointer data);
-
-static void shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd);
-
 static GObjectClass *parent_class = NULL;
 
 static void
@@ -181,67 +137,12 @@ fs_msn_stream_class_init (FsMsnStreamClass *klass)
       "session");
 
   g_object_class_install_property (gobject_class,
-      PROP_SINK_PAD,
-      g_param_spec_object ("sink-pad",
-          "A gstreamer sink pad for this stream",
-          "A pad used for sending data on this stream",
-          GST_TYPE_PAD,
-          G_PARAM_READABLE));
-
-  g_object_class_install_property (gobject_class,
-      PROP_SRC_PAD,
-      g_param_spec_object ("src-pad",
-          "A gstreamer src pad for this stream",
-          "A pad used for reading data from this stream",
-          GST_TYPE_PAD,
-          G_PARAM_READABLE));
-
-  g_object_class_install_property (gobject_class,
       PROP_CONFERENCE,
       g_param_spec_object ("conference",
           "The Conference this stream refers to",
           "This is a conveniance pointer for the Conference",
           FS_TYPE_MSN_CONFERENCE,
           G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE));
-
-  g_object_class_install_property (gobject_class,
-      PROP_L_RID,
-      g_param_spec_uint ("local-recipientid",
-          "The local recipientid used for this stream",
-          "The session ID used for this stream",
-          0, G_MAXUINT, 0,
-          G_PARAM_READWRITE));
-
-  g_object_class_install_property (gobject_class,
-      PROP_L_SID,
-      g_param_spec_uint ("local-sessionid",
-          "The local sessionid used for this stream",
-          "The session ID used for this stream",
-          0, G_MAXUINT, 0,
-          G_PARAM_READWRITE));
-
-  g_object_class_install_property (gobject_class,
-      PROP_R_RID,
-      g_param_spec_uint ("remote-recipientid",
-          "The remote recipientid used for this stream",
-          "The session ID used for this stream",
-          0, G_MAXUINT, 0,
-          G_PARAM_READWRITE));
-
-  g_object_class_install_property (gobject_class,
-      PROP_R_SID,
-      g_param_spec_uint ("remote-sessionid",
-          "The remote sessionid used for this stream",
-          "The session ID used for this stream",
-          0, G_MAXUINT, 0,
-          G_PARAM_READWRITE));
-  g_object_class_install_property (gobject_class,
-      PROP_PORT,
-      g_param_spec_uint ("local-port",
-          "The local port used for this stream",
-          "The local port used for this stream",
-          0, G_MAXUINT, 0,
-          G_PARAM_READWRITE));
 }
 
 static void
@@ -256,10 +157,6 @@ fs_msn_stream_init (FsMsnStream *self)
 
   self->priv->direction = FS_DIRECTION_NONE;
 
-  self->priv->poll_timeout = GST_CLOCK_TIME_NONE;
-  self->priv->poll = gst_poll_new (TRUE);
-  gst_poll_set_controllable (self->priv->poll, TRUE);
-  self->priv->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
 }
 
 static void
@@ -273,13 +170,6 @@ fs_msn_stream_dispose (GObject *object)
     return;
   }
 
-  if (self->priv->polling_thread)
-  {
-    gst_poll_set_flushing (self->priv->poll, TRUE);
-    g_thread_join (self->priv->polling_thread);
-    self->priv->polling_thread = NULL;
-  }
-
   if (self->priv->participant)
   {
     g_object_unref (self->priv->participant);
@@ -301,15 +191,6 @@ fs_msn_stream_dispose (GObject *object)
 static void
 fs_msn_stream_finalize (GObject *object)
 {
-  FsMsnStream *self = FS_MSN_STREAM (object);
-  guint i;
-
-  gst_poll_free (self->priv->poll);
-
-  for (i = 0; i < self->priv->pollfds->len; i++)
-    close (g_array_index(self->priv->pollfds, FsMsnPollFD, i).pollfd.fd);
-  g_array_free (self->priv->pollfds, TRUE);
-
   parent_class->finalize (object);
 }
 
@@ -333,30 +214,9 @@ fs_msn_stream_get_property (GObject *object,
     case PROP_DIRECTION:
       g_value_set_flags (value, self->priv->direction);
       break;
-    case PROP_SINK_PAD:
-      g_value_set_object (value, self->priv->sink_pad);
-      break;
-    case PROP_SRC_PAD:
-      g_value_set_object (value, self->priv->src_pad);
-      break;
     case PROP_CONFERENCE:
       g_value_set_object (value, self->priv->conference);
       break;
-    case PROP_L_RID:
-      g_value_set_uint (value, self->priv->local_recipientid);
-      break;
-    case PROP_L_SID:
-      g_value_set_uint (value, self->priv->local_sessionid);
-      break;
-    case PROP_R_RID:
-      g_value_set_uint (value, self->priv->remote_recipientid);
-      break;
-    case PROP_R_SID:
-      g_value_set_uint (value, self->priv->remote_sessionid);
-      break;
-    case PROP_PORT:
-      g_value_set_uint (value, self->priv->port);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -385,21 +245,6 @@ fs_msn_stream_set_property (GObject *object,
     case PROP_CONFERENCE:
       self->priv->conference = FS_MSN_CONFERENCE (g_value_dup_object (value));
       break;
-    case PROP_L_RID:
-      self->priv->local_recipientid = g_value_get_uint (value);
-      break;
-    case PROP_L_SID:
-      self->priv->local_sessionid = g_value_get_uint (value);
-      break;
-    case PROP_R_RID:
-      self->priv->remote_recipientid = g_value_get_uint (value);
-      break;
-    case PROP_R_SID:
-      self->priv->remote_sessionid = g_value_get_uint (value);
-      break;
-    case PROP_PORT:
-      self->priv->port = g_value_get_uint (value);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -672,9 +517,6 @@ fs_msn_stream_constructed (GObject *object)
         "Direction must be sending OR receiving");
   }
 
-  self->priv->polling_thread = g_thread_create (connection_polling_thread,
-      self, TRUE, &self->priv->construction_error);
-
   GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
 }
 
@@ -710,457 +552,14 @@ fs_msn_stream_set_remote_candidates (FsStream *stream, GList *candidates,
   return TRUE;
 }
 
-static void
-main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
-  g_message ("disconnection on video feed");
-  /* IXME - How to handle the disconnection of the stream
-     Destroy the elements involved?
-     Set the state to Null ?
-  */
-}
-
-static void
-successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
-  gint error;
-  socklen_t option_len;
-
-  g_message ("handler called on fd %d", pollfd->pollfd.fd);
-
-  errno = 0;
-  if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
-      gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
-  {
-    g_message ("connecton closed or error");
-    goto error;
-  }
-
-  option_len = sizeof(error);
-
-  /* Get the error option */
-  if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
-  {
-    g_warning ("getsockopt() failed");
-    goto error;
-  }
-
-  /* Check if there is an error */
-  if (error)
-  {
-    g_message ("getsockopt gave an error : %d", error);
-    goto error;
-  }
-
-  /* Remove NON BLOCKING MODE */
-  if (fcntl(pollfd->pollfd.fd, F_SETFL,
-          fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
-  {
-    g_warning ("fcntl() failed");
-    goto error;
-  }
-
-  g_message ("Got connection on fd %d", pollfd->pollfd.fd);
-
-  if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
-  {
-    g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
-
-    // success! we need to shutdown/close all other channels
-    gint i;
-    for (i = 0; i < self->priv->pollfds->len; i++)
-    {
-      FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
-      if (pollfd != pollfd2)
-      {
-        g_message ("closing fd %d", pollfd2->pollfd.fd);
-        shutdown_fd (self, pollfd2);
-        i--;
-      }
-    }
-
-    if (self->priv->direction == FS_DIRECTION_RECV)
-    {
-      GstState state;
-      g_message("Setting media_fd_src on fd %d", pollfd->pollfd.fd);
-
-      gst_element_get_state(self->priv->media_fd_src, &state, NULL,
-          GST_CLOCK_TIME_NONE);
-
-      if ( state > GST_STATE_READY)
-      {
-        g_message("Error: fdsrc in state above ready");
-        gst_element_set_state(self->priv->media_fd_src, GST_STATE_READY);
-      }
-      g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", pollfd->pollfd.fd, NULL);
-      gst_element_set_locked_state(self->priv->media_fd_src,FALSE);
-      gst_element_sync_state_with_parent(self->priv->media_fd_src);
-    }
-    else if (self->priv->direction == FS_DIRECTION_SEND)
-    {
-      GstState state;
-      g_message("Setting media_fd_sink on fd %d", pollfd->pollfd.fd);
-
-      gst_element_get_state(self->priv->media_fd_sink, &state, NULL,
-          GST_CLOCK_TIME_NONE);
-
-      if ( state > GST_STATE_READY)
-      {
-        g_message("Error: fdsrc in state above ready");
-        gst_element_set_state(self->priv->media_fd_sink, GST_STATE_READY);
-      }
-      g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", pollfd->pollfd.fd, NULL);
-      gst_element_set_locked_state(self->priv->media_fd_sink,FALSE);
-      gst_element_sync_state_with_parent(self->priv->media_fd_sink);
-      g_object_set (G_OBJECT (self->priv->send_valve), "drop", FALSE, NULL);
-
-    }
-    pollfd->want_read = FALSE;
-    pollfd->want_write = FALSE;
-    gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, FALSE);
-    gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
-    pollfd->next_step = main_fd_closed_cb;
-    return;
-  }
-  else
-  {
-    g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
-  }
-
-  /* Error */
- error:
-  g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
-  // find, shutdown and remove channel from fdlist
-  gint i;
-  for (i = 0; i < self->priv->pollfds->len; i++)
-  {
-    FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
-    if (pollfd == pollfd2)
-    {
-      g_message ("closing fd %d", pollfd2->pollfd.fd);
-      shutdown_fd (self, pollfd2);
-      i--;
-    }
-  }
-
-  return;
-}
-
-static gboolean
-fs_msn_stream_attempt_connection (FsMsnStream *stream,
-    const gchar *ip,
-    guint16 port)
-{
-  FsMsnStream *self = FS_MSN_STREAM (stream);
-  FsMsnPollFD *pollfd = NULL;
-  gint fd = -1;
-  struct sockaddr_in theiraddr;
-  memset(&theiraddr, 0, sizeof(theiraddr));
-
-  if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
-  {
-    // show error
-    g_message ("could not create socket!");
-    return FALSE;
-  }
-
-  // set non-blocking mode
-  fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
-
-  theiraddr.sin_family = AF_INET;
-  theiraddr.sin_addr.s_addr = inet_addr (ip);
-  theiraddr.sin_port = htons (port);
-
-  g_message ("Attempting connection to %s %d on socket %d", ip, port, fd);
-  // this is non blocking, the return value isn't too usefull
-  gint ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
-  if (ret < 0)
-  {
-    if (errno != EINPROGRESS)
-    {
-      close (fd);
-      return FALSE;
-    }
-  }
-  g_message("ret %d %d %s", ret, errno, strerror(errno));
-
-  pollfd = g_slice_new0 (FsMsnPollFD);
-  gst_poll_fd_init (&pollfd->pollfd);
-  pollfd->pollfd.fd = fd;
-  pollfd->want_read = TRUE;
-  pollfd->want_write = TRUE;
-  gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
-  gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, TRUE);
-  pollfd->next_step = successfull_connection_cb;
-  g_array_append_val (self->priv->pollfds, pollfd);
-
-  return TRUE;
-}
-
 static gboolean
 fs_msn_stream_set_remote_candidate  (FsMsnStream *stream,
                                      FsCandidate *candidate,
                                      GError **error)
 {
-  FsMsnStream *self = FS_MSN_STREAM (stream);
-  fs_msn_open_listening_port(self,self->priv->port); // FIXME - Should be done in the constructor, with a message passed onto the bus to give the port to the client program
-  if (fs_msn_stream_attempt_connection(self,candidate->ip,candidate->port))
-    return TRUE;
-  return FALSE;
-}
-
-static gboolean
-fs_msn_authenticate_incoming (FsMsnStream *stream, gint fd)
-{
-  FsMsnStream *self = FS_MSN_STREAM (stream);
-  if (fd != 0)
-    {
-      gchar str[400];
-      gchar check[400];
-
-      memset(str, 0, sizeof(str));
-      if (recv(fd, str, sizeof(str), 0) != -1)
-        {
-          g_message ("Got %s, checking if it's auth", str);
-          sprintf(str, "recipientid=%d&sessionid=%d\r\n\r\n",
-                  self->priv->local_recipientid, self->priv->remote_sessionid);
-          if (strcmp (str, check) != 0)
-            {
-              // send our connected message also
-              memset(str, 0, sizeof(str));
-              sprintf(str, "connected\r\n\r\n");
-              send(fd, str, strlen(str), 0);
-
-              // now we get connected
-              memset(str, 0, sizeof(str));
-              if (recv(fd, str, sizeof(str), 0) != -1)
-                {
-                  if (strcmp (str, "connected\r\n\r\n") == 0)
-                    {
-                      g_message ("Authentication successfull");
-                      return TRUE;
-                    }
-                }
-            }
-        }
-      else
-        {
-          perror("auth");
-        }
-    }
-  return FALSE;
-}
-
-static void
-fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
-  struct sockaddr_in in;
-  int fd = -1;
-  socklen_t n = sizeof (in);
-  FsMsnPollFD *newpollfd = NULL;
-
-  if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
-      gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
-  {
-    g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
-    goto error;
-  }
-
-  if ((fd = accept(pollfd->pollfd.fd,
-              (struct sockaddr*) &in, &n)) == -1)
-  {
-    g_message ("Error while running accept() %d", errno);
-    return;
-  }
-
-  /* Remove NON BLOCKING MODE */
-  if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
-  {
-    g_warning ("fcntl() failed");
-    goto error;
-  }
-
-  // now we try to auth
-  if (fs_msn_authenticate_incoming(self,fd))
-  {
-    g_message ("Authenticated incoming successfully fd %d", fd);
-
-    // success! we need to shutdown/close all other channels
-    gint i;
-    for (i = 0; i < self->priv->pollfds->len; i++)
-    {
-      FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
-      g_message ("closing fd %d", pollfd2->pollfd.fd);
-      shutdown_fd (self, pollfd2);
-      i--;
-    }
-    if (self->priv->direction == FS_DIRECTION_RECV)
-    {
-      GstState state;
-      g_message("Setting media_fd_src on fd %d",fd);
-
-      gst_element_get_state(self->priv->media_fd_src, &state, NULL,
-          GST_CLOCK_TIME_NONE);
-
-      if ( state > GST_STATE_READY)
-      {
-        g_message("Error: fdsrc in state above ready");
-        gst_element_set_state(self->priv->media_fd_src, GST_STATE_READY);
-      }
-      g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", fd, NULL);
-      gst_element_set_locked_state(self->priv->media_fd_src,FALSE);
-      gst_element_sync_state_with_parent(self->priv->media_fd_src);
-    }
-    else if (self->priv->direction == FS_DIRECTION_SEND)
-    {
-
-      GstState state;
-      g_message("Setting media_fd_sink on fd %d", fd);
-
-      gst_element_get_state(self->priv->media_fd_sink, &state, NULL,
-          GST_CLOCK_TIME_NONE);
-
-      if ( state > GST_STATE_READY)
-      {
-        g_message("Error: fdsrc in state above ready");
-        gst_element_set_state(self->priv->media_fd_sink, GST_STATE_READY);
-      }
-      g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", fd, NULL);
-      gst_element_set_locked_state(self->priv->media_fd_sink,FALSE);
-      gst_element_sync_state_with_parent(self->priv->media_fd_sink);
-      g_object_set (G_OBJECT (self->priv->send_valve), "drop", FALSE, NULL);
-
-    }
-
-    newpollfd = g_slice_new0 (FsMsnPollFD);
-    gst_poll_fd_init (&newpollfd->pollfd);
-    newpollfd->pollfd.fd = fd;
-    newpollfd->want_read = FALSE;
-    newpollfd->want_write = FALSE;
-    gst_poll_fd_ctl_read (self->priv->poll, &newpollfd->pollfd, FALSE);
-    gst_poll_fd_ctl_write (self->priv->poll, &newpollfd->pollfd, FALSE);
-    newpollfd->next_step = main_fd_closed_cb;
-    g_array_append_val (self->priv->pollfds, newpollfd);
-    return;
-  }
-
-  /* Error */
- error:
-  g_message ("Got error from fd %d, closing", fd);
-  // find, shutdown and remove channel from fdlist
-  gint i;
-  for (i = 0; i < self->priv->pollfds->len; i++)
-  {
-    FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
-    if (pollfd == pollfd2)
-    {
-      g_message ("closing fd %d", pollfd2->pollfd.fd);
-      shutdown_fd (self, pollfd2);
-      i--;
-    }
-  }
-
-  if (fd > 0)
-    close (fd);
-
-  return;
-}
-
-static void
-fs_msn_open_listening_port (FsMsnStream *stream,
-                            guint16 port)
-{
-  FsMsnStream *self = FS_MSN_STREAM (stream);
-  gint fd = -1;
-  FsMsnPollFD *pollfd = NULL;
-  struct sockaddr_in myaddr;
-  memset(&myaddr, 0, sizeof(myaddr));
-
-  g_message ("Attempting to listen on port %d.....\n",port);
-
-  if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
-  {
-    // show error
-    g_message ("could not create socket!");
-    return;
-  }
-
-  // set non-blocking mode
-  fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
-  myaddr.sin_family = AF_INET;
-  myaddr.sin_port = htons (port);
-  // bind
-  if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
-  {
-    close (fd);
-    return;
-  }
-
-
-  /* Listen */
-  if (listen(fd, 3) != 0)
-  {
-    close (fd);
-    return;
-  }
-  pollfd = g_slice_new0 (FsMsnPollFD);
-  gst_poll_fd_init (&pollfd->pollfd);
-  pollfd->pollfd.fd = fd;
-  pollfd->want_read = TRUE;
-  pollfd->want_write = FALSE;
-  gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
-  gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
-  pollfd->next_step = fd_accept_connection_cb;
-
-  g_array_append_val (self->priv->pollfds, pollfd);
-  g_message ("Listening on port %d\n",port);
-
-}
-
-// Authenticate ourselves when connecting out
-static gboolean
-fs_msn_authenticate_outgoing (FsMsnStream *stream, gint fd)
-{
-  FsMsnStream *self = FS_MSN_STREAM (stream);
-  gchar str[400];
-  memset(str, 0, sizeof(str));
-  if (fd != 0)
-  {
-    g_message ("Authenticating connection on %d...", fd);
-    g_message ("sending : recipientid=%d&sessionid=%d\r\n\r\n",
-        self->priv->remote_recipientid, self->priv->remote_sessionid);
-    sprintf(str, "recipientid=%d&sessionid=%d\r\n\r\n",
-        self->priv->remote_recipientid, self->priv->remote_sessionid);
-    if (send(fd, str, strlen(str), 0) == -1)
-    {
-      g_message("sending failed");
-      perror("auth");
-    }
-
-    memset(str, 0, sizeof(str));
-    if (recv(fd, str, 13, 0) != -1)
-    {
-      g_message ("Got %s, checking if it's auth", str);
-      // we should get a connected message now
-      if (strcmp (str, "connected\r\n\r\n") == 0)
-      {
-        // send our connected message also
-        memset(str, 0, sizeof(str));
-        sprintf(str, "connected\r\n\r\n");
-        send(fd, str, strlen(str), 0);
-        g_message ("Authentication successfull");
-        return TRUE;
-      }
-    }
-    else
-    {
-      perror("auth");
-    }
-  }
   return FALSE;
 }
 
-
 /**
  * fs_msn_stream_new:
  * @session: The #FsMsnSession this stream is a child of
@@ -1197,71 +596,3 @@ fs_msn_stream_new (FsMsnSession *session,
   return self;
 }
 
-
-static gpointer
-connection_polling_thread (gpointer data)
-{
-  FsMsnStream *self = data;
-  gint ret;
-  GstClockTime timeout;
-
-  GST_OBJECT_LOCK (self->priv->conference);
-  timeout = self->priv->poll_timeout;
-  GST_OBJECT_UNLOCK (self->priv->conference);
-
-  while ((ret = gst_poll_wait (self->priv->poll, timeout)) >= 0)
-  {
-    if (ret > 0)
-    {
-      gint i;
-
-      for (i = 0; i < self->priv->pollfds->len; i++)
-      {
-        FsMsnPollFD *pollfd = &g_array_index(self->priv->pollfds,
-            FsMsnPollFD, i);
-
-        if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
-            gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
-        {
-          pollfd->next_step (self, pollfd);
-          shutdown_fd (self, pollfd);
-          i--;
-          continue;
-        }
-        if ((pollfd->want_read &&
-                gst_poll_fd_can_read (self->priv->poll, &pollfd->pollfd)) ||
-            (pollfd->want_write &&
-                gst_poll_fd_can_write (self->priv->poll, &pollfd->pollfd)))
-          pollfd->next_step (self, pollfd);
-      }
-
-    }
-    GST_OBJECT_LOCK (self->priv->conference);
-    timeout = self->priv->poll_timeout;
-    GST_OBJECT_UNLOCK (self->priv->conference);
-  }
-
-  return NULL;
-}
-
-static void
-shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
-  gint i;
-
-
-  if (!gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
-    close (pollfd->pollfd.fd);
-  gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
-  for (i = 0; i < self->priv->pollfds->len; i++)
-  {
-    FsMsnPollFD *p = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
-    if (p == pollfd)
-    {
-      g_array_remove_index_fast (self->priv->pollfds, i);
-      break;
-    }
-
-  }
-  gst_poll_restart (self->priv->poll);
-}
-- 
1.5.6.5




More information about the farsight-commits mailing list