[pulseaudio-discuss] [PATCH 52/56] bluetooth: Create I/O thread function for BlueZ 5 cards

Tanu Kaskinen tanu.kaskinen at linux.intel.com
Thu Jul 25 02:29:38 PDT 2013


On Fri, 2013-07-12 at 15:07 -0300, jprvita at gmail.com wrote:
> From: João Paulo Rechi Vita <jprvita at openbossa.org>
> 
> Create the thread function, the render and push functions for A2DP, the
> process message function for communication between the I/O thread and
> the main thread, and other helper functions related to them.
> ---
>  src/modules/bluetooth/module-bluez5-device.c | 637 +++++++++++++++++++++++++++
>  1 file changed, 637 insertions(+)
> 
> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
> index d0971d5..9a5534c 100644
> --- a/src/modules/bluetooth/module-bluez5-device.c
> +++ b/src/modules/bluetooth/module-bluez5-device.c
> @@ -24,19 +24,29 @@
>  #include <config.h>
>  #endif
>  
> +#include <errno.h>
> +
> +#include <arpa/inet.h>
>  #include <sbc/sbc.h>
>  
> +#include <pulse/rtclock.h>
> +#include <pulse/timeval.h>
> +
> +#include <pulsecore/core-error.h>
>  #include <pulsecore/core-util.h>
>  #include <pulsecore/i18n.h>
>  #include <pulsecore/module.h>
>  #include <pulsecore/modargs.h>
>  #include <pulsecore/poll.h>
>  #include <pulsecore/rtpoll.h>
> +#include <pulsecore/socket-util.h>
>  #include <pulsecore/thread.h>
>  #include <pulsecore/thread-mq.h>
> +#include <pulsecore/time-smoother.h>
>  
>  #include "a2dp-codecs.h"
>  #include "bluez5-util.h"
> +#include "rtp.h"
>  
>  #include "module-bluez5-device-symdef.h"
>  
> @@ -46,11 +56,30 @@ PA_MODULE_VERSION(PACKAGE_VERSION);
>  PA_MODULE_LOAD_ONCE(false);
>  PA_MODULE_USAGE("path=<device object path>");
>  
> +#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
> +#define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
> +#define FIXED_LATENCY_RECORD_A2DP   (25 * PA_USEC_PER_MSEC)
> +
> +#define BITPOOL_DEC_LIMIT 32
> +#define BITPOOL_DEC_STEP 5
> +
>  static const char* const valid_modargs[] = {
>      "path",
>      NULL
>  };
>  
> +enum {
> +    BLUETOOTH_MESSAGE_IO_THREAD_FAILED,
> +    BLUETOOTH_MESSAGE_MAX
> +};
> +
> +typedef struct bluetooth_msg {
> +    pa_msgobject parent;
> +    pa_card *card;
> +} bluetooth_msg;
> +PA_DEFINE_PRIVATE_CLASS(bluetooth_msg, pa_msgobject);
> +#define BLUETOOTH_MSG(o) (bluetooth_msg_cast(o))
> +
>  typedef struct sbc_info {
>      sbc_t sbc;                           /* Codec data */
>      bool sbc_initialized;                /* Keep track if the encoder is initialized */
> @@ -86,12 +115,19 @@ struct userdata {
>      pa_thread_mq thread_mq;
>      pa_rtpoll *rtpoll;
>      pa_rtpoll_item *rtpoll_item;
> +    bluetooth_msg *msg;
>  
>      int stream_fd;
> +    int stream_write_type;
>      size_t read_link_mtu;
>      size_t write_link_mtu;
>      size_t read_block_size;
>      size_t write_block_size;
> +    uint64_t read_index;
> +    uint64_t write_index;
> +    pa_usec_t started_at;
> +    pa_smoother *read_smoother;
> +    pa_memchunk write_memchunk;
>      pa_sample_spec sample_spec;
>      struct sbc_info sbc_info;
>  };
> @@ -210,6 +246,320 @@ static void connect_ports(struct userdata *u, void *new_data, pa_direction_t dir
>      }
>  }
>  
> +/* Run from IO thread */
> +static void a2dp_prepare_buffer(struct userdata *u) {
> +    size_t min_buffer_size = PA_MAX(u->read_link_mtu, u->write_link_mtu);
> +
> +    pa_assert(u);
> +
> +    if (u->sbc_info.buffer_size >= min_buffer_size)
> +        return;
> +
> +    u->sbc_info.buffer_size = 2 * min_buffer_size;
> +    pa_xfree(u->sbc_info.buffer);
> +    u->sbc_info.buffer = pa_xmalloc(u->sbc_info.buffer_size);
> +}
> +
> +/* Run from IO thread */
> +static int a2dp_process_render(struct userdata *u) {
> +    struct sbc_info *sbc_info;
> +    struct rtp_header *header;
> +    struct rtp_payload *payload;
> +    size_t nbytes;
> +    void *d;
> +    const void *p;
> +    size_t to_write, to_encode;
> +    unsigned frame_count;
> +    int ret = 0;
> +
> +    pa_assert(u);
> +    pa_assert(u->profile == PROFILE_A2DP_SINK);
> +    pa_assert(u->sink);
> +
> +    /* First, render some data */
> +    if (!u->write_memchunk.memblock)
> +        pa_sink_render_full(u->sink, u->write_block_size, &u->write_memchunk);
> +
> +    pa_assert(u->write_memchunk.length == u->write_block_size);

u->write_block_size may change due to bitpool reduction. If there was
audio left in write_memchunk due to an earlier EAGAIN error, this
assertion will crash. a2dp_process_render() needs to prepare for
write_block_size changing between 

> +
> +    a2dp_prepare_buffer(u);
> +
> +    sbc_info = &u->sbc_info;
> +    header = sbc_info->buffer;
> +    payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header));
> +
> +    frame_count = 0;
> +
> +    /* Try to create a packet of the full MTU */
> +
> +    p = (const uint8_t *) pa_memblock_acquire_chunk(&u->write_memchunk);
> +    to_encode = u->write_memchunk.length;
> +
> +    d = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload);
> +    to_write = sbc_info->buffer_size - sizeof(*header) - sizeof(*payload);

This assumes that sbc_info->buffer_size is big enough to hold at least
rtp_header and rtp_payload structs. buffer_size is determined by the
MTU, and there seems to be no check that the MTU values are sensible.

> +
> +    while (PA_LIKELY(to_encode > 0 && to_write > 0)) {
> +        ssize_t written;
> +        ssize_t encoded;
> +
> +        encoded = sbc_encode(&sbc_info->sbc,
> +                             p, to_encode,
> +                             d, to_write,
> +                             &written);
> +
> +        if (PA_UNLIKELY(encoded <= 0)) {
> +            pa_log_error("SBC encoding error (%li)", (long) encoded);
> +            pa_memblock_release(u->write_memchunk.memblock);
> +            return -1;
> +        }
> +
> +        pa_assert_fp((size_t) encoded <= to_encode);
> +        pa_assert_fp((size_t) encoded == sbc_info->codesize);
> +
> +        pa_assert_fp((size_t) written <= to_write);
> +        pa_assert_fp((size_t) written == sbc_info->frame_length);

These equality assertions make me nervous. I suggest adding a comment:

        /* These assertions are safe, because sbc_encode() encodes exactly one
         * input block to exactly one output block (documented). If the buffer
         * sizes are too small, sbc_encode() returns zero or an error (not
         * documented), which we catch above. */
        pa_assert_fp((size_t) encoded <= to_encode);
        pa_assert_fp((size_t) encoded == sbc_info->codesize);

        pa_assert_fp((size_t) written <= to_write);
        pa_assert_fp((size_t) written == sbc_info->frame_length);

> +
> +        p = (const uint8_t*) p + encoded;
> +        to_encode -= encoded;
> +
> +        d = (uint8_t*) d + written;
> +        to_write -= written;
> +
> +        frame_count++;
> +    }
> +
> +    pa_memblock_release(u->write_memchunk.memblock);
> +
> +    pa_assert(to_encode == 0);
> +
> +    PA_ONCE_BEGIN {
> +        pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&sbc_info->sbc)));
> +    } PA_ONCE_END;
> +
> +    /* write it to the fifo */
> +    memset(sbc_info->buffer, 0, sizeof(*header) + sizeof(*payload));
> +    header->v = 2;
> +    header->pt = 1;
> +    header->sequence_number = htons(sbc_info->seq_num++);
> +    header->timestamp = htonl(u->write_index / pa_frame_size(&u->sample_spec));
> +    header->ssrc = htonl(1);
> +    payload->frame_count = frame_count;
> +
> +    nbytes = (uint8_t*) d - (uint8_t*) sbc_info->buffer;
> +
> +    for (;;) {
> +        ssize_t l;
> +
> +        l = pa_write(u->stream_fd, sbc_info->buffer, nbytes, &u->stream_write_type);
> +
> +        pa_assert(l != 0);
> +
> +        if (l < 0) {
> +
> +            if (errno == EINTR)
> +                /* Retry right away if we got interrupted */
> +                continue;
> +
> +            else if (errno == EAGAIN)
> +                /* Hmm, apparently the socket was not writable, give up for now */
> +                break;
> +
> +            pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno));
> +            ret = -1;
> +            break;
> +        }
> +
> +        pa_assert((size_t) l <= nbytes);
> +
> +        if ((size_t) l != nbytes) {
> +            pa_log_warn("Wrote memory block to socket only partially! %llu written, wanted to write %llu.",
> +                        (unsigned long long) l,
> +                        (unsigned long long) nbytes);
> +            ret = -1;
> +            break;
> +        }
> +
> +        u->write_index += (uint64_t) u->write_memchunk.length;
> +        pa_memblock_unref(u->write_memchunk.memblock);
> +        pa_memchunk_reset(&u->write_memchunk);
> +
> +        ret = 1;
> +
> +        break;
> +    }
> +
> +    return ret;
> +}
> +
> +/* Run from IO thread */
> +static int a2dp_process_push(struct userdata *u) {
> +    int ret = 0;
> +    pa_memchunk memchunk;
> +
> +    pa_assert(u);
> +    pa_assert(u->profile == PROFILE_A2DP_SOURCE);
> +    pa_assert(u->source);
> +    pa_assert(u->read_smoother);
> +
> +    memchunk.memblock = pa_memblock_new(u->core->mempool, u->read_block_size);
> +    memchunk.index = memchunk.length = 0;
> +
> +    for (;;) {
> +        bool found_tstamp = false;
> +        pa_usec_t tstamp;
> +        struct sbc_info *sbc_info;
> +        struct rtp_header *header;
> +        struct rtp_payload *payload;
> +        const void *p;
> +        void *d;
> +        ssize_t l;
> +        size_t to_write, to_decode;
> +
> +        a2dp_prepare_buffer(u);
> +
> +        sbc_info = &u->sbc_info;
> +        header = sbc_info->buffer;
> +        payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header));
> +
> +        l = pa_read(u->stream_fd, sbc_info->buffer, sbc_info->buffer_size, &u->stream_write_type);

The code assumes later that what we read here is exactly one complete
rtp packet, nothing more, nothing less. I don't understand how such
assumption can be done. I think we should parse the rtp header to find
the packet boundary, and buffer any incomplete packets for later
reading. Or perhaps recvmsg() should be used (I guess that's needed also
for reading the timestamp, which is currently a TODO item).

> +
> +        if (l <= 0) {
> +
> +            if (l < 0 && errno == EINTR)
> +                /* Retry right away if we got interrupted */
> +                continue;
> +
> +            else if (l < 0 && errno == EAGAIN)
> +                /* Hmm, apparently the socket was not readable, give up for now. */
> +                break;
> +
> +            pa_log_error("Failed to read data from socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF");
> +            ret = -1;
> +            break;
> +        }
> +
> +        pa_assert((size_t) l <= sbc_info->buffer_size);
> +
> +        u->read_index += (uint64_t) l;
> +
> +        /* TODO: get timestamp from rtp */
> +        if (!found_tstamp) {
> +            /* pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!"); */
> +            tstamp = pa_rtclock_now();
> +        }

There are two kind of timestamps being talked about here: RTP timestamps
and socket message timestamps. I don't know what we would use the socket
message timestamps for, but I think supporting RTP timestamps would be
very important, because it would allow us to detect dropped packets. We
should generate silence as a substitute for the lost packets. Not
detecting dropped packets means that our timing information gets messed
up, making it impossible for module-loopback to work properly.

Are the RTP timestamp semantics defined well enough so that we could
know how much silence we need to generate? We seem to use the number of
written samples in our outgoing packets, is that what we can expect also
from incoming packets?

> +
> +        pa_smoother_put(u->read_smoother, tstamp, pa_bytes_to_usec(u->read_index, &u->sample_spec));
> +        pa_smoother_resume(u->read_smoother, tstamp, true);
> +
> +        p = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload);
> +        to_decode = l - sizeof(*header) - sizeof(*payload);

to_decode is unsigned, so this assumes that we read at least
sizeof(*header) + sizeof(*payload) amount of data. That is not checked.

> +
> +        d = pa_memblock_acquire(memchunk.memblock);
> +        to_write = memchunk.length = pa_memblock_get_length(memchunk.memblock);
> +
> +        while (PA_LIKELY(to_decode > 0)) {
> +            size_t written;
> +            ssize_t decoded;
> +
> +            decoded = sbc_decode(&sbc_info->sbc,
> +                                 p, to_decode,
> +                                 d, to_write,
> +                                 &written);
> +
> +            if (PA_UNLIKELY(decoded <= 0)) {
> +                pa_log_error("SBC decoding error (%li)", (long) decoded);
> +                pa_memblock_release(memchunk.memblock);
> +                pa_memblock_unref(memchunk.memblock);
> +                return -1;
> +            }
> +
> +            /* Reset frame length, it can be changed due to bitpool change */
> +            sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc);

read_block_size is derived from the frame length, so that needs to be
updated too. And the source fixed latency depends on read_block_size.
And so does the buffer length that was passed to sbc_decode, so it may
happen that we passed too small buffer to sbc_decode().

> +
> +            pa_assert_fp((size_t) decoded <= to_decode);
> +            pa_assert_fp((size_t) decoded == sbc_info->frame_length);
> +
> +            pa_assert_fp((size_t) written == sbc_info->codesize);

This will crash if the output buffer was too small (an undocumented
feature of sbc_decode is that it truncates the output if the given
buffer is too small, instead of failing like sbc_encode() does).
Removing this assertion isn't the right fix, though, the right fix is to
ensure that we have big enough output buffer.

> +
> +            p = (const uint8_t*) p + decoded;
> +            to_decode -= decoded;
> +
> +            d = (uint8_t*) d + written;
> +            to_write -= written;
> +        }
> +
> +        memchunk.length -= to_write;
> +
> +        pa_memblock_release(memchunk.memblock);
> +
> +        pa_source_post(u->source, &memchunk);
> +
> +        ret = l;
> +        break;
> +    }
> +
> +    pa_memblock_unref(memchunk.memblock);
> +
> +    return ret;
> +}
> +
> +/* Run from I/O thread */
> +static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
> +    struct sbc_info *sbc_info;
> +
> +    pa_assert(u);
> +
> +    sbc_info = &u->sbc_info;
> +
> +    if (sbc_info->sbc.bitpool == bitpool)
> +        return;
> +
> +    if (bitpool > sbc_info->max_bitpool)
> +        bitpool = sbc_info->max_bitpool;
> +    else if (bitpool < sbc_info->min_bitpool)
> +        bitpool = sbc_info->min_bitpool;
> +
> +    sbc_info->sbc.bitpool = bitpool;
> +
> +    sbc_info->codesize = sbc_get_codesize(&sbc_info->sbc);
> +    sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc);
> +
> +    pa_log_debug("Bitpool has changed to %u", sbc_info->sbc.bitpool);

Could you calculate the resulting SBC bitrate and add it to the log
message? I think that would be nice to see in the log, because the
bitpool value doesn't really tell anything meaningful to the vast
majority of people (including me). If you do that, log the bitrate also
in transport_config() and when the bitpool changes in the a2dp_source
mode.

> +
> +    u->read_block_size =
> +        (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload))
> +        / sbc_info->frame_length * sbc_info->codesize;
> +
> +    u->write_block_size =
> +        (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload))
> +        / sbc_info->frame_length * sbc_info->codesize;
> +
> +    pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
> +    pa_sink_set_fixed_latency_within_thread(u->sink,
> +            FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
> +}
> +
> +/* Run from I/O thread */
> +static void a2dp_reduce_bitpool(struct userdata *u) {
> +    struct sbc_info *sbc_info;
> +    uint8_t bitpool;
> +
> +    pa_assert(u);
> +
> +    sbc_info = &u->sbc_info;
> +
> +    /* Check if bitpool is already at its limit */
> +    if (sbc_info->sbc.bitpool <= BITPOOL_DEC_LIMIT)
> +        return;
> +
> +    bitpool = sbc_info->sbc.bitpool - BITPOOL_DEC_STEP;
> +
> +    if (bitpool < BITPOOL_DEC_LIMIT)
> +        bitpool = BITPOOL_DEC_LIMIT;
> +
> +    a2dp_set_bitpool(u, bitpool);
> +}
> +
>  static void teardown_stream(struct userdata *u) {
>      if (u->rtpoll_item) {
>          pa_rtpoll_item_free(u->rtpoll_item);
> @@ -221,6 +571,16 @@ static void teardown_stream(struct userdata *u) {
>          u->stream_fd = -1;
>      }
>  
> +    if (u->read_smoother) {
> +        pa_smoother_free(u->read_smoother);
> +        u->read_smoother = NULL;
> +    }
> +
> +    if (u->write_memchunk.memblock) {
> +        pa_memblock_unref(u->write_memchunk.memblock);
> +        pa_memchunk_reset(&u->write_memchunk);
> +    }
> +
>      pa_log_debug("Audio stream torn down");
>  }
>  
> @@ -258,6 +618,62 @@ static void transport_release(struct userdata *u) {
>      teardown_stream(u);
>  }
>  
> +/* Run from I/O thread */
> +static void transport_config_mtu(struct userdata *u) {
> +    u->read_block_size =
> +        (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload))
> +        / u->sbc_info.frame_length * u->sbc_info.codesize;
> +
> +    u->write_block_size =
> +        (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload))
> +        / u->sbc_info.frame_length * u->sbc_info.codesize;
> +
> +    if (u->sink) {
> +        pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
> +        pa_sink_set_fixed_latency_within_thread(u->sink,
> +                                                FIXED_LATENCY_PLAYBACK_A2DP +
> +                                                pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
> +    }
> +
> +    if (u->source)
> +        pa_source_set_fixed_latency_within_thread(u->source,
> +                                                  FIXED_LATENCY_RECORD_A2DP +
> +                                                  pa_bytes_to_usec(u->read_block_size, &u->sample_spec));
> +}
> +
> +/* Run from I/O thread */
> +static void setup_stream(struct userdata *u) {
> +    struct pollfd *pollfd;
> +    int one;
> +
> +    pa_log_info("Transport %s resuming", u->transport->path);
> +
> +    transport_config_mtu(u);
> +
> +    pa_make_fd_nonblock(u->stream_fd);
> +    pa_make_socket_low_delay(u->stream_fd);
> +
> +    one = 1;
> +    if (setsockopt(u->stream_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0)
> +        pa_log_warn("Failed to enable SO_TIMESTAMP: %s", pa_cstrerror(errno));
> +
> +    pa_log_debug("Stream properly set up, we're ready to roll!");
> +
> +    if (u->profile == PROFILE_A2DP_SINK)
> +        a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
> +
> +    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
> +    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> +    pollfd->fd = u->stream_fd;
> +    pollfd->events = pollfd->revents = 0;
> +
> +    u->read_index = u->write_index = 0;
> +    u->started_at = 0;
> +
> +    if (u->source)
> +        u->read_smoother = pa_smoother_new(PA_USEC_PER_SEC, 2*PA_USEC_PER_SEC, true, true, 10, pa_rtclock_now(), true);
> +}
> +
>  /* Run from main thread */
>  static int add_source(struct userdata *u) {
>      pa_source_new_data data;
> @@ -496,6 +912,194 @@ static int init_profile(struct userdata *u) {
>  
>  /* I/O thread function */
>  static void thread_func(void *userdata) {
> +    struct userdata *u = userdata;
> +    unsigned do_write = 0;
> +    unsigned pending_read_bytes = 0;
> +    bool writable = false;
> +
> +    pa_assert(u);
> +    pa_assert(u->transport);
> +
> +    pa_log_debug("IO Thread starting up");
> +
> +    if (u->core->realtime_scheduling)
> +        pa_make_realtime(u->core->realtime_priority);
> +
> +    pa_thread_mq_install(&u->thread_mq);
> +
> +    /* Setup the stream only if the transport was already acquired */
> +    if (u->transport_acquired)
> +        setup_stream(u);
> +
> +    for (;;) {
> +        struct pollfd *pollfd;
> +        int ret;
> +        bool disable_timer = true;
> +
> +        pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
> +
> +        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
> +
> +            /* We should send two blocks to the device before we expect
> +             * a response. */

What response? I don't understand what this logic is trying to achieve.

> +
> +            if (u->write_index == 0 && u->read_index <= 0)

Using == 0 with write_index and <= 0 with read_index looks odd.
read_index isn't any more suspectible to be negative than write_index
(both have unsigned type anyway).

> +                do_write = 2;
> +
> +            if (pollfd && (pollfd->revents & POLLIN)) {
> +                int n_read;
> +
> +                n_read = a2dp_process_push(u);
> +
> +                if (n_read < 0)
> +                    goto io_fail;
> +
> +                /* We just read something, so we are supposed to write something, too */
> +                pending_read_bytes += n_read;
> +                do_write += pending_read_bytes / u->write_block_size;
> +                pending_read_bytes = pending_read_bytes % u->write_block_size;

This is not relevant for A2DP. Although this doesn't actively do any
harm in the A2DP mode either, for readability I think it would be a good
idea to run this code only when it's actually useful.

> +            }
> +        }
> +
> +        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
> +
> +            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> +                pa_sink_process_rewind(u->sink, 0);
> +
> +            if (pollfd) {
> +                if (pollfd->revents & POLLOUT)
> +                    writable = true;
> +
> +                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
> +                    pa_usec_t time_passed;
> +                    pa_usec_t audio_sent;
> +
> +                    /* Hmm, there is no input stream we could synchronize
> +                     * to. So let's do things by time */
> +
> +                    time_passed = pa_rtclock_now() - u->started_at;

u->started_at is 0 in the beginning, and initialized to a sensible value
only at the time of the first write attempt (BTW, I think it should be
initialized only if the write actually succeeds). So in the first round
time_passed will be very large, when it should be zero.

This doesn't cause practical problems, because skipping is triggered
only if write_index is greater than zero, but I think the code would be
clearer if time_passed would be assigned zero here.

> +                    audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
> +
> +                    if (audio_sent <= time_passed) {
> +                        pa_usec_t audio_to_send = time_passed - audio_sent;
> +
> +                        /* Never try to catch up for more than 100ms */
> +                        if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) {

Here the "write_index > 0" check wouldn't be needed if time_passed would
always have a sane value.

> +                            pa_usec_t skip_usec;
> +                            uint64_t skip_bytes;
> +
> +                            skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC;
> +                            skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec);
> +
> +                            if (skip_bytes > 0) {
> +                                pa_memchunk tmp;
> +
> +                                pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
> +                                            (unsigned long long) skip_usec,
> +                                            (unsigned long long) skip_bytes);
> +
> +                                pa_sink_render_full(u->sink, skip_bytes, &tmp);

write_memchunk may contain buffered audio, which should be consumed
first. I propose that the amount of audio to skip is saved in userdata,
and handled within a2dp_process_render(), because fiddling with
write_memchunk should ideally be done only in a2dp_process_render().

> +                                pa_memblock_unref(tmp.memblock);
> +                                u->write_index += skip_bytes;
> +
> +                                if (u->profile == PROFILE_A2DP_SINK)
> +                                    a2dp_reduce_bitpool(u);
> +                            }
> +                        }
> +
> +                        do_write = 1;
> +                        pending_read_bytes = 0;
> +                    }
> +                }
> +
> +                if (writable && do_write > 0) {
> +                    int n_written;
> +
> +                    if (u->write_index <= 0)
> +                        u->started_at = pa_rtclock_now();
> +
> +                    if ((n_written = a2dp_process_render(u)) < 0)
> +                        goto io_fail;
> +
> +                    if (n_written == 0)
> +                        pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
> +
> +                    do_write -= n_written;
> +                    writable = false;
> +                }
> +
> +                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) {

We should enable the timer only if we have already successfully written
something, so this condition should be added: "&& u->write_index > 0".

> +                    pa_usec_t sleep_for;
> +                    pa_usec_t time_passed, next_write_at;
> +
> +                    if (writable) {
> +                        /* Hmm, there is no input stream we could synchronize
> +                         * to. So let's estimate when we need to wake up the latest */
> +                        time_passed = pa_rtclock_now() - u->started_at;
> +                        next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
> +                        sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
> +                        /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
> +                    } else
> +                        /* drop stream every 500 ms */
> +                        sleep_for = PA_USEC_PER_MSEC * 500;

I don't think this works. I guess "drop stream" refers to the skipping
functionality, but skipping is only done if the socket is writable. If
the 500 ms timer fires, it means that the socket is not writable
(otherwise we would have woken up due to a POLLOUT event). If dropping
data every 500 ms is something that we want to do, the skipping code
needs to be modified so that it is run also if the socket is not
writable.

Phew, this one took long.

-- 
Tanu



More information about the pulseaudio-discuss mailing list