[farsight2/master] Move all connection related stuff into its own object
Youness Alaoui
youness.alaoui at collabora.co.uk
Tue Jul 14 09:50:32 PDT 2009
---
gst/fsmsnconference/Makefile.am | 2 +
gst/fsmsnconference/fs-msn-conference.c | 28 +-
gst/fsmsnconference/fs-msn-connection.c | 668 ++++++++++++++++++++++++++++++
gst/fsmsnconference/fs-msn-connection.h | 92 +++++
gst/fsmsnconference/fs-msn-session.c | 53 +++-
gst/fsmsnconference/fs-msn-stream.c | 671 +------------------------------
6 files changed, 826 insertions(+), 688 deletions(-)
create mode 100644 gst/fsmsnconference/fs-msn-connection.c
create mode 100644 gst/fsmsnconference/fs-msn-connection.h
diff --git a/gst/fsmsnconference/Makefile.am b/gst/fsmsnconference/Makefile.am
index 7579f4d..5aca5db 100644
--- a/gst/fsmsnconference/Makefile.am
+++ b/gst/fsmsnconference/Makefile.am
@@ -4,6 +4,7 @@ libfsmsnconference_la_SOURCES = \
fs-msn-conference.c \
fs-msn-participant.c \
fs-msn-session.c \
+ fs-msn-connection.c \
fs-msn-stream.c
BUILT_SOURCES =
@@ -12,6 +13,7 @@ noinst_HEADERS = \
fs-msn-conference.h \
fs-msn-participant.h \
fs-msn-session.h \
+ fs-msn-connection.h \
fs-msn-stream.h
EXTRA_libfsmsnconference_la_SOURCES =
diff --git a/gst/fsmsnconference/fs-msn-conference.c b/gst/fsmsnconference/fs-msn-conference.c
index 2a7d45d..d3a3c27 100644
--- a/gst/fsmsnconference/fs-msn-conference.c
+++ b/gst/fsmsnconference/fs-msn-conference.c
@@ -171,7 +171,6 @@ static void
fs_msn_conference_class_init (FsMsnConferenceClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
FsBaseConferenceClass *baseconf_class = FS_BASE_CONFERENCE_CLASS (klass);
g_type_class_add_private (klass, sizeof (FsMsnConferencePrivate));
@@ -189,14 +188,6 @@ fs_msn_conference_class_init (FsMsnConferenceClass * klass)
GST_DEBUG_FUNCPTR (fs_msn_conference_set_property);
gobject_class->get_property =
GST_DEBUG_FUNCPTR (fs_msn_conference_get_property);
-
- gst_element_class_set_details (gstelement_class, &fs_msn_conference_details);
-
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&fs_msn_conference_sink_template));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&fs_msn_conference_src_template));
-
g_object_class_install_property (gobject_class,PROP_LOCAL_MSNADD,
g_param_spec_string ("local_address", "Msn Address",
"The local contact address for the MSN sessions",
@@ -206,6 +197,14 @@ fs_msn_conference_class_init (FsMsnConferenceClass * klass)
static void
fs_msn_conference_base_init (gpointer g_class)
{
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
+
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&fs_msn_conference_sink_template));
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&fs_msn_conference_src_template));
+
+ gst_element_class_set_details (gstelement_class, &fs_msn_conference_details);
}
static void
@@ -229,7 +228,7 @@ fs_msn_conference_get_property (GObject *object,
{
case PROP_LOCAL_MSNADD:
GST_OBJECT_LOCK (self);
- g_value_set_string (value,self->priv->local_address);
+ g_value_set_string (value, self->priv->local_address);
GST_OBJECT_UNLOCK (self);
break;
default:
@@ -296,6 +295,7 @@ fs_msn_conference_new_session (FsBaseConference *conf,
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Only video supported for msn webcam");
+ return NULL;
}
GST_OBJECT_LOCK (self);
@@ -311,13 +311,13 @@ fs_msn_conference_new_session (FsBaseConference *conf,
new_session = fs_msn_session_new (media_type, self, 1, error);
- GST_OBJECT_LOCK (self);
if (new_session)
{
+ GST_OBJECT_LOCK (self);
self->priv->session = new_session;
g_object_weak_ref (G_OBJECT (new_session), _remove_session, self);
+ GST_OBJECT_UNLOCK (self);
}
- GST_OBJECT_UNLOCK (self);
return FS_SESSION (new_session);
}
@@ -344,13 +344,13 @@ fs_msn_conference_new_participant (FsBaseConference *conf,
new_participant = fs_msn_participant_new (cname);
- GST_OBJECT_LOCK (self);
if (new_participant)
{
+ GST_OBJECT_LOCK (self);
self->priv->participant = new_participant;
g_object_weak_ref (G_OBJECT (new_participant), _remove_participant, self);
+ GST_OBJECT_UNLOCK (self);
}
- GST_OBJECT_UNLOCK (self);
return FS_PARTICIPANT (new_participant);
diff --git a/gst/fsmsnconference/fs-msn-connection.c b/gst/fsmsnconference/fs-msn-connection.c
new file mode 100644
index 0000000..ffb43b2
--- /dev/null
+++ b/gst/fsmsnconference/fs-msn-connection.c
@@ -0,0 +1,668 @@
+/*
+ * Farsight2 - Farsight MSN Connection
+ *
+ * Copyright 2008 Richard Spiers <richard.spiers at gmail.com>
+ * Copyright 2007 Nokia Corp.
+ * Copyright 2007 Collabora Ltd.
+ * @author: Olivier Crete <olivier.crete at collabora.co.uk>
+ *
+ * fs-msn-connection.c - A MSN Connection gobject
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "fs-msn-connection.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+
+#include <string.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <gst/gst.h>
+
+/* Signals */
+enum
+{
+ LAST_SIGNAL
+};
+
+/* props */
+enum
+{
+ PROP_0,
+};
+
+
+typedef struct _FsMsnPollFD FsMsnPollFD;
+
+struct _FsMsnPollFD {
+ GstPollFD pollfd;
+ gboolean want_read;
+ gboolean want_write;
+ void (*next_step) (FsMsnConnection *self, FsMsnPollFD *pollfd);
+};
+
+
+G_DEFINE_TYPE(FsMsnConnection, fs_msn_connection, G_TYPE_OBJECT);
+
+static void fs_msn_connection_dispose (GObject *object);
+static void fs_msn_connection_finalize (GObject *object);
+
+static void main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+
+static void successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+
+static void fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
+
+static gboolean fs_msn_connection_attempt_connection (FsMsnConnection *connection,
+ gchar const *ip,
+ guint16 port);
+
+static gboolean fs_msn_authenticate_outgoing (FsMsnConnection *connection,
+ gint fd);
+
+static void fs_msn_open_listening_port (FsMsnConnection *connection,
+ guint16 port);
+
+static gpointer connection_polling_thread (gpointer data);
+
+static void shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd);
+
+static GObjectClass *parent_class = NULL;
+
+static void
+fs_msn_connection_class_init (FsMsnConnectionClass *klass)
+{
+ GObjectClass *gobject_class = (GObjectClass *) klass;
+
+ parent_class = g_type_class_peek_parent (klass);
+
+ gobject_class->dispose = fs_msn_connection_dispose;
+ gobject_class->finalize = fs_msn_connection_finalize;
+}
+
+static void
+fs_msn_connection_init (FsMsnConnection *self)
+{
+ /* member init */
+
+ self->disposed = FALSE;
+
+ self->poll_timeout = GST_CLOCK_TIME_NONE;
+ self->poll = gst_poll_new (TRUE);
+ gst_poll_set_controllable (self->poll, TRUE);
+ self->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
+}
+
+static void
+fs_msn_connection_dispose (GObject *object)
+{
+ FsMsnConnection *self = FS_MSN_CONNECTION (object);
+
+ /* If dispose did already run, return. */
+ if (self->disposed)
+ return;
+
+ if (self->polling_thread)
+ {
+ gst_poll_set_flushing (self->poll, TRUE);
+ g_thread_join (self->polling_thread);
+ self->polling_thread = NULL;
+ }
+
+ if (self->local_recipient_id)
+ g_free (self->local_recipient_id);
+ if (self->remote_recipient_id)
+ g_free (self->remote_recipient_id);
+
+ /* Make sure dispose does not run twice. */
+ self->disposed = TRUE;
+
+ parent_class->dispose (object);
+}
+
+static void
+fs_msn_connection_finalize (GObject *object)
+{
+ FsMsnConnection *self = FS_MSN_CONNECTION (object);
+ guint i;
+
+ /* TODO : why not in dispose */
+ gst_poll_free (self->poll);
+
+ for (i = 0; i < self->pollfds->len; i++)
+ close (g_array_index(self->pollfds, FsMsnPollFD, i).pollfd.fd);
+ g_array_free (self->pollfds, TRUE);
+
+ parent_class->finalize (object);
+}
+
+/**
+ * fs_msn_connection_new:
+ * @session: The #FsMsnSession this connection is a child of
+ * @participant: The #FsMsnParticipant this connection is for
+ * @direction: the initial #FsDirection for this connection
+ *
+ *
+ * This function create a new connection
+ *
+ * Returns: the newly created string or NULL on error
+ */
+
+FsMsnConnection *
+fs_msn_connection_new (guint session_id, guint initial_port)
+{
+ FsMsnConnection *self = g_object_new (FS_TYPE_MSN_CONNECTION, NULL);
+
+ if (self) {
+ self->session_id = session_id;
+ self->initial_port = initial_port;
+ }
+
+ return self;
+}
+
+gboolean
+fs_msn_connection_gather_local_candidates (FsMsnConnection *self)
+{
+
+ fs_msn_open_listening_port (self, self->initial_port);
+
+ self->polling_thread = g_thread_create (connection_polling_thread,
+ self, TRUE, NULL);
+
+ return self->polling_thread != NULL;
+}
+
+/**
+ * fs_msn_connection_set_remote_candidate:
+ */
+gboolean
+fs_msn_connection_set_remote_candidates (FsMsnConnection *self,
+ GList *candidates, GError **error)
+{
+ GList *item = NULL;
+
+ for (item = candidates; item; item = g_list_next (item))
+ {
+ FsCandidate *candidate = item->data;
+
+ if (!candidate->ip || !candidate->port)
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
+ "The candidate passed does not contain a valid ip or port");
+ return FALSE;
+ }
+ if (!candidate->foundation)
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
+ "The candidate passed does not have a foundation (MSN recipient ID)");
+ return FALSE;
+ }
+ if (self->remote_recipient_id) {
+ if (g_strcmp0 (candidate->foundation, self->remote_recipient_id) != 0)
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
+ "The candidates do not have the same recipient ID");
+ return FALSE;
+ }
+ } else {
+ self->remote_recipient_id = g_strdup (candidate->foundation);
+ }
+ fs_msn_connection_attempt_connection(self, candidate->ip, candidate->port);
+ }
+
+ return TRUE;
+}
+
+
+static void
+main_fd_closed_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+ g_message ("disconnection on video feed");
+ /* IXME - How to handle the disconnection of the connection
+ Destroy the elements involved?
+ Set the state to Null ?
+ */
+}
+
+static void
+successfull_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+ gint error;
+ socklen_t option_len;
+
+ g_message ("handler called on fd %d", pollfd->pollfd.fd);
+
+ errno = 0;
+ if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ {
+ g_message ("connecton closed or error");
+ goto error;
+ }
+
+ option_len = sizeof(error);
+
+ /* Get the error option */
+ if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
+ {
+ g_warning ("getsockopt() failed");
+ goto error;
+ }
+
+ /* Check if there is an error */
+ if (error)
+ {
+ g_message ("getsockopt gave an error : %d", error);
+ goto error;
+ }
+
+ /* Remove NON BLOCKING MODE */
+ if (fcntl(pollfd->pollfd.fd, F_SETFL,
+ fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
+ {
+ g_warning ("fcntl() failed");
+ goto error;
+ }
+
+ g_message ("Got connection on fd %d", pollfd->pollfd.fd);
+
+ if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
+ {
+ g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
+
+ // success! we need to shutdown/close all other channels
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ if (pollfd != pollfd2)
+ {
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
+ }
+ }
+
+ /* TODO : callback */
+
+ pollfd->want_read = FALSE;
+ pollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, FALSE);
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
+ pollfd->next_step = main_fd_closed_cb;
+ return;
+ }
+ else
+ {
+ g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
+ }
+
+ /* Error */
+ error:
+ g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
+ // find, shutdown and remove channel from fdlist
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ if (pollfd == pollfd2)
+ {
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
+ }
+ }
+
+ return;
+}
+
+static gboolean
+fs_msn_connection_attempt_connection (FsMsnConnection *connection,
+ const gchar *ip,
+ guint16 port)
+{
+ FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+ FsMsnPollFD *pollfd = NULL;
+ gint fd = -1;
+ struct sockaddr_in theiraddr;
+ memset(&theiraddr, 0, sizeof(theiraddr));
+
+ if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
+ {
+ // show error
+ g_message ("could not create socket!");
+ return FALSE;
+ }
+
+ // set non-blocking mode
+ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
+
+ theiraddr.sin_family = AF_INET;
+ theiraddr.sin_addr.s_addr = inet_addr (ip);
+ theiraddr.sin_port = htons (port);
+
+ g_message ("Attempting connection to %s %d on socket %d", ip, port, fd);
+ // this is non blocking, the return value isn't too usefull
+ gint ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
+ if (ret < 0)
+ {
+ if (errno != EINPROGRESS)
+ {
+ close (fd);
+ return FALSE;
+ }
+ }
+ g_message("ret %d %d %s", ret, errno, strerror(errno));
+
+ pollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&pollfd->pollfd);
+ pollfd->pollfd.fd = fd;
+ pollfd->want_read = TRUE;
+ pollfd->want_write = TRUE;
+ gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, TRUE);
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
+ pollfd->next_step = successfull_connection_cb;
+ g_array_append_val (self->pollfds, pollfd);
+
+ return TRUE;
+}
+
+static gboolean
+fs_msn_authenticate_incoming (FsMsnConnection *connection, gint fd)
+{
+ FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+ if (fd != 0)
+ {
+ gchar str[400];
+ gchar check[400];
+
+ memset(str, 0, sizeof(str));
+ if (recv(fd, str, sizeof(str), 0) != -1)
+ {
+ g_message ("Got %s, checking if it's auth", str);
+ sprintf(str, "recipientid=%s&sessionid=%d\r\n\r\n",
+ self->local_recipient_id, self->session_id);
+ if (strcmp (str, check) != 0)
+ {
+ // send our connected message also
+ memset(str, 0, sizeof(str));
+ sprintf(str, "connected\r\n\r\n");
+ send(fd, str, strlen(str), 0);
+
+ // now we get connected
+ memset(str, 0, sizeof(str));
+ if (recv(fd, str, sizeof(str), 0) != -1)
+ {
+ if (strcmp (str, "connected\r\n\r\n") == 0)
+ {
+ g_message ("Authentication successfull");
+ return TRUE;
+ }
+ }
+ }
+ }
+ else
+ {
+ perror("auth");
+ }
+ }
+ return FALSE;
+}
+
+static void
+fd_accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+ struct sockaddr_in in;
+ int fd = -1;
+ socklen_t n = sizeof (in);
+ FsMsnPollFD *newpollfd = NULL;
+
+ if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ {
+ g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
+ goto error;
+ }
+
+ if ((fd = accept(pollfd->pollfd.fd,
+ (struct sockaddr*) &in, &n)) == -1)
+ {
+ g_message ("Error while running accept() %d", errno);
+ return;
+ }
+
+ /* Remove NON BLOCKING MODE */
+ if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
+ {
+ g_warning ("fcntl() failed");
+ goto error;
+ }
+
+ // now we try to auth
+ if (fs_msn_authenticate_incoming(self,fd))
+ {
+ g_message ("Authenticated incoming successfully fd %d", fd);
+
+ // success! we need to shutdown/close all other channels
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
+ }
+ /* TODO callback */
+
+ newpollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&newpollfd->pollfd);
+ newpollfd->pollfd.fd = fd;
+ newpollfd->want_read = FALSE;
+ newpollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->poll, &newpollfd->pollfd, FALSE);
+ gst_poll_fd_ctl_write (self->poll, &newpollfd->pollfd, FALSE);
+ newpollfd->next_step = main_fd_closed_cb;
+ g_array_append_val (self->pollfds, newpollfd);
+ return;
+ }
+
+ /* Error */
+ error:
+ g_message ("Got error from fd %d, closing", fd);
+ // find, shutdown and remove channel from fdlist
+ gint i;
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd2 = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ if (pollfd == pollfd2)
+ {
+ g_message ("closing fd %d", pollfd2->pollfd.fd);
+ shutdown_fd (self, pollfd2);
+ i--;
+ }
+ }
+
+ if (fd > 0)
+ close (fd);
+
+ return;
+}
+
+static void
+fs_msn_open_listening_port (FsMsnConnection *connection,
+ guint16 port)
+{
+ FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+ gint fd = -1;
+ FsMsnPollFD *pollfd = NULL;
+ struct sockaddr_in myaddr;
+ memset(&myaddr, 0, sizeof(myaddr));
+
+ g_message ("Attempting to listen on port %d.....\n",port);
+
+ if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
+ {
+ // show error
+ g_message ("could not create socket!");
+ return;
+ }
+
+ // set non-blocking mode
+ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
+ myaddr.sin_family = AF_INET;
+ myaddr.sin_port = htons (port);
+ // bind
+ if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
+ {
+ close (fd);
+ return;
+ }
+
+
+ /* Listen */
+ if (listen(fd, 3) != 0)
+ {
+ close (fd);
+ return;
+ }
+ pollfd = g_slice_new0 (FsMsnPollFD);
+ gst_poll_fd_init (&pollfd->pollfd);
+ pollfd->pollfd.fd = fd;
+ pollfd->want_read = TRUE;
+ pollfd->want_write = FALSE;
+ gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, TRUE);
+ gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
+ pollfd->next_step = fd_accept_connection_cb;
+
+ g_array_append_val (self->pollfds, pollfd);
+ g_message ("Listening on port %d\n",port);
+
+}
+
+// Authenticate ourselves when connecting out
+static gboolean
+fs_msn_authenticate_outgoing (FsMsnConnection *connection, gint fd)
+{
+ FsMsnConnection *self = FS_MSN_CONNECTION (connection);
+ gchar str[400];
+ memset(str, 0, sizeof(str));
+ if (fd != 0)
+ {
+ g_message ("Authenticating connection on %d...", fd);
+ g_message ("sending : recipientid=%s&sessionid=%d\r\n\r\n",
+ self->remote_recipient_id, self->session_id);
+ sprintf(str, "recipientid=%s&sessionid=%d\r\n\r\n",
+ self->remote_recipient_id, self->session_id);
+ if (send(fd, str, strlen(str), 0) == -1)
+ {
+ g_message("sending failed");
+ perror("auth");
+ }
+
+ memset(str, 0, sizeof(str));
+ if (recv(fd, str, 13, 0) != -1)
+ {
+ g_message ("Got %s, checking if it's auth", str);
+ // we should get a connected message now
+ if (strcmp (str, "connected\r\n\r\n") == 0)
+ {
+ // send our connected message also
+ memset(str, 0, sizeof(str));
+ sprintf(str, "connected\r\n\r\n");
+ send(fd, str, strlen(str), 0);
+ g_message ("Authentication successfull");
+ return TRUE;
+ }
+ }
+ else
+ {
+ perror("auth");
+ }
+ }
+ return FALSE;
+}
+
+
+static gpointer
+connection_polling_thread (gpointer data)
+{
+ FsMsnConnection *self = data;
+ gint ret;
+ GstClockTime timeout;
+
+ timeout = self->poll_timeout;
+
+ while ((ret = gst_poll_wait (self->poll, timeout)) >= 0)
+ {
+ if (ret > 0)
+ {
+ gint i;
+
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *pollfd = &g_array_index(self->pollfds,
+ FsMsnPollFD, i);
+
+ if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
+ gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ {
+ pollfd->next_step (self, pollfd);
+ shutdown_fd (self, pollfd);
+ i--;
+ continue;
+ }
+ if ((pollfd->want_read &&
+ gst_poll_fd_can_read (self->poll, &pollfd->pollfd)) ||
+ (pollfd->want_write &&
+ gst_poll_fd_can_write (self->poll, &pollfd->pollfd)))
+ pollfd->next_step (self, pollfd);
+ }
+
+ }
+ timeout = self->poll_timeout;
+ }
+
+ return NULL;
+}
+
+static void
+shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd)
+{
+ gint i;
+
+
+ if (!gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
+ close (pollfd->pollfd.fd);
+ gst_poll_remove_fd (self->poll, &pollfd->pollfd);
+ for (i = 0; i < self->pollfds->len; i++)
+ {
+ FsMsnPollFD *p = &g_array_index(self->pollfds, FsMsnPollFD, i);
+ if (p == pollfd)
+ {
+ g_array_remove_index_fast (self->pollfds, i);
+ break;
+ }
+
+ }
+ gst_poll_restart (self->poll);
+}
diff --git a/gst/fsmsnconference/fs-msn-connection.h b/gst/fsmsnconference/fs-msn-connection.h
new file mode 100644
index 0000000..4ee235d
--- /dev/null
+++ b/gst/fsmsnconference/fs-msn-connection.h
@@ -0,0 +1,92 @@
+/*
+ * Farsight2 - Farsight MSN Stream
+ *
+ * Copyright 2008 Richard Spiers <richard.spiers at gmail.com>
+ * Copyright 2007 Nokia Corp.
+ * Copyright 2007 Collabora Ltd.
+ * @author: Olivier Crete <olivier.crete at collabora.co.uk>
+ * @author: Youness Alaoui <youness.alaoui at collabora.co.uk>
+ *
+ * fs-msn-connection.h - An MSN Connection class
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef __FS_MSN_CONNECTION_H__
+#define __FS_MSN_CONNECTION_H__
+
+#include "fs-msn-participant.h"
+#include "fs-msn-session.h"
+
+G_BEGIN_DECLS
+
+/* TYPE MACROS */
+#define FS_TYPE_MSN_CONNECTION \
+ (fs_msn_connection_get_type ())
+#define FS_MSN_CONNECTION(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj), FS_TYPE_MSN_CONNECTION, FsMsnConnection))
+#define FS_MSN_CONNECTION_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass), FS_TYPE_MSN_CONNECTION, FsMsnConnectionClass))
+#define FS_IS_MSN_CONNECTION(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj), FS_TYPE_MSN_CONNECTION))
+#define FS_IS_MSN_CONNECTION_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass), FS_TYPE_MSN_CONNECTION))
+#define FS_MSN_CONNECTION_GET_CLASS(obj) \
+ (G_TYPE_INSTANCE_GET_CLASS ((obj), FS_TYPE_MSN_CONNECTION, FsMsnConnectionClass))
+#define FS_MSN_CONNECTION_CAST(obj) ((FsMsnConnection*) (obj))
+
+typedef struct _FsMsnConnection FsMsnConnection;
+typedef struct _FsMsnConnectionClass FsMsnConnectionClass;
+typedef struct _FsMsnConnectionPrivate FsMsnConnectionPrivate;
+
+
+struct _FsMsnConnectionClass
+{
+ GObjectClass parent_class;
+};
+
+/**
+ * FsMsnConnection:
+ *
+ */
+struct _FsMsnConnection
+{
+ GObject parent;
+
+ gchar *local_recipient_id;
+ gchar *remote_recipient_id;
+ gint session_id;
+ gint initial_port;
+
+ GThread *polling_thread;
+ GstClockTime poll_timeout;
+ GstPoll *poll;
+ GArray *pollfds;
+
+ gboolean disposed;
+};
+
+GType fs_msn_connection_get_type (void);
+
+FsMsnConnection *fs_msn_connection_new (guint session_id, guint initial_port);
+
+gboolean fs_msn_connection_gather_local_candidates (FsMsnConnection *connection);
+
+gboolean fs_msn_connection_set_remote_candidates (FsMsnConnection *connection,
+ GList *candidates, GError **error);
+
+G_END_DECLS
+
+#endif /* __FS_MSN_CONNECTION_H__ */
diff --git a/gst/fsmsnconference/fs-msn-session.c b/gst/fsmsnconference/fs-msn-session.c
index eb790eb..2d4ddbe 100644
--- a/gst/fsmsnconference/fs-msn-session.c
+++ b/gst/fsmsnconference/fs-msn-session.c
@@ -56,7 +56,12 @@ enum
PROP_0,
PROP_MEDIA_TYPE,
PROP_ID,
+ PROP_SINK_PAD,
PROP_CODEC_PREFERENCES,
+ PROP_CODECS,
+ PROP_CODECS_WITHOUT_CONFIG,
+ PROP_CURRENT_SEND_CODEC,
+ PROP_CODECS_READY,
PROP_CONFERENCE
};
@@ -129,6 +134,19 @@ fs_msn_session_class_init (FsMsnSessionClass *klass)
PROP_MEDIA_TYPE, "media-type");
g_object_class_override_property (gobject_class,
PROP_ID, "id");
+ g_object_class_override_property (gobject_class,
+ PROP_SINK_PAD, "sink-pad");
+
+ g_object_class_override_property (gobject_class,
+ PROP_CODEC_PREFERENCES, "codec-preferences");
+ g_object_class_override_property (gobject_class,
+ PROP_CODECS, "codecs");
+ g_object_class_override_property (gobject_class,
+ PROP_CODECS_WITHOUT_CONFIG, "codecs-without-config");
+ g_object_class_override_property (gobject_class,
+ PROP_CURRENT_SEND_CODEC, "current-send-codec");
+ g_object_class_override_property (gobject_class,
+ PROP_CODECS_READY, "codecs-ready");
g_object_class_install_property (gobject_class,
PROP_CONFERENCE,
@@ -223,6 +241,30 @@ fs_msn_session_get_property (GObject *object,
case PROP_CONFERENCE:
g_value_set_object (value, self->priv->conference);
break;
+ case PROP_SINK_PAD:
+ g_value_set_object (value, self->priv->media_sink_pad);
+ break;
+ case PROP_CODECS_READY:
+ g_value_set_boolean (value, TRUE);
+ break;
+ case PROP_CODEC_PREFERENCES:
+ case PROP_CODECS:
+ case PROP_CODECS_WITHOUT_CONFIG:
+ {
+ GList *codecs = NULL;
+ FsCodec *mimic_codec = fs_codec_new (FS_CODEC_ID_ANY, "mimic",
+ FS_MEDIA_TYPE_VIDEO, 0);
+ codecs = g_list_append (codecs, mimic_codec);
+ g_value_take_boxed (value, codecs);
+ }
+ break;
+ case PROP_CURRENT_SEND_CODEC:
+ {
+ FsCodec *send_codec = fs_codec_new (FS_CODEC_ID_ANY, "mimic",
+ FS_MEDIA_TYPE_VIDEO, 0);
+ g_value_take_boxed (value, send_codec);
+ break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -366,11 +408,14 @@ fs_msn_session_new_stream (FsSession *session,
new_stream = FS_STREAM_CAST (fs_msn_stream_new (self, msnparticipant,
direction,self->priv->conference,error));
- g_object_weak_ref (G_OBJECT (new_stream), _remove_stream, self);
+ if (new_stream)
+ {
+ GST_OBJECT_LOCK (self);
+ self->priv->stream = (FsMsnStream *) new_stream;
+ g_object_weak_ref (G_OBJECT (new_stream), _remove_stream, self);
+ GST_OBJECT_UNLOCK (self);
+ }
- FS_MSN_SESSION_LOCK (self);
- self->priv->stream = (FsMsnStream *) new_stream;
- FS_MSN_SESSION_UNLOCK (self);
return new_stream;
}
diff --git a/gst/fsmsnconference/fs-msn-stream.c b/gst/fsmsnconference/fs-msn-stream.c
index 970c293..a4ed6d0 100644
--- a/gst/fsmsnconference/fs-msn-stream.c
+++ b/gst/fsmsnconference/fs-msn-stream.c
@@ -56,26 +56,10 @@ enum
PROP_DIRECTION,
PROP_PARTICIPANT,
PROP_SESSION,
- PROP_SINK_PAD,
- PROP_SRC_PAD,
- PROP_CONFERENCE,
- PROP_L_RID,
- PROP_L_SID,
- PROP_R_RID,
- PROP_R_SID,
- PROP_PORT
+ PROP_CONFERENCE
};
-typedef struct _FsMsnPollFD FsMsnPollFD;
-
-struct _FsMsnPollFD {
- GstPollFD pollfd;
- gboolean want_read;
- gboolean want_write;
- void (*next_step) (FsMsnStream *self, FsMsnPollFD *pollfd);
-};
-
struct _FsMsnStreamPrivate
{
@@ -85,14 +69,6 @@ struct _FsMsnStreamPrivate
FsMsnConference *conference;
GstElement *media_fd_src,*media_fd_sink,*send_valve;
GstPad *sink_pad,*src_pad;
- gint local_recipientid, local_sessionid;
- gint remote_recipientid, remote_sessionid;
- gint port;
-
- GThread *polling_thread;
- GstClockTime poll_timeout;
- GstPoll *poll;
- GArray *pollfds;
GError *construction_error;
@@ -127,26 +103,6 @@ static gboolean fs_msn_stream_set_remote_candidate (FsMsnStream *stream,
FsCandidate *candidate,
GError **error);
-static void main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *fd);
-
-static void successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
-
-static void fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *fd);
-
-static gboolean fs_msn_stream_attempt_connection (FsMsnStream *stream,
- gchar const *ip,
- guint16 port);
-
-static gboolean fs_msn_authenticate_outgoing (FsMsnStream *stream,
- gint fd);
-
-static void fs_msn_open_listening_port (FsMsnStream *stream,
- guint16 port);
-
-static gpointer connection_polling_thread (gpointer data);
-
-static void shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd);
-
static GObjectClass *parent_class = NULL;
static void
@@ -181,67 +137,12 @@ fs_msn_stream_class_init (FsMsnStreamClass *klass)
"session");
g_object_class_install_property (gobject_class,
- PROP_SINK_PAD,
- g_param_spec_object ("sink-pad",
- "A gstreamer sink pad for this stream",
- "A pad used for sending data on this stream",
- GST_TYPE_PAD,
- G_PARAM_READABLE));
-
- g_object_class_install_property (gobject_class,
- PROP_SRC_PAD,
- g_param_spec_object ("src-pad",
- "A gstreamer src pad for this stream",
- "A pad used for reading data from this stream",
- GST_TYPE_PAD,
- G_PARAM_READABLE));
-
- g_object_class_install_property (gobject_class,
PROP_CONFERENCE,
g_param_spec_object ("conference",
"The Conference this stream refers to",
"This is a conveniance pointer for the Conference",
FS_TYPE_MSN_CONFERENCE,
G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE));
-
- g_object_class_install_property (gobject_class,
- PROP_L_RID,
- g_param_spec_uint ("local-recipientid",
- "The local recipientid used for this stream",
- "The session ID used for this stream",
- 0, G_MAXUINT, 0,
- G_PARAM_READWRITE));
-
- g_object_class_install_property (gobject_class,
- PROP_L_SID,
- g_param_spec_uint ("local-sessionid",
- "The local sessionid used for this stream",
- "The session ID used for this stream",
- 0, G_MAXUINT, 0,
- G_PARAM_READWRITE));
-
- g_object_class_install_property (gobject_class,
- PROP_R_RID,
- g_param_spec_uint ("remote-recipientid",
- "The remote recipientid used for this stream",
- "The session ID used for this stream",
- 0, G_MAXUINT, 0,
- G_PARAM_READWRITE));
-
- g_object_class_install_property (gobject_class,
- PROP_R_SID,
- g_param_spec_uint ("remote-sessionid",
- "The remote sessionid used for this stream",
- "The session ID used for this stream",
- 0, G_MAXUINT, 0,
- G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class,
- PROP_PORT,
- g_param_spec_uint ("local-port",
- "The local port used for this stream",
- "The local port used for this stream",
- 0, G_MAXUINT, 0,
- G_PARAM_READWRITE));
}
static void
@@ -256,10 +157,6 @@ fs_msn_stream_init (FsMsnStream *self)
self->priv->direction = FS_DIRECTION_NONE;
- self->priv->poll_timeout = GST_CLOCK_TIME_NONE;
- self->priv->poll = gst_poll_new (TRUE);
- gst_poll_set_controllable (self->priv->poll, TRUE);
- self->priv->pollfds = g_array_new (TRUE, TRUE, sizeof(FsMsnPollFD));
}
static void
@@ -273,13 +170,6 @@ fs_msn_stream_dispose (GObject *object)
return;
}
- if (self->priv->polling_thread)
- {
- gst_poll_set_flushing (self->priv->poll, TRUE);
- g_thread_join (self->priv->polling_thread);
- self->priv->polling_thread = NULL;
- }
-
if (self->priv->participant)
{
g_object_unref (self->priv->participant);
@@ -301,15 +191,6 @@ fs_msn_stream_dispose (GObject *object)
static void
fs_msn_stream_finalize (GObject *object)
{
- FsMsnStream *self = FS_MSN_STREAM (object);
- guint i;
-
- gst_poll_free (self->priv->poll);
-
- for (i = 0; i < self->priv->pollfds->len; i++)
- close (g_array_index(self->priv->pollfds, FsMsnPollFD, i).pollfd.fd);
- g_array_free (self->priv->pollfds, TRUE);
-
parent_class->finalize (object);
}
@@ -333,30 +214,9 @@ fs_msn_stream_get_property (GObject *object,
case PROP_DIRECTION:
g_value_set_flags (value, self->priv->direction);
break;
- case PROP_SINK_PAD:
- g_value_set_object (value, self->priv->sink_pad);
- break;
- case PROP_SRC_PAD:
- g_value_set_object (value, self->priv->src_pad);
- break;
case PROP_CONFERENCE:
g_value_set_object (value, self->priv->conference);
break;
- case PROP_L_RID:
- g_value_set_uint (value, self->priv->local_recipientid);
- break;
- case PROP_L_SID:
- g_value_set_uint (value, self->priv->local_sessionid);
- break;
- case PROP_R_RID:
- g_value_set_uint (value, self->priv->remote_recipientid);
- break;
- case PROP_R_SID:
- g_value_set_uint (value, self->priv->remote_sessionid);
- break;
- case PROP_PORT:
- g_value_set_uint (value, self->priv->port);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -385,21 +245,6 @@ fs_msn_stream_set_property (GObject *object,
case PROP_CONFERENCE:
self->priv->conference = FS_MSN_CONFERENCE (g_value_dup_object (value));
break;
- case PROP_L_RID:
- self->priv->local_recipientid = g_value_get_uint (value);
- break;
- case PROP_L_SID:
- self->priv->local_sessionid = g_value_get_uint (value);
- break;
- case PROP_R_RID:
- self->priv->remote_recipientid = g_value_get_uint (value);
- break;
- case PROP_R_SID:
- self->priv->remote_sessionid = g_value_get_uint (value);
- break;
- case PROP_PORT:
- self->priv->port = g_value_get_uint (value);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -672,9 +517,6 @@ fs_msn_stream_constructed (GObject *object)
"Direction must be sending OR receiving");
}
- self->priv->polling_thread = g_thread_create (connection_polling_thread,
- self, TRUE, &self->priv->construction_error);
-
GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}
@@ -710,457 +552,14 @@ fs_msn_stream_set_remote_candidates (FsStream *stream, GList *candidates,
return TRUE;
}
-static void
-main_fd_closed_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
- g_message ("disconnection on video feed");
- /* IXME - How to handle the disconnection of the stream
- Destroy the elements involved?
- Set the state to Null ?
- */
-}
-
-static void
-successfull_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
- gint error;
- socklen_t option_len;
-
- g_message ("handler called on fd %d", pollfd->pollfd.fd);
-
- errno = 0;
- if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
- gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
- {
- g_message ("connecton closed or error");
- goto error;
- }
-
- option_len = sizeof(error);
-
- /* Get the error option */
- if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
- {
- g_warning ("getsockopt() failed");
- goto error;
- }
-
- /* Check if there is an error */
- if (error)
- {
- g_message ("getsockopt gave an error : %d", error);
- goto error;
- }
-
- /* Remove NON BLOCKING MODE */
- if (fcntl(pollfd->pollfd.fd, F_SETFL,
- fcntl(pollfd->pollfd.fd, F_GETFL) & ~O_NONBLOCK) != 0)
- {
- g_warning ("fcntl() failed");
- goto error;
- }
-
- g_message ("Got connection on fd %d", pollfd->pollfd.fd);
-
- if (fs_msn_authenticate_outgoing (self, pollfd->pollfd.fd))
- {
- g_message ("Authenticated outgoing successfully fd %d", pollfd->pollfd.fd);
-
- // success! we need to shutdown/close all other channels
- gint i;
- for (i = 0; i < self->priv->pollfds->len; i++)
- {
- FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
- if (pollfd != pollfd2)
- {
- g_message ("closing fd %d", pollfd2->pollfd.fd);
- shutdown_fd (self, pollfd2);
- i--;
- }
- }
-
- if (self->priv->direction == FS_DIRECTION_RECV)
- {
- GstState state;
- g_message("Setting media_fd_src on fd %d", pollfd->pollfd.fd);
-
- gst_element_get_state(self->priv->media_fd_src, &state, NULL,
- GST_CLOCK_TIME_NONE);
-
- if ( state > GST_STATE_READY)
- {
- g_message("Error: fdsrc in state above ready");
- gst_element_set_state(self->priv->media_fd_src, GST_STATE_READY);
- }
- g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", pollfd->pollfd.fd, NULL);
- gst_element_set_locked_state(self->priv->media_fd_src,FALSE);
- gst_element_sync_state_with_parent(self->priv->media_fd_src);
- }
- else if (self->priv->direction == FS_DIRECTION_SEND)
- {
- GstState state;
- g_message("Setting media_fd_sink on fd %d", pollfd->pollfd.fd);
-
- gst_element_get_state(self->priv->media_fd_sink, &state, NULL,
- GST_CLOCK_TIME_NONE);
-
- if ( state > GST_STATE_READY)
- {
- g_message("Error: fdsrc in state above ready");
- gst_element_set_state(self->priv->media_fd_sink, GST_STATE_READY);
- }
- g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", pollfd->pollfd.fd, NULL);
- gst_element_set_locked_state(self->priv->media_fd_sink,FALSE);
- gst_element_sync_state_with_parent(self->priv->media_fd_sink);
- g_object_set (G_OBJECT (self->priv->send_valve), "drop", FALSE, NULL);
-
- }
- pollfd->want_read = FALSE;
- pollfd->want_write = FALSE;
- gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, FALSE);
- gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
- pollfd->next_step = main_fd_closed_cb;
- return;
- }
- else
- {
- g_message ("Authentification failed on fd %d", pollfd->pollfd.fd);
- }
-
- /* Error */
- error:
- g_message ("Got error from fd %d, closing", pollfd->pollfd.fd);
- // find, shutdown and remove channel from fdlist
- gint i;
- for (i = 0; i < self->priv->pollfds->len; i++)
- {
- FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
- if (pollfd == pollfd2)
- {
- g_message ("closing fd %d", pollfd2->pollfd.fd);
- shutdown_fd (self, pollfd2);
- i--;
- }
- }
-
- return;
-}
-
-static gboolean
-fs_msn_stream_attempt_connection (FsMsnStream *stream,
- const gchar *ip,
- guint16 port)
-{
- FsMsnStream *self = FS_MSN_STREAM (stream);
- FsMsnPollFD *pollfd = NULL;
- gint fd = -1;
- struct sockaddr_in theiraddr;
- memset(&theiraddr, 0, sizeof(theiraddr));
-
- if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
- {
- // show error
- g_message ("could not create socket!");
- return FALSE;
- }
-
- // set non-blocking mode
- fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
-
- theiraddr.sin_family = AF_INET;
- theiraddr.sin_addr.s_addr = inet_addr (ip);
- theiraddr.sin_port = htons (port);
-
- g_message ("Attempting connection to %s %d on socket %d", ip, port, fd);
- // this is non blocking, the return value isn't too usefull
- gint ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
- if (ret < 0)
- {
- if (errno != EINPROGRESS)
- {
- close (fd);
- return FALSE;
- }
- }
- g_message("ret %d %d %s", ret, errno, strerror(errno));
-
- pollfd = g_slice_new0 (FsMsnPollFD);
- gst_poll_fd_init (&pollfd->pollfd);
- pollfd->pollfd.fd = fd;
- pollfd->want_read = TRUE;
- pollfd->want_write = TRUE;
- gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
- gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, TRUE);
- pollfd->next_step = successfull_connection_cb;
- g_array_append_val (self->priv->pollfds, pollfd);
-
- return TRUE;
-}
-
static gboolean
fs_msn_stream_set_remote_candidate (FsMsnStream *stream,
FsCandidate *candidate,
GError **error)
{
- FsMsnStream *self = FS_MSN_STREAM (stream);
- fs_msn_open_listening_port(self,self->priv->port); // FIXME - Should be done in the constructor, with a message passed onto the bus to give the port to the client program
- if (fs_msn_stream_attempt_connection(self,candidate->ip,candidate->port))
- return TRUE;
- return FALSE;
-}
-
-static gboolean
-fs_msn_authenticate_incoming (FsMsnStream *stream, gint fd)
-{
- FsMsnStream *self = FS_MSN_STREAM (stream);
- if (fd != 0)
- {
- gchar str[400];
- gchar check[400];
-
- memset(str, 0, sizeof(str));
- if (recv(fd, str, sizeof(str), 0) != -1)
- {
- g_message ("Got %s, checking if it's auth", str);
- sprintf(str, "recipientid=%d&sessionid=%d\r\n\r\n",
- self->priv->local_recipientid, self->priv->remote_sessionid);
- if (strcmp (str, check) != 0)
- {
- // send our connected message also
- memset(str, 0, sizeof(str));
- sprintf(str, "connected\r\n\r\n");
- send(fd, str, strlen(str), 0);
-
- // now we get connected
- memset(str, 0, sizeof(str));
- if (recv(fd, str, sizeof(str), 0) != -1)
- {
- if (strcmp (str, "connected\r\n\r\n") == 0)
- {
- g_message ("Authentication successfull");
- return TRUE;
- }
- }
- }
- }
- else
- {
- perror("auth");
- }
- }
- return FALSE;
-}
-
-static void
-fd_accept_connection_cb (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
- struct sockaddr_in in;
- int fd = -1;
- socklen_t n = sizeof (in);
- FsMsnPollFD *newpollfd = NULL;
-
- if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
- gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
- {
- g_message ("Error in accept socket : %d", pollfd->pollfd.fd);
- goto error;
- }
-
- if ((fd = accept(pollfd->pollfd.fd,
- (struct sockaddr*) &in, &n)) == -1)
- {
- g_message ("Error while running accept() %d", errno);
- return;
- }
-
- /* Remove NON BLOCKING MODE */
- if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) != 0)
- {
- g_warning ("fcntl() failed");
- goto error;
- }
-
- // now we try to auth
- if (fs_msn_authenticate_incoming(self,fd))
- {
- g_message ("Authenticated incoming successfully fd %d", fd);
-
- // success! we need to shutdown/close all other channels
- gint i;
- for (i = 0; i < self->priv->pollfds->len; i++)
- {
- FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
- g_message ("closing fd %d", pollfd2->pollfd.fd);
- shutdown_fd (self, pollfd2);
- i--;
- }
- if (self->priv->direction == FS_DIRECTION_RECV)
- {
- GstState state;
- g_message("Setting media_fd_src on fd %d",fd);
-
- gst_element_get_state(self->priv->media_fd_src, &state, NULL,
- GST_CLOCK_TIME_NONE);
-
- if ( state > GST_STATE_READY)
- {
- g_message("Error: fdsrc in state above ready");
- gst_element_set_state(self->priv->media_fd_src, GST_STATE_READY);
- }
- g_object_set (G_OBJECT(self->priv->media_fd_src), "fd", fd, NULL);
- gst_element_set_locked_state(self->priv->media_fd_src,FALSE);
- gst_element_sync_state_with_parent(self->priv->media_fd_src);
- }
- else if (self->priv->direction == FS_DIRECTION_SEND)
- {
-
- GstState state;
- g_message("Setting media_fd_sink on fd %d", fd);
-
- gst_element_get_state(self->priv->media_fd_sink, &state, NULL,
- GST_CLOCK_TIME_NONE);
-
- if ( state > GST_STATE_READY)
- {
- g_message("Error: fdsrc in state above ready");
- gst_element_set_state(self->priv->media_fd_sink, GST_STATE_READY);
- }
- g_object_set (G_OBJECT(self->priv->media_fd_sink), "fd", fd, NULL);
- gst_element_set_locked_state(self->priv->media_fd_sink,FALSE);
- gst_element_sync_state_with_parent(self->priv->media_fd_sink);
- g_object_set (G_OBJECT (self->priv->send_valve), "drop", FALSE, NULL);
-
- }
-
- newpollfd = g_slice_new0 (FsMsnPollFD);
- gst_poll_fd_init (&newpollfd->pollfd);
- newpollfd->pollfd.fd = fd;
- newpollfd->want_read = FALSE;
- newpollfd->want_write = FALSE;
- gst_poll_fd_ctl_read (self->priv->poll, &newpollfd->pollfd, FALSE);
- gst_poll_fd_ctl_write (self->priv->poll, &newpollfd->pollfd, FALSE);
- newpollfd->next_step = main_fd_closed_cb;
- g_array_append_val (self->priv->pollfds, newpollfd);
- return;
- }
-
- /* Error */
- error:
- g_message ("Got error from fd %d, closing", fd);
- // find, shutdown and remove channel from fdlist
- gint i;
- for (i = 0; i < self->priv->pollfds->len; i++)
- {
- FsMsnPollFD *pollfd2 = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
- if (pollfd == pollfd2)
- {
- g_message ("closing fd %d", pollfd2->pollfd.fd);
- shutdown_fd (self, pollfd2);
- i--;
- }
- }
-
- if (fd > 0)
- close (fd);
-
- return;
-}
-
-static void
-fs_msn_open_listening_port (FsMsnStream *stream,
- guint16 port)
-{
- FsMsnStream *self = FS_MSN_STREAM (stream);
- gint fd = -1;
- FsMsnPollFD *pollfd = NULL;
- struct sockaddr_in myaddr;
- memset(&myaddr, 0, sizeof(myaddr));
-
- g_message ("Attempting to listen on port %d.....\n",port);
-
- if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
- {
- // show error
- g_message ("could not create socket!");
- return;
- }
-
- // set non-blocking mode
- fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
- myaddr.sin_family = AF_INET;
- myaddr.sin_port = htons (port);
- // bind
- if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
- {
- close (fd);
- return;
- }
-
-
- /* Listen */
- if (listen(fd, 3) != 0)
- {
- close (fd);
- return;
- }
- pollfd = g_slice_new0 (FsMsnPollFD);
- gst_poll_fd_init (&pollfd->pollfd);
- pollfd->pollfd.fd = fd;
- pollfd->want_read = TRUE;
- pollfd->want_write = FALSE;
- gst_poll_fd_ctl_read (self->priv->poll, &pollfd->pollfd, TRUE);
- gst_poll_fd_ctl_write (self->priv->poll, &pollfd->pollfd, FALSE);
- pollfd->next_step = fd_accept_connection_cb;
-
- g_array_append_val (self->priv->pollfds, pollfd);
- g_message ("Listening on port %d\n",port);
-
-}
-
-// Authenticate ourselves when connecting out
-static gboolean
-fs_msn_authenticate_outgoing (FsMsnStream *stream, gint fd)
-{
- FsMsnStream *self = FS_MSN_STREAM (stream);
- gchar str[400];
- memset(str, 0, sizeof(str));
- if (fd != 0)
- {
- g_message ("Authenticating connection on %d...", fd);
- g_message ("sending : recipientid=%d&sessionid=%d\r\n\r\n",
- self->priv->remote_recipientid, self->priv->remote_sessionid);
- sprintf(str, "recipientid=%d&sessionid=%d\r\n\r\n",
- self->priv->remote_recipientid, self->priv->remote_sessionid);
- if (send(fd, str, strlen(str), 0) == -1)
- {
- g_message("sending failed");
- perror("auth");
- }
-
- memset(str, 0, sizeof(str));
- if (recv(fd, str, 13, 0) != -1)
- {
- g_message ("Got %s, checking if it's auth", str);
- // we should get a connected message now
- if (strcmp (str, "connected\r\n\r\n") == 0)
- {
- // send our connected message also
- memset(str, 0, sizeof(str));
- sprintf(str, "connected\r\n\r\n");
- send(fd, str, strlen(str), 0);
- g_message ("Authentication successfull");
- return TRUE;
- }
- }
- else
- {
- perror("auth");
- }
- }
return FALSE;
}
-
/**
* fs_msn_stream_new:
* @session: The #FsMsnSession this stream is a child of
@@ -1197,71 +596,3 @@ fs_msn_stream_new (FsMsnSession *session,
return self;
}
-
-static gpointer
-connection_polling_thread (gpointer data)
-{
- FsMsnStream *self = data;
- gint ret;
- GstClockTime timeout;
-
- GST_OBJECT_LOCK (self->priv->conference);
- timeout = self->priv->poll_timeout;
- GST_OBJECT_UNLOCK (self->priv->conference);
-
- while ((ret = gst_poll_wait (self->priv->poll, timeout)) >= 0)
- {
- if (ret > 0)
- {
- gint i;
-
- for (i = 0; i < self->priv->pollfds->len; i++)
- {
- FsMsnPollFD *pollfd = &g_array_index(self->priv->pollfds,
- FsMsnPollFD, i);
-
- if (gst_poll_fd_has_error (self->priv->poll, &pollfd->pollfd) ||
- gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
- {
- pollfd->next_step (self, pollfd);
- shutdown_fd (self, pollfd);
- i--;
- continue;
- }
- if ((pollfd->want_read &&
- gst_poll_fd_can_read (self->priv->poll, &pollfd->pollfd)) ||
- (pollfd->want_write &&
- gst_poll_fd_can_write (self->priv->poll, &pollfd->pollfd)))
- pollfd->next_step (self, pollfd);
- }
-
- }
- GST_OBJECT_LOCK (self->priv->conference);
- timeout = self->priv->poll_timeout;
- GST_OBJECT_UNLOCK (self->priv->conference);
- }
-
- return NULL;
-}
-
-static void
-shutdown_fd (FsMsnStream *self, FsMsnPollFD *pollfd)
-{
- gint i;
-
-
- if (!gst_poll_fd_has_closed (self->priv->poll, &pollfd->pollfd))
- close (pollfd->pollfd.fd);
- gst_poll_remove_fd (self->priv->poll, &pollfd->pollfd);
- for (i = 0; i < self->priv->pollfds->len; i++)
- {
- FsMsnPollFD *p = &g_array_index(self->priv->pollfds, FsMsnPollFD, i);
- if (p == pollfd)
- {
- g_array_remove_index_fast (self->priv->pollfds, i);
- break;
- }
-
- }
- gst_poll_restart (self->priv->poll);
-}
--
1.5.6.5
More information about the farsight-commits
mailing list