[pulseaudio-discuss] [PATCH v2 5/6] rtpoll: Implement pa_mainloop_api support

Tanu Kaskinen tanu.kaskinen at linux.intel.com
Wed Jan 7 06:56:51 PST 2015


The new tunnel sink and source use libpulse to talk to the remote
server, and libpulse requires a pa_mainloop_api implementation for
interacting with the event loop. The tunnel sink and source have so
far been using pa_mainloop, but there are some modules that assume
that all sinks and sources use pa_rtpoll in their IO threads, and
trying to use those modules together with the pa_mainloop based
tunnel sinks and sources will result in crashes (see [1]).

I will switch the event loop implementation in the tunnel modules
to pa_rtpoll, but that requires a pa_mainloop_api implementation in
pa_rtpoll first - that's implemented here.

pa_rtpoll_run() is changed so that it processes all defer events
first, and then all expired time events. The rest of pa_rtpoll_run()
works as before, except the poll timeout calculation now has to also
take the time events into account. IO events use the existing
pa_rtpoll_item interface.

The time events are handled separately from the old timer
functionality, which is somewhat ugly. It might be a good idea to
remove the old timer functionality and only use the time events.
I didn't attempt to do that at this time, because I feared that
adapting the pa_rtpoll users to the new system would be difficult.

[1] https://bugs.freedesktop.org/show_bug.cgi?id=73429
---
 src/pulsecore/rtpoll.c | 504 ++++++++++++++++++++++++++++++++++++++++++++++++-
 src/pulsecore/rtpoll.h |   4 +
 2 files changed, 500 insertions(+), 8 deletions(-)

diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c
index 5f3ca8b..2e1907c 100644
--- a/src/pulsecore/rtpoll.c
+++ b/src/pulsecore/rtpoll.c
@@ -32,6 +32,7 @@
 #include <pulse/xmalloc.h>
 #include <pulse/timeval.h>
 
+#include <pulsecore/dynarray.h>
 #include <pulsecore/poll.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/core-rtclock.h>
@@ -65,6 +66,18 @@ struct pa_rtpoll {
 #endif
 
     PA_LLIST_HEAD(pa_rtpoll_item, items);
+
+    pa_mainloop_api mainloop_api;
+
+    pa_dynarray *io_events;
+
+    pa_dynarray *time_events;
+    pa_dynarray *enabled_time_events;
+    pa_dynarray *expired_time_events;
+    pa_time_event *cached_next_time_event;
+
+    pa_dynarray *defer_events;
+    pa_dynarray *enabled_defer_events;
 };
 
 struct pa_rtpoll_item {
@@ -84,8 +97,321 @@ struct pa_rtpoll_item {
     PA_LLIST_FIELDS(pa_rtpoll_item);
 };
 
+struct pa_io_event {
+    pa_rtpoll *rtpoll;
+    pa_rtpoll_item *rtpoll_item;
+    pa_io_event_flags_t events;
+    pa_io_event_cb_t callback;
+    pa_io_event_destroy_cb_t destroy_callback;
+    void *userdata;
+};
+
+static void io_event_enable(pa_io_event *event, pa_io_event_flags_t events);
+
+struct pa_time_event {
+    pa_rtpoll *rtpoll;
+    pa_usec_t time;
+    bool use_rtclock;
+    bool enabled;
+    pa_time_event_cb_t callback;
+    pa_time_event_destroy_cb_t destroy_callback;
+    void *userdata;
+};
+
+static void time_event_restart(pa_time_event *event, const struct timeval *tv);
+
+struct pa_defer_event {
+    pa_rtpoll *rtpoll;
+    bool enabled;
+    pa_defer_event_cb_t callback;
+    pa_defer_event_destroy_cb_t destroy_callback;
+    void *userdata;
+};
+
+static void defer_event_enable(pa_defer_event *event, int enable);
+
 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
 
+static short map_flags_to_libc(pa_io_event_flags_t flags) {
+    return (short)
+        ((flags & PA_IO_EVENT_INPUT ? POLLIN : 0) |
+         (flags & PA_IO_EVENT_OUTPUT ? POLLOUT : 0) |
+         (flags & PA_IO_EVENT_ERROR ? POLLERR : 0) |
+         (flags & PA_IO_EVENT_HANGUP ? POLLHUP : 0));
+}
+
+static pa_io_event_flags_t map_flags_from_libc(short flags) {
+    return
+        (flags & POLLIN ? PA_IO_EVENT_INPUT : 0) |
+        (flags & POLLOUT ? PA_IO_EVENT_OUTPUT : 0) |
+        (flags & POLLERR ? PA_IO_EVENT_ERROR : 0) |
+        (flags & POLLHUP ? PA_IO_EVENT_HANGUP : 0);
+}
+
+static int io_event_work_cb(pa_rtpoll_item *item) {
+    pa_io_event *event;
+    struct pollfd *pollfd;
+
+    pa_assert(item);
+
+    event = pa_rtpoll_item_get_userdata(item);
+    pollfd = pa_rtpoll_item_get_pollfd(item, NULL);
+    event->callback(&event->rtpoll->mainloop_api, event, pollfd->fd, map_flags_from_libc(pollfd->revents), event->userdata);
+
+    return 0;
+}
+
+static pa_io_event* io_event_new(pa_mainloop_api *api, int fd, pa_io_event_flags_t events, pa_io_event_cb_t callback,
+                                 void *userdata) {
+    pa_rtpoll *rtpoll;
+    pa_io_event *event;
+    struct pollfd *pollfd;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(fd >= 0);
+    pa_assert(callback);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    event = pa_xnew0(pa_io_event, 1);
+    event->rtpoll = rtpoll;
+    event->rtpoll_item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_NORMAL, 1);
+    pa_rtpoll_item_set_work_callback(event->rtpoll_item, io_event_work_cb);
+    pa_rtpoll_item_set_userdata(event->rtpoll_item, event);
+    pollfd = pa_rtpoll_item_get_pollfd(event->rtpoll_item, NULL);
+    pollfd->fd = fd;
+    event->callback = callback;
+    event->userdata = userdata;
+
+    pa_dynarray_append(rtpoll->io_events, event);
+    io_event_enable(event, events);
+
+    return event;
+}
+
+static void io_event_free(pa_io_event *event) {
+    pa_assert(event);
+
+    pa_dynarray_remove_by_data(event->rtpoll->io_events, event);
+
+    if (event->destroy_callback)
+        event->destroy_callback(&event->rtpoll->mainloop_api, event, event->userdata);
+
+    if (event->rtpoll_item)
+        pa_rtpoll_item_free(event->rtpoll_item);
+
+    pa_xfree(event);
+}
+
+static void io_event_enable(pa_io_event *event, pa_io_event_flags_t events) {
+    struct pollfd *pollfd;
+
+    pa_assert(event);
+
+    if (events == event->events)
+        return;
+
+    event->events = events;
+
+    pollfd = pa_rtpoll_item_get_pollfd(event->rtpoll_item, NULL);
+    pollfd->events = map_flags_to_libc(events);
+}
+
+static void io_event_set_destroy(pa_io_event *event, pa_io_event_destroy_cb_t callback) {
+    pa_assert(event);
+
+    event->destroy_callback = callback;
+}
+
+static pa_usec_t make_rt(const struct timeval *tv, bool *use_rtclock) {
+    struct timeval ttv;
+
+    if (!tv) {
+        *use_rtclock = false;
+        return PA_USEC_INVALID;
+    }
+
+    ttv = *tv;
+    *use_rtclock = !!(ttv.tv_usec & PA_TIMEVAL_RTCLOCK);
+
+    if (*use_rtclock)
+        ttv.tv_usec &= ~PA_TIMEVAL_RTCLOCK;
+    else
+        pa_rtclock_from_wallclock(&ttv);
+
+    return pa_timeval_load(&ttv);
+}
+
+static pa_time_event* time_event_new(pa_mainloop_api *api, const struct timeval *tv, pa_time_event_cb_t callback,
+                                     void *userdata) {
+    pa_rtpoll *rtpoll;
+    pa_time_event *event;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(callback);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    event = pa_xnew0(pa_time_event, 1);
+    event->rtpoll = rtpoll;
+    event->time = PA_USEC_INVALID;
+    event->callback = callback;
+    event->userdata = userdata;
+
+    pa_dynarray_append(rtpoll->time_events, event);
+    time_event_restart(event, tv);
+
+    return event;
+}
+
+static void time_event_free(pa_time_event *event) {
+    pa_assert(event);
+
+    time_event_restart(event, NULL);
+    pa_dynarray_remove_by_data(event->rtpoll->time_events, event);
+
+    if (event->destroy_callback)
+        event->destroy_callback(&event->rtpoll->mainloop_api, event, event->userdata);
+
+    pa_xfree(event);
+}
+
+static void time_event_restart(pa_time_event *event, const struct timeval *tv) {
+    pa_usec_t t;
+    bool use_rtclock;
+    bool enabled;
+    bool old_enabled;
+
+    pa_assert(event);
+
+    t = make_rt(tv, &use_rtclock);
+    enabled = (t != PA_USEC_INVALID);
+    old_enabled = event->enabled;
+
+    /* We return early only if the event stays disabled. If the event stays
+     * enabled, we can't return early, because the event time may change. */
+    if (!enabled && !old_enabled)
+        return;
+
+    event->enabled = enabled;
+    event->time = t;
+    event->use_rtclock = use_rtclock;
+
+    if (enabled && !old_enabled)
+        pa_dynarray_append(event->rtpoll->enabled_time_events, event);
+    else if (!enabled) {
+        pa_dynarray_remove_by_data(event->rtpoll->enabled_time_events, event);
+        pa_dynarray_remove_by_data(event->rtpoll->expired_time_events, event);
+    }
+
+    if (event->rtpoll->cached_next_time_event == event)
+        event->rtpoll->cached_next_time_event = NULL;
+
+    if (event->rtpoll->cached_next_time_event && enabled) {
+        pa_assert(event->rtpoll->cached_next_time_event->enabled);
+
+        if (t < event->rtpoll->cached_next_time_event->time)
+            event->rtpoll->cached_next_time_event = event;
+    }
+}
+
+static void time_event_set_destroy(pa_time_event *event, pa_time_event_destroy_cb_t callback) {
+    pa_assert(event);
+
+    event->destroy_callback = callback;
+}
+
+static pa_defer_event* defer_event_new(pa_mainloop_api *api, pa_defer_event_cb_t callback, void *userdata) {
+    pa_rtpoll *rtpoll;
+    pa_defer_event *event;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(callback);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    event = pa_xnew0(pa_defer_event, 1);
+    event->rtpoll = rtpoll;
+    event->callback = callback;
+    event->userdata = userdata;
+
+    pa_dynarray_append(rtpoll->defer_events, event);
+    defer_event_enable(event, true);
+
+    return event;
+}
+
+static void defer_event_free(pa_defer_event *event) {
+    pa_assert(event);
+
+    defer_event_enable(event, false);
+    pa_dynarray_remove_by_data(event->rtpoll->defer_events, event);
+
+    if (event->destroy_callback)
+        event->destroy_callback(&event->rtpoll->mainloop_api, event, event->userdata);
+
+    pa_xfree(event);
+}
+
+static void defer_event_enable(pa_defer_event *event, int enable) {
+    pa_assert(event);
+
+    if (enable == event->enabled)
+        return;
+
+    event->enabled = enable;
+
+    if (enable)
+        pa_dynarray_append(event->rtpoll->enabled_defer_events, event);
+    else
+        pa_dynarray_remove_by_data(event->rtpoll->enabled_defer_events, event);
+}
+
+static void defer_event_set_destroy(pa_defer_event *event, pa_defer_event_destroy_cb_t callback) {
+    pa_assert(event);
+
+    event->destroy_callback = callback;
+}
+
+static void mainloop_api_quit(pa_mainloop_api *api, int retval) {
+    pa_rtpoll *rtpoll;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    pa_rtpoll_quit(rtpoll);
+}
+
+static const pa_mainloop_api vtable = {
+    .userdata = NULL,
+
+    .io_new = io_event_new,
+    .io_enable = io_event_enable,
+    .io_free = io_event_free,
+    .io_set_destroy = io_event_set_destroy,
+
+    .time_new = time_event_new,
+    .time_restart = time_event_restart,
+    .time_free = time_event_free,
+    .time_set_destroy = time_event_set_destroy,
+
+    .defer_new = defer_event_new,
+    .defer_enable = defer_event_enable,
+    .defer_free = defer_event_free,
+    .defer_set_destroy = defer_event_set_destroy,
+
+    .quit = mainloop_api_quit,
+};
+
 pa_rtpoll *pa_rtpoll_new(void) {
     pa_rtpoll *p;
 
@@ -99,6 +425,15 @@ pa_rtpoll *pa_rtpoll_new(void) {
     p->timestamp = pa_rtclock_now();
 #endif
 
+    p->mainloop_api = vtable;
+    p->mainloop_api.userdata = p;
+    p->io_events = pa_dynarray_new(NULL);
+    p->time_events = pa_dynarray_new(NULL);
+    p->enabled_time_events = pa_dynarray_new(NULL);
+    p->expired_time_events = pa_dynarray_new(NULL);
+    p->defer_events = pa_dynarray_new(NULL);
+    p->enabled_defer_events = pa_dynarray_new(NULL);
+
     return p;
 }
 
@@ -167,15 +502,108 @@ static void rtpoll_item_destroy(pa_rtpoll_item *i) {
 void pa_rtpoll_free(pa_rtpoll *p) {
     pa_assert(p);
 
+    if (p->defer_events) {
+        pa_defer_event *event;
+
+        while ((event = pa_dynarray_last(p->defer_events)))
+            defer_event_free(event);
+    }
+
+    if (p->time_events) {
+        pa_time_event *event;
+
+        while ((event = pa_dynarray_last(p->time_events)))
+            time_event_free(event);
+    }
+
+    if (p->io_events) {
+        pa_io_event *event;
+
+        while ((event = pa_dynarray_last(p->io_events)))
+            io_event_free(event);
+    }
+
     while (p->items)
         rtpoll_item_destroy(p->items);
 
+    if (p->enabled_defer_events) {
+        pa_assert(pa_dynarray_size(p->enabled_defer_events) == 0);
+        pa_dynarray_free(p->enabled_defer_events);
+    }
+
+    if (p->defer_events) {
+        pa_assert(pa_dynarray_size(p->defer_events) == 0);
+        pa_dynarray_free(p->defer_events);
+    }
+
+    if (p->expired_time_events) {
+        pa_assert(pa_dynarray_size(p->expired_time_events) == 0);
+        pa_dynarray_free(p->expired_time_events);
+    }
+
+    if (p->enabled_time_events) {
+        pa_assert(pa_dynarray_size(p->enabled_time_events) == 0);
+        pa_dynarray_free(p->enabled_time_events);
+    }
+
+    if (p->time_events) {
+        pa_assert(pa_dynarray_size(p->time_events) == 0);
+        pa_dynarray_free(p->time_events);
+    }
+
+    if (p->io_events) {
+        pa_assert(pa_dynarray_size(p->io_events) == 0);
+        pa_dynarray_free(p->io_events);
+    }
+
     pa_xfree(p->pollfd);
     pa_xfree(p->pollfd2);
 
     pa_xfree(p);
 }
 
+pa_mainloop_api *pa_rtpoll_get_mainloop_api(pa_rtpoll *rtpoll) {
+    pa_assert(rtpoll);
+
+    return &rtpoll->mainloop_api;
+}
+
+static void find_expired_time_events(pa_rtpoll *rtpoll) {
+    pa_usec_t now;
+    pa_time_event *event;
+    unsigned idx;
+
+    pa_assert(rtpoll);
+    pa_assert(pa_dynarray_size(rtpoll->expired_time_events) == 0);
+
+    now = pa_rtclock_now();
+
+    PA_DYNARRAY_FOREACH(event, rtpoll->enabled_time_events, idx) {
+        if (event->time <= now)
+            pa_dynarray_append(rtpoll->expired_time_events, event);
+    }
+}
+
+static pa_time_event *find_next_time_event(pa_rtpoll *rtpoll) {
+    pa_time_event *event;
+    pa_time_event *result = NULL;
+    unsigned idx;
+
+    pa_assert(rtpoll);
+
+    if (rtpoll->cached_next_time_event)
+        return rtpoll->cached_next_time_event;
+
+    PA_DYNARRAY_FOREACH(event, rtpoll->enabled_time_events, idx) {
+        if (!result || event->time < result->time)
+            result = event;
+    }
+
+    rtpoll->cached_next_time_event = result;
+
+    return result;
+}
+
 static void reset_revents(pa_rtpoll_item *i) {
     struct pollfd *f;
     unsigned n;
@@ -204,9 +632,14 @@ static void reset_all_revents(pa_rtpoll *p) {
 }
 
 int pa_rtpoll_run(pa_rtpoll *p) {
+    pa_defer_event *defer_event;
+    pa_time_event *time_event;
     pa_rtpoll_item *i;
     int r = 0;
     struct timeval timeout;
+    pa_time_event *next_time_event;
+    struct timeval next_time_event_elapse;
+    bool timer_enabled;
 
     pa_assert(p);
     pa_assert(!p->running);
@@ -218,7 +651,28 @@ int pa_rtpoll_run(pa_rtpoll *p) {
     p->running = true;
     p->timer_elapsed = false;
 
-    /* First, let's do some work */
+    /* Dispatch all enabled defer events. */
+    while ((defer_event = pa_dynarray_last(p->enabled_defer_events))) {
+        if (p->quit)
+            break;
+
+        defer_event->callback(&p->mainloop_api, defer_event, defer_event->userdata);
+    }
+
+    /* Dispatch all expired time events. */
+    find_expired_time_events(p);
+    while ((time_event = pa_dynarray_last(p->expired_time_events))) {
+        struct timeval tv;
+
+        if (p->quit)
+            break;
+
+        time_event_restart(time_event, NULL);
+        time_event->callback(&p->mainloop_api, time_event, pa_timeval_rtstore(&tv, time_event->time, time_event->use_rtclock),
+                             time_event->userdata);
+    }
+
+    /* Let's do some work */
     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
         int k;
 
@@ -282,15 +736,40 @@ int pa_rtpoll_run(pa_rtpoll *p) {
     if (p->rebuild_needed)
         rtpoll_rebuild(p);
 
+    /* Calculate timeout */
+
     pa_zero(timeout);
 
-    /* Calculate timeout */
-    if (!p->quit && p->timer_enabled) {
+    next_time_event = find_next_time_event(p);
+    if (next_time_event)
+        pa_timeval_rtstore(&next_time_event_elapse, next_time_event->time, next_time_event->use_rtclock);
+
+    /* p->timer_enabled and p->next_elapse are controlled by the rtpoll owner,
+     * while the time events can be created by anyone through pa_mainloop_api.
+     * It might be a good idea to merge p->timer_enabled and p->next_elapse
+     * with the time events so that we wouldn't need to handle them separately
+     * here. The reason why they are currently separate is that the
+     * pa_mainloop_api interface was bolted on pa_rtpoll as an afterthought. */
+    timer_enabled = p->timer_enabled || next_time_event;
+
+    if (!p->quit && timer_enabled) {
+        struct timeval *next_elapse;
         struct timeval now;
+
+        if (p->timer_enabled && next_time_event) {
+            if (pa_timeval_cmp(&p->next_elapse, &next_time_event_elapse) > 0)
+                next_elapse = &next_time_event_elapse;
+            else
+                next_elapse = &p->next_elapse;
+        } else if (p->timer_enabled)
+            next_elapse = &p->next_elapse;
+        else
+            next_elapse = &next_time_event_elapse;
+
         pa_rtclock_get(&now);
 
-        if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
-            pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
+        if (pa_timeval_cmp(next_elapse, &now) > 0)
+            pa_timeval_add(&timeout, pa_timeval_diff(next_elapse, &now));
     }
 
 #ifdef DEBUG_TIMING
@@ -298,7 +777,7 @@ int pa_rtpoll_run(pa_rtpoll *p) {
         pa_usec_t now = pa_rtclock_now();
         p->awake = now - p->timestamp;
         p->timestamp = now;
-        if (!p->quit && p->timer_enabled)
+        if (!p->quit && timer_enabled)
             pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
         else if (q->quit)
             pa_log("poll timeout is ZERO");
@@ -313,12 +792,21 @@ int pa_rtpoll_run(pa_rtpoll *p) {
         struct timespec ts;
         ts.tv_sec = timeout.tv_sec;
         ts.tv_nsec = timeout.tv_usec * 1000;
-        r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL);
+        r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || timer_enabled) ? &ts : NULL, NULL);
     }
 #else
-    r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
+    r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
 #endif
 
+    /* FIXME: We don't know whether the pa_rtpoll owner's timer elapsed or one
+     * of the time events created by others through pa_mainloop_api. The alsa
+     * sink and source use pa_rtpoll_timer_elapsed() to check whether *their*
+     * timer elapsed, so this ambiguity is a problem for them in theory.
+     * However, currently the pa_rtpoll objects of the alsa sink and source are
+     * not being used through pa_mainloop_api, so in practice there's no
+     * ambiguity. We could use pa_rtclock_now() to check whether p->next_elapse
+     * is in the past, but we don't do that currently, because pa_rtclock_now()
+     * is somewhat expensive and this ambiguity isn't currently a big issue. */
     p->timer_elapsed = r == 0;
 
 #ifdef DEBUG_TIMING
diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h
index c0a4dda..e2aee73 100644
--- a/src/pulsecore/rtpoll.h
+++ b/src/pulsecore/rtpoll.h
@@ -25,7 +25,9 @@
 #include <sys/types.h>
 #include <limits.h>
 
+#include <pulse/mainloop-api.h>
 #include <pulse/sample.h>
+
 #include <pulsecore/asyncmsgq.h>
 #include <pulsecore/fdsem.h>
 #include <pulsecore/macro.h>
@@ -56,6 +58,8 @@ typedef enum pa_rtpoll_priority {
 pa_rtpoll *pa_rtpoll_new(void);
 void pa_rtpoll_free(pa_rtpoll *p);
 
+pa_mainloop_api *pa_rtpoll_get_mainloop_api(pa_rtpoll *rtpoll);
+
 /* Sleep on the rtpoll until the time event, or any of the fd events
  * is triggered. Returns negative on error, positive if the loop
  * should continue to run, 0 when the loop should be terminated
-- 
1.9.3



More information about the pulseaudio-discuss mailing list