<div dir="ltr"><div dir="ltr"><div dir="ltr">Hi Guys,<div><br></div><div>I am working on an application that is supposed to record video from a RTSP camera alongside audio from a local ALSA source into a MP4 container.</div><div><br></div><div>I know the MP4 container need an EOS to finalize properly so I send EOS signal through the pipeline (both alsasrc and rtspsrc) and await an EOS recieved event before setting state GST_STATE_NULL.</div><div><br></div><div><div>My code is prepended to this mail and inspired from <a href="https://gstreamer.freedesktop.org/documentation/tutorials/basic/dynamic-pipelines.html">https://gstreamer.freedesktop.org/documentation/tutorials/basic/dynamic-pipelines.html</a></div><div><br></div><div>I interact with the program by sending "start" or "stop" to the gst.fifo file handle.</div></div><div><br></div><div>This works fine and I get nice MP4 files, except that when I want to record a new file I see an ever increased memory consumption giving my application a very limited lifespan. </div><div><br></div><div><div>I assume its because audio and video is not in sync and some buffering occur. </div></div><div><div><br></div><div>Pointers on how I should deal with this problem are very welcome.</div></div><div><br></div><div>kind regards</div><div><br></div><div>Jesper</div><div><br></div><div><br></div><div>---------------------------</div><div><br></div><div><div>#include <gst/gst.h></div><div>#include <gst/base/gstbaseparse.h></div><div>#include <stdio.h></div><div>#include <fcntl.h></div><div>#include <stdlib.h></div><div>#include <unistd.h></div><div>#include <string.h></div><div>#include <dirent.h></div><div>#include <sqlite3.h></div><div>#include <sys/statvfs.h></div><div>#include <sys/stat.h> </div><div>#include <sys/types.h> </div><div>#include <pthread.h> </div><div><br></div><div>#define RTSP_CAMERA_L "rtsp://<a href="http://192.168.131.176:1001/">192.168.131.176:1001/</a>"</div><div>#define RTSP_CAMERA_R "rtsp://<a href="http://192.168.131.176:1002/">192.168.131.176:1002/</a>"</div><div>#define BASE_PATH ""</div><div><br></div><div>static int current_recording_starttime = 0;</div><div><br></div><div>int recording = 0;</div><div><br></div><div><br></div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# Datastructure keeping references to pipeline elements</div><div>//#</div><div>//######################################################################</div><div><br></div><div>typedef struct _CustomData {</div><div>    GstElement *pipeline;</div><div><br></div><div>    GstElement *rtspsrc_l;</div><div>    GstElement *rtph264depay_l;</div><div>    GstElement *h264parse_l;</div><div>    GstElement *mp4mux_l;</div><div>    GstElement *filesink_l;</div><div>    </div><div>    GstElement *alsasrc;</div><div>    GstElement *audioconvert;</div><div>    GstElement *faac;</div><div>    GstElement *audio_tee;</div><div>    GstElement *queue_l;</div><div><br></div><div>    GMainLoop *loop;</div><div>    GstBus *bus;</div><div>} CustomData;</div><div>CustomData data;</div><div><br></div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# Bus callback</div><div>//#</div><div>//######################################################################</div><div><br></div><div>int bus_callback(GstBus *bus, GstMessage *msg, gpointer d) {</div><div>    GError *err;</div><div>    gchar *debug_info;</div><div><br></div><div>    switch (GST_MESSAGE_TYPE(msg)) {</div><div>        case GST_MESSAGE_ERROR:</div><div>            gst_message_parse_error(msg, &err, &debug_info);</div><div>            g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);</div><div>            g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");</div><div>            g_clear_error(&err);</div><div>            g_free(debug_info);</div><div>            //g_main_loop_quit(data.loop);</div><div>            break;</div><div>        case GST_MESSAGE_EOS:</div><div>            g_print("End-Of-Stream reached.\n");</div><div>            if(gst_element_set_state(data.pipeline, GST_STATE_NULL) == GST_STATE_CHANGE_FAILURE) printf("ERROR A\r\n");</div><div>    </div><div>            break;</div><div>        case GST_MESSAGE_STATE_CHANGED:</div><div>            // We are only interested in state-changed messages from the pipeline </div><div>            if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline)) {</div><div>                GstState old_state, new_state, pending_state;</div><div>                gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);</div><div>                g_print("Pipeline state changed from %s to %s:\n",</div><div>                        gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));</div><div>                if(new_state == GST_STATE_PAUSED && old_state == GST_STATE_PLAYING)</div><div>                {</div><div>                    //printf("AAAAAAAAAAAAAAAAAAAA\r\n");</div><div>                    </div><div>                }</div><div><br></div><div><br></div><div>            } else {</div><div>                GstState old_state, new_state, pending_state;</div><div>                gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);</div><div>                //g_print("Element %s state changed from %s to %s:\n", GST_OBJECT_NAME(GST_MESSAGE_SRC(msg)),</div><div>                //        gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));</div><div>            }</div><div><br></div><div>            break;</div><div>        case GST_MESSAGE_PROGRESS:</div><div>            {</div><div>                //g_print("Progress notify\r\n");</div><div>                GstProgressType type;</div><div>                gchar *code, *text;</div><div>                gst_message_parse_progress (msg, &type, &code, &text);</div><div>                switch (type) {</div><div>                    case GST_PROGRESS_TYPE_START:</div><div>                        //printf("GST_PROGRESS_TYPE_START\r\n");</div><div>                        break;</div><div>                    case GST_PROGRESS_TYPE_CONTINUE:</div><div>                        //printf("GST_PROGRESS_TYPE_CONTINUE\r\n");</div><div>                        break;</div><div>                    case GST_PROGRESS_TYPE_COMPLETE:</div><div>                        //printf("PROGRESS_TYPE_COMPLETE\r\n");</div><div>                        break;</div><div>                    case GST_PROGRESS_TYPE_CANCELED:</div><div>                        printf("GST_PROGRESS_TYPE_CANCELED\r\n");</div><div>                        break;</div><div>                    case GST_PROGRESS_TYPE_ERROR:</div><div>                        printf("GST_PROGRESS_TYPE_ERROR\r\n");</div><div>                        break;</div><div>                    default:</div><div>                        printf("DEFAULT\r\n");</div><div>                        break;</div><div>                }</div><div>                //printf("Progress: (%s) %s\n", code, text);</div><div>                g_free (code);</div><div>                g_free (text);</div><div>            }</div><div>            break;</div><div>        case GST_MESSAGE_NEW_CLOCK:</div><div>            {</div><div>                GstClock *clock;</div><div>                gst_message_parse_new_clock (msg, &clock);</div><div>                printf("New clock: %s\n", (clock ? GST_OBJECT_NAME (clock) : "NULL"));</div><div>                </div><div>            }</div><div>            break;</div><div>        case GST_MESSAGE_STREAM_START:</div><div>            {</div><div>                guint group_id;</div><div>                gst_message_parse_group_id (msg, &group_id);    </div><div>                printf("Stream start group id: %d\n", group_id);</div><div>            }</div><div>            break;</div><div>        case GST_MESSAGE_ASYNC_DONE:</div><div>            {</div><div>                GstClockTime t;</div><div>                gst_message_parse_async_done(msg, &t);</div><div>                printf("Async done: %u ns\n", t);</div><div>            }</div><div>            break;</div><div>        case GST_MESSAGE_STREAM_STATUS:</div><div>            {</div><div>                //GstClockTime t;</div><div>                //gst_message_parse_stream_status(msg, &t);</div><div>                //printf("Stream status\n");</div><div>            }</div><div>            break;</div><div>        case GST_MESSAGE_LATENCY:</div><div>            {</div><div>                //GstClockTime t;</div><div>                //gst_message_parse_stream_status(msg, &t);</div><div>                printf("Latency\n");</div><div>            }</div><div>            break;</div><div>            </div><div>        default:</div><div>            // We should not reach here </div><div>            g_printerr("Unexpected message received. Type: %s\n", GST_MESSAGE_TYPE_NAME(msg));</div><div>            </div><div>            break;</div><div>    }</div><div>    return TRUE;</div><div>}</div><div><br></div><div><br></div><div><br></div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# Callback for rtsp sources to connect to pipeline</div><div>//#</div><div>//######################################################################</div><div><br></div><div><br></div><div>static void pad_added_handler(GstElement *src, GstPad *new_pad, CustomData* data) </div><div>{</div><div>    GstPad *sink_pad;</div><div><br></div><div>    sink_pad = gst_element_get_static_pad(data->rtph264depay_l, "sink");</div><div>    </div><div>    if (gst_pad_is_linked (sink_pad)) </div><div>    {</div><div>        g_print ("We are already linked. Ignoring.\n");</div><div>        goto exit;</div><div>    }</div><div><br></div><div>    //g_print("Received new pad '%s' from '%s':\n", GST_PAD_NAME(new_pad), GST_ELEMENT_NAME(src));</div><div><br></div><div>    GstCaps *new_pad_caps = gst_pad_get_current_caps(new_pad);</div><div>    GstStructure *new_pad_struct = gst_caps_get_structure(new_pad_caps, 0);</div><div>    const gchar *new_pad_type = gst_structure_get_name(new_pad_struct);</div><div>    </div><div>    if (g_str_has_prefix(new_pad_type, "application/x-rtp")) {</div><div>        GstPadLinkReturn ret = gst_pad_link (new_pad, sink_pad);</div><div>        if (GST_PAD_LINK_FAILED(ret)) {</div><div>            g_print("Type is '%s' but link failed.\n", new_pad_type);</div><div>        } else {</div><div>            g_print("Link succeeded (type '%s') from %s to %s.\n", new_pad_type, GST_PAD_NAME(new_pad), GST_PAD_NAME(sink_pad));</div><div>        }</div><div>        return;</div><div>    }    </div><div><br></div><div>exit:</div><div>    //Unreference the new pad's caps, if we got them</div><div>    if (new_pad_caps != NULL)</div><div>    {</div><div>        gst_caps_unref(new_pad_caps);</div><div>    }</div><div>    //Unreference the sink pad</div><div>    gst_object_unref(sink_pad);</div><div>}</div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# GST_Init</div><div>//#</div><div>//######################################################################</div><div><br></div><div>int initialize()</div><div>{</div><div>    //Create base pipeline</div><div>    char buffer[100];</div><div> </div><div>    data.pipeline = gst_pipeline_new("veo-pipeline");</div><div>    if (data.pipeline == NULL)</div><div>    {</div><div>        printf("Pipeline could not be created.");</div><div>        return -1;</div><div>    }</div><div><br></div><div>    //Create all elements - RIGHT</div><div>    data.rtspsrc_l = gst_element_factory_make("rtspsrc", "rtspsrc_l");</div><div>    data.rtph264depay_l = gst_element_factory_make("rtph264depay", "rtph264depay_l");</div><div>    data.h264parse_l = gst_element_factory_make("h264parse", "h264parse_l");</div><div>    data.mp4mux_l = gst_element_factory_make("qtmux", "mp4mux_l");</div><div>    data.filesink_l = gst_element_factory_make("filesink", "filesink_l");</div><div>    </div><div>    data.alsasrc = gst_element_factory_make("alsasrc", "alsasrc");</div><div>    data.audioconvert = gst_element_factory_make("audioconvert", "audioconvert");</div><div>    data.faac = gst_element_factory_make("faac", "faac");</div><div>    data.audio_tee = gst_element_factory_make("tee", "audio_tee");</div><div>    data.queue_l = gst_element_factory_make("queue", "queue_l");</div><div>    </div><div><br></div><div>    if ( !data.rtspsrc_l || !data.rtph264depay_l || !data.h264parse_l || </div><div>         !data.mp4mux_l || !data.filesink_l)</div><div>    {</div><div>        g_printerr("Not all RIGHT elements could be created.\n");</div><div>        return -1;</div><div>    }</div><div><br></div><div><br></div><div>    if ( !data.alsasrc || !data.audioconvert || !data.faac || !data.audio_tee || </div><div>         !data.queue_l)</div><div>    {</div><div>        g_printerr("Not all AUDIO elements could be created.\n");</div><div>        return -1;</div><div>    }</div><div><br></div><div>    gst_bin_add_many(GST_BIN(data.pipeline),</div><div>        data.rtspsrc_l,</div><div>        data.rtph264depay_l,</div><div>        data.h264parse_l,</div><div>        data.mp4mux_l,</div><div>        data.filesink_l, NULL);</div><div>    </div><div>    gst_bin_add_many(GST_BIN(data.pipeline),</div><div>        data.alsasrc,</div><div>        data.audioconvert,</div><div>        data.faac,</div><div>        data.audio_tee, </div><div>        data.queue_l, </div><div>        NULL);</div><div><br></div><div>    </div><div>    //Set parameters</div><div>    gst_base_parse_set_pts_interpolation ((GstBaseParse *)(data.h264parse_l),TRUE);</div><div>    </div><div>    g_object_set(data.alsasrc, "buffer-time", 5 * 1000000, NULL);//5 second buffer</div><div><br></div><div><br></div><div>    sprintf(buffer, "%ssmall", RTSP_CAMERA_L);</div><div>    g_object_set(data.rtspsrc_l,</div><div>        "location", buffer,</div><div>        "ntp-sync", TRUE,</div><div>        "protocols", (1 << 2), //GST_RTSP_LOWER_TRANS_TCP</div><div>        "do-rtsp-keep-alive", FALSE,</div><div>        "debug", TRUE,</div><div>            NULL);</div><div>    </div><div><br></div><div>    // Listen to the bus </div><div>    data.bus = gst_element_get_bus(data.pipeline);</div><div>    gst_bus_add_watch(data.bus, bus_callback, &data);</div><div>    </div><div>    //Setup callback to connect rtsp pads to other modules</div><div>    g_signal_connect(data.rtspsrc_l, "pad-added", G_CALLBACK(pad_added_handler), &(data));</div><div>    </div><div><br></div><div>    gst_element_link(data.rtph264depay_l, data.h264parse_l);</div><div>    gst_element_link(data.h264parse_l, data.mp4mux_l);</div><div>    gst_element_link(data.mp4mux_l, data.filesink_l);</div><div>    </div><div><br></div><div>    gst_element_link(data.alsasrc, data.audioconvert);</div><div>    gst_element_link(data.audioconvert, data.faac);</div><div>    gst_element_link(data.faac, data.audio_tee);</div><div>    gst_element_link(data.audio_tee, data.queue_l);</div><div>    gst_element_link(data.queue_l, data.mp4mux_l);</div><div><br></div><div><br></div><div>}</div><div><br></div><div><br></div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# Start recording</div><div>//#</div><div>//######################################################################</div><div><br></div><div><br></div><div><br></div><div>int start() </div><div>{</div><div>    char buffer[100];</div><div><br></div><div>    printf("#######################################################\r\n");</div><div>    printf("Start\r\n");</div><div>    printf("#######################################################\r\n");</div><div><br></div><div>    if(recording != 0)</div><div>    {</div><div>        printf("Cant start recording when already recording\r\n");</div><div>        return -1;</div><div>    }</div><div>  </div><div>    current_recording_starttime ++;</div><div>    </div><div>    //Set filesink location names</div><div>    sprintf(buffer, "%s%d_l.mp4", BASE_PATH, current_recording_starttime);</div><div>    g_object_set(data.filesink_l, "location", buffer, NULL);</div><div><br></div><div>    //Start pipeline</div><div>    GstPadLinkReturn ret;</div><div>    ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);</div><div>    if (ret == GST_STATE_CHANGE_FAILURE) {</div><div>        g_printerr("Cant set ready.\n");</div><div>        gst_object_unref(data.pipeline);</div><div>        return -1;</div><div>    }    </div><div>    recording = 1;</div><div><br></div><div>    return 0;</div><div>}</div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# Stop recording </div><div>//#</div><div>//######################################################################</div><div><br></div><div>int stop() {</div><div>    printf("#######################################################\r\n");</div><div>    printf("Stop\r\n");</div><div>    printf("#######################################################\r\n");</div><div>    </div><div>    </div><div>    if (recording == 0)</div><div>    {</div><div>        printf("Pipeline must be recording for us to stop it\r\n");</div><div>        return -1;</div><div>    }</div><div>    </div><div><br></div><div>    gst_element_send_event(data.rtspsrc_l, gst_event_new_eos());</div><div>    gst_element_send_event(data.alsasrc, gst_event_new_eos());</div><div>    </div><div>    recording = 0;</div><div><br></div><div>    </div><div>    return 0;</div><div>}</div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# FIFO</div><div>//#</div><div>//######################################################################</div><div><br></div><div>int start_counter = 0;</div><div>int stop_counter = 0;</div><div>#define BUFLEN 128</div><div>char buf[BUFLEN];</div><div>char* TARGET_START = "start";</div><div>char* TARGET_STOP  = "stop";</div><div>//The fifo filehandle we are told to start/stop through</div><div>//"start" or "stop" is sent</div><div>char *fifo_name = "gst.fifo";</div><div><br></div><div>pthread_t fifo_thread_id; </div><div><br></div><div>void* pollfifo(void* p) {</div><div>    //sleep(5);</div><div>    printf("Listen to fifo\r\n");</div><div>    //Read from fifo untill empty</div><div>    while(1)</div><div>    {</div><div>        int fifo = open(fifo_name, O_RDONLY);</div><div>        char buffer[128];</div><div>        int n = read(fifo, buffer, 128);</div><div>        if (n > 0) {</div><div>            for(int i = 0; i < n; i++)</div><div>            {</div><div>                if (buffer[i] == TARGET_START[start_counter]) {</div><div>                    start_counter++;</div><div>                    if (start_counter >= strlen(TARGET_START)) {</div><div>                        start_counter = 0;</div><div>                        start();</div><div>                    }</div><div>                } else {</div><div>                    start_counter = 0;</div><div>                }</div><div>                </div><div>                if (buffer[i] == TARGET_STOP[stop_counter]) {</div><div>                    stop_counter++;</div><div>                    if (stop_counter >= strlen(TARGET_STOP)) {</div><div>                        stop_counter = 0;</div><div>                        stop();</div><div>                    }</div><div>                } else {</div><div>                    stop_counter = 0;</div><div>                }</div><div>            }</div><div>        }</div><div>        close(fifo);</div><div>    }</div><div>}</div><div><br></div><div>//######################################################################</div><div>//#</div><div>//# Main</div><div>//#</div><div>//######################################################################</div><div><br></div><div>int main(int argc, char *argv[]) </div><div>{</div><div>    memset(&data, 0, sizeof(CustomData));</div><div>    gst_init(&argc, &argv);</div><div>    guint major;</div><div>    guint minor;</div><div>    guint micro;</div><div>    guint nano;</div><div>    gst_version(&major, &minor, &micro, &nano);</div><div>    printf("Recorder\r\n");</div><div>    printf("Gstreamer version %d %d %d %d\r\n", major, minor, micro, nano);</div><div>    mkfifo(fifo_name, 0666);</div><div>    data.loop = g_main_loop_new(NULL, FALSE);    </div><div>    initialize();</div><div>    pthread_create(&fifo_thread_id, NULL, pollfifo, NULL); </div><div>    g_main_loop_run(data.loop);</div><div>    return 0;</div><div>}</div><div><br></div></div></div></div></div>