[pulseaudio-discuss] [PATCH v2] bluez5-device: Rewrite of thread function, reduce send buffer size for a2dp sink

Luiz Augusto von Dentz luiz.dentz at gmail.com
Fri Mar 9 09:20:56 UTC 2018


 Hi Georg,

On Mon, Mar 5, 2018 at 9:49 AM, Georg Chini <georg at chini.tk> wrote:
> The rewrite of the thread function does not change functionality much,
> most of it is only cleanup, minor bug fixing  and documentation work.
>
> This patch also changes the send buffer size for a2dp sink to avoid lags
> after temporary connection drops, following the proof-of-concept patch
> posted by Dmitry Kalyanov.
>
> Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746
> ---
>  src/modules/bluetooth/module-bluez5-device.c | 275 ++++++++++++++++++---------
>  1 file changed, 182 insertions(+), 93 deletions(-)
>
> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
> index 7970dda7..149d1708 100644
> --- a/src/modules/bluetooth/module-bluez5-device.c
> +++ b/src/modules/bluetooth/module-bluez5-device.c
> @@ -56,7 +56,6 @@ PA_MODULE_LOAD_ONCE(false);
>  PA_MODULE_USAGE("path=<device object path>"
>                  "autodetect_mtu=<boolean>");
>
> -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC)
>  #define FIXED_LATENCY_RECORD_A2DP   (25 * PA_USEC_PER_MSEC)
> @@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
>      return ret;
>  }
>
> +static void update_buffer_size(struct userdata *u) {
> +    int old_bufsize;
> +    socklen_t len = sizeof(int);
> +    int ret;
> +
> +    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
> +    if (ret == -1) {
> +        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
> +    } else {
> +        int new_bufsize;
> +
> +        /* Set send buffer size as small as possible. The minimum value is 1024 according to the
> +         * socket man page. The data is written to the socket in chunks of write_block_size, so
> +         * there should at least be room for two chunks in the buffer. Generally, write_block_size
> +         * is larger than 512. If not, use the next multiple of write_block_size which is larger
> +         * than 1024. */

Btw, the man page actually says the minimum is 2048 bytes, but perhaps
you are taking into account that the kernel doubles the value set, for
bookkeeping? Anyway, getsockopt shall always return at least 2048.

> +        new_bufsize = 2 * u->write_block_size;
> +        if (new_bufsize < 1024)
> +            new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
> +
> +        /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
> +         * returns the doubled value. */

Btw, what happens if the buffer size is reduced, for example when the
bitpool is reduced, and there are still packets in the buffer? Are
they discarded?

> +        if (new_bufsize != old_bufsize / 2) {
> +            ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
> +            if (ret == -1)
> +                pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
> +            else
> +                pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
> +        }
> +    }
> +}
> +
>  /* Run from I/O thread */
>  static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>      struct sbc_info *sbc_info;
> @@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>      pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
>      pa_sink_set_fixed_latency_within_thread(u->sink,
>              FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
> +
> +    /* If there is still data in the memchunk, we have to discard it
> +     * because the write_block_size may have changed. */
> +    if (u->write_memchunk.memblock) {
> +        pa_memblock_unref(u->write_memchunk.memblock);
> +        pa_memchunk_reset(&u->write_memchunk);
> +    }
> +
> +    update_buffer_size(u);
>  }
>
>  /* Run from I/O thread */
> @@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
>
>      pa_log_debug("Stream properly set up, we're ready to roll!");
>
> -    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>          a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
> +        update_buffer_size(u);
> +    }
>
>      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
> @@ -861,7 +903,7 @@ static void setup_stream(struct userdata *u) {
>      pollfd->events = pollfd->revents = 0;
>
>      u->read_index = u->write_index = 0;
> -    u->started_at = 0;
> +    u->started_at = pa_rtclock_now();
>      u->stream_setup_done = true;
>
>      if (u->source)
> @@ -1407,12 +1449,32 @@ static int init_profile(struct userdata *u) {
>      return r;
>  }
>
> +static int write_block(struct userdata *u) {
> +    int n_written;
> +
> +    if (u->write_index <= 0)
> +        u->started_at = pa_rtclock_now();
> +
> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
> +        if ((n_written = a2dp_process_render(u)) < 0)
> +            return -1;
> +    } else {
> +        if ((n_written = sco_process_render(u)) < 0)
> +            return -1;
> +    }
> +
> +    if (n_written == 0)
> +        pa_log("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
> +
> +    return n_written;
> +}
> +
> +
>  /* I/O thread function */
>  static void thread_func(void *userdata) {
>      struct userdata *u = userdata;
> -    unsigned do_write = 0;
> -    unsigned pending_read_bytes = 0;
> -    bool writable = false;
> +    unsigned blocks_to_write = 0;
> +    unsigned bytes_to_write = 0;
>
>      pa_assert(u);
>      pa_assert(u->transport);
> @@ -1432,9 +1494,13 @@ static void thread_func(void *userdata) {
>          struct pollfd *pollfd;
>          int ret;
>          bool disable_timer = true;
> +        bool writable = false;
> +        bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
> +        bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
>
>          pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
>
> +        /* Check for stream error or close */
>          if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
>              pa_log_info("FD error: %s%s%s%s",
>                          pollfd->revents & POLLERR ? "POLLERR " :"",
> @@ -1445,147 +1511,170 @@ static void thread_func(void *userdata) {
>              if (pollfd->revents & POLLHUP) {
>                  pollfd = NULL;
>                  teardown_stream(u);
> -                do_write = 0;
> -                pending_read_bytes = 0;
> +                blocks_to_write = 0;
> +                bytes_to_write = 0;
>                  writable = false;
>                  pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
>              } else
>                  goto fail;
>          }
>
> -        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
> +        /* If there is a pollfd, the stream is set up and we need to do something */
> +        if (pollfd) {
>
> -            /* We should send two blocks to the device before we expect
> -             * a response. */
> +            /* Handle source if present */
> +            if (have_source) {
>
> -            if (u->write_index == 0 && u->read_index <= 0)
> -                do_write = 2;
> +                /* We should send two blocks to the device before we expect a response. */
> +                if (u->write_index == 0 && u->read_index <= 0)
> +                    blocks_to_write = 2;
>
> -            if (pollfd && (pollfd->revents & POLLIN)) {
> -                int n_read;
> +                /* If we got woken up by POLLIN let's do some reading */
> +                if (pollfd->revents & POLLIN) {
> +                    int n_read;
>
> -                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
> -                    n_read = a2dp_process_push(u);
> -                else
> -                    n_read = sco_process_push(u);
> +                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
> +                        n_read = a2dp_process_push(u);
> +                    else
> +                        n_read = sco_process_push(u);
>
> -                if (n_read < 0)
> -                    goto fail;
> +                    if (n_read < 0)
> +                        goto fail;
>
> -                if (n_read > 0) {
> -                    /* We just read something, so we are supposed to write something, too */
> -                    pending_read_bytes += n_read;
> -                    do_write += pending_read_bytes / u->write_block_size;
> -                    pending_read_bytes = pending_read_bytes % u->write_block_size;
> +                    if (n_read > 0) {
> +                        /* We just read something, so we are supposed to write something, too */
> +                        bytes_to_write += n_read;
> +                        blocks_to_write += bytes_to_write / u->write_block_size;
> +                        bytes_to_write = bytes_to_write % u->write_block_size;
> +                    }
>                  }
>              }
> -        }
>
> -        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
> +            /* Handle sink if present */
> +            if (have_sink) {
>
> -            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> -                pa_sink_process_rewind(u->sink, 0);
> +                /* Process rewinds */
> +                if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> +                    pa_sink_process_rewind(u->sink, 0);
>
> -            if (pollfd) {
> +                /* Test if the stream is writable */
>                  if (pollfd->revents & POLLOUT)
>                      writable = true;
>
> -                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
> +                /* If we have a source, we let the source determine the timing
> +                 * for the sink */
> +                if (have_source) {
> +
> +                    if (writable && blocks_to_write > 0) {
> +                        int result;
> +
> +                        if ((result = write_block(u)) < 0)
> +                            goto fail;
> +
> +                        blocks_to_write -= result;
> +                        writable = false;
> +                    }
> +
> +                /* There is no source, we have to use the system clock for timing */
> +                } else {
> +                    bool have_written = false;
>                      pa_usec_t time_passed;
>                      pa_usec_t audio_sent;
>
> -                    /* Hmm, there is no input stream we could synchronize
> -                     * to. So let's do things by time */
> -
>                      time_passed = pa_rtclock_now() - u->started_at;
>                      audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>
> +                    /* A new block needs to be sent. */
>                      if (audio_sent <= time_passed) {
> -                        pa_usec_t audio_to_send = time_passed - audio_sent;
> +                        size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec);
>
> -                        /* Never try to catch up for more than 100ms */
> -                        if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) {
> -                            pa_usec_t skip_usec;
> +                        /* There are more than two blocks that need to be written.
> +                         * We cannot catch up, therefore discard everything older
> +                         * than two block sizes. */
> +                        if (bytes_to_send > 2 * u->write_block_size) {
>                              uint64_t skip_bytes;
> +                            pa_memchunk tmp;
> +                            size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool);
> +                            pa_usec_t skip_usec;
>
> -                            skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC;
> -                            skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec);
> +                            skip_bytes = bytes_to_send - 2 * u->write_block_size;
> +                            skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec);
>
> -                            if (skip_bytes > 0) {
> -                                pa_memchunk tmp;
> +                            pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
> +                                        (unsigned long long) skip_usec,
> +                                        (unsigned long long) skip_bytes);
>
> -                                pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
> -                                            (unsigned long long) skip_usec,
> -                                            (unsigned long long) skip_bytes);
> +                            while (skip_bytes > 0) {
> +                                size_t bytes_to_render;
>
> -                                pa_sink_render_full(u->sink, skip_bytes, &tmp);
> -                                pa_memblock_unref(tmp.memblock);
> -                                u->write_index += skip_bytes;
> +                                if (skip_bytes > mempool_max_block_size)
> +                                    bytes_to_render = mempool_max_block_size;
> +                                else
> +                                    bytes_to_render = skip_bytes;
>
> -                                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> -                                    a2dp_reduce_bitpool(u);
> +                                pa_sink_render_full(u->sink, bytes_to_render, &tmp);
> +                                pa_memblock_unref(tmp.memblock);
> +                                u->write_index += bytes_to_render;
> +                                skip_bytes -= bytes_to_render;
>                              }
> +
> +                            if (u->write_index > 0 && u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
> +                                a2dp_reduce_bitpool(u);
>                          }
>
> -                        do_write = 1;
> -                        pending_read_bytes = 0;
> +                        blocks_to_write = 1;
>                      }
> -                }
> -
> -                if (writable && do_write > 0) {
> -                    int n_written;
>
> -                    if (u->write_index <= 0)
> -                        u->started_at = pa_rtclock_now();
> +                    /* If the stream is writable, send some data if necessary */
> +                    if (writable && blocks_to_write > 0) {
> +                        int result;
>
> -                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
> -                        if ((n_written = a2dp_process_render(u)) < 0)
> -                            goto fail;
> -                    } else {
> -                        if ((n_written = sco_process_render(u)) < 0)
> +                        if ((result = write_block(u)) < 0)
>                              goto fail;
> -                    }
> -
> -                    if (n_written == 0)
> -                        pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
>
> -                    do_write -= n_written;
> -                    writable = false;
> -                }
> +                        blocks_to_write -= result;
> +                        writable = false;
> +                        have_written = true;
> +                    }
>
> -                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) {
> -                    pa_usec_t sleep_for;
> -                    pa_usec_t time_passed, next_write_at;
> -
> -                    if (writable) {
> -                        /* Hmm, there is no input stream we could synchronize
> -                         * to. So let's estimate when we need to wake up the latest */
> -                        time_passed = pa_rtclock_now() - u->started_at;
> -                        next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
> -                        sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
> -                        /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
> -                    } else
> -                        /* drop stream every 500 ms */
> -                        sleep_for = PA_USEC_PER_MSEC * 500;
> -
> -                    pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
> -                    disable_timer = false;
> +                    /* If nothing was written during this iteration, either the stream
> +                     * is not writable or there was no write pending. Set up a timer that
> +                     * will wake up the thread when the next data needs to be written. */
> +                    if (!have_written) {
> +                        pa_usec_t sleep_for;
> +                        pa_usec_t next_write_at;
> +
> +                        if (writable) {
> +                            /* There was no write pending on this iteration of the loop.
> +                             * Let's estimate when we need to wake up next */
> +                            next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
> +                            sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
> +                            /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
> +                        } else
> +                            /* We could not write because the stream was not ready. Let's try
> +                             * again in 500 ms and drop audio if we still can't write. The
> +                             * thread will also be woken up when we can write again. */
> +                            sleep_for = PA_USEC_PER_MSEC * 500;
> +
> +                        pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
> +                        disable_timer = false;
> +                    }
>                  }
>              }
> +
> +            /* Set events to wake up the thread */
> +            pollfd->events = (short) (((have_sink && !writable) ? POLLOUT : 0) | (have_source ? POLLIN : 0));
> +
>          }
>
>          if (disable_timer)
>              pa_rtpoll_set_timer_disabled(u->rtpoll);
>
> -        /* Hmm, nothing to do. Let's sleep */
> -        if (pollfd)
> -            pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) |
> -                                      (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? POLLIN : 0));
> -
>          if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
>              pa_log_debug("pa_rtpoll_run failed with: %d", ret);
>              goto fail;
>          }
> +
>          if (ret == 0) {
>              pa_log_debug("IO thread shutdown requested, stopping cleanly");
>              transport_release(u);
> --
> 2.14.1
>
> _______________________________________________
> pulseaudio-discuss mailing list
> pulseaudio-discuss at lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/pulseaudio-discuss



-- 
Luiz Augusto von Dentz


More information about the pulseaudio-discuss mailing list