Dynamic pipelines using valves

Clyde McQueen clydemcqueen at gmail.com
Sat Sep 18 21:35:56 UTC 2021


For reference, I fixed this by setting the videotestsrc.is-live
property to true:

g_object_set(src_, "is-live", SRC_IS_LIVE, nullptr);

Without this setting, when I unlinked the xvimagesink element the
videotestsrc element -- which was no longer throttled by
xvimagesink.sync=true -- produced buffers at a fantastic rate. These
buffers were promptly dropped by the valve. With GST_DEBUG=5 I noticed
that the buffer PTS values quickly got many minutes ahead of the
clock. I'm not entirely sure what happened when I re-added the sink,
but I suppose the videotestsrc element was waiting for the clock to
exceed the last PTS value, so it appeared to be stalled.

Best,
/Clyde

On Tue, Aug 31, 2021 at 8:30 AM Clyde McQueen <clydemcqueen at gmail.com> wrote:
>
> I'd like to build a pipeline with 1 source and 3 sinks, where the sinks can be added and removed on the fly. I've seen examples using tees and valves, so my basic pipeline has 2 tees, 3 queues, 3 valves and 0-3 sinks. The pipeline mostly works, but it fails the 2nd time I create a sink for a particular valve, and only when sink.sync=true.
>
> The problem persists with a single src -> queue -> valve -> sink, as shown below. The pipeline starts fine, but when valve.drop is set to false a second time, the buffers still do not flow. I've tried with live streams where sink.sync=false, and it works well. It's only when sink.sync=true that it fails.
>
> Any ideas?
>
> $ gst-launch-1.0 --version
> gst-launch-1.0 version 1.16.2
> GStreamer 1.16.2
> https://launchpad.net/distros/ubuntu/+source/gstreamer1.0
>
>
> #include <chrono>
> #include <thread>
>
> extern "C" {
> #include "gst/gst.h"
> #include "gst/base/gstbasesink.h"
> }
>
> class ValveTest
> {
>   GstElement *pipeline_;
>   GstElement *src_;
>
>   GstElement *queue_;
>   GstElement *valve_;
>   GstElement *sink_;
>
>   bool valve_drop_{false};
>
> public:
>   ValveTest()
>   {
>     if (!gst_is_initialized()) {
>       gst_init(nullptr, nullptr);
>     }
>
>     pipeline_ = gst_pipeline_new(nullptr);
>     src_ = gst_element_factory_make("videotestsrc", "src");
>     queue_ = gst_element_factory_make("queue", "queue");
>     valve_ = gst_element_factory_make("valve", "valve");
>     sink_ = gst_element_factory_make("xvimagesink", "sink");
>
>     if (!pipeline_ || !src_ || !queue_ || !valve_ || !sink_) {
>       g_critical("create failed");
>       return;
>     }
>
>     gst_bin_add_many(GST_BIN(pipeline_), src_, queue_, valve_, sink_, nullptr);
>
>     if (!gst_element_link_many(src_, queue_, valve_, sink_, nullptr)) {
>       g_critical("link src ... sink failed");
>       return;
>     }
>
>     g_object_set(valve_, "drop", valve_drop_, nullptr);
>
>     gst_base_sink_set_sync(GST_BASE_SINK(sink_), true);
>
>     gst_element_set_state(pipeline_, GST_STATE_PLAYING);
>   }
>
>   void toggle_valve()
>   {
>     valve_drop_ = !valve_drop_;
>     if (valve_drop_) {
>       g_print("Drop\n");
>       g_object_set(valve_, "drop", true, nullptr);
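>       gst_element_set_state(sink_, GST_STATE_NULL);
>       // gst_bin_remove() drops the bin's reference, which is the only one
>       // held on the sink, so an extra gst_object_unref() would double-free.
>       gst_bin_remove(GST_BIN(pipeline_), sink_);
>       sink_ = nullptr;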
>     } else {
>       g_print("The buffers must flow\n");
>       sink_ = gst_element_factory_make("xvimagesink", "sink");
>       gst_base_sink_set_sync(GST_BASE_SINK(sink_), true);
>       gst_bin_add(GST_BIN(pipeline_), sink_);
>       gst_element_link(valve_, sink_);
>       gst_element_set_state(sink_, GST_STATE_PLAYING);
>       g_object_set(valve_, "drop", false, nullptr);
>     }
>   }
> };
>
> int main()
> {
>   ValveTest test;
>
>   while (true) {
>     using namespace std::chrono_literals;
>     std::this_thread::sleep_for(2s);
>     test.toggle_valve();
>   }
>
>   return 0;
> }
>
> Thanks for any help!
> /Clyde
>

