[pulseaudio-discuss] [PATCH 8/8] rtp: Add a GStreamer-based RTP implementation

Arun Raghavan arun at accosted.net
Thu May 12 13:49:05 UTC 2016


On 21 April 2016 at 21:33, Tanu Kaskinen <tanuk at iki.fi> wrote:
> On Mon, 2016-02-29 at 15:46 +0530, arun at accosted.net wrote:
>>  module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
>>  module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
>>  module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
>> -module_rtp_send_la_CFLAGS = $(AM_CFLAGS)
>> +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>>
>>  module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
>>  module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
>>  module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
>> -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
>> +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>>
>>  # JACK
>>
>> @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS)
>>  module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c
>>  module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
>>  module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la
>> -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp
>> +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS)
>
> Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only
> librtp contains GStreamer code.
>
>> +typedef struct pa_rtp_context {
>
> The typedef is already done in rtp.h, no need to do it again.
>
>> +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
>> +    pa_rtp_context *c = NULL;
>> +    GError *error = NULL;
>> +
>> +    pa_assert(fd >= 0);
>> +
>> +    c = pa_xnew0(pa_rtp_context, 1);
>> +
>> +    c->fdsem = pa_fdsem_new();
>
> As far as I can tell, the fdsem is only needed when receiving data, not
> when sending.

Right, fixed.

>> +    c->ss = *ss;
>> +
>> +    if (!gst_init_check(NULL, NULL, &error)) {
>> +        pa_log_error("Could not initialise GStreamer: %s", error->message);
>> +        g_error_free(error);
>> +        goto fail;
>> +    }
>> +
>> +    if (!init_send_pipeline(c, fd, payload, mtu, ss))
>> +        goto fail;
>> +
>> +    return c;
>> +
>> +fail:
>> +    pa_xfree(c);
>
> You should call pa_rtp_context_free() to be sure that everything gets
> properly deinitialized. Now you're leaking the fdsem. The same comment
> applies to pa_rtp_context_new_recv() too.

Done.

>> +static bool process_bus_messages(pa_rtp_context *c) {
>> +    GstBus *bus;
>> +    GstMessage *message;
>> +    bool ret = true;
>> +
>> +    bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
>> +
>> +    while (ret && (message = gst_bus_pop(bus))) {
>> +        if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
>> +            GError *error = NULL;
>> +
>> +            ret = false;
>> +
>> +            gst_message_parse_error(message, &error, NULL);
>> +            pa_log("Got an error: %s", error->message);
>> +
>> +            g_error_free(error);
>> +
>> +            pa_fdsem_post(c->fdsem);
>
> What's the purpose of this? If I understand correctly, the fdsem is
> used to wake up the pulseaudio source IO thread when we receive data in
> a gstreamer thread, but this code runs in the IO thread, so this seems
> pointless.
>
> (By the way, it would be good to have comments in each function about
> which thread they run in.)

Added comments about this. And you're right, this one is not needed.

>
>> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
>> +    pa_memchunk chunk = { 0, };
>> +    GstBuffer *buf;
>> +    void *data;
>> +    bool stop = false;
>> +    int ret = 0;
>> +
>> +    pa_assert(c);
>> +    pa_assert(q);
>> +
>> +    if (!process_bus_messages(c))
>> +        return -1;
>> +
>> +    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
>> +        pa_assert(chunk.memblock);
>> +
>> +        data = pa_memblock_acquire(chunk.memblock);
>> +
>> +        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
>> +                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
>> +                                          (GDestroyNotify) free_buffer);
>> +
>> +        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
>> +            pa_log_error("Could not push buffer");
>> +            stop = true;
>> +            ret = -1;
>> +        }
>> +
>> +        pa_memblockq_drop(q, chunk.length);
>> +    }
>> +
>> +    return ret;
>> +}
>
> I wonder about the error handling in this function. module-rtp-send.c,
> which calls this function, doesn't care about the return value.
> Unloading the module would make sense to me, but if you don't do that,
> how do you ensure that we don't end up spamming errors to the log
> infinitely if the pipeline stops working?

I can add another patch on top of this to unload the module if send fails.

>> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
>> +    pa_rtp_context *c = (pa_rtp_context *) userdata;
>> +    GstElement *depay;
>> +    GstPad *sinkpad;
>> +    GstPadLinkReturn ret;
>> +
>> +    depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
>> +    pa_assert(depay);
>> +
>> +    sinkpad = gst_element_get_static_pad(depay, "sink");
>> +
>> +    ret = gst_pad_link(pad, sinkpad);
>> +    if (ret != GST_PAD_LINK_OK) {
>> +        GstBus *bus;
>> +        GError *error;
>> +
>> +        bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
>> +        error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
>> +        gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
>
> It's not clear to me how the messages are dispatched. How does the sink
> IO thread get notified of this message? Messages are processed in
> pa_rtp_recv(), which I think is called when we post to the fdsem,
> but we don't call pa_fdsem_post() at least in this function. Does the
> error produce an EOS event, which then will trigger a pa_fdsem_post()
> call?

No, we need to call pa_fdsem_post(). Will fix that.

>> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
>> +    GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
>> +    GstCaps *caps;
>> +    GSocket *socket;
>> +    GError *error = NULL;
>> +
>> +    MAKE_ELEMENT(udpsrc, "udpsrc");
>> +    MAKE_ELEMENT(rtpbin, "rtpbin");
>> +    MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
>> +    MAKE_ELEMENT(appsink, "appsink");
>> +
>> +    c->pipeline = gst_pipeline_new(NULL);
>> +
>> +    gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
>> +
>> +    socket = g_socket_new_from_fd(fd, &error);
>> +    if (error) {
>> +        pa_log("Could not create socket: %s", error->message);
>> +        g_error_free(error);
>> +        goto fail;
>> +    }
>> +
>> +    caps = rtp_caps_from_sample_spec(ss);
>> +    if (!caps) {
>> +        pa_log("Unsupported format to payload");
>> +        goto fail;
>> +    }
>> +
>> +    g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
>> +    g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
>> +    g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
>> +
>> +    gst_caps_unref(caps);
>> +    g_object_unref(socket);
>> +
>> +    if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
>> +        !gst_element_link(depay, appsink)) {
>> +
>> +        pa_log("Could not set up send pipeline");
>
> s/send/receive/
>
>> +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
>> +    GstSample *sample = NULL;
>> +    GstBuffer *buf;
>> +    GstMapInfo info;
>> +    void *data;
>> +
>> +    if (!process_bus_messages(c))
>> +        goto fail;
>> +
>> +    sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
>> +    if (!sample) {
>> +        pa_log_warn("Could not get any more data");
>> +        goto fail;
>> +    }
>> +
>> +    buf = gst_sample_get_buffer(sample);
>> +
>> +    if (GST_BUFFER_IS_DISCONT(buf))
>> +        pa_log_info("Discontinuity detected, possibly lost some packets");
>> +
>> +    if (!gst_buffer_map(buf, &info, GST_MAP_READ))
>> +        goto fail;
>
> It would be good to log something here.

Sure. This is just a sanity check though, and should never happen.

-- Arun


More information about the pulseaudio-discuss mailing list