[pulseaudio-commits] [Git][pulseaudio/pulseaudio][master] 3 commits: bluetooth/gst: Use GStreamer synchronously within PA's IO thread

PulseAudio Marge Bot (@pulseaudio-merge-bot) gitlab at gitlab.freedesktop.org
Mon Feb 21 17:53:20 UTC 2022



PulseAudio Marge Bot pushed to branch master at PulseAudio / pulseaudio


Commits:
201dc654 by Marijn Suijten at 2022-02-21T12:31:32-05:00
bluetooth/gst: Use GStreamer synchronously within PA's IO thread

Handling multiple threads does not come without overhead, especially
when the end-goal is to ping-pong them making the whole system run
serially.  This patch rips out all that thread handling and instead
"chains" buffers to be encoded/decoded directly into the pipeline,
making them execute their work on the current thread.  The resulting
buffer can be pulled out from appsink immediately without require extra
locking and signalling.  While the overhead on modern systems is found
to be negligible or unnoticable, code complexity of such locking and
signalling systems is prevalent making it the main drive behind this
refactor.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/494>

- - - - -
5f37914e by Marijn Suijten at 2022-02-21T12:31:32-05:00
bluetooth/gst: Replace buffer accumulation in adapter with direct pull

Bluetooth codecs should always have fixed in/output and are hence able
to have their results directly read from the codec, instead of
accumulating in a buffer asynchronously that is subsequently only read
in the transcode callback.  The Bluetooth backends calling encode/decode
also expect these fixed buffer sizes.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/494>

- - - - -
5af2afba by Marijn Suijten at 2022-02-21T12:33:18-05:00
bluetooth/gst: Timestamp encoding buffers according to PA clock

Commit c6d6ca541 ("bluetooth/gst: Replace buffer accumulation in adapter
with direct pull") removed the `timestamp` parameter from GStreamer
transcoders due to being unused, but these should instead be propagated
to the GStreamer encoding buffers.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/494>

- - - - -


4 changed files:

- src/modules/bluetooth/a2dp-codec-aptx-gst.c
- src/modules/bluetooth/a2dp-codec-gst.c
- src/modules/bluetooth/a2dp-codec-gst.h
- src/modules/bluetooth/a2dp-codec-ldac-gst.c


Changes:

=====================================
src/modules/bluetooth/a2dp-codec-aptx-gst.c
=====================================
@@ -490,7 +490,7 @@ static size_t reduce_encoder_bitrate(void *codec_info, size_t write_link_mtu) {
 static size_t encode_buffer(void *codec_info, uint32_t timestamp, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
     size_t written;
 
-    written = gst_transcode_buffer(codec_info, input_buffer, input_size, output_buffer, output_size, processed);
+    written = gst_transcode_buffer(codec_info, timestamp, input_buffer, input_size, output_buffer, output_size, processed);
     if (PA_UNLIKELY(*processed == 0 || *processed != input_size))
         pa_log_error("aptX encoding error");
 
@@ -526,7 +526,7 @@ static size_t encode_buffer_hd(void *codec_info, uint32_t timestamp, const uint8
 static size_t decode_buffer(void *codec_info, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
     size_t written;
 
-    written = gst_transcode_buffer(codec_info, input_buffer, input_size, output_buffer, output_size, processed);
+    written = gst_transcode_buffer(codec_info, -1, input_buffer, input_size, output_buffer, output_size, processed);
 
     /* Due to aptX latency, aptx_decode starts filling output buffer after 90 input samples.
      * If input buffer contains less than 90 samples, aptx_decode returns zero (=no output)


=====================================
src/modules/bluetooth/a2dp-codec-gst.c
=====================================
@@ -28,6 +28,7 @@
 #include <pulsecore/once.h>
 #include <pulsecore/core-util.h>
 #include <pulse/sample.h>
+#include <pulse/timeval.h>
 #include <pulse/util.h>
 
 #include "a2dp-codecs.h"
@@ -39,83 +40,19 @@ static void app_sink_eos(GstAppSink *appsink, gpointer userdata) {
     pa_log_debug("Sink got EOS");
 }
 
-/* Called from the GStreamer streaming thread */
-static GstFlowReturn app_sink_new_sample(GstAppSink *appsink, gpointer userdata) {
-    struct gst_info *info = (struct gst_info *) userdata;
-    GstSample *sample = NULL;
-    GstBuffer *buf;
-
-    sample = gst_app_sink_pull_sample(GST_APP_SINK(info->app_sink));
-    if (!sample)
-        return GST_FLOW_OK;
-
-    buf = gst_sample_get_buffer(sample);
-    gst_buffer_ref(buf);
-    gst_adapter_push(info->sink_adapter, buf);
-    gst_sample_unref(sample);
-    pa_fdsem_post(info->sample_ready_fdsem);
-
-    return GST_FLOW_OK;
-}
-
 static void gst_deinit_common(struct gst_info *info) {
     if (!info)
         return;
-    if (info->sample_ready_fdsem)
-        pa_fdsem_free(info->sample_ready_fdsem);
-    if (info->app_src)
-        gst_object_unref(info->app_src);
     if (info->app_sink)
         gst_object_unref(info->app_sink);
-    if (info->sink_adapter)
-        g_object_unref(info->sink_adapter);
-    if (info->pipeline)
-        gst_object_unref(info->pipeline);
-}
-
-static GstBusSyncReply sync_bus_handler (GstBus *bus, GstMessage *message, struct gst_info *info) {
-    GstStreamStatusType type;
-    GstElement *owner;
-
-    switch (GST_MESSAGE_TYPE (message)) {
-        case GST_MESSAGE_STREAM_STATUS:
-
-            gst_message_parse_stream_status (message, &type, &owner);
-
-            switch (type) {
-            case GST_STREAM_STATUS_TYPE_ENTER:
-                pa_log_debug("GStreamer pipeline thread starting up");
-                if (info->core->realtime_scheduling)
-                    pa_thread_make_realtime(info->core->realtime_priority);
-                break;
-            case GST_STREAM_STATUS_TYPE_LEAVE:
-                pa_log_debug("GStreamer pipeline thread shutting down");
-                break;
-            default:
-                break;
-            }
-        break;
-        default:
-            break;
-    }
-
-    /* pass all messages on the async queue */
-    return GST_BUS_PASS;
+    if (info->bin)
+        gst_object_unref(info->bin);
 }
 
 bool gst_init_common(struct gst_info *info) {
-    GstElement *pipeline = NULL;
-    GstElement *appsrc = NULL, *appsink = NULL;
-    GstAdapter *adapter;
+    GstElement *bin = NULL;
+    GstElement *appsink = NULL;
     GstAppSinkCallbacks callbacks = { 0, };
-    GstBus *bus;
-
-    appsrc = gst_element_factory_make("appsrc", "app_source");
-    if (!appsrc) {
-        pa_log_error("Could not create appsrc element");
-        goto fail;
-    }
-    g_object_set(appsrc, "is-live", FALSE, "format", GST_FORMAT_TIME, "stream-type", 0, "max-bytes", 0, NULL);
 
     appsink = gst_element_factory_make("appsink", "app_sink");
     if (!appsink) {
@@ -125,71 +62,23 @@ bool gst_init_common(struct gst_info *info) {
     g_object_set(appsink, "sync", FALSE, "async", FALSE, "enable-last-sample", FALSE, NULL);
 
     callbacks.eos = app_sink_eos;
-    callbacks.new_sample = app_sink_new_sample;
     gst_app_sink_set_callbacks(GST_APP_SINK(appsink), &callbacks, info, NULL);
 
-    adapter = gst_adapter_new();
-    pa_assert(adapter);
+    bin = gst_bin_new(NULL);
+    pa_assert(bin);
 
-    pipeline = gst_pipeline_new(NULL);
-    pa_assert(pipeline);
-
-    bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
-    gst_bus_set_sync_handler (bus, (GstBusSyncHandler) sync_bus_handler, info, NULL);
-    gst_object_unref (bus);
-
-    info->app_src = appsrc;
     info->app_sink = appsink;
-    info->sink_adapter = adapter;
-    info->pipeline = pipeline;
-    info->sample_ready_fdsem = pa_fdsem_new();
+    info->bin = bin;
 
     return true;
 
 fail:
-    if (appsrc)
-        gst_object_unref(appsrc);
     if (appsink)
         gst_object_unref(appsink);
 
     return false;
 }
 
-/*
- * The idea of using buffer probes is as follows. We set a buffer probe on the
- * encoder sink pad. In the buffer probe, we set an idle probe on the upstream
- * source pad. In encode_buffer, we wait on the fdsem. The fdsem gets posted
- * when either new_sample or idle probe gets called. We do this, to make the
- * appsink behave synchronously.
- *
- * For buffer probes, see
- * https://gstreamer.freedesktop.org/documentation/additional/design/probes.html?gi-language=c
- */
-static GstPadProbeReturn gst_sink_buffer_idle_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
-{
-    struct gst_info *info = (struct gst_info *)userdata;
-
-    pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_IDLE);
-
-    pa_fdsem_post(info->sample_ready_fdsem);
-
-    return GST_PAD_PROBE_REMOVE;
-}
-
-static GstPadProbeReturn gst_sink_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
-{
-    struct gst_info *info = (struct gst_info *)userdata;
-    GstPad *peer_pad;
-
-    pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_BUFFER);
-
-    peer_pad = gst_pad_get_peer(pad);
-    gst_pad_add_probe(peer_pad, GST_PAD_PROBE_TYPE_IDLE, gst_sink_buffer_idle_probe, info, NULL);
-    gst_object_unref(peer_pad);
-
-    return GST_PAD_PROBE_OK;
-}
-
 static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) {
     gchar *sample_format;
     GstCaps *caps;
@@ -240,6 +129,10 @@ static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) {
 bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transcoder) {
     GstPad *pad;
     GstCaps *caps;
+    GstEvent *event;
+    GstSegment segment;
+    GstEvent *stream_start;
+    guint group_id;
 
     pa_assert(transcoder);
 
@@ -248,30 +141,49 @@ bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transc
     if (!gst_init_common(info))
         goto common_fail;
 
-    caps = gst_create_caps_from_sample_spec(info->ss);
-    if (for_encoding)
-        g_object_set(info->app_src, "caps", caps, NULL);
-    else
-        g_object_set(info->app_sink, "caps", caps, NULL);
-    gst_caps_unref(caps);
-
-
-    gst_bin_add_many(GST_BIN(info->pipeline), info->app_src, transcoder, info->app_sink, NULL);
+    gst_bin_add_many(GST_BIN(info->bin), transcoder, info->app_sink, NULL);
 
-    if (!gst_element_link_many(info->app_src, transcoder, info->app_sink, NULL)) {
+    if (!gst_element_link_many(transcoder, info->app_sink, NULL)) {
         pa_log_error("Failed to link codec elements into pipeline");
         goto pipeline_fail;
     }
 
-    if (gst_element_set_state(info->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+    pad = gst_element_get_static_pad(transcoder, "sink");
+    pa_assert_se(gst_element_add_pad(info->bin, gst_ghost_pad_new("sink", pad)));
+    /**
+     * Only the sink pad is needed to push buffers.  Cache it since
+     * gst_element_get_static_pad is relatively expensive and verbose
+     * on higher log levels.
+     */
+    info->pad_sink = pad;
+
+    if (gst_element_set_state(info->bin, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
         pa_log_error("Could not start pipeline");
         goto pipeline_fail;
     }
 
-    /* See the comment on buffer probe functions */
-    pad = gst_element_get_static_pad(transcoder, "sink");
-    gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, gst_sink_buffer_probe, info, NULL);
-    gst_object_unref(pad);
+    /* First, send stream-start sticky event */
+    group_id = gst_util_group_id_next();
+    stream_start = gst_event_new_stream_start("gst-codec-pa");
+    gst_event_set_group_id(stream_start, group_id);
+    gst_pad_send_event(info->pad_sink, stream_start);
+
+    /* Retrieve the pad that handles the PCM format between PA and GStreamer */
+    if (for_encoding)
+        pad = gst_element_get_static_pad(transcoder, "sink");
+    else
+        pad = gst_element_get_static_pad(transcoder, "src");
+
+    /* Second, send caps sticky event */
+    caps = gst_create_caps_from_sample_spec(info->ss);
+    pa_assert_se(gst_pad_set_caps(pad, caps));
+    gst_caps_unref(caps);
+    gst_object_unref(GST_OBJECT(pad));
+
+    /* Third, send segment sticky event */
+    gst_segment_init(&segment, GST_FORMAT_TIME);
+    event = gst_event_new_segment(&segment);
+    gst_pad_send_event(info->pad_sink, event);
 
     pa_log_info("GStreamer pipeline initialisation succeeded");
 
@@ -295,40 +207,58 @@ common_fail:
     return false;
 }
 
-size_t gst_transcode_buffer(void *codec_info, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
+size_t gst_transcode_buffer(void *codec_info, uint32_t timestamp, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
     struct gst_info *info = (struct gst_info *) codec_info;
-    gsize available, transcoded;
+    gsize transcoded;
     GstBuffer *in_buf;
-    GstMapInfo map_info;
     GstFlowReturn ret;
     size_t written = 0;
+    GstSample *sample;
+
+    pa_assert(info->pad_sink);
 
-    in_buf = gst_buffer_new_allocate(NULL, input_size, NULL);
+    in_buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY,
+                (gpointer)input_buffer, input_size, 0, input_size, NULL, NULL);
     pa_assert(in_buf);
+    /* Acquire an extra reference to validate refcount afterwards */
+    gst_mini_object_ref(GST_MINI_OBJECT_CAST(in_buf));
+    pa_assert(GST_MINI_OBJECT_REFCOUNT_VALUE(in_buf) == 2);
+
+    if (timestamp == -1)
+        GST_BUFFER_TIMESTAMP(in_buf) = GST_CLOCK_TIME_NONE;
+    else {
+        // Timestamp is monotonically increasing with samplerate/packets-per-second;
+        // convert it to a timestamp in nanoseconds:
+        GST_BUFFER_TIMESTAMP(in_buf) = timestamp * PA_USEC_PER_SEC / info->ss->rate;
+    }
 
-    pa_assert_se(gst_buffer_map(in_buf, &map_info, GST_MAP_WRITE));
-    memcpy(map_info.data, input_buffer, input_size);
-    gst_buffer_unmap(in_buf, &map_info);
+    ret = gst_pad_chain(info->pad_sink, in_buf);
+    /**
+     * Ensure we're the only one holding a reference to this buffer after gst_pad_chain,
+     * which internally holds a pointer reference to input_buffer.  The caller provides
+     * no guarantee to the validity of this pointer after returning from this function.
+     */
+    pa_assert(GST_MINI_OBJECT_REFCOUNT_VALUE(in_buf) == 1);
+    gst_mini_object_unref(GST_MINI_OBJECT_CAST(in_buf));
 
-    ret = gst_app_src_push_buffer(GST_APP_SRC(info->app_src), in_buf);
     if (ret != GST_FLOW_OK) {
         pa_log_error("failed to push buffer for transcoding %d", ret);
         goto fail;
     }
 
-    pa_fdsem_wait(info->sample_ready_fdsem);
-
-    available = gst_adapter_available(info->sink_adapter);
-
-    if (available) {
-        transcoded = PA_MIN(available, output_size);
-
-        gst_adapter_copy(info->sink_adapter, output_buffer, 0, transcoded);
-        gst_adapter_flush(info->sink_adapter, transcoded);
+    while ((sample = gst_app_sink_try_pull_sample(GST_APP_SINK(info->app_sink), 0))) {
+        in_buf = gst_sample_get_buffer(sample);
 
+        transcoded = gst_buffer_get_size(in_buf);
         written += transcoded;
-    } else
-        pa_log_debug("No transcoded data available in adapter");
+        pa_assert(written <= output_size);
+
+        GstMapInfo map_info;
+        pa_assert_se(gst_buffer_map(in_buf, &map_info, GST_MAP_READ));
+        memcpy(output_buffer, map_info.data, transcoded);
+        gst_buffer_unmap(in_buf, &map_info);
+        gst_sample_unref(sample);
+    }
 
     *processed = input_size;
 
@@ -343,17 +273,13 @@ fail:
 void gst_codec_deinit(void *codec_info) {
     struct gst_info *info = (struct gst_info *) codec_info;
 
-    if (info->sample_ready_fdsem)
-        pa_fdsem_free(info->sample_ready_fdsem);
-
-
-    if (info->pipeline) {
-        gst_element_set_state(info->pipeline, GST_STATE_NULL);
-        gst_object_unref(info->pipeline);
+    if (info->bin) {
+        gst_element_set_state(info->bin, GST_STATE_NULL);
+        gst_object_unref(info->bin);
     }
 
-    if (info->sink_adapter)
-        g_object_unref(info->sink_adapter);
+    if (info->pad_sink)
+        gst_object_unref(GST_OBJECT(info->pad_sink));
 
     pa_xfree(info);
 }


=====================================
src/modules/bluetooth/a2dp-codec-gst.h
=====================================
@@ -43,15 +43,15 @@ struct gst_info {
         const a2dp_ldac_t *ldac_config;
     } a2dp_codec_t;
 
-    GstElement *app_src, *app_sink;
-    GstElement *pipeline;
-    GstAdapter *sink_adapter;
-
-    pa_fdsem *sample_ready_fdsem;
+    /* The appsink element that accumulates encoded/decoded buffers */
+    GstElement *app_sink;
+    GstElement *bin;
+    /* The sink pad to push to-be-encoded/decoded buffers into */
+    GstPad *pad_sink;
 
     uint16_t seq_num;
 };
 
 bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transcoder);
-size_t gst_transcode_buffer(void *codec_info, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed);
+size_t gst_transcode_buffer(void *codec_info, uint32_t timestamp, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed);
 void gst_codec_deinit(void *codec_info);


=====================================
src/modules/bluetooth/a2dp-codec-ldac-gst.c
=====================================
@@ -411,7 +411,7 @@ static size_t encode_buffer(void *codec_info, uint32_t timestamp, const uint8_t
         return 0;
     }
 
-    written = gst_transcode_buffer(codec_info, input_buffer, input_size, output_buffer + sizeof(*header) + sizeof(*payload), output_size - sizeof(*header) - sizeof(*payload), processed);
+    written = gst_transcode_buffer(codec_info, timestamp, input_buffer, input_size, output_buffer + sizeof(*header) + sizeof(*payload), output_size - sizeof(*header) - sizeof(*payload), processed);
     if (PA_UNLIKELY(*processed != input_size))
         pa_log_error("LDAC encoding error");
 



View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/compare/62deab21a3b0e0043b17f8a217ae1a2f23da9afb...5af2afba85b8cc33b12189a8b0d40cfc4477ed89

-- 
View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/compare/62deab21a3b0e0043b17f8a217ae1a2f23da9afb...5af2afba85b8cc33b12189a8b0d40cfc4477ed89
You're receiving this email because of your account on gitlab.freedesktop.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/pulseaudio-commits/attachments/20220221/1b90220b/attachment-0001.htm>


More information about the pulseaudio-commits mailing list