[pulseaudio-discuss] [PATCH v2] bluez5-device: Rewrite of thread function, reduce send buffer size for a2dp sink

Tanu Kaskinen tanuk at iki.fi
Thu Apr 26 13:13:08 UTC 2018


On Thu, 2018-03-29 at 15:33 +0200, Georg Chini wrote:
> The rewrite of the thread function does not change functionality much,
> most of it is only cleanup, minor bug fixing and documentation work.
> 
> This patch also changes the send buffer size for a2dp sink to avoid lags
> after temporary connection drops, following the proof-of-concept patch
> posted by Dmitry Kalyanov.
> 
> Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746
> ---
> Changes in v2:
>  - fix issues pointed out by Tanu
>  - set writable to false for HSP only if a block really needs to be written
> 
>  src/modules/bluetooth/module-bluez5-device.c | 289 ++++++++++++++++++---------
>  1 file changed, 191 insertions(+), 98 deletions(-)
> 
> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
> index c3acc1dc..dac1eb2a 100644
> --- a/src/modules/bluetooth/module-bluez5-device.c
> +++ b/src/modules/bluetooth/module-bluez5-device.c
> @@ -56,9 +56,8 @@ PA_MODULE_LOAD_ONCE(false);
>  PA_MODULE_USAGE("path=<device object path>"
>                  "autodetect_mtu=<boolean>");
>  
> -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
> -#define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC)
> +#define FIXED_LATENCY_PLAYBACK_SCO  (25 * PA_USEC_PER_MSEC)

Why is this changed? The commit message mentions nothing about this.

>  #define FIXED_LATENCY_RECORD_A2DP   (25 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_RECORD_SCO    (25 * PA_USEC_PER_MSEC)
>  
> @@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
>      return ret;
>  }
>  
> +static void update_buffer_size(struct userdata *u) {
> +    int old_bufsize;
> +    socklen_t len = sizeof(int);
> +    int ret;
> +
> +    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
> +    if (ret == -1) {
> +        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
> +    } else {
> +        int new_bufsize;
> +
> +        /* Set send buffer size as small as possible. The minimum value is 1024 according to the
> +         * socket man page. The data is written to the socket in chunks of write_block_size, so
> +         * there should at least be room for two chunks in the buffer. Generally, write_block_size
> +         * is larger than 512. If not, use the next multiple of write_block_size which is larger
> +         * than 1024. */
> +        new_bufsize = 2 * u->write_block_size;
> +        if (new_bufsize < 1024)
> +            new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
> +
> +        /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
> +         * returns the doubled value. */
> +        if (new_bufsize != old_bufsize / 2) {
> +            ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
> +            if (ret == -1)
> +                pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
> +            else
> +                pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
> +        }
> +    }
> +}
> +
>  /* Run from I/O thread */
>  static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>      struct sbc_info *sbc_info;
> @@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>      pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
>      pa_sink_set_fixed_latency_within_thread(u->sink,
>              FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
> +
> +    /* If there is still data in the memchunk, we have to discard it
> +     * because the write_block_size may have changed. */
> +    if (u->write_memchunk.memblock) {
> +        pa_memblock_unref(u->write_memchunk.memblock);
> +        pa_memchunk_reset(&u->write_memchunk);
> +    }
> +
> +    update_buffer_size(u);
>  }
>  
>  /* Run from I/O thread */
> @@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
>  
>      pa_log_debug("Stream properly set up, we're ready to roll!");
>  
> -    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>          a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
> +        update_buffer_size(u);
> +    }
>  
>      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> @@ -1068,12 +1110,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>      switch (code) {
>  
>          case PA_SINK_MESSAGE_GET_LATENCY: {
> -            int64_t wi, ri;
> +            int64_t wi = 0, ri = 0;
>  
>              if (u->read_smoother) {
>                  ri = pa_smoother_get(u->read_smoother, pa_rtclock_now());
>                  wi = pa_bytes_to_usec(u->write_index + u->write_block_size, &u->sample_spec);
> -            } else {
> +            } else if (u->started_at) {
>                  ri = pa_rtclock_now() - u->started_at;
>                  wi = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>              }
> @@ -1415,12 +1457,32 @@ static int init_profile(struct userdata *u) {
>      return r;
>  }
>  
> +static int write_block(struct userdata *u) {
> +    int n_written;
> +
> +    if (u->write_index <= 0)
> +        u->started_at = pa_rtclock_now();
> +
> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
> +        if ((n_written = a2dp_process_render(u)) < 0)
> +            return -1;
> +    } else {
> +        if ((n_written = sco_process_render(u)) < 0)
> +            return -1;
> +    }
> +
> +    if (n_written == 0)
> +        pa_log_debug("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
> +
> +    return n_written;
> +}
> +
> +
>  /* I/O thread function */
>  static void thread_func(void *userdata) {
>      struct userdata *u = userdata;
> -    unsigned do_write = 0;
> -    unsigned pending_read_bytes = 0;
> -    bool writable = false;
> +    unsigned blocks_to_write = 0;
> +    unsigned bytes_to_write = 0;
>  
>      pa_assert(u);
>      pa_assert(u->transport);
> @@ -1440,9 +1502,13 @@ static void thread_func(void *userdata) {
>          struct pollfd *pollfd;
>          int ret;
>          bool disable_timer = true;
> +        bool writable = false;
> +        bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
> +        bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
>  
>          pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
>  
> +        /* Check for stream error or close */
>          if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
>              pa_log_info("FD error: %s%s%s%s",
>                          pollfd->revents & POLLERR ? "POLLERR " :"",
> @@ -1453,147 +1519,174 @@ static void thread_func(void *userdata) {
>              if (pollfd->revents & POLLHUP) {
>                  pollfd = NULL;
>                  teardown_stream(u);
> -                do_write = 0;
> -                pending_read_bytes = 0;
> -                writable = false;
> +                blocks_to_write = 0;
> +                bytes_to_write = 0;
>                  pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
>              } else
>                  goto fail;
>          }
>  
> -        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
> +        /* If there is a pollfd, the stream is set up and we need to do something */
> +        if (pollfd) {
>  
> -            /* We should send two blocks to the device before we expect
> -             * a response. */
> +            /* Handle source if present */
> +            if (have_source) {
>  
> -            if (u->write_index == 0 && u->read_index <= 0)
> -                do_write = 2;
> +                /* We should send two blocks to the device before we expect a response. */
> +                if (u->write_index == 0 && u->read_index <= 0)
> +                    blocks_to_write = 2;
>  
> -            if (pollfd && (pollfd->revents & POLLIN)) {
> -                int n_read;
> +                /* If we got woken up by POLLIN let's do some reading */
> +                if (pollfd->revents & POLLIN) {
> +                    int n_read;
>  
> -                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
> -                    n_read = a2dp_process_push(u);
> -                else
> -                    n_read = sco_process_push(u);
> +                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
> +                        n_read = a2dp_process_push(u);
> +                    else
> +                        n_read = sco_process_push(u);
>  
> -                if (n_read < 0)
> -                    goto fail;
> +                    if (n_read < 0)
> +                        goto fail;
>  
> -                if (n_read > 0) {
> -                    /* We just read something, so we are supposed to write something, too */
> -                    pending_read_bytes += n_read;
> -                    do_write += pending_read_bytes / u->write_block_size;
> -                    pending_read_bytes = pending_read_bytes % u->write_block_size;
> +                    if (n_read > 0) {
> +                        /* We just read something, so we are supposed to write something, too */
> +                        bytes_to_write += n_read;
> +                        blocks_to_write += bytes_to_write / u->write_block_size;
> +                        bytes_to_write = bytes_to_write % u->write_block_size;
> +                    }
>                  }
>              }
> -        }
>  
> -        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
> +            /* Handle sink if present */
> +            if (have_sink) {
>  
> -            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> -                pa_sink_process_rewind(u->sink, 0);
> +                /* Process rewinds */
> +                if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> +                    pa_sink_process_rewind(u->sink, 0);
>  
> -            if (pollfd) {
> +                /* Test if the stream is writable */
>                  if (pollfd->revents & POLLOUT)
>                      writable = true;
>  
> -                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
> -                    pa_usec_t time_passed;
> -                    pa_usec_t audio_sent;
> +                /* If we have a source, we let the source determine the timing
> +                 * for the sink */
> +                if (have_source) {
>  
> -                    /* Hmm, there is no input stream we could synchronize
> -                     * to. So let's do things by time */
> +                    if (writable && blocks_to_write > 0) {
> +                        int result;
>  
> -                    time_passed = pa_rtclock_now() - u->started_at;
> -                    audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
> +                        if ((result = write_block(u)) < 0)
> +                            goto fail;
>  
> +                        blocks_to_write -= result;
> +                        if (blocks_to_write > 0)
> +                            writable = false;

This "blocks_to_write > 0" check is new. In the previous version
writable was set to false unconditionally, and that's how I believe it
should be done. I guess this is an optimization: we won't write
anything before we get POLLIN, so to you it seemed unnecessary to wake
up on POLLOUT. Getting the POLLOUT notification is indeed unnecessary
in itself, but we still need to set POLLOUT when polling, because
otherwise when we wake up due to POLLIN, pollfd->revents will not have
POLLOUT set even if the socket is writable. As a result we won't write
when we're supposed to.

Or actually... if we wake up on POLLIN, and POLLOUT isn't in revents
even if the socket is writable, we'll set POLLOUT on the subsequent
poll, and that will return immediately, so the write isn't delayed by
much. So maybe the "blocks_to_write > 0" check is fine after all. Some
kind of a comment would probably be good in any case. For example:

"writable controls whether we set POLLOUT when polling - we set it to
false to request POLLOUT. If there are more blocks to write, we want to
be woken up immediately when the socket becomes writable. If there
aren't currently any more blocks to write, then we'll have to wait
until we've received more data, so in that case we only want to set
POLLIN. Note that in the latter case, when we are woken up the next
time, POLLOUT won't be set in revents even if the socket has meanwhile
become writable. That may seem bad, but we'll then set POLLOUT in the
subsequent poll, and that poll will return immediately, so our writes
won't be delayed."

-- 
Tanu

https://liberapay.com/tanuk
https://www.patreon.com/tanuk


More information about the pulseaudio-discuss mailing list