[pulseaudio-discuss] [PATCH v2] bluez5-device: Rewrite of thread function, reduce send buffer size for a2dp sink

Georg Chini georg at chini.tk
Thu Apr 26 15:38:00 UTC 2018


On 26.04.2018 15:13, Tanu Kaskinen wrote:
> On Thu, 2018-03-29 at 15:33 +0200, Georg Chini wrote:
>> The rewrite of the thread function does not change functionality much,
>> most of it is only cleanup, minor bug fixing and documentation work.
>>
>> This patch also changes the send buffer size for a2dp sink to avoid lags
>> after temporary connection drops, following the proof-of-concept patch
>> posted by Dmitry Kalyanov.
>>
>> Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746
>> ---
>> Changes in v2:
>>   - fix issues pointed out by Tanu
>>   - set writable to false for HSP only if a block really needs to be written
>>
>>   src/modules/bluetooth/module-bluez5-device.c | 289 ++++++++++++++++++---------
>>   1 file changed, 191 insertions(+), 98 deletions(-)
>>
>> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
>> index c3acc1dc..dac1eb2a 100644
>> --- a/src/modules/bluetooth/module-bluez5-device.c
>> +++ b/src/modules/bluetooth/module-bluez5-device.c
>> @@ -56,9 +56,8 @@ PA_MODULE_LOAD_ONCE(false);
>>   PA_MODULE_USAGE("path=<device object path>"
>>                   "autodetect_mtu=<boolean>");
>>   
>> -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
>>   #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
>> -#define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC)
>> +#define FIXED_LATENCY_PLAYBACK_SCO  (25 * PA_USEC_PER_MSEC)
> Why is this changed? The commit message mentions nothing about this.

Oops, sorry, I forgot to mention it. I changed it because it gives much
better A/V sync for SCO. While testing the A2DP delay issue, I also
tested HSP. In fact, since HSP uses a synchronous connection, the delay
for SCO playback/record should be smaller than for A2DP, but I
decided to use the same value for all four constants to keep it
simple.
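
To give a feeling for the numbers, here is a rough standalone sketch of how
the constant feeds into the reported sink latency. It assumes the SCO sink
latency is formed the same way as the A2DP one in the hunk further down
(fixed part plus the duration of one write block). bytes_to_usec() and
sink_fixed_latency() are hypothetical stand-ins for illustration, not
PulseAudio functions, and the 48 byte block size is only an assumed value.

```c
#include <stdint.h>

#define USEC_PER_MSEC 1000ULL
#define USEC_PER_SEC  1000000ULL

/* Hypothetical stand-in for pa_bytes_to_usec(): duration of `bytes`
 * of PCM audio for the given rate, channel count and sample width. */
static uint64_t bytes_to_usec(uint64_t bytes, unsigned rate,
                              unsigned channels, unsigned bytes_per_sample) {
    uint64_t frame_size = (uint64_t) channels * bytes_per_sample;

    return (bytes / frame_size) * USEC_PER_SEC / rate;
}

/* Assumed formula: fixed part plus the duration of one write block. */
static uint64_t sink_fixed_latency(uint64_t fixed_msec, uint64_t block_size,
                                   unsigned rate, unsigned channels,
                                   unsigned bytes_per_sample) {
    return fixed_msec * USEC_PER_MSEC
           + bytes_to_usec(block_size, rate, channels, bytes_per_sample);
}
```

With 8 kHz mono 16 bit audio and the assumed 48 byte block,
sink_fixed_latency(25, 48, 8000, 1, 2) comes out at 28 ms, where the old
constant of 125 would have produced 128 ms.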

>
>>   #define FIXED_LATENCY_RECORD_A2DP   (25 * PA_USEC_PER_MSEC)
>>   #define FIXED_LATENCY_RECORD_SCO    (25 * PA_USEC_PER_MSEC)
>>   
>> @@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
>>       return ret;
>>   }
>>   
>> +static void update_buffer_size(struct userdata *u) {
>> +    int old_bufsize;
>> +    socklen_t len = sizeof(int);
>> +    int ret;
>> +
>> +    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
>> +    if (ret == -1) {
>> +        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
>> +    } else {
>> +        int new_bufsize;
>> +
>> +        /* Set send buffer size as small as possible. The minimum value is 1024 according to the
>> +         * socket man page. The data is written to the socket in chunks of write_block_size, so
>> +         * there should at least be room for two chunks in the buffer. Generally, write_block_size
>> +         * is larger than 512. If not, use the next multiple of write_block_size which is larger
>> +         * than 1024. */
>> +        new_bufsize = 2 * u->write_block_size;
>> +        if (new_bufsize < 1024)
>> +            new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
>> +
>> +        /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
>> +         * returns the doubled value. */
>> +        if (new_bufsize != old_bufsize / 2) {
>> +            ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
>> +            if (ret == -1)
>> +                pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
>> +            else
>> +                pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
>> +        }
>> +    }
>> +}
>> +
>>   /* Run from I/O thread */
>>   static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>>       struct sbc_info *sbc_info;
>> @@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>>       pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
>>       pa_sink_set_fixed_latency_within_thread(u->sink,
>>               FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
>> +
>> +    /* If there is still data in the memchunk, we have to discard it
>> +     * because the write_block_size may have changed. */
>> +    if (u->write_memchunk.memblock) {
>> +        pa_memblock_unref(u->write_memchunk.memblock);
>> +        pa_memchunk_reset(&u->write_memchunk);
>> +    }
>> +
>> +    update_buffer_size(u);
>>   }
>>   
>>   /* Run from I/O thread */
>> @@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
>>   
>>       pa_log_debug("Stream properly set up, we're ready to roll!");
>>   
>> -    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
>> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>>           a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
>> +        update_buffer_size(u);
>> +    }
>>   
>>       u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>>       pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
>> @@ -1068,12 +1110,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>>       switch (code) {
>>   
>>           case PA_SINK_MESSAGE_GET_LATENCY: {
>> -            int64_t wi, ri;
>> +            int64_t wi = 0, ri = 0;
>>   
>>               if (u->read_smoother) {
>>                   ri = pa_smoother_get(u->read_smoother, pa_rtclock_now());
>>                   wi = pa_bytes_to_usec(u->write_index + u->write_block_size, &u->sample_spec);
>> -            } else {
>> +            } else if (u->started_at) {
>>                   ri = pa_rtclock_now() - u->started_at;
>>                   wi = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>>               }
>> @@ -1415,12 +1457,32 @@ static int init_profile(struct userdata *u) {
>>       return r;
>>   }
>>   
>> +static int write_block(struct userdata *u) {
>> +    int n_written;
>> +
>> +    if (u->write_index <= 0)
>> +        u->started_at = pa_rtclock_now();
>> +
>> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>> +        if ((n_written = a2dp_process_render(u)) < 0)
>> +            return -1;
>> +    } else {
>> +        if ((n_written = sco_process_render(u)) < 0)
>> +            return -1;
>> +    }
>> +
>> +    if (n_written == 0)
>> +        pa_log_debug("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
>> +
>> +    return n_written;
>> +}
>> +
>> +
>>   /* I/O thread function */
>>   static void thread_func(void *userdata) {
>>       struct userdata *u = userdata;
>> -    unsigned do_write = 0;
>> -    unsigned pending_read_bytes = 0;
>> -    bool writable = false;
>> +    unsigned blocks_to_write = 0;
>> +    unsigned bytes_to_write = 0;
>>   
>>       pa_assert(u);
>>       pa_assert(u->transport);
>> @@ -1440,9 +1502,13 @@ static void thread_func(void *userdata) {
>>           struct pollfd *pollfd;
>>           int ret;
>>           bool disable_timer = true;
>> +        bool writable = false;
>> +        bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
>> +        bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
>>   
>>           pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
>>   
>> +        /* Check for stream error or close */
>>           if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
>>               pa_log_info("FD error: %s%s%s%s",
>>                           pollfd->revents & POLLERR ? "POLLERR " :"",
>> @@ -1453,147 +1519,174 @@ static void thread_func(void *userdata) {
>>               if (pollfd->revents & POLLHUP) {
>>                   pollfd = NULL;
>>                   teardown_stream(u);
>> -                do_write = 0;
>> -                pending_read_bytes = 0;
>> -                writable = false;
>> +                blocks_to_write = 0;
>> +                bytes_to_write = 0;
>>                   pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
>>               } else
>>                   goto fail;
>>           }
>>   
>> -        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
>> +        /* If there is a pollfd, the stream is set up and we need to do something */
>> +        if (pollfd) {
>>   
>> -            /* We should send two blocks to the device before we expect
>> -             * a response. */
>> +            /* Handle source if present */
>> +            if (have_source) {
>>   
>> -            if (u->write_index == 0 && u->read_index <= 0)
>> -                do_write = 2;
>> +                /* We should send two blocks to the device before we expect a response. */
>> +                if (u->write_index == 0 && u->read_index <= 0)
>> +                    blocks_to_write = 2;
>>   
>> -            if (pollfd && (pollfd->revents & POLLIN)) {
>> -                int n_read;
>> +                /* If we got woken up by POLLIN let's do some reading */
>> +                if (pollfd->revents & POLLIN) {
>> +                    int n_read;
>>   
>> -                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
>> -                    n_read = a2dp_process_push(u);
>> -                else
>> -                    n_read = sco_process_push(u);
>> +                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
>> +                        n_read = a2dp_process_push(u);
>> +                    else
>> +                        n_read = sco_process_push(u);
>>   
>> -                if (n_read < 0)
>> -                    goto fail;
>> +                    if (n_read < 0)
>> +                        goto fail;
>>   
>> -                if (n_read > 0) {
>> -                    /* We just read something, so we are supposed to write something, too */
>> -                    pending_read_bytes += n_read;
>> -                    do_write += pending_read_bytes / u->write_block_size;
>> -                    pending_read_bytes = pending_read_bytes % u->write_block_size;
>> +                    if (n_read > 0) {
>> +                        /* We just read something, so we are supposed to write something, too */
>> +                        bytes_to_write += n_read;
>> +                        blocks_to_write += bytes_to_write / u->write_block_size;
>> +                        bytes_to_write = bytes_to_write % u->write_block_size;
>> +                    }
>>                   }
>>               }
>> -        }
>>   
>> -        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
>> +            /* Handle sink if present */
>> +            if (have_sink) {
>>   
>> -            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
>> -                pa_sink_process_rewind(u->sink, 0);
>> +                /* Process rewinds */
>> +                if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
>> +                    pa_sink_process_rewind(u->sink, 0);
>>   
>> -            if (pollfd) {
>> +                /* Test if the stream is writable */
>>                   if (pollfd->revents & POLLOUT)
>>                       writable = true;
>>   
>> -                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
>> -                    pa_usec_t time_passed;
>> -                    pa_usec_t audio_sent;
>> +                /* If we have a source, we let the source determine the timing
>> +                 * for the sink */
>> +                if (have_source) {
>>   
>> -                    /* Hmm, there is no input stream we could synchronize
>> -                     * to. So let's do things by time */
>> +                    if (writable && blocks_to_write > 0) {
>> +                        int result;
>>   
>> -                    time_passed = pa_rtclock_now() - u->started_at;
>> -                    audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>> +                        if ((result = write_block(u)) < 0)
>> +                            goto fail;
>>   
>> +                        blocks_to_write -= result;
>> +                        if (blocks_to_write > 0)
>> +                            writable = false;
> This "blocks_to_write > 0" check is new. In the previous version
> writable was set to false unconditionally, and that's how I believe it
> should be done. I guess this is an optimization: we won't write
> anything before we get POLLIN, so to you it seemed unnecessary to wake
> up on POLLOUT. Getting the POLLOUT notification is indeed unnecessary
> in itself, but we still need to set POLLOUT when polling, because
> otherwise when we wake up due to POLLIN, pollfd->revents will not have
> POLLOUT set even if the socket is writable. As a result we won't write
> when we're supposed to.
>
> Or actually... if we wake up on POLLIN, and POLLOUT isn't in revents
> even if the socket is writable, we'll set POLLOUT on the subsequent
> poll, and that will return immediately, so there's not much delay with
> the write. So maybe the "blocks_to_write > 0" is fine after all. Some
> kind of a comment would probably be good in any case. For example:
>
> "writable controls whether we set POLLOUT when polling - we set it to
> false to enable POLLOUT. If there are more blocks to write, we want to
> be woken up immediately when the socket becomes writable. If there
> aren't currently any more blocks to write, then we'll have to wait
> until we've received more data, so in that case we only want to set
> POLLIN. Note that when we are woken up the next time, POLLOUT won't be
> set in revents even if the socket has meanwhile become writable, which
> may seem bad, but in that case we'll set POLLOUT in the subsequent
> poll, and the poll will return immediately, so our writes won't be
> delayed."
>
Yes, it is an optimization. It works fine and significantly reduces
CPU load on slow systems (which was the reason for implementing it).
I will add your comment to clarify.
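
For reference, the behaviour discussed above can be condensed into a small
standalone sketch. The helper names choose_poll_events() and
writable_after_write() are made up for illustration and do not exist in the
module. The sketch only assumes what your comment describes: POLLOUT is
requested when writable is false, and POLLIN is requested when there is a
source.

```c
#include <poll.h>
#include <stdbool.h>

/* Hypothetical helper: select the events for the next poll().
 * POLLOUT is only requested while `writable` is false. */
static short choose_poll_events(bool writable, bool have_source) {
    short events = 0;

    if (!writable)
        events |= POLLOUT;
    if (have_source)
        events |= POLLIN;

    return events;
}

/* Hypothetical helper: value of `writable` after a block was written.
 * It is cleared only if more blocks are pending, so that the next
 * poll() wakes us up as soon as the socket accepts data again. */
static bool writable_after_write(unsigned blocks_to_write) {
    return blocks_to_write == 0;
}
```

If nothing is left to write, writable stays true and the next poll only
waits for POLLIN, which is where the CPU saving comes from. As soon as
blocks are pending again, POLLOUT is requested and poll returns immediately
if the socket is writable.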


