[farsight2/master] Making the msn-connection use a state machine per socket and not use blocking sockets. Fixed GstPoll and tested it, it works!

Youness Alaoui youness.alaoui at collabora.co.uk
Tue Jul 14 09:50:32 PDT 2009


---
 gst/fsmsnconference/fs-msn-connection.c |  803 +++++++++++++++++++------------
 gst/fsmsnconference/fs-msn-connection.h |    8 +-
 gst/fsmsnconference/fs-msn-session.c    |    4 +-
 3 files changed, 500 insertions(+), 315 deletions(-)

diff --git a/gst/fsmsnconference/fs-msn-connection.c b/gst/fsmsnconference/fs-msn-connection.c
index ffb43b2..553ac9a 100644
--- a/gst/fsmsnconference/fs-msn-connection.c
+++ b/gst/fsmsnconference/fs-msn-connection.c
@@ -5,6 +5,7 @@
  * Copyright 2007 Nokia Corp.
  * Copyright 2007 Collabora Ltd.
  *  @author: Olivier Crete <olivier.crete at collabora.co.uk>
+ *  @author: Youness Alaoui <youness.alaoui at collabora.co.uk>
  *
  * fs-msn-connection.c - A MSN Connection gobject
  *
@@ -33,6 +34,7 @@
 #include <netinet/in.h>
 #include <fcntl.h>
 
+#include <errno.h>
 #include <string.h>
 #include <arpa/inet.h>
 #include <unistd.h>
@@ -44,6 +46,7 @@ enum
   LAST_SIGNAL
 };
 
+
 /* props */
 enum
 {
@@ -51,42 +54,48 @@ enum
 };
 
 
+typedef enum {
+  FS_MSN_STATUS_AUTH,
+  FS_MSN_STATUS_CONNECTED,
+  FS_MSN_STATUS_CONNECTED2,
+  FS_MSN_STATUS_SEND_RECEIVE,
+  FS_MSN_STATUS_PAUSED,
+} FsMsnStatus;
+
 typedef struct _FsMsnPollFD FsMsnPollFD;
+typedef void (*PollFdCallback) (FsMsnConnection *self, FsMsnPollFD *pollfd);
 
 struct _FsMsnPollFD {
   GstPollFD pollfd;
+  FsMsnStatus status;
+  gboolean server;
   gboolean want_read;
   gboolean want_write;
-  void (*next_step) (FsMsnConnection *self, FsMsnPollFD *pollfd);
+  PollFdCallback callback;
 };
 
 
 G_DEFINE_TYPE(FsMsnConnection, fs_msn_connection, G_TYPE_OBJECT);
 
-static void fs_msn_connection_dispose (GObject *object);
-static void fs_msn_connection_finalize (GObject *object);
-
-static void main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *fd);
-
-static void successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static GObjectClass *parent_class = NULL;
 
-static void fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static void fs_msn_connection_dispose (GObject *object);
 
-static gboolean fs_msn_connection_attempt_connection (FsMsnConnection *connection,
-    gchar const *ip,
-    guint16 port);
 
-static gboolean fs_msn_authenticate_outgoing (FsMsnConnection *connection,
-    gint fd);
+static gboolean fs_msn_connection_attempt_connection (
+    FsMsnConnection *connection,
+    FsCandidate *candidate);
+static gboolean fs_msn_open_listening_port (FsMsnConnection *connection,
+    guint16 port, NewLocalCandidateCB cb, gpointer data);
 
-static void fs_msn_open_listening_port (FsMsnConnection *connection,
-    guint16 port);
+static void successful_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static void accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+static void connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
 
 static gpointer connection_polling_thread (gpointer data);
-
 static void shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd);
-
-static GObjectClass *parent_class = NULL;
+static FsMsnPollFD * add_pollfd (FsMsnConnection *self, int fd,
+    PollFdCallback callback, gboolean read, gboolean write);
 
 static void
 fs_msn_connection_class_init (FsMsnConnectionClass *klass)
@@ -96,7 +105,6 @@ fs_msn_connection_class_init (FsMsnConnectionClass *klass)
   parent_class = g_type_class_peek_parent (klass);
 
   gobject_class->dispose = fs_msn_connection_dispose;
-  gobject_class->finalize = fs_msn_connection_finalize;
 }
 
 static void
@@ -108,18 +116,26 @@ fs_msn_connection_init (FsMsnConnection *self)
 
   self->poll_timeout = GST_CLOCK_TIME_NONE;
   self->poll = gst_poll_new (TRUE);
-  gst_poll_set_controllable (self->poll, TRUE);
-  self->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
+  gst_poll_set_flushing (self->poll, FALSE);
+  self->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD *));
+
+  g_static_rec_mutex_init (&self->mutex);
 }
 
 static void
 fs_msn_connection_dispose (GObject *object)
 {
   FsMsnConnection *self = FS_MSN_CONNECTION (object);
+  gint i;
+
+  g_static_rec_mutex_lock (&self->mutex);
 
   /* If dispose did already run, return. */
   if (self->disposed)
+  {
+    g_static_rec_mutex_unlock (&self->mutex);
     return;
+  }
 
   if (self->polling_thread)
   {
@@ -133,26 +149,19 @@ fs_msn_connection_dispose (GObject *object)
   if (self->remote_recipient_id)
     g_free (self->remote_recipient_id);
 
-  /* Make sure dispose does not run twice. */
-  self->disposed = TRUE;
-
-  parent_class->dispose (object);
-}
-
-static void
-fs_msn_connection_finalize (GObject *object)
-{
-  FsMsnConnection *self = FS_MSN_CONNECTION (object);
-  guint i;
-
-  /* TODO : why not in dispose */
   gst_poll_free (self->poll);
 
   for (i = 0; i < self->pollfds->len; i++)
-    close (g_array_index(self->pollfds, FsMsnPollFD, i).pollfd.fd);
+    close (g_array_index(self->pollfds, FsMsnPollFD *, i)->pollfd.fd);
   g_array_free (self->pollfds, TRUE);
 
-  parent_class->finalize (object);
+
+  /* Make sure dispose does not run twice. */
+  self->disposed = TRUE;
+
+  parent_class->dispose (object);
+
+  g_static_rec_mutex_unlock (&self->mutex);
 }
 
 /**
@@ -181,17 +190,23 @@ fs_msn_connection_new (guint session_id, guint initial_port)
 }
 
 gboolean
-fs_msn_connection_gather_local_candidates (FsMsnConnection *self)
+fs_msn_connection_gather_local_candidates (FsMsnConnection *self,
+    NewLocalCandidateCB cb, gpointer data)
 {
-
-  fs_msn_open_listening_port (self, self->initial_port);
+  gboolean ret = FALSE;
+  g_static_rec_mutex_lock (&self->mutex);
 
   self->polling_thread = g_thread_create (connection_polling_thread,
       self, TRUE, NULL);
 
-  return self->polling_thread != NULL;
+  if (self->polling_thread)
+    ret = fs_msn_open_listening_port (self, self->initial_port, cb, data);
+
+  g_static_rec_mutex_unlock (&self->mutex);
+  return ret;
 }
 
+
 /**
  * fs_msn_connection_set_remote_candidate:
  */
@@ -200,6 +215,12 @@ fs_msn_connection_set_remote_candidates (FsMsnConnection *self,
     GList *candidates, GError **error)
 {
   GList *item = NULL;
+  gchar *recipient_id = NULL;
+  gboolean ret = FALSE;
+
+  g_static_rec_mutex_lock (&self->mutex);
+
+  recipient_id = self->remote_recipient_id;
 
   for (item = candidates; item; item = g_list_next (item))
   {
@@ -209,148 +230,129 @@ fs_msn_connection_set_remote_candidates (FsMsnConnection *self,
     {
       g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
           "The candidate passed does not contain a valid ip or port");
-      return FALSE;
+      goto out;
     }
     if (!candidate->foundation)
     {
       g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
           "The candidate passed does not have a foundation (MSN recipient ID)");
-      return FALSE;
+      goto out;
     }
-    if (self->remote_recipient_id) {
-      if (g_strcmp0 (candidate->foundation, self->remote_recipient_id) != 0)
+    if (recipient_id)
+    {
+      if (g_strcmp0 (candidate->foundation, recipient_id) != 0)
       {
         g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
             "The candidates do not have the same recipient ID");
-        return FALSE;
+        goto out;
       }
-    } else {
-      self->remote_recipient_id = g_strdup (candidate->foundation);
     }
-    fs_msn_connection_attempt_connection(self, candidate->ip, candidate->port);
+    else
+    {
+      recipient_id = candidate->foundation;
+    }
   }
 
-  return TRUE;
-}
-
-
-static void
-main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
-{
-  g_message ("disconnection on video feed");
-  /* IXME - How to handle the disconnection of the connection
-     Destroy the elements involved?
-     Set the state to Null ?
-  */
-}
-
-static void
-successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
-{
-  gint error;
-  socklen_t option_len;
-
-  g_message ("handler called on fd %d", pollfd->pollfd.fd);
-
-  errno = 0;
-  if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
-      gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+  self->remote_recipient_id = g_strdup (recipient_id);
+  for (item = candidates; item; item = g_list_next (item))
   {
-    g_message ("connecton closed or error");
-    goto error;
+    FsCandidate *candidate = item->data;
+    fs_msn_connection_attempt_connection(self, candidate);
   }
 
-  option_len = sizeof(error);
+  ret = TRUE;
+ out:
+  g_static_rec_mutex_unlock (&self->mutex);
+  return ret;
+}
 
-  /* Get the error option */
-  if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
-  {
-    g_warning ("getsockopt() failed");
-    goto error;
-  }
 
-  /* Check if there is an error */
-  if (error)
-  {
-    g_message ("getsockopt gave an error : %d", error);
-    goto error;
-  }
+static gboolean
+fs_msn_open_listening_port (FsMsnConnection *self,
+    guint16 port, NewLocalCandidateCB cb, gpointer data)
+{
+  gint fd = -1;
+  struct sockaddr_in myaddr;
+  memset(&myaddr, 0, sizeof(myaddr));
 
-  /* Remove NON BLOCKING MODE */
-  if (fcntl(pollfd->pollfd.fd, F_SETFL,
-          fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
+  g_debug ("Attempting to listen on port %d.....",port);
+
+  if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
   {
-    g_warning ("fcntl() failed");
-    goto error;
+    // show error
+    g_debug ("could not create socket!");
+    return FALSE;
   }
 
-  g_message ("Got connection on fd %d", pollfd->pollfd.fd);
-
-  if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
+  // set non-blocking mode
+  fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
+  myaddr.sin_family = AF_INET;
+  do
   {
-    g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
-
-    // success! we need to shutdown/close all other channels
-    gint i;
-    for (i = 0; i < self->pollfds->len; i++)
+    g_debug ("Attempting to listen on port %d.....",port);
+    myaddr.sin_port = htons (port);
+    // bind
+    if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
     {
-      FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
-      if (pollfd != pollfd2)
+      if (port != 0 && errno == EADDRINUSE)
       {
-        g_message ("closing fd %d", pollfd2->pollfd.fd);
-        shutdown_fd (self, pollfd2);
-        i--;
+        port++;
+      }
+      else
+      {
+        perror ("bind");
+        close (fd);
+        return FALSE;
+      }
+    } else {
+      /* Listen */
+      if (listen(fd, 3) != 0)
+      {
+        if (port != 0 && errno == EADDRINUSE)
+        {
+          port++;
+        }
+        else
+        {
+          perror ("listen");
+          close (fd);
+          return FALSE;
+        }
       }
     }
+  } while (errno == EADDRINUSE);
 
-    /* TODO : callback */
+  add_pollfd (self, fd, accept_connection_cb, TRUE, TRUE);
 
-    pollfd->want_read = FALSE;
-    pollfd->want_write = FALSE;
-    gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, FALSE);
-    gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
-    pollfd->next_step = main_fd_closed_cb;
-    return;
-  }
-  else
-  {
-    g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
-  }
+  g_debug ("Listening on port %d", port);
 
-  /* Error */
- error:
-  g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
-  // find, shutdown and remove channel from fdlist
-  gint i;
-  for (i = 0; i < self->pollfds->len; i++)
-  {
-    FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
-    if (pollfd == pollfd2)
-    {
-      g_message ("closing fd %d", pollfd2->pollfd.fd);
-      shutdown_fd (self, pollfd2);
-      i--;
-    }
-  }
+  FsCandidate * cand = fs_candidate_new ("123", 1, FS_CANDIDATE_TYPE_HOST,
+      FS_NETWORK_PROTOCOL_TCP, "127.0.0.1", port);
+  g_static_rec_mutex_unlock (&self->mutex);
+  cb (cand, data);
+  g_static_rec_mutex_lock (&self->mutex);
+  fs_candidate_destroy (cand);
 
-  return;
+  self->local_recipient_id = g_strdup ("123");
+
+  return TRUE;
 }
 
 static gboolean
 fs_msn_connection_attempt_connection (FsMsnConnection *connection,
-    const gchar *ip,
-    guint16 port)
+    FsCandidate *candidate)
 {
   FsMsnConnection *self = FS_MSN_CONNECTION (connection);
-  FsMsnPollFD *pollfd = NULL;
+  FsMsnPollFD *pollfd;
   gint fd = -1;
   struct sockaddr_in theiraddr;
   memset(&theiraddr, 0, sizeof(theiraddr));
 
+
   if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
   {
     // show error
-    g_message ("could not create socket!");
+    g_debug ("could not create socket!");
     return FALSE;
   }
 
@@ -358,79 +360,33 @@ fs_msn_connection_attempt_connection (FsMsnConnection *connection,
   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
 
   theiraddr.sin_family = AF_INET;
-  theiraddr.sin_addr.s_addr = inet_addr (ip);
-  theiraddr.sin_port = htons (port);
+  theiraddr.sin_addr.s_addr = inet_addr (candidate->ip);
+  theiraddr.sin_port = htons (candidate->port);
 
-  g_message ("Attempting connection to %s %d on socket %d", ip, port, fd);
+  g_debug ("Attempting connection to %s %d on socket %d", candidate->ip,
+      candidate->port, fd);
   // this is non blocking, the return value isn't too usefull
   gint ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
   if (ret < 0)
   {
     if (errno != EINPROGRESS)
     {
+      g_debug("ret %d %d %s", ret, errno, strerror(errno));
       close (fd);
       return FALSE;
     }
   }
-  g_message("ret %d %d %s", ret, errno, strerror(errno));
+  g_debug("ret %d %d %s", ret, errno, strerror(errno));
 
-  pollfd = g_slice_new0 (FsMsnPollFD);
-  gst_poll_fd_init (&pollfd->pollfd);
-  pollfd->pollfd.fd = fd;
-  pollfd->want_read = TRUE;
-  pollfd->want_write = TRUE;
-  gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, TRUE);
-  gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
-  pollfd->next_step = successfull_connection_cb;
-  g_array_append_val (self->pollfds, pollfd);
 
-  return TRUE;
-}
-
-static gboolean
-fs_msn_authenticate_incoming (FsMsnConnection *connection, gint fd)
-{
-  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
-  if (fd != 0)
-    {
-      gchar str[400];
-      gchar check[400];
+  pollfd = add_pollfd (self, fd, successful_connection_cb, TRUE, TRUE);
+  pollfd->server = FALSE;
 
-      memset(str, 0, sizeof(str));
-      if (recv(fd, str, sizeof(str), 0) != -1)
-        {
-          g_message ("Got %s, checking if it's auth", str);
-          sprintf(str, "recipientid=%s&sessionid=%d\r\n\r\n",
-                  self->local_recipient_id, self->session_id);
-          if (strcmp (str, check) != 0)
-            {
-              // send our connected message also
-              memset(str, 0, sizeof(str));
-              sprintf(str, "connected\r\n\r\n");
-              send(fd, str, strlen(str), 0);
-
-              // now we get connected
-              memset(str, 0, sizeof(str));
-              if (recv(fd, str, sizeof(str), 0) != -1)
-                {
-                  if (strcmp (str, "connected\r\n\r\n") == 0)
-                    {
-                      g_message ("Authentication successfull");
-                      return TRUE;
-                    }
-                }
-            }
-        }
-      else
-        {
-          perror("auth");
-        }
-    }
-  return FALSE;
+  return TRUE;
 }
 
 static void
-fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
 {
   struct sockaddr_in in;
   int fd = -1;
@@ -440,168 +396,336 @@ fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
   if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
       gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
   {
-    g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
+    g_debug ("Error in accept socket : %d", pollfd->pollfd.fd);
     goto error;
   }
 
   if ((fd = accept(pollfd->pollfd.fd,
               (struct sockaddr*) &in, &n)) == -1)
   {
-    g_message ("Error while running accept() %d", errno);
+    g_debug ("Error while running accept() %d", errno);
     return;
   }
 
-  /* Remove NON BLOCKING MODE */
-  if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
-  {
-    g_warning ("fcntl() failed");
-    goto error;
-  }
+  newpollfd = add_pollfd (self, fd, connection_cb, TRUE, FALSE);
+  newpollfd->server = TRUE;
 
-  // now we try to auth
-  if (fs_msn_authenticate_incoming(self,fd))
-  {
-    g_message ("Authenticated incoming successfully fd %d", fd);
+  return;
 
-    // success! we need to shutdown/close all other channels
-    gint i;
-    for (i = 0; i < self->pollfds->len; i++)
+  /* Error */
+ error:
+  g_debug ("Got error from fd %d, closing", fd);
+  // find, shutdown and remove channel from fdlist
+  gint i;
+  for (i = 0; i < self->pollfds->len; i++)
+  {
+    FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
+    if (pollfd == pollfd2)
     {
-      FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
-      g_message ("closing fd %d", pollfd2->pollfd.fd);
+      g_debug ("closing fd %d", pollfd2->pollfd.fd);
       shutdown_fd (self, pollfd2);
       i--;
     }
-    /* TODO callback */
-
-    newpollfd = g_slice_new0 (FsMsnPollFD);
-    gst_poll_fd_init (&newpollfd->pollfd);
-    newpollfd->pollfd.fd = fd;
-    newpollfd->want_read = FALSE;
-    newpollfd->want_write = FALSE;
-    gst_poll_fd_ctl_read (self->poll, &newpollfd->pollfd, FALSE);
-    gst_poll_fd_ctl_write (self->poll, &newpollfd->pollfd, FALSE);
-    newpollfd->next_step = main_fd_closed_cb;
-    g_array_append_val (self->pollfds, newpollfd);
-    return;
   }
 
+  return;
+}
+
+
+static void
+successful_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+  gint error;
+  socklen_t option_len;
+
+  g_debug ("handler called on fd %d", pollfd->pollfd.fd);
+
+  errno = 0;
+  if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+      gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+  {
+    g_debug ("connecton closed or error");
+    goto error;
+  }
+
+  option_len = sizeof(error);
+
+  /* Get the error option */
+  if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
+  {
+    g_warning ("getsockopt() failed");
+    goto error;
+  }
+
+  /* Check if there is an error */
+  if (error)
+  {
+    g_debug ("getsockopt gave an error : %d", error);
+    goto error;
+  }
+
+  pollfd->callback = connection_cb;
+
+  g_debug ("connection succeeded on socket %p", pollfd);
+  return;
+
   /* Error */
  error:
-  g_message ("Got error from fd %d, closing", fd);
+  g_debug ("Got error from fd %d, closing", pollfd->pollfd.fd);
   // find, shutdown and remove channel from fdlist
   gint i;
   for (i = 0; i < self->pollfds->len; i++)
   {
-    FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
+    FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
     if (pollfd == pollfd2)
     {
-      g_message ("closing fd %d", pollfd2->pollfd.fd);
+      g_debug ("closing fd %d", pollfd2->pollfd.fd);
       shutdown_fd (self, pollfd2);
       i--;
     }
   }
 
-  if (fd > 0)
-    close (fd);
-
   return;
 }
 
+
 static void
-fs_msn_open_listening_port (FsMsnConnection *connection,
-                            guint16 port)
+connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
 {
-  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
-  gint fd = -1;
-  FsMsnPollFD *pollfd = NULL;
-  struct sockaddr_in myaddr;
-  memset(&myaddr, 0, sizeof(myaddr));
+  gboolean success = FALSE;
 
-  g_message ("Attempting to listen on port %d.....\n",port);
+  g_debug ("handler called on fd %d. %d %d %d %d", pollfd->pollfd.fd,
+      pollfd->server, pollfd->status,
+      gst_poll_fd_can_read (self->poll, &pollfd->pollfd),
+      gst_poll_fd_can_write (self->poll, &pollfd->pollfd));
 
-  if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
+  if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+      gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
   {
-    // show error
-    g_message ("could not create socket!");
-    return;
+    g_debug ("connecton closed or error");
+    goto error;
   }
 
-  // set non-blocking mode
-  fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
-  myaddr.sin_family = AF_INET;
-  myaddr.sin_port = htons (port);
-  // bind
-  if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
+  if (gst_poll_fd_can_read (self->poll, &pollfd->pollfd))
   {
-    close (fd);
-    return;
-  }
+    switch (pollfd->status)
+    {
+      case FS_MSN_STATUS_AUTH:
+        if (pollfd->server)
+        {
+          gchar str[35] = {0};
+          gchar check[35] = {0};
+
+          if (recv(pollfd->pollfd.fd, str, 34, 0) != -1)
+          {
+            g_debug ("Got %s, checking if it's auth", str);
+            sprintf(check, "recipientid=%s&sessionid=%d\r\n\r\n",
+                self->local_recipient_id, self->session_id);
+            if (strcmp (str, check) == 0)
+            {
+              g_debug ("Authentication successful");
+              pollfd->status = FS_MSN_STATUS_CONNECTED;
+              pollfd->want_write = TRUE;
+              gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
+            }
+            else
+            {
+              g_debug ("Authentication failed");
+              goto error;
+            }
+          }
+          else
+          {
+            perror ("auth");
+            goto error;
+          }
+
+        } else {
+          g_debug ("shouldn't receive data when client on AUTH state");
+          goto error;
+        }
+        break;
+      case FS_MSN_STATUS_CONNECTED:
+        if (!pollfd->server)
+        {
+          gchar str[14] = {0};
+
+          if (recv(pollfd->pollfd.fd, str, 13, 0) != -1)
+          {
+            g_debug ("Got %s, checking if it's connected", str);
+            if (strcmp (str, "connected\r\n\r\n") == 0)
+            {
+              g_debug ("connection successful");
+              pollfd->status = FS_MSN_STATUS_CONNECTED2;
+              pollfd->want_write = TRUE;
+              gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
+            }
+            else
+            {
+              g_debug ("connected failed");
+              goto error;
+            }
+          }
+          else
+          {
+            perror ("connected");
+            goto error;
+          }
+        } else {
+          g_debug ("shouldn't receive data when server on CONNECTED state");
+          goto error;
+        }
+        break;
+      case FS_MSN_STATUS_CONNECTED2:
+        if (pollfd->server)
+        {
+          gchar str[14] = {0};
 
+          if (recv(pollfd->pollfd.fd, str, 13, 0) != -1)
+          {
+            g_debug ("Got %s, checking if it's connected", str);
+            if (strcmp (str, "connected\r\n\r\n") == 0)
+            {
+              g_debug ("connection successful");
+              pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
+              success = TRUE;
+            }
+            else
+            {
+              g_debug ("connected failed");
+              goto error;
+            }
+          }
+          else
+          {
+            perror ("connected");
+            goto error;
+          }
+
+        } else {
+          g_debug ("shouldn't receive data when client on CONNECTED2 state");
+          goto error;
+        }
+        break;
+      default:
+        g_debug ("Invalid status %d", pollfd->status);
+        goto error;
+        break;
 
-  /* Listen */
-  if (listen(fd, 3) != 0)
+    }
+  } else if (gst_poll_fd_can_write (self->poll, &pollfd->pollfd))
   {
-    close (fd);
-    return;
-  }
-  pollfd = g_slice_new0 (FsMsnPollFD);
-  gst_poll_fd_init (&pollfd->pollfd);
-  pollfd->pollfd.fd = fd;
-  pollfd->want_read = TRUE;
-  pollfd->want_write = FALSE;
-  gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, TRUE);
-  gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
-  pollfd->next_step = fd_accept_connection_cb;
+    pollfd->want_write = FALSE;
+    gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
+    switch (pollfd->status)
+    {
+      case FS_MSN_STATUS_AUTH:
+        if (!pollfd->server)
+        {
+          gchar *str = g_strdup_printf("recipientid=%s&sessionid=%d\r\n\r\n",
+              self->remote_recipient_id, self->session_id);
+          if (send(pollfd->pollfd.fd, str, strlen (str), 0) != -1)
+          {
+            g_debug ("Sent %s", str);
+            pollfd->status = FS_MSN_STATUS_CONNECTED;
+            g_free (str);
+          }
+          else
+          {
+            g_free (str);
+            perror ("auth");
+            goto error;
+          }
 
-  g_array_append_val (self->pollfds, pollfd);
-  g_message ("Listening on port %d\n",port);
+        }
+        break;
+      case FS_MSN_STATUS_CONNECTED:
+        if (pollfd->server)
+        {
 
-}
+          if (send(pollfd->pollfd.fd, "connected\r\n\r\n", 13, 0) != -1)
+          {
+            g_debug ("sent connected");
+            pollfd->status = FS_MSN_STATUS_CONNECTED2;
+          }
+          else
+          {
+            perror ("sending connected");
+            goto error;
+          }
+        } else {
+          g_debug ("shouldn't receive data when server on CONNECTED state");
+          goto error;
+        }
+        break;
+      case FS_MSN_STATUS_CONNECTED2:
+        if (!pollfd->server)
+        {
 
-// Authenticate ourselves when connecting out
-static gboolean
-fs_msn_authenticate_outgoing (FsMsnConnection *connection, gint fd)
-{
-  FsMsnConnection *self = FS_MSN_CONNECTION (connection);
-  gchar str[400];
-  memset(str, 0, sizeof(str));
-  if (fd != 0)
-  {
-    g_message ("Authenticating connection on %d...", fd);
-    g_message ("sending : recipientid=%s&sessionid=%d\r\n\r\n",
-        self->remote_recipient_id, self->session_id);
-    sprintf(str, "recipientid=%s&sessionid=%d\r\n\r\n",
-        self->remote_recipient_id, self->session_id);
-    if (send(fd, str, strlen(str), 0) == -1)
-    {
-      g_message("sending failed");
-      perror("auth");
+          if (send(pollfd->pollfd.fd, "connected\r\n\r\n", 13, 0) != -1)
+          {
+            g_debug ("sent connected");
+            pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
+            success = TRUE;
+          }
+          else
+          {
+            perror ("sending connected");
+            goto error;
+          }
+        } else {
+          g_debug ("shouldn't receive data when client on CONNECTED2 state");
+          goto error;
+        }
+        break;
+      default:
+        g_debug ("Invalid status %d", pollfd->status);
+        goto error;
+        break;
     }
+  }
 
-    memset(str, 0, sizeof(str));
-    if (recv(fd, str, 13, 0) != -1)
+  if (success) {
+    // success! we need to shutdown/close all other channels
+    gint i;
+    for (i = 0; i < self->pollfds->len; i++)
     {
-      g_message ("Got %s, checking if it's auth", str);
-      // we should get a connected message now
-      if (strcmp (str, "connected\r\n\r\n") == 0)
+      FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
+      if (pollfd != pollfd2)
       {
-        // send our connected message also
-        memset(str, 0, sizeof(str));
-        sprintf(str, "connected\r\n\r\n");
-        send(fd, str, strlen(str), 0);
-        g_message ("Authentication successfull");
-        return TRUE;
+        g_debug ("closing fd %d", pollfd2->pollfd.fd);
+        shutdown_fd (self, pollfd2);
+        i--;
       }
     }
-    else
+
+    /* TODO : callback */
+
+    pollfd->want_read = FALSE;
+    pollfd->want_write = FALSE;
+    gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, FALSE);
+    gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
+  }
+
+  return;
+ error:
+  /* Error */
+  g_debug ("Got error from fd %d, closing", pollfd->pollfd.fd);
+  // find, shutdown and remove channel from fdlist
+  gint i;
+  for (i = 0; i < self->pollfds->len; i++)
+  {
+    FsMsnPollFD *pollfd2 = g_array_index(self->pollfds, FsMsnPollFD *, i);
+    if (pollfd == pollfd2)
     {
-      perror("auth");
+      g_debug ("closing fd %d", pollfd2->pollfd.fd);
+      shutdown_fd (self, pollfd2);
+      i--;
     }
   }
-  return FALSE;
-}
 
+  return;
+}
 
 static gpointer
 connection_polling_thread (gpointer data)
@@ -609,54 +733,79 @@ connection_polling_thread (gpointer data)
   FsMsnConnection *self = data;
   gint ret;
   GstClockTime timeout;
+  GstPoll * poll;
 
+  g_static_rec_mutex_lock (&self->mutex);
   timeout = self->poll_timeout;
+  poll = self->poll;
+  g_debug ("poll waiting %d", self->pollfds->len);
+  g_static_rec_mutex_unlock (&self->mutex);
 
-  while ((ret = gst_poll_wait (self->poll, timeout)) >= 0)
+  while ((ret = gst_poll_wait (poll, timeout)) >= 0)
   {
+    g_debug ("gst_poll_wait returned : %d", ret);
+    g_static_rec_mutex_lock (&self->mutex);
     if (ret > 0)
     {
       gint i;
 
       for (i = 0; i < self->pollfds->len; i++)
       {
-        FsMsnPollFD *pollfd = &g_array_index(self->pollfds,
-            FsMsnPollFD, i);
+        FsMsnPollFD *pollfd = NULL;
 
-        if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
-            gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+        pollfd = g_array_index(self->pollfds, FsMsnPollFD *, i);
+
+        g_debug ("ret %d - i = %d, len = %d", ret, i, self->pollfds->len);
+
+        g_debug ("%p - error %d, close %d, read %d-%d, write %d-%d",
+            pollfd,
+            gst_poll_fd_has_error (poll, &pollfd->pollfd),
+            gst_poll_fd_has_closed (poll, &pollfd->pollfd),
+            pollfd->want_read,
+            gst_poll_fd_can_read (poll, &pollfd->pollfd),
+            pollfd->want_write,
+            gst_poll_fd_can_write (poll, &pollfd->pollfd));
+
+        if (gst_poll_fd_has_error (poll, &pollfd->pollfd) ||
+            gst_poll_fd_has_closed (poll, &pollfd->pollfd))
         {
-          pollfd->next_step (self, pollfd);
+          pollfd->callback (self, pollfd);
           shutdown_fd (self, pollfd);
           i--;
           continue;
         }
         if ((pollfd->want_read &&
-                gst_poll_fd_can_read (self->poll, &pollfd->pollfd)) ||
+                gst_poll_fd_can_read (poll, &pollfd->pollfd)) ||
             (pollfd->want_write &&
-                gst_poll_fd_can_write (self->poll, &pollfd->pollfd)))
-          pollfd->next_step (self, pollfd);
-      }
+                gst_poll_fd_can_write (poll, &pollfd->pollfd)))
+        {
+          pollfd->callback (self, pollfd);
+        }
 
+      }
     }
     timeout = self->poll_timeout;
+    g_static_rec_mutex_unlock (&self->mutex);
   }
 
   return NULL;
 }
 
+
 static void
 shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd)
 {
   gint i;
 
+  g_debug ("Shutting down pollfd %p", pollfd);
 
   if (!gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
     close (pollfd->pollfd.fd);
-  gst_poll_remove_fd (self->poll, &pollfd->pollfd);
+  g_debug ("gst poll remove : %d",
+      gst_poll_remove_fd (self->poll, &pollfd->pollfd));
   for (i = 0; i < self->pollfds->len; i++)
   {
-    FsMsnPollFD *p = &g_array_index(self->pollfds, FsMsnPollFD, i);
+    FsMsnPollFD *p = g_array_index(self->pollfds, FsMsnPollFD *, i);
     if (p == pollfd)
     {
       g_array_remove_index_fast (self->pollfds, i);
@@ -666,3 +815,35 @@ shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd)
   }
   gst_poll_restart (self->poll);
 }
+
+static FsMsnPollFD *
+add_pollfd (FsMsnConnection *self, int fd, PollFdCallback callback,
+    gboolean read, gboolean write)
+{
+  FsMsnPollFD *pollfd = g_slice_new0 (FsMsnPollFD);
+  gst_poll_fd_init (&pollfd->pollfd);
+  pollfd->pollfd.fd = fd;
+  pollfd->want_read = read;
+  pollfd->want_write = write;
+  pollfd->status = FS_MSN_STATUS_AUTH;
+
+  gst_poll_add_fd (self->poll, &pollfd->pollfd);
+
+  gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, read);
+  gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, write);
+  pollfd->callback = callback;
+
+        g_debug ("ADD_POLLFD %p (%p) - error %d, close %d, read %d-%d, write %d-%d",
+            self->pollfds, pollfd,
+            gst_poll_fd_has_error (self->poll, &pollfd->pollfd),
+            gst_poll_fd_has_closed (self->poll, &pollfd->pollfd),
+            pollfd->want_read,
+            gst_poll_fd_can_read (self->poll, &pollfd->pollfd),
+            pollfd->want_write,
+            gst_poll_fd_can_write (self->poll, &pollfd->pollfd));
+
+  g_array_append_val (self->pollfds, pollfd);
+  gst_poll_restart (self->poll);
+  return pollfd;
+}
+// (gdb) p ((FsMsnPollFD **) ((FsMsnConnection *)data)->pollfds->data)[1]
diff --git a/gst/fsmsnconference/fs-msn-connection.h b/gst/fsmsnconference/fs-msn-connection.h
index 4ee235d..a9cc39c 100644
--- a/gst/fsmsnconference/fs-msn-connection.h
+++ b/gst/fsmsnconference/fs-msn-connection.h
@@ -74,19 +74,23 @@ struct _FsMsnConnection
   GstClockTime poll_timeout;
   GstPoll *poll;
   GArray *pollfds;
-
   gboolean disposed;
+  GStaticRecMutex mutex;
 };
 
+typedef void (*NewLocalCandidateCB) (FsCandidate *candidate, gpointer data);
+
 GType fs_msn_connection_get_type (void);
 
 FsMsnConnection *fs_msn_connection_new (guint session_id, guint initial_port);
 
-gboolean fs_msn_connection_gather_local_candidates (FsMsnConnection *connection);
+gboolean fs_msn_connection_gather_local_candidates (FsMsnConnection *connection,
+    NewLocalCandidateCB cb, gpointer data);
 
 gboolean fs_msn_connection_set_remote_candidates (FsMsnConnection *connection,
     GList *candidates, GError **error);
 
+
 G_END_DECLS
 
 #endif /* __FS_MSN_CONNECTION_H__ */
diff --git a/gst/fsmsnconference/fs-msn-session.c b/gst/fsmsnconference/fs-msn-session.c
index af5f566..e567c2e 100644
--- a/gst/fsmsnconference/fs-msn-session.c
+++ b/gst/fsmsnconference/fs-msn-session.c
@@ -434,8 +434,8 @@ fs_msn_session_new (FsMediaType media_type,
 
   if (!session)
   {
-    self->priv->construction_error = g_error_new (FS_ERROR,
-        FS_ERROR_CONSTRUCTION, "Could not create object");
+    *error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not create object");
   }
   else if (session->priv->construction_error)
   {
-- 
1.5.6.5




More information about the farsight-commits mailing list