Changing RTSP stream location spawns more and more threads

twallis twallis at deeptrekker.com
Wed May 23 17:05:02 UTC 2018


Hello, I've been working on a Qt app to display a RTSP stream on an embedded
device (TI AM5728).  I would like to be able to change the location of the
stream on the fly, as there are multiple stream locations on the network.  I
am able to change stream locations using the following method:

1) Receive Qt signal in a Qt slot callback, create a pad probe to catch the
pipeline
2) in that pad probe callback, I create a new probe to catch an EOS event
that I send down the pipeline.  The probe is on the last 'processing'
element before the sink.  The EOS is sent to flush out the old stream
information.
3) in the EOS callback, I remove the old rtspsrc from the pipeline, create a
new one with the new location, and link it to a callback for its 'sometimes'
pad.

All of the above works as intended, however I am creating 7 new threads in
my program each time I change source.  As of yet I have no conclusions why
they're being created, but my hypothesis is that previous rtspsrc elements
are not being removed properly, or that I am creating more and more new
pipelines rather than editing a single one.  This has me quite stumped. 
Where in the following code would all these new threads be coming from??

#include "camerastream.h"
#include <QTimer>

//function prototypes for callbacks that nobody but me should access
static void on_rtsp_pad_added(GstElement* element, GstPad *pad, gpointer
data);
static gboolean bus_call(GstBus *bus, GstMessage *message, gpointer *data);
static GstPadProbeReturn on_pad_probe(GstPad *pad, GstPadProbeInfo *info,
gpointer data);
static GstPadProbeReturn event_probe(GstPad *pad, GstPadProbeInfo *info,
gpointer data);

gchar *m_name;

CameraStream::CameraStream(QThread *parent) : QThread(parent)
{
    m_cameraStream = NULL;
    m_loop = NULL;
    m_name = NULL;

    QTimer *timer = new QTimer(this);
    connect(timer, SIGNAL(timeout()), this, SLOT(slot_changeSource()));
    timer->start(10000);
}

CameraStream::~CameraStream(){
    //kill media pipeline
    gst_element_set_state(m_cameraStream, GST_STATE_NULL);
    gst_object_unref(m_cameraStream);
    g_main_loop_unref(m_loop);
}

void CameraStream::run(){   //overrides QThread::run()
    GstBus *bus;
    guint busWatchId;
    GstElement *src, *depay, *parser, *decoder, *vpe, *filter, *sink;
    GstCaps *vpeCaps;

    m_loop = g_main_loop_new(NULL, FALSE);

    //create pipeline elements
    m_cameraStream = gst_pipeline_new("display_pipeline");
    src = gst_element_factory_make("rtspsrc", "rtspsrc");
    depay = gst_element_factory_make("rtpjpegdepay", "depay");
    parser = gst_element_factory_make("jpegparse", "parser");
    decoder = gst_element_factory_make("ducatijpegdec", "decoder");
    vpe = gst_element_factory_make("vpe", "vpe");
    filter = gst_element_factory_make("capsfilter", "filter");
    sink = gst_element_factory_make("waylandsink", "sink");

    if(!(m_cameraStream || src || depay || parser || decoder || vpe ||
filter || sink)){
        qFatal("could not create pipeline elements");
        exit(1);
    }

    g_object_set(G_OBJECT(src), "location", "rtsp://192.168.50.29/av0_1",
"latency", 0, NULL);
    g_signal_connect(src, "pad-added", G_CALLBACK(on_rtsp_pad_added),
m_cameraStream);

    //add src caps?
    vpeCaps = gst_caps_from_string("video/x-raw, format=NV12, width=800,
height=480");  //change this when Tomas' patch hits
    if(!vpeCaps){
        qFatal("cannot create caps");
        exit(1);
    }

    g_object_set(G_OBJECT(filter), "caps", vpeCaps, NULL);
    g_object_set(G_OBJECT(sink), "sync", false, NULL);

    //add and link elements to create full pipeline
    gst_bin_add_many(GST_BIN(m_cameraStream), src, depay, parser, decoder,
vpe, sink, NULL);
    if(!gst_element_link_many(depay, parser, decoder, vpe, sink, NULL)){
        qFatal("cannot link elements");
        exit(1);
    }

    gst_caps_unref(vpeCaps);

    bus = gst_pipeline_get_bus(GST_PIPELINE(m_cameraStream));
    busWatchId = gst_bus_add_watch(bus, GstBusFunc(bus_call), m_loop);
    gst_object_unref(bus);

    gst_element_set_state(m_cameraStream, GST_STATE_PLAYING);
    g_main_loop_run(m_loop);
}

void CameraStream::slot_changeSource(/*QString newUrl*/){
    GstPad *pad;
    //needs to stay playing, otherwise the probe callback will never trigger
    qDebug("\nslot_changeSource");
    GstElement* depay = gst_bin_get_by_name(GST_BIN(m_cameraStream),
"depay");
    if(depay){ //finds this
        qDebug("m_name: %s", m_name);
        pad = gst_element_get_static_pad(depay, "src");
        if(pad){
            qDebug("found pad");
            gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
on_pad_probe, m_cameraStream, NULL); //first Data could be QString newUrl
            gst_element_set_state(m_cameraStream, GST_STATE_PLAYING);
//needed to trip the on_rtsp_pad_added callback
        }
        else
            qDebug("did not find Pad");
    }
}

//allow linking of the rtspsrc and jpegdepay elements, for whatever reason
when linked with gst_element_link_many, rtspsrc doesn't link
//manually link with the filter cap application/x-rtp
static void on_rtsp_pad_added(GstElement* element, GstPad *pad, gpointer
data){
    gchar *name;
    GstElement *depay;    

    qDebug("on_rtsp_pad_added");    
    m_name = gst_pad_get_name(pad);
    qDebug("on_rtsp_pad_added, rtspsrc pad name: %s", m_name);
    //depay = GST_ELEMENT(data);
    depay = gst_bin_get_by_name(GST_BIN(data), "depay");
    if(depay){
        qDebug("pad_added: found depay");
    }
    if(element){
        qDebug("pad_added: found element");
    }
    else
        qDebug("pad_added: could not find element");
    if(!gst_element_link_pads(element, m_name, depay, "sink")){
        qFatal("pad_added: failed to link elements");
    }
    g_free(name);
}

//peek into the messages on the bus to detect EOS (live so unneeded) and
Error messages.
//the unused bus variable is due to these parameters being part of a
standard gstreamer template for bus callbacks.
static gboolean bus_call(GstBus *bus, GstMessage *message, gpointer *data){
    GMainLoop *loop = (GMainLoop*)data;

    switch(GST_MESSAGE_TYPE(message)){
        case GST_MESSAGE_EOS:
            qDebug("end of stream\n");
            g_main_loop_quit(loop);
            break;

        case GST_MESSAGE_ERROR:{
            gchar *debug;
            GError *error;
            gst_message_parse_error(message, &error, &debug);
            g_free(debug);
            qFatal("GSTREAMER GST_MESSAGE_ERROR! %s", error->message);
            g_main_loop_quit(loop);
            break;
        }

        default:{
            break;
        }
    }
    return TRUE;
}

static GstPadProbeReturn on_pad_probe(GstPad *pad, GstPadProbeInfo *info,
gpointer data){
    GstPad *srcPad, *sinkPad;
    GstElement *vpe, *depay;

    qDebug("on_pad_probe");
    gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info));

    //new probe for EOS, this one on the vpe
    vpe = gst_bin_get_by_name(GST_BIN(data), "vpe");
    srcPad = gst_element_get_static_pad(vpe, "src");
    gst_pad_add_probe(srcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
event_probe, data, NULL);

    //push EOS into the element, wait for the EOS to appear on the srcpad
    depay = gst_bin_get_by_name(GST_BIN(data), "depay");
    sinkPad = gst_element_get_static_pad(depay, "sink");
    gst_pad_send_event(sinkPad, gst_event_new_eos());

    return GST_PAD_PROBE_OK;
}

static GstPadProbeReturn event_probe(GstPad *pad, GstPadProbeInfo *info,
gpointer data){
    GstElement *rtspsrcOld, *rtspsrcNew, *depay;

    qDebug("event_probe");
    if(GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) != GST_EVENT_EOS){
        qDebug("NOT EOS");
        return GST_PAD_PROBE_PASS;
    }
    else if(GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) == GST_EVENT_EOS){
        gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info));

        rtspsrcOld = gst_bin_get_by_name(GST_BIN(data), "rtspsrc");
        if(rtspsrcOld){
            qDebug("found rtspsrcOld");
            depay = gst_bin_get_by_name(GST_BIN(data), "depay");
            gst_element_unlink(rtspsrcOld, depay);
            gst_bin_remove(GST_BIN(data), rtspsrcOld); //remove old rtspsrc
from pipeline, should unlink from depay automatically.
            rtspsrcNew = gst_element_factory_make("rtspsrc", "rtspsrc");
//create new rtspsrc (works)
            g_object_set(rtspsrcNew, "location",
"rtsp://192.168.50.30/av0_1", "latency", 0, NULL);
            g_signal_connect(G_OBJECT(rtspsrcNew), "pad-added",
G_CALLBACK(on_rtsp_pad_added), data);

            gst_bin_add(GST_BIN(data), rtspsrcNew); //add the new rtspsrc to
the pipeline (works)
            //gst_element_set_state(GST_ELEMENT(data), GST_STATE_PLAYING);
//needed to trip the on_rtsp_pad_added callback

            return GST_PAD_PROBE_HANDLED;
        }
    }
    return GST_PAD_PROBE_OK;
}




--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/


More information about the gstreamer-devel mailing list