Buffer-type probe at 'src' pad of GstAudioAggregator
Sergei Hrushev
hrushev at gmail.com
Thu Mar 9 11:46:05 UTC 2023
Hello Mr. Kesti!
> Then it is something to do with the probe callback function. If you can post your your probe function may be I can help here/
I prepared the complete example of a small app which modifies audio
frames directly in the BUFFER probe on the fly.
In this app I'm adding probe to the CapsFilter 'src' pad - and the app
works as expected,
it properly overwrites the default signal with sine wave of different frequency.
(maybe there's a better way to determine amount of frames in the
current buffer?).
Also in my real app when I'm adding similar probe for example to the
Tee 'src' pad - it also works.
But I need to use it on AudioAggregator 'src' pad - and there the
produced sound is "jammed".
As in my case the entire audio pipeline works in Docker - I use
streaming to get the audio on host,
but it's easy to change the code to force it to play instead.
To play from streaming I'm using this:
gst-launch-1.0 -v udpsrc port=50111 ! \
application/x-rtp,media=audio,payload=96,encoding-name=OPUS,clock-rate=48000
! \
rtpssrcdemux ! rtpopusdepay ! opusdec ! audioconvert ! audioresample
! queue ! autoaudiosink
Best regards, Sergei Hrushev.
-------- Code example -------
// Most of the boilerplate code is just a variation of code took from here:
// https://gstreamer.freedesktop.org/documentation/plugin-development/basics/testapp.html?gi-language=c
#include <gst/gst.h>
#include <cmath>
#include <string>
struct Storage {
GMainLoop* loop;
GstElement* pipeline;
GstBus* bus;
guint idWatch;
GstElement* src1;
GstElement* convert1;
GstElement* resample1;
GstElement* capsfilter1;
gulong idProbe;
GstElement* opusenc1;
GstElement* rtpopuspay1;
GstElement* udpsink1;
guint pos;
};
static gboolean callbackBus(
GstBus* bus,
GstMessage* msg,
gpointer data) {
GMainLoop* loop = static_cast<GMainLoop*>(data);
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_EOS:
g_print ("End-of-stream\n");
g_main_loop_quit (loop);
break;
case GST_MESSAGE_ERROR: {
gchar* debug = NULL;
GError* err = NULL;
gst_message_parse_error (msg, &err, &debug);
g_print ("Error: %s\n", err->message);
g_error_free (err);
if (debug != nullptr) {
g_print ("Debug details: %s\n", debug);
g_free (debug);
}
g_main_loop_quit (loop);
break;
}
default:
break;
}
return TRUE;
}
void generateSin(
Storage* storage,
int16_t* buf,
int frequency,
guint sampleRate,
guint numChannels,
guint numFrames) {
float valuePi = std::numbers::pi_v<float>;
// Amplitude
int A = 16383; // 32767;
// Frequency
float freq = 1.0f * frequency;
float rate = 1.0f * sampleRate;
float delta = 2 * valuePi / (rate / freq);
for(guint i = 0; i < numFrames; i++) {
float angle = storage->pos * delta;
int16_t v = static_cast<int16_t>(A * std::sin(angle));
for(guint ch = 0; ch < numChannels; ch++) {
*(buf + i * numChannels + ch) = v;
}
storage->pos++;
storage->pos %= 48000;
}
}
GstPadProbeReturn probe(
GstPad* pad,
GstPadProbeInfo* info,
gpointer user_data) {
Storage* storage = static_cast<Storage*>(user_data);
GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
GstBuffer* bufferOut = gst_buffer_copy(buffer);
if (bufferOut == NULL) {
return GST_PAD_PROBE_OK;
}
GST_PAD_PROBE_INFO_DATA(info) = bufferOut;
gst_buffer_unref(buffer);
guint64 framesCountFromOffsets = 0;
if(GST_BUFFER_OFFSET_IS_VALID(bufferOut)) {
guint64 offset = GST_BUFFER_OFFSET(bufferOut);
if(GST_BUFFER_OFFSET_END_IS_VALID(bufferOut)) {
guint64 offsetEnd = GST_BUFFER_OFFSET_END(bufferOut);
framesCountFromOffsets = offsetEnd - offset;
}
}
GstMapInfo mapInfoOut;
if (gst_buffer_map (bufferOut, &mapInfoOut, GST_MAP_WRITE)) {
// Check that we are use correct buffer parameters
printf("Data size = %d byte(s), frames count = %" G_GUINT64_FORMAT "\n",
mapInfoOut.size,
framesCountFromOffsets);
auto audioBufferPtr = reinterpret_cast<int16_t*>(mapInfoOut.data);
generateSin(storage, audioBufferPtr, 250, 48000, 1, mapInfoOut.size / 2);
gst_buffer_unmap (bufferOut, &mapInfoOut);
} else {
g_print("Unable to write to the output buffer!\n");
}
return GST_PAD_PROBE_OK;
}
int pipelineCreate(Storage* storage,
std::string const& outputHost,
uint16_t outputPort,
int argc,
char* argv[]) {
gst_init (&argc, &argv);
storage->loop = g_main_loop_new(NULL, FALSE);
storage->pipeline = gst_pipeline_new("pipeline_test");
// Watch for messages on the pipeline's bus,
// Note: it will work like this only when a GLib main loop is running.
storage->bus = gst_pipeline_get_bus(GST_PIPELINE (storage->pipeline));
storage->idWatch = gst_bus_add_watch(storage->bus, callbackBus,
storage->loop);
storage->src1 = gst_element_factory_make("audiotestsrc", "audiotestsrc1");
storage->convert1 = gst_element_factory_make ("audioconvert",
"audioconvert1");
storage->resample1 = gst_element_factory_make ("audioresample",
"audioresample1");
storage->capsfilter1 = gst_element_factory_make("capsfilter", "capsfilter1");
GstCaps* caps = gst_caps_new_simple(
"audio/x-raw",
"format", G_TYPE_STRING, "S16LE",
"rate", G_TYPE_INT, 48000,
NULL);
g_object_set(storage->capsfilter1, "caps", caps, NULL);
gst_caps_unref(caps);
auto padSrcOfCapsFilter1 =
gst_element_get_static_pad(storage->capsfilter1, "src");
if(padSrcOfCapsFilter1 == nullptr) {
g_print("Failed to get an 'src' pad of the capsfilter1 element!\n");
return -1;
}
storage->idProbe = gst_pad_add_probe(
padSrcOfCapsFilter1,
GST_PAD_PROBE_TYPE_BUFFER,
static_cast<GstPadProbeCallback>(probe),
static_cast<gpointer>(storage),
NULL);
gst_object_unref(padSrcOfCapsFilter1);
storage->opusenc1 = gst_element_factory_make("opusenc", "opusenc1");
storage->rtpopuspay1 = gst_element_factory_make("rtpopuspay", "rtpopuspay1");
g_object_set(storage->rtpopuspay1,
"pt", 96,
NULL);
storage->udpsink1 = gst_element_factory_make("udpsink", "udpsink1");
std::string addr = outputHost + ":" + std::to_string(outputPort);
g_object_set(storage->udpsink1,
"clients", addr.c_str(),
"async", false,
NULL);
gst_bin_add_many(GST_BIN(storage->pipeline),
storage->src1,
storage->convert1,
storage->resample1,
storage->capsfilter1,
storage->opusenc1,
storage->rtpopuspay1,
storage->udpsink1,
NULL);
// Link everything together
if (!gst_element_link_many(
storage->src1,
storage->convert1,
storage->resample1,
storage->capsfilter1,
storage->opusenc1,
storage->rtpopuspay1,
storage->udpsink1,
NULL)) {
g_print("Failed to link one or more elements!\n");
return -1;
}
return 0;
}
int pipelinePlay(Storage& storage) {
GstStateChangeReturn ret = gst_element_set_state(storage.pipeline,
GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
g_print("Failed to start up pipeline!\n");
// Check if there is an error message with details on the bus
GstMessage* msg = gst_bus_poll(storage.bus, GST_MESSAGE_ERROR, 0);
if (msg) {
GError* err = NULL;
gst_message_parse_error(msg, &err, NULL);
g_print("ERROR: %s\n", err->message);
g_error_free(err);
gst_message_unref(msg);
}
return -1;
}
g_print("Playing...\n");
g_main_loop_run(storage.loop);
return 0;
}
int pipelineDestroy(Storage& storage) {
// Cleanup all
gst_element_set_state(storage.pipeline, GST_STATE_NULL);
gst_object_unref(storage.bus);
gst_object_unref(storage.pipeline);
g_source_remove(storage.idWatch);
g_main_loop_unref(storage.loop);
return 0;
}
int main(int argc, char* argv[]) {
Storage storage;
storage.pos = 0;
std::string outputHost = "host.docker.internal";
uint16_t outputPort = 50111;
int res = pipelineCreate(&storage, outputHost, outputPort, argc, argv);
if(res != 0) {
return res;
}
res = pipelinePlay(storage);
if(res != 0) {
return res;
}
res = pipelineDestroy(storage);
if(res != 0) {
return res;
}
return res;
}
More information about the gstreamer-devel
mailing list