[farsight2/master] Generate local candidates from user request, STUN or by scanning the available interfaces
Olivier Crête
olivier.crete at collabora.co.uk
Tue Dec 23 15:19:28 PST 2008
---
transmitters/rawudp/fs-rawudp-stream-transmitter.c | 481 +++++++++++++++++++-
1 files changed, 473 insertions(+), 8 deletions(-)
diff --git a/transmitters/rawudp/fs-rawudp-stream-transmitter.c b/transmitters/rawudp/fs-rawudp-stream-transmitter.c
index b340352..be5457c 100644
--- a/transmitters/rawudp/fs-rawudp-stream-transmitter.c
+++ b/transmitters/rawudp/fs-rawudp-stream-transmitter.c
@@ -36,11 +36,22 @@
#include "fs-rawudp-stream-transmitter.h"
+#include "stun.h"
+#include "fs-interfaces.h"
+
#include <gst/farsight/fs-stream.h>
#include <gst/farsight/fs-candidate.h>
#include <gst/gst.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <unistd.h>
+
/* Signals */
enum
{
@@ -73,17 +84,30 @@ struct _FsRawUdpStreamTransmitterPrivate
FsCandidate *remote_rtp_candidate;
FsCandidate *remote_rtcp_candidate;
+ FsCandidate *local_forced_rtp_candidate;
+ FsCandidate *local_forced_rtcp_candidate
+;
+ FsCandidate *local_stun_rtp_candidate;
+ FsCandidate *local_stun_rtcp_candidate;
+
UdpPort *rtp_udpport;
UdpPort *rtcp_udpport;
- GMutex *stun_mutex; /* protects everything that happens on the STUN thread */
-
gchar *stun_ip;
guint stun_port;
-
guint stun_timeout;
+ gulong stun_rtp_recv_id;
+ gulong stun_rtcp_recv_id;
+
+ gchar stun_cookie[16];
+
GList *prefered_local_candidates;
+
+ guint next_candidate_id;
+
+ GMutex *sources_mutex;
+ GList *sources;
};
#define FS_RAWUDP_STREAM_TRANSMITTER_GET_PRIVATE(o) \
@@ -108,6 +132,17 @@ static gboolean fs_rawudp_stream_transmitter_add_remote_candidate (
FsStreamTransmitter *streamtransmitter, FsCandidate *candidate,
GError **error);
+static gboolean fs_rawudp_stream_transmitter_start_stun (
+ FsRawUdpStreamTransmitter *self, guint component_id, GError **error);
+static void fs_rawudp_stream_transmitter_stop_stun (
+ FsRawUdpStreamTransmitter *self, guint component_id);
+
+static FsCandidate * fs_rawudp_stream_transmitter_build_forced_candidate (
+ FsRawUdpStreamTransmitter *self, const char *ip, gint port,
+ guint component_id);
+static gboolean fs_rawudp_stream_transmitter_finish_candidate_generation (
+ gpointer user_data);
+
static GObjectClass *parent_class = NULL;
// static guint signals[LAST_SIGNAL] = { 0 };
@@ -194,6 +229,14 @@ fs_rawudp_stream_transmitter_init (FsRawUdpStreamTransmitter *self)
self->priv->disposed = FALSE;
self->priv->sending = TRUE;
+
+ /* Use a pseudo-random number for the STUN cookie */
+ ((guint32*)self->priv->stun_cookie)[0] = g_random_int ();
+ ((guint32*)self->priv->stun_cookie)[1] = g_random_int ();
+ ((guint32*)self->priv->stun_cookie)[2] = g_random_int ();
+ ((guint32*)self->priv->stun_cookie)[3] = g_random_int ();
+
+ self->priv->sources_mutex = g_mutex_new ();
}
static void
@@ -206,6 +249,20 @@ fs_rawudp_stream_transmitter_dispose (GObject *object)
return;
}
+ if (self->priv->stun_rtp_recv_id)
+ fs_rawudp_stream_transmitter_stop_stun (self, FS_COMPONENT_RTP);
+ if (self->priv->stun_rtp_recv_id)
+ fs_rawudp_stream_transmitter_stop_stun (self, FS_COMPONENT_RTCP);
+
+ g_mutex_lock (self->priv->sources_mutex);
+ if (self->priv->sources) {
+ g_list_foreach (self->priv->sources, (GFunc) g_source_remove, NULL);
+ g_list_free (self->priv->sources);
+ self->priv->sources = NULL;
+ }
+ g_mutex_unlock (self->priv->sources_mutex);
+
+
/* Make sure dispose does not run twice. */
self->priv->disposed = TRUE;
@@ -252,16 +309,25 @@ fs_rawudp_stream_transmitter_finalize (GObject *object)
self->priv->rtp_udpport = NULL;
}
-
if (self->priv->rtcp_udpport) {
fs_rawudp_transmitter_put_udpport (self->priv->transmitter,
self->priv->rtcp_udpport);
self->priv->rtcp_udpport = NULL;
}
- if (self->priv->stun_mutex) {
- g_mutex_free (self->priv->stun_mutex);
- self->priv->stun_mutex = NULL;
+ if (self->priv->local_forced_rtp_candidate) {
+ fs_candidate_destroy (self->priv->local_forced_rtp_candidate);
+ self->priv->local_forced_rtp_candidate = NULL;
+ }
+
+ if (self->priv->local_forced_rtcp_candidate) {
+ fs_candidate_destroy (self->priv->local_forced_rtcp_candidate);
+ self->priv->local_forced_rtcp_candidate = NULL;
+ }
+
+ if (self->priv->sources_mutex) {
+ g_mutex_free (self->priv->sources_mutex);
+ self->priv->sources_mutex = NULL;
}
parent_class->finalize (object);
@@ -411,7 +477,6 @@ fs_rawudp_stream_transmitter_build (FsRawUdpStreamTransmitter *self,
}
}
-
self->priv->rtp_udpport =
fs_rawudp_transmitter_get_udpport (self->priv->transmitter,
FS_COMPONENT_RTP, ip, port, error);
@@ -425,6 +490,40 @@ fs_rawudp_stream_transmitter_build (FsRawUdpStreamTransmitter *self,
if (!self->priv->rtcp_udpport)
return FALSE;
+
+
+ if (ip) {
+ self->priv->local_forced_rtp_candidate =
+ fs_rawudp_stream_transmitter_build_forced_candidate (self, ip,
+ fs_rawudp_transmitter_udpport_get_port (self->priv->rtp_udpport),
+ FS_COMPONENT_RTP);
+ }
+
+ if (rtcp_ip) {
+ self->priv->local_forced_rtp_candidate =
+ fs_rawudp_stream_transmitter_build_forced_candidate (self, rtcp_ip,
+ fs_rawudp_transmitter_udpport_get_port (self->priv->rtcp_udpport),
+ FS_COMPONENT_RTCP);
+ }
+
+
+ if (self->priv->stun_ip && self->priv->stun_port) {
+ if (!fs_rawudp_stream_transmitter_start_stun (self, FS_COMPONENT_RTP,
+ error))
+ return FALSE;
+ if (!fs_rawudp_stream_transmitter_start_stun (self, FS_COMPONENT_RTCP,
+ error))
+ return FALSE;
+ } else {
+ guint id;
+ id = g_idle_add (fs_rawudp_stream_transmitter_finish_candidate_generation,
+ self);
+ g_mutex_lock (self->priv->sources_mutex);
+ self->priv->sources = g_list_prepend (self->priv->sources,
+ GUINT_TO_POINTER(id));
+ g_mutex_unlock (self->priv->sources_mutex);
+ }
+
return TRUE;
}
@@ -527,3 +626,369 @@ fs_rawudp_stream_transmitter_newv (FsRawUdpTransmitter *transmitter,
return streamtransmitter;
}
+
+struct CandidateTransit {
+ FsRawUdpStreamTransmitter *self;
+ FsCandidate *candidate;
+ guint component_id;
+};
+
+static gboolean
+fs_rawudp_stream_transmitter_emit_stun_candidate (gpointer user_data)
+{
+ struct CandidateTransit *data = user_data;
+
+ if (data->component_id == FS_COMPONENT_RTP)
+ data->self->priv->local_stun_rtp_candidate = data->candidate;
+ else if (data->component_id == FS_COMPONENT_RTCP)
+ data->self->priv->local_stun_rtcp_candidate = data->candidate;
+
+ g_signal_emit_by_name (data->self, "new-local-candidate", data->candidate);
+
+ /* Lets call it over if its over */
+ if (data->self->priv->stun_rtp_recv_id == 0 &&
+ data->self->priv->stun_rtcp_recv_id == 0 ) {
+ fs_rawudp_stream_transmitter_finish_candidate_generation (data->self);
+ } else {
+ /* Lets remove this source from the list of sources to destroy */
+ GSource * source = g_main_current_source ();
+
+ if (source) {
+ guint id = g_source_get_id (source);
+
+ g_mutex_lock (data->self->priv->sources_mutex);
+ data->self->priv->sources =
+ g_list_remove (data->self->priv->sources, GUINT_TO_POINTER (id));
+ g_mutex_unlock (data->self->priv->sources_mutex);
+ }
+ }
+
+ g_free (data);
+
+ /* We only pass one candidate at a time */
+ return FALSE;
+}
+
+/*
+ * We have to be extremely careful in this callback, it is executed
+ * on the streaming thread, so all data must either be object-constant
+ * (ie doesnt change for the life of the object) or locked by a mutex
+ *
+ */
+
+static gboolean
+fs_rawudp_stream_transmitter_stun_recv_cb (GstPad *pad, GstBuffer *buffer,
+ gpointer user_data)
+{
+ FsRawUdpStreamTransmitter *self = FS_RAWUDP_STREAM_TRANSMITTER (user_data);
+ guint component_id;
+ FsCandidate *candidate = NULL;
+ StunMessage *msg;
+ StunAttribute **attr;
+
+
+ if (GST_BUFFER_SIZE (buffer) < 4) {
+ /* Packet is too small to be STUN */
+ return TRUE;
+ }
+
+ if (GST_BUFFER_DATA (buffer)[0] >> 6) {
+ /* Non stun packet */
+ return TRUE;
+ }
+
+ if (fs_rawudp_transmitter_udpport_is_pad (self->priv->rtp_udpport, pad))
+ component_id = FS_COMPONENT_RTP;
+ else if (fs_rawudp_transmitter_udpport_is_pad (
+ self->priv->rtcp_udpport, pad))
+ component_id = FS_COMPONENT_RTCP;
+ else
+ return FALSE;
+
+ msg = stun_message_unpack (GST_BUFFER_SIZE (buffer),
+ (const gchar *) GST_BUFFER_DATA (buffer));
+ if (!msg) {
+ /* invalid message */
+ return TRUE;
+ }
+
+ if (memcmp (msg->transaction_id, self->priv->stun_cookie, 16) != 0) {
+ /* not ours */
+ stun_message_free (msg);
+ return TRUE;
+ }
+
+ if (msg->type == STUN_MESSAGE_BINDING_ERROR_RESPONSE) {
+ stun_message_free (msg);
+ fs_rawudp_stream_transmitter_stop_stun (self, component_id);
+ /* FIXME: Return local candidate */
+ return FALSE;
+ }
+
+ if (msg->type != STUN_MESSAGE_BINDING_RESPONSE) {
+ /* Some other stun message */
+ stun_message_free (msg);
+ return TRUE;
+ }
+
+
+ for (attr = msg->attributes; *attr; attr++) {
+ if ((*attr)->type == STUN_ATTRIBUTE_MAPPED_ADDRESS) {
+
+ candidate = g_new0 (FsCandidate,1);
+ candidate->candidate_id = g_strdup_printf ("L%u",
+ self->priv->next_candidate_id);
+ candidate->component_id = component_id;
+ candidate->ip = g_strdup_printf ("%u.%u.%u.%u",
+ ((*attr)->address.ip & 0xff000000) >> 24,
+ ((*attr)->address.ip & 0x00ff0000) >> 16,
+ ((*attr)->address.ip & 0x0000ff00) >> 8,
+ ((*attr)->address.ip & 0x000000ff));
+ candidate->port = (*attr)->address.port;
+ candidate->proto = FS_NETWORK_PROTOCOL_UDP;
+ if (component_id == FS_COMPONENT_RTP)
+ candidate->proto_subtype = g_strdup ("RTP");
+ else if (component_id == FS_COMPONENT_RTCP)
+ candidate->proto_subtype = g_strdup ("RTCP");
+ candidate->proto_profile = g_strdup ("AVP");
+ candidate->type = FS_CANDIDATE_TYPE_SRFLX;
+
+
+ g_debug ("Stun server says we are %u.%u.%u.%u %u\n",
+ ((*attr)->address.ip & 0xff000000) >> 24,
+ ((*attr)->address.ip & 0x00ff0000) >> 16,
+ ((*attr)->address.ip & 0x0000ff00) >> 8,
+ ((*attr)->address.ip & 0x000000ff),(*attr)->address.port);
+ break;
+ }
+ }
+
+ fs_rawudp_stream_transmitter_stop_stun (self, component_id);
+
+ if (candidate) {
+ guint id;
+ struct CandidateTransit *data = g_new0 (struct CandidateTransit, 1);
+ data->self = self;
+ data->candidate = candidate;
+ data->component_id = component_id;
+ id = g_idle_add (fs_rawudp_stream_transmitter_emit_stun_candidate, data);
+ g_mutex_lock (self->priv->sources_mutex);
+ self->priv->sources = g_list_prepend (self->priv->sources,
+ GUINT_TO_POINTER(id));
+ g_mutex_unlock (self->priv->sources_mutex);
+ }
+
+ /* It was a stun packet, lets drop it */
+ stun_message_free (msg);
+ return FALSE;
+}
+
+static gboolean
+fs_rawudp_stream_transmitter_start_stun (FsRawUdpStreamTransmitter *self,
+ guint component_id, GError **error)
+{
+ struct addrinfo hints;
+ struct addrinfo *result = NULL;
+ struct sockaddr_in address;
+ gchar *packed;
+ guint length;
+ int retval;
+ StunMessage *msg;
+ gboolean ret = TRUE;
+ UdpPort *udpport;
+
+ if (component_id == FS_COMPONENT_RTP)
+ udpport = self->priv->rtp_udpport;
+ else if (component_id == FS_COMPONENT_RTCP)
+ udpport = self->priv->rtcp_udpport;
+ else {
+ g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_INVALID_ARGUMENTS,
+ "Only components RTP(1) and RTCP(2) are support, %u isnt",
+ component_id);
+ return FALSE;
+ }
+
+ memset (&hints, 0, sizeof (struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_flags = AI_NUMERICHOST;
+ retval = getaddrinfo (self->priv->stun_ip, NULL, &hints, &result);
+ if (retval != 0) {
+ g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+ "Invalid IP address %s passed for STUN: %s",
+ self->priv->stun_ip, gai_strerror (retval));
+ return FALSE;
+ }
+ memcpy (&address, result->ai_addr, sizeof(struct sockaddr_in));
+ freeaddrinfo (result);
+
+ address.sin_family = AF_INET;
+ address.sin_port = htons (self->priv->stun_port);
+
+ if (component_id == FS_COMPONENT_RTP)
+ self->priv->stun_rtp_recv_id =
+ fs_rawudp_transmitter_udpport_connect_recv (udpport,
+ G_CALLBACK (fs_rawudp_stream_transmitter_stun_recv_cb), self);
+ else if (component_id == FS_COMPONENT_RTCP)
+ self->priv->stun_rtcp_recv_id =
+ fs_rawudp_transmitter_udpport_connect_recv (udpport,
+ G_CALLBACK (fs_rawudp_stream_transmitter_stun_recv_cb), self);
+
+ msg = stun_message_new (STUN_MESSAGE_BINDING_REQUEST,
+ self->priv->stun_cookie, 0);
+ if (!msg) {
+ g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+ "Could not create a new STUN binding request");
+ return FALSE;
+ }
+
+ length = stun_message_pack (msg, &packed);
+
+ if (!fs_rawudp_transmitter_udpport_sendto (udpport,
+ packed, length, (const struct sockaddr *)&address, sizeof(address),
+ error)) {
+ ret = FALSE;
+ }
+
+ g_free (packed);
+ stun_message_free (msg);
+
+ return ret;
+}
+
+
+static void
+fs_rawudp_stream_transmitter_stop_stun (FsRawUdpStreamTransmitter *self,
+ guint component_id)
+{
+ if (component_id == FS_COMPONENT_RTP) {
+ if (self->priv->stun_rtp_recv_id) {
+ fs_rawudp_transmitter_udpport_disconnect_recv (self->priv->rtp_udpport,
+ self->priv->stun_rtp_recv_id);
+ self->priv->stun_rtp_recv_id = 0;
+ }
+ }
+ else if (component_id == FS_COMPONENT_RTCP) {
+ if (self->priv->stun_rtcp_recv_id) {
+ fs_rawudp_transmitter_udpport_disconnect_recv (self->priv->rtcp_udpport,
+ self->priv->stun_rtcp_recv_id);
+ self->priv->stun_rtcp_recv_id = 0;
+ }
+ }
+}
+
+static FsCandidate *
+fs_rawudp_stream_transmitter_build_forced_candidate (
+ FsRawUdpStreamTransmitter *self, const char *ip, gint port,
+ guint component_id)
+{
+ FsCandidate *candidate = g_new0 (FsCandidate, 1);
+
+ candidate = g_new0 (FsCandidate,1);
+ candidate->candidate_id = g_strdup ("L1");
+ candidate->component_id = component_id;
+ candidate->ip = g_strdup (ip);
+ candidate->port = port;
+ candidate->proto = FS_NETWORK_PROTOCOL_UDP;
+ if (component_id == FS_COMPONENT_RTP)
+ candidate->proto_subtype = g_strdup ("RTP");
+ else if (component_id == FS_COMPONENT_RTCP)
+ candidate->proto_subtype = g_strdup ("RTCP");
+ candidate->proto_profile = g_strdup ("AVP");
+ candidate->type = FS_CANDIDATE_TYPE_HOST;
+
+ return candidate;
+}
+
+static void
+fs_rawudp_stream_transmitter_emit_local_candidates (
+ FsRawUdpStreamTransmitter *self, guint component_id)
+{
+ GList *ips = NULL;
+ GList *current;
+ guint port;
+
+ if (component_id == FS_COMPONENT_RTP)
+ port = fs_rawudp_transmitter_udpport_get_port (self->priv->rtp_udpport);
+ else if (component_id == FS_COMPONENT_RTCP)
+ port = fs_rawudp_transmitter_udpport_get_port (self->priv->rtcp_udpport);
+ else
+ return;
+
+ ips = farsight_get_local_ips(FALSE);
+
+ for (current = g_list_first (ips);
+ current;
+ current = g_list_next(current)) {
+ FsCandidate *candidate = g_new0 (FsCandidate, 1);
+ candidate->candidate_id = g_strdup_printf ("L%u",
+ self->priv->next_candidate_id++);
+ candidate->component_id = component_id;
+ candidate->ip = g_strdup (current->data);
+ candidate->port = port;
+ candidate->proto = FS_NETWORK_PROTOCOL_UDP;
+ if (component_id == FS_COMPONENT_RTP)
+ candidate->proto_subtype = g_strdup ("RTP");
+ else if (component_id == FS_COMPONENT_RTCP)
+ candidate->proto_subtype = g_strdup ("RTCP");
+ candidate->proto_profile = g_strdup ("AVP");
+ candidate->type = FS_CANDIDATE_TYPE_HOST;
+
+ g_signal_emit_by_name (self, "new-local-candidate", candidate);
+
+ fs_candidate_destroy (candidate);
+ }
+
+
+ /* free list of ips */
+ g_list_foreach (ips, (GFunc) g_free, NULL);
+ g_list_free (ips);
+}
+
+/*
+ * This is called when the local candidates have been generated
+ */
+
+static gboolean
+fs_rawudp_stream_transmitter_finish_candidate_generation (gpointer user_data)
+{
+ FsRawUdpStreamTransmitter *self = user_data;
+ GSource *source;
+
+ /* If we have a STUN'd rtp candidate, dont send the locally generated
+ * ones */
+ if (!self->priv->local_stun_rtp_candidate) {
+ if (self->priv->local_forced_rtp_candidate)
+ g_signal_emit_by_name (self, "new-local-candidate",
+ self->priv->local_forced_rtp_candidate);
+ else
+ fs_rawudp_stream_transmitter_emit_local_candidates (self,
+ FS_COMPONENT_RTP);
+ }
+
+ /* Lets do the same for rtcp */
+ if (!self->priv->local_stun_rtcp_candidate) {
+ if (self->priv->local_forced_rtcp_candidate)
+ g_signal_emit_by_name (self, "new-local-candidate",
+ self->priv->local_forced_rtcp_candidate);
+ else
+ fs_rawudp_stream_transmitter_emit_local_candidates (self,
+ FS_COMPONENT_RTCP);
+ }
+
+ g_signal_emit_by_name (self, "local-candidates-prepared");
+
+ /* Lets remove this source from the list of sources to destroy
+ * For the case when its called from an idle source
+ */
+ source = g_main_current_source ();
+ if (source) {
+ guint id = g_source_get_id (source);
+
+ g_mutex_lock (self->priv->sources_mutex);
+ self->priv->sources = g_list_remove (self->priv->sources,
+ GUINT_TO_POINTER (id));
+ g_mutex_unlock (self->priv->sources_mutex);
+ }
+
+ return FALSE;
+}
--
1.5.6.5
More information about the farsight-commits
mailing list