[pulseaudio-commits] [Git][pulseaudio/pulseaudio][master] 11 commits: rtp: Make init return a value for success/failure
Arun Raghavan
gitlab at gitlab.freedesktop.org
Fri Nov 8 12:15:57 UTC 2019
Arun Raghavan pushed to branch master at PulseAudio / pulseaudio
Commits:
a9a3f087 by Arun Raghavan at 2019-11-08T12:09:03Z
rtp: Make init return a value for success/failure
This will be used in the future when writing other RTP implementations
that can fail on init.
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
- - - - -
7766f0e8 by Arun Raghavan at 2019-11-08T12:09:03Z
rtp: Don't use cookie for SSRC
Publishing the cookie on multicast seems to be a bad idea.
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
- - - - -
02fa9d5f by Arun Raghavan at 2019-11-08T12:09:03Z
rtp: Drop support for non-L16 media
There doesn't seem to be much value in supporting streaming U8/mulaw/alaw on
the network, and it's unlikely these get any testing. Makes more sense
to drop these formats and just convert to L16 if we're dealing with
source media in that format.
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
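The "just convert to L16" step can be illustrated for the U8 case. This is a
minimal sketch only: u8_to_l16() is a hypothetical helper, not a PulseAudio
function, and it assumes L16 means signed 16-bit samples in network byte order.
PulseAudio itself would do this through its normal sample format conversion.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Convert unsigned 8-bit PCM to L16 (signed 16-bit, network byte order).
 * `out` must have room for 2 * n bytes.
 * Hypothetical helper for illustration, not part of PulseAudio. */
void u8_to_l16(const uint8_t *in, size_t n, uint8_t *out) {
    assert(in);
    assert(out);

    for (size_t i = 0; i < n; i++) {
        /* U8 is centred on 128: shift to signed, then scale to 16 bits. */
        int32_t s = ((int32_t) in[i] - 128) * 256;
        uint16_t u = (uint16_t) s; /* two's complement bit pattern */

        out[2 * i] = (uint8_t) (u >> 8);       /* most significant byte first */
        out[2 * i + 1] = (uint8_t) (u & 0xFF);
    }
}
```

A caller would pass a buffer of n input samples and an output buffer of 2 * n
bytes; silence (128) maps to the byte pair 00 00.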
- - - - -
0548cdc6 by Arun Raghavan at 2019-11-08T12:09:03Z
rtp: Move MTU handling to the RTP implementation
module-rtp-send itself doesn't really need to handle this, the
implementation can keep track (and make sure sending happens in MTU
sized chunks).
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
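As a rough illustration of "MTU sized chunks", the sketch below computes how
much audio fits in one packet while keeping whole frames together. The function
names are hypothetical, and counting the 12-byte fixed RTP header against the
MTU is an assumption made here, not something this commit message states.

```c
#include <assert.h>
#include <stddef.h>

#define RTP_HEADER_SIZE 12 /* fixed RTP header without CSRC entries (RFC 3550) */

/* Largest audio payload, in bytes, that fits in one packet of at most `mtu`
 * bytes and contains only whole frames. Returns 0 if no frame fits.
 * Hypothetical helper for illustration. */
size_t rtp_payload_size(size_t mtu, size_t frame_size) {
    assert(frame_size > 0);

    if (mtu <= RTP_HEADER_SIZE)
        return 0;

    return ((mtu - RTP_HEADER_SIZE) / frame_size) * frame_size;
}

/* Number of packets needed to send `length` bytes of queued audio. */
size_t rtp_packet_count(size_t length, size_t mtu, size_t frame_size) {
    size_t payload = rtp_payload_size(mtu, frame_size);

    if (payload == 0)
        return 0;

    return (length + payload - 1) / payload; /* round up */
}
```

With the implementation owning this calculation, module-rtp-send only has to
hand over the MTU once at context creation and the queue on every send.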
- - - - -
d15291bb by Arun Raghavan at 2019-11-08T12:09:03Z
rtp: Hide RTP implementation details from module-rtp-*
This moves RTP implementation-specific information out of
module-rtp-send/recv. This is basically done by making the
pa_rtp_context structure opaque from the perspective of these modules.
We can then potentially replace the underlying RTP implementation with
something else transparently.
One RTP detail that does "leak" is the RTP timestamp. We provide this to
module-rtp-recv so that it can perform rate adjustments to match the
sender rate.
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
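The opaque-structure approach described in this commit is a common C pattern.
The sketch below shows the general idea with made-up demo_* names; it is not
the actual rtp.h. Callers see only an incomplete type and accessor functions,
so the fields can change per backend without touching the callers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* --- What a shared header would expose: incomplete type plus accessors --- */
typedef struct demo_rtp_context demo_rtp_context; /* hypothetical name */

demo_rtp_context *demo_rtp_context_new(unsigned channels, size_t bytes_per_sample);
size_t demo_rtp_context_get_frame_size(const demo_rtp_context *c);
void demo_rtp_context_free(demo_rtp_context *c);

/* --- What only the backend's own .c file would see --- */
struct demo_rtp_context {
    unsigned channels;
    size_t bytes_per_sample;
};

demo_rtp_context *demo_rtp_context_new(unsigned channels, size_t bytes_per_sample) {
    demo_rtp_context *c;

    if (channels == 0 || bytes_per_sample == 0)
        return NULL; /* creation may fail, so callers must check */

    if (!(c = calloc(1, sizeof(*c))))
        return NULL;

    c->channels = channels;
    c->bytes_per_sample = bytes_per_sample;
    return c;
}

size_t demo_rtp_context_get_frame_size(const demo_rtp_context *c) {
    assert(c);
    return c->channels * c->bytes_per_sample;
}

void demo_rtp_context_free(demo_rtp_context *c) {
    free(c);
}
```

Because callers hold only a pointer, a different backend can define a
completely different struct body behind the same three functions.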
- - - - -
eb912d36 by Arun Raghavan at 2019-11-08T12:09:03Z
rtpoll: Separate out before/after/work callback userdata
It is possible that we might want to have a separate userdata to be used
for these callbacks, so let's split them out.
This is particularly needed when using a pa_rtpoll_item around pa_fdsem
since that uses its own before/after callback but will essentially have
whatever is using the fdsem set up the work callback appropriately (and
thus at least the work callback's userdata needs to be separated from
the before/after callback -- we might as well then just separate all
three).
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
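The split described in this commit can be pictured with a small stand-in type.
Everything named demo_* below is hypothetical and much simpler than the real
pa_rtpoll_item; only the before and work callbacks are shown, and the after
callback would follow the same shape. The point is that each callback is
registered together with its own userdata, so two different owners can use the
same item without overwriting each other's pointer.

```c
#include <assert.h>
#include <stddef.h>

typedef struct demo_item demo_item; /* hypothetical stand-in for a poll item */
typedef int (*demo_cb)(demo_item *i);

struct demo_item {
    demo_cb before_cb, work_cb;
    void *before_userdata, *work_userdata; /* one pointer per callback */
};

void demo_item_set_before_callback(demo_item *i, demo_cb cb, void *userdata) {
    assert(i);
    i->before_cb = cb;
    i->before_userdata = userdata;
}

void demo_item_set_work_callback(demo_item *i, demo_cb cb, void *userdata) {
    assert(i);
    i->work_cb = cb;
    i->work_userdata = userdata;
}

void *demo_item_get_before_userdata(demo_item *i) {
    assert(i);
    return i->before_userdata;
}

void *demo_item_get_work_userdata(demo_item *i) {
    assert(i);
    return i->work_userdata;
}

/* Example callbacks: each one counts calls in its own userdata. */
int demo_before(demo_item *i) {
    int *hits = demo_item_get_before_userdata(i);
    (*hits)++;
    return 0;
}

int demo_work(demo_item *i) {
    int *hits = demo_item_get_work_userdata(i);
    (*hits)++;
    return 0;
}

/* Run one iteration: before callback first, then the work callback. */
int demo_item_run_once(demo_item *i) {
    assert(i);

    if (i->before_cb && i->before_cb(i) < 0)
        return -1;

    return i->work_cb ? i->work_cb(i) : 0;
}
```

In the fdsem case from the commit message, the fdsem wrapper would own the
before/after userdata while the module using it supplies the work userdata.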
- - - - -
74f8456a by Arun Raghavan at 2019-11-08T12:09:40Z
rtp: Add a GStreamer-based RTP implementation
This adds a GStreamer-based RTP implementation to replace our own. The
original implementation is retained for cases where it is not possible
to include GStreamer as a dependency.
The idea with this is to be able to start supporting more advanced RTP
features such as RTCP, non-PCM audio, and potentially synchronised
playback.
Signed-off-by: Arun Raghavan <arun at arunraghavan.net>
- - - - -
7f6571e1 by Sebastian Dröge at 2019-11-08T12:09:40Z
rtp: Use yes/no in configure instead of 1/0
- - - - -
72dbbcbc by Sebastian Dröge at 2019-11-08T12:09:40Z
rtp: Use udpsink instead of fdsink for the GStreamer RTP implementation
- - - - -
a34d2e54 by Sebastian Dröge at 2019-11-08T12:09:40Z
rtp: Properly timestamp buffers in the GStreamer sender pipeline
Otherwise default timestamping will happen, which might not be correct,
especially not after the stream was suspended for a while.
- - - - -
a17cc55c by Arun Raghavan at 2019-11-08T12:09:40Z
rtp: Add some logging to know what backend is being used
Should make debugging any issues that come up easier.
- - - - -
17 changed files:
- configure.ac
- meson.build
- meson_options.txt
- po/POTFILES.in
- src/Makefile.am
- src/modules/alsa/alsa-mixer.c
- src/modules/rtp/meson.build
- src/modules/rtp/module-rtp-recv.c
- src/modules/rtp/module-rtp-send.c
- + src/modules/rtp/rtp-common.c
- + src/modules/rtp/rtp-gstreamer.c
- src/modules/rtp/rtp.c → src/modules/rtp/rtp-native.c
- src/modules/rtp/rtp.h
- src/modules/rtp/sdp.c
- src/pulsecore/rtpoll.c
- src/pulsecore/rtpoll.h
- src/tests/rtpoll-test.c
Changes:
=====================================
configure.ac
=====================================
@@ -1310,6 +1310,22 @@ AC_SUBST(HAVE_SYSTEMD_JOURNAL)
AM_CONDITIONAL([HAVE_SYSTEMD_JOURNAL], [test "x$HAVE_SYSTEMD_JOURNAL" = x1])
AS_IF([test "x$HAVE_SYSTEMD_JOURNAL" = "x1"], AC_DEFINE([HAVE_SYSTEMD_JOURNAL], 1, [Have SYSTEMDJOURNAL?]))
+#### GStreamer-based RTP support (optional) ####
+
+AC_ARG_ENABLE([gstreamer],
+ AS_HELP_STRING([--disable-gstreamer],[Disable optional GStreamer-based RTP support]))
+
+AS_IF([test "x$enable_gstreamer" != "xno"],
+ [PKG_CHECK_MODULES(GSTREAMER, [ gstreamer-1.0 gstreamer-app-1.0 gstreamer-rtp-1.0 gio-2.0 ],
+ HAVE_GSTREAMER=yes, HAVE_GSTREAMER=no)],
+ HAVE_GSTREAMER=no)
+
+AS_IF([test "x$enable_gstreamer" = "xyes" && test "x$HAVE_GSTREAMER" = "xno"],
+ [AC_MSG_ERROR([*** GStreamer 1.0 support not found])])
+
+AM_CONDITIONAL([HAVE_GSTREAMER], [test "x$HAVE_GSTREAMER" = xyes])
+AS_IF([test "x$HAVE_GSTREAMER" = "xyes"], AC_DEFINE([HAVE_GSTREAMER], 1, [Have GStreamer?]))
+
#### Build and Install man pages ####
AC_ARG_ENABLE([manpages],
@@ -1614,6 +1630,7 @@ AS_IF([test "x$HAVE_ADRIAN_EC" = "x1"], ENABLE_ADRIAN_EC=yes, ENABLE_ADRIAN_EC=n
AS_IF([test "x$HAVE_SPEEX" = "x1"], ENABLE_SPEEX=yes, ENABLE_SPEEX=no)
AS_IF([test "x$HAVE_SOXR" = "x1"], ENABLE_SOXR=yes, ENABLE_SOXR=no)
AS_IF([test "x$HAVE_WEBRTC" = "x1"], ENABLE_WEBRTC=yes, ENABLE_WEBRTC=no)
+AS_IF([test "x$HAVE_GSTREAMER" = "xyes"], ENABLE_GSTREAMER=yes, ENABLE_GSTREAMER=no)
AS_IF([test "x$HAVE_TDB" = "x1"], ENABLE_TDB=yes, ENABLE_TDB=no)
AS_IF([test "x$HAVE_GDBM" = "x1"], ENABLE_GDBM=yes, ENABLE_GDBM=no)
AS_IF([test "x$HAVE_SIMPLEDB" = "x1"], ENABLE_SIMPLEDB=yes, ENABLE_SIMPLEDB=no)
@@ -1677,6 +1694,7 @@ echo "
Enable speex (resampler, AEC): ${ENABLE_SPEEX}
Enable soxr (resampler): ${ENABLE_SOXR}
Enable WebRTC echo canceller: ${ENABLE_WEBRTC}
+ Enable GStreamer-based RTP: ${ENABLE_GSTREAMER}
Enable gcov coverage: ${ENABLE_GCOV}
Enable unit tests: ${ENABLE_TESTS}
Database
=====================================
meson.build
=====================================
@@ -669,6 +669,15 @@ if webrtc_dep.found()
cdata.set('HAVE_WEBRTC', 1)
endif
+gst_dep = dependency('gstreamer-1.0', required : get_option('gstreamer'))
+gstapp_dep = dependency('gstreamer-app-1.0', required : get_option('gstreamer'))
+gstrtp_dep = dependency('gstreamer-rtp-1.0', required : get_option('gstreamer'))
+
+have_gstreamer = false
+if gst_dep.found() and gstapp_dep.found() and gstrtp_dep.found()
+ have_gstreamer = true
+endif
+
# These are required for the CMake file generation
cdata.set('PA_LIBDIR', libdir)
cdata.set('PA_INCDIR', includedir)
@@ -815,6 +824,7 @@ summary = [
'Enable OpenSSL (for Airtunes): @0@'.format(openssl_dep.found()),
'Enable FFTW: @0@'.format(fftw_dep.found()),
'Enable ORC: @0@'.format(have_orcc),
+ 'Enable GStreamer: @0@'.format(have_gstreamer),
'Enable Adrian echo canceller: @0@'.format(get_option('adrian-aec')),
'Enable Speex (resampler, AEC): @0@'.format(speex_dep.found()),
'Enable SoXR (resampler): @0@'.format(soxr_dep.found()),
=====================================
meson_options.txt
=====================================
@@ -93,6 +93,9 @@ option('glib',
option('gsettings',
type : 'feature', value : 'auto',
description : 'Optional GSettings support')
+option('gstreamer',
+ type : 'feature', value : 'auto',
+ description : 'Optional GStreamer dependency for media-related functionality')
option('gtk',
type : 'feature', value : 'auto',
description : 'Optional Gtk+ 3 support')
=====================================
po/POTFILES.in
=====================================
@@ -66,7 +66,9 @@ src/modules/raop/raop-sink.c
src/modules/reserve-wrap.c
src/modules/rtp/module-rtp-recv.c
src/modules/rtp/module-rtp-send.c
-src/modules/rtp/rtp.c
+src/modules/rtp/rtp-common.c
+src/modules/rtp/rtp-native.c
+src/modules/rtp/rtp-gstreamer.c
src/modules/rtp/sap.c
src/modules/rtp/sdp.c
src/modules/x11/module-x11-bell.c
=====================================
src/Makefile.am
=====================================
@@ -1176,13 +1176,21 @@ libprotocol_esound_la_LIBADD = $(AM_LIBADD) libpulsecore-@PA_MAJORMINOR@.la libp
endif
librtp_la_SOURCES = \
- modules/rtp/rtp.c modules/rtp/rtp.h \
+ modules/rtp/rtp-common.c modules/rtp/rtp.h \
modules/rtp/sdp.c modules/rtp/sdp.h \
modules/rtp/sap.c modules/rtp/sap.h \
modules/rtp/rtsp_client.c modules/rtp/rtsp_client.h \
modules/rtp/headerlist.c modules/rtp/headerlist.h
+librtp_la_CFLAGS = $(AM_CFLAGS)
librtp_la_LDFLAGS = $(AM_LDFLAGS) $(AM_LIBLDFLAGS) -avoid-version
librtp_la_LIBADD = $(AM_LIBADD) libpulsecore-@PA_MAJORMINOR@.la libpulsecommon-@PA_MAJORMINOR@.la libpulse.la
+if HAVE_GSTREAMER
+librtp_la_SOURCES += modules/rtp/rtp-gstreamer.c
+librtp_la_CFLAGS += $(GSTREAMER_CFLAGS)
+librtp_la_LIBADD += $(GSTREAMER_LIBS)
+else
+librtp_la_SOURCES += modules/rtp/rtp-native.c
+endif
libraop_la_SOURCES = \
modules/raop/raop-util.c modules/raop/raop-util.h \
@@ -2049,12 +2057,12 @@ endif
module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
-module_rtp_send_la_CFLAGS = $(AM_CFLAGS) -DPA_MODULE_NAME=module_rtp_send
+module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS) -DPA_MODULE_NAME=module_rtp_send
module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
-module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) -DPA_MODULE_NAME=module_rtp_recv
+module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS) -DPA_MODULE_NAME=module_rtp_recv
# JACK
=====================================
src/modules/alsa/alsa-mixer.c
=====================================
@@ -467,7 +467,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
unsigned short revents = 0;
int err, ret = 0;
- pd = pa_rtpoll_item_get_userdata(i);
+ pd = pa_rtpoll_item_get_work_userdata(i);
pa_assert_fp(pd);
pa_assert_fp(i == pd->poll_item);
@@ -547,8 +547,7 @@ int pa_alsa_set_mixer_rtpoll(struct pa_alsa_mixer_pdata *pd, snd_mixer_t *mixer,
pd->poll_item = i;
pd->mixer = mixer;
- pa_rtpoll_item_set_userdata(i, pd);
- pa_rtpoll_item_set_work_callback(i, rtpoll_work_cb);
+ pa_rtpoll_item_set_work_callback(i, rtpoll_work_cb, pd);
return 0;
}
=====================================
src/modules/rtp/meson.build
=====================================
@@ -1,5 +1,5 @@
librtp_sources = [
- 'rtp.c',
+ 'rtp-common.c',
'sdp.c',
'sap.c',
'rtsp_client.c',
@@ -14,13 +14,19 @@ librtp_headers = [
'headerlist.h',
]
+if have_gstreamer
+ librtp_sources += 'rtp-gstreamer.c'
+else
+ librtp_sources += 'rtp-native.c'
+endif
+
librtp = shared_library('rtp',
librtp_sources,
librtp_headers,
c_args : [pa_c_args, server_c_args],
link_args : [nodelete_link_args],
include_directories : [configinc, topinc],
- dependencies : [libpulse_dep, libpulsecommon_dep, libpulsecore_dep, libatomic_ops_dep],
+ dependencies : [libpulse_dep, libpulsecommon_dep, libpulsecore_dep, libatomic_ops_dep, gst_dep, gstapp_dep, gstrtp_dep, gio_dep],
install : true,
install_rpath : privlibdir,
install_dir : modlibexecdir,
=====================================
src/modules/rtp/module-rtp-recv.c
=====================================
@@ -90,12 +90,11 @@ struct session {
pa_memblockq *memblockq;
bool first_packet;
- uint32_t ssrc;
uint32_t offset;
struct pa_sdp_info sdp_info;
- pa_rtp_context rtp_context;
+ pa_rtp_context *rtp_context;
pa_rtpoll_item *rtpoll_item;
@@ -205,12 +204,13 @@ static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
/* Called from I/O thread context */
static int rtpoll_work_cb(pa_rtpoll_item *i) {
pa_memchunk chunk;
+ uint32_t timestamp;
int64_t k, j, delta;
struct timeval now = { 0, 0 };
struct session *s;
struct pollfd *p;
- pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
+ pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
p = pa_rtpoll_item_get_pollfd(i, NULL);
@@ -224,40 +224,30 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
p->revents = 0;
- if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
+ if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
return 0;
- if (s->sdp_info.payload != s->rtp_context.payload ||
- !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
+ if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
pa_memblock_unref(chunk.memblock);
return 0;
}
if (!s->first_packet) {
s->first_packet = true;
-
- s->ssrc = s->rtp_context.ssrc;
- s->offset = s->rtp_context.timestamp;
-
- if (s->ssrc == s->userdata->module->core->cookie)
- pa_log_warn("Detected RTP packet loop!");
- } else {
- if (s->ssrc != s->rtp_context.ssrc) {
- pa_memblock_unref(chunk.memblock);
- return 0;
- }
+ s->offset = timestamp;
}
/* Check whether there was a timestamp overflow */
- k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
- j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
+ k = (int64_t) timestamp - (int64_t) s->offset;
+ j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
delta = k;
else
delta = j;
- pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
+ pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
+ true);
if (now.tv_sec == 0) {
PA_ONCE_BEGIN {
@@ -277,7 +267,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
pa_memblock_unref(chunk.memblock);
/* The next timestamp we expect */
- s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
+ s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
pa_atomic_store(&s->timestamp, (int) now.tv_sec);
@@ -386,21 +376,14 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
/* Called from I/O thread context */
static void sink_input_attach(pa_sink_input *i) {
struct session *s;
- struct pollfd *p;
pa_sink_input_assert_ref(i);
pa_assert_se(s = i->userdata);
pa_assert(!s->rtpoll_item);
- s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
-
- p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
- p->fd = s->rtp_context.fd;
- p->events = POLLIN;
- p->revents = 0;
+ s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
- pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
- pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
+ pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
}
/* Called from I/O thread context */
@@ -585,7 +568,8 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
pa_memblock_unref(silence.memblock);
- pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
+ if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec)))
+ goto fail;
pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
u->n_sessions++;
@@ -620,7 +604,7 @@ static void session_free(struct session *s) {
pa_memblockq_free(s->memblockq);
pa_sdp_info_destroy(&s->sdp_info);
- pa_rtp_context_destroy(&s->rtp_context);
+ pa_rtp_context_free(s->rtp_context);
pa_xfree(s);
}
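The timestamp overflow check in rtpoll_work_cb() above can be read in
isolation. The sketch below repeats the same k/j comparison as a standalone
function; rtp_timestamp_delta() is a hypothetical name used only here. RTP
timestamps are 32 bits wide, so the function picks whichever of the plain
difference and the wrapped difference is smaller in magnitude.

```c
#include <assert.h>
#include <stdint.h>

/* Signed distance, in frames, from the expected RTP timestamp to the one
 * actually received, allowing for 32-bit wraparound.
 * Hypothetical helper mirroring the k/j logic in the diff above. */
int64_t rtp_timestamp_delta(uint32_t timestamp, uint32_t expected) {
    /* Plain difference, assuming no wraparound happened. */
    int64_t k = (int64_t) timestamp - (int64_t) expected;
    /* Difference assuming the counter wrapped past 2^32 once. */
    int64_t j = (int64_t) 0x100000000LL - (int64_t) expected + (int64_t) timestamp;

    return (k < 0 ? -k : k) < (j < 0 ? -j : j) ? k : j;
}
```

Multiplying the result by the frame size gives the relative seek passed to
pa_memblockq_seek() in the code above.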
=====================================
src/modules/rtp/module-rtp-send.c
=====================================
@@ -107,9 +107,8 @@ struct userdata {
pa_source_output *source_output;
pa_memblockq *memblockq;
- pa_rtp_context rtp_context;
+ pa_rtp_context *rtp_context;
pa_sap_context sap_context;
- size_t mtu;
pa_time_event *sap_event;
@@ -144,7 +143,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
return;
}
- pa_rtp_send(&u->rtp_context, u->mtu, u->memblockq);
+ pa_rtp_send(u->rtp_context, u->memblockq);
}
static pa_source_output_flags_t get_dont_inhibit_auto_suspend_flag(pa_source *source,
@@ -466,8 +465,6 @@ int pa__init(pa_module*m) {
0,
NULL);
- u->mtu = mtu;
-
k = sizeof(sa_dst);
pa_assert_se((r = getsockname(fd, (struct sockaddr*) &sa_dst, &k)) >= 0);
@@ -491,10 +488,12 @@ int pa__init(pa_module*m) {
pa_xfree(n);
- pa_rtp_context_init_send(&u->rtp_context, fd, m->core->cookie, payload, pa_frame_size(&ss));
+ if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, &ss)))
+ goto fail;
pa_sap_context_init_send(&u->sap_context, sap_fd, p);
- pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, SSRC=0x%08x, payload=%u, initial sequence #%u", mtu, dst_addr, port, src_addr, ttl, u->rtp_context.ssrc, payload, u->rtp_context.sequence);
+ pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, payload=%u",
+ mtu, dst_addr, port, src_addr, ttl, payload);
pa_log_info("SDP-Data:\n%s\nEOF", p);
pa_sap_send(&u->sap_context, 0);
@@ -536,7 +535,7 @@ void pa__done(pa_module*m) {
pa_source_output_unref(u->source_output);
}
- pa_rtp_context_destroy(&u->rtp_context);
+ pa_rtp_context_free(u->rtp_context);
pa_sap_send(&u->sap_context, 1);
pa_sap_context_destroy(&u->sap_context);
=====================================
src/modules/rtp/rtp-common.c
=====================================
@@ -0,0 +1,97 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "rtp.h"
+
+#include <pulsecore/core-util.h>
+
+uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
+ pa_assert(ss);
+
+ if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 2)
+ return 10;
+ if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 1)
+ return 11;
+
+ return 127;
+}
+
+pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) {
+ pa_assert(ss);
+
+ switch (payload) {
+ case 10:
+ ss->channels = 2;
+ ss->format = PA_SAMPLE_S16BE;
+ ss->rate = 44100;
+ break;
+
+ case 11:
+ ss->channels = 1;
+ ss->format = PA_SAMPLE_S16BE;
+ ss->rate = 44100;
+ break;
+
+ default:
+ return NULL;
+ }
+
+ return ss;
+}
+
+pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) {
+ pa_assert(ss);
+
+ if (!pa_rtp_sample_spec_valid(ss))
+ ss->format = PA_SAMPLE_S16BE;
+
+ pa_assert(pa_rtp_sample_spec_valid(ss));
+ return ss;
+}
+
+int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
+ pa_assert(ss);
+
+ if (!pa_sample_spec_valid(ss))
+ return 0;
+
+ return ss->format == PA_SAMPLE_S16BE;
+}
+
+const char* pa_rtp_format_to_string(pa_sample_format_t f) {
+ switch (f) {
+ case PA_SAMPLE_S16BE:
+ return "L16";
+ default:
+ return NULL;
+ }
+}
+
+pa_sample_format_t pa_rtp_string_to_format(const char *s) {
+ pa_assert(s);
+
+ if (pa_streq(s, "L16"))
+ return PA_SAMPLE_S16BE;
+ else
+ return PA_SAMPLE_INVALID;
+}
=====================================
src/modules/rtp/rtp-gstreamer.c
=====================================
@@ -0,0 +1,524 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2016 Arun Raghavan <mail at arunraghavan.net>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <pulse/timeval.h>
+#include <pulsecore/fdsem.h>
+#include <pulsecore/core-rtclock.h>
+
+#include "rtp.h"
+
+#include <gio/gio.h>
+
+#include <gst/gst.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappsink.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#define MAKE_ELEMENT_NAMED(v, e, n) \
+ v = gst_element_factory_make(e, n); \
+ if (!v) { \
+ pa_log("Could not create %s element", e); \
+ goto fail; \
+ }
+
+#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
+
+struct pa_rtp_context {
+ pa_fdsem *fdsem;
+ pa_sample_spec ss;
+
+ GstElement *pipeline;
+ GstElement *appsrc;
+ GstElement *appsink;
+
+ uint32_t last_timestamp;
+};
+
+static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
+ if (ss->format != PA_SAMPLE_S16BE)
+ return NULL;
+
+ return gst_caps_new_simple("audio/x-raw",
+ "format", G_TYPE_STRING, "S16BE",
+ "rate", G_TYPE_INT, (int) ss->rate,
+ "channels", G_TYPE_INT, (int) ss->channels,
+ "layout", G_TYPE_STRING, "interleaved",
+ NULL);
+}
+static bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+ GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL;
+ GstCaps *caps;
+ GSocket *socket;
+ GInetSocketAddress *addr;
+ GInetAddress *iaddr;
+ guint16 port;
+ gchar *addr_str;
+
+ MAKE_ELEMENT(appsrc, "appsrc");
+ MAKE_ELEMENT(pay, "rtpL16pay");
+ MAKE_ELEMENT(capsf, "capsfilter");
+ MAKE_ELEMENT(rtpbin, "rtpbin");
+ MAKE_ELEMENT(sink, "udpsink");
+
+ c->pipeline = gst_pipeline_new(NULL);
+
+ gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL);
+
+ caps = caps_from_sample_spec(ss);
+ if (!caps) {
+ pa_log("Unsupported format to payload");
+ goto fail;
+ }
+
+ socket = g_socket_new_from_fd(fd, NULL);
+ if (!socket) {
+ pa_log("Failed to create socket");
+ goto fail;
+ }
+
+ addr = G_INET_SOCKET_ADDRESS(g_socket_get_remote_address(socket, NULL));
+ iaddr = g_inet_socket_address_get_address(addr);
+ addr_str = g_inet_address_to_string(iaddr);
+ port = g_inet_socket_address_get_port(addr);
+
+ g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL);
+ g_object_set(pay, "mtu", mtu, NULL);
+ g_object_set(sink, "socket", socket, "host", addr_str, "port", port,
+ "enable-last-sample", FALSE, "sync", FALSE, "loop",
+ g_socket_get_multicast_loopback(socket), "ttl",
+ g_socket_get_ttl(socket), "ttl-mc",
+ g_socket_get_multicast_ttl(socket), "auto-multicast", FALSE,
+ NULL);
+
+ g_free(addr_str);
+ g_object_unref(addr);
+ g_object_unref(socket);
+
+ gst_caps_unref(caps);
+
+ /* Force the payload type that we want */
+ caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, NULL);
+ g_object_set(capsf, "caps", caps, NULL);
+ gst_caps_unref(caps);
+
+ if (!gst_element_link(appsrc, pay) ||
+ !gst_element_link(pay, capsf) ||
+ !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") ||
+ !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) {
+
+ pa_log("Could not set up send pipeline");
+ goto fail;
+ }
+
+ if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+ pa_log("Could not start pipeline");
+ goto fail;
+ }
+
+ c->appsrc = gst_object_ref(appsrc);
+
+ return true;
+
+fail:
+ if (c->pipeline) {
+ gst_object_unref(c->pipeline);
+ } else {
+ /* These weren't yet added to pipeline, so we still have a ref */
+ if (appsrc)
+ gst_object_unref(appsrc);
+ if (pay)
+ gst_object_unref(pay);
+ if (capsf)
+ gst_object_unref(capsf);
+ if (rtpbin)
+ gst_object_unref(rtpbin);
+ if (sink)
+ gst_object_unref(sink);
+ }
+
+ return false;
+}
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+ pa_rtp_context *c = NULL;
+ GError *error = NULL;
+
+ pa_assert(fd >= 0);
+
+ pa_log_info("Initialising GStreamer RTP backend for send");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
+ c->ss = *ss;
+
+ if (!gst_init_check(NULL, NULL, &error)) {
+ pa_log_error("Could not initialise GStreamer: %s", error->message);
+ g_error_free(error);
+ goto fail;
+ }
+
+ if (!init_send_pipeline(c, fd, payload, mtu, ss))
+ goto fail;
+
+ return c;
+
+fail:
+ pa_rtp_context_free(c);
+ return NULL;
+}
+
+/* Called from I/O thread context */
+static bool process_bus_messages(pa_rtp_context *c) {
+ GstBus *bus;
+ GstMessage *message;
+ bool ret = true;
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
+
+ while (ret && (message = gst_bus_pop(bus))) {
+ if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
+ GError *error = NULL;
+
+ ret = false;
+
+ gst_message_parse_error(message, &error, NULL);
+ pa_log("Got an error: %s", error->message);
+
+ g_error_free(error);
+ }
+
+ gst_message_unref(message);
+ }
+
+ gst_object_unref(bus);
+
+ return ret;
+}
+
+static void free_buffer(pa_memblock *memblock) {
+ pa_memblock_release(memblock);
+ pa_memblock_unref(memblock);
+}
+
+/* Called from I/O thread context */
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
+ pa_memchunk chunk = { 0, };
+ GstBuffer *buf;
+ void *data;
+ bool stop = false;
+ int ret = 0;
+
+ pa_assert(c);
+ pa_assert(q);
+
+ if (!process_bus_messages(c))
+ return -1;
+
+ while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
+ GstClock *clock;
+ GstClockTime timestamp, clock_time;
+
+ clock = gst_element_get_clock(c->pipeline);
+ clock_time = gst_clock_get_time(clock);
+ gst_object_unref(clock);
+
+ timestamp = gst_element_get_base_time(c->pipeline);
+ if (timestamp > clock_time)
+ timestamp -= clock_time;
+ else
+ timestamp = 0;
+
+ pa_assert(chunk.memblock);
+
+ data = pa_memblock_acquire(chunk.memblock);
+
+ buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
+ data, chunk.length, chunk.index, chunk.length, chunk.memblock,
+ (GDestroyNotify) free_buffer);
+
+ GST_BUFFER_PTS(buf) = timestamp;
+
+ if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
+ pa_log_error("Could not push buffer");
+ stop = true;
+ ret = -1;
+ }
+
+ pa_memblockq_drop(q, chunk.length);
+ }
+
+ return ret;
+}
+
+static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
+ if (ss->format != PA_SAMPLE_S16BE)
+ return NULL;
+
+ return gst_caps_new_simple("application/x-rtp",
+ "media", G_TYPE_STRING, "audio",
+ "encoding-name", G_TYPE_STRING, "L16",
+ "clock-rate", G_TYPE_INT, (int) ss->rate,
+ "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss),
+ "layout", G_TYPE_STRING, "interleaved",
+ NULL);
+}
+
+static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+ GstElement *depay;
+ GstPad *sinkpad;
+ GstPadLinkReturn ret;
+
+ depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
+ pa_assert(depay);
+
+ sinkpad = gst_element_get_static_pad(depay, "sink");
+
+ ret = gst_pad_link(pad, sinkpad);
+ if (ret != GST_PAD_LINK_OK) {
+ GstBus *bus;
+ GError *error;
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
+ error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
+ gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
+
+ /* Actually cause the I/O thread to wake up and process the error */
+ pa_fdsem_post(c->fdsem);
+
+ g_error_free(error);
+ gst_object_unref(bus);
+ }
+
+ gst_object_unref(sinkpad);
+ gst_object_unref(depay);
+}
+
+static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
+ GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
+ GstCaps *caps;
+ GSocket *socket;
+ GError *error = NULL;
+
+ MAKE_ELEMENT(udpsrc, "udpsrc");
+ MAKE_ELEMENT(rtpbin, "rtpbin");
+ MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
+ MAKE_ELEMENT(appsink, "appsink");
+
+ c->pipeline = gst_pipeline_new(NULL);
+
+ gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
+
+ socket = g_socket_new_from_fd(fd, &error);
+ if (error) {
+ pa_log("Could not create socket: %s", error->message);
+ g_error_free(error);
+ goto fail;
+ }
+
+ caps = rtp_caps_from_sample_spec(ss);
+ if (!caps) {
+ pa_log("Unsupported format to payload");
+ goto fail;
+ }
+
+ g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
+ g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
+ g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
+
+ gst_caps_unref(caps);
+ g_object_unref(socket);
+
+ if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
+ !gst_element_link(depay, appsink)) {
+
+ pa_log("Could not set up receive pipeline");
+ goto fail;
+ }
+
+ g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c);
+
+ if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+ pa_log("Could not start pipeline");
+ goto fail;
+ }
+
+ c->appsink = gst_object_ref(appsink);
+
+ return true;
+
+fail:
+ if (c->pipeline) {
+ gst_object_unref(c->pipeline);
+ } else {
+ /* These weren't yet added to pipeline, so we still have a ref */
+ if (udpsrc)
+ gst_object_unref(udpsrc);
+ if (depay)
+ gst_object_unref(depay);
+ if (rtpbin)
+ gst_object_unref(rtpbin);
+ if (appsink)
+ gst_object_unref(appsink);
+ }
+
+ return false;
+}
+
+/* Called from the GStreamer streaming thread */
+static void appsink_eos(GstAppSink *appsink, gpointer userdata) {
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+ pa_fdsem_post(c->fdsem);
+}
+
+/* Called from the GStreamer streaming thread */
+static GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) {
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+ pa_fdsem_post(c->fdsem);
+
+ return GST_FLOW_OK;
+}
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
+ pa_rtp_context *c = NULL;
+ GstAppSinkCallbacks callbacks = { 0, };
+ GError *error = NULL;
+
+ pa_assert(fd >= 0);
+
+ pa_log_info("Initialising GStreamer RTP backend for receive");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
+ c->fdsem = pa_fdsem_new();
+ c->ss = *ss;
+
+ if (!gst_init_check(NULL, NULL, &error)) {
+ pa_log_error("Could not initialise GStreamer: %s", error->message);
+ g_error_free(error);
+ goto fail;
+ }
+
+ if (!init_receive_pipeline(c, fd, ss))
+ goto fail;
+
+ callbacks.eos = appsink_eos;
+ callbacks.new_sample = appsink_new_sample;
+ gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL);
+
+ return c;
+
+fail:
+ pa_rtp_context_free(c);
+ return NULL;
+}
+
+/* Called from I/O thread context */
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
+ GstSample *sample = NULL;
+ GstBuffer *buf;
+ GstMapInfo info;
+ void *data;
+
+ if (!process_bus_messages(c))
+ goto fail;
+
+ sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
+ if (!sample) {
+ pa_log_warn("Could not get any more data");
+ goto fail;
+ }
+
+ buf = gst_sample_get_buffer(sample);
+
+ if (GST_BUFFER_IS_DISCONT(buf))
+ pa_log_info("Discontinuity detected, possibly lost some packets");
+
+ if (!gst_buffer_map(buf, &info, GST_MAP_READ))
+ goto fail;
+
+ pa_assert(pa_mempool_block_size_max(pool) >= info.size);
+
+ chunk->memblock = pa_memblock_new(pool, info.size);
+ chunk->index = 0;
+ chunk->length = info.size;
+
+ data = pa_memblock_acquire_chunk(chunk);
+ /* TODO: we could probably just provide an allocator and avoid a memcpy */
+ memcpy(data, info.data, info.size);
+ pa_memblock_release(chunk->memblock);
+
+ /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted
+ * to time units (instead of clock-rate units as is in the header) and
+ * wraparound-corrected, and the DTS is the pipeline clock timestamp from
+ * when the buffer was acquired at the source (this is actually the running
+ * time which is why we need to add base time). */
+ *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(buf), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU;
+ pa_timeval_rtstore(tstamp, (GST_BUFFER_DTS(buf) + gst_element_get_base_time(c->pipeline)) / GST_USECOND, false);
+
+ gst_buffer_unmap(buf, &info);
+ gst_sample_unref(sample);
+
+ return 0;
+
+fail:
+ if (sample)
+ gst_sample_unref(sample);
+
+ if (chunk->memblock)
+ pa_memblock_unref(chunk->memblock);
+
+ return -1;
+}
+
+void pa_rtp_context_free(pa_rtp_context *c) {
+ pa_assert(c);
+
+ if (c->appsrc) {
+ gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
+ gst_object_unref(c->appsrc);
+ }
+
+ if (c->appsink)
+ gst_object_unref(c->appsink);
+
+ if (c->pipeline) {
+ gst_element_set_state(c->pipeline, GST_STATE_NULL);
+ gst_object_unref(c->pipeline);
+ }
+
+ if (c->fdsem)
+ pa_fdsem_free(c->fdsem);
+
+ pa_xfree(c);
+}
+
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+ return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem);
+}
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+ return pa_frame_size(&c->ss);
+}
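The timestamp handling in the GStreamer pa_rtp_recv() above can be sketched in isolation. This is a minimal illustration, not the backend's code: it assumes the PTS is a nanosecond count, and pts_to_rtp_timestamp() and NSEC_PER_SEC are hypothetical names introduced here. It performs the same scale-then-mask step that the patch does with gst_util_uint64_scale_int() and the 0xFFFFFFFF mask.

```c
#include <assert.h>
#include <stdint.h>

#define NSEC_PER_SEC UINT64_C(1000000000) /* hypothetical helper constant */

/* Convert a timestamp in nanoseconds to clock-rate units and keep only the
 * low 32 bits, which is the width of the RTP timestamp field. The
 * multiplication is split into whole seconds and the sub-second remainder so
 * that the intermediate product cannot overflow in a way that affects the
 * low 32 bits of the result. */
static uint32_t pts_to_rtp_timestamp(uint64_t pts_ns, uint32_t rate) {
    uint64_t whole, frac;

    assert(rate > 0);

    whole = (pts_ns / NSEC_PER_SEC) * rate;
    frac = ((pts_ns % NSEC_PER_SEC) * rate) / NSEC_PER_SEC;

    return (uint32_t) ((whole + frac) & UINT64_C(0xFFFFFFFF));
}
```

With a 48 kHz stream the 32-bit value wraps after roughly 24.8 hours, which is why the receiver only ever sees the masked value.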
=====================================
src/modules/rtp/rtp.c → src/modules/rtp/rtp-native.c
=====================================
@@ -40,19 +40,40 @@
#include <pulsecore/macro.h>
#include <pulsecore/core-util.h>
#include <pulsecore/arpa-inet.h>
+#include <pulsecore/poll.h>
#include "rtp.h"
-pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssrc, uint8_t payload, size_t frame_size) {
- pa_assert(c);
+typedef struct pa_rtp_context {
+ int fd;
+ uint16_t sequence;
+ uint32_t timestamp;
+ uint32_t ssrc;
+ uint8_t payload;
+ size_t frame_size;
+ size_t mtu;
+
+ uint8_t *recv_buf;
+ size_t recv_buf_size;
+ pa_memchunk memchunk;
+} pa_rtp_context;
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+ pa_rtp_context *c;
+
pa_assert(fd >= 0);
+ pa_log_info("Initialising native RTP backend for send");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
c->fd = fd;
c->sequence = (uint16_t) (rand()*rand());
c->timestamp = 0;
- c->ssrc = ssrc ? ssrc : (uint32_t) (rand()*rand());
+ c->ssrc = (uint32_t) (rand()*rand());
c->payload = (uint8_t) (payload & 127U);
- c->frame_size = frame_size;
+ c->frame_size = pa_frame_size(ss);
+ c->mtu = mtu;
c->recv_buf = NULL;
c->recv_buf_size = 0;
@@ -63,17 +84,16 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr
#define MAX_IOVECS 16
-int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
struct iovec iov[MAX_IOVECS];
pa_memblock* mb[MAX_IOVECS];
int iov_idx = 1;
size_t n = 0;
pa_assert(c);
- pa_assert(size > 0);
pa_assert(q);
- if (pa_memblockq_get_length(q) < size)
+ if (pa_memblockq_get_length(q) < c->mtu)
return 0;
for (;;) {
@@ -84,7 +104,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
- size_t k = n + chunk.length > size ? size - n : chunk.length;
+ size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
pa_assert(chunk.memblock);
@@ -99,7 +119,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
pa_assert(n % c->frame_size == 0);
- if (r < 0 || n >= size || iov_idx >= MAX_IOVECS) {
+ if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) {
uint32_t header[3];
struct msghdr m;
ssize_t k;
@@ -140,7 +160,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
return -1;
}
- if (r < 0 || pa_memblockq_get_length(q) < size)
+ if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
break;
n = 0;
@@ -151,19 +171,25 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
return 0;
}
-pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size) {
- pa_assert(c);
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
+ pa_rtp_context *c;
+
+ pa_log_info("Initialising native RTP backend for receive");
+
+ c = pa_xnew0(pa_rtp_context, 1);
c->fd = fd;
- c->frame_size = frame_size;
+ c->payload = payload;
+ c->frame_size = pa_frame_size(ss);
c->recv_buf_size = 2000;
c->recv_buf = pa_xmalloc(c->recv_buf_size);
pa_memchunk_reset(&c->memchunk);
+
return c;
}
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp) {
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
int size;
size_t audio_length;
size_t metadata_length;
@@ -171,6 +197,8 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct
struct cmsghdr *cm;
struct iovec iov;
uint32_t header;
+ uint32_t ssrc;
+ uint8_t payload;
unsigned cc;
ssize_t r;
uint8_t aux[1024];
@@ -246,12 +274,12 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct
}
memcpy(&header, iov.iov_base, sizeof(uint32_t));
- memcpy(&c->timestamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
- memcpy(&c->ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
+ memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
+ memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
header = ntohl(header);
- c->timestamp = ntohl(c->timestamp);
- c->ssrc = ntohl(c->ssrc);
+ *rtp_tstamp = ntohl(*rtp_tstamp);
+ ssrc = ntohl(c->ssrc);
if ((header >> 30) != 2) {
pa_log_warn("Unsupported RTP version.");
@@ -268,12 +296,22 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct
goto fail;
}
+ if (ssrc != c->ssrc) {
+ pa_log_debug("Got unexpected SSRC");
+ goto fail;
+ }
+
cc = (header >> 24) & 0xF;
- c->payload = (uint8_t) ((header >> 16) & 127U);
+ payload = (uint8_t) ((header >> 16) & 127U);
c->sequence = (uint16_t) (header & 0xFFFFU);
metadata_length = 12 + cc * 4;
+ if (payload != c->payload) {
+ pa_log_debug("Got unexpected payload: %u", payload);
+ goto fail;
+ }
+
if (metadata_length > (unsigned) size) {
pa_log_warn("RTP packet too short. (CSRC)");
goto fail;
@@ -335,80 +373,7 @@ fail:
return -1;
}
-uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
- pa_assert(ss);
-
- if (ss->format == PA_SAMPLE_ULAW && ss->rate == 8000 && ss->channels == 1)
- return 0;
- if (ss->format == PA_SAMPLE_ALAW && ss->rate == 8000 && ss->channels == 1)
- return 8;
- if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 2)
- return 10;
- if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 1)
- return 11;
-
- return 127;
-}
-
-pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) {
- pa_assert(ss);
-
- switch (payload) {
- case 0:
- ss->channels = 1;
- ss->format = PA_SAMPLE_ULAW;
- ss->rate = 8000;
- break;
-
- case 8:
- ss->channels = 1;
- ss->format = PA_SAMPLE_ALAW;
- ss->rate = 8000;
- break;
-
- case 10:
- ss->channels = 2;
- ss->format = PA_SAMPLE_S16BE;
- ss->rate = 44100;
- break;
-
- case 11:
- ss->channels = 1;
- ss->format = PA_SAMPLE_S16BE;
- ss->rate = 44100;
- break;
-
- default:
- return NULL;
- }
-
- return ss;
-}
-
-pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) {
- pa_assert(ss);
-
- if (!pa_rtp_sample_spec_valid(ss))
- ss->format = PA_SAMPLE_S16BE;
-
- pa_assert(pa_rtp_sample_spec_valid(ss));
- return ss;
-}
-
-int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
- pa_assert(ss);
-
- if (!pa_sample_spec_valid(ss))
- return 0;
-
- return
- ss->format == PA_SAMPLE_U8 ||
- ss->format == PA_SAMPLE_ALAW ||
- ss->format == PA_SAMPLE_ULAW ||
- ss->format == PA_SAMPLE_S16BE;
-}
-
-void pa_rtp_context_destroy(pa_rtp_context *c) {
+void pa_rtp_context_free(pa_rtp_context *c) {
pa_assert(c);
pa_assert_se(pa_close(c->fd) == 0);
@@ -417,36 +382,23 @@ void pa_rtp_context_destroy(pa_rtp_context *c) {
pa_memblock_unref(c->memchunk.memblock);
pa_xfree(c->recv_buf);
- c->recv_buf = NULL;
- c->recv_buf_size = 0;
+ pa_xfree(c);
}
-const char* pa_rtp_format_to_string(pa_sample_format_t f) {
- switch (f) {
- case PA_SAMPLE_S16BE:
- return "L16";
- case PA_SAMPLE_U8:
- return "L8";
- case PA_SAMPLE_ALAW:
- return "PCMA";
- case PA_SAMPLE_ULAW:
- return "PCMU";
- default:
- return NULL;
- }
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+ return c->frame_size;
}
-pa_sample_format_t pa_rtp_string_to_format(const char *s) {
- pa_assert(s);
-
- if (pa_streq(s, "L16"))
- return PA_SAMPLE_S16BE;
- else if (pa_streq(s, "L8"))
- return PA_SAMPLE_U8;
- else if (pa_streq(s, "PCMA"))
- return PA_SAMPLE_ALAW;
- else if (pa_streq(s, "PCMU"))
- return PA_SAMPLE_ULAW;
- else
- return PA_SAMPLE_INVALID;
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+ pa_rtpoll_item *item;
+ struct pollfd *p;
+
+ item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
+
+ p = pa_rtpoll_item_get_pollfd(item, NULL);
+ p->fd = c->fd;
+ p->events = POLLIN;
+ p->revents = 0;
+
+ return item;
}
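The header checks that the native pa_rtp_recv() performs (version, payload type, CSRC length) can be sketched as a standalone parser. This is a simplified sketch under stated assumptions: struct rtp_header_sketch, read_be32() and parse_rtp_header_sketch() are hypothetical names, the SSRC comparison is left out, and bytes are decoded by hand rather than with memcpy() and ntohl() as in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical container for the fixed RTP header fields; not a PulseAudio type. */
struct rtp_header_sketch {
    unsigned cc;        /* CSRC count */
    uint8_t payload;    /* payload type, 7 bits */
    uint16_t sequence;
    uint32_t timestamp;
    uint32_t ssrc;
};

/* Read a 32-bit big-endian (network order) value. */
static uint32_t read_be32(const uint8_t *p) {
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Returns 0 on success, -1 if the packet is too short, is not RTP version 2,
 * or carries a payload type other than the expected one. */
static int parse_rtp_header_sketch(const uint8_t *buf, size_t len,
                                   uint8_t expected_payload,
                                   struct rtp_header_sketch *out) {
    uint32_t word;

    assert(buf);
    assert(out);

    if (len < 12)
        return -1;

    word = read_be32(buf);

    if ((word >> 30) != 2)
        return -1;

    out->cc = (word >> 24) & 0xF;
    out->payload = (uint8_t) ((word >> 16) & 127U);
    out->sequence = (uint16_t) (word & 0xFFFFU);
    out->timestamp = read_be32(buf + 4);
    out->ssrc = read_be32(buf + 8);

    if (out->payload != expected_payload)
        return -1;

    /* The fixed header is followed by cc 32-bit CSRC entries. */
    if (12 + (size_t) out->cc * 4 > len)
        return -1;

    return 0;
}
```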
=====================================
src/modules/rtp/rtp.h
=====================================
@@ -25,30 +25,24 @@
#include <sys/types.h>
#include <pulsecore/memblockq.h>
#include <pulsecore/memchunk.h>
+#include <pulsecore/rtpoll.h>
-typedef struct pa_rtp_context {
- int fd;
- uint16_t sequence;
- uint32_t timestamp;
- uint32_t ssrc;
- uint8_t payload;
- size_t frame_size;
+typedef struct pa_rtp_context pa_rtp_context;
- uint8_t *recv_buf;
- size_t recv_buf_size;
- pa_memchunk memchunk;
-} pa_rtp_context;
-
-pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssrc, uint8_t payload, size_t frame_size);
+int pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size);
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss);
/* If the memblockq doesn't have a silence memchunk set, then the caller must
* guarantee that the current read index doesn't point to a hole. */
-int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q);
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q);
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp);
-pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size);
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp);
+void pa_rtp_context_free(pa_rtp_context *c);
-void pa_rtp_context_destroy(pa_rtp_context *c);
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c);
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll);
pa_sample_spec* pa_rtp_sample_spec_fixup(pa_sample_spec *ss);
int pa_rtp_sample_spec_valid(const pa_sample_spec *ss);
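The header change above turns pa_rtp_context into an opaque type, so each backend can define its own layout. A minimal sketch of that pattern, with hypothetical names (ctx_sketch and its functions are illustrative only, and plain calloc()/free() stand in for the PulseAudio allocators):

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* What a shared header would expose: an incomplete type plus accessors.
 * Callers can hold a pointer but cannot touch the fields. */
typedef struct ctx_sketch ctx_sketch;

ctx_sketch *ctx_sketch_new(size_t frame_size);
size_t ctx_sketch_get_frame_size(const ctx_sketch *c);
void ctx_sketch_free(ctx_sketch *c);

/* What one backend's .c file would contain: the full definition. Another
 * backend could define a completely different struct under the same name. */
struct ctx_sketch {
    size_t frame_size;
};

ctx_sketch *ctx_sketch_new(size_t frame_size) {
    ctx_sketch *c = calloc(1, sizeof(*c));

    if (!c)
        return NULL;

    c->frame_size = frame_size;
    return c;
}

size_t ctx_sketch_get_frame_size(const ctx_sketch *c) {
    assert(c);
    return c->frame_size;
}

void ctx_sketch_free(ctx_sketch *c) {
    free(c);
}
```

The cost is one accessor per exposed detail, which is the role pa_rtp_context_get_frame_size() and pa_rtp_context_get_rtpoll_item() play in the patch.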
=====================================
src/modules/rtp/sdp.c
=====================================
@@ -89,15 +89,6 @@ static pa_sample_spec *parse_sdp_sample_spec(pa_sample_spec *ss, char *c) {
if (pa_startswith(c, "L16/")) {
ss->format = PA_SAMPLE_S16BE;
c += 4;
- } else if (pa_startswith(c, "L8/")) {
- ss->format = PA_SAMPLE_U8;
- c += 3;
- } else if (pa_startswith(c, "PCMA/")) {
- ss->format = PA_SAMPLE_ALAW;
- c += 5;
- } else if (pa_startswith(c, "PCMU/")) {
- ss->format = PA_SAMPLE_ULAW;
- c += 5;
} else
return NULL;
=====================================
src/pulsecore/rtpoll.c
=====================================
@@ -77,7 +77,9 @@ struct pa_rtpoll_item {
int (*work_cb)(pa_rtpoll_item *i);
int (*before_cb)(pa_rtpoll_item *i);
void (*after_cb)(pa_rtpoll_item *i);
- void *userdata;
+ void *work_userdata;
+ void *before_userdata;
+ void *after_userdata;
PA_LLIST_FIELDS(pa_rtpoll_item);
};
@@ -411,7 +413,9 @@ pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsi
i->pollfd = NULL;
i->priority = prio;
- i->userdata = NULL;
+ i->work_userdata = NULL;
+ i->before_userdata = NULL;
+ i->after_userdata = NULL;
i->before_cb = NULL;
i->after_cb = NULL;
i->work_cb = NULL;
@@ -458,42 +462,39 @@ struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
return i->pollfd;
}
-void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
+void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata) {
pa_assert(i);
pa_assert(i->priority < PA_RTPOLL_NEVER);
i->before_cb = before_cb;
+ i->before_userdata = userdata;
}
-void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
+void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata) {
pa_assert(i);
pa_assert(i->priority < PA_RTPOLL_NEVER);
i->after_cb = after_cb;
+ i->after_userdata = userdata;
}
-void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
+void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata) {
pa_assert(i);
pa_assert(i->priority < PA_RTPOLL_NEVER);
i->work_cb = work_cb;
+ i->work_userdata = userdata;
}
-void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
+void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i) {
pa_assert(i);
- i->userdata = userdata;
-}
-
-void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
- pa_assert(i);
-
- return i->userdata;
+ return i->work_userdata;
}
static int fdsem_before(pa_rtpoll_item *i) {
- if (pa_fdsem_before_poll(i->userdata) < 0)
+ if (pa_fdsem_before_poll(i->before_userdata) < 0)
return 1; /* 1 means immediate restart of the loop */
return 0;
@@ -503,7 +504,7 @@ static void fdsem_after(pa_rtpoll_item *i) {
pa_assert(i);
pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
- pa_fdsem_after_poll(i->userdata);
+ pa_fdsem_after_poll(i->after_userdata);
}
pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
@@ -520,9 +521,8 @@ pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio
pollfd->fd = pa_fdsem_get(f);
pollfd->events = POLLIN;
- i->before_cb = fdsem_before;
- i->after_cb = fdsem_after;
- i->userdata = f;
+ pa_rtpoll_item_set_before_callback(i, fdsem_before, f);
+ pa_rtpoll_item_set_after_callback(i, fdsem_after, f);
return i;
}
@@ -530,7 +530,7 @@ pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio
static int asyncmsgq_read_before(pa_rtpoll_item *i) {
pa_assert(i);
- if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
+ if (pa_asyncmsgq_read_before_poll(i->before_userdata) < 0)
return 1; /* 1 means immediate restart of the loop */
return 0;
@@ -540,7 +540,7 @@ static void asyncmsgq_read_after(pa_rtpoll_item *i) {
pa_assert(i);
pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
- pa_asyncmsgq_read_after_poll(i->userdata);
+ pa_asyncmsgq_read_after_poll(i->after_userdata);
}
static int asyncmsgq_read_work(pa_rtpoll_item *i) {
@@ -552,11 +552,11 @@ static int asyncmsgq_read_work(pa_rtpoll_item *i) {
pa_assert(i);
- if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
+ if (pa_asyncmsgq_get(i->work_userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
int ret;
if (!object && code == PA_MESSAGE_SHUTDOWN) {
- pa_asyncmsgq_done(i->userdata, 0);
+ pa_asyncmsgq_done(i->work_userdata, 0);
/* Requests the loop to exit. Will cause the next iteration of
* pa_rtpoll_run() to return 0 */
i->rtpoll->quit = true;
@@ -564,7 +564,7 @@ static int asyncmsgq_read_work(pa_rtpoll_item *i) {
}
ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
- pa_asyncmsgq_done(i->userdata, ret);
+ pa_asyncmsgq_done(i->work_userdata, ret);
return 1;
}
@@ -584,10 +584,9 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priori
pollfd->fd = pa_asyncmsgq_read_fd(q);
pollfd->events = POLLIN;
- i->before_cb = asyncmsgq_read_before;
- i->after_cb = asyncmsgq_read_after;
- i->work_cb = asyncmsgq_read_work;
- i->userdata = q;
+ pa_rtpoll_item_set_before_callback(i, asyncmsgq_read_before, q);
+ pa_rtpoll_item_set_after_callback(i, asyncmsgq_read_after, q);
+ pa_rtpoll_item_set_work_callback(i, asyncmsgq_read_work, q);
return i;
}
@@ -595,7 +594,7 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priori
static int asyncmsgq_write_before(pa_rtpoll_item *i) {
pa_assert(i);
- pa_asyncmsgq_write_before_poll(i->userdata);
+ pa_asyncmsgq_write_before_poll(i->before_userdata);
return 0;
}
@@ -603,7 +602,7 @@ static void asyncmsgq_write_after(pa_rtpoll_item *i) {
pa_assert(i);
pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
- pa_asyncmsgq_write_after_poll(i->userdata);
+ pa_asyncmsgq_write_after_poll(i->after_userdata);
}
pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
@@ -619,10 +618,8 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_prior
pollfd->fd = pa_asyncmsgq_write_fd(q);
pollfd->events = POLLIN;
- i->before_cb = asyncmsgq_write_before;
- i->after_cb = asyncmsgq_write_after;
- i->work_cb = NULL;
- i->userdata = q;
+ pa_rtpoll_item_set_before_callback(i, asyncmsgq_write_before, q);
+ pa_rtpoll_item_set_after_callback(i, asyncmsgq_write_after, q);
return i;
}
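The rtpoll change above replaces the single userdata pointer with one pointer per callback, passed in through the setter. A reduced sketch of that arrangement follows. All names are hypothetical, and item_sketch_iterate() only imitates one loop iteration; it is not how pa_rtpoll_run() is implemented.

```c
#include <assert.h>
#include <stddef.h>

typedef struct item_sketch item_sketch;

/* Each callback carries its own userdata, so an owner that installs only a
 * work callback cannot clobber the data used by the before/after pair. */
struct item_sketch {
    int (*before_cb)(item_sketch *i);
    void (*after_cb)(item_sketch *i);
    void *before_userdata;
    void *after_userdata;
};

static void item_sketch_set_before_callback(item_sketch *i, int (*cb)(item_sketch *i), void *userdata) {
    assert(i);
    i->before_cb = cb;
    i->before_userdata = userdata;
}

static void item_sketch_set_after_callback(item_sketch *i, void (*cb)(item_sketch *i), void *userdata) {
    assert(i);
    i->after_cb = cb;
    i->after_userdata = userdata;
}

/* Example callbacks that count invocations through their own userdata. */
static int count_before(item_sketch *i) {
    (*(int *) i->before_userdata)++;
    return 0; /* 0: go on to poll; > 0 would restart the loop immediately */
}

static void count_after(item_sketch *i) {
    (*(int *) i->after_userdata)++;
}

/* Imitate one loop iteration: before, (poll would happen here), after. */
static void item_sketch_iterate(item_sketch *i) {
    assert(i);

    if (i->before_cb && i->before_cb(i) > 0)
        return;

    if (i->after_cb)
        i->after_cb(i);
}
```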
=====================================
src/pulsecore/rtpoll.h
=====================================
@@ -80,19 +80,18 @@ struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds);
/* Set the callback that shall be called when there's time to do some work: If the
* callback returns a value > 0, the poll is skipped and the next
* iteration of the loop will start immediately. */
-void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i));
+void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata);
/* Set the callback that shall be called immediately before entering
* the sleeping poll: If the callback returns a value > 0, the poll is
* skipped and the next iteration of the loop will start immediately. */
-void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i));
+void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata);
/* Set the callback that shall be called immediately after having
* entered the sleeping poll */
-void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i));
+void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata);
-void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata);
-void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i);
+void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i);
pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s);
pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
=====================================
src/tests/rtpoll-test.c
=====================================
@@ -48,15 +48,15 @@ START_TEST (rtpoll_test) {
p = pa_rtpoll_new();
i = pa_rtpoll_item_new(p, PA_RTPOLL_EARLY, 1);
- pa_rtpoll_item_set_before_callback(i, before);
- pa_rtpoll_item_set_after_callback(i, after);
+ pa_rtpoll_item_set_before_callback(i, before, NULL);
+ pa_rtpoll_item_set_after_callback(i, after, NULL);
pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
pollfd->fd = 0;
pollfd->events = POLLIN;
w = pa_rtpoll_item_new(p, PA_RTPOLL_NORMAL, 0);
- pa_rtpoll_item_set_before_callback(w, worker);
+ pa_rtpoll_item_set_before_callback(w, worker, NULL);
pa_rtpoll_set_timer_relative(p, 10000000); /* 10 s */
@@ -65,8 +65,8 @@ START_TEST (rtpoll_test) {
pa_rtpoll_item_free(i);
i = pa_rtpoll_item_new(p, PA_RTPOLL_EARLY, 1);
- pa_rtpoll_item_set_before_callback(i, before);
- pa_rtpoll_item_set_after_callback(i, after);
+ pa_rtpoll_item_set_before_callback(i, before, NULL);
+ pa_rtpoll_item_set_after_callback(i, after, NULL);
pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
pollfd->fd = 0;
View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/compare/7670dffe596170eedb083526857abf21484ff8ad...a17cc55c4c9a0901d527d21c00093fd57a2847b5