[pulseaudio-discuss] [PATCH 8/8] rtp: Add a GStreamer-based RTP implementation
Arun Raghavan
arun at accosted.net
Thu May 12 13:49:05 UTC 2016
On 21 April 2016 at 21:33, Tanu Kaskinen <tanuk at iki.fi> wrote:
> On Mon, 2016-02-29 at 15:46 +0530, arun at accosted.net wrote:
>> module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
>> module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
>> module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
>> -module_rtp_send_la_CFLAGS = $(AM_CFLAGS)
>> +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>>
>> module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
>> module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
>> module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
>> -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
>> +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>>
>> # JACK
>>
>> @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS)
>> module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c
>> module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
>> module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la
>> -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp
>> +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS)
>
> Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only
> librtp contains GStreamer code.
>
>> +typedef struct pa_rtp_context {
>
> The typedef is already done in rtp.h, no need to do it again.
>
>> +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
>> + pa_rtp_context *c = NULL;
>> + GError *error = NULL;
>> +
>> + pa_assert(fd >= 0);
>> +
>> + c = pa_xnew0(pa_rtp_context, 1);
>> +
>> + c->fdsem = pa_fdsem_new();
>
> As far as I can tell, the fdsem is only needed when receiving data, not
> when sending.
Right, fixed.
>> + c->ss = *ss;
>> +
>> + if (!gst_init_check(NULL, NULL, &error)) {
>> + pa_log_error("Could not initialise GStreamer: %s", error->message);
>> + g_error_free(error);
>> + goto fail;
>> + }
>> +
>> + if (!init_send_pipeline(c, fd, payload, mtu, ss))
>> + goto fail;
>> +
>> + return c;
>> +
>> +fail:
>> + pa_xfree(c);
>
> You should call pa_rtp_context_free() to be sure that everything gets
> properly deinitialized. Now you're leaking the fdsem. The same comment
> applies to pa_rtp_context_new_recv() too.
Done.
>> +static bool process_bus_messages(pa_rtp_context *c) {
>> + GstBus *bus;
>> + GstMessage *message;
>> + bool ret = true;
>> +
>> + bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
>> +
>> + while (ret && (message = gst_bus_pop(bus))) {
>> + if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
>> + GError *error = NULL;
>> +
>> + ret = false;
>> +
>> + gst_message_parse_error(message, &error, NULL);
>> + pa_log("Got an error: %s", error->message);
>> +
>> + g_error_free(error);
>> +
>> + pa_fdsem_post(c->fdsem);
>
> What's the purpose of this? If I understand correctly, the fdsem is
> used to wake up the pulseaudio source IO thread when we receive data in
> a gstreamer thread, but this code runs in the IO thread, so this seems
> pointless.
>
> (By the way, it would be good to have comments in each function about
> which thread they run in.)
Added comments about this. And you're right, this one is not needed.
>
>> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
>> + pa_memchunk chunk = { 0, };
>> + GstBuffer *buf;
>> + void *data;
>> + bool stop = false;
>> + int ret = 0;
>> +
>> + pa_assert(c);
>> + pa_assert(q);
>> +
>> + if (!process_bus_messages(c))
>> + return -1;
>> +
>> + while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
>> + pa_assert(chunk.memblock);
>> +
>> + data = pa_memblock_acquire(chunk.memblock);
>> +
>> + buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
>> + data, chunk.length, chunk.index, chunk.length, chunk.memblock,
>> + (GDestroyNotify) free_buffer);
>> +
>> + if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
>> + pa_log_error("Could not push buffer");
>> + stop = true;
>> + ret = -1;
>> + }
>> +
>> + pa_memblockq_drop(q, chunk.length);
>> + }
>> +
>> + return ret;
>> +}
>
> I wonder about the error handling in this function. module-rtp-send.c,
> which calls this function, doesn't care about the return value.
> Unloading the module would make sense to me, but if you don't do that,
> how do you ensure that we don't end up spamming errors to the log
> infinitely if the pipeline stops working?
I can add another patch on top of this to unload the module of send fails.
>> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
>> + pa_rtp_context *c = (pa_rtp_context *) userdata;
>> + GstElement *depay;
>> + GstPad *sinkpad;
>> + GstPadLinkReturn ret;
>> +
>> + depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
>> + pa_assert(depay);
>> +
>> + sinkpad = gst_element_get_static_pad(depay, "sink");
>> +
>> + ret = gst_pad_link(pad, sinkpad);
>> + if (ret != GST_PAD_LINK_OK) {
>> + GstBus *bus;
>> + GError *error;
>> +
>> + bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
>> + error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
>> + gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
>
> It's not clear to me how the messages are dispatched. How does the sink
> IO thread get notified of this message? Messages are processed in
> pa_rtp_recv(), which is I think is called when we post to the fdsem,
> but we don't call pa_fdsem_post() at least in this function. Does the
> error produce an EOS event, which then will trigger a pa_fdsem_post()
> call?
No, we need to call pa_fdsem_post(). Will fix that.
>> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
>> + GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
>> + GstCaps *caps;
>> + GSocket *socket;
>> + GError *error = NULL;
>> +
>> + MAKE_ELEMENT(udpsrc, "udpsrc");
>> + MAKE_ELEMENT(rtpbin, "rtpbin");
>> + MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
>> + MAKE_ELEMENT(appsink, "appsink");
>> +
>> + c->pipeline = gst_pipeline_new(NULL);
>> +
>> + gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
>> +
>> + socket = g_socket_new_from_fd(fd, &error);
>> + if (error) {
>> + pa_log("Could not create socket: %s", error->message);
>> + g_error_free(error);
>> + goto fail;
>> + }
>> +
>> + caps = rtp_caps_from_sample_spec(ss);
>> + if (!caps) {
>> + pa_log("Unsupported format to payload");
>> + goto fail;
>> + }
>> +
>> + g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
>> + g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
>> + g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
>> +
>> + gst_caps_unref(caps);
>> + g_object_unref(socket);
>> +
>> + if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
>> + !gst_element_link(depay, appsink)) {
>> +
>> + pa_log("Could not set up send pipeline");
>
> s/send/receive/
>
>> +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
>> + GstSample *sample = NULL;
>> + GstBuffer *buf;
>> + GstMapInfo info;
>> + void *data;
>> +
>> + if (!process_bus_messages(c))
>> + goto fail;
>> +
>> + sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
>> + if (!sample) {
>> + pa_log_warn("Could not get any more data");
>> + goto fail;
>> + }
>> +
>> + buf = gst_sample_get_buffer(sample);
>> +
>> + if (GST_BUFFER_IS_DISCONT(buf))
>> + pa_log_info("Discontinuity detected, possibly lost some packets");
>> +
>> + if (!gst_buffer_map(buf, &info, GST_MAP_READ))
>> + goto fail;
>
> It would be good to log something here.
Sure. This is just a sanity check though, and should never happen.
-- Arun
More information about the pulseaudio-discuss
mailing list