Dynamic pipelines using valves

Clyde McQueen clydemcqueen at gmail.com
Tue Aug 31 15:30:31 UTC 2021


I'd like to build a pipeline with 1 source and 3 sinks, where the sinks can
be added and removed on the fly. I've seen examples using tees and valves,
so my basic pipeline has 2 tees, 3 queues, 3 valves and 0-3 sinks. The
pipeline mostly works, but it fails the 2nd time I create a sink for a
particular valve, and only when sink.sync=true.

The problem persists with a single src -> queue -> valve -> sink, as shown
below. The pipeline starts fine, but after valve.drop is set back to false and
a new sink is attached, the packets still do not flow. I've tried with live streams
where sink.sync=false, and it works well. It's only when sink.sync=true
that it fails.

Any ideas?

$ gst-launch-1.0 --version
gst-launch-1.0 version 1.16.2
GStreamer 1.16.2
https://launchpad.net/distros/ubuntu/+source/gstreamer1.0


#include <thread>

extern "C" {
#include "gst/gst.h"
#include "gst/base/gstbasesink.h"
}

class ValveTest
{
  GstElement *pipeline_;
  GstElement *src_;

  GstElement *queue_;
  GstElement *valve_;
  GstElement *sink_;

  bool valve_drop_{false};

public:
  ValveTest()
  {
    if (!gst_is_initialized()) {
      gst_init(nullptr, nullptr);
    }

    pipeline_ = gst_pipeline_new(nullptr);
    src_ = gst_element_factory_make("videotestsrc", "src");
    queue_ = gst_element_factory_make("queue", "queue");
    valve_ = gst_element_factory_make("valve", "valve");
    sink_ = gst_element_factory_make("xvimagesink", "sink");

    if (!pipeline_ || !src_ || !queue_ || !valve_ || !sink_) {
      g_critical("create failed");
      return;
    }

    gst_bin_add_many(GST_BIN(pipeline_), src_, queue_, valve_, sink_,
nullptr);

    if (!gst_element_link_many(src_, queue_, valve_, sink_, nullptr)) {
      g_critical("link src ... sink failed");
      return;
    }

    g_object_set(valve_, "drop", valve_drop_, nullptr);

    gst_base_sink_set_sync(GST_BASE_SINK(sink_), true);

    gst_element_set_state(pipeline_, GST_STATE_PLAYING);
  }

  void toggle_valve()
  {
    valve_drop_ = !valve_drop_;
    if (valve_drop_) {
      g_print("Drop\n");
      g_object_set(valve_, "drop", true, nullptr);
      gst_element_set_state(sink_, GST_STATE_NULL);
      gst_bin_remove(GST_BIN(pipeline_), sink_);
      gst_object_unref(GST_OBJECT(sink_));
      sink_ = nullptr;
    } else {
      g_print("The buffers must flow\n");
      sink_ = gst_element_factory_make("xvimagesink", "sink");
      gst_base_sink_set_sync(GST_BASE_SINK(sink_), true);
      gst_bin_add(GST_BIN(pipeline_), sink_);
      gst_element_link(valve_, sink_);
      gst_element_set_state(sink_, GST_STATE_PLAYING);
      g_object_set(valve_, "drop", false, nullptr);
    }
  }
};

int main()
{element
  ValveTest test;

  while (true) {
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(2s);
    test.toggle_valve();
  }

  return 0;
}

Thanks for any help!
/Clyde
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20210831/63dbcfba/attachment-0001.htm>


More information about the gstreamer-devel mailing list