Branching pipeline burning memory when restarting
Jesper Taxbøl
jesper at taxboel.dk
Thu Jan 24 22:37:58 UTC 2019
Hi Guys,
I am working on an application that is supposed to record video from an RTSP
camera alongside audio from a local ALSA source into an MP4 container.
I know the MP4 container needs an EOS to finalize properly, so I send an EOS
event through the pipeline (to both alsasrc and rtspsrc) and wait for the EOS
message to be received before setting the state to GST_STATE_NULL.
My code is appended to this mail and is inspired by
https://gstreamer.freedesktop.org/documentation/tutorials/basic/dynamic-pipelines.html
I interact with the program by sending "start" or "stop" to the gst.fifo
file handle.
This works fine and I get nice MP4 files, except that when I want to record
a new file I see ever-increasing memory consumption, giving my application
a very limited lifespan.
I assume it's because audio and video are not in sync and some buffering
occurs.
Pointers on how I should deal with this problem are very welcome.
kind regards
Jesper
---------------------------
#include <gst/gst.h>
#include <gst/base/gstbaseparse.h>
#include <stdio.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <dirent.h>
#include <sqlite3.h>
#include <sys/statvfs.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <pthread.h>
// Base RTSP URL of the "_l" camera; initialize() appends "small" to it to
// build the rtspsrc "location" property.
#define RTSP_CAMERA_L "rtsp://192.168.131.176:1001/"
// Base RTSP URL of a second camera; not referenced by the code shown here.
#define RTSP_CAMERA_R "rtsp://192.168.131.176:1002/"
// Prefix put in front of every output file name built by start().
#define BASE_PATH ""
// Counter incremented by start() and embedded in the output file name.
// NOTE(review): despite the name it is a plain counter, not a timestamp.
static int current_recording_starttime = 0;
// Set to 1 by start() on success and back to 0 by stop().
int recording = 0;
//######################################################################
//#
//# Datastructure keeping references to pipeline elements
//#
//######################################################################
// All pipeline elements plus the main loop and bus, kept together so that the
// callbacks and start()/stop() can reach them through the global `data`.
typedef struct _CustomData {
// Top-level pipeline that owns every element below.
GstElement *pipeline;
// Video branch: rtspsrc -> rtph264depay -> h264parse -> muxer -> filesink.
GstElement *rtspsrc_l;
GstElement *rtph264depay_l;
GstElement *h264parse_l;
// Created from the "qtmux" factory in initialize(), despite the field name.
GstElement *mp4mux_l;
GstElement *filesink_l;
// Audio branch: alsasrc -> audioconvert -> faac -> tee -> queue -> muxer.
GstElement *alsasrc;
GstElement *audioconvert;
GstElement *faac;
GstElement *audio_tee;
GstElement *queue_l;
// GLib main loop run by main().
GMainLoop *loop;
// Bus of `pipeline`, obtained in initialize() and watched by bus_callback().
GstBus *bus;
} CustomData;
// Single global instance shared by every function in this file.
CustomData data;
//######################################################################
//#
//# Bus callback
//#
//######################################################################
// Bus watch callback: logs the messages posted on the pipeline bus and sets
// the pipeline to GST_STATE_NULL once End-Of-Stream has been reached.
//
// bus: bus the message was posted on (unused).
// msg: message to handle; it is only inspected here.
// d:   user data given to gst_bus_add_watch() (unused, the global `data` is
//      used instead).
//
// Always returns TRUE so that the watch stays installed.
int bus_callback(GstBus *bus, GstMessage *msg, gpointer d) {
    (void)bus;
    (void)d;

    switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_ERROR: {
        GError *err = NULL;
        gchar *debug_info = NULL;

        gst_message_parse_error(msg, &err, &debug_info);
        g_printerr("Error received from element %s: %s\n",
                   GST_OBJECT_NAME(msg->src), err ? err->message : "unknown");
        g_printerr("Debugging information: %s\n",
                   debug_info ? debug_info : "none");
        g_clear_error(&err);
        g_free(debug_info);
        break;
    }
    case GST_MESSAGE_EOS:
        g_print("End-Of-Stream reached.\n");
        // EOS has passed through the whole pipeline, so it can be shut down.
        if (gst_element_set_state(data.pipeline, GST_STATE_NULL) ==
            GST_STATE_CHANGE_FAILURE) {
            printf("ERROR A\r\n");
        }
        break;
    case GST_MESSAGE_STATE_CHANGED:
        // We are only interested in state-changed messages from the pipeline.
        if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline)) {
            GstState old_state, new_state, pending_state;

            gst_message_parse_state_changed(msg, &old_state, &new_state,
                                            &pending_state);
            g_print("Pipeline state changed from %s to %s:\n",
                    gst_element_state_get_name(old_state),
                    gst_element_state_get_name(new_state));
        }
        break;
    case GST_MESSAGE_PROGRESS: {
        GstProgressType type;
        gchar *code = NULL;
        gchar *text = NULL;

        gst_message_parse_progress(msg, &type, &code, &text);
        switch (type) {
        case GST_PROGRESS_TYPE_START:
        case GST_PROGRESS_TYPE_CONTINUE:
        case GST_PROGRESS_TYPE_COMPLETE:
            // Routine progress, intentionally not logged.
            break;
        case GST_PROGRESS_TYPE_CANCELED:
            printf("GST_PROGRESS_TYPE_CANCELED\r\n");
            break;
        case GST_PROGRESS_TYPE_ERROR:
            printf("GST_PROGRESS_TYPE_ERROR\r\n");
            break;
        default:
            printf("DEFAULT\r\n");
            break;
        }
        g_free(code);
        g_free(text);
        break;
    }
    case GST_MESSAGE_NEW_CLOCK: {
        GstClock *clock = NULL;

        gst_message_parse_new_clock(msg, &clock);
        printf("New clock: %s\n", clock ? GST_OBJECT_NAME(clock) : "NULL");
        break;
    }
    case GST_MESSAGE_STREAM_START: {
        guint group_id = 0;

        // The group id is optional; only print it when the message has one.
        if (gst_message_parse_group_id(msg, &group_id)) {
            printf("Stream start group id: %u\n", group_id);
        } else {
            printf("Stream start without group id\n");
        }
        break;
    }
    case GST_MESSAGE_ASYNC_DONE: {
        GstClockTime t = GST_CLOCK_TIME_NONE;

        gst_message_parse_async_done(msg, &t);
        // GstClockTime is 64 bits wide, so "%u" would be a format mismatch.
        printf("Async done: %" G_GUINT64_FORMAT " ns\n", t);
        break;
    }
    case GST_MESSAGE_STREAM_STATUS:
        // Intentionally ignored.
        break;
    case GST_MESSAGE_LATENCY:
        printf("Latency\n");
        break;
    default:
        g_printerr("Unexpected message received. Type: %s\n",
                   GST_MESSAGE_TYPE_NAME(msg));
        break;
    }
    return TRUE;
}
//######################################################################
//#
//# Callback for rtsp sources to connect to pipeline
//#
//######################################################################
// "pad-added" handler for the rtspsrc element: links a newly exposed RTP pad
// to the sink pad of the H.264 depayloader.
//
// src:     element that exposed the pad (unused).
// new_pad: the pad that was just added to `src`.
// data:    pipeline elements; only rtph264depay_l is used.
//
// Every reference taken here (sink pad, caps) is released on all paths,
// including the successful-link path.
static void pad_added_handler(GstElement *src, GstPad *new_pad, CustomData *data)
{
    // Initialised up front so the cleanup code below is valid from any jump.
    GstCaps *new_pad_caps = NULL;
    GstPad *sink_pad = gst_element_get_static_pad(data->rtph264depay_l, "sink");

    (void)src;

    if (sink_pad == NULL) {
        g_printerr("Depayloader has no sink pad.\n");
        return;
    }
    if (gst_pad_is_linked(sink_pad)) {
        g_print("We are already linked. Ignoring.\n");
        goto exit;
    }

    new_pad_caps = gst_pad_get_current_caps(new_pad);
    if (new_pad_caps == NULL) {
        g_print("New pad '%s' has no caps. Ignoring.\n", GST_PAD_NAME(new_pad));
        goto exit;
    }

    // new_pad_type points into new_pad_caps, so use it before the unref below.
    GstStructure *new_pad_struct = gst_caps_get_structure(new_pad_caps, 0);
    const gchar *new_pad_type = gst_structure_get_name(new_pad_struct);

    if (g_str_has_prefix(new_pad_type, "application/x-rtp")) {
        GstPadLinkReturn ret = gst_pad_link(new_pad, sink_pad);

        if (GST_PAD_LINK_FAILED(ret)) {
            g_print("Type is '%s' but link failed.\n", new_pad_type);
        } else {
            g_print("Link succeeded (type '%s') from %s to %s.\n",
                    new_pad_type, GST_PAD_NAME(new_pad), GST_PAD_NAME(sink_pad));
        }
    }

exit:
    // Unreference the new pad's caps, if we got them
    if (new_pad_caps != NULL) {
        gst_caps_unref(new_pad_caps);
    }
    // Unreference the sink pad
    gst_object_unref(sink_pad);
}
//######################################################################
//#
//# GST_Init
//#
//######################################################################
int initialize()
{
//Create base pipeline
char buffer[100];
data.pipeline = gst_pipeline_new("veo-pipeline");
if (data.pipeline == NULL)
{
printf("Pipeline could not be created.");
return -1;
}
//Create all elements - RIGHT
data.rtspsrc_l = gst_element_factory_make("rtspsrc", "rtspsrc_l");
data.rtph264depay_l = gst_element_factory_make("rtph264depay",
"rtph264depay_l");
data.h264parse_l = gst_element_factory_make("h264parse", "h264parse_l");
data.mp4mux_l = gst_element_factory_make("qtmux", "mp4mux_l");
data.filesink_l = gst_element_factory_make("filesink", "filesink_l");
data.alsasrc = gst_element_factory_make("alsasrc", "alsasrc");
data.audioconvert = gst_element_factory_make("audioconvert",
"audioconvert");
data.faac = gst_element_factory_make("faac", "faac");
data.audio_tee = gst_element_factory_make("tee", "audio_tee");
data.queue_l = gst_element_factory_make("queue", "queue_l");
if ( !data.rtspsrc_l || !data.rtph264depay_l || !data.h264parse_l ||
!data.mp4mux_l || !data.filesink_l)
{
g_printerr("Not all RIGHT elements could be created.\n");
return -1;
}
if ( !data.alsasrc || !data.audioconvert || !data.faac ||
!data.audio_tee ||
!data.queue_l)
{
g_printerr("Not all AUDIO elements could be created.\n");
return -1;
}
gst_bin_add_many(GST_BIN(data.pipeline),
data.rtspsrc_l,
data.rtph264depay_l,
data.h264parse_l,
data.mp4mux_l,
data.filesink_l, NULL);
gst_bin_add_many(GST_BIN(data.pipeline),
data.alsasrc,
data.audioconvert,
data.faac,
data.audio_tee,
data.queue_l,
NULL);
//Set parameters
gst_base_parse_set_pts_interpolation ((GstBaseParse
*)(data.h264parse_l),TRUE);
g_object_set(data.alsasrc, "buffer-time", 5 * 1000000, NULL);//5 second
buffer
sprintf(buffer, "%ssmall", RTSP_CAMERA_L);
g_object_set(data.rtspsrc_l,
"location", buffer,
"ntp-sync", TRUE,
"protocols", (1 << 2), //GST_RTSP_LOWER_TRANS_TCP
"do-rtsp-keep-alive", FALSE,
"debug", TRUE,
NULL);
// Listen to the bus
data.bus = gst_element_get_bus(data.pipeline);
gst_bus_add_watch(data.bus, bus_callback, &data);
//Setup callback to connect rtsp pads to other modules
g_signal_connect(data.rtspsrc_l, "pad-added",
G_CALLBACK(pad_added_handler), &(data));
gst_element_link(data.rtph264depay_l, data.h264parse_l);
gst_element_link(data.h264parse_l, data.mp4mux_l);
gst_element_link(data.mp4mux_l, data.filesink_l);
gst_element_link(data.alsasrc, data.audioconvert);
gst_element_link(data.audioconvert, data.faac);
gst_element_link(data.faac, data.audio_tee);
gst_element_link(data.audio_tee, data.queue_l);
gst_element_link(data.queue_l, data.mp4mux_l);
}
//######################################################################
//#
//# Start recording
//#
//######################################################################
int start()
{
char buffer[100];
printf("#######################################################\r\n");
printf("Start\r\n");
printf("#######################################################\r\n");
if(recording != 0)
{
printf("Cant start recording when already recording\r\n");
return -1;
}
current_recording_starttime ++;
//Set filesink location names
sprintf(buffer, "%s%d_l.mp4", BASE_PATH, current_recording_starttime);
g_object_set(data.filesink_l, "location", buffer, NULL);
//Start pipeline
GstPadLinkReturn ret;
ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
g_printerr("Cant set ready.\n");
gst_object_unref(data.pipeline);
return -1;
}
recording = 1;
return 0;
}
//######################################################################
//#
//# Stop recording
//#
//######################################################################
// Requests the end of the current recording by sending an EOS event to both
// source elements. The pipeline itself is shut down later by bus_callback()
// when the EOS message arrives on the bus.
//
// Returns 0 once the request has been issued and -1 if no recording is
// running. A source that does not accept the event is reported on stderr.
int stop() {
    printf("#######################################################\r\n");
    printf("Stop\r\n");
    printf("#######################################################\r\n");
    if (recording == 0)
    {
        printf("Pipeline must be recording for us to stop it\r\n");
        return -1;
    }
    // gst_element_send_event() takes ownership of the event, so each source
    // gets its own freshly created EOS.
    if (!gst_element_send_event(data.rtspsrc_l, gst_event_new_eos())) {
        g_printerr("rtspsrc did not accept the EOS event.\n");
    }
    if (!gst_element_send_event(data.alsasrc, gst_event_new_eos())) {
        g_printerr("alsasrc did not accept the EOS event.\n");
    }
    recording = 0;
    return 0;
}
//######################################################################
//#
//# FIFO
//#
//######################################################################
// Number of leading characters of TARGET_START / TARGET_STOP matched so far
// by pollfifo().
int start_counter = 0;
int stop_counter = 0;
// Scratch buffer and its size; not referenced by the code shown here.
#define BUFLEN 128
char buf[BUFLEN];
// Command keywords pollfifo() looks for in the FIFO byte stream. They point
// at string literals, hence const.
const char *TARGET_START = "start";
const char *TARGET_STOP = "stop";
//The fifo filehandle we are told to start/stop through
//"start" or "stop" is sent
const char *fifo_name = "gst.fifo";
// Thread running pollfifo(), created in main().
pthread_t fifo_thread_id;
// Thread entry point: reads the command FIFO forever and calls start() or
// stop() whenever the byte stream contains "start" or "stop".
//
// p: unused thread argument.
//
// The FIFO is reopened every time the writer closes it. The function never
// returns.
void* pollfifo(void* p) {
    const size_t start_len = strlen(TARGET_START);
    const size_t stop_len = strlen(TARGET_STOP);

    (void)p;
    printf("Listen to fifo\r\n");
    while (1)
    {
        char buffer[128];
        ssize_t n;
        int fifo = open(fifo_name, O_RDONLY);

        if (fifo < 0) {
            // Back off instead of spinning on a FIFO that cannot be opened.
            perror("open fifo");
            sleep(1);
            continue;
        }
        //Read from fifo until empty (read() returns 0 once the writer closes)
        while ((n = read(fifo, buffer, sizeof buffer)) > 0) {
            for (ssize_t i = 0; i < n; i++)
            {
                const char c = buffer[i];

                if (c == TARGET_START[start_counter]) {
                    start_counter++;
                    if ((size_t)start_counter >= start_len) {
                        start_counter = 0;
                        start();
                    }
                } else {
                    // The mismatching byte may itself begin a new keyword.
                    start_counter = (c == TARGET_START[0]) ? 1 : 0;
                }
                if (c == TARGET_STOP[stop_counter]) {
                    stop_counter++;
                    if ((size_t)stop_counter >= stop_len) {
                        stop_counter = 0;
                        stop();
                    }
                } else {
                    stop_counter = (c == TARGET_STOP[0]) ? 1 : 0;
                }
            }
        }
        if (n < 0) {
            perror("read fifo");
        }
        close(fifo);
    }
}
//######################################################################
//#
//# Main
//#
//######################################################################
// Program entry point: initialises GStreamer, creates the command FIFO,
// builds the pipeline, starts the FIFO polling thread and runs the GLib main
// loop.
//
// Returns EXIT_FAILURE if the FIFO, the pipeline or the polling thread cannot
// be set up, and 0 when the main loop returns.
int main(int argc, char *argv[])
{
    guint major;
    guint minor;
    guint micro;
    guint nano;
    struct stat st;
    int rc;

    memset(&data, 0, sizeof(CustomData));
    gst_init(&argc, &argv);
    gst_version(&major, &minor, &micro, &nano);
    printf("Recorder\r\n");
    printf("Gstreamer version %u %u %u %u\r\n", major, minor, micro, nano);

    // mkfifo() also fails when the FIFO is left over from an earlier run;
    // that is fine as long as the existing path really is a FIFO.
    if (mkfifo(fifo_name, 0666) != 0 &&
        (stat(fifo_name, &st) != 0 || !S_ISFIFO(st.st_mode))) {
        perror("mkfifo");
        return EXIT_FAILURE;
    }

    data.loop = g_main_loop_new(NULL, FALSE);
    initialize();
    // data was zeroed above and initialize() only assigns data.bus after all
    // elements have been created, so a NULL bus means setup failed.
    if (data.bus == NULL) {
        g_printerr("Pipeline initialisation failed.\n");
        return EXIT_FAILURE;
    }

    rc = pthread_create(&fifo_thread_id, NULL, pollfifo, NULL);
    if (rc != 0) {
        g_printerr("Could not start fifo thread: %s\n", strerror(rc));
        return EXIT_FAILURE;
    }
    g_main_loop_run(data.loop);
    return 0;
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20190124/d6c3040d/attachment-0001.html>
More information about the gstreamer-devel
mailing list