[farsight2/master] Add implementation of the UdpStream sub-object

Olivier Crête olivier.crete at collabora.co.uk
Tue Dec 23 15:19:26 PST 2008


---
 transmitters/rawudp/fs-rawudp-transmitter.c |  349 +++++++++++++++++++++++++++
 transmitters/rawudp/fs-rawudp-transmitter.h |   15 ++
 2 files changed, 364 insertions(+), 0 deletions(-)

diff --git a/transmitters/rawudp/fs-rawudp-transmitter.c b/transmitters/rawudp/fs-rawudp-transmitter.c
index 0a2194b..1c5fb74 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.c
+++ b/transmitters/rawudp/fs-rawudp-transmitter.c
@@ -38,6 +38,15 @@
 #include "fs-rawudp-stream-transmitter.h"
 
 #include <gst/farsight/fs-session.h>
+#include <gst/farsight/fs-stream.h>
+
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <unistd.h>
 
 
 /* Signals */
@@ -67,6 +76,8 @@ struct _FsRawUdpTransmitterPrivate
   GstElement *udpsink_tee;
   GstElement *udprtcpsink_tee;
 
+  GList *udpstreams;
+
   gboolean disposed;
 };
 
@@ -360,3 +371,341 @@ fs_rawudp_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
   return FS_STREAM_TRANSMITTER (fs_rawudp_stream_transmitter_newv (
         n_parameters, parameters, error));
 }
+
+
+/*
+ * We have an UdpStream sub-object, this is linked to two udp srcs
+ * and two udpmultisinks
+ * It corresponds to an ip:port combo for rtp and one for rtcp
+ * from which the packets are sent and on which they are received
+ */
+
+struct _UdpStream {
+  gint refcount;
+
+  GstElement *udpsrc;
+  GstPad *udpsrc_requested_pad;
+
+  GstElement *udprtcpsrc;
+  GstPad *udprtcpsrc_requested_pad;
+
+  GstElement *udpsink;
+  GstPad *udpsink_requested_pad;
+
+  GstElement *udprtcpsink;
+  GstPad *udprtcpsink_requested_pad;
+
+  gchar *requested_ip;
+  guint requested_port;
+  gchar *requested_rtcp_ip;
+  guint requested_rtcp_port;
+
+  gint rtpfd;
+  gint rtcpfd;
+};
+
+static gint
+_bind_port (const gchar *ip, guint port, GError **error)
+{
+  int sock;
+  struct sockaddr_in address;
+  int retval;
+
+  if (ip) {
+    struct addrinfo hints;
+    struct addrinfo *result = NULL;
+
+    memset (&hints, 0, sizeof (struct addrinfo));
+    hints.ai_family = AF_INET;
+    hints.ai_flags = AI_NUMERICHOST;
+    retval = getaddrinfo (ip, NULL, &hints, &result);
+    if (retval != 0) {
+      *error = g_error_new (FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+        "Invalid IP address %s passed: %s", ip, gai_strerror (retval));
+      return -1;
+    }
+    memcpy (&address, result->ai_addr, sizeof(struct sockaddr_in));
+    freeaddrinfo (result);
+  } else {
+    address.sin_addr.s_addr = INADDR_ANY;
+  }
+  address.sin_family = AF_INET;
+  address.sin_port = htons (port);
+
+
+  if ((sock = socket (AF_INET, SOCK_DGRAM, 0)) <= 0) {
+    *error = g_error_new (FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+      "Error creating socket: %s", g_strerror (errno));
+    return -1;
+  }
+
+  do {
+    address.sin_family = AF_INET;
+    address.sin_addr.s_addr = INADDR_ANY;
+    address.sin_port = htons (port);
+    retval = bind (sock, (struct sockaddr *) &address, sizeof (address));
+    if (retval != 0)
+    {
+      g_debug ("could not bind port %d", port);
+      port += 2;
+      if (port > 65535) {
+        *error = g_error_new (FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+          "Could not bind the socket to a port");
+        close (sock);
+        return -1;
+      }
+    }
+  } while (retval != 0);
+
+  return sock;
+}
+
+static GstElement *
+_create_sinksource (gchar *elementname, GstBin *bin,
+  GstElement *teefunnel, gint fd, GstPadDirection direction,
+  GstPad **requested_pad, GError **error)
+{
+  GstElement *elem;
+  GstPadLinkReturn ret;
+  GstPad *ourpad = NULL;
+
+  g_assert (direction == GST_PAD_SINK || direction == GST_PAD_SRC);
+
+  elem = gst_element_factory_make (elementname, NULL);
+  if (!elem) {
+    *error = g_error_new (FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+      "Could not create the %s element", elementname);
+    return NULL;
+  }
+
+  g_object_set (elem,
+    "closefd", FALSE,
+    "sockfd", fd,
+    NULL);
+
+  if (!gst_bin_add (bin, elem)) {
+    *error = g_error_new (FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+      "Could not add the udpsrc element to the gst sink");
+    gst_object_unref (elem);
+    return NULL;
+  }
+
+  if (direction == GST_PAD_SINK)
+    *requested_pad = gst_element_get_request_pad (teefunnel, "sink%d");
+  else
+    *requested_pad = gst_element_get_request_pad (teefunnel, "src%d");
+
+  if (!*requested_pad) {
+    *error = g_error_new (FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+      "Could not get the %s request pad from the %s",
+      (direction == GST_PAD_SINK) ? "sink" : "src",
+      (direction == GST_PAD_SINK) ? "tee" : "funnel");
+    goto error;
+  }
+
+  if (direction == GST_PAD_SINK)
+    ourpad = gst_element_get_static_pad (elem, "src");
+  else
+    ourpad = gst_element_get_static_pad (elem, "sink");
+
+  if (direction == GST_PAD_SINK)
+    ret = gst_pad_link (*requested_pad, ourpad);
+  else
+    ret = gst_pad_link (ourpad, *requested_pad);
+
+  if (GST_PAD_LINK_FAILED(ret)) {
+    *error = g_error_new (FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+      "Could not link the new element %s (%d)", elementname, ret);
+    goto error;
+  }
+
+  if (!gst_element_sync_state_with_parent (elem)) {
+    *error = g_error_new (FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+      "Could not sync the state of the new %s with its parent",
+      elementname);
+    goto error;
+  }
+
+  return elem;
+
+ error:
+  gst_element_set_state (elem, GST_STATE_NULL);
+  gst_bin_remove (bin, elem);
+  return NULL;
+}
+
+
+UdpStream *
+fs_rawudp_transmitter_get_udpstream (FsRawUdpTransmitter *trans,
+  const gchar *requested_ip, guint requested_port,
+  const gchar *requested_rtcp_ip, guint requested_rtcp_port,
+  GError **error)
+{
+  UdpStream *udpstream;
+  GList *udpstream_e;
+
+  *error = NULL;
+
+  /* First lets check if we already have one */
+
+  for (udpstream_e = g_list_first (trans->priv->udpstreams);
+       udpstream_e;
+       udpstream_e = g_list_next (udpstream_e)) {
+    udpstream = udpstream_e->data;
+    if ((requested_port == udpstream->requested_port &&
+        ((requested_ip == NULL && udpstream->requested_ip == NULL) ||
+          !strcmp (requested_ip, udpstream->requested_ip))) &&
+      (requested_rtcp_port == udpstream->requested_rtcp_port &&
+        ((requested_rtcp_ip == NULL && udpstream->requested_rtcp_ip == NULL) ||
+          !strcmp (requested_rtcp_ip, udpstream->requested_rtcp_ip)))) {
+      udpstream->refcount++;
+      return udpstream;
+    }
+  }
+
+  udpstream = g_new0 (UdpStream, 1);
+
+  udpstream->refcount = 1;
+  udpstream->requested_ip = g_strdup (requested_ip);
+  udpstream->requested_port = requested_port;
+  udpstream->requested_rtcp_ip = g_strdup (requested_rtcp_ip);
+  udpstream->requested_rtcp_port = requested_rtcp_port;
+  udpstream->rtpfd = -1;
+  udpstream->rtcpfd = -1;
+
+  /* Now lets bind both ports */
+
+  udpstream->rtpfd = _bind_port (requested_ip, requested_port, error);
+  if (udpstream->rtpfd < 0)
+    goto error;
+
+  udpstream->rtcpfd = _bind_port (requested_rtcp_ip, requested_rtcp_port,
+    error);
+  if (udpstream->rtcpfd < 0)
+    goto error;
+
+  /* Now lets create the elements */
+
+  udpstream->udpsrc = _create_sinksource ("udpsrc",
+    GST_BIN (trans->priv->gst_src), trans->priv->udpsrc_funnel,
+    udpstream->rtpfd, GST_PAD_SRC, &udpstream->udpsrc_requested_pad,
+    error);
+  if (!udpstream->udpsrc)
+    goto error;
+
+  udpstream->udprtcpsrc = _create_sinksource ("udpsrc",
+    GST_BIN (trans->priv->gst_src), trans->priv->udprtcpsrc_funnel,
+    udpstream->rtcpfd, GST_PAD_SRC, &udpstream->udprtcpsrc_requested_pad,
+    error);
+  if (!udpstream->udpsrc)
+    goto error;
+
+  udpstream->udpsink = _create_sinksource ("udpsink",
+    GST_BIN (trans->priv->gst_sink), trans->priv->udpsink_tee,
+    udpstream->rtpfd, GST_PAD_SINK, &udpstream->udpsink_requested_pad,
+    error);
+  if (!udpstream->udpsink)
+    goto error;
+
+  udpstream->udprtcpsink = _create_sinksource ("udpsink",
+    GST_BIN (trans->priv->gst_sink), trans->priv->udprtcpsink_tee,
+    udpstream->rtcpfd, GST_PAD_SINK, &udpstream->udprtcpsink_requested_pad,
+    error);
+  if (!udpstream->udpsink)
+    goto error;
+
+  g_object_set (udpstream->udprtcpsink, "async", FALSE, NULL);
+
+  trans->priv->udpstreams = g_list_prepend (trans->priv->udpstreams,
+    udpstream);
+
+  return udpstream;
+
+ error:
+  if (udpstream)
+    fs_rawudp_transmitter_put_udpstream (trans, udpstream);
+  return NULL;
+}
+
+void
+fs_rawudp_transmitter_put_udpstream (FsRawUdpTransmitter *trans,
+  UdpStream *udpstream)
+{
+  if (udpstream->refcount > 1) {
+    udpstream->refcount--;
+    return;
+  }
+
+  trans->priv->udpstreams = g_list_remove (trans->priv->udpstreams, udpstream);
+
+  if (udpstream->udpsrc) {
+    GstStateChangeReturn ret;
+    ret = gst_element_set_state (udpstream->udpsrc, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udpsrc: %d", ret);
+    }
+    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpstream->udpsrc);
+  }
+
+  if (udpstream->udpsrc_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udpsrc_funnel,
+      udpstream->udpsrc_requested_pad);
+  }
+
+  if (udpstream->udprtcpsrc) {
+    GstStateChangeReturn ret;
+    ret = gst_element_set_state (udpstream->udprtcpsrc, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udprtcpsrc: %d", ret);
+    }
+    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpstream->udprtcpsrc);
+  }
+
+  if (udpstream->udprtcpsrc_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udprtcpsrc_funnel,
+      udpstream->udprtcpsrc_requested_pad);
+  }
+
+
+  if (udpstream->udpsink) {
+    GstStateChangeReturn ret;
+    gst_object_ref (udpstream->udpsink);
+    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpstream->udpsink);
+    ret = gst_element_set_state (udpstream->udpsink, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udpsink: %d", ret);
+    }
+    gst_object_unref (udpstream->udpsink);
+  }
+
+  if (udpstream->udpsink_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udpsink_tee,
+      udpstream->udpsink_requested_pad);
+  }
+
+  if (udpstream->udprtcpsink) {
+    GstStateChangeReturn ret;
+    gst_object_ref (udpstream->udpsink);
+    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpstream->udprtcpsink);
+    ret = gst_element_set_state (udpstream->udprtcpsink, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udprtcpsink: %d", ret);
+    }
+    gst_object_unref (udpstream->udpsink);
+  }
+
+  if (udpstream->udprtcpsink_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udprtcpsink_tee,
+      udpstream->udprtcpsink_requested_pad);
+  }
+
+
+  if (udpstream->rtpfd >= 0)
+    close (udpstream->rtpfd);
+  if (udpstream->rtcpfd >= 0)
+    close (udpstream->rtcpfd);
+
+  g_free (udpstream->requested_ip);
+  g_free (udpstream->requested_rtcp_ip);
+  g_free (udpstream);
+}
diff --git a/transmitters/rawudp/fs-rawudp-transmitter.h b/transmitters/rawudp/fs-rawudp-transmitter.h
index 7269b16..4da2697 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.h
+++ b/transmitters/rawudp/fs-rawudp-transmitter.h
@@ -80,8 +80,23 @@ struct _FsRawUdpTransmitter
   gpointer _padding[8];
 };
 
+/* Private declaration */
+typedef struct _UdpStream UdpStream;
+
 GType fs_rawudp_transmitter_get_type (void);
 
+
+
+UdpStream *
+fs_rawudp_transmitter_get_udpstream (FsRawUdpTransmitter *trans,
+  const gchar *requested_ip, guint requested_port,
+  const gchar *requested_rtcp_ip, guint requested_rtcp_port,
+  GError **error);
+
+void fs_rawudp_transmitter_put_udpstream (FsRawUdpTransmitter *trans,
+  UdpStream *udpstream);
+
+
 G_END_DECLS
 
 #endif /* __FS_RAWUDP_TRANSMITTER_H__ */
-- 
1.5.6.5




More information about the farsight-commits mailing list