[pulseaudio-discuss] [PATCH 8/8] rtp: Add a GStreamer-based RTP implementation

Tanu Kaskinen tanuk at iki.fi
Thu Apr 21 16:03:36 UTC 2016


On Mon, 2016-02-29 at 15:46 +0530, arun at accosted.net wrote:
>  module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
>  module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
> -module_rtp_send_la_CFLAGS = $(AM_CFLAGS)
> +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>  
>  module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
>  module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
> -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
> +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>  
>  # JACK
>  
> @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS)
>  module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c
>  module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la
> -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp
> +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS)

Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only
librtp contains GStreamer code.

> +typedef struct pa_rtp_context {

The typedef is already done in rtp.h, no need to do it again.

> +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
> +    pa_rtp_context *c = NULL;
> +    GError *error = NULL;
> +
> +    pa_assert(fd >= 0);
> +
> +    c = pa_xnew0(pa_rtp_context, 1);
> +
> +    c->fdsem = pa_fdsem_new();

As far as I can tell, the fdsem is only needed when receiving data, not
when sending.

> +    c->ss = *ss;
> +
> +    if (!gst_init_check(NULL, NULL, &error)) {
> +        pa_log_error("Could not initialise GStreamer: %s", error->message);
> +        g_error_free(error);
> +        goto fail;
> +    }
> +
> +    if (!init_send_pipeline(c, fd, payload, mtu, ss))
> +        goto fail;
> +
> +    return c;
> +
> +fail:
> +    pa_xfree(c);

You should call pa_rtp_context_free() to be sure that everything gets
properly deinitialized. Now you're leaking the fdsem. The same comment
applies to pa_rtp_context_new_recv() too.

> +static bool process_bus_messages(pa_rtp_context *c) {
> +    GstBus *bus;
> +    GstMessage *message;
> +    bool ret = true;
> +
> +    bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
> +
> +    while (ret && (message = gst_bus_pop(bus))) {
> +        if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
> +            GError *error = NULL;
> +
> +            ret = false;
> +
> +            gst_message_parse_error(message, &error, NULL);
> +            pa_log("Got an error: %s", error->message);
> +
> +            g_error_free(error);
> +
> +            pa_fdsem_post(c->fdsem);

What's the purpose of this? If I understand correctly, the fdsem is
used to wake up the pulseaudio source IO thread when we receive data in
a gstreamer thread, but this code runs in the IO thread, so this seems
pointless.

(By the way, it would be good to have comments in each function about
which thread they run in.)

> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
> +    pa_memchunk chunk = { 0, };
> +    GstBuffer *buf;
> +    void *data;
> +    bool stop = false;
> +    int ret = 0;
> +
> +    pa_assert(c);
> +    pa_assert(q);
> +
> +    if (!process_bus_messages(c))
> +        return -1;
> +
> +    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
> +        pa_assert(chunk.memblock);
> +
> +        data = pa_memblock_acquire(chunk.memblock);
> +
> +        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
> +                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
> +                                          (GDestroyNotify) free_buffer);
> +
> +        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
> +            pa_log_error("Could not push buffer");
> +            stop = true;
> +            ret = -1;
> +        }
> +
> +        pa_memblockq_drop(q, chunk.length);
> +    }
> +
> +    return ret;
> +}

I wonder about the error handling in this function. module-rtp-send.c,
which calls this function, doesn't care about the return value.
Unloading the module would make sense to me, but if you don't do that,
how do you ensure that we don't end up spamming errors to the log
infinitely if the pipeline stops working?

> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
> +    pa_rtp_context *c = (pa_rtp_context *) userdata;
> +    GstElement *depay;
> +    GstPad *sinkpad;
> +    GstPadLinkReturn ret;
> +
> +    depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
> +    pa_assert(depay);
> +
> +    sinkpad = gst_element_get_static_pad(depay, "sink");
> +
> +    ret = gst_pad_link(pad, sinkpad);
> +    if (ret != GST_PAD_LINK_OK) {
> +        GstBus *bus;
> +        GError *error;
> +
> +        bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
> +        error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
> +        gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));

It's not clear to me how the messages are dispatched. How does the sink
IO thread get notified of this message? Messages are processed in
pa_rtp_recv(), which is I think is called when we post to the fdsem,
but we don't call pa_fdsem_post() at least in this function. Does the
error produce an EOS event, which then will trigger a pa_fdsem_post()
call?

> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
> +    GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
> +    GstCaps *caps;
> +    GSocket *socket;
> +    GError *error = NULL;
> +
> +    MAKE_ELEMENT(udpsrc, "udpsrc");
> +    MAKE_ELEMENT(rtpbin, "rtpbin");
> +    MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
> +    MAKE_ELEMENT(appsink, "appsink");
> +
> +    c->pipeline = gst_pipeline_new(NULL);
> +
> +    gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
> +
> +    socket = g_socket_new_from_fd(fd, &error);
> +    if (error) {
> +        pa_log("Could not create socket: %s", error->message);
> +        g_error_free(error);
> +        goto fail;
> +    }
> +
> +    caps = rtp_caps_from_sample_spec(ss);
> +    if (!caps) {
> +        pa_log("Unsupported format to payload");
> +        goto fail;
> +    }
> +
> +    g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
> +    g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
> +    g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
> +
> +    gst_caps_unref(caps);
> +    g_object_unref(socket);
> +
> +    if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
> +        !gst_element_link(depay, appsink)) {
> +
> +        pa_log("Could not set up send pipeline");

s/send/receive/

> +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
> +    GstSample *sample = NULL;
> +    GstBuffer *buf;
> +    GstMapInfo info;
> +    void *data;
> +
> +    if (!process_bus_messages(c))
> +        goto fail;
> +
> +    sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
> +    if (!sample) {
> +        pa_log_warn("Could not get any more data");
> +        goto fail;
> +    }
> +
> +    buf = gst_sample_get_buffer(sample);
> +
> +    if (GST_BUFFER_IS_DISCONT(buf))
> +        pa_log_info("Discontinuity detected, possibly lost some packets");
> +
> +    if (!gst_buffer_map(buf, &info, GST_MAP_READ))
> +        goto fail;

It would be good to log something here.

-- 
Tanu


More information about the pulseaudio-discuss mailing list