[farsight2/master] Add queue to RTP sink in transmitter

Olivier Crête olivier.crete at collabora.co.uk
Tue Dec 23 15:19:28 PST 2008


---
 transmitters/rawudp/fs-rawudp-transmitter.c |   84 +++++++++++++++++++++++++-
 1 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/transmitters/rawudp/fs-rawudp-transmitter.c b/transmitters/rawudp/fs-rawudp-transmitter.c
index 1b6312e..476ab97 100644
--- a/transmitters/rawudp/fs-rawudp-transmitter.c
+++ b/transmitters/rawudp/fs-rawudp-transmitter.c
@@ -390,6 +390,8 @@ struct _UdpPort {
   GstElement *udpsink;
   GstPad *udpsink_requested_pad;
 
+  GstElement *queue;
+
   gchar *requested_ip;
   guint requested_port;
 
@@ -461,7 +463,7 @@ _bind_port (const gchar *ip, guint port, guint *used_port, GError **error)
 static GstElement *
 _create_sinksource (gchar *elementname, GstBin *bin,
   GstElement *teefunnel, gint fd, GstPadDirection direction,
-  GstPad **requested_pad, GError **error)
+  GstElement **queue, GstPad **requested_pad, GError **error)
 {
   GstElement *elem;
   GstPadLinkReturn ret;
@@ -483,7 +485,8 @@ _create_sinksource (gchar *elementname, GstBin *bin,
 
   if (!gst_bin_add (bin, elem)) {
     g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
-      "Could not add the udpsrc element to the gst sink");
+      "Could not add the %s element to the gst %s bin", elementname,
+      (direction == GST_PAD_SINK) ? "sink" : "src");
     gst_object_unref (elem);
     return NULL;
   }
@@ -506,11 +509,63 @@ _create_sinksource (gchar *elementname, GstBin *bin,
   else
     ourpad = gst_element_get_static_pad (elem, "sink");
 
+  if (queue) {
+    GstPad *queuesink;
+
+    *queue = gst_element_factory_make ("queue", NULL);
+    if (!*queue) {
+        g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+          "Could not create the queue element");
+        goto error;
+    }
+
+    if (!gst_bin_add (bin, *queue)) {
+      g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not add the queue element to the gst %s bin",
+        (direction == GST_PAD_SINK) ? "sink" : "src");
+      gst_object_unref (*queue);
+      *queue = NULL;
+      goto error;
+    }
+
+    queuesink = gst_element_get_static_pad (*queue, "sink");
+
+    ret = gst_pad_link (*requested_pad, queuesink);
+
+    if (GST_PAD_LINK_FAILED(ret)) {
+      g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not link the new element %s (%d)", elementname, ret);
+      goto error;
+    }
+
+
+    if (GST_PAD_LINK_FAILED(ret)) {
+      g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not link the new queue (%d)", ret);
+      gst_object_unref (queuesink);
+      goto error;
+    }
+
+    gst_object_unref (queuesink);
+    gst_object_unref (ourpad);
+    ourpad = gst_element_get_static_pad (*queue, "src");
+
+
+    if (!gst_element_sync_state_with_parent (*queue)) {
+      g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
+        "Could not sync the state of the new queue with its parent");
+      goto error;
+    }
+  }
+
+
   if (direction == GST_PAD_SINK)
     ret = gst_pad_link (*requested_pad, ourpad);
   else
     ret = gst_pad_link (ourpad, *requested_pad);
 
+  gst_object_unref (ourpad);
+
   if (GST_PAD_LINK_FAILED(ret)) {
     g_set_error (error, FS_SESSION_ERROR, FS_SESSION_ERROR_CONSTRUCTION,
       "Could not link the new element %s (%d)", elementname, ret);
@@ -527,8 +582,14 @@ _create_sinksource (gchar *elementname, GstBin *bin,
   return elem;
 
  error:
+  if (queue && *queue) {
+    gst_element_set_state (*queue, GST_STATE_NULL);
+    gst_bin_remove (bin, *queue);
+  }
   gst_element_set_state (elem, GST_STATE_NULL);
   gst_bin_remove (bin, elem);
+  if (ourpad)
+    gst_object_unref (ourpad);
   return NULL;
 }
 
@@ -540,6 +601,7 @@ fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans,
 {
   UdpPort *udpport;
   GList *udpport_e;
+  GstElement **queue = NULL;
 
   /* First lets check if we already have one */
 
@@ -583,14 +645,17 @@ fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans,
 
   udpport->udpsrc = _create_sinksource ("udpsrc",
     GST_BIN (trans->priv->gst_src), trans->priv->udpsrc_funnel,
-    udpport->fd, GST_PAD_SRC, &udpport->udpsrc_requested_pad,
+    udpport->fd, GST_PAD_SRC, NULL, &udpport->udpsrc_requested_pad,
     error);
   if (!udpport->udpsrc)
     goto error;
 
+  if (component_id == FS_COMPONENT_RTP)
+    queue = &udpport->queue;
+
   udpport->udpsink = _create_sinksource ("udpsink",
     GST_BIN (trans->priv->gst_sink), trans->priv->udpsink_tee,
-    udpport->fd, GST_PAD_SINK, &udpport->udpsink_requested_pad,
+    udpport->fd, GST_PAD_SINK, queue, &udpport->udpsink_requested_pad,
     error);
   if (!udpport->udpsink)
     goto error;
@@ -643,6 +708,17 @@ fs_rawudp_transmitter_put_udpport (FsRawUdpTransmitter *trans,
       udpport->udpsrc_requested_pad);
   }
 
+  if (udpport->queue) {
+    GstStateChangeReturn ret;
+    gst_object_ref (udpport->queue);
+    gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpport->queue);
+    ret = gst_element_set_state (udpport->queue, GST_STATE_NULL);
+    if (ret != GST_STATE_CHANGE_SUCCESS) {
+      g_warning ("Error changing state of udpsink: %d", ret);
+    }
+    gst_object_unref (udpport->queue);
+  }
+
   if (udpport->udpsink) {
     GstStateChangeReturn ret;
     gst_object_ref (udpport->udpsink);
-- 
1.5.6.5




More information about the farsight-commits mailing list