Buffer-type probe at 'src' pad of GstAudioAggregator

Sergei Hrushev hrushev at gmail.com
Thu Mar 9 11:46:05 UTC 2023


Hello Mr. Kesti!

> Then it is something to do with the probe callback function. If you can post your probe function, maybe I can help here.

I prepared the complete example of a small app which modifies audio
frames directly in the BUFFER probe on the fly.

In this app I'm adding a probe to the CapsFilter 'src' pad - and the app
works as expected:
it properly overwrites the default signal with a sine wave of a different frequency.
(Maybe there's a better way to determine the number of frames in the
current buffer?)

Also, in my real app, when I add a similar probe to, for example, the
Tee 'src' pad - it also works.
But I need to use it on the AudioAggregator 'src' pad - and there the
produced sound is "jammed".

As in my case the entire audio pipeline works in Docker - I use
streaming to get the audio on host,
but it's easy to change the code to force it to play instead.
To play from streaming I'm using this:

gst-launch-1.0 -v udpsrc port=50111 ! \
  application/x-rtp,media=audio,payload=96,encoding-name=OPUS,clock-rate=48000 ! \
  rtpssrcdemux ! rtpopusdepay ! opusdec ! audioconvert ! audioresample ! \
  queue ! autoaudiosink

Best regards, Sergei Hrushev.

--------  Code example -------


// Most of the boilerplate code is just a variation of the code taken from here:
//   https://gstreamer.freedesktop.org/documentation/plugin-development/basics/testapp.html?gi-language=c

#include <gst/gst.h>

#include <cmath>
#include <cstdint>
#include <string>

// Shared state of the example application.
//
// Every member has a default initializer so that a default-constructed
// Storage never carries indeterminate pointers or ids.
struct Storage {
  // Main loop created in pipelineCreate() and run in pipelinePlay().
  GMainLoop* loop = nullptr;
  // Top-level pipeline that owns all elements below once they are added.
  GstElement* pipeline = nullptr;
  // Bus of the pipeline (a reference is held until pipelineDestroy()).
  GstBus* bus = nullptr;
  // Source id returned by gst_bus_add_watch().
  guint idWatch = 0;
  GstElement* src1 = nullptr;
  GstElement* convert1 = nullptr;
  GstElement* resample1 = nullptr;
  GstElement* capsfilter1 = nullptr;
  // Id returned by gst_pad_add_probe() for the buffer probe.
  gulong idProbe = 0;
  GstElement* opusenc1 = nullptr;
  GstElement* rtpopuspay1 = nullptr;
  GstElement* udpsink1 = nullptr;

  // Running sample index used by generateSin() so that the generated
  // wave stays phase-continuous from one buffer to the next.
  guint pos = 0;
};

// Bus watch callback.
//
// `data` is the GMainLoop that was passed to gst_bus_add_watch(). The loop is
// stopped on end-of-stream and on error messages; every other message type is
// ignored. Always returns TRUE so the watch stays installed.
static gboolean callbackBus(
  GstBus* bus,
  GstMessage* msg,
  gpointer data) {

  auto* mainLoop = static_cast<GMainLoop*>(data);
  const GstMessageType kind = GST_MESSAGE_TYPE (msg);

  if (kind == GST_MESSAGE_EOS) {
    g_print ("End-of-stream\n");
    g_main_loop_quit (mainLoop);
  } else if (kind == GST_MESSAGE_ERROR) {
    GError* error = NULL;
    gchar* details = NULL;

    // Both out-values are owned by us after the call and must be released.
    gst_message_parse_error (msg, &error, &details);

    g_print ("Error: %s\n", error->message);
    g_error_free (error);

    if (details != nullptr) {
      g_print ("Debug details: %s\n", details);
      g_free (details);
    }

    g_main_loop_quit (mainLoop);
  }

  return TRUE;
}

// Fills `buf` with an interleaved signed 16-bit sine wave.
//
// Parameters:
//   storage     - holds `pos`, the running sample index; it is advanced by
//                 `numFrames` (modulo `sampleRate`) so consecutive calls
//                 produce a phase-continuous signal.
//   buf         - destination, must have room for numFrames * numChannels
//                 samples.
//   frequency   - tone frequency in Hz.
//   sampleRate  - sample rate in Hz; nothing is written when it is 0.
//   numChannels - number of interleaved channels; every channel of a frame
//                 receives the same sample value.
//   numFrames   - number of frames to generate.
void generateSin(
  Storage* storage,
  int16_t* buf,
  int frequency,
  guint sampleRate,
  guint numChannels,
  guint numFrames) {

  // A zero rate would divide by zero below and make the wrap-around undefined.
  if (storage == nullptr || buf == nullptr || sampleRate == 0) {
    return;
  }

  // Local constant instead of std::numbers::pi_v, which needs C++20 <numbers>.
  constexpr float kPi = 3.14159265358979323846f;
  // Half of full scale to leave some headroom (full scale would be 32767).
  constexpr int kAmplitude = 16383;

  const float freq = 1.0f * frequency;
  const float rate = 1.0f * sampleRate;
  // Phase advance per sample, in radians.
  const float delta = 2 * kPi / (rate / freq);

  for (guint i = 0; i < numFrames; i++) {
    const float angle = storage->pos * delta;
    const int16_t v = static_cast<int16_t>(kAmplitude * std::sin(angle));

    int16_t* frame = buf + static_cast<std::size_t>(i) * numChannels;
    for (guint ch = 0; ch < numChannels; ch++) {
      frame[ch] = v;
    }

    // Wrap once per second of samples: for an integer frequency this is a
    // whole number of periods, so the phase stays continuous while the
    // float angle stays small enough to remain precise.
    storage->pos = (storage->pos + 1) % sampleRate;
  }
}

// BUFFER pad probe that overwrites the audio payload with a 250 Hz sine wave.
//
// `user_data` is the Storage instance passed to gst_pad_add_probe().
// The sample rate and channel count are read from the caps currently
// negotiated on `pad` (falling back to 48000 Hz / 1 channel when they are not
// available), so the frame count is computed from the real buffer layout.
// Buffers whose negotiated format is not S16LE are left untouched because the
// generator writes 16-bit integer samples.
//
// Always returns GST_PAD_PROBE_OK so the (possibly replaced) buffer continues
// downstream.
GstPadProbeReturn probe(
  GstPad* pad,
  GstPadProbeInfo* info,
  gpointer user_data) {

  Storage* storage = static_cast<Storage*>(user_data);

  GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
  if (storage == nullptr || buffer == nullptr) {
    return GST_PAD_PROBE_OK;
  }

  // Defaults matching the capsfilter used by this example.
  gint sampleRate = 48000;
  gint numChannels = 1;
  bool formatIsS16LE = true;

  GstCaps* caps = gst_pad_get_current_caps(pad);
  if (caps != nullptr) {
    if (gst_caps_get_size(caps) > 0) {
      const GstStructure* structure = gst_caps_get_structure(caps, 0);

      gint value = 0;
      if (gst_structure_get_int(structure, "rate", &value) && value > 0) {
        sampleRate = value;
      }
      if (gst_structure_get_int(structure, "channels", &value) && value > 0) {
        numChannels = value;
      }

      const gchar* format = gst_structure_get_string(structure, "format");
      formatIsS16LE = (format == nullptr) || (g_strcmp0(format, "S16LE") == 0);
    }
    gst_caps_unref(caps);
  }

  if (!formatIsS16LE) {
    g_print("Negotiated format is not S16LE, leaving the buffer untouched!\n");
    return GST_PAD_PROBE_OK;
  }

  // Only copy when somebody else also holds a reference to the buffer;
  // an already writable buffer can be modified in place.
  if (!gst_buffer_is_writable(buffer)) {
    GstBuffer* writableCopy = gst_buffer_copy(buffer);
    if (writableCopy == nullptr) {
      return GST_PAD_PROBE_OK;
    }
    // The probe info owned a reference to the old buffer; hand it the copy
    // and drop the reference to the original.
    GST_PAD_PROBE_INFO_DATA(info) = writableCopy;
    gst_buffer_unref(buffer);
    buffer = writableCopy;
  }

  // Frame count according to the buffer offsets (diagnostics only).
  guint64 framesCountFromOffsets = 0;
  if (GST_BUFFER_OFFSET_IS_VALID(buffer) && GST_BUFFER_OFFSET_END_IS_VALID(buffer)) {
    framesCountFromOffsets = GST_BUFFER_OFFSET_END(buffer) - GST_BUFFER_OFFSET(buffer);
  }

  GstMapInfo mapInfoOut;
  if (gst_buffer_map (buffer, &mapInfoOut, GST_MAP_WRITE)) {

    // Check that we use the correct buffer parameters
    // (gsize must be printed with G_GSIZE_FORMAT, not %d).
    g_print("Data size = %" G_GSIZE_FORMAT " byte(s), frames count = %" G_GUINT64_FORMAT "\n",
      mapInfoOut.size,
      framesCountFromOffsets);

    // One frame is one 16-bit sample per channel.
    const gsize bytesPerFrame = sizeof(int16_t) * static_cast<gsize>(numChannels);
    const gsize numFrames = mapInfoOut.size / bytesPerFrame;

    auto audioBufferPtr = reinterpret_cast<int16_t*>(mapInfoOut.data);
    generateSin(
      storage,
      audioBufferPtr,
      250,
      static_cast<guint>(sampleRate),
      static_cast<guint>(numChannels),
      static_cast<guint>(numFrames));

    gst_buffer_unmap (buffer, &mapInfoOut);
  } else {
    g_print("Unable to write to the output buffer!\n");
  }

  return GST_PAD_PROBE_OK;
}

int pipelineCreate(Storage* storage,
  std::string const& outputHost,
  uint16_t outputPort,
  int argc,
  char* argv[]) {

  gst_init (&argc, &argv);

  storage->loop = g_main_loop_new(NULL, FALSE);

  storage->pipeline = gst_pipeline_new("pipeline_test");

  // Watch for messages on the pipeline's bus,
  //   Note: it will work like this only when a GLib main loop is running.
  storage->bus = gst_pipeline_get_bus(GST_PIPELINE (storage->pipeline));
  storage->idWatch = gst_bus_add_watch(storage->bus, callbackBus,
storage->loop);

  storage->src1 = gst_element_factory_make("audiotestsrc", "audiotestsrc1");
  storage->convert1 = gst_element_factory_make ("audioconvert",
"audioconvert1");
  storage->resample1 = gst_element_factory_make ("audioresample",
"audioresample1");

  storage->capsfilter1 = gst_element_factory_make("capsfilter", "capsfilter1");
  GstCaps* caps = gst_caps_new_simple(
    "audio/x-raw",
    "format", G_TYPE_STRING, "S16LE",
    "rate", G_TYPE_INT, 48000,
    NULL);
  g_object_set(storage->capsfilter1, "caps", caps, NULL);
  gst_caps_unref(caps);

  auto padSrcOfCapsFilter1 =
gst_element_get_static_pad(storage->capsfilter1, "src");
  if(padSrcOfCapsFilter1 == nullptr) {
    g_print("Failed to get an 'src' pad of the capsfilter1 element!\n");
    return -1;
  }
  storage->idProbe = gst_pad_add_probe(
    padSrcOfCapsFilter1,
    GST_PAD_PROBE_TYPE_BUFFER,
    static_cast<GstPadProbeCallback>(probe),
    static_cast<gpointer>(storage),
    NULL);
  gst_object_unref(padSrcOfCapsFilter1);

  storage->opusenc1 = gst_element_factory_make("opusenc", "opusenc1");

  storage->rtpopuspay1 = gst_element_factory_make("rtpopuspay", "rtpopuspay1");
  g_object_set(storage->rtpopuspay1,
    "pt", 96,
    NULL);

  storage->udpsink1 = gst_element_factory_make("udpsink", "udpsink1");
  std::string addr = outputHost + ":" + std::to_string(outputPort);
  g_object_set(storage->udpsink1,
    "clients", addr.c_str(),
    "async", false,
    NULL);

  gst_bin_add_many(GST_BIN(storage->pipeline),
    storage->src1,
    storage->convert1,
    storage->resample1,
    storage->capsfilter1,
    storage->opusenc1,
    storage->rtpopuspay1,
    storage->udpsink1,
    NULL);

  // Link everything together
  if (!gst_element_link_many(
    storage->src1,
    storage->convert1,
    storage->resample1,
    storage->capsfilter1,
    storage->opusenc1,
    storage->rtpopuspay1,
    storage->udpsink1,
    NULL)) {

    g_print("Failed to link one or more elements!\n");
    return -1;
  }

  return 0;
}

// Switches the pipeline to PLAYING and runs the main loop until it is quit.
//
// Returns 0 after the main loop has finished, or -1 when the state change
// failed (in which case a pending error message on the bus, if any, is
// printed).
int pipelinePlay(Storage& storage) {
  const GstStateChangeReturn stateResult =
    gst_element_set_state(storage.pipeline, GST_STATE_PLAYING);

  if (stateResult != GST_STATE_CHANGE_FAILURE) {
    g_print("Playing...\n");
    g_main_loop_run(storage.loop);
    return 0;
  }

  g_print("Failed to start up pipeline!\n");

  // Look for an error message with details on the bus (timeout of 0).
  GstMessage* errorMsg = gst_bus_poll(storage.bus, GST_MESSAGE_ERROR, 0);
  if (errorMsg != nullptr) {
    GError* error = NULL;

    gst_message_parse_error(errorMsg, &error, NULL);
    g_print("ERROR: %s\n", error->message);
    g_error_free(error);
    gst_message_unref(errorMsg);
  }

  return -1;
}

// Shuts the pipeline down and releases everything pipelineCreate() acquired:
// the bus reference, the pipeline, the bus watch source and the main loop.
// Always returns 0.
int pipelineDestroy(Storage& storage) {
  GstElement* const pipeline = storage.pipeline;

  // Stop the pipeline before dropping any references.
  gst_element_set_state(pipeline, GST_STATE_NULL);

  gst_object_unref(storage.bus);
  gst_object_unref(pipeline);

  // Remove the bus watch installed with gst_bus_add_watch().
  g_source_remove(storage.idWatch);

  g_main_loop_unref(storage.loop);

  return 0;
}

// Entry point: builds the pipeline, runs it until EOS/error and tears it
// down again.
//
// Returns the first non-zero result of pipelineCreate(), pipelinePlay() or
// pipelineDestroy(), or 0 when everything succeeded.
int main(int argc, char* argv[]) {
  // Value-initialize so that no member is left indeterminate.
  Storage storage{};
  storage.pos = 0;

  std::string outputHost = "host.docker.internal";
  uint16_t outputPort = 50111;

  int res = pipelineCreate(&storage, outputHost, outputPort, argc, argv);
  if (res == 0) {
    res = pipelinePlay(storage);
  }

  // Tear down even when creation or start-up failed, but only if the objects
  // pipelineDestroy() touches were actually created.
  const bool canDestroy =
    storage.loop != nullptr &&
    storage.pipeline != nullptr &&
    storage.bus != nullptr &&
    storage.idWatch != 0;

  if (canDestroy) {
    const int destroyRes = pipelineDestroy(storage);
    if (res == 0) {
      res = destroyRes;
    }
  }

  return res;
}


More information about the gstreamer-devel mailing list