[farsight2/master] Replace the single UdpStream by two UdpPort structs (makes the code much more clean)

Olivier Crête olivier.crete at collabora.co.uk
Tue Dec 23 15:19:27 PST 2008


---
 transmitters/rawudp/fs-rawudp-transmitter.c |  291 ++++++++++++++-------------
 transmitters/rawudp/fs-rawudp-transmitter.h |   34 +++-
 2 files changed, 171 insertions(+), 154 deletions(-)

diff --git a/transmitters/rawudp/fs-rawudp-transmitter.c b/transmitters/rawudp/fs-rawudp-transmitter.c
index f0ae1f6..891a4ef 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.c
+++ b/transmitters/rawudp/fs-rawudp-transmitter.c
@@ -76,7 +76,8 @@ struct _FsRawUdpTransmitterPrivate
   GstElement *udpsink_tee;
   GstElement *udprtcpsink_tee;
 
-  GList *udpstreams;
+  GList *rtp_udpports;
+  GList *rtcp_udpports;
 
   gboolean disposed;
 };
@@ -375,34 +376,26 @@ fs_rawudp_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
 
 
 /*
- * We have an UdpStream sub-object, this is linked to two udp srcs
- * and two udpmultisinks
- * It corresponds to an ip:port combo for rtp and one for rtcp
- * from which the packets are sent and on which they are received
+ * The UdpPort structure is a ref-counted pseudo-object use to represent
+ * one ip:port combo on which we listen and send, so it includes  a udpsrc
+ * and a multiudpsink
  */
 
-struct _UdpStream {
+struct _UdpPort {
   gint refcount;
 
   GstElement *udpsrc;
   GstPad *udpsrc_requested_pad;
 
-  GstElement *udprtcpsrc;
-  GstPad *udprtcpsrc_requested_pad;
-
   GstElement *udpsink;
   GstPad *udpsink_requested_pad;
 
-  GstElement *udprtcpsink;
-  GstPad *udprtcpsink_requested_pad;
-
   gchar *requested_ip;
   guint requested_port;
-  gchar *requested_rtcp_ip;
-  guint requested_rtcp_port;
 
-  gint rtpfd;
-  gint rtcpfd;
+  gint fd;
+
+  guint component_id;
 };
 
 static gint
@@ -536,200 +529,210 @@ _create_sinksource (gchar *elementname, GstBin *bin,
 }
 
 
-UdpStream *
-fs_rawudp_transmitter_get_udpstream (FsRawUdpTransmitter *trans,
-  const gchar *requested_ip, guint requested_port,
-  const gchar *requested_rtcp_ip, guint requested_rtcp_port,
+UdpPort *
+fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans,
+  guint component_id, const gchar *requested_ip, guint requested_port,
   GError **error)
 {
-  UdpStream *udpstream;
-  GList *udpstream_e;
+  UdpPort *udpport;
+  GList *udpport_e;
 
   /* First lets check if we already have one */
 
-  for (udpstream_e = g_list_first (trans->priv->udpstreams);
-       udpstream_e;
-       udpstream_e = g_list_next (udpstream_e)) {
-    udpstream = udpstream_e->data;
-    if ((requested_port == udpstream->requested_port &&
-        ((requested_ip == NULL && udpstream->requested_ip == NULL) ||
-          !strcmp (requested_ip, udpstream->requested_ip))) &&
-      (requested_rtcp_port == udpstream->requested_rtcp_port &&
-        ((requested_rtcp_ip == NULL && udpstream->requested_rtcp_ip == NULL) ||
-          !strcmp (requested_rtcp_ip, udpstream->requested_rtcp_ip)))) {
-      udpstream->refcount++;
-      return udpstream;
+  if (component_id == FS_COMPONENT_RTP)
+    udpport_e = g_list_first (trans->priv->rtp_udpports);
+  else if (component_id == FS_COMPONENT_RTCP)
+    udpport_e = g_list_first (trans->priv->rtcp_udpports);
+  else {
+    g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_INVALID_ARGUMENTS,
+      "Invalid component %d", component_id);
+    return NULL;
+  }
+
+  for (; udpport_e;
+       udpport_e = g_list_next (udpport_e)) {
+    udpport = udpport_e->data;
+    if (requested_port == udpport->requested_port &&
+        ((requested_ip == NULL && udpport->requested_ip == NULL) ||
+          !strcmp (requested_ip, udpport->requested_ip))) {
+      udpport->refcount++;
+      return udpport;
     }
   }
 
-  udpstream = g_new0 (UdpStream, 1);
+  udpport = g_new0 (UdpPort, 1);
 
-  udpstream->refcount = 1;
-  udpstream->requested_ip = g_strdup (requested_ip);
-  udpstream->requested_port = requested_port;
-  udpstream->requested_rtcp_ip = g_strdup (requested_rtcp_ip);
-  udpstream->requested_rtcp_port = requested_rtcp_port;
-  udpstream->rtpfd = -1;
-  udpstream->rtcpfd = -1;
+  udpport->refcount = 1;
+  udpport->requested_ip = g_strdup (requested_ip);
+  udpport->requested_port = requested_port;
+  udpport->fd = -1;
+  udpport->component_id = component_id;
 
   /* Now lets bind both ports */
 
-  udpstream->rtpfd = _bind_port (requested_ip, requested_port, error);
-  if (udpstream->rtpfd < 0)
-    goto error;
-
-  udpstream->rtcpfd = _bind_port (requested_rtcp_ip, requested_rtcp_port,
-    error);
-  if (udpstream->rtcpfd < 0)
+  udpport->fd = _bind_port (requested_ip, requested_port, error);
+  if (udpport->fd < 0)
     goto error;
 
   /* Now lets create the elements */
 
-  udpstream->udpsrc = _create_sinksource ("udpsrc",
+  udpport->udpsrc = _create_sinksource ("udpsrc",
     GST_BIN (trans->priv->gst_src), trans->priv->udpsrc_funnel,
-    udpstream->rtpfd, GST_PAD_SRC, &udpstream->udpsrc_requested_pad,
-    error);
-  if (!udpstream->udpsrc)
-    goto error;
-
-  udpstream->udprtcpsrc = _create_sinksource ("udpsrc",
-    GST_BIN (trans->priv->gst_src), trans->priv->udprtcpsrc_funnel,
-    udpstream->rtcpfd, GST_PAD_SRC, &udpstream->udprtcpsrc_requested_pad,
+    udpport->fd, GST_PAD_SRC, &udpport->udpsrc_requested_pad,
     error);
-  if (!udpstream->udpsrc)
+  if (!udpport->udpsrc)
     goto error;
 
-  udpstream->udpsink = _create_sinksource ("udpsink",
+  udpport->udpsink = _create_sinksource ("udpsink",
     GST_BIN (trans->priv->gst_sink), trans->priv->udpsink_tee,
-    udpstream->rtpfd, GST_PAD_SINK, &udpstream->udpsink_requested_pad,
-    error);
-  if (!udpstream->udpsink)
-    goto error;
-
-  udpstream->udprtcpsink = _create_sinksource ("udpsink",
-    GST_BIN (trans->priv->gst_sink), trans->priv->udprtcpsink_tee,
-    udpstream->rtcpfd, GST_PAD_SINK, &udpstream->udprtcpsink_requested_pad,
+    udpport->fd, GST_PAD_SINK, &udpport->udpsink_requested_pad,
     error);
-  if (!udpstream->udpsink)
+  if (!udpport->udpsink)
     goto error;
 
-  g_object_set (udpstream->udprtcpsink, "async", FALSE, NULL);
+  if (component_id != FS_COMPONENT_RTP)
+    g_object_set (udpport->udpsink, "async", FALSE, NULL);
 
-  trans->priv->udpstreams = g_list_prepend (trans->priv->udpstreams,
-    udpstream);
+  if (component_id == FS_COMPONENT_RTP)
+    trans->priv->rtp_udpports = g_list_prepend (trans->priv->rtp_udpports,
+      udpport);
+  else if (component_id == FS_COMPONENT_RTCP)
+    trans->priv->rtcp_udpports = g_list_prepend (trans->priv->rtcp_udpports,
+      udpport);
 
-  return udpstream;
+  return udpport;
 
  error:
-  if (udpstream)
-    fs_rawudp_transmitter_put_udpstream (trans, udpstream);
+  if (udpport)
+    fs_rawudp_transmitter_put_udpport (trans, udpport);
   return NULL;
 }
 
 void
-fs_rawudp_transmitter_put_udpstream (FsRawUdpTransmitter *trans,
-  UdpStream *udpstream)
+fs_rawudp_transmitter_put_udpport (FsRawUdpTransmitter *trans,
+  UdpPort *udpport)
 {
-  if (udpstream->refcount > 1) {
-    udpstream->refcount--;
+  if (udpport->refcount > 1) {
+    udpport->refcount--;
     return;
   }
 
-  trans->priv->udpstreams = g_list_remove (trans->priv->udpstreams, udpstream);
+  if (udpport->component_id == FS_COMPONENT_RTP)
+    trans->priv->rtp_udpports = g_list_remove (trans->priv->rtp_udpports,
+      udpport);
+  else if (udpport->component_id == FS_COMPONENT_RTCP)
+    trans->priv->rtcp_udpports = g_list_remove (trans->priv->rtcp_udpports,
+      udpport);
 
-  if (udpstream->udpsrc) {
+  if (udpport->udpsrc) {
     GstStateChangeReturn ret;
-    ret = gst_element_set_state (udpstream->udpsrc, GST_STATE_NULL);
+    ret = gst_element_set_state (udpport->udpsrc, GST_STATE_NULL);
     if (ret != GST_STATE_CHANGE_SUCCESS) {
       g_warning ("Error changing state of udpsrc: %d", ret);
     }
-    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpstream->udpsrc);
+    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpport->udpsrc);
   }
 
-  if (udpstream->udpsrc_requested_pad) {
+  if (udpport->udpsrc_requested_pad) {
     gst_element_release_request_pad (trans->priv->udpsrc_funnel,
-      udpstream->udpsrc_requested_pad);
+      udpport->udpsrc_requested_pad);
   }
 
-  if (udpstream->udprtcpsrc) {
+  if (udpport->udpsink) {
     GstStateChangeReturn ret;
-    ret = gst_element_set_state (udpstream->udprtcpsrc, GST_STATE_NULL);
+    gst_object_ref (udpport->udpsink);
+    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpport->udpsink);
+    ret = gst_element_set_state (udpport->udpsink, GST_STATE_NULL);
     if (ret != GST_STATE_CHANGE_SUCCESS) {
-      g_warning ("Error changing state of udprtcpsrc: %d", ret);
+      g_warning ("Error changing state of udpsink: %d", ret);
     }
-    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpstream->udprtcpsrc);
+    gst_object_unref (udpport->udpsink);
   }
 
-  if (udpstream->udprtcpsrc_requested_pad) {
-    gst_element_release_request_pad (trans->priv->udprtcpsrc_funnel,
-      udpstream->udprtcpsrc_requested_pad);
+  if (udpport->udpsink_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udpsink_tee,
+      udpport->udpsink_requested_pad);
   }
 
+  if (udpport->fd >= 0)
+    close (udpport->fd);
 
-  if (udpstream->udpsink) {
-    GstStateChangeReturn ret;
-    gst_object_ref (udpstream->udpsink);
-    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpstream->udpsink);
-    ret = gst_element_set_state (udpstream->udpsink, GST_STATE_NULL);
-    if (ret != GST_STATE_CHANGE_SUCCESS) {
-      g_warning ("Error changing state of udpsink: %d", ret);
-    }
-    gst_object_unref (udpstream->udpsink);
-  }
+  g_free (udpport->requested_ip);
+  g_free (udpport);
+}
 
-  if (udpstream->udpsink_requested_pad) {
-    gst_element_release_request_pad (trans->priv->udpsink_tee,
-      udpstream->udpsink_requested_pad);
-  }
+void
+fs_rawudp_transmitter_udpport_add_dest (UdpPort *udpport,
+  const gchar *ip, gint port)
+{
+  g_signal_emit_by_name (udpport->udpsink, "add", 0,
+    ip, port, NULL);
+}
 
-  if (udpstream->udprtcpsink) {
-    GstStateChangeReturn ret;
-    gst_object_ref (udpstream->udpsink);
-    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpstream->udprtcpsink);
-    ret = gst_element_set_state (udpstream->udprtcpsink, GST_STATE_NULL);
-    if (ret != GST_STATE_CHANGE_SUCCESS) {
-      g_warning ("Error changing state of udprtcpsink: %d", ret);
-    }
-    gst_object_unref (udpstream->udpsink);
-  }
 
-  if (udpstream->udprtcpsink_requested_pad) {
-    gst_element_release_request_pad (trans->priv->udprtcpsink_tee,
-      udpstream->udprtcpsink_requested_pad);
+void
+fs_rawudp_transmitter_udpport_remove_dest (UdpPort *udpport,
+  const gchar *ip, gint port)
+{
+  g_signal_emit_by_name (udpport->udpsink, "remove", 0,
+    ip, port, NULL);
+}
+
+gboolean
+fs_rawudp_transmitter_udpport_sendto (UdpPort *udpport,
+  gchar *msg, size_t len, const struct sockaddr *to, socklen_t tolen,
+  GError **error)
+{
+  if (sendto (udpport->fd, msg, len, 0, to, tolen) != len) {
+    g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+      "Could not send STUN request: %s", g_strerror (errno));
+    return FALSE;
   }
 
+  return TRUE;
+}
+
+gulong
+fs_rawudp_transmitter_udpport_connect_recv (UdpPort *udpport,
+  GCallback callback, gpointer user_data)
+{
+  GstPad *pad;
+  gulong id;
 
-  if (udpstream->rtpfd >= 0)
-    close (udpstream->rtpfd);
-  if (udpstream->rtcpfd >= 0)
-    close (udpstream->rtcpfd);
+  pad = gst_element_get_static_pad (udpport->udpsrc, "src");
+
+  id = gst_pad_add_buffer_probe (pad, callback, user_data);
+
+  gst_object_unref (pad);
 
-  g_free (udpstream->requested_ip);
-  g_free (udpstream->requested_rtcp_ip);
-  g_free (udpstream);
+  return id;
 }
 
+
 void
-fs_rawudp_transmitter_udpstream_add_dest (UdpStream *udpstream,
-  const gchar *ip, gint port, gboolean is_rtcp)
+fs_rawudp_transmitter_udpport_disconnect_recv (UdpPort *udpport, gulong id)
 {
-  if (is_rtcp)
-    g_signal_emit_by_name (udpstream->udpsink, "add", 0,
-      ip, port, NULL);
-  else
-    g_signal_emit_by_name (udpstream->udprtcpsink, "add", 0,
-      ip, port, NULL);
-}
+  GstPad *pad = gst_element_get_static_pad (udpport->udpsrc, "src");
 
+  gst_pad_remove_buffer_probe (pad, id);
 
-void
-fs_rawudp_transmitter_udpstream_remove_dest (UdpStream *udpstream,
-  const gchar *ip, gint port, gboolean is_rtcp)
+  gst_object_unref (pad);
+}
+
+gboolean
+fs_rawudp_transmitter_udpport_is_pad (UdpPort *udpport, GstPad *pad)
 {
-  if (is_rtcp)
-    g_signal_emit_by_name (udpstream->udpsink, "remove", 0,
-      ip, port, NULL);
-  else
-    g_signal_emit_by_name (udpstream->udprtcpsink, "remove", 0,
-      ip, port, NULL);
+  GstPad *mypad;
+  gboolean res;
+
+  mypad =  gst_element_get_static_pad (udpport->udpsrc, "src");
+
+  res = (mypad == pad);
+
+  if (mypad)
+    gst_object_unref (mypad);
+
+  return res;
 }
+
+
diff --git a/transmitters/rawudp/fs-rawudp-transmitter.h b/transmitters/rawudp/fs-rawudp-transmitter.h
index 83d8675..05ffd52 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.h
+++ b/transmitters/rawudp/fs-rawudp-transmitter.h
@@ -27,6 +27,10 @@
 
 #include <gst/farsight/fs-transmitter.h>
 
+#include <gst/gst.h>
+
+#include <arpa/inet.h>
+
 G_BEGIN_DECLS
 
 /* TYPE MACROS */
@@ -81,24 +85,34 @@ struct _FsRawUdpTransmitter
 };
 
 /* Private declaration */
-typedef struct _UdpStream UdpStream;
+typedef struct _UdpPort UdpPort;
 
 GType fs_rawudp_transmitter_get_type (void);
 
 
 
-UdpStream *fs_rawudp_transmitter_get_udpstream (FsRawUdpTransmitter *trans,
-  const gchar *requested_ip, guint requested_port,
-  const gchar *requested_rtcp_ip, guint requested_rtcp_port,
+UdpPort *fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans,
+  guint component_id, const gchar *requested_ip, guint requested_port,
+  GError **error);
+
+void fs_rawudp_transmitter_put_udpport (FsRawUdpTransmitter *trans,
+  UdpPort *udpport);
+
+void fs_rawudp_transmitter_udpport_add_dest (UdpPort *udpport,
+  const gchar *ip, gint port);
+void fs_rawudp_transmitter_udpport_remove_dest (UdpPort *udpport,
+  const gchar *ip, gint port);
+
+gboolean fs_rawudp_transmitter_udpport_sendto (UdpPort *udpport,
+  gchar *msg, size_t len, const struct sockaddr *to, socklen_t tolen,
   GError **error);
 
-void fs_rawudp_transmitter_put_udpstream (FsRawUdpTransmitter *trans,
-  UdpStream *udpstream);
+gulong fs_rawudp_transmitter_udpport_connect_recv (UdpPort *udpport,
+  GCallback callback, gpointer user_data);
+void fs_rawudp_transmitter_udpport_disconnect_recv (UdpPort *udpport,
+  gulong id);
 
-void fs_rawudp_transmitter_udpstream_add_dest (UdpStream *udpstream,
-  const gchar *ip, gint port, gboolean is_rtcp);
-void fs_rawudp_transmitter_udpstream_remove_dest (UdpStream *udpstream,
-  const gchar *ip, gint port, gboolean is_rtcp);
+gboolean fs_rawudp_transmitter_udpport_is_pad (UdpPort *udpport, GstPad *pad);
 
 
 
-- 
1.5.6.5




More information about the farsight-commits mailing list