Buffer-type probe at 'src' pad of GstAudioAggregator

vinod kesti vinodkesti at yahoo.com
Fri Mar 10 17:46:17 UTC 2023


GstBuffer *bufferOut = gst_buffer_copy(buffer); 
you are making copy of the buffer adding the signal and the modifcation never gone to downstream.
Instead, map the buffer directly.


Sent from Yahoo Mail. Get the app 

    On Thursday, 9 March, 2023 at 05:46:34 am GMT-6, Sergei Hrushev via gstreamer-devel <gstreamer-devel at lists.freedesktop.org> wrote:  
 
 Hello Mr. Kesti!

> Then it is something to do with the probe callback function. If you can post your your probe function may be I can help here/

I prepared the complete example of a small app which modifies audio
frames directly in the BUFFER probe on the fly.

In this app I'm adding probe to the CapsFilter 'src' pad - and the app
works as expected,
it properly overwrites the default signal with sine wave of different frequency.
(maybe there's a better way to determine amount of frames in the
current buffer?).

Also in my real app when I'm adding similar probe for example to the
Tee 'src' pad - it also works.
But I need to use it on AudioAggregator 'src' pad - and there the
produced sound is "jammed".

As in my case the entire audio pipeline works in Docker - I use
streaming to get the audio on host,
but it's easy to change the code to force it to play instead.
To play from streaming I'm using this:

gst-launch-1.0 -v udpsrc port=50111 ! \
  application/x-rtp,media=audio,payload=96,encoding-name=OPUS,clock-rate=48000
! \
  rtpssrcdemux ! rtpopusdepay ! opusdec ! audioconvert ! audioresample
! queue ! autoaudiosink

Best regards, Sergei Hrushev.

--------  Code example -------


// Most of the boilerplate code is just a variation of code took from here:
//  https://gstreamer.freedesktop.org/documentation/plugin-development/basics/testapp.html?gi-language=c

#include <gst/gst.h>

#include <cmath>
#include <string>

struct Storage {
  GMainLoop* loop;
  GstElement* pipeline;
  GstBus* bus;
  guint idWatch;
  GstElement* src1;
  GstElement* convert1;
  GstElement* resample1;
  GstElement* capsfilter1;
  gulong idProbe;
  GstElement* opusenc1;
  GstElement* rtpopuspay1;
  GstElement* udpsink1;

  guint pos;
};

static gboolean callbackBus(
  GstBus* bus,
  GstMessage* msg,
  gpointer data) {

  GMainLoop* loop = static_cast<GMainLoop*>(data);

  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_EOS:
      g_print ("End-of-stream\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_ERROR: {
      gchar* debug = NULL;
      GError* err = NULL;

      gst_message_parse_error (msg, &err, &debug);

      g_print ("Error: %s\n", err->message);
      g_error_free (err);

      if (debug != nullptr) {
        g_print ("Debug details: %s\n", debug);
        g_free (debug);
      }

      g_main_loop_quit (loop);
      break;
    }
    default:
      break;
  }

  return TRUE;
}

void generateSin(
  Storage* storage,
  int16_t* buf,
  int frequency,
  guint sampleRate,
  guint numChannels,
  guint numFrames) {

  float valuePi = std::numbers::pi_v<float>;

  // Amplitude
  int A = 16383; // 32767;
  // Frequency
  float freq = 1.0f * frequency;

  float rate = 1.0f * sampleRate;
  float delta = 2 * valuePi / (rate / freq);

  for(guint i = 0; i < numFrames; i++) {
    float angle = storage->pos * delta;
    int16_t v = static_cast<int16_t>(A * std::sin(angle));

    for(guint ch = 0; ch < numChannels; ch++) {
      *(buf + i * numChannels + ch) = v;
    }

    storage->pos++;
    storage->pos %= 48000;
  }
}

GstPadProbeReturn probe(
  GstPad* pad,
  GstPadProbeInfo* info,
  gpointer user_data) {

  Storage* storage = static_cast<Storage*>(user_data);

  GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);

  GstBuffer* bufferOut = gst_buffer_copy(buffer);
  if (bufferOut == NULL) {
    return GST_PAD_PROBE_OK;
  }
  GST_PAD_PROBE_INFO_DATA(info) = bufferOut;
  gst_buffer_unref(buffer);

  guint64 framesCountFromOffsets = 0;
  if(GST_BUFFER_OFFSET_IS_VALID(bufferOut)) {
    guint64 offset = GST_BUFFER_OFFSET(bufferOut);

    if(GST_BUFFER_OFFSET_END_IS_VALID(bufferOut)) {
      guint64 offsetEnd = GST_BUFFER_OFFSET_END(bufferOut);
      framesCountFromOffsets = offsetEnd - offset;
    }
  }

  GstMapInfo mapInfoOut;
  if (gst_buffer_map (bufferOut, &mapInfoOut, GST_MAP_WRITE)) {

    // Check that we are use correct buffer parameters
    printf("Data size = %d byte(s), frames count = %" G_GUINT64_FORMAT "\n",
      mapInfoOut.size,
      framesCountFromOffsets);

    auto audioBufferPtr = reinterpret_cast<int16_t*>(mapInfoOut.data);
    generateSin(storage, audioBufferPtr, 250, 48000, 1, mapInfoOut.size / 2);

    gst_buffer_unmap (bufferOut, &mapInfoOut);
  } else {
    g_print("Unable to write to the output buffer!\n");
  }

  return GST_PAD_PROBE_OK;
}

int pipelineCreate(Storage* storage,
  std::string const& outputHost,
  uint16_t outputPort,
  int argc,
  char* argv[]) {

  gst_init (&argc, &argv);

  storage->loop = g_main_loop_new(NULL, FALSE);

  storage->pipeline = gst_pipeline_new("pipeline_test");

  // Watch for messages on the pipeline's bus,
  //  Note: it will work like this only when a GLib main loop is running.
  storage->bus = gst_pipeline_get_bus(GST_PIPELINE (storage->pipeline));
  storage->idWatch = gst_bus_add_watch(storage->bus, callbackBus,
storage->loop);

  storage->src1 = gst_element_factory_make("audiotestsrc", "audiotestsrc1");
  storage->convert1 = gst_element_factory_make ("audioconvert",
"audioconvert1");
  storage->resample1 = gst_element_factory_make ("audioresample",
"audioresample1");

  storage->capsfilter1 = gst_element_factory_make("capsfilter", "capsfilter1");
  GstCaps* caps = gst_caps_new_simple(
    "audio/x-raw",
    "format", G_TYPE_STRING, "S16LE",
    "rate", G_TYPE_INT, 48000,
    NULL);
  g_object_set(storage->capsfilter1, "caps", caps, NULL);
  gst_caps_unref(caps);

  auto padSrcOfCapsFilter1 =
gst_element_get_static_pad(storage->capsfilter1, "src");
  if(padSrcOfCapsFilter1 == nullptr) {
    g_print("Failed to get an 'src' pad of the capsfilter1 element!\n");
    return -1;
  }
  storage->idProbe = gst_pad_add_probe(
    padSrcOfCapsFilter1,
    GST_PAD_PROBE_TYPE_BUFFER,
    static_cast<GstPadProbeCallback>(probe),
    static_cast<gpointer>(storage),
    NULL);
  gst_object_unref(padSrcOfCapsFilter1);

  storage->opusenc1 = gst_element_factory_make("opusenc", "opusenc1");

  storage->rtpopuspay1 = gst_element_factory_make("rtpopuspay", "rtpopuspay1");
  g_object_set(storage->rtpopuspay1,
    "pt", 96,
    NULL);

  storage->udpsink1 = gst_element_factory_make("udpsink", "udpsink1");
  std::string addr = outputHost + ":" + std::to_string(outputPort);
  g_object_set(storage->udpsink1,
    "clients", addr.c_str(),
    "async", false,
    NULL);

  gst_bin_add_many(GST_BIN(storage->pipeline),
    storage->src1,
    storage->convert1,
    storage->resample1,
    storage->capsfilter1,
    storage->opusenc1,
    storage->rtpopuspay1,
    storage->udpsink1,
    NULL);

  // Link everything together
  if (!gst_element_link_many(
    storage->src1,
    storage->convert1,
    storage->resample1,
    storage->capsfilter1,
    storage->opusenc1,
    storage->rtpopuspay1,
    storage->udpsink1,
    NULL)) {

    g_print("Failed to link one or more elements!\n");
    return -1;
  }

  return 0;
}

int pipelinePlay(Storage& storage) {
  GstStateChangeReturn ret = gst_element_set_state(storage.pipeline,
GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    g_print("Failed to start up pipeline!\n");

    // Check if there is an error message with details on the bus
    GstMessage* msg = gst_bus_poll(storage.bus, GST_MESSAGE_ERROR, 0);
    if (msg) {
      GError* err = NULL;

      gst_message_parse_error(msg, &err, NULL);
      g_print("ERROR: %s\n", err->message);
      g_error_free(err);
      gst_message_unref(msg);
    }
    return -1;
  }

  g_print("Playing...\n");

  g_main_loop_run(storage.loop);

  return 0;
}

int pipelineDestroy(Storage& storage) {
  // Cleanup all
  gst_element_set_state(storage.pipeline, GST_STATE_NULL);
  gst_object_unref(storage.bus);
  gst_object_unref(storage.pipeline);
  g_source_remove(storage.idWatch);

  g_main_loop_unref(storage.loop);

  return 0;
}

int main(int argc, char* argv[]) {
  Storage storage;
  storage.pos = 0;

  std::string outputHost = "host.docker.internal";
  uint16_t outputPort = 50111;

  int res = pipelineCreate(&storage, outputHost, outputPort, argc, argv);
  if(res != 0) {
    return res;
  }

  res = pipelinePlay(storage);
  if(res != 0) {
    return res;
  }

  res = pipelineDestroy(storage);
  if(res != 0) {
    return res;
  }

  return res;
}
  
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20230310/918fc0e1/attachment-0001.htm>


More information about the gstreamer-devel mailing list