[pulseaudio-discuss] [PATCH 4/5] rtpoll: Implement pa_mainloop_api support
Tanu Kaskinen
tanu.kaskinen at linux.intel.com
Fri Jan 2 05:04:33 PST 2015
The new tunnel sink and source use libpulse to talk to the remote
server, and libpulse requires a pa_mainloop_api implementation for
interacting with the event loop. The tunnel sink and source have so
far been using pa_mainloop, but there are some modules that assume
that all sinks and sources use pa_rtpoll in their IO threads, and
trying to use those modules together with the pa_mainloop-based
tunnel sinks and sources will result in crashes (see [1]).
I will switch the event loop implementation in the tunnel modules
to pa_rtpoll, but that requires a pa_mainloop_api implementation in
pa_rtpoll first - that's implemented here.
pa_rtpoll_run() is changed so that it processes all defer events
first, and then all expired time events. The rest of pa_rtpoll_run()
works as before, except the poll timeout calculation now has to also
take the time events into account. IO events use the existing
pa_rtpoll_item interface.
The time events are handled separately from the old timer
functionality, which is somewhat ugly. It might be a good idea to
remove the old timer functionality and only use the time events.
I didn't attempt to do that at this time, because I feared that
adapting the pa_rtpoll users to the new system would be difficult.
[1] https://bugs.freedesktop.org/show_bug.cgi?id=73429
---
src/pulsecore/rtpoll.c | 504 ++++++++++++++++++++++++++++++++++++++++++++++++-
src/pulsecore/rtpoll.h | 4 +
2 files changed, 500 insertions(+), 8 deletions(-)
diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c
index 5f3ca8b..2e1907c 100644
--- a/src/pulsecore/rtpoll.c
+++ b/src/pulsecore/rtpoll.c
@@ -32,6 +32,7 @@
#include <pulse/xmalloc.h>
#include <pulse/timeval.h>
+#include <pulsecore/dynarray.h>
#include <pulsecore/poll.h>
#include <pulsecore/core-error.h>
#include <pulsecore/core-rtclock.h>
@@ -65,6 +66,18 @@ struct pa_rtpoll {
#endif
PA_LLIST_HEAD(pa_rtpoll_item, items);
+
+ pa_mainloop_api mainloop_api;
+
+ pa_dynarray *io_events;
+
+ pa_dynarray *time_events;
+ pa_dynarray *enabled_time_events;
+ pa_dynarray *expired_time_events;
+ pa_time_event *cached_next_time_event;
+
+ pa_dynarray *defer_events;
+ pa_dynarray *enabled_defer_events;
};
struct pa_rtpoll_item {
@@ -84,8 +97,321 @@ struct pa_rtpoll_item {
PA_LLIST_FIELDS(pa_rtpoll_item);
};
+/* An IO event created through the pa_mainloop_api of a pa_rtpoll. The file
+ * descriptor is watched with a pa_rtpoll_item that this struct points to. */
+struct pa_io_event {
+ /* The rtpoll whose mainloop_api created this event. */
+ pa_rtpoll *rtpoll;
+ /* Item holding the pollfd for the watched fd; freed in io_event_free(). */
+ pa_rtpoll_item *rtpoll_item;
+ /* Event mask most recently applied by io_event_enable(). */
+ pa_io_event_flags_t events;
+ /* Invoked from io_event_work_cb(). */
+ pa_io_event_cb_t callback;
+ /* Optional; invoked from io_event_free() when set. */
+ pa_io_event_destroy_cb_t destroy_callback;
+ /* Passed back unchanged to callback and destroy_callback. */
+ void *userdata;
+};
+
+/* Declared ahead of its definition because io_event_new() calls it. */
+static void io_event_enable(pa_io_event *event, pa_io_event_flags_t events);
+
+/* A time event created through the pa_mainloop_api of a pa_rtpoll. */
+struct pa_time_event {
+ /* The rtpoll whose mainloop_api created this event. */
+ pa_rtpoll *rtpoll;
+ /* Expiry time as computed by make_rt(); compared against pa_rtclock_now()
+ * in find_expired_time_events(). PA_USEC_INVALID when never armed. */
+ pa_usec_t time;
+ /* Whether the timeval passed to time_event_restart() carried the
+ * PA_TIMEVAL_RTCLOCK flag. */
+ bool use_rtclock;
+ /* True while the event is a member of rtpoll->enabled_time_events. */
+ bool enabled;
+ /* Invoked from pa_rtpoll_run() for expired events. */
+ pa_time_event_cb_t callback;
+ /* Optional; invoked from time_event_free() when set. */
+ pa_time_event_destroy_cb_t destroy_callback;
+ /* Passed back unchanged to callback and destroy_callback. */
+ void *userdata;
+};
+
+/* Declared ahead of its definition because time_event_new() and
+ * time_event_free() call it. */
+static void time_event_restart(pa_time_event *event, const struct timeval *tv);
+
+/* A defer event created through the pa_mainloop_api of a pa_rtpoll. */
+struct pa_defer_event {
+ /* The rtpoll whose mainloop_api created this event. */
+ pa_rtpoll *rtpoll;
+ /* True while the event is a member of rtpoll->enabled_defer_events. */
+ bool enabled;
+ /* Invoked from pa_rtpoll_run() while the event is enabled. */
+ pa_defer_event_cb_t callback;
+ /* Optional; invoked from defer_event_free() when set. */
+ pa_defer_event_destroy_cb_t destroy_callback;
+ /* Passed back unchanged to callback and destroy_callback. */
+ void *userdata;
+};
+
+/* Declared ahead of its definition because defer_event_new() and
+ * defer_event_free() call it. */
+static void defer_event_enable(pa_defer_event *event, int enable);
+
PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
+/* Translate a pa_io_event_flags_t mask into the corresponding poll() event
+ * bits (POLLIN, POLLOUT, POLLERR, POLLHUP). Unknown bits are dropped. */
+static short map_flags_to_libc(pa_io_event_flags_t flags) {
+    int libc_flags = 0;
+
+    if (flags & PA_IO_EVENT_INPUT)
+        libc_flags |= POLLIN;
+
+    if (flags & PA_IO_EVENT_OUTPUT)
+        libc_flags |= POLLOUT;
+
+    if (flags & PA_IO_EVENT_ERROR)
+        libc_flags |= POLLERR;
+
+    if (flags & PA_IO_EVENT_HANGUP)
+        libc_flags |= POLLHUP;
+
+    return (short) libc_flags;
+}
+
+
+/* Translate poll() event bits into a pa_io_event_flags_t mask. Only POLLIN,
+ * POLLOUT, POLLERR and POLLHUP are mapped; any other bit is ignored. */
+static pa_io_event_flags_t map_flags_from_libc(short flags) {
+    int pa_flags = 0;
+
+    if (flags & POLLIN)
+        pa_flags |= PA_IO_EVENT_INPUT;
+
+    if (flags & POLLOUT)
+        pa_flags |= PA_IO_EVENT_OUTPUT;
+
+    if (flags & POLLERR)
+        pa_flags |= PA_IO_EVENT_ERROR;
+
+    if (flags & POLLHUP)
+        pa_flags |= PA_IO_EVENT_HANGUP;
+
+    return pa_flags;
+}
+
+
+/* Work callback installed on the pa_rtpoll_item that backs an IO event (see
+ * io_event_new()). Forwards the poll() result for the item's fd to the
+ * event's user callback. Always returns 0. */
+static int io_event_work_cb(pa_rtpoll_item *item) {
+    pa_io_event *event;
+    struct pollfd *pollfd;
+
+    pa_assert(item);
+
+    event = pa_rtpoll_item_get_userdata(item);
+    pollfd = pa_rtpoll_item_get_pollfd(item, NULL);
+
+    /* Dispatch only when poll() reported something for this fd; otherwise the
+     * user callback would be handed an event with an empty flag set.
+     * NOTE(review): this assumes pa_rtpoll_run() may invoke work callbacks on
+     * iterations where the fd was not signalled - confirm against the work
+     * loop in pa_rtpoll_run(). */
+    if (pollfd->revents == 0)
+        return 0;
+
+    event->callback(&event->rtpoll->mainloop_api, event, pollfd->fd, map_flags_from_libc(pollfd->revents), event->userdata);
+
+    return 0;
+}
+
+
+/* pa_mainloop_api.io_new implementation: create an IO event watching fd.
+ *
+ * api must be the mainloop_api embedded in a pa_rtpoll (asserted). A
+ * one-pollfd pa_rtpoll_item with PA_RTPOLL_NORMAL priority is created for the
+ * fd, the event is recorded in rtpoll->io_events and the requested event
+ * mask is applied through io_event_enable(). Returns the new event. */
+static pa_io_event* io_event_new(pa_mainloop_api *api, int fd, pa_io_event_flags_t events, pa_io_event_cb_t callback,
+                                 void *userdata) {
+    pa_rtpoll *p;
+    pa_io_event *e;
+    pa_rtpoll_item *item;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(fd >= 0);
+    pa_assert(callback);
+
+    p = api->userdata;
+    pa_assert(api == &p->mainloop_api);
+
+    e = pa_xnew0(pa_io_event, 1);
+    e->rtpoll = p;
+    e->callback = callback;
+    e->userdata = userdata;
+
+    /* Back the event with an rtpoll item that carries a single pollfd. */
+    item = pa_rtpoll_item_new(p, PA_RTPOLL_NORMAL, 1);
+    e->rtpoll_item = item;
+    pa_rtpoll_item_set_work_callback(item, io_event_work_cb);
+    pa_rtpoll_item_set_userdata(item, e);
+    pa_rtpoll_item_get_pollfd(item, NULL)->fd = fd;
+
+    pa_dynarray_append(p->io_events, e);
+    io_event_enable(e, events);
+
+    return e;
+}
+
+
+/* pa_mainloop_api.io_free implementation: unregister the event from its
+ * rtpoll, run the destroy callback if one was set, release the backing
+ * rtpoll item and free the event itself. */
+static void io_event_free(pa_io_event *event) {
+    pa_rtpoll *p;
+
+    pa_assert(event);
+
+    p = event->rtpoll;
+    pa_dynarray_remove_by_data(p->io_events, event);
+
+    if (event->destroy_callback != NULL)
+        event->destroy_callback(&p->mainloop_api, event, event->userdata);
+
+    if (event->rtpoll_item != NULL)
+        pa_rtpoll_item_free(event->rtpoll_item);
+
+    pa_xfree(event);
+}
+
+
+/* pa_mainloop_api.io_enable implementation: change the set of events the IO
+ * event waits for. Does nothing when the mask is unchanged; otherwise stores
+ * the mask and updates the events field of the backing pollfd. */
+static void io_event_enable(pa_io_event *event, pa_io_event_flags_t events) {
+    pa_assert(event);
+
+    if (event->events != events) {
+        struct pollfd *pfd;
+
+        event->events = events;
+
+        pfd = pa_rtpoll_item_get_pollfd(event->rtpoll_item, NULL);
+        pfd->events = map_flags_to_libc(events);
+    }
+}
+
+
+/* pa_mainloop_api.io_set_destroy implementation: store the callback that
+ * io_event_free() invokes before the event is released. Passing NULL clears
+ * any previously stored callback. */
+static void io_event_set_destroy(pa_io_event *event, pa_io_event_destroy_cb_t callback) {
+    pa_assert(event);
+    event->destroy_callback = callback;
+}
+
+
+/* Convert a timeval received through pa_mainloop_api into a pa_usec_t.
+ *
+ * tv:          time to convert, or NULL for "no time".
+ * use_rtclock: out parameter; set to true when tv carries the
+ *              PA_TIMEVAL_RTCLOCK flag in tv_usec, false otherwise
+ *              (including when tv is NULL).
+ *
+ * Returns PA_USEC_INVALID when tv is NULL. Otherwise the flag is stripped
+ * (rtclock case) or the value is passed through pa_rtclock_from_wallclock()
+ * (non-rtclock case) before being loaded with pa_timeval_load(). The
+ * caller's timeval is never modified. */
+static pa_usec_t make_rt(const struct timeval *tv, bool *use_rtclock) {
+    struct timeval copy;
+    bool rt;
+
+    if (tv == NULL) {
+        *use_rtclock = false;
+        return PA_USEC_INVALID;
+    }
+
+    copy = *tv;
+    rt = (copy.tv_usec & PA_TIMEVAL_RTCLOCK) != 0;
+    *use_rtclock = rt;
+
+    if (!rt)
+        pa_rtclock_from_wallclock(&copy);
+    else
+        copy.tv_usec &= ~PA_TIMEVAL_RTCLOCK;
+
+    return pa_timeval_load(&copy);
+}
+
+
+/* pa_mainloop_api.time_new implementation: create a time event.
+ *
+ * api must be the mainloop_api embedded in a pa_rtpoll (asserted). The event
+ * starts out disabled with an invalid time, is recorded in
+ * rtpoll->time_events and is then armed through time_event_restart(), which
+ * leaves it disabled when tv is NULL. Returns the new event. */
+static pa_time_event* time_event_new(pa_mainloop_api *api, const struct timeval *tv, pa_time_event_cb_t callback,
+                                     void *userdata) {
+    pa_rtpoll *p;
+    pa_time_event *e;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(callback);
+
+    p = api->userdata;
+    pa_assert(api == &p->mainloop_api);
+
+    e = pa_xnew0(pa_time_event, 1);
+    e->userdata = userdata;
+    e->callback = callback;
+    e->time = PA_USEC_INVALID;
+    e->rtpoll = p;
+
+    pa_dynarray_append(p->time_events, e);
+    time_event_restart(e, tv);
+
+    return e;
+}
+
+
+/* pa_mainloop_api.time_free implementation: disable the event (which drops
+ * it from the enabled/expired bookkeeping), unregister it from
+ * rtpoll->time_events, run the destroy callback if one was set and free the
+ * event. */
+static void time_event_free(pa_time_event *event) {
+    pa_rtpoll *p;
+
+    pa_assert(event);
+
+    p = event->rtpoll;
+
+    time_event_restart(event, NULL);
+    pa_dynarray_remove_by_data(p->time_events, event);
+
+    if (event->destroy_callback != NULL)
+        event->destroy_callback(&p->mainloop_api, event, event->userdata);
+
+    pa_xfree(event);
+}
+
+
+/* pa_mainloop_api.time_restart implementation: re-arm or disable a time
+ * event.
+ *
+ * tv == NULL disables the event; any other value arms it for the time
+ * computed by make_rt(). The function keeps rtpoll->enabled_time_events,
+ * rtpoll->expired_time_events and rtpoll->cached_next_time_event consistent
+ * with the new state. */
+static void time_event_restart(pa_time_event *event, const struct timeval *tv) {
+    pa_rtpoll *rtpoll;
+    pa_usec_t t;
+    bool use_rtclock;
+    bool enabled;
+    bool old_enabled;
+
+    pa_assert(event);
+
+    rtpoll = event->rtpoll;
+
+    t = make_rt(tv, &use_rtclock);
+    enabled = (t != PA_USEC_INVALID);
+    old_enabled = event->enabled;
+
+    /* We return early only if the event stays disabled. If the event stays
+     * enabled, we can't return early, because the event time may change. */
+    if (!enabled && !old_enabled)
+        return;
+
+    event->enabled = enabled;
+
+    /* Only overwrite the stored time when arming. pa_rtpoll_run() disables an
+     * expired event and then passes event->time and event->use_rtclock to
+     * the callback, so those fields must survive being disabled; otherwise
+     * the callback would be given PA_USEC_INVALID instead of the time at
+     * which the event expired. */
+    if (enabled) {
+        event->time = t;
+        event->use_rtclock = use_rtclock;
+    }
+
+    if (enabled && !old_enabled)
+        pa_dynarray_append(rtpoll->enabled_time_events, event);
+    else if (!enabled)
+        pa_dynarray_remove_by_data(rtpoll->enabled_time_events, event);
+
+    /* An event that was enabled may already have been collected by
+     * find_expired_time_events(). Whether it is now disabled or re-armed for
+     * a different time, that pending expiry is stale and must not be
+     * dispatched. */
+    if (old_enabled)
+        pa_dynarray_remove_by_data(rtpoll->expired_time_events, event);
+
+    /* The cache cannot be trusted if it points at the event that changed. */
+    if (rtpoll->cached_next_time_event == event)
+        rtpoll->cached_next_time_event = NULL;
+
+    /* If a cached "next" event exists, this event replaces it when it now
+     * elapses earlier. With no cache, find_next_time_event() rescans. */
+    if (rtpoll->cached_next_time_event && enabled) {
+        pa_assert(rtpoll->cached_next_time_event->enabled);
+
+        if (t < rtpoll->cached_next_time_event->time)
+            rtpoll->cached_next_time_event = event;
+    }
+}
+
+
+/* pa_mainloop_api.time_set_destroy implementation: store the callback that
+ * time_event_free() invokes before the event is released. Passing NULL
+ * clears any previously stored callback. */
+static void time_event_set_destroy(pa_time_event *event, pa_time_event_destroy_cb_t callback) {
+    pa_assert(event);
+    event->destroy_callback = callback;
+}
+
+
+/* pa_mainloop_api.defer_new implementation: create a defer event.
+ *
+ * api must be the mainloop_api embedded in a pa_rtpoll (asserted). The event
+ * is recorded in rtpoll->defer_events and immediately enabled. Returns the
+ * new event. */
+static pa_defer_event* defer_event_new(pa_mainloop_api *api, pa_defer_event_cb_t callback, void *userdata) {
+    pa_rtpoll *p;
+    pa_defer_event *e;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(callback);
+
+    p = api->userdata;
+    pa_assert(api == &p->mainloop_api);
+
+    e = pa_xnew0(pa_defer_event, 1);
+    e->userdata = userdata;
+    e->callback = callback;
+    e->rtpoll = p;
+
+    pa_dynarray_append(p->defer_events, e);
+    defer_event_enable(e, true);
+
+    return e;
+}
+
+
+/* pa_mainloop_api.defer_free implementation: disable the event, unregister
+ * it from rtpoll->defer_events, run the destroy callback if one was set and
+ * free the event. */
+static void defer_event_free(pa_defer_event *event) {
+    pa_rtpoll *p;
+
+    pa_assert(event);
+
+    p = event->rtpoll;
+
+    defer_event_enable(event, false);
+    pa_dynarray_remove_by_data(p->defer_events, event);
+
+    if (event->destroy_callback != NULL)
+        event->destroy_callback(&p->mainloop_api, event, event->userdata);
+
+    pa_xfree(event);
+}
+
+
+/* pa_mainloop_api.defer_enable implementation: enable (enable != 0) or
+ * disable (enable == 0) a defer event, adding it to or removing it from
+ * rtpoll->enabled_defer_events. Does nothing when the state is unchanged. */
+static void defer_event_enable(pa_defer_event *event, int enable) {
+    bool b;
+
+    pa_assert(event);
+
+    /* The API passes an int, so any non-zero value means "enable". Normalize
+     * before comparing with the stored bool; comparing the raw int would
+     * treat e.g. 2 as a state change on an already enabled event and append
+     * it to enabled_defer_events a second time. */
+    b = !!enable;
+
+    if (b == event->enabled)
+        return;
+
+    event->enabled = b;
+
+    if (b)
+        pa_dynarray_append(event->rtpoll->enabled_defer_events, event);
+    else
+        pa_dynarray_remove_by_data(event->rtpoll->enabled_defer_events, event);
+}
+
+
+/* pa_mainloop_api.defer_set_destroy implementation: store the callback that
+ * defer_event_free() invokes before the event is released. Passing NULL
+ * clears any previously stored callback. */
+static void defer_event_set_destroy(pa_defer_event *event, pa_defer_event_destroy_cb_t callback) {
+    pa_assert(event);
+    event->destroy_callback = callback;
+}
+
+
+/* pa_mainloop_api.quit implementation: ask the owning rtpoll to quit via
+ * pa_rtpoll_quit(). retval is not used by this function. */
+static void mainloop_api_quit(pa_mainloop_api *api, int retval) {
+    pa_rtpoll *p;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+
+    p = api->userdata;
+    pa_assert(&p->mainloop_api == api);
+
+    pa_rtpoll_quit(p);
+}
+
+
+/* Template pa_mainloop_api for pa_rtpoll. pa_rtpoll_new() copies this into
+ * each rtpoll's mainloop_api member and then points userdata at the rtpoll
+ * itself, which is why userdata is NULL here. */
+static const pa_mainloop_api vtable = {
+ .userdata = NULL,
+
+ /* IO events, backed by pa_rtpoll_item objects. */
+ .io_new = io_event_new,
+ .io_enable = io_event_enable,
+ .io_free = io_event_free,
+ .io_set_destroy = io_event_set_destroy,
+
+ /* Time events, tracked in the rtpoll's time event arrays. */
+ .time_new = time_event_new,
+ .time_restart = time_event_restart,
+ .time_free = time_event_free,
+ .time_set_destroy = time_event_set_destroy,
+
+ /* Defer events, tracked in the rtpoll's defer event arrays. */
+ .defer_new = defer_event_new,
+ .defer_enable = defer_event_enable,
+ .defer_free = defer_event_free,
+ .defer_set_destroy = defer_event_set_destroy,
+
+ .quit = mainloop_api_quit,
+};
+
+
pa_rtpoll *pa_rtpoll_new(void) {
pa_rtpoll *p;
@@ -99,6 +425,15 @@ pa_rtpoll *pa_rtpoll_new(void) {
p->timestamp = pa_rtclock_now();
#endif
+ p->mainloop_api = vtable;
+ p->mainloop_api.userdata = p;
+ p->io_events = pa_dynarray_new(NULL);
+ p->time_events = pa_dynarray_new(NULL);
+ p->enabled_time_events = pa_dynarray_new(NULL);
+ p->expired_time_events = pa_dynarray_new(NULL);
+ p->defer_events = pa_dynarray_new(NULL);
+ p->enabled_defer_events = pa_dynarray_new(NULL);
+
return p;
}
@@ -167,15 +502,108 @@ static void rtpoll_item_destroy(pa_rtpoll_item *i) {
void pa_rtpoll_free(pa_rtpoll *p) {
pa_assert(p);
+ /* Free every event that is still registered. Each *_event_free() call
+ * removes the event from its array, so taking the last element until the
+ * array is empty visits all of them. The destroy callbacks of the events
+ * run as part of this. */
+ if (p->defer_events) {
+ pa_defer_event *event;
+
+ while ((event = pa_dynarray_last(p->defer_events)))
+ defer_event_free(event);
+ }
+
+ if (p->time_events) {
+ pa_time_event *event;
+
+ while ((event = pa_dynarray_last(p->time_events)))
+ time_event_free(event);
+ }
+
+ if (p->io_events) {
+ pa_io_event *event;
+
+ while ((event = pa_dynarray_last(p->io_events)))
+ io_event_free(event);
+ }
+
while (p->items)
rtpoll_item_destroy(p->items);
+ /* All events were freed above, so every bookkeeping array must be empty
+ * by now; the assertions check that before the arrays are released. */
+ if (p->enabled_defer_events) {
+ pa_assert(pa_dynarray_size(p->enabled_defer_events) == 0);
+ pa_dynarray_free(p->enabled_defer_events);
+ }
+
+ if (p->defer_events) {
+ pa_assert(pa_dynarray_size(p->defer_events) == 0);
+ pa_dynarray_free(p->defer_events);
+ }
+
+ if (p->expired_time_events) {
+ pa_assert(pa_dynarray_size(p->expired_time_events) == 0);
+ pa_dynarray_free(p->expired_time_events);
+ }
+
+ if (p->enabled_time_events) {
+ pa_assert(pa_dynarray_size(p->enabled_time_events) == 0);
+ pa_dynarray_free(p->enabled_time_events);
+ }
+
+ if (p->time_events) {
+ pa_assert(pa_dynarray_size(p->time_events) == 0);
+ pa_dynarray_free(p->time_events);
+ }
+
+ if (p->io_events) {
+ pa_assert(pa_dynarray_size(p->io_events) == 0);
+ pa_dynarray_free(p->io_events);
+ }
+
pa_xfree(p->pollfd);
pa_xfree(p->pollfd2);
pa_xfree(p);
}
+/* Return the pa_mainloop_api embedded in the given rtpoll. The returned
+ * pointer refers to a member of the rtpoll object itself. */
+pa_mainloop_api *pa_rtpoll_get_mainloop_api(pa_rtpoll *rtpoll) {
+    pa_mainloop_api *api;
+
+    pa_assert(rtpoll);
+
+    api = &rtpoll->mainloop_api;
+    return api;
+}
+
+
+/* Collect every enabled time event whose time is not later than the current
+ * pa_rtclock_now() value into rtpoll->expired_time_events. The expired array
+ * must be empty on entry (asserted). */
+static void find_expired_time_events(pa_rtpoll *rtpoll) {
+    pa_time_event *candidate;
+    pa_usec_t now;
+    unsigned i;
+
+    pa_assert(rtpoll);
+    pa_assert(pa_dynarray_size(rtpoll->expired_time_events) == 0);
+
+    /* Sample the clock once so all events are judged against the same time. */
+    now = pa_rtclock_now();
+
+    PA_DYNARRAY_FOREACH(candidate, rtpoll->enabled_time_events, i) {
+        if (!(candidate->time > now))
+            pa_dynarray_append(rtpoll->expired_time_events, candidate);
+    }
+}
+
+
+/* Return the enabled time event with the earliest time, or NULL when no time
+ * event is enabled. The answer is served from
+ * rtpoll->cached_next_time_event when that is set; otherwise the enabled
+ * events are scanned and the result is stored in the cache. */
+static pa_time_event *find_next_time_event(pa_rtpoll *rtpoll) {
+    pa_time_event *candidate;
+    pa_time_event *earliest;
+    unsigned i;
+
+    pa_assert(rtpoll);
+
+    earliest = rtpoll->cached_next_time_event;
+
+    if (earliest == NULL) {
+        PA_DYNARRAY_FOREACH(candidate, rtpoll->enabled_time_events, i) {
+            if (earliest == NULL || candidate->time < earliest->time)
+                earliest = candidate;
+        }
+
+        rtpoll->cached_next_time_event = earliest;
+    }
+
+    return earliest;
+}
+
+
static void reset_revents(pa_rtpoll_item *i) {
struct pollfd *f;
unsigned n;
@@ -204,9 +632,14 @@ static void reset_all_revents(pa_rtpoll *p) {
}
int pa_rtpoll_run(pa_rtpoll *p) {
+ pa_defer_event *defer_event;
+ pa_time_event *time_event;
pa_rtpoll_item *i;
int r = 0;
struct timeval timeout;
+ pa_time_event *next_time_event;
+ struct timeval next_time_event_elapse;
+ bool timer_enabled;
pa_assert(p);
pa_assert(!p->running);
@@ -218,7 +651,28 @@ int pa_rtpoll_run(pa_rtpoll *p) {
p->running = true;
p->timer_elapsed = false;
- /* First, let's do some work */
+ /* Dispatch all enabled defer events. */
+ while ((defer_event = pa_dynarray_last(p->enabled_defer_events))) {
+ if (p->quit)
+ break;
+
+ defer_event->callback(&p->mainloop_api, defer_event, defer_event->userdata);
+ }
+
+ /* Dispatch all expired time events. */
+ find_expired_time_events(p);
+ while ((time_event = pa_dynarray_last(p->expired_time_events))) {
+ struct timeval tv;
+
+ if (p->quit)
+ break;
+
+ time_event_restart(time_event, NULL);
+ time_event->callback(&p->mainloop_api, time_event, pa_timeval_rtstore(&tv, time_event->time, time_event->use_rtclock),
+ time_event->userdata);
+ }
+
+ /* Let's do some work */
for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
int k;
@@ -282,15 +736,40 @@ int pa_rtpoll_run(pa_rtpoll *p) {
if (p->rebuild_needed)
rtpoll_rebuild(p);
+ /* Calculate timeout */
+
pa_zero(timeout);
- /* Calculate timeout */
- if (!p->quit && p->timer_enabled) {
+ next_time_event = find_next_time_event(p);
+ if (next_time_event)
+ pa_timeval_rtstore(&next_time_event_elapse, next_time_event->time, next_time_event->use_rtclock);
+
+ /* p->timer_enabled and p->next_elapse are controlled by the rtpoll owner,
+ * while the time events can be created by anyone through pa_mainloop_api.
+ * It might be a good idea to merge p->timer_enabled and p->next_elapse
+ * with the time events so that we wouldn't need to handle them separately
+ * here. The reason why they are currently separate is that the
+ * pa_mainloop_api interface was bolted on pa_rtpoll as an afterthought. */
+ timer_enabled = p->timer_enabled || next_time_event;
+
+ if (!p->quit && timer_enabled) {
+ struct timeval *next_elapse;
struct timeval now;
+
+ if (p->timer_enabled && next_time_event) {
+ if (pa_timeval_cmp(&p->next_elapse, &next_time_event_elapse) > 0)
+ next_elapse = &next_time_event_elapse;
+ else
+ next_elapse = &p->next_elapse;
+ } else if (p->timer_enabled)
+ next_elapse = &p->next_elapse;
+ else
+ next_elapse = &next_time_event_elapse;
+
pa_rtclock_get(&now);
- if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
- pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
+ if (pa_timeval_cmp(next_elapse, &now) > 0)
+ pa_timeval_add(&timeout, pa_timeval_diff(next_elapse, &now));
}
#ifdef DEBUG_TIMING
@@ -298,7 +777,7 @@ int pa_rtpoll_run(pa_rtpoll *p) {
pa_usec_t now = pa_rtclock_now();
p->awake = now - p->timestamp;
p->timestamp = now;
- if (!p->quit && p->timer_enabled)
+ if (!p->quit && timer_enabled)
pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
else if (q->quit)
pa_log("poll timeout is ZERO");
@@ -313,12 +792,21 @@ int pa_rtpoll_run(pa_rtpoll *p) {
struct timespec ts;
ts.tv_sec = timeout.tv_sec;
ts.tv_nsec = timeout.tv_usec * 1000;
- r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL);
+ r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || timer_enabled) ? &ts : NULL, NULL);
}
#else
- r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
+ r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
#endif
+ /* FIXME: We don't know whether the pa_rtpoll owner's timer elapsed or one
+ * of the time events created by others through pa_mainloop_api. The alsa
+ * sink and source use pa_rtpoll_timer_elapsed() to check whether *their*
+ * timer elapsed, so this ambiguity is a problem for them in theory.
+ * However, currently the pa_rtpoll objects of the alsa sink and source are
+ * not being used through pa_mainloop_api, so in practice there's no
+ * ambiguity. We could use pa_rtclock_now() to check whether p->next_elapse
+ * is in the past, but we don't do that currently, because pa_rtclock_now()
+ * is somewhat expensive and this ambiguity isn't currently a big issue. */
p->timer_elapsed = r == 0;
#ifdef DEBUG_TIMING
diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h
index c0a4dda..e2aee73 100644
--- a/src/pulsecore/rtpoll.h
+++ b/src/pulsecore/rtpoll.h
@@ -25,7 +25,9 @@
#include <sys/types.h>
#include <limits.h>
+#include <pulse/mainloop-api.h>
#include <pulse/sample.h>
+
#include <pulsecore/asyncmsgq.h>
#include <pulsecore/fdsem.h>
#include <pulsecore/macro.h>
@@ -56,6 +58,8 @@ typedef enum pa_rtpoll_priority {
pa_rtpoll *pa_rtpoll_new(void);
void pa_rtpoll_free(pa_rtpoll *p);
+/* Returns the pa_mainloop_api implementation that is embedded in the rtpoll
+ * object. Events created through it are processed by pa_rtpoll_run(). */
+pa_mainloop_api *pa_rtpoll_get_mainloop_api(pa_rtpoll *rtpoll);
+
/* Sleep on the rtpoll until the time event, or any of the fd events
* is triggered. Returns negative on error, positive if the loop
* should continue to run, 0 when the loop should be terminated
--
1.9.3
More information about the pulseaudio-discuss
mailing list