Pushing samples from appsink to another pipeline

kochmmi1 kochmmi1 at fel.cvut.cz
Mon May 31 14:55:42 UTC 2021


I have appsink application (v4l2src - capsfilter - queue- appsink).. The caps
are set to  Based on flags in the code, I would like to either save image or
push the sample to the file (save the video) in the new-sample callback.

The new-sample callback :
static GstFlowReturn new_sample_mjpeg(GstElement * elt, cameraInterface *
cameraInstance)
{
    if (!cameraInstance) return GST_FLOW_EOS;

    GstSample *sample;
    GstBuffer *buffer;
    GstMemory *memory;
    GstMapInfo info;
    GstFlowReturn ret = GST_FLOW_OK;
    sample = gst_app_sink_pull_sample (GST_APP_SINK (elt));
    if (cameraInstance->isRecording())
cameraInstance->addSampleFromAppsinkVideo(gst_sample_copy(sample));
    buffer = gst_sample_get_buffer (sample);

    if (buffer != NULL) {
        memory = gst_buffer_get_memory (buffer, 0);

        if (memory != NULL) {
            if (cameraInstance->isWaitingToSaveImage())
cameraInstance->saveSampleFromAppsinkJpeg(gst_sample_copy(sample));
            if (gst_memory_map (memory, &info, GST_MAP_READ)) {
                if (cameraInstance->isDisplaing() &&
cameraInstance->displayMsSinceLast()>100 ){
//decode from mjpeg, maximal 10Hz. This is fast ugly hack.
                    uvc_frame_t *rgb, *mjpeg;
                    rgb = uvc_allocate_frame(cameraInstance->width() *
cameraInstance->height() * 3);
                    mjpeg = uvc_allocate_frame(info.size);
                    memcpy(mjpeg->data, info.data, info.size);
                    mjpeg->data_bytes = info.size;
                    mjpeg->width = cameraInstance->width();
                    mjpeg->height = cameraInstance->height();
                    mjpeg->frame_format = UVC_FRAME_FORMAT_MJPEG;
                    mjpeg->library_owns_data = false;
                    uvc_mjpeg2rgb(mjpeg, rgb);
                    cameraInstance->addRgbSampleToBuffer(rgb);
                    uvc_free_frame(rgb);
                    uvc_free_frame(mjpeg);
                }
                gst_memory_unmap(memory, &info);

            } else {
                std::cerr << "sample_from_sink(): ERROR memory map" <<
std::endl;
            }
            gst_memory_unref(memory);
        } else {
            std::cerr << "sample_from_sink(): ERROR memory" << std::endl;
        }
    } else {
        std::cerr << "sample_from_sink(): ERROR buffer " <<
gst_buffer_get_size(buffer) << std::endl;
    }
    gst_sample_unref (sample);
    return ret;
}
Now, the pipelines are member structs:
    struct GstData{
        GstBus *bus;
        GstElement *pipeline;
        GstElement *source;
        GstElement *capsfilter;
        GstElement *queue;
        GstElement *muxer;
        GstElement *sink;
        std::mutex mutex;
        bool running = false;
    };

    GstData fromDevToApp; //pipeline from v4l2src (camera) to our app
    GstData fromAppToVid; //pipeline from our app to the videofile
    GstData fromAppToImg; //pipeline from our app to the imagefile

Now we have folowing methods in the cameraInterface class:
creating/terminating the video pipeline:
bool cameraInterface::createPipelineVideo(std::string path){
     if (!gst_is_initialized()) {
         qWarning()<<__PRETTY_FUNCTION__<<"Initializing gst";
         setenv("GST_DEBUG", ("*:" + std::to_string(3)).c_str(), 1);
         gst_init(nullptr, nullptr);
     }
     std::lock_guard<std::mutex>lock (fromAppToVid.mutex);

     GstStateChangeReturn ret;
     fromAppToVid.source = gst_element_factory_make ("appsrc",
"saveVideoSource");
     fromAppToVid.muxer = gst_element_factory_make ("avimux", "avimux");
     fromAppToVid.queue = gst_element_factory_make("queue", "rcr_queue");
     fromAppToVid.sink = gst_element_factory_make ("splitmuxsink", "sink");

     g_object_set (fromAppToVid.sink, "muxer", fromAppToVid.muxer, NULL);
     g_object_set (fromAppToVid.sink, "max-size-bytes", 273741824, NULL);

     g_object_set (fromAppToVid.sink, "location", path.c_str(), NULL);
     g_signal_connect(fromAppToVid.sink,"format-location-full", G_CALLBACK
(new_segment), NULL);

     g_object_set (fromAppToVid.source, "do-timestamp", true, 0, NULL);

     gst_util_set_object_arg (G_OBJECT (fromAppToVid.source), "format",
"time");


     fromAppToVid.pipeline = gst_pipeline_new ("pipeline_vid");

     if (!fromAppToVid.pipeline || !fromAppToVid.source ||
!fromAppToVid.muxer || !fromAppToVid.sink) {
       g_printerr ("Not all elements could be created.\n");
       return false;
     }

     GstCaps *caps = gst_caps_new_simple ("image/jpeg",
                  "width", G_TYPE_INT, setResolution.width,
                  "height", G_TYPE_INT, setResolution.height,
                  "interlace-mode", G_TYPE_STRING, "progressive",
                  "framerate", GST_TYPE_FRACTION, setResolution.framerate,
1,
                  NULL);


     gst_app_src_set_caps(GST_APP_SRC(fromAppToVid.source), caps);
     gst_caps_unref (caps);

     gst_app_src_set_duration(GST_APP_SRC(fromAppToVid.source),
GST_TIME_AS_MSECONDS(80));
     gst_app_src_set_latency(GST_APP_SRC(fromAppToVid.source), -1, 0);
     gst_app_src_set_stream_type(GST_APP_SRC(fromAppToVid.source),
GST_APP_STREAM_TYPE_STREAM);

     gst_bin_add_many (GST_BIN (fromAppToVid.pipeline),
fromAppToVid.source,fromAppToVid.queue, fromAppToVid.sink, NULL);

     if (gst_element_link_many(fromAppToVid.source, fromAppToVid.queue,
fromAppToVid.sink, NULL) != TRUE) {
       g_printerr ("Elements could not be linked.\n");
       gst_object_unref (fromAppToVid.pipeline);
       return false;
     }

     ret = gst_element_set_state (fromAppToVid.pipeline, GST_STATE_PLAYING);
     if (ret == GST_STATE_CHANGE_FAILURE) {
       g_printerr ("Unable to set the pipeline to the playing state.\n");
       gst_object_unref (fromAppToVid.pipeline);
       return false;
     }

     fromAppToVid.running = true;
    return  fromAppToVid.running;
}

bool cameraInterface::terminatePipeline(GstData *data){
    std::lock_guard<std::mutex>lock (data->mutex);
    if (!data->running) return false;
    GstMessage *EndMessage = gst_message_new_eos(&data->pipeline->object);
    gst_bus_post(data->pipeline->bus, EndMessage);
    gst_element_send_event(data->pipeline, gst_event_new_eos());

    /* Free resources */
    if (EndMessage != NULL)
      gst_message_unref (EndMessage);
    GstFlowReturn status = GstFlowReturn::GST_FLOW_OK;
    status = gst_app_src_end_of_stream(GST_APP_SRC(data->source));
    //end the pipeline
    usleep(500000); // Important
    gst_element_set_state (data->pipeline, GST_STATE_NULL);
    GstState currentState = GST_STATE_READY;
    GstClockTime timeout = 50;
    uint8_t safetyCounter = 255;
    do{
        gst_element_get_state(data->pipeline, &currentState, NULL,timeout );
        if (safetyCounter-- == 0){
            break;
        }
        usleep(10000);
    } while (currentState != GST_STATE_NULL);
    gst_object_unref (data->pipeline);
    data->running = false;
    return true;
}.
Adding sample is called from the callback: 

void cameraInterface::addSampleFromAppsinkVideo(GstSample *sample){
    if (fromAppToVid.running){
        std::lock_guard<std::mutex> lock(fromAppToVid.mutex);
        GstFlowReturn ret = 
gst_app_src_push_sample(GST_APP_SRC(fromAppToVid.source), sample);
    } 
    gst_sample_unref(sample);
}
The app crashes on recording after ~4-8gb of video (never over 10Gb.
different resoulutions were tested, but the 4-7Gb seems to be more
consistent than watching the number of created files) is saved,, leading me
to the idea that it is a memory-leak issue. I do not see where though.

The app also failes to save images - it saved the first 4 or five, and then
declared error. I tried some debbug, now I got the error right away. Saving
is handled as:

bool cameraInterface ::createPipelineImage(std::string path, GstSample
*sample){
    std::lock_guard<std::mutex>lock (fromAppToImg.mutex);
    fromAppToImg.running = true;
    GstStateChangeReturn ret;

    fromAppToImg.source = gst_element_factory_make ("appsrc",
"appsrc_capture");
    fromAppToImg.sink = gst_element_factory_make ("multifilesink",
"sink_capture");

    g_object_set (fromAppToImg.sink, "location", path.c_str(), NULL);

    fromAppToImg.pipeline = gst_pipeline_new ("pipeline_img");

    if (!fromAppToImg.pipeline || !fromAppToImg.source ||
!fromAppToImg.sink) {
      g_printerr ("Not all elements could be created.\n");
      fromAppToImg.running = false;
      return false;
    }
    GstCaps *caps;
    if (sample)caps = gst_sample_get_caps(sample);
    else caps = gst_caps_new_simple ("image/jpeg",
                 "width", G_TYPE_INT, setResolution.width,
                 "height", G_TYPE_INT, setResolution.height,
                 "framerate", GST_TYPE_FRACTION, setResolution.framerate, 1,
                 "pixel-aspect-ratio", GST_TYPE_FRACTION, 1,1,
                 "interlace-mode", G_TYPE_STRING, "progresive",
                 NULL);


    gst_app_src_set_caps(GST_APP_SRC(fromAppToImg.source), caps);
    gst_app_src_set_duration(GST_APP_SRC(fromAppToImg.source),
GST_TIME_AS_MSECONDS(80));
    gst_app_src_set_stream_type(GST_APP_SRC(fromAppToImg.source),
GST_APP_STREAM_TYPE_STREAM);
    gst_app_src_set_latency(GST_APP_SRC(fromAppToImg.source), -1, 0);

    gst_bin_add_many (GST_BIN (fromAppToImg.pipeline), fromAppToImg.source,
fromAppToImg.sink, NULL);
    gst_caps_unref (caps);
    if (gst_element_link_many(fromAppToImg.source, fromAppToImg.sink, NULL)
!= TRUE) {
      g_printerr ("Elements could not be linked.\n");
      gst_object_unref (fromAppToImg.pipeline);
      fromAppToImg.running = false;
      return false;
    }

    ret = gst_element_set_state (fromAppToImg.pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE) {
      g_printerr ("Unable to set the pipeline to the playing state.\n");
      gst_object_unref (fromAppToImg.pipeline);
      fromAppToImg.running = false;
      return false;
    }
    return true;
}
int cameraInterface::saveSampleFromAppsinkJpeg( GstSample *sample){

   std::thread([=]{
       mStatusFlags.shouldSaveImage = false;
       if (!createPipelineImage(mImagePath)){
           return;
       }
       if (!mStatusFlags.shouldSaveImage) {
           terminatePipeline(&fromAppToImg);
       }
       //push the image in the pipeline
       GstFlowReturn status = GstFlowReturn::GST_FLOW_OK;
       status =
gst_app_src_push_sample(GST_APP_SRC(fromAppToImg.source),sample);
       if (status !=  GstFlowReturn::GST_FLOW_OK) g_printerr ("Sample for
saving image not pushed: code %d.\n", status);
       usleep(50000);
       status = gst_app_src_end_of_stream(GST_APP_SRC(fromAppToImg.source));
       if (status !=  GstFlowReturn::GST_FLOW_OK) g_printerr ("EOS for
saving image not pushed %d \n", status);
       usleep(50000);
       //end the pipeline
       terminatePipeline(&fromAppToImg);
       mStatusFlags.shouldSaveImage = false;
       gst_sample_unref(sample);
       return;
   }).detach();
   return 1;
}
, the error output is 
"0:01:08.760736008  2282 0x65114430 FIXME                default
gstutils.c:3963:gst_pad_create_stream_id_internal:<appsrc_capture:src>
Creating random stream-id, consider implementing a deterministic way of
creating a stream-id

(xxx:2282): GStreamer-CRITICAL **: 15:53:01.984: gst_mini_object_unref:
assertion 'GST_MINI_OBJECT_REFCOUNT_VALUE (mini_object) > 0' failed

(xxx:2282): GLib-GObject-WARNING **: 15:53:01.995: invalid unclassed pointer
in cast to 'GstAppSrc'

** (xxx:2282): CRITICAL **: 15:53:01.995: gst_app_src_set_caps: assertion
'GST_IS_APP_SRC (appsrc)' failed

** (xxx:2282): CRITICAL **: 15:53:01.995: gst_app_src_push_internal:
assertion 'GST_IS_APP_SRC (appsrc)' failed
Sample for saving image not pushed: code -5.

(xxx:2282): GLib-GObject-WARNING **: 15:53:02.046: invalid unclassed pointer
in cast to 'GstAppSrc'

** (xxx:2282): CRITICAL **: 15:53:02.046: gst_app_src_end_of_stream:
assertion 'GST_IS_APP_SRC (appsrc)' failed
EOS for saving image not pushed -5 "

Am I forgotting to free/unref something? The GST is 1.14.2.



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/


More information about the gstreamer-devel mailing list