GStreamer multithreading

JieJay jie.zhang1178 at gmail.com
Tue Apr 20 10:14:01 UTC 2021


Hi guys,

I am a beginner with GStreamer. Recently I have been trying to understand the
multithreading mechanism in GStreamer. Say I have several independent
video sources; I want to achieve the following:

1. Display these video sources in parallel.
2. If necessary, pause one of the sources and let the others keep displaying.

Based on this principle, I've written the following code:

```
static gboolean push_data(GstSession *p_gstSession){
    GstBuffer *buffer;
    size_t size = 0;
    GstFlowReturn ret = GST_FLOW_OK;
    GstMapInfo info;

    // Random perturbation used to check the independence of each thread
/*  int v1 = rand() % 10000000;
    g_print (" %i\n", v1);
    if (v1 > 9999998) {
        g_print("SLEEP !");
        g_usleep(20000000);
    } */

    CVideoData *videoData = (CVideoData*) p_gstSession->inputFifo->Pop();
    if (videoData != NULL) {
        size = videoData->GetSize();
        buffer = gst_buffer_new_allocate(NULL, size, NULL);
        gst_buffer_map(buffer, &info, GST_MAP_WRITE);
        GST_BUFFER_PTS (buffer) = p_gstSession->timestamp;
        // Does not seem to have any impact, because of the live source?
        GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale_int (1, GST_SECOND, 90000);
        p_gstSession->timestamp += GST_BUFFER_DURATION (buffer);
        memcpy(info.data, videoData->GetData(), size);
        gst_buffer_unmap(buffer, &info);
        /* the "push-buffer" signal does not take ownership of the buffer,
         * so we unref it afterwards */
        g_signal_emit_by_name (p_gstSession->source, "push-buffer", buffer, &ret);
        p_gstSession->num_packet++;
        gst_buffer_unref(buffer);
    }

    if (ret != GST_FLOW_OK) {
        /* something went wrong, stop pushing */
        g_main_loop_quit (p_gstSession->main_loop);
    }
    return TRUE;
}

/* This callback is called when appsrc needs data. We add an idle handler
 * to the mainloop to start pushing buffers into appsrc. */
static void
start_feed (GstElement * pipeline, guint size, GstSession *gstSession)
{
    if (gstSession->sourceid == 0) {
        g_print ("start feeding at packet %i\n", gstSession->num_packet);
        gstSession->sourceid = g_idle_add ((GSourceFunc) push_data, gstSession);
    }
}

/* This callback is called when appsrc has enough data and we can stop
 * sending. We remove the idle handler from the mainloop. */
static void
stop_feed (GstElement * pipeline, GstSession *gstSession)
{
    if (gstSession->sourceid != 0) {
        g_print ("stop feeding at packet %i\n", gstSession->num_packet);
        g_source_remove (gstSession->sourceid);
        gstSession->sourceid = 0;
    }
}

/* Bus callback: handles EOS, error and state-change messages */
static gboolean
on_pipeline_message (GstBus * bus, GstMessage * message, GstSession *gstSession)
{
    GstState old_state, new_state, pending;

    switch (GST_MESSAGE_TYPE (message)) {
        case GST_MESSAGE_EOS:
            g_print ("Received End of Stream message\n");
            g_main_loop_quit (gstSession->main_loop);
            break;
        case GST_MESSAGE_ERROR:
        {
            g_print ("Received error\n");

            GError *err = NULL;
            gchar *dbg_info = NULL;

            gst_message_parse_error (message, &err, &dbg_info);
            g_printerr ("ERROR from element %s: %s\n",
                        GST_OBJECT_NAME (message->src), err->message);
            g_printerr ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
            g_error_free (err);
            g_free (dbg_info);

            g_main_loop_quit (gstSession->main_loop);
            break;
        }
        case GST_MESSAGE_STATE_CHANGED:
            /* read the states from the message itself instead of calling
             * gst_element_get_state() with GST_CLOCK_TIME_NONE, which can
             * block the bus callback */
            gst_message_parse_state_changed (message, &old_state, &new_state, &pending);
            /* g_print ("State changed from %i to %i\n", old_state, new_state); */
            break;
        default:
            break;
    }
    return TRUE;
}

/* Builds one pipeline per instance:
 *   appsrc ! capsfilter ! rtpjitterbuffer ! depay ! decoder ! timeoverlay !
 *   videoconvert ! videoscale ! xvimagesink
 * and runs its own GMainLoop until EOS or an error occurs. */
int CSimpleUdpDisplayGstPipeline::start() {

    int argc = 1;
    char *dummy_args[] = {NULL};
    GstBus *bus = NULL;
    char **argv = dummy_args;
    gst_init(&argc, &argv);

    gstSession.pipeline = gst_pipeline_new("pipeline");
    gstSession.source = gst_element_factory_make("appsrc", "appsrc");

    bus = gst_element_get_bus (gstSession.pipeline);
    gst_bus_add_watch (bus, (GstBusFunc) on_pipeline_message, &gstSession);
    gst_object_unref (bus);

    /* configure the appsrc, we will push data into the appsrc from the
     * mainloop */
    g_signal_connect (gstSession.source, "need-data", G_CALLBACK (start_feed), &gstSession);
    g_signal_connect (gstSession.source, "enough-data", G_CALLBACK (stop_feed), &gstSession);
    g_object_set (gstSession.source, "is-live", TRUE, NULL);
    g_object_set (gstSession.source, "do-timestamp", FALSE, NULL);
    g_object_set (gstSession.source, "format", GST_FORMAT_TIME, NULL);
    gstSession.jitterbuffer = gst_element_factory_make("rtpjitterbuffer", "rtpjitterbuffer");
    if (video_format == "H264") {
        gstSession.payload_depay = gst_element_factory_make("rtph264depay", "depay");
        gstSession.decoder = gst_element_factory_make("avdec_h264", "decoder");
    } else if (video_format == "MP4V-ES") {
        gstSession.payload_depay = gst_element_factory_make("rtpmp4vdepay", "depay");
        gstSession.decoder = gst_element_factory_make("avdec_mpeg4", "decoder");
    }

    gstSession.videoconvert = gst_element_factory_make("videoconvert", "videoconvert");
    gstSession.videoscale = gst_element_factory_make("videoscale", "videoscale");
    gstSession.display = gst_element_factory_make("xvimagesink", "ximagesink");
    gstSession.queue = gst_element_factory_make("queue", "queue");

    gstSession.timeoverlay = gst_element_factory_make("timeoverlay", "timeoverlay");
    gstSession.capsfilter = gst_element_factory_make("capsfilter", "capsfilter");


    GstCaps *caps = gst_caps_new_simple("application/x-rtp",
                                        "media", G_TYPE_STRING, "video",
                                        "clock-rate", G_TYPE_INT, clock_rate,
                                        "encoding-name", G_TYPE_STRING, video_format.c_str(),
                                        "profile-level-id", G_TYPE_STRING, pli.c_str(),
                                        "sprop-parameter-sets", G_TYPE_STRING, sps.c_str(),
                                        NULL);

    if (!gstSession.source || !gstSession.payload_depay || !gstSession.jitterbuffer ||
        !gstSession.capsfilter || !gstSession.decoder || !gstSession.videoconvert ||
        !gstSession.videoscale || !gstSession.display || !gstSession.timeoverlay) {
        g_printerr("At least one GStreamer element could not be created.\n");
        return 1;
    }

    g_object_set(G_OBJECT(gstSession.capsfilter), "caps", caps, NULL);
    g_object_set(G_OBJECT(gstSession.jitterbuffer), "mode", 0, NULL);

    /* note: the "queue" element is added to the bin here but never linked */
    gst_bin_add_many(GST_BIN(gstSession.pipeline), gstSession.source, gstSession.capsfilter,
                     gstSession.jitterbuffer, gstSession.payload_depay, gstSession.decoder,
                     gstSession.timeoverlay, gstSession.videoconvert, gstSession.videoscale,
                     gstSession.display, gstSession.queue, NULL);

    if (!gst_element_link_many(gstSession.source, gstSession.capsfilter, gstSession.jitterbuffer,
                               gstSession.payload_depay, gstSession.decoder, gstSession.timeoverlay,
                               gstSession.videoconvert, gstSession.videoscale, gstSession.display,
                               NULL)) {
        g_printerr("Error while linking the main pipeline (camera stream)\n");
    }

    gstSession.main_loop = g_main_loop_new(NULL, FALSE);
    gst_element_set_state(gstSession.pipeline, GST_STATE_PLAYING);

    g_print("Starting GstreamerVisualiser...\n");
    g_main_loop_run(gstSession.main_loop);

    gst_element_set_state(gstSession.pipeline, GST_STATE_NULL);
    gst_object_unref(GST_OBJECT(gstSession.pipeline));
    return 0;
}
```

So basically I wanted to create an independent pipeline for each video source
so that they are displayed in parallel. What I did was call the start() method
several times in my main function to create several threads (roughly as
sketched below), and that successfully gave me the displays for all video
sources. But when I uncomment the random perturbation code in push_data(), all
the sources seem to be affected and go to sleep together, which means that all
the displays freeze.
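
For context, this is roughly how I launch the pipelines from main() (a
simplified sketch, not my exact code; the way the pipeline instances are
constructed here is only illustrative):

```
// Simplified sketch of how I start one pipeline per video source.
// CSimpleUdpDisplayGstPipeline is the class from the code above; how the
// instances are constructed here is only illustrative.
#include <thread>
#include <vector>

int main () {
    std::vector<CSimpleUdpDisplayGstPipeline *> pipelines;
    for (int i = 0; i < 3; ++i)   // e.g. three independent video sources
        pipelines.push_back(new CSimpleUdpDisplayGstPipeline(/* per-source config */));

    std::vector<std::thread> threads;
    for (auto *p : pipelines)
        threads.emplace_back([p]() { p->start(); });  // each thread runs its own g_main_loop_run()

    for (auto &t : threads)
        t.join();
    return 0;
}
```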

So is there any way to stop one video source from injecting data while letting
the other sources continue to push data? Do I need to write an independent
push_data() method for each pipeline to avoid this global perturbation?
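
For example, is something like the following the right direction? It is just
an untested sketch of what I have in mind: give each pipeline its own
GMainContext and GMainLoop and attach its idle source there instead of using
g_idle_add(), so that push_data() blocking in one pipeline cannot stall the
others (GstSession and push_data() are the ones from the code above):

```
/* Untested sketch: run each GstSession on its own GMainContext so that an
 * idle source blocking in one pipeline cannot stall the other pipelines. */
static void run_session_loop (GstSession *gstSession)
{
    GMainContext *context = g_main_context_new ();
    g_main_context_push_thread_default (context);

    gstSession->main_loop = g_main_loop_new (context, FALSE);

    /* attach the per-pipeline idle source to this context; with a
     * non-default context, start_feed()/stop_feed() could no longer use
     * g_idle_add()/g_source_remove(), which operate on the default
     * main context */
    GSource *idle = g_idle_source_new ();
    g_source_set_callback (idle, (GSourceFunc) push_data, gstSession, NULL);
    gstSession->sourceid = g_source_attach (idle, context);
    g_source_unref (idle);

    g_main_loop_run (gstSession->main_loop);

    g_main_context_pop_thread_default (context);
    g_main_context_unref (context);
}
```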


