Sending metadata across UDP

priyanka kataria priyanka.kataria86 at gmail.com
Fri Sep 27 05:19:01 UTC 2019


Hi Nicolas,

Thank you for the quick reply.

> Consider adding an rtpjitterbuffer on the receiver. In a future GStreamer
> version, you'll be able to use rtpsrc/rtpsink (or the RIST variant) to get
> a full-featured RTP stream without complex pipeline construction.

The actual pipeline in my program already has an "rtpjitterbuffer"; what I
shared was a simplified test pipeline.

> For JPEG, consider configuring the max-bitrate property on udpsink. As frames
> are spread over many more packets, the stream tends to become bursty and may
> saturate the link or exhaust the udpsrc socket buffer-size.

Point noted, will make the changes.
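
A rough sketch of the arithmetic behind this advice, in C. The helper names
are hypothetical, the numbers in the usage note are made up for illustration
(not measured from my stream), and per-packet header overhead is ignored.

```c
#include <stdint.h>

/* Number of RTP packets needed for one frame, given the payload bytes
 * that fit in a single packet. payload_bytes must be non-zero. */
static uint32_t packets_per_frame(uint32_t frame_bytes, uint32_t payload_bytes)
{
	return (frame_bytes + payload_bytes - 1) / payload_bytes; /* ceiling division */
}

/* Microseconds needed to put one frame on the wire when the output is
 * capped at max_bitrate bits per second. max_bitrate must be non-zero. */
static uint64_t frame_send_time_us(uint32_t frame_bytes, uint64_t max_bitrate)
{
	return ((uint64_t)frame_bytes * 8u * 1000000u) / max_bitrate;
}
```

For example, a 140000-byte JPEG frame with 1400 payload bytes per packet
needs 100 packets, and a 125000-byte frame capped at 10 Mbit/s takes 100 ms
to send instead of leaving the sender as a single burst.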

> That is the method I would have used. It should work with any RTP packet,
> so you likely have a bug or have hit one.

Here I am attaching the source code for option 3, which I tried.

"send_h264.c" and "recv_h264.c" work successfully, and the frame number is
appended to the RTP buffer. The prints in both probes output the correct value.
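
For reference, a minimal sketch of what the extension block looks like on
the wire, as I understand the two-byte-header layout of RFC 8285 (formerly
RFC 5285). It is plain C with no GStreamer calls; the function names are
hypothetical. It also stores the frame number in network byte order, which
is a suggestion and not what the attached code does (the attachments copy
the counter in host byte order).

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Store/load a 32-bit value in network (big-endian) byte order. */
static void put_u32_be(uint8_t *dst, uint32_t v)
{
	dst[0] = (uint8_t)(v >> 24);
	dst[1] = (uint8_t)(v >> 16);
	dst[2] = (uint8_t)(v >> 8);
	dst[3] = (uint8_t)v;
}

static uint32_t get_u32_be(const uint8_t *src)
{
	return ((uint32_t)src[0] << 24) | ((uint32_t)src[1] << 16) |
	       ((uint32_t)src[2] << 8) | (uint32_t)src[3];
}

/* Write an RTP header-extension block holding one two-byte-header element.
 * Returns the number of bytes written, or 0 if `out` is too small. */
static size_t write_twobyte_ext(uint8_t *out, size_t out_len,
                                uint8_t appbits, uint8_t id,
                                const uint8_t *data, uint8_t size)
{
	size_t body = 2u + size;                   /* ID byte + length byte + data */
	size_t padded = (body + 3u) & ~(size_t)3u; /* pad to a 32-bit boundary */
	size_t total = 4u + padded;                /* plus 4-byte extension header */

	if (out_len < total)
		return 0;

	memset(out, 0, total);                     /* padding bytes stay zero */
	out[0] = 0x10;                             /* 12-bit pattern 0x100 ... */
	out[1] = (uint8_t)(appbits & 0x0f);        /* ... followed by 4 appbits */
	out[2] = (uint8_t)((padded / 4u) >> 8);    /* length in 32-bit words */
	out[3] = (uint8_t)((padded / 4u) & 0xff);
	out[4] = id;                               /* element ID */
	out[5] = size;                             /* element data length in bytes */
	memcpy(out + 6, data, size);
	return total;
}
```

With appbits 0, ID 5 and a 4-byte value, as in the attached sender, the
block is 12 bytes long: 4 bytes of extension header, 6 bytes for the element
and 2 bytes of padding.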

However, the probe function (pay_src_probe) in "send_jpeg.c" never gets
called.
When I change the probe type from "GST_PAD_PROBE_TYPE_BUFFER" to
"GST_PAD_PROBE_TYPE_BUFFER_LIST", it does get called, but the frame
numbers appended are wrong: the function is called around 100 times for
each frame, and the program slows down like a sloth.
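
One way to keep the number stable across all packets of a frame would be to
advance the counter only after a packet that has the RTP marker bit set. A
minimal sketch in plain C follows; the struct and function names are
hypothetical, and it assumes the payloader sets the marker bit on the last
packet of each frame (which is what RFC 2435 specifies for JPEG). In a
GStreamer probe the marker could be read with gst_rtp_buffer_get_marker()
on the mapped packet.

```c
#include <stdbool.h>
#include <stdint.h>

/* Tracks the frame number to stamp on outgoing RTP packets. */
struct frame_tagger {
	uint32_t current; /* number of the frame currently being sent */
};

/* Return the frame number for this packet. The counter advances only
 * after a packet whose marker bit is set, so every packet belonging to
 * the same frame receives the same number. */
static uint32_t tag_packet(struct frame_tagger *t, bool marker)
{
	uint32_t id = t->current;

	if (marker)
		t->current++;
	return id;
}
```

With this, a frame split into 100 packets carries the same value 100 times
instead of 100 consecutive values.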

I checked the source code of the "rtph264pay" and "rtpjpegpay" elements. Both
of them create buffer lists, which I guess is to push multiple RTP
packets in one go to the next element in the pipeline.
But strangely, H264 works fine and JPEG fails.

Please check whether my code has a bug.

Also, do you have any suggestions on the KLV metadata approach?
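
To make the KLV question concrete, here is a minimal sketch of encoding one
KLV triplet (16-byte key, BER-encoded length, value) in plain C. It assumes
SMPTE 336M style encoding; the function name is hypothetical, and the key
would have to be a real registered or private key in practice. It only
builds the bytes and does not cover muxing them into a transport stream.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KLV_KEY_LEN 16

/* Encode one KLV triplet into `out`.
 * Returns the number of bytes written, or 0 if `out` is too small or
 * value_len exceeds 0xFFFF (larger lengths are not handled here). */
static size_t klv_encode(uint8_t *out, size_t out_len,
                         const uint8_t key[KLV_KEY_LEN],
                         const uint8_t *value, size_t value_len)
{
	size_t len_bytes;
	size_t total;
	uint8_t *p;

	if (value_len < 128)
		len_bytes = 1;              /* BER short form: one length byte */
	else if (value_len <= 0xFF)
		len_bytes = 2;              /* long form: 0x81, then 1 length byte */
	else if (value_len <= 0xFFFF)
		len_bytes = 3;              /* long form: 0x82, then 2 length bytes */
	else
		return 0;

	total = KLV_KEY_LEN + len_bytes + value_len;
	if (out_len < total)
		return 0;

	memcpy(out, key, KLV_KEY_LEN);
	p = out + KLV_KEY_LEN;
	if (len_bytes == 1) {
		*p++ = (uint8_t)value_len;
	} else if (len_bytes == 2) {
		*p++ = 0x81;
		*p++ = (uint8_t)value_len;
	} else {
		*p++ = 0x82;
		*p++ = (uint8_t)(value_len >> 8);
		*p++ = (uint8_t)(value_len & 0xff);
	}
	memcpy(p, value, value_len);
	return total;
}
```

A file with one such triplet per frame, where the value is a 4-byte frame
number, would then be 21 bytes per frame.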

Thanks,
Priyanka

On Thu, Sep 26, 2019 at 7:11 PM Nicolas Dufresne <nicolas at ndufresne.ca>
wrote:

>
>
> On Thu, Sep 26, 2019 at 05:25, priyanka kataria <
> priyanka.kataria86 at gmail.com> wrote:
>
>> Hello,
>>
>> I have an interesting problem:
>> I need to transfer some kind of metadata (say, a frame number) with each
>> frame over UDP. The receiver, in turn, extracts the frame number from each
>> frame and keeps it for some other work.
>>
>> Sample sender and receiver pipelines:
>> Sender: gst-launch-1.0 -v filesrc location=file.h264  ! h264parse !
>> rtph264pay ! udpsink port=5001
>> Receiver: gst-launch-1.0 -v udpsrc port=5001 caps="application/x-rtp,
>> media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264" !
>> rtph264depay ! decodebin ! autovideosink
>>
>
> Consider adding an rtpjitterbuffer on the receiver. In a future GStreamer
> version, you'll be able to use rtpsrc/rtpsink (or the RIST variant) to get
> a full-featured RTP stream without complex pipeline construction.
>
> For JPEG, consider configuring the max-bitrate property on udpsink. As frames
> are spread over many more packets, the stream tends to become bursty and may
> saturate the link or exhaust the udpsrc socket buffer-size.
>
>
>
>> Things I have already tried (I am still a beginner, so some of the
>> things below may look stupid):
>> 1. In the sender pipeline, attaching a probe on the "h264parse" element and
>> assigning incremental values to "GST_BUFFER_OFFSET".
>> But the offset value I set is not reflected even in the next element of
>> the same pipeline.
>>
>> 2. In the sender pipeline, attaching a probe on the "h264parse" element and
>> assigning incremental values to "GST_BUFFER_PTS".
>> The PTS value I set is reflected in the next elements of the same pipeline,
>> but gets lost across UDP.
>> I checked this by attaching a probe on the "rtph264depay" element (src pad).
>>
>> 3. Using "gst_rtp_buffer_add_extension_twobytes_header()".
>> This method works for H264 files but fails with MJPEG files, and my
>> solution needs to be generic.
>> I can provide more details with code if required.
>>
>
> That is the method I would have used. It should work with any RTP packet,
> so you likely have a bug or have hit one.
>
>
>> 4. The last thing I am trying is to mux KLV metadata into the stream and
>> send it across UDP.
>> I am referring to the following link:
>> https://www.aeronetworks.ca/2018/05/mpeg-2-transport-streams.html.
>> It doesn't work as written in the article, but it gave me an
>> overview of how to use the pipeline.
>> Now I want to create my own custom KLV metadata file which contains only
>> frame numbers and try to mux it.
>>
>> Please help me in creating such a file.
>>
>> Also, please share any other working approaches I should try
>> to append metadata to each frame buffer.
>>
>> Thanks,
>> Priyanka
>> _______________________________________________
>> gstreamer-devel mailing list
>> gstreamer-devel at lists.freedesktop.org
>> https://lists.freedesktop.org/mailman/listinfo/gstreamer-devel
>
-------------- next part --------------
#include <gst/gst.h>
#include <gst/rtp/rtp.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <stdio.h>
#include <string.h>	/* memset, memcpy */

static GMainLoop *loop;

static gboolean my_bus_callback(GstBus * bus, GstMessage * message, gpointer data)
{
	switch (GST_MESSAGE_TYPE(message)) {

	case GST_MESSAGE_ERROR: {
		GError *err;
		gchar *debug;

		gst_message_parse_error(message, &err, &debug);
		g_print("Error: %s\n", err->message);
		g_error_free(err);
		g_free(debug);
		g_main_loop_quit(loop);
		break;
	}
	case GST_MESSAGE_EOS:
		/* end-of-stream */
		g_main_loop_quit(loop);
		break;
	case GST_MESSAGE_STATE_CHANGED:
		break;
	default:
		/* unhandled message */
		break;
	}

	return TRUE;
}

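/* Not connected anywhere in this receiver (udpsrc has a static src pad);
 * kept for pipelines whose source creates its pads dynamically. */
static void on_pad_added(GstElement *element, GstPad *pad, gpointer data)
{
	GstPad *sinkpad;
	GstElement *parse = (GstElement *)data;

	sinkpad = gst_element_get_static_pad(parse, "sink");
	/* gst_pad_link() returns GST_PAD_LINK_OK (0) on success, so compare explicitly. */
	if (gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK)
		g_print("Failed to link new pad to sink\n");
	gst_object_unref(sinkpad);
}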

static GstPadProbeReturn depay_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
	GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER(info);
	GstRTPBuffer rtpBuffer;
	guint size = 0;
	gpointer pointer = NULL;
	guint8 appbits = 0;
	guint frame_id = 0;

	memset(&rtpBuffer, 0, sizeof(GstRTPBuffer));

	if (buffer == NULL) {
		g_print("GST buffer is NULL\n");
		return GST_PAD_PROBE_OK;
	}

	if (!gst_rtp_buffer_map(buffer, GST_MAP_READ, &rtpBuffer)) {
		g_print("RTP buffer not mapped\n");
		return GST_PAD_PROBE_OK;
	}

	/* Look up the first extension element with ID 5 and make sure it is
	 * large enough before reading the frame ID from it. */
	if (gst_rtp_buffer_get_extension(&rtpBuffer) &&
	    gst_rtp_buffer_get_extension_twobytes_header(&rtpBuffer, &appbits, 5, 0, &pointer, &size) &&
	    size >= sizeof(frame_id)) {
		/* memcpy avoids an unaligned read; the sender writes host byte order. */
		memcpy(&frame_id, pointer, sizeof(frame_id));
		g_print("Frame ID received: %u\n", frame_id);
	}

	/* Unmap on every path, not only when an extension was found. */
	gst_rtp_buffer_unmap(&rtpBuffer);

	return GST_PAD_PROBE_OK;
}

int main(int argc, char *argv[])
{
	GstBus *bus;
	gulong depay_probe_id = 0;
	GstPad *depay_pad = NULL;

	/* Initialize GStreamer */
	gst_init(&argc, &argv);
	loop = g_main_loop_new(NULL, FALSE);

	GstElement *pipeline, *src, *depay, *decode, *conv, *sink;

	/* Build Pipeline */
	pipeline = gst_pipeline_new("My pipeline");

	src = gst_element_factory_make("udpsrc", NULL);
	depay = gst_element_factory_make("rtpjpegdepay", NULL);
	decode = gst_element_factory_make("jpegdec", NULL);
	conv = gst_element_factory_make("videoconvert", NULL);
	sink = gst_element_factory_make("autovideosink", NULL);

	g_object_set(G_OBJECT(src), "uri", "udp://localhost:5000", NULL);

	/* g_object_set() takes its own reference on the caps, so release ours. */
	GstCaps *caps = gst_caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)JPEG");
	g_object_set(G_OBJECT(src), "caps", caps, NULL);
	gst_caps_unref(caps);

	gst_bin_add_many(GST_BIN(pipeline), src, depay, decode, conv, sink, NULL);

	if (!gst_element_link_many(src, depay, decode, conv, sink, NULL))
		g_print("Linking failed \n");

	/* Probe the depayloader's sink pad, where buffers are still RTP packets. */
	depay_pad = gst_element_get_static_pad(depay, "sink");
	gst_pad_add_probe(depay_pad, GST_PAD_PROBE_TYPE_BUFFER,
			depay_pad_buffer_probe, depay, NULL);
	gst_object_unref(depay_pad);

	/* change to playing */
	bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
	gst_bus_add_watch(bus, my_bus_callback, loop);
	gst_object_unref(bus);

	int ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
	g_main_loop_run(loop);

	gst_element_set_state(pipeline, GST_STATE_NULL);
	gst_object_unref(GST_OBJECT(pipeline));

	return 0;
}
-------------- next part --------------
#include <gst/gst.h>
#include <gst/rtp/rtp.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <stdio.h>
#include <string.h>	/* memset, memcpy */

static GMainLoop *loop;

static gboolean my_bus_callback(GstBus * bus, GstMessage * message, gpointer data)
{
	switch (GST_MESSAGE_TYPE(message)) {

	case GST_MESSAGE_ERROR: {
		GError *err;
		gchar *debug;

		gst_message_parse_error(message, &err, &debug);
		g_print("Error: %s\n", err->message);
		g_error_free(err);
		g_free(debug);
		g_main_loop_quit(loop);
		break;
	}
	case GST_MESSAGE_EOS:
		/* end-of-stream */
		g_main_loop_quit(loop);
		break;
	case GST_MESSAGE_STATE_CHANGED:
		break;
	default:
		/* unhandled message */
		break;
	}

	return TRUE;
}

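/* Not connected anywhere in this receiver (udpsrc has a static src pad);
 * kept for pipelines whose source creates its pads dynamically. */
static void on_pad_added(GstElement *element, GstPad *pad, gpointer data)
{
	GstPad *sinkpad;
	GstElement *parse = (GstElement *)data;

	sinkpad = gst_element_get_static_pad(parse, "sink");
	/* gst_pad_link() returns GST_PAD_LINK_OK (0) on success, so compare explicitly. */
	if (gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK)
		g_print("Failed to link new pad to sink\n");
	gst_object_unref(sinkpad);
}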

static GstPadProbeReturn depay_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
	GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER(info);
	GstRTPBuffer rtpBuffer;
	guint size = 0;
	gpointer pointer = NULL;
	guint8 appbits = 0;
	guint frame_id = 0;

	memset(&rtpBuffer, 0, sizeof(GstRTPBuffer));

	if (buffer == NULL) {
		g_print("GST buffer is NULL\n");
		return GST_PAD_PROBE_OK;
	}

	if (!gst_rtp_buffer_map(buffer, GST_MAP_READ, &rtpBuffer)) {
		g_print("RTP buffer not mapped\n");
		return GST_PAD_PROBE_OK;
	}

	/* Look up the first extension element with ID 5 and make sure it is
	 * large enough before reading the frame ID from it. */
	if (gst_rtp_buffer_get_extension(&rtpBuffer) &&
	    gst_rtp_buffer_get_extension_twobytes_header(&rtpBuffer, &appbits, 5, 0, &pointer, &size) &&
	    size >= sizeof(frame_id)) {
		/* memcpy avoids an unaligned read; the sender writes host byte order. */
		memcpy(&frame_id, pointer, sizeof(frame_id));
		g_print("Frame ID received: %u\n", frame_id);
	}

	/* Unmap on every path, not only when an extension was found. */
	gst_rtp_buffer_unmap(&rtpBuffer);

	return GST_PAD_PROBE_OK;
}

int main(int argc, char *argv[])
{
	GstBus *bus;
	gulong depay_probe_id = 0;
	GstPad *depay_pad = NULL;

	/* Initialize GStreamer */
	gst_init(&argc, &argv);
	loop = g_main_loop_new(NULL, FALSE);

	GstElement *pipeline, *src, *depay, *decode, *conv, *sink;

	/* Build Pipeline */
	pipeline = gst_pipeline_new("My pipeline");

	src = gst_element_factory_make("udpsrc", NULL);
	depay = gst_element_factory_make("rtph264depay", NULL);
	decode = gst_element_factory_make("avdec_h264", NULL);
	conv = gst_element_factory_make("videoconvert", NULL);
	sink = gst_element_factory_make("autovideosink", NULL);

	g_object_set(G_OBJECT(src), "uri", "udp://localhost:5000", NULL);

	/* g_object_set() takes its own reference on the caps, so release ours. */
	GstCaps *caps = gst_caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H264");
	g_object_set(G_OBJECT(src), "caps", caps, NULL);
	gst_caps_unref(caps);

	gst_bin_add_many(GST_BIN(pipeline), src, depay, decode, conv, sink, NULL);

	if (!gst_element_link_many(src, depay, decode, conv, sink, NULL))
		g_print("Linking failed \n");

	/* Probe the depayloader's sink pad, where buffers are still RTP packets. */
	depay_pad = gst_element_get_static_pad(depay, "sink");
	gst_pad_add_probe(depay_pad, GST_PAD_PROBE_TYPE_BUFFER,
			depay_pad_buffer_probe, depay, NULL);
	gst_object_unref(depay_pad);

	/* change to playing */
	bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
	gst_bus_add_watch(bus, my_bus_callback, loop);
	gst_object_unref(bus);

	int ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
	g_main_loop_run(loop);

	gst_element_set_state(pipeline, GST_STATE_NULL);
	gst_object_unref(GST_OBJECT(pipeline));

	return 0;
}
-------------- next part --------------
#include <gst/gst.h>
#include <gst/rtp/rtp.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <stdio.h>
#include <string.h>

static GMainLoop *loop;

static gboolean my_bus_callback(GstBus * bus, GstMessage * message, gpointer data)
{
	switch (GST_MESSAGE_TYPE(message)) {

	case GST_MESSAGE_ERROR: {
		g_main_loop_quit(loop);
		break;
	}
	case GST_MESSAGE_EOS:
		g_main_loop_quit(loop);
		break;
	case GST_MESSAGE_STATE_CHANGED:
		break;
	default:
		break;
	}

	return TRUE;
}

static void cb_new_rtspsrc_pad(GstElement *element, GstPad *pad, gpointer data)
{
	gchar *name;
	GstElement *p_depay;

	name = gst_pad_get_name(pad);
	p_depay = GST_ELEMENT(data);

	/* Link the newly exposed rtspsrc pad to the depayloader's sink pad. */
	if (!gst_element_link_pads(element, name, p_depay, "sink"))
	{
		g_print("Failed to link elements 3\n");
	}

	g_free(name);
}

guint frame_count = 1;
static GstPadProbeReturn pay_src_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
	GstBuffer *buffer;
	GstRTPBuffer rtpBuffer;

	g_print("Frame Count: %u\n", frame_count);

	memset(&rtpBuffer, 0, sizeof(GstRTPBuffer));
	buffer = GST_PAD_PROBE_INFO_BUFFER(info);

	if (buffer != NULL)
	{
		/* The buffer may be shared; get a writable one and hand it back to the pad. */
		buffer = gst_buffer_make_writable(buffer);
		GST_PAD_PROBE_INFO_DATA(info) = buffer;

		if (gst_rtp_buffer_map(buffer, GST_MAP_READWRITE, &rtpBuffer))
		{
			/* Two-byte-header extension: appbits 0, element ID 5, 4 bytes of data
			 * (the counter in host byte order, matching the receiver). */
			if (!gst_rtp_buffer_add_extension_twobytes_header(&rtpBuffer, 0, 5, &frame_count, sizeof(frame_count)))
				g_print("Failed to add RTP header extension\n");

			gst_rtp_buffer_unmap(&rtpBuffer);
		}
		else
			g_print("RTP buffer not mapped\n");
	}
	else
		g_print("GST buffer is NULL\n");

	/* A BUFFER probe on a payloader src pad fires once per RTP packet,
	 * so this counter advances per packet rather than per video frame. */
	frame_count++;

	return GST_PAD_PROBE_OK;
}

int main(int argc, char *argv[])
{
	GstBus *bus;
	GstPad *pay_src_pad;

	/* Initialize GStreamer */
	gst_init(&argc, &argv);
	loop = g_main_loop_new(NULL, FALSE);

	GstElement *pipeline, *src, *depay, *parse, *pay, *sink;

	/* Build Pipeline */
	pipeline = gst_pipeline_new("My pipeline");

	src = gst_element_factory_make("rtspsrc", NULL);
	depay = gst_element_factory_make("rtpjpegdepay", NULL);
	pay = gst_element_factory_make("rtpjpegpay", NULL);
	sink = gst_element_factory_make("udpsink", NULL);

	//g_object_set(G_OBJECT(sink), "sync", FALSE, NULL);
	g_object_set(G_OBJECT(sink), "port", 5000, NULL);
	g_object_set(GST_OBJECT(src), "location", "rtsp://10.142.56.252:8554/test", NULL);

	gst_bin_add_many(GST_BIN(pipeline), src, depay, NULL);

	// listen for newly created pads
	g_signal_connect(src, "pad-added", G_CALLBACK(cb_new_rtspsrc_pad), depay);

	gst_bin_add_many(GST_BIN(pipeline), pay, sink, NULL);

	if (!gst_element_link_many(depay, pay, sink, NULL))
		g_print("Failed to link pay to sink\n");

	pay_src_pad = gst_element_get_static_pad(pay, "src");
	gst_pad_add_probe(pay_src_pad, GST_PAD_PROBE_TYPE_BUFFER,
		(GstPadProbeCallback)pay_src_probe, pay, NULL);
	gst_object_unref(pay_src_pad);

	/* change to playing */
	bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
	gst_bus_add_watch(bus, my_bus_callback, loop);
	gst_object_unref(bus);

	int ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);

	g_main_loop_run(loop);

	gst_element_set_state(pipeline, GST_STATE_NULL);
	gst_object_unref(GST_OBJECT(pipeline));

	return 0;
}
-------------- next part --------------
#include <gst/gst.h>
#include <gst/rtp/rtp.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <stdio.h>
#include <string.h>

static GMainLoop *loop;

static gboolean my_bus_callback(GstBus * bus, GstMessage * message, gpointer data)
{
	switch (GST_MESSAGE_TYPE(message)) {

	case GST_MESSAGE_ERROR: {
		g_main_loop_quit(loop);
		break;
	}
	case GST_MESSAGE_EOS:
		g_main_loop_quit(loop);
		break;
	case GST_MESSAGE_STATE_CHANGED:
		break;
	default:
		break;
	}

	return TRUE;
}

static void cb_new_rtspsrc_pad(GstElement *element, GstPad *pad, gpointer data)
{
	gchar *name;
	GstElement *p_depay;

	name = gst_pad_get_name(pad);
	p_depay = GST_ELEMENT(data);

	/* Link the newly exposed rtspsrc pad to the depayloader's sink pad. */
	if (!gst_element_link_pads(element, name, p_depay, "sink"))
	{
		g_print("Failed to link elements 3\n");
	}

	g_free(name);
}

guint frame_count = 1;
static GstPadProbeReturn pay_src_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
	GstBuffer *buffer;
	GstRTPBuffer rtpBuffer;

	g_print("Frame Count: %u\n", frame_count);

	memset(&rtpBuffer, 0, sizeof(GstRTPBuffer));
	buffer = GST_PAD_PROBE_INFO_BUFFER(info);

	if (buffer != NULL)
	{
		/* The buffer may be shared; get a writable one and hand it back to the pad. */
		buffer = gst_buffer_make_writable(buffer);
		GST_PAD_PROBE_INFO_DATA(info) = buffer;

		if (gst_rtp_buffer_map(buffer, GST_MAP_READWRITE, &rtpBuffer))
		{
			/* Two-byte-header extension: appbits 0, element ID 5, 4 bytes of data
			 * (the counter in host byte order, matching the receiver). */
			if (!gst_rtp_buffer_add_extension_twobytes_header(&rtpBuffer, 0, 5, &frame_count, sizeof(frame_count)))
				g_print("Failed to add RTP header extension\n");

			gst_rtp_buffer_unmap(&rtpBuffer);
		}
		else
			g_print("RTP buffer not mapped\n");
	}
	else
		g_print("GST buffer is NULL\n");

	/* A BUFFER probe on a payloader src pad fires once per RTP packet,
	 * so this counter advances per packet rather than per video frame. */
	frame_count++;

	return GST_PAD_PROBE_OK;
}

int main(int argc, char *argv[])
{
	GstBus *bus;
	GstPad *pay_src_pad;

	/* Initialize GStreamer */
	gst_init(&argc, &argv);
	loop = g_main_loop_new(NULL, FALSE);

	GstElement *pipeline, *src, *depay, *parse, *pay, *sink;

	/* Build Pipeline */
	pipeline = gst_pipeline_new("My pipeline");

	src = gst_element_factory_make("rtspsrc", NULL);
	depay = gst_element_factory_make("rtph264depay", NULL);
	parse = gst_element_factory_make("h264parse", NULL);
	pay = gst_element_factory_make("rtph264pay", NULL);
	sink = gst_element_factory_make("udpsink", NULL);

	//g_object_set(G_OBJECT(sink), "sync", FALSE, NULL);
	g_object_set(G_OBJECT(sink), "port", 5000, NULL);
	g_object_set(GST_OBJECT(src), "location", "rtsp://10.142.56.252:8554/test", NULL);

	gst_bin_add_many(GST_BIN(pipeline), src, depay, NULL);

	// listen for newly created pads
	g_signal_connect(src, "pad-added", G_CALLBACK(cb_new_rtspsrc_pad), depay);

	gst_bin_add_many(GST_BIN(pipeline), parse, pay, sink, NULL);

	if (!gst_element_link_many(depay, parse, pay, sink, NULL))
		g_print("Failed to link pay to sink\n");

	pay_src_pad = gst_element_get_static_pad(pay, "src");
	gst_pad_add_probe(pay_src_pad, GST_PAD_PROBE_TYPE_BUFFER,
		(GstPadProbeCallback)pay_src_probe, pay, NULL);
	gst_object_unref(pay_src_pad);

	/* change to playing */
	bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
	gst_bus_add_watch(bus, my_bus_callback, loop);
	gst_object_unref(bus);

	int ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);

	g_main_loop_run(loop);

	gst_element_set_state(pipeline, GST_STATE_NULL);
	gst_object_unref(GST_OBJECT(pipeline));

	return 0;
}
-------------- next part --------------
A non-text attachment was scrubbed...
Name: cmd
Type: application/octet-stream
Size: 456 bytes
Desc: not available
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20190927/a0c5e1d1/attachment-0001.obj>

