Problems adding/removing elements dynamically to capture video to disk while streaming

Øystein Skotheim oystein.skotheim at scoutdi.com
Tue Oct 22 12:01:52 UTC 2019


I am having trouble adding elements dynamically to a tee to allow simultaneous streaming and recording of video when requested by the user.

I first set up the following pipeline to stream video via RTSP:

nvarguscamerasrc ! video/x-raw(memory:NVMM),width=1280,height=720,framerate=30/1,format=NV12 ! tee name=sourceTee ! queue ! omxh264enc ! rtph264pay name=pay0 pt=96

I then try to add elements to the sourceTee when the user clicks start and take them down again when the user clicks stop. I try to flush the video capture part of the pipeline by injecting EOS into the video capture queue's sink pad and waiting for it to arrive on the filesink.

It works most of the time, but when I start and stop video capture multiple times, it is not stable, and sometimes it appears that I take down the whole pipeline and the video stream stops.

Can somebody take a look at my code for taking down the pipeline and let me know what I am doing wrong?

A struct with elements to capture video to disk:

// Handles to the GStreamer objects that make up the video recording branch.
// Every member starts out null; ArgusVideoCapture::start() fills them in.
struct ArgusVideoCaptureElements
{
  GstElement* queue{nullptr};     // "queue" element; its sink pad is exposed as the bin's ghost pad
  GstElement* nvtee{nullptr};     // "nvtee" element placed between the queue and the converter
  GstElement* convert{nullptr};   // "nvvidconv" element
  GstCaps* convertCaps{nullptr};  // caps filter applied on the convert -> encoder link
  GstElement* encoder{nullptr};   // "omxh264enc" element
  GstElement* mux{nullptr};       // "mp4mux" element
  GstElement* filesink{nullptr};  // "filesink" element that writes the recording to disk
};

This is the code I use to start capturing video (at the same time as the stream is running):

void ArgusVideoCapture::start()
{
    m_bin = gst_bin_new("video_capture_bin");

    // Pipeline to capture video to disk
    // sourceTee. ! queue ! nvvidconv ! video/x-raw(memory:NVMM),format=(string)I420 ! omxh264enc ! mp4mux ! filesink location=test.mp4

    ASSERT_THROW(
        m_elems.queue = gst_element_factory_make("queue", NULL),
        "Video Capture: Could not create queue element");

    ASSERT_THROW(
        m_elems.nvtee = gst_element_factory_make("nvtee", NULL),
        "Video Capture: Could not create nvtee element");

    ASSERT_THROW(
        m_elems.convert = gst_element_factory_make("nvvidconv", NULL),
        "Video Capture: Could not create videoconvert element");

    ASSERT_THROW(
        m_elems.convertCaps = gst_caps_from_string("video/x-raw(memory:NVMM), format=(string)I420"),
        "Video Capture: Could not create videoconvert caps");

    ASSERT_THROW(
        m_elems.encoder = gst_element_factory_make("omxh264enc", NULL),
        "Video Capture: Could not create encoder");

    ASSERT_THROW(
        m_elems.mux = gst_element_factory_make("mp4mux", NULL),
        "Video Capture: Could not create multiplexer");

    ASSERT_THROW(
        m_elems.filesink = gst_element_factory_make("filesink", NULL),
        "Video Capture: Could not create filesink");

    auto filename = generateFilename();

    gst_util_set_object_arg(G_OBJECT(m_elems.encoder), "tune", "zerolatency");
    g_object_set(G_OBJECT(m_elems.filesink), "location", filename.c_str(), NULL);
    g_object_set(G_OBJECT(m_elems.filesink), "sync", FALSE, NULL);

    gst_bin_add_many(GST_BIN(m_bin), m_elems.queue, m_elems.nvtee, m_elems.convert,
        m_elems.encoder, m_elems.mux, m_elems.filesink, NULL);

    gst_element_link_many(m_elems.queue, m_elems.nvtee, m_elems.convert, NULL);
    gst_element_link_filtered(m_elems.convert, m_elems.encoder, m_elems.convertCaps);
    gst_element_link_many(m_elems.encoder, m_elems.mux, m_elems.filesink, NULL);

    // Create ghost pad for bin
    GstPad* pad = gst_element_get_static_pad(m_elems.queue, "sink");
    ASSERT_THROW(pad, "Could not get sink pad from queue element");
    ASSERT_THROW(gst_element_add_pad(m_bin, gst_ghost_pad_new("sink", pad)),
        "Could not add sink ghost pad to video capture bin");
    gst_object_unref(GST_OBJECT(pad));

    GstElement *tee = gst_bin_get_by_name(GST_BIN(m_pipeline), m_teeName.c_str());
    ASSERT_THROW(tee, "Could not get tee element");

    m_teePad = gst_element_get_request_pad(tee, "src_%u");
    ASSERT_THROW(m_teePad, "Could not get tee request pad");
    gst_object_unref(tee);

    ASSERT_THROW(gst_bin_add(GST_BIN(m_pipeline), m_bin), "Could not add video capture bin to pipeline");

    GstPad* sinkPad = gst_element_get_static_pad(m_bin, "sink");
    ASSERT_THROW(sinkPad, "Could not get sink pad from video capture element");
    ASSERT_THROW(gst_pad_link(m_teePad, sinkPad) == GstPadLinkReturn::GST_PAD_LINK_OK, "Could not link tee pad to video capture bin");
    gst_object_unref(sinkPad);

    gst_element_sync_state_with_parent(m_bin);
}

----

This is the code I use to take down the video capture bin:

void ArgusVideoCapture::stop()
{
    if (m_teePad == nullptr)
    {
        log_warning("Cannot stop video capture since it was not running");
        return;
    }

    m_eosReceived = false;

    gst_pad_add_probe(m_teePad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
    [](GstPad* pad, GstPadProbeInfo* info, gpointer user_data) -> GstPadProbeReturn
        {
            log_info("Blocking downstream data on the tee pad for video capture");
            ArgusVideoCapture *self = reinterpret_cast<ArgusVideoCapture *>(user_data);

            gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info));
            GstPad *sinkPad = gst_element_get_static_pad(self->m_elems.filesink, "sink");

            // Callback to wait for end of stream
            gst_pad_add_probe(sinkPad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
                [](GstPad* pad, GstPadProbeInfo* info, gpointer userData) -> GstPadProbeReturn
                {
                    ArgusVideoCapture *self = reinterpret_cast<ArgusVideoCapture *>(userData);

                    if (GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) != GST_EVENT_EOS)
                        return GST_PAD_PROBE_PASS;

                    log_info("File sink received EOS");
                    gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info));
                    self->m_eosReceived = true;
                }, self, nullptr);

            gst_object_unref(sinkPad);
            GstPad* binSinkPad = gst_element_get_static_pad(self->m_bin, "sink");
            gst_pad_send_event(binSinkPad, gst_event_new_eos());
            gst_object_unref(binSinkPad);

        }, this, nullptr);

    log_info("Unlinking from tee pad");
    GstPad* sinkPad = gst_element_get_static_pad(m_bin, "sink");
    gst_pad_unlink(m_teePad, sinkPad);
    gst_object_unref(sinkPad);
    gst_object_unref(m_teePad);

    auto start = std::chrono::high_resolution_clock::now();
    auto now = start;
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now-start).count();

    while (!m_eosReceived && (duration < 3000))
    {
        now = std::chrono::high_resolution_clock::now();
        duration = std::chrono::duration_cast<std::chrono::milliseconds>(now-start).count();
    }

    if (!m_eosReceived)
        log_warning("Timeout while waiting for EOS");

    gst_pad_add_probe(m_teePad, GST_PAD_PROBE_TYPE_IDLE,
    [](GstPad* pad, GstPadProbeInfo* info, gpointer userData) -> GstPadProbeReturn
        {
            ArgusVideoCapture *self = reinterpret_cast<ArgusVideoCapture*>(userData);
            log_info("Unlinking from tee pad");
            GstPad* sinkPad = gst_element_get_static_pad(self->m_bin, "sink");
            gst_pad_unlink(self->m_teePad, sinkPad);
            gst_object_unref(sinkPad);
            gst_object_unref(self->m_teePad);
            log_info("Setting bin state to NULL");
            gst_element_set_state(self->m_bin, GST_STATE_NULL);
            log_info("Removing bin from pipeline");
            gst_bin_remove(GST_BIN(self->m_pipeline), self->m_bin);
        }, this, nullptr);
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20191022/71b8059c/attachment-0001.html>


More information about the gstreamer-devel mailing list