[Nice] [nice/master] Make libnice thread-safe

Youness Alaoui youness.alaoui at collabora.co.uk
Wed Nov 5 14:00:56 PST 2008


darcs-hash:20080402185805-4f0f6-d575bacdd796be98b42597605baf21b24a331047.gz
---
 agent/agent-priv.h             |    1 +
 agent/agent.c                  |  149 +++++++++++++++++++++++++++++++++-------
 agent/conncheck.c              |   21 +++++-
 agent/discovery.c              |   16 ++++-
 agent/test-add-remove-stream.c |    1 +
 agent/test-fallback.c          |    1 +
 agent/test-fullmode.c          |    1 +
 agent/test-mainloop.c          |    1 +
 agent/test-poll.c              |    1 +
 agent/test-recv.c              |    1 +
 agent/test-restart.c           |    1 +
 agent/test.c                   |    1 +
 configure.ac                   |    1 +
 13 files changed, 168 insertions(+), 28 deletions(-)

diff --git a/agent/agent-priv.h b/agent/agent-priv.h
index c40a330..32b7198 100644
--- a/agent/agent-priv.h
+++ b/agent/agent-priv.h
@@ -94,6 +94,7 @@ struct _NiceAgent
   guint keepalive_timer_id;       /**< id of keepalive timer */
   guint64 tie_breaker;            /**< tie breaker (ICE sect 5.2
 				     "Determining Role" ID-19) */
+  GMutex * mutex;                 /* Mutex used for thread-safe lib */
   /* XXX: add pointer to internal data struct for ABI-safe extensions */
 };
 
diff --git a/agent/agent.c b/agent/agent.c
index 2f22e59..4e632ff 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -369,6 +369,7 @@ nice_agent_init (NiceAgent *agent)
 
   agent->rng = nice_rng_new ();
   priv_generate_tie_breaker (agent);
+  agent->mutex = g_mutex_new ();
 }
 
 
@@ -398,6 +399,8 @@ nice_agent_get_property (
 {
   NiceAgent *agent = NICE_AGENT (object);
 
+  g_mutex_lock (agent->mutex);
+
   switch (property_id)
     {
     case PROP_SOCKET_FACTORY:
@@ -440,6 +443,8 @@ nice_agent_get_property (
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
     }
+
+  g_mutex_unlock (agent->mutex);
 }
 
 
@@ -452,6 +457,8 @@ nice_agent_set_property (
 {
   NiceAgent *agent = NICE_AGENT (object);
 
+  g_mutex_lock (agent->mutex);
+
   switch (property_id)
     {
     case PROP_SOCKET_FACTORY:
@@ -489,6 +496,9 @@ nice_agent_set_property (
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
     }
+
+  g_mutex_unlock (agent->mutex);
+
 }
 
 void agent_signal_gathering_done (NiceAgent *agent)
@@ -612,8 +622,12 @@ nice_agent_add_stream (
   GSList *i, *modified_list = NULL;
   guint n;
 
-  if (!agent->local_addresses)
+  g_mutex_lock (agent->mutex);
+
+  if (!agent->local_addresses) {
+    g_mutex_unlock (agent->mutex);
     return 0;
+  }
 
   stream = stream_new (n_components);
   if (stream) {
@@ -631,8 +645,10 @@ nice_agent_add_stream (
   }
 
   /* note: error in allocating objects */
-  if (!modified_list)
+  if (!modified_list) {
+    g_mutex_unlock (agent->mutex);
     return 0;
+  }
 
   g_debug ("In %s mode, starting candidate gathering.", agent->full_mode ? "ICE-FULL" : "ICE-LITE");
 
@@ -687,6 +703,7 @@ nice_agent_add_stream (
     discovery_schedule (agent);
   }
 
+  g_mutex_unlock (agent->mutex);
   return stream->id;
 }
 
@@ -712,10 +729,13 @@ nice_agent_remove_stream (
 
   Stream *stream;
 
+  g_mutex_lock (agent->mutex);
   stream = agent_find_stream (agent, stream_id);
 
-  if (!stream)
+  if (!stream) {
+    g_mutex_unlock (agent->mutex);
     return;
+  }
 
   /* note: remove items with matching stream_ids from both lists */
   conn_check_prune_stream (agent, stream);
@@ -728,6 +748,8 @@ nice_agent_remove_stream (
 
   if (!agent->streams)
     priv_remove_keepalive_timer (agent);
+
+  g_mutex_unlock (agent->mutex);
 }
 
 /**
@@ -746,13 +768,19 @@ nice_agent_add_local_address (NiceAgent *agent, NiceAddress *addr)
   NiceAddress *dup;
   GSList *modified_list;
 
+  g_mutex_lock (agent->mutex);
+
   dup = nice_address_dup (addr);
   nice_address_set_port (dup, 0);
   modified_list = g_slist_append (agent->local_addresses, dup);
   if (modified_list) {
     agent->local_addresses = modified_list;
+
+    g_mutex_unlock (agent->mutex);
     return TRUE;
   }
+
+  g_mutex_unlock (agent->mutex);
   return FALSE;
 }
 
@@ -880,6 +908,8 @@ nice_agent_set_remote_credentials (
 {
   Stream *stream;
 
+  g_mutex_lock (agent->mutex);
+
   stream = agent_find_stream (agent, stream_id);
   /* note: oddly enough, ufrag and pwd can be empty strings */
   if (stream && ufrag && pwd) {
@@ -887,9 +917,11 @@ nice_agent_set_remote_credentials (
     g_strlcpy (stream->remote_ufrag, ufrag, NICE_STREAM_MAX_UFRAG);
     g_strlcpy (stream->remote_password, pwd, NICE_STREAM_MAX_PWD);
 
+    g_mutex_unlock (agent->mutex);
     return TRUE;
   }
 
+  g_mutex_unlock (agent->mutex);
   return FALSE;
 }
 
@@ -913,15 +945,21 @@ nice_agent_get_local_credentials (
 {
   Stream *stream = agent_find_stream (agent, stream_id);
 
-  if (stream == NULL)
+  g_mutex_lock (agent->mutex);
+  if (stream == NULL) {
+    g_mutex_unlock (agent->mutex);
     return FALSE;
+  }
 
-  if (!ufrag || !pwd)
+  if (!ufrag || !pwd) {
+    g_mutex_unlock (agent->mutex);
     return FALSE;
+  }
 
   *ufrag = stream->local_ufrag;
   *pwd = stream->local_password;
 
+  g_mutex_unlock (agent->mutex);
   return TRUE;
 }
 
@@ -952,12 +990,11 @@ nice_agent_add_remote_candidate (
   const gchar *password)
 {
 
-  /* XXX: to be deprecated */
-
   /* XXX: should we allow use of this method without an 
    *      initial call to nice_agent_set_remote_candidates()
    *      with an empty set? */
-  return 
+
+  gboolean ret =
     priv_add_remote_candidate (agent,
 			       stream_id,
 			       component_id,
@@ -972,6 +1009,10 @@ nice_agent_add_remote_candidate (
 
   /* XXX/later: for each component, generate a new check with the new
      candidate, see below set_remote_candidates() */
+
+
+  g_mutex_unlock (agent->mutex);
+  return ret;
 }
 
 /**
@@ -996,9 +1037,8 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
   const GSList *i; 
   int added = 0;
 
-
-  if (agent->discovery_unsched_items > 0)
-    return -1;
+ /* XXX: clean up existing remote candidates, and abort any 
+  *      connectivity checks using these candidates */
 
  for (i = candidates; i && added >= 0; i = i->next) {
    NiceCandidateDesc *d = (NiceCandidateDesc*) i->data;
@@ -1028,6 +1068,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
      g_debug ("Warning: unable to schedule any conn checks!");
  }
 
+ g_mutex_unlock (agent->mutex);
  return added;
 }
 
@@ -1127,8 +1168,11 @@ nice_agent_recv (
   Stream *stream;
   Component *component;
 
-  if (!agent_find_component (agent, stream_id, component_id, &stream, &component))
+  g_mutex_lock (agent->mutex);
+  if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) {
+    g_mutex_unlock (agent->mutex);
     return 0;
+  }
 
   FD_ZERO (&fds);
 
@@ -1164,8 +1208,10 @@ nice_agent_recv (
                 len = _nice_agent_recv (agent, stream, component, socket,
 					buf_len, buf);
 
-                if (len >= 0)
+                if (len >= 0) {
+                  g_mutex_unlock (agent->mutex);
                   return len;
+                }
               }
         }
     }
@@ -1173,6 +1219,7 @@ nice_agent_recv (
   /* note: commented out to avoid compiler warnings 
    *
    * g_assert_not_reached (); */
+  g_mutex_unlock (agent->mutex);
 }
 
 NICEAPI_EXPORT guint
@@ -1187,15 +1234,22 @@ nice_agent_recv_sock (
   NiceUDPSocket *socket;
   Stream *stream;
   Component *component;
+  guint ret;
 
-  if (!agent_find_component (agent, stream_id, component_id, &stream, &component))
+  g_mutex_lock (agent->mutex);
+  if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) {
+    g_mutex_unlock (agent->mutex);
     return 0;
+  }
 
   socket = component_find_udp_socket_by_fd (component, sock);
   g_assert (socket);
 
-  return _nice_agent_recv (agent, stream, component,
+  ret = _nice_agent_recv (agent, stream, component,
 			   socket, buf_len, buf);
+
+  g_mutex_unlock (agent->mutex);
+  return ret;
 }
 
 
@@ -1225,6 +1279,8 @@ nice_agent_poll_read (
   GSList *i;
   guint j;
 
+  g_mutex_lock (agent->mutex);
+
   FD_ZERO (&fds);
 
   for (i = agent->streams; i; i = i->next)
@@ -1257,9 +1313,11 @@ nice_agent_poll_read (
 
   num_readable = select (max_fd + 1, &fds, NULL, NULL, NULL);
 
-  if (num_readable < 1)
+  if (num_readable < 1) {
+    g_mutex_unlock (agent->mutex);
     /* none readable, or error */
     return NULL;
+  }
 
   for (j = 0; j <= max_fd; j++)
     if (FD_ISSET (j, &fds))
@@ -1268,6 +1326,7 @@ nice_agent_poll_read (
 	  GSList *modified_list = g_slist_append (ret, GUINT_TO_POINTER (j));
 	  if (modified_list == NULL) {
 	    g_slist_free (ret);
+            g_mutex_unlock (agent->mutex);
 	    return NULL;
 	  }
 	  ret = modified_list;
@@ -1306,6 +1365,7 @@ nice_agent_poll_read (
           }
       }
 
+  g_mutex_unlock (agent->mutex);
   return ret;
 }
 
@@ -1331,6 +1391,8 @@ nice_agent_send (
   Stream *stream;
   Component *component;
 
+  g_mutex_lock (agent->mutex);
+
   agent_find_component (agent, stream_id, component_id, &stream, &component);
 
   if (component->selected_pair.local != NULL)
@@ -1351,9 +1413,11 @@ nice_agent_send (
       addr = &component->selected_pair.remote->addr;
       nice_udp_socket_send (sock, addr, len, buf);
       component->media_after_tick = TRUE;
+      g_mutex_unlock (agent->mutex);
       return len;
     }
 
+  g_mutex_unlock (agent->mutex);
   return -1;
 }
 
@@ -1375,11 +1439,19 @@ nice_agent_get_local_candidates (
   guint component_id)
 {
   Component *component;
+  GSList * ret;
 
+  g_mutex_lock (agent->mutex);
   if (!agent_find_component (agent, stream_id, component_id, NULL, &component))
-    return NULL;
+    {
+      g_mutex_unlock (agent->mutex);
+      return NULL;
+    }
+
+  ret = g_slist_copy (component->local_candidates);
 
-  return g_slist_copy (component->local_candidates);
+  g_mutex_unlock (agent->mutex);
+  return ret;
 }
 
 
@@ -1403,14 +1475,22 @@ nice_agent_get_remote_candidates (
   guint component_id)
 {
   Component *component;
+  GSList *ret;
 
+  g_mutex_lock (agent->mutex);
   if (!agent_find_component (agent, stream_id, component_id, NULL, &component))
-    return NULL;
+    {
+      g_mutex_unlock (agent->mutex);
+      return NULL;
+    }
 
   /* XXX: should we expose NiceCandidate to the client, or should
    *      we instead return a list of NiceCandidateDesc's? */
 
-  return g_slist_copy (component->remote_candidates);
+  ret = g_slist_copy (component->remote_candidates);
+
+  g_mutex_unlock (agent->mutex);
+  return ret;
 }
 
 /**
@@ -1431,6 +1511,8 @@ nice_agent_restart (
   GSList *i;
   gboolean res = TRUE;
 
+  g_mutex_lock (agent->mutex);
+
   /* step: clean up all connectivity checks */
   conn_check_free (agent);
 
@@ -1445,6 +1527,7 @@ nice_agent_restart (
     res = stream_restart (stream, agent->rng);
   }
 
+  g_mutex_unlock (agent->mutex);
   return res;
 }
 
@@ -1493,6 +1576,8 @@ nice_agent_dispose (GObject *object)
 
   if (G_OBJECT_CLASS (nice_agent_parent_class)->dispose)
     G_OBJECT_CLASS (nice_agent_parent_class)->dispose (object);
+
+  g_mutex_free (agent->mutex);
 }
 
 
@@ -1549,6 +1634,8 @@ nice_agent_g_source_cb (
   gchar buf[MAX_STUN_DATAGRAM_PAYLOAD];
   guint len;
 
+  g_mutex_lock (agent->mutex);
+
   /* note: dear compiler, these are for you: */
   (void)source;
 
@@ -1559,6 +1646,7 @@ nice_agent_g_source_cb (
     agent->read_func (agent, stream->id, component->id,
         len, buf, agent->read_func_data);
 
+  g_mutex_unlock (agent->mutex);
   return TRUE;
 }
 
@@ -1636,16 +1724,21 @@ nice_agent_main_context_attach (
 {
   GSList *i;
 
-  if (agent->main_context_set)
+  g_mutex_lock (agent->mutex);
+  if (agent->main_context_set) {
+    g_mutex_unlock (agent->mutex);
     return FALSE;
+  }
 
   /* attach candidates */
 
   for (i = agent->streams; i; i = i->next) {
     Stream *stream = i->data;
     gboolean res = priv_attach_new_stream (agent, stream);
-    if (!res)
+    if (!res) {
+      g_mutex_unlock (agent->mutex);
       return FALSE;
+    }
   }
   
   agent->main_context = ctx;
@@ -1653,6 +1746,7 @@ nice_agent_main_context_attach (
   agent->read_func = func;
   agent->read_func_data = data;
 
+  g_mutex_unlock (agent->mutex);
   return TRUE;
 }
 
@@ -1675,12 +1769,18 @@ nice_agent_set_selected_pair (
   Stream *stream;
   CandidatePair pair;
 
+  g_mutex_lock (agent->mutex);
+
   /* step: check that params specify an existing pair */
-  if (!agent_find_component (agent, stream_id, component_id, &stream, &component))
+  if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) {
+    g_mutex_unlock (agent->mutex);
     return FALSE;
+  }
 
-  if (!component_find_pair (component, agent, lfoundation, rfoundation, &pair))
+  if (!component_find_pair (component, agent, lfoundation, rfoundation, &pair)){
+    g_mutex_unlock (agent->mutex);
     return FALSE;
+  }
 
   /* step: stop connectivity checks (note: for the whole stream) */
   conn_check_prune_stream (agent, stream); 
@@ -1692,5 +1792,6 @@ nice_agent_set_selected_pair (
   component_update_selected_pair (component, &pair);
   agent_signal_new_selected_pair (agent, stream_id, component_id, lfoundation, rfoundation);
 
+  g_mutex_unlock (agent->mutex);
   return TRUE;
 }
diff --git a/agent/conncheck.c b/agent/conncheck.c
index 83176fe..e53742b 100644
--- a/agent/conncheck.c
+++ b/agent/conncheck.c
@@ -322,6 +322,7 @@ static gboolean priv_conn_check_tick_stream (Stream *stream, NiceAgent *agent, G
 
 }
 
+
 /**
  * Timer callback that handles initiating and managing connectivity
  * checks (paced by the Ta timer).
@@ -330,7 +331,7 @@ static gboolean priv_conn_check_tick_stream (Stream *stream, NiceAgent *agent, G
  *
  * @return will return FALSE when no more pending timers.
  */
-static gboolean priv_conn_check_tick (gpointer pointer)
+static gboolean priv_conn_check_tick_unlocked (gpointer pointer)
 {
   CandidateCheckPair *pair = NULL;
   NiceAgent *agent = pointer;
@@ -382,6 +383,18 @@ static gboolean priv_conn_check_tick (gpointer pointer)
   return keep_timer_going;
 }
 
+static gboolean priv_conn_check_tick (gpointer pointer)
+{
+  NiceAgent *agent = pointer;
+  gboolean ret;
+  /* Locking wrapper: run one priv_conn_check_tick_unlocked() pass with agent->mutex held. */
+  g_mutex_lock (agent->mutex);
+  ret = priv_conn_check_tick_unlocked (pointer);
+  g_mutex_unlock (agent->mutex);
+  /* Hand the unlocked tick's result back unchanged (documented there as FALSE when no more pending timers). */
+  return ret;
+}
+
 /**
  * Timer callback that handles initiating and managing connectivity
  * checks (paced by the Ta timer).
@@ -396,6 +409,8 @@ static gboolean priv_conn_keepalive_tick (gpointer pointer)
   GSList *i, *j;
   int errors = 0;
 
+  g_mutex_lock (agent->mutex);
+
   /* case 1: session established and media flowing
    *         (ref ICE sect 10 "Keepalives" ID-19)  */
   for (i = agent->streams; i; i = i->next) {
@@ -441,9 +456,11 @@ static gboolean priv_conn_keepalive_tick (gpointer pointer)
     
   if (errors) {
     g_debug ("%s: stopping keepalive timer", G_STRFUNC);
+    g_mutex_unlock (agent->mutex);
     return FALSE;
   }
 
+  g_mutex_unlock (agent->mutex);
   return TRUE;
 }
 
@@ -461,7 +478,7 @@ gboolean conn_check_schedule_next (NiceAgent *agent)
 
   if (res == TRUE) {
     /* step: call once imediately */
-    res = priv_conn_check_tick ((gpointer) agent);
+    res = priv_conn_check_tick_unlocked ((gpointer) agent);
 
     /* step: schedule timer if not running yet */
     if (res && agent->conncheck_timer_id == 0) 
diff --git a/agent/discovery.c b/agent/discovery.c
index e9dd3b6..14d0d9c 100644
--- a/agent/discovery.c
+++ b/agent/discovery.c
@@ -459,7 +459,7 @@ NiceCandidate *discovery_learn_remote_peer_reflexive_candidate (
  *
  * @return will return FALSE when no more pending timers.
  */
-static gboolean priv_discovery_tick (gpointer pointer)
+static gboolean priv_discovery_tick_unlocked (gpointer pointer)
 {
   CandidateDiscovery *cand;
   NiceAgent *agent = pointer;
@@ -569,6 +569,18 @@ static gboolean priv_discovery_tick (gpointer pointer)
   return TRUE;
 }
 
+static gboolean priv_discovery_tick (gpointer pointer)
+{
+  NiceAgent *agent = pointer;
+  gboolean ret;
+  /* Locking wrapper: run one priv_discovery_tick_unlocked() pass with agent->mutex held. */
+  g_mutex_lock (agent->mutex);
+  ret = priv_discovery_tick_unlocked (pointer);
+  g_mutex_unlock (agent->mutex);
+  /* Hand the unlocked tick's result back unchanged (documented there as FALSE when no more pending timers). */
+  return ret;
+}
+
 /**
  * Initiates the candidate discovery process by starting
  * the necessary timers.
@@ -583,7 +595,7 @@ void discovery_schedule (NiceAgent *agent)
     
     if (agent->discovery_timer_id == 0) {
       /* step: run first iteration immediately */
-      gboolean res = priv_discovery_tick (agent);
+      gboolean res = priv_discovery_tick_unlocked (agent);
       if (res == TRUE) {
 	agent->discovery_timer_id = 
 	  g_timeout_add (agent->timer_ta, priv_discovery_tick, agent);
diff --git a/agent/test-add-remove-stream.c b/agent/test-add-remove-stream.c
index 48b6d3c..ff06c2a 100644
--- a/agent/test-add-remove-stream.c
+++ b/agent/test-add-remove-stream.c
@@ -53,6 +53,7 @@ main (void)
 
   nice_address_init (&addr);
   g_type_init ();
+  g_thread_init (NULL);
 
   nice_udp_fake_socket_factory_init (&factory);
 
diff --git a/agent/test-fallback.c b/agent/test-fallback.c
index a65fe02..4b1366a 100644
--- a/agent/test-fallback.c
+++ b/agent/test-fallback.c
@@ -332,6 +332,7 @@ int main (void)
   const char *stun_server = NULL, *stun_server_port = NULL;
 
   g_type_init ();
+  g_thread_init (NULL);
   global_mainloop = g_main_loop_new (NULL, FALSE);
 
   /* Note: impl limits ...
diff --git a/agent/test-fullmode.c b/agent/test-fullmode.c
index cb04d4e..241335d 100644
--- a/agent/test-fullmode.c
+++ b/agent/test-fullmode.c
@@ -684,6 +684,7 @@ int main (void)
   const char *stun_server = NULL, *stun_server_port = NULL;
 
   g_type_init ();
+  g_thread_init (NULL);
   global_mainloop = g_main_loop_new (NULL, FALSE);
 
   /* Note: impl limits ...
diff --git a/agent/test-mainloop.c b/agent/test-mainloop.c
index eb86a38..827db76 100644
--- a/agent/test-mainloop.c
+++ b/agent/test-mainloop.c
@@ -72,6 +72,7 @@ main (void)
 
   nice_address_init (&addr);
   g_type_init ();
+  g_thread_init (NULL);
 
   nice_udp_fake_socket_factory_init (&factory);
   agent = nice_agent_new (&factory);
diff --git a/agent/test-poll.c b/agent/test-poll.c
index d78ae19..be5fd39 100644
--- a/agent/test-poll.c
+++ b/agent/test-poll.c
@@ -80,6 +80,7 @@ main (void)
 
   nice_address_init (&addr);
   g_type_init ();
+  g_thread_init (NULL);
 
   /* set up agent */
 
diff --git a/agent/test-recv.c b/agent/test-recv.c
index 12abdc6..7636bf5 100644
--- a/agent/test-recv.c
+++ b/agent/test-recv.c
@@ -53,6 +53,7 @@ main (void)
 
   nice_address_init (&addr);
   g_type_init ();
+  g_thread_init (NULL);
 
   nice_udp_fake_socket_factory_init (&factory);
 
diff --git a/agent/test-restart.c b/agent/test-restart.c
index 81b6809..821a366 100644
--- a/agent/test-restart.c
+++ b/agent/test-restart.c
@@ -375,6 +375,7 @@ int main (void)
   const char *stun_server = NULL, *stun_server_port = NULL;
 
   g_type_init ();
+  g_thread_init (NULL);
   global_mainloop = g_main_loop_new (NULL, FALSE);
 
   /* Note: impl limits ...
diff --git a/agent/test.c b/agent/test.c
index 7f7a8b2..dd5eb2f 100644
--- a/agent/test.c
+++ b/agent/test.c
@@ -56,6 +56,7 @@ main (void)
   nice_address_init (&addr_local);
   nice_address_init (&addr_remote);
   g_type_init ();
+  g_thread_init (NULL);
 
   nice_udp_fake_socket_factory_init (&factory);
 
diff --git a/configure.ac b/configure.ac
index c2b0e15..bc74978 100644
--- a/configure.ac
+++ b/configure.ac
@@ -46,6 +46,7 @@ PKG_CHECK_MODULES(OPENSSL, [openssl])
 PKG_CHECK_MODULES(GLIB, [dnl
 	glib-2.0 >= 2.10 dnl
 	gobject-2.0 >= 2.10 dnl
+	gthread-2.0 >= 2.10 dnl
 	])
 
 AC_ARG_WITH(gstreamer, 
-- 
1.5.6.5




More information about the Nice mailing list