[pulseaudio-discuss] [PATCH v2] bluez5-device: Rewrite of thread function, reduce send buffer size for a2dp sink

Tanu Kaskinen tanuk at iki.fi
Sat Mar 24 17:13:40 UTC 2018


On Mon, 2018-03-05 at 08:49 +0100, Georg Chini wrote:
> The rewrite of the thread function does not change functionality much,
> most of it is only cleanup, minor bug fixing  and documentation work.
> 
> This patch also changes the send buffer size for a2dp sink to avoid lags
> after temporary connection drops, following the proof-of-concept patch
> posted by Dmitry Kalyanov.
> 
> Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746
> ---
>  src/modules/bluetooth/module-bluez5-device.c | 275 ++++++++++++++++++---------
>  1 file changed, 182 insertions(+), 93 deletions(-)

Thanks! Reading the new code caused less trouble than I recall the
bluetooth thread_func() previously having caused, so the changes are
good.

There are some issues pointed out below.

> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
> index 7970dda7..149d1708 100644
> --- a/src/modules/bluetooth/module-bluez5-device.c
> +++ b/src/modules/bluetooth/module-bluez5-device.c
> @@ -56,7 +56,6 @@ PA_MODULE_LOAD_ONCE(false);
>  PA_MODULE_USAGE("path=<device object path>"
>                  "autodetect_mtu=<boolean>");
>  
> -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_RECORD_A2DP   (25 * PA_USEC_PER_MSEC)
> @@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
>      return ret;
>  }
>  
> +static void update_buffer_size(struct userdata *u) {
> +    int old_bufsize;
> +    socklen_t len = sizeof(int);
> +    int ret;
> +
> +    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);

In case the type of old_bufsize changes (which it will never do, but
what if it does anyway) then the calculation of len has to be
separately updated. Therefore it would be safer to pass
sizeof(new_bufsize) here and get rid of the len variable.

> +    if (ret == -1) {
> +        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
> +    } else {
> +        int new_bufsize;
> +
> +        /* Set send buffer size as small as possible. The minimum value is 1024 according to the
> +         * socket man page. The data is written to the socket in chunks of write_block_size, so
> +         * there should at least be room for two chunks in the buffer. Generally, write_block_size
> +         * is larger than 512. If not, use the next multiple of write_block_size which is larger
> +         * than 1024. */
> +        new_bufsize = 2 * u->write_block_size;
> +        if (new_bufsize < 1024)
> +            new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
> +
> +        /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
> +         * returns the doubled value. */
> +        if (new_bufsize != old_bufsize / 2) {
> +            ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);

Same as above.

> +            if (ret == -1)
> +                pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
> +            else
> +                pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
> +        }
> +    }
> +}
> +
>  /* Run from I/O thread */
>  static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>      struct sbc_info *sbc_info;
> @@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>      pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
>      pa_sink_set_fixed_latency_within_thread(u->sink,
>              FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
> +
> +    /* If there is still data in the memchunk, we have to discard it
> +     * because the write_block_size may have changed. */
> +    if (u->write_memchunk.memblock) {
> +        pa_memblock_unref(u->write_memchunk.memblock);
> +        pa_memchunk_reset(&u->write_memchunk);
> +    }
> +
> +    update_buffer_size(u);
>  }
>  
>  /* Run from I/O thread */
> @@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
>  
>      pa_log_debug("Stream properly set up, we're ready to roll!");
>  
> -    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>          a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
> +        update_buffer_size(u);

This is redundant, a2dp_set_bitpool() calls update_buffer_size() by
itself.

> +    }
>  
>      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> @@ -861,7 +903,7 @@ static void setup_stream(struct userdata *u) {
>      pollfd->events = pollfd->revents = 0;
>  
>      u->read_index = u->write_index = 0;
> -    u->started_at = 0;
> +    u->started_at = pa_rtclock_now();

This seems unnecessary. write_block() resets started_at anyway when the
first write is done.

Hmm... Now I noticed that u->started_at is used before the first write.
But it seems wrong to set u->started_at twice using (slightly)
different values. u->started_at is used in this code before the first
write:

    time_passed = pa_rtclock_now() - u->started_at;

I think before the first write time_passed should be set to exactly 0.

>      u->stream_setup_done = true;
>  
>      if (u->source)
> @@ -1407,12 +1449,32 @@ static int init_profile(struct userdata *u) {
>      return r;
>  }
>  
> +static int write_block(struct userdata *u) {
> +    int n_written;
> +
> +    if (u->write_index <= 0)
> +        u->started_at = pa_rtclock_now();
> +
> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
> +        if ((n_written = a2dp_process_render(u)) < 0)
> +            return -1;
> +    } else {
> +        if ((n_written = sco_process_render(u)) < 0)
> +            return -1;
> +    }
> +
> +    if (n_written == 0)
> +        pa_log("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");

If EAGAIN after POLLOUT is normal behaviour when the radio link drops
packets, then this shouldn't be an error level message, and not even a
warning. Wireless connections aren't perfect, and occasional
transmission issues shouldn't cause any messages in syslog in my
opinion.

> +
> +    return n_written;
> +}
> +
> +
>  /* I/O thread function */
>  static void thread_func(void *userdata) {
>      struct userdata *u = userdata;
> -    unsigned do_write = 0;
> -    unsigned pending_read_bytes = 0;
> -    bool writable = false;
> +    unsigned blocks_to_write = 0;
> +    unsigned bytes_to_write = 0;
>  
>      pa_assert(u);
>      pa_assert(u->transport);
> @@ -1432,9 +1494,13 @@ static void thread_func(void *userdata) {
>          struct pollfd *pollfd;
>          int ret;
>          bool disable_timer = true;
> +        bool writable = false;
> +        bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
> +        bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
>  
>          pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
>  
> +        /* Check for stream error or close */
>          if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
>              pa_log_info("FD error: %s%s%s%s",
>                          pollfd->revents & POLLERR ? "POLLERR " :"",
> @@ -1445,147 +1511,170 @@ static void thread_func(void *userdata) {
>              if (pollfd->revents & POLLHUP) {
>                  pollfd = NULL;
>                  teardown_stream(u);
> -                do_write = 0;
> -                pending_read_bytes = 0;
> +                blocks_to_write = 0;
> +                bytes_to_write = 0;
>                  writable = false;

writable is always false here, so this line can be dropped.

>                  pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
>              } else
>                  goto fail;
>          }
>  
> -        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
> +        /* If there is a pollfd, the stream is set up and we need to do something */
> +        if (pollfd) {
>  
> -            /* We should send two blocks to the device before we expect
> -             * a response. */
> +            /* Handle source if present */
> +            if (have_source) {
>  
> -            if (u->write_index == 0 && u->read_index <= 0)
> -                do_write = 2;
> +                /* We should send two blocks to the device before we expect a response. */
> +                if (u->write_index == 0 && u->read_index <= 0)
> +                    blocks_to_write = 2;
>  
> -            if (pollfd && (pollfd->revents & POLLIN)) {
> -                int n_read;
> +                /* If we got woken up by POLLIN let's do some reading */
> +                if (pollfd->revents & POLLIN) {
> +                    int n_read;
>  
> -                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
> -                    n_read = a2dp_process_push(u);
> -                else
> -                    n_read = sco_process_push(u);
> +                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
> +                        n_read = a2dp_process_push(u);
> +                    else
> +                        n_read = sco_process_push(u);
>  
> -                if (n_read < 0)
> -                    goto fail;
> +                    if (n_read < 0)
> +                        goto fail;
>  
> -                if (n_read > 0) {
> -                    /* We just read something, so we are supposed to write something, too */
> -                    pending_read_bytes += n_read;
> -                    do_write += pending_read_bytes / u->write_block_size;
> -                    pending_read_bytes = pending_read_bytes % u->write_block_size;
> +                    if (n_read > 0) {
> +                        /* We just read something, so we are supposed to write something, too */
> +                        bytes_to_write += n_read;
> +                        blocks_to_write += bytes_to_write / u->write_block_size;
> +                        bytes_to_write = bytes_to_write % u->write_block_size;
> +                    }
>                  }
>              }
> -        }
>  
> -        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
> +            /* Handle sink if present */
> +            if (have_sink) {
>  
> -            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> -                pa_sink_process_rewind(u->sink, 0);
> +                /* Process rewinds */
> +                if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> +                    pa_sink_process_rewind(u->sink, 0);
>  
> -            if (pollfd) {
> +                /* Test if the stream is writable */
>                  if (pollfd->revents & POLLOUT)
>                      writable = true;
>  
> -                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
> +                /* If we have a source, we let the source determine the timing
> +                 * for the sink */
> +                if (have_source) {
> +
> +                    if (writable && blocks_to_write > 0) {
> +                        int result;
> +
> +                        if ((result = write_block(u)) < 0)
> +                            goto fail;
> +
> +                        blocks_to_write -= result;
> +                        writable = false;
> +                    }
> +
> +                /* There is no source, we have to use the system clock for timing */
> +                } else {
> +                    bool have_written = false;
>                      pa_usec_t time_passed;
>                      pa_usec_t audio_sent;
>  
> -                    /* Hmm, there is no input stream we could synchronize
> -                     * to. So let's do things by time */
> -
>                      time_passed = pa_rtclock_now() - u->started_at;
>                      audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>  
> +                    /* A new block needs to be sent. */
>                      if (audio_sent <= time_passed) {
> -                        pa_usec_t audio_to_send = time_passed - audio_sent;
> +                        size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec);
>  
> -                        /* Never try to catch up for more than 100ms */
> -                        if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) {
> -                            pa_usec_t skip_usec;
> +                        /* There are more than two blocks that need to be written.
> +                         * We cannot catch up, therefore discard everything older
> +                         * than two block sizes. */

It seems that "cannot catch up" is not always true. It's probably true
that usually the buffer has room for only two blocks, but it's possible
that the buffer size is large enough for more blocks. Suggested
wording:

"There are more than two blocks that need to be written. Usually the
socket buffer has room for only two blocks, but even if there's more
room, let's not bother trying to catch up more than two blocks. We'll
discard everything older than two block sizes."

> +                        if (bytes_to_send > 2 * u->write_block_size) {
>                              uint64_t skip_bytes;
> +                            pa_memchunk tmp;
> +                            size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool);
> +                            pa_usec_t skip_usec;
>  
> -                            skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC;
> -                            skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec);
> +                            skip_bytes = bytes_to_send - 2 * u->write_block_size;
> +                            skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec);
>  
> -                            if (skip_bytes > 0) {
> -                                pa_memchunk tmp;
> +                            pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
> +                                        (unsigned long long) skip_usec,
> +                                        (unsigned long long) skip_bytes);

I think this shouldn't be a warning, since it's normal to have packets
dropping.

>  
> -                                pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
> -                                            (unsigned long long) skip_usec,
> -                                            (unsigned long long) skip_bytes);
> +                            while (skip_bytes > 0) {
> +                                size_t bytes_to_render;
>  
> -                                pa_sink_render_full(u->sink, skip_bytes, &tmp);
> -                                pa_memblock_unref(tmp.memblock);
> -                                u->write_index += skip_bytes;
> +                                if (skip_bytes > mempool_max_block_size)
> +                                    bytes_to_render = mempool_max_block_size;
> +                                else
> +                                    bytes_to_render = skip_bytes;
>  
> -                                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> -                                    a2dp_reduce_bitpool(u);
> +                                pa_sink_render_full(u->sink, bytes_to_render, &tmp);
> +                                pa_memblock_unref(tmp.memblock);
> +                                u->write_index += bytes_to_render;
> +                                skip_bytes -= bytes_to_render;
>                              }
> +
> +                            if (u->write_index > 0 && u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> +                                a2dp_reduce_bitpool(u);
>                          }
>  
> -                        do_write = 1;
> -                        pending_read_bytes = 0;
> +                        blocks_to_write = 1;
>                      }
> -                }
> -
> -                if (writable && do_write > 0) {
> -                    int n_written;
>  
> -                    if (u->write_index <= 0)
> -                        u->started_at = pa_rtclock_now();
> +                    /* If the stream is writable, send some data if necessary */
> +                    if (writable && blocks_to_write > 0) {
> +                        int result;
>  
> -                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
> -                        if ((n_written = a2dp_process_render(u)) < 0)
> -                            goto fail;
> -                    } else {
> -                        if ((n_written = sco_process_render(u)) < 0)
> +                        if ((result = write_block(u)) < 0)
>                              goto fail;
> -                    }
> -
> -                    if (n_written == 0)
> -                        pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
>  
> -                    do_write -= n_written;
> -                    writable = false;
> -                }
> +                        blocks_to_write -= result;
> +                        writable = false;
> +                        have_written = true;

Shouldn't have_written be set to true only if result > 0? If we got
EAGAIN, then no write actually happened.

-- 
Tanu

https://liberapay.com/tanuk
https://www.patreon.com/tanuk


More information about the pulseaudio-discuss mailing list