[farsight2/master] Add implementation of the UdpStream sub-object
Olivier Crête
olivier.crete at collabora.co.uk
Tue Dec 23 15:19:26 PST 2008
---
transmitters/rawudp/fs-rawudp-transmitter.c | 349 +++++++++++++++++++++++++++
transmitters/rawudp/fs-rawudp-transmitter.h | 15 ++
2 files changed, 364 insertions(+), 0 deletions(-)
diff --git a/transmitters/rawudp/fs-rawudp-transmitter.c b/transmitters/rawudp/fs-rawudp-transmitter.c
index 0a2194b..1c5fb74 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.c
+++ b/transmitters/rawudp/fs-rawudp-transmitter.c
@@ -38,6 +38,15 @@
#include "fs-rawudp-stream-transmitter.h"
#include <gst/farsight/fs-session.h>
+#include <gst/farsight/fs-stream.h>
+
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <unistd.h>
/* Signals */
@@ -67,6 +76,8 @@ struct _FsRawUdpTransmitterPrivate
GstElement *udpsink_tee;
GstElement *udprtcpsink_tee;
+ GList *udpstreams;
+
gboolean disposed;
};
@@ -360,3 +371,341 @@ fs_rawudp_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
return FS_STREAM_TRANSMITTER (fs_rawudp_stream_transmitter_newv (
n_parameters, parameters, error));
}
+
+
+/*
+ * We have an UdpStream sub-object, this is linked to two udp srcs
+ * and two udp sinks (the elements are created from the "udpsrc" and
+ * "udpsink" factories).
+ * It corresponds to an ip:port combo for rtp and one for rtcp
+ * from which the packets are sent and on which they are received
+ *
+ * One socket is bound for rtp and one for rtcp; the src and the sink
+ * of each kind are both given that same file descriptor.
+ * Instances are shared: fs_rawudp_transmitter_get_udpstream() hands out
+ * an existing UdpStream when the requested ip/port values match, and
+ * fs_rawudp_transmitter_put_udpstream() tears it down once the last
+ * user has released it.
+ */
+
+struct _UdpStream {
+ gint refcount; /* number of users; starts at 1, not atomic */
+
+ GstElement *udpsrc; /* "udpsrc" element reading from rtpfd */
+ GstPad *udpsrc_requested_pad; /* request pad obtained from udpsrc_funnel */
+
+ GstElement *udprtcpsrc; /* "udpsrc" element reading from rtcpfd */
+ GstPad *udprtcpsrc_requested_pad; /* request pad obtained from udprtcpsrc_funnel */
+
+ GstElement *udpsink; /* "udpsink" element writing to rtpfd */
+ GstPad *udpsink_requested_pad; /* request pad obtained from udpsink_tee */
+
+ GstElement *udprtcpsink; /* "udpsink" element writing to rtcpfd */
+ GstPad *udprtcpsink_requested_pad; /* request pad obtained from udprtcpsink_tee */
+
+ gchar *requested_ip; /* owned copy of the requested rtp ip, may be NULL */
+ guint requested_port; /* requested rtp port, used as lookup key */
+ gchar *requested_rtcp_ip; /* owned copy of the requested rtcp ip, may be NULL */
+ guint requested_rtcp_port; /* requested rtcp port, used as lookup key */
+
+ gint rtpfd; /* rtp socket from _bind_port(), -1 while unbound */
+ gint rtcpfd; /* rtcp socket from _bind_port(), -1 while unbound */
+};
+
+/*
+ * _bind_port:
+ * @ip: numeric IPv4 address to bind to, or NULL to bind to any address
+ * @port: first port to try
+ * @error: location to store a #GError, or NULL
+ *
+ * Creates an IPv4 UDP socket and binds it to @ip. If binding to @port
+ * fails, the port is increased by 2 and the bind is retried until it
+ * succeeds or the port number goes past 65535.
+ *
+ * Returns: the bound socket file descriptor (owned by the caller, who
+ * must close() it), or -1 with @error set on failure.
+ */
+static gint
+_bind_port (const gchar *ip, guint port, GError **error)
+{
+  int sock;
+  struct sockaddr_in address;
+  int retval;
+
+  /* Start from a fully zeroed "any address" sockaddr */
+  memset (&address, 0, sizeof (address));
+  address.sin_family = AF_INET;
+  address.sin_addr.s_addr = htonl (INADDR_ANY);
+
+  if (ip) {
+    struct addrinfo hints;
+    struct addrinfo *result = NULL;
+
+    memset (&hints, 0, sizeof (struct addrinfo));
+    hints.ai_family = AF_INET;
+    /* Only accept numeric addresses, never do a DNS lookup here */
+    hints.ai_flags = AI_NUMERICHOST;
+    retval = getaddrinfo (ip, NULL, &hints, &result);
+    if (retval != 0) {
+      g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+          "Invalid IP address %s passed: %s", ip, gai_strerror (retval));
+      return -1;
+    }
+    memcpy (&address, result->ai_addr, sizeof (struct sockaddr_in));
+    freeaddrinfo (result);
+    address.sin_family = AF_INET;
+  }
+
+  /* 0 is a valid file descriptor, only negative values are errors */
+  sock = socket (AF_INET, SOCK_DGRAM, 0);
+  if (sock < 0) {
+    g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+        "Error creating socket: %s", g_strerror (errno));
+    return -1;
+  }
+
+  for (;;) {
+    /* Only the port changes between attempts; the address resolved
+     * above must be preserved so that the requested ip is honoured */
+    address.sin_port = htons (port);
+    retval = bind (sock, (struct sockaddr *) &address, sizeof (address));
+    if (retval == 0)
+      break;
+
+    g_debug ("could not bind port %d", port);
+    port += 2;
+    if (port > 65535) {
+      g_set_error (error, FS_STREAM_ERROR, FS_STREAM_ERROR_NETWORK,
+          "Could not bind the socket to a port");
+      close (sock);
+      return -1;
+    }
+  }
+
+  return sock;
+}
+
+/*
+ * _create_sinksource:
+ * @elementname: name of the element factory to use ("udpsrc" or "udpsink")
+ * @bin: the bin the new element is added to
+ * @teefunnel: the tee (for sinks) or funnel (for sources) to link with
+ * @fd: already bound socket handed to the element through "sockfd"
+ * @direction: GST_PAD_SINK when creating a sink, GST_PAD_SRC for a source
+ * @requested_pad: (out) the request pad obtained from @teefunnel
+ * @error: location to store a #GError, or NULL
+ *
+ * Creates the element, adds it to @bin, links it to a new request pad of
+ * @teefunnel and syncs its state with the parent. "closefd" is set to
+ * FALSE so the element does not close @fd.
+ *
+ * On failure after the request pad was obtained, *@requested_pad is left
+ * set; this function does not release it, that is up to the caller.
+ *
+ * Returns: the new element (owned by @bin), or NULL with @error set.
+ */
+static GstElement *
+_create_sinksource (gchar *elementname, GstBin *bin,
+    GstElement *teefunnel, gint fd, GstPadDirection direction,
+    GstPad **requested_pad, GError **error)
+{
+  GstElement *elem;
+  GstPadLinkReturn ret;
+  GstPad *ourpad = NULL;
+  gboolean is_sink;
+
+  g_assert (direction == GST_PAD_SINK || direction == GST_PAD_SRC);
+  is_sink = (direction == GST_PAD_SINK);
+
+  elem = gst_element_factory_make (elementname, NULL);
+  if (!elem) {
+    g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not create the %s element", elementname);
+    return NULL;
+  }
+
+  g_object_set (elem,
+      "closefd", FALSE,
+      "sockfd", fd,
+      NULL);
+
+  if (!gst_bin_add (bin, elem)) {
+    g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not add the %s element to the bin", elementname);
+    gst_object_unref (elem);
+    return NULL;
+  }
+
+  /* A sink element is fed from a src request pad of the tee, a source
+   * element feeds a sink request pad of the funnel */
+  if (is_sink)
+    *requested_pad = gst_element_get_request_pad (teefunnel, "src%d");
+  else
+    *requested_pad = gst_element_get_request_pad (teefunnel, "sink%d");
+
+  if (!*requested_pad) {
+    g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not get the %s request pad from the %s",
+        is_sink ? "src" : "sink",
+        is_sink ? "tee" : "funnel");
+    goto error;
+  }
+
+  /* The static pad of our element has the same direction as the element */
+  if (is_sink)
+    ourpad = gst_element_get_static_pad (elem, "sink");
+  else
+    ourpad = gst_element_get_static_pad (elem, "src");
+
+  if (!ourpad) {
+    g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not get the %s pad from the new %s",
+        is_sink ? "sink" : "src", elementname);
+    goto error;
+  }
+
+  /* gst_pad_link() takes (srcpad, sinkpad) */
+  if (is_sink)
+    ret = gst_pad_link (*requested_pad, ourpad);
+  else
+    ret = gst_pad_link (ourpad, *requested_pad);
+
+  /* gst_element_get_static_pad() returned a reference, drop it */
+  gst_object_unref (ourpad);
+  ourpad = NULL;
+
+  if (GST_PAD_LINK_FAILED(ret)) {
+    g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not link the new element %s (%d)", elementname, ret);
+    goto error;
+  }
+
+  if (!gst_element_sync_state_with_parent (elem)) {
+    g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not sync the state of the new %s with its parent",
+        elementname);
+    goto error;
+  }
+
+  return elem;
+
+ error:
+  gst_element_set_state (elem, GST_STATE_NULL);
+  gst_bin_remove (bin, elem);
+  return NULL;
+}
+
+
+/* NULL-safe string equality: two NULLs are equal, a NULL never equals
+ * a non-NULL string */
+static gboolean
+_udpstream_ip_equal (const gchar *a, const gchar *b)
+{
+  if (a == NULL || b == NULL)
+    return a == b;
+  return strcmp (a, b) == 0;
+}
+
+/*
+ * fs_rawudp_transmitter_get_udpstream:
+ * @trans: the transmitter owning the list of UdpStreams
+ * @requested_ip: ip to bind the rtp socket to, or NULL
+ * @requested_port: first port to try for rtp
+ * @requested_rtcp_ip: ip to bind the rtcp socket to, or NULL
+ * @requested_rtcp_port: first port to try for rtcp
+ * @error: location to store a #GError, or NULL
+ *
+ * Returns an UdpStream for the requested ip/port combination. If one with
+ * the same four requested values already exists, its refcount is
+ * increased and it is returned. Otherwise both sockets are bound, the
+ * two udpsrc and two udpsink elements are created and linked, and the
+ * new UdpStream is added to the transmitter's list.
+ *
+ * Every successful call must be matched by a call to
+ * fs_rawudp_transmitter_put_udpstream().
+ *
+ * Returns: the UdpStream, or NULL with @error set on failure.
+ */
+UdpStream *
+fs_rawudp_transmitter_get_udpstream (FsRawUdpTransmitter *trans,
+    const gchar *requested_ip, guint requested_port,
+    const gchar *requested_rtcp_ip, guint requested_rtcp_port,
+    GError **error)
+{
+  UdpStream *udpstream;
+  GList *udpstream_e;
+  GError *local_error = NULL;
+
+  if (error)
+    *error = NULL;
+
+  /* First lets check if we already have one */
+
+  for (udpstream_e = trans->priv->udpstreams;
+       udpstream_e;
+       udpstream_e = g_list_next (udpstream_e)) {
+    udpstream = udpstream_e->data;
+    if (requested_port == udpstream->requested_port &&
+        requested_rtcp_port == udpstream->requested_rtcp_port &&
+        _udpstream_ip_equal (requested_ip, udpstream->requested_ip) &&
+        _udpstream_ip_equal (requested_rtcp_ip,
+            udpstream->requested_rtcp_ip)) {
+      udpstream->refcount++;
+      return udpstream;
+    }
+  }
+
+  udpstream = g_new0 (UdpStream, 1);
+
+  udpstream->refcount = 1;
+  udpstream->requested_ip = g_strdup (requested_ip);
+  udpstream->requested_port = requested_port;
+  udpstream->requested_rtcp_ip = g_strdup (requested_rtcp_ip);
+  udpstream->requested_rtcp_port = requested_rtcp_port;
+  /* -1 marks the sockets as not opened for the cleanup code */
+  udpstream->rtpfd = -1;
+  udpstream->rtcpfd = -1;
+
+  /* Now lets bind both ports */
+
+  udpstream->rtpfd = _bind_port (requested_ip, requested_port, &local_error);
+  if (udpstream->rtpfd < 0)
+    goto error;
+
+  udpstream->rtcpfd = _bind_port (requested_rtcp_ip, requested_rtcp_port,
+      &local_error);
+  if (udpstream->rtcpfd < 0)
+    goto error;
+
+  /* Now lets create the elements */
+
+  udpstream->udpsrc = _create_sinksource ("udpsrc",
+      GST_BIN (trans->priv->gst_src), trans->priv->udpsrc_funnel,
+      udpstream->rtpfd, GST_PAD_SRC, &udpstream->udpsrc_requested_pad,
+      &local_error);
+  if (!udpstream->udpsrc)
+    goto error;
+
+  udpstream->udprtcpsrc = _create_sinksource ("udpsrc",
+      GST_BIN (trans->priv->gst_src), trans->priv->udprtcpsrc_funnel,
+      udpstream->rtcpfd, GST_PAD_SRC, &udpstream->udprtcpsrc_requested_pad,
+      &local_error);
+  if (!udpstream->udprtcpsrc)
+    goto error;
+
+  udpstream->udpsink = _create_sinksource ("udpsink",
+      GST_BIN (trans->priv->gst_sink), trans->priv->udpsink_tee,
+      udpstream->rtpfd, GST_PAD_SINK, &udpstream->udpsink_requested_pad,
+      &local_error);
+  if (!udpstream->udpsink)
+    goto error;
+
+  udpstream->udprtcpsink = _create_sinksource ("udpsink",
+      GST_BIN (trans->priv->gst_sink), trans->priv->udprtcpsink_tee,
+      udpstream->rtcpfd, GST_PAD_SINK, &udpstream->udprtcpsink_requested_pad,
+      &local_error);
+  if (!udpstream->udprtcpsink)
+    goto error;
+
+  g_object_set (udpstream->udprtcpsink, "async", FALSE, NULL);
+
+  trans->priv->udpstreams = g_list_prepend (trans->priv->udpstreams,
+      udpstream);
+
+  return udpstream;
+
+ error:
+  /* refcount is 1 here, so this releases whatever was set up so far */
+  fs_rawudp_transmitter_put_udpstream (trans, udpstream);
+  if (local_error)
+    g_propagate_error (error, local_error);
+  return NULL;
+}
+
+/*
+ * fs_rawudp_transmitter_put_udpstream:
+ * @trans: the transmitter the UdpStream was obtained from
+ * @udpstream: the UdpStream to release
+ *
+ * Drops one reference on @udpstream. When the last reference goes away
+ * the UdpStream is removed from the transmitter's list, its elements are
+ * stopped and removed from their bins, the request pads are released,
+ * both sockets are closed and the structure is freed.
+ *
+ * Every member is checked before use, so this also works on an UdpStream
+ * that was only partially set up.
+ */
+void
+fs_rawudp_transmitter_put_udpstream (FsRawUdpTransmitter *trans,
+    UdpStream *udpstream)
+{
+  if (udpstream->refcount > 1) {
+    udpstream->refcount--;
+    return;
+  }
+
+  trans->priv->udpstreams = g_list_remove (trans->priv->udpstreams, udpstream);
+
+  if (udpstream->udpsrc) {
+    GstStateChangeReturn ret;
+    ret = gst_element_set_state (udpstream->udpsrc, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udpsrc: %d", ret);
+    }
+    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpstream->udpsrc);
+  }
+
+  if (udpstream->udpsrc_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udpsrc_funnel,
+        udpstream->udpsrc_requested_pad);
+  }
+
+  if (udpstream->udprtcpsrc) {
+    GstStateChangeReturn ret;
+    ret = gst_element_set_state (udpstream->udprtcpsrc, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udprtcpsrc: %d", ret);
+    }
+    gst_bin_remove (GST_BIN (trans->priv->gst_src), udpstream->udprtcpsrc);
+  }
+
+  if (udpstream->udprtcpsrc_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udprtcpsrc_funnel,
+        udpstream->udprtcpsrc_requested_pad);
+  }
+
+
+  if (udpstream->udpsink) {
+    GstStateChangeReturn ret;
+    /* Hold our own reference so the element stays valid between the
+     * removal from the bin and the state change */
+    gst_object_ref (udpstream->udpsink);
+    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpstream->udpsink);
+    ret = gst_element_set_state (udpstream->udpsink, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udpsink: %d", ret);
+    }
+    gst_object_unref (udpstream->udpsink);
+  }
+
+  if (udpstream->udpsink_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udpsink_tee,
+        udpstream->udpsink_requested_pad);
+  }
+
+  if (udpstream->udprtcpsink) {
+    GstStateChangeReturn ret;
+    /* The temporary reference must be on the rtcp sink itself: the rtp
+     * sink has already been released above */
+    gst_object_ref (udpstream->udprtcpsink);
+    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpstream->udprtcpsink);
+    ret = gst_element_set_state (udpstream->udprtcpsink, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udprtcpsink: %d", ret);
+    }
+    gst_object_unref (udpstream->udprtcpsink);
+  }
+
+  if (udpstream->udprtcpsink_requested_pad) {
+    gst_element_release_request_pad (trans->priv->udprtcpsink_tee,
+        udpstream->udprtcpsink_requested_pad);
+  }
+
+
+  /* The elements were created with "closefd" FALSE, so the sockets
+   * are ours to close */
+  if (udpstream->rtpfd >= 0)
+    close (udpstream->rtpfd);
+  if (udpstream->rtcpfd >= 0)
+    close (udpstream->rtcpfd);
+
+  g_free (udpstream->requested_ip);
+  g_free (udpstream->requested_rtcp_ip);
+  g_free (udpstream);
+}
diff --git a/transmitters/rawudp/fs-rawudp-transmitter.h b/transmitters/rawudp/fs-rawudp-transmitter.h
index 7269b16..4da2697 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.h
+++ b/transmitters/rawudp/fs-rawudp-transmitter.h
@@ -80,8 +80,23 @@ struct _FsRawUdpTransmitter
gpointer _padding[8];
};
+/* Private declaration */
+typedef struct _UdpStream UdpStream;
+
GType fs_rawudp_transmitter_get_type (void);
+
+
+UdpStream *
+fs_rawudp_transmitter_get_udpstream (FsRawUdpTransmitter *trans,
+ const gchar *requested_ip, guint requested_port,
+ const gchar *requested_rtcp_ip, guint requested_rtcp_port,
+ GError **error);
+
+void fs_rawudp_transmitter_put_udpstream (FsRawUdpTransmitter *trans,
+ UdpStream *udpstream);
+
+
G_END_DECLS
#endif /* __FS_RAWUDP_TRANSMITTER_H__ */
--
1.5.6.5
More information about the farsight-commits
mailing list