[farsight2/master] Generate local candidates from user request, STUN or by scanning the available interfaces

Olivier Crête olivier.crete at collabora.co.uk
Tue Dec 23 15:19:28 PST 2008


---
 transmitters/rawudp/fs-rawudp-stream-transmitter.c |  481 +++++++++++++++++++-
 1 files changed, 473 insertions(+), 8 deletions(-)

diff --git a/transmitters/rawudp/fs-rawudp-stream-transmitter.c b/transmitters/rawudp/fs-rawudp-stream-transmitter.c
index b340352..be5457c 100644
--- a/transmitters/rawudp/fs-rawudp-stream-transmitter.c
+++ b/transmitters/rawudp/fs-rawudp-stream-transmitter.c
@@ -36,11 +36,22 @@
 
 #include "fs-rawudp-stream-transmitter.h"
 
+#include "stun.h"
+#include "fs-interfaces.h"
+
 #include <gst/farsight/fs-stream.h>
 #include <gst/farsight/fs-candidate.h>
 
 #include <gst/gst.h>
 
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <unistd.h>
+
 /* Signals */
 enum
 {
@@ -73,17 +84,30 @@ struct _FsRawUdpStreamTransmitterPrivate
   FsCandidate *remote_rtp_candidate;
   FsCandidate *remote_rtcp_candidate;
 
+  FsCandidate *local_forced_rtp_candidate;
+  FsCandidate *local_forced_rtcp_candidate
+;
+  FsCandidate *local_stun_rtp_candidate;
+  FsCandidate *local_stun_rtcp_candidate;
+
   UdpPort *rtp_udpport;
   UdpPort *rtcp_udpport;
 
-  GMutex *stun_mutex; /* protects everything that happens on the STUN thread */
-
   gchar *stun_ip;
   guint stun_port;
-
   guint stun_timeout;
 
+  gulong stun_rtp_recv_id;
+  gulong stun_rtcp_recv_id;
+
+  gchar stun_cookie[16];
+
   GList *prefered_local_candidates;
+
+  guint next_candidate_id;
+
+  GMutex *sources_mutex;
+  GList *sources;
 };
 
 #define FS_RAWUDP_STREAM_TRANSMITTER_GET_PRIVATE(o)  \
@@ -108,6 +132,17 @@ static gboolean fs_rawudp_stream_transmitter_add_remote_candidate (
     FsStreamTransmitter *streamtransmitter, FsCandidate *candidate,
     GError **error);
 
+static gboolean fs_rawudp_stream_transmitter_start_stun (
+    FsRawUdpStreamTransmitter *self, guint component_id, GError **error);
+static void fs_rawudp_stream_transmitter_stop_stun (
+    FsRawUdpStreamTransmitter *self, guint component_id);
+
+static FsCandidate * fs_rawudp_stream_transmitter_build_forced_candidate (
+    FsRawUdpStreamTransmitter *self, const char *ip, gint port,
+    guint component_id);
+static gboolean fs_rawudp_stream_transmitter_finish_candidate_generation (
+    gpointer user_data);
+
 
 static GObjectClass *parent_class = NULL;
 // static guint signals[LAST_SIGNAL] = { 0 };
@@ -194,6 +229,14 @@ fs_rawudp_stream_transmitter_init (FsRawUdpStreamTransmitter *self)
   self->priv->disposed = FALSE;
 
   self->priv->sending = TRUE;
+
+  /* Use a pseudo-random number for the STUN cookie */
+  ((guint32*)self->priv->stun_cookie)[0] = g_random_int ();
+  ((guint32*)self->priv->stun_cookie)[1] = g_random_int ();
+  ((guint32*)self->priv->stun_cookie)[2] = g_random_int ();
+  ((guint32*)self->priv->stun_cookie)[3] = g_random_int ();
+
+  self->priv->sources_mutex = g_mutex_new ();
 }
 
 static void
@@ -206,6 +249,20 @@ fs_rawudp_stream_transmitter_dispose (GObject *object)
     return;
   }
 
+  if (self->priv->stun_rtp_recv_id)
+    fs_rawudp_stream_transmitter_stop_stun (self, FS_COMPONENT_RTP);
+  if (self->priv->stun_rtcp_recv_id) /* RTCP is tracked by its own handler id */
+    fs_rawudp_stream_transmitter_stop_stun (self, FS_COMPONENT_RTCP);
+
+  g_mutex_lock (self->priv->sources_mutex);
+  if (self->priv->sources) {
+    g_list_foreach (self->priv->sources, (GFunc) g_source_remove, NULL);
+    g_list_free (self->priv->sources);
+    self->priv->sources = NULL;
+  }
+  g_mutex_unlock (self->priv->sources_mutex);
+
+
   /* Make sure dispose does not run twice. */
   self->priv->disposed = TRUE;
 
@@ -252,16 +309,25 @@ fs_rawudp_stream_transmitter_finalize (GObject *object)
     self->priv->rtp_udpport = NULL;
   }
 
-
   if (self->priv->rtcp_udpport) {
     fs_rawudp_transmitter_put_udpport (self->priv->transmitter,
       self->priv->rtcp_udpport);
     self->priv->rtcp_udpport = NULL;
   }
 
-  if (self->priv->stun_mutex) {
-    g_mutex_free (self->priv->stun_mutex);
-    self->priv->stun_mutex = NULL;
+  if (self->priv->local_forced_rtp_candidate) {
+    fs_candidate_destroy (self->priv->local_forced_rtp_candidate);
+    self->priv->local_forced_rtp_candidate = NULL;
+  }
+
+  if (self->priv->local_forced_rtcp_candidate) {
+    fs_candidate_destroy (self->priv->local_forced_rtcp_candidate);
+    self->priv->local_forced_rtcp_candidate = NULL;
+  }
+
+  if (self->priv->sources_mutex) {
+    g_mutex_free (self->priv->sources_mutex);
+    self->priv->sources_mutex = NULL;
   }
 
   parent_class->finalize (object);
@@ -411,7 +477,6 @@ fs_rawudp_stream_transmitter_build (FsRawUdpStreamTransmitter *self,
     }
   }
 
-
   self->priv->rtp_udpport =
     fs_rawudp_transmitter_get_udpport (self->priv->transmitter,
       FS_COMPONENT_RTP, ip, port, error);
@@ -425,6 +490,40 @@ fs_rawudp_stream_transmitter_build (FsRawUdpStreamTransmitter *self,
   if (!self->priv->rtcp_udpport)
     return FALSE;
 
+
+
+  if (ip) {
+    self->priv->local_forced_rtp_candidate =
+      fs_rawudp_stream_transmitter_build_forced_candidate (self, ip,
+        fs_rawudp_transmitter_udpport_get_port (self->priv->rtp_udpport),
+        FS_COMPONENT_RTP);
+  }
+
+  if (rtcp_ip) { /* the forced RTCP address is kept in its own slot */
+    self->priv->local_forced_rtcp_candidate =
+      fs_rawudp_stream_transmitter_build_forced_candidate (self, rtcp_ip,
+        fs_rawudp_transmitter_udpport_get_port (self->priv->rtcp_udpport),
+        FS_COMPONENT_RTCP);
+  }
+
+
+  if (self->priv->stun_ip && self->priv->stun_port) {
+    if (!fs_rawudp_stream_transmitter_start_stun (self, FS_COMPONENT_RTP,
+        error))
+      return FALSE;
+    if (!fs_rawudp_stream_transmitter_start_stun (self, FS_COMPONENT_RTCP,
+        error))
+      return FALSE;
+  } else {
+    guint id;
+    id = g_idle_add (fs_rawudp_stream_transmitter_finish_candidate_generation,
+      self);
+    g_mutex_lock (self->priv->sources_mutex);
+    self->priv->sources = g_list_prepend (self->priv->sources,
+      GUINT_TO_POINTER(id));
+    g_mutex_unlock (self->priv->sources_mutex);
+  }
+
   return TRUE;
 }
 
@@ -527,3 +626,369 @@ fs_rawudp_stream_transmitter_newv (FsRawUdpTransmitter *transmitter,
 
   return streamtransmitter;
 }
+
+struct CandidateTransit { /* handed from the STUN recv callback to an idle source */
+  FsRawUdpStreamTransmitter *self; /* stream transmitter that gets the candidate */
+  FsCandidate *candidate; /* STUN-derived candidate to store and emit */
+  guint component_id; /* FS_COMPONENT_RTP or FS_COMPONENT_RTCP */
+};
+
+/* Idle callback: record and announce one STUN candidate, free the transit */
+static gboolean
+fs_rawudp_stream_transmitter_emit_stun_candidate (gpointer user_data)
+{
+  struct CandidateTransit *data = user_data;
+
+  if (data->component_id == FS_COMPONENT_RTP)
+    data->self->priv->local_stun_rtp_candidate = data->candidate;
+  else if (data->component_id == FS_COMPONENT_RTCP)
+    data->self->priv->local_stun_rtcp_candidate = data->candidate;
+
+  g_signal_emit_by_name (data->self, "new-local-candidate", data->candidate);
+
+  /* No STUN handler is left on either component: generation is complete */
+  if (data->self->priv->stun_rtp_recv_id == 0 &&
+    data->self->priv->stun_rtcp_recv_id == 0 ) {
+    fs_rawudp_stream_transmitter_finish_candidate_generation (data->self);
+  } else {
+    /* Otherwise only unlist this idle source so dispose will not remove it */
+    GSource * source = g_main_current_source ();
+
+    if (source)  {
+      guint id = g_source_get_id (source);
+
+      g_mutex_lock (data->self->priv->sources_mutex);
+      data->self->priv->sources =
+        g_list_remove (data->self->priv->sources, GUINT_TO_POINTER (id));
+      g_mutex_unlock (data->self->priv->sources_mutex);
+    }
+  }
+
+  g_free (data); /* frees the transit struct only; the candidate stays in priv */
+  /* We only pass one candidate at a time */
+  return FALSE;
+}
+
+/*
+ * Receive callback that watches a component's incoming packets for the STUN
+ * binding response. It is executed on the streaming thread, so all data
+ * must either be object-constant (ie does not change for the life of the
+ * object) or locked by a mutex. Returns FALSE for packets it drops.
+ */
+
+static gboolean
+fs_rawudp_stream_transmitter_stun_recv_cb (GstPad *pad, GstBuffer *buffer,
+  gpointer user_data)
+{
+  FsRawUdpStreamTransmitter *self = FS_RAWUDP_STREAM_TRANSMITTER (user_data);
+  guint component_id;
+  FsCandidate *candidate = NULL;
+  StunMessage *msg;
+  StunAttribute **attr;
+
+
+  if (GST_BUFFER_SIZE (buffer) < 4) {
+    /* Packet is too small to be STUN */
+    return TRUE;
+  }
+
+  if (GST_BUFFER_DATA (buffer)[0] >> 6) {
+    /* Non stun packet: the two top bits of the first byte are not zero */
+    return TRUE;
+  }
+
+  if (fs_rawudp_transmitter_udpport_is_pad (self->priv->rtp_udpport, pad))
+    component_id = FS_COMPONENT_RTP;
+  else if (fs_rawudp_transmitter_udpport_is_pad (
+        self->priv->rtcp_udpport, pad))
+    component_id = FS_COMPONENT_RTCP;
+  else
+    return FALSE; /* the pad belongs to neither of our two ports */
+
+  msg = stun_message_unpack (GST_BUFFER_SIZE (buffer),
+    (const gchar *) GST_BUFFER_DATA (buffer));
+  if (!msg) {
+    /* invalid message */
+    return TRUE;
+  }
+
+  if (memcmp (msg->transaction_id, self->priv->stun_cookie, 16) != 0) {
+    /* not ours: the transaction id differs from our cookie */
+    stun_message_free (msg);
+    return TRUE;
+  }
+
+  if (msg->type == STUN_MESSAGE_BINDING_ERROR_RESPONSE) {
+    stun_message_free (msg);
+    fs_rawudp_stream_transmitter_stop_stun (self, component_id);
+    /* FIXME: Return local candidate */
+    return FALSE;
+  }
+
+  if (msg->type != STUN_MESSAGE_BINDING_RESPONSE) {
+    /* Some other stun message */
+    stun_message_free (msg);
+    return TRUE;
+  }
+
+  /* The first MAPPED-ADDRESS attribute becomes a server-reflexive candidate */
+  for (attr = msg->attributes; *attr; attr++) {
+    if ((*attr)->type == STUN_ATTRIBUTE_MAPPED_ADDRESS) {
+
+      candidate = g_new0 (FsCandidate,1);
+      candidate->candidate_id = g_strdup_printf ("L%u",
+        self->priv->next_candidate_id);
+      candidate->component_id = component_id;
+      candidate->ip = g_strdup_printf ("%u.%u.%u.%u",
+          ((*attr)->address.ip & 0xff000000) >> 24,
+          ((*attr)->address.ip & 0x00ff0000) >> 16,
+          ((*attr)->address.ip & 0x0000ff00) >>  8,
+          ((*attr)->address.ip & 0x000000ff));
+      candidate->port = (*attr)->address.port;
+      candidate->proto = FS_NETWORK_PROTOCOL_UDP;
+      if (component_id == FS_COMPONENT_RTP)
+        candidate->proto_subtype = g_strdup ("RTP");
+      else if (component_id == FS_COMPONENT_RTCP)
+        candidate->proto_subtype = g_strdup ("RTCP");
+      candidate->proto_profile = g_strdup ("AVP");
+      candidate->type = FS_CANDIDATE_TYPE_SRFLX;
+
+
+      g_debug ("Stun server says we are %u.%u.%u.%u %u\n",
+          ((*attr)->address.ip & 0xff000000) >> 24,
+          ((*attr)->address.ip & 0x00ff0000) >> 16,
+          ((*attr)->address.ip & 0x0000ff00) >>  8,
+          ((*attr)->address.ip & 0x000000ff),(*attr)->address.port);
+      break;
+    }
+  }
+
+  fs_rawudp_stream_transmitter_stop_stun (self, component_id); /* got our answer */
+
+  if (candidate) { /* emitted later from an idle source, not from this thread */
+    guint id;
+    struct CandidateTransit *data = g_new0 (struct CandidateTransit, 1);
+    data->self = self;
+    data->candidate = candidate;
+    data->component_id = component_id;
+    id = g_idle_add (fs_rawudp_stream_transmitter_emit_stun_candidate, data);
+    g_mutex_lock (self->priv->sources_mutex);
+    self->priv->sources = g_list_prepend (self->priv->sources,
+      GUINT_TO_POINTER(id));
+    g_mutex_unlock (self->priv->sources_mutex);
+  }
+
+  /* It was a stun packet, lets drop it */
+    stun_message_free (msg);
+  return FALSE;
+}
+
+/* Attach the STUN receive callback to the component's port and send one
+ * binding request; on any later failure detach it again and return FALSE. */
+static gboolean
+fs_rawudp_stream_transmitter_start_stun (FsRawUdpStreamTransmitter *self,
+  guint component_id, GError **error)
+{
+  struct addrinfo hints, *result = NULL;
+  struct sockaddr_in address;
+  gchar *packed;
+  guint length;
+  int retval;
+  StunMessage *msg;
+  gboolean ret;
+  UdpPort *udpport;
+
+  if (component_id == FS_COMPONENT_RTP)
+    udpport = self->priv->rtp_udpport;
+  else if (component_id == FS_COMPONENT_RTCP)
+    udpport = self->priv->rtcp_udpport;
+  else {
+    g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_INVALID_ARGUMENTS,
+      "Only components RTP(1) and RTCP(2) are support, %u isnt",
+      component_id);
+    return FALSE;
+  }
+
+  /* stun_ip must be a numeric IPv4 address; no name resolution is done */
+  memset (&hints, 0, sizeof (struct addrinfo));
+  hints.ai_family = AF_INET;
+  hints.ai_flags = AI_NUMERICHOST;
+  retval = getaddrinfo (self->priv->stun_ip, NULL, &hints, &result);
+  if (retval != 0) {
+    g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+      "Invalid IP address %s passed for STUN: %s",
+      self->priv->stun_ip, gai_strerror (retval));
+    return FALSE;
+  }
+  memcpy (&address, result->ai_addr, sizeof(struct sockaddr_in));
+  freeaddrinfo (result);
+  address.sin_family = AF_INET;
+  address.sin_port = htons (self->priv->stun_port);
+
+  if (component_id == FS_COMPONENT_RTP)
+    self->priv->stun_rtp_recv_id =
+      fs_rawudp_transmitter_udpport_connect_recv (udpport,
+        G_CALLBACK (fs_rawudp_stream_transmitter_stun_recv_cb), self);
+  else if (component_id == FS_COMPONENT_RTCP)
+    self->priv->stun_rtcp_recv_id =
+      fs_rawudp_transmitter_udpport_connect_recv (udpport,
+        G_CALLBACK (fs_rawudp_stream_transmitter_stun_recv_cb), self);
+
+  msg = stun_message_new (STUN_MESSAGE_BINDING_REQUEST,
+    self->priv->stun_cookie, 0);
+  if (!msg) {
+    g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+      "Could not create a new STUN binding request");
+    fs_rawudp_stream_transmitter_stop_stun (self, component_id);
+    return FALSE;
+  }
+
+  length = stun_message_pack (msg, &packed);
+  ret = fs_rawudp_transmitter_udpport_sendto (udpport,
+      packed, length, (const struct sockaddr *)&address, sizeof(address),
+      error);
+  g_free (packed);
+  stun_message_free (msg);
+  if (!ret) /* nothing went out, so no response will come for the handler */
+    fs_rawudp_stream_transmitter_stop_stun (self, component_id);
+
+  return ret;
+}
+
+/* Disconnect a component's STUN receive handler, if one is still attached */
+static void
+fs_rawudp_stream_transmitter_stop_stun (FsRawUdpStreamTransmitter *self,
+  guint component_id)
+{
+  if (component_id == FS_COMPONENT_RTP) {
+    if (self->priv->stun_rtp_recv_id) {
+      fs_rawudp_transmitter_udpport_disconnect_recv (self->priv->rtp_udpport,
+        self->priv->stun_rtp_recv_id);
+      self->priv->stun_rtp_recv_id = 0; /* 0 means no handler is attached */
+    }
+  }
+  else if (component_id == FS_COMPONENT_RTCP) {
+    if (self->priv->stun_rtcp_recv_id) {
+      fs_rawudp_transmitter_udpport_disconnect_recv (self->priv->rtcp_udpport,
+        self->priv->stun_rtcp_recv_id);
+      self->priv->stun_rtcp_recv_id = 0; /* 0 means no handler is attached */
+    }
+  }
+}
+
+/* Build the HOST candidate for a forced local ip/port; the caller owns it */
+static FsCandidate *
+fs_rawudp_stream_transmitter_build_forced_candidate (
+    FsRawUdpStreamTransmitter *self, const char *ip, gint port,
+    guint component_id)
+{
+  FsCandidate *candidate = g_new0 (FsCandidate, 1);
+
+  candidate->candidate_id = g_strdup ("L1");
+  candidate->component_id = component_id;
+  candidate->ip = g_strdup (ip);
+  candidate->port = port;
+  candidate->proto = FS_NETWORK_PROTOCOL_UDP;
+  if (component_id == FS_COMPONENT_RTP)
+    candidate->proto_subtype = g_strdup ("RTP");
+  else if (component_id == FS_COMPONENT_RTCP)
+    candidate->proto_subtype = g_strdup ("RTCP");
+  candidate->proto_profile = g_strdup ("AVP");
+  candidate->type = FS_CANDIDATE_TYPE_HOST;
+
+  return candidate;
+}
+
+/* Emit one HOST candidate per local IP for the given component's port */
+static void
+fs_rawudp_stream_transmitter_emit_local_candidates (
+    FsRawUdpStreamTransmitter *self, guint component_id)
+{
+  GList *ips = NULL;
+  GList *current;
+  guint port;
+
+  if (component_id == FS_COMPONENT_RTP)
+    port = fs_rawudp_transmitter_udpport_get_port (self->priv->rtp_udpport);
+  else if (component_id == FS_COMPONENT_RTCP)
+    port = fs_rawudp_transmitter_udpport_get_port (self->priv->rtcp_udpport);
+  else
+    return;
+
+  ips = farsight_get_local_ips(FALSE); /* NOTE(review): meaning of FALSE not shown here */
+
+  for (current = g_list_first (ips);
+       current;
+       current = g_list_next(current)) {
+    FsCandidate *candidate = g_new0 (FsCandidate, 1);
+    candidate->candidate_id = g_strdup_printf ("L%u",
+      self->priv->next_candidate_id++);
+    candidate->component_id = component_id;
+    candidate->ip = g_strdup (current->data);
+    candidate->port = port;
+    candidate->proto = FS_NETWORK_PROTOCOL_UDP;
+    if (component_id == FS_COMPONENT_RTP)
+      candidate->proto_subtype = g_strdup ("RTP");
+    else if (component_id == FS_COMPONENT_RTCP)
+      candidate->proto_subtype = g_strdup ("RTCP");
+    candidate->proto_profile = g_strdup ("AVP");
+    candidate->type = FS_CANDIDATE_TYPE_HOST;
+
+    g_signal_emit_by_name (self, "new-local-candidate", candidate);
+
+    fs_candidate_destroy (candidate); /* not kept once the signal has returned */
+  }
+
+  /* free list of ips */
+  g_list_foreach (ips, (GFunc) g_free, NULL);
+  g_list_free (ips);
+}
+
+/*
+ * Last step of candidate generation: emits the forced or interface
+ * candidates for components lacking a STUN one, then signals completion.
+ */
+static gboolean
+fs_rawudp_stream_transmitter_finish_candidate_generation (gpointer user_data)
+{
+  FsRawUdpStreamTransmitter *self = user_data;
+  GSource *source;
+
+  /* A STUN candidate for RTP replaces the locally generated ones; without
+   * one, prefer the forced candidate over scanning the interfaces */
+  if (!self->priv->local_stun_rtp_candidate) {
+    if (self->priv->local_forced_rtp_candidate)
+      g_signal_emit_by_name (self, "new-local-candidate",
+        self->priv->local_forced_rtp_candidate);
+    else
+      fs_rawudp_stream_transmitter_emit_local_candidates (self,
+        FS_COMPONENT_RTP);
+  }
+
+  /* Same selection for RTCP */
+  if (!self->priv->local_stun_rtcp_candidate) {
+    if (self->priv->local_forced_rtcp_candidate)
+      g_signal_emit_by_name (self, "new-local-candidate",
+        self->priv->local_forced_rtcp_candidate);
+    else
+      fs_rawudp_stream_transmitter_emit_local_candidates (self,
+        FS_COMPONENT_RTCP);
+  }
+
+  g_signal_emit_by_name (self, "local-candidates-prepared");
+
+  /* When running inside a main loop source, unlist its id so that dispose
+   * does not try to remove a source that has already run
+   */
+  source = g_main_current_source ();
+  if (source)  {
+    guint id = g_source_get_id (source);
+
+    g_mutex_lock (self->priv->sources_mutex);
+    self->priv->sources = g_list_remove (self->priv->sources,
+      GUINT_TO_POINTER (id));
+    g_mutex_unlock (self->priv->sources_mutex);
+  }
+
+  return FALSE; /* one-shot: do not reschedule when used as an idle source */
+}
-- 
1.5.6.5




More information about the farsight-commits mailing list