[pulseaudio-discuss] [PATCH v6 10/37] raop: Packet retransmission support for UDP
Hajime Fujita
crisp.fujita at gmail.com
Sun Jan 31 20:16:07 PST 2016
From: Matthias Wabersich <pulseaudio at niafc.de>
This patch adds an RTP audio packet retransmission support and a
circular buffer implementation for it.
This patch was originally written by Matthias Wabersich [1] and
later debugged and integrated into the latest tree by Hajime Fujita
[1]: https://bugs.freedesktop.org/show_bug.cgi?id=42804#c44
---
src/Makefile.am | 3 +-
src/modules/raop/raop_client.c | 109 ++++++++++++++++++---
src/modules/raop/raop_client.h | 2 +
src/modules/raop/raop_packet_buffer.c | 172 ++++++++++++++++++++++++++++++++++
src/modules/raop/raop_packet_buffer.h | 42 +++++++++
5 files changed, 316 insertions(+), 12 deletions(-)
create mode 100644 src/modules/raop/raop_packet_buffer.c
create mode 100644 src/modules/raop/raop_packet_buffer.h
diff --git a/src/Makefile.am b/src/Makefile.am
index b0ca2bc..d60cc19 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1133,7 +1133,8 @@ librtp_la_LIBADD = $(AM_LIBADD) libpulsecore- at PA_MAJORMINOR@.la libpulsecommon-@
libraop_la_SOURCES = \
modules/raop/raop_client.c modules/raop/raop_client.h \
- modules/raop/base64.c modules/raop/base64.h
+ modules/raop/base64.c modules/raop/base64.h \
+ modules/raop/raop_packet_buffer.h modules/raop/raop_packet_buffer.c
libraop_la_CFLAGS = $(AM_CFLAGS) $(OPENSSL_CFLAGS) -I$(top_srcdir)/src/modules/rtp
libraop_la_LDFLAGS = $(AM_LDFLAGS) $(AM_LIBLDFLAGS) -avoid-version
libraop_la_LIBADD = $(AM_LIBADD) $(OPENSSL_LIBS) libpulsecore- at PA_MAJORMINOR@.la librtp.la libpulsecommon- at PA_MAJORMINOR@.la libpulse.la
diff --git a/src/modules/raop/raop_client.c b/src/modules/raop/raop_client.c
index c0be2ec..1c6c49e 100644
--- a/src/modules/raop/raop_client.c
+++ b/src/modules/raop/raop_client.c
@@ -59,7 +59,8 @@
#include "rtsp_client.h"
#include "base64.h"
-#define UDP_FRAMES_PER_PACKET 352
+#include "raop_packet_buffer.h"
+
#define AES_CHUNKSIZE 16
#define JACK_STATUS_DISCONNECTED 0
@@ -77,6 +78,8 @@
#define UDP_DEFAULT_CONTROL_PORT 6001
#define UDP_DEFAULT_TIMING_PORT 6002
+#define UDP_DEFAULT_PKT_BUF_SIZE 1000
+
typedef enum {
UDP_PAYLOAD_TIMING_REQUEST = 0x52,
UDP_PAYLOAD_TIMING_RESPONSE = 0x53,
@@ -140,6 +143,8 @@ struct pa_raop_client {
pa_raop_client_disconnected_cb_t udp_disconnected_callback;
void *udp_disconnected_userdata;
+
+ pa_raop_packet_buffer *packet_buffer;
};
/* Timming packet header (8x8):
@@ -530,12 +535,35 @@ static void udp_build_audio_header(pa_raop_client *c, uint32_t *buffer, size_t s
buffer[2] = htonl(c->udp_ssrc);
}
-static ssize_t udp_send_audio_packet(pa_raop_client *c, uint8_t *buffer, size_t size) {
- ssize_t length;
+/* Audio retransmission header:
+ * [0] RTP v2: 0x80
+ * [1] Payload type: 0x56 + 0x80 (marker == on)
+ * [2] Unknown; seems always 0x01
+ * [3] Unknown; seems some random number around 0x20~0x40
+ * [4,5] Original RTP header htons(0x8060)
+ * [6,7] Packet sequence number to be retransmitted
+ * [8,11] Original RTP timestamp on the lost packet */
+static void udp_build_retrans_header(uint32_t *buffer, size_t size, uint16_t seq_num) {
+ uint8_t x = 0x30; /* FIXME: what's this?? */
+
+ pa_assert(size >= sizeof(uint32_t) * 2);
+
+ buffer[0] = htonl((uint32_t) 0x80000000
+ | ((uint32_t) UDP_PAYLOAD_RETRANSMIT_REPLY | 0x80) << 16
+ | 0x0100
+ | x);
+ buffer[1] = htonl((uint32_t) 0x80600000 | seq_num);
+}
- length = pa_write(c->udp_stream_fd, buffer, size, NULL);
- c->seq++;
+static ssize_t udp_send_audio_packet(pa_raop_client *c, bool retrans, uint8_t *buffer, size_t size) {
+ ssize_t length;
+ int fd = retrans ? c->udp_control_fd : c->udp_stream_fd;
+ length = pa_write(fd, buffer, size, NULL);
+ if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ pa_log_debug("Discarding audio packet %d due to EAGAIN", c->seq);
+ length = size;
+ }
return length;
}
@@ -947,6 +975,8 @@ static void udp_rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist
pa_log_debug("RTSP control channel closed (teardown)");
+ pa_raop_pb_clear(c->packet_buffer);
+
pa_rtsp_client_free(c->rtsp);
pa_xfree(c->sid);
c->rtsp = NULL;
@@ -983,6 +1013,8 @@ static void udp_rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist
pa_log_debug("RTSP control channel closed (disconnected)");
+ pa_raop_pb_clear(c->packet_buffer);
+
pa_rtsp_client_free(c->rtsp);
pa_xfree(c->sid);
c->rtsp = NULL;
@@ -1046,7 +1078,8 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot
pa_raop_client_free(c);
return NULL;
}
- }
+ } else
+ c->packet_buffer = pa_raop_pb_new(UDP_DEFAULT_PKT_BUF_SIZE);
return c;
}
@@ -1054,6 +1087,7 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot
void pa_raop_client_free(pa_raop_client *c) {
pa_assert(c);
+ pa_raop_pb_delete(c->packet_buffer);
if (c->rtsp)
pa_rtsp_client_free(c->rtsp);
if (c->sid)
@@ -1170,14 +1204,48 @@ int pa_raop_client_udp_handle_timing_packet(pa_raop_client *c, const uint8_t pac
return rv;
}
+static int udp_resend_packets(pa_raop_client *c, uint16_t seq_num, uint16_t num_packets) {
+ int rv = -1;
+ uint8_t *data = NULL;
+ ssize_t len = 0;
+ int i = 0;
+
+ pa_assert(c);
+ pa_assert(num_packets > 0);
+ pa_assert(c->packet_buffer);
+
+ for (i = seq_num; i < seq_num + num_packets; i++) {
+ len = pa_raop_pb_read_packet(c->packet_buffer, i, (uint8_t **) &data);
+
+ if (len > 0) {
+ ssize_t r;
+
+ /* Obtained buffer has a header room for retransmission
+ header */
+ udp_build_retrans_header((uint32_t *) data, len, seq_num);
+ r = udp_send_audio_packet(c, true /* retrans */, data, len);
+ if (r == len)
+ rv = 0;
+ else
+ rv = -1;
+ } else
+ pa_log_debug("Packet not found in retrans buffer: %u", i);
+ }
+
+ return rv;
+}
+
int pa_raop_client_udp_handle_control_packet(pa_raop_client *c, const uint8_t packet[], ssize_t size) {
uint8_t payload = 0;
int rv = 0;
+ uint16_t seq_num;
+ uint16_t num_packets;
+
pa_assert(c);
pa_assert(packet);
- if (size != 20 || packet[0] != 0x80)
+ if ((size != 20 && size != 8) || packet[0] != 0x80)
{
pa_log_debug("Received an invalid control packet.");
return 1;
@@ -1188,12 +1256,24 @@ int pa_raop_client_udp_handle_control_packet(pa_raop_client *c, const uint8_t pa
payload = packet[1] ^ 0x80;
switch (payload) {
case UDP_PAYLOAD_RETRANSMIT_REQUEST:
- /* Packet retransmission not implemented yet... */
- /* rv = ... */
+ pa_assert(size == 8);
+
+ /* Requested start sequence number */
+ seq_num = ((uint16_t) packet[4]) << 8;
+ seq_num |= (uint16_t) packet[5];
+ /* Number of requested packets starting at requested seq. number */
+ num_packets = (uint16_t) packet[6] << 8;
+ num_packets |= (uint16_t) packet[7];
+ pa_log_debug("Resending %d packets starting at %d", num_packets, seq_num);
+ rv = udp_resend_packets(c, seq_num, num_packets);
break;
+
case UDP_PAYLOAD_RETRANSMIT_REPLY:
+ pa_log_debug("Received a retransmit reply packet on control port (this should never happen)");
+ break;
+
default:
- pa_log_debug("Got an unexpected payload type on control channel !");
+ pa_log_debug("Got an unexpected payload type on control channel: %u !", payload);
return 1;
}
@@ -1230,7 +1310,14 @@ ssize_t pa_raop_client_udp_send_audio_packet(pa_raop_client *c, pa_memchunk *blo
pa_assert(buf);
pa_assert(block->length > 0);
udp_build_audio_header(c, (uint32_t *) (buf + block->index), block->length);
- len = udp_send_audio_packet(c, buf + block->index, block->length);
+ len = udp_send_audio_packet(c, false, buf + block->index, block->length);
+
+ /* Store packet for resending in the packet buffer */
+ pa_raop_pb_write_packet(c->packet_buffer, c->seq, buf + block->index,
+ block->length);
+
+ c->seq++;
+
pa_memblock_release(block->memblock);
if (len > 0) {
diff --git a/src/modules/raop/raop_client.h b/src/modules/raop/raop_client.h
index 36be8dc..578e9d0 100644
--- a/src/modules/raop/raop_client.h
+++ b/src/modules/raop/raop_client.h
@@ -25,6 +25,8 @@
#include <pulsecore/core.h>
#include <pulsecore/memchunk.h>
+#define UDP_FRAMES_PER_PACKET 352
+
typedef enum pa_raop_protocol {
RAOP_TCP,
RAOP_UDP,
diff --git a/src/modules/raop/raop_packet_buffer.c b/src/modules/raop/raop_packet_buffer.c
new file mode 100644
index 0000000..b8c1bc8
--- /dev/null
+++ b/src/modules/raop/raop_packet_buffer.c
@@ -0,0 +1,172 @@
+/***
+ Circular buffer for RTP audio packets with random access support
+ by RTP sequence number.
+
+ Copyright 2013 Matthias Wabersich, Hajime Fujita
+
+ This is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ This is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+
+***/
+
+#include <stdlib.h>
+#include <limits.h>
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <pulsecore/core-error.h>
+#include "raop_client.h"
+
+#include "raop_packet_buffer.h"
+
+/* FRAMES_PER_PACKET*2*2 + sizeof(udp_audio_header) + sizeof(ALAC header), unencoded */
+#define PACKET_SIZE_MAX (UDP_FRAMES_PER_PACKET*2*2 + 12 + 7)
+/* Header room for packet retransmission header */
+#define RETRANS_HEADER_ROOM 4
+
+/* Packet element */
+struct pa_raop_packet_element {
+ uint16_t seq_num; /* RTP sequence number (in host byte order) */
+ ssize_t length; /* Actual packet length */
+ /* Packet data including RTP header */
+ uint8_t data[PACKET_SIZE_MAX + RETRANS_HEADER_ROOM];
+};
+
+/* Buffer struct */
+struct pa_raop_packet_buffer {
+ size_t size; /* max number of packets in buffer */
+ size_t start; /* index of oldest packet */
+ size_t count; /* number of packets in buffer */
+ uint16_t first_seq_num; /* Sequence number of first packet in buffer */
+ uint16_t latest_seq_num; /* Debug purpose */
+ pa_raop_packet_element *packets; /* Packet element pointer */
+};
+
+pa_raop_packet_buffer *pa_raop_pb_new(size_t size) {
+ pa_raop_packet_buffer *pb = pa_xmalloc0(sizeof(*pb));
+
+ pb->size = size;
+ pb->packets = (pa_raop_packet_element *)
+ pa_xmalloc(size * sizeof(pa_raop_packet_element));
+
+ pa_raop_pb_clear(pb);
+
+ return pb;
+}
+
+void pa_raop_pb_clear(pa_raop_packet_buffer *pb) {
+ pb->start = 0;
+ pb->count = 0;
+ pb->first_seq_num = 0;
+ pb->latest_seq_num = 0;
+ memset(pb->packets, 0, pb->size * sizeof(pa_raop_packet_element));
+}
+
+void pa_raop_pb_delete(pa_raop_packet_buffer *pb) {
+ pa_xfree(pb->packets);
+ pa_xfree(pb);
+}
+
+static int pb_is_full(pa_raop_packet_buffer *pb) {
+ return pb->count == pb->size;
+}
+
+static int pb_is_empty(pa_raop_packet_buffer *pb) {
+ return pb->count == 0;
+}
+
+static pa_raop_packet_element *pb_prepare_write(pa_raop_packet_buffer *pb, uint16_t seq) {
+ size_t end = (pb->start + pb->count) % pb->size;
+ pa_raop_packet_element *packet;
+
+ /* Set first packet sequence number in buffer if buffer is empty */
+ if (pb_is_empty(pb))
+ pb->first_seq_num = seq;
+ else
+ pa_assert((uint16_t) (pb->latest_seq_num + 1) == seq);
+
+ packet = &pb->packets[end];
+
+ if (pb_is_full(pb)) {
+ pb->start = (pb->start + 1) % pb->size; /* full, overwrite */
+
+ /* Set first packet sequence number in buffer
+ to new start packet sequence number */
+ pb->first_seq_num = pb->packets[pb->start].seq_num;
+ } else
+ ++ pb->count;
+
+ pb->latest_seq_num = seq;
+
+ return packet;
+}
+
+/* Write packet data to packet buffer */
+void pa_raop_pb_write_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, const uint8_t *packet_data, ssize_t packet_length) {
+ pa_raop_packet_element *packet;
+
+ pa_assert(pb);
+ pa_assert(packet_data);
+ pa_assert(packet_length <= PACKET_SIZE_MAX);
+
+ packet = pb_prepare_write(pb, seq_num);
+ packet->seq_num = seq_num;
+ packet->length = packet_length + RETRANS_HEADER_ROOM;
+
+ /* Insert RETRANS_HEADER_ROOM bytes in front of packet data,
+ for retransmission header */
+ memset(packet->data, 0, RETRANS_HEADER_ROOM);
+ memcpy(packet->data + RETRANS_HEADER_ROOM, packet_data, packet_length);
+}
+
+/* l < r?, considers wrapping */
+static bool seq_lt(uint16_t l, uint16_t r) {
+ return l - r > USHRT_MAX/2;
+}
+
+/* Random access to packet from buffer by sequence number for (re-)sending. */
+ssize_t pa_raop_pb_read_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, uint8_t **packet_data) {
+ uint16_t index = 0; /* Index of requested packet */
+ pa_raop_packet_element *packet;
+
+ /* If the buffer is empty, there is no use in calculating indices */
+ if (pb_is_empty(pb))
+ return -1;
+
+ /* If the requested packet is too old (seq_num below first seq number
+ in buffer) or too young (seq_num greater than current seq number),
+ do nothing and return */
+ if (seq_lt(seq_num, pb->first_seq_num))
+ return -1;
+
+ index = (uint16_t) (seq_num - pb->first_seq_num);
+ if (index >= pb->count)
+ return -1;
+
+ /* Index of the requested packet in the buffer is calculated
+ using the first sequence number stored in the buffer.
+ The offset (seq_num - first_seq_num) is used to access the array. */
+ packet = &pb->packets[(pb->start + index) % pb->size];
+
+ pa_assert(packet->data[RETRANS_HEADER_ROOM + 2] == (seq_num >> 8));
+ pa_assert(packet->data[RETRANS_HEADER_ROOM + 3] == (seq_num & 0xff));
+ pa_assert(packet_data);
+
+ *packet_data = packet->data;
+
+ return packet->length;
+}
diff --git a/src/modules/raop/raop_packet_buffer.h b/src/modules/raop/raop_packet_buffer.h
new file mode 100644
index 0000000..d8a08a0
--- /dev/null
+++ b/src/modules/raop/raop_packet_buffer.h
@@ -0,0 +1,42 @@
+#ifndef RAOP_PACKET_BUFFER_H_INCLUDED
+#define RAOP_PACKET_BUFFER_H_INCLUDED
+
+/***
+ Circular buffer for RTP audio packets with random access support
+ by RTP sequence number.
+
+ Copyright 2013 Matthias Wabersich, Hajime Fujita
+
+ This is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ This is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+
+***/
+
+struct pa_raop_packet_element;
+typedef struct pa_raop_packet_element pa_raop_packet_element;
+
+struct pa_raop_packet_buffer;
+typedef struct pa_raop_packet_buffer pa_raop_packet_buffer;
+
+/* Allocates a new circular packet buffer
+ size: Maximum number of packets to store */
+pa_raop_packet_buffer *pa_raop_pb_new(size_t size);
+void pa_raop_pb_clear(pa_raop_packet_buffer *pb);
+void pa_raop_pb_delete(pa_raop_packet_buffer *pb);
+
+void pa_raop_pb_write_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, const uint8_t *packet_data, ssize_t packet_length);
+ssize_t pa_raop_pb_read_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, uint8_t **packet_data);
+
+#endif /* RAOP_PACKET_BUFFER_H_INCLUDED */
--
2.5.0
More information about the pulseaudio-discuss
mailing list