[pulseaudio-commits] Branch 'next' - 3 commits - src/modules src/pulsecore

Tanu Kaskinen tanuk at kemper.freedesktop.org
Fri Mar 16 18:15:42 UTC 2018


 src/modules/alsa/alsa-sink.c                 |   93 +++++++------
 src/modules/alsa/alsa-source.c               |   93 +++++++------
 src/modules/bluetooth/module-bluez4-device.c |  172 +++++++++++++-----------
 src/modules/bluetooth/module-bluez5-device.c |  174 +++++++++++++-----------
 src/modules/echo-cancel/module-echo-cancel.c |   39 +++--
 src/modules/macosx/module-coreaudio-device.c |    8 -
 src/modules/module-combine-sink.c            |   37 +++--
 src/modules/module-equalizer-sink.c          |   34 ++--
 src/modules/module-esound-sink.c             |   59 ++++----
 src/modules/module-ladspa-sink.c             |   34 ++--
 src/modules/module-null-sink.c               |   25 ++-
 src/modules/module-null-source.c             |   21 ++-
 src/modules/module-pipe-sink.c               |   45 +++---
 src/modules/module-remap-sink.c              |   34 ++--
 src/modules/module-remap-source.c            |    4 
 src/modules/module-sine-source.c             |   21 ++-
 src/modules/module-solaris.c                 |  126 ++++++++++--------
 src/modules/module-tunnel-sink-new.c         |   48 ++++--
 src/modules/module-tunnel-source-new.c       |   48 ++++--
 src/modules/module-tunnel.c                  |    8 -
 src/modules/module-virtual-sink.c            |   34 ++--
 src/modules/module-virtual-source.c          |    8 -
 src/modules/module-virtual-surround-sink.c   |   34 ++--
 src/modules/oss/module-oss.c                 |  189 ++++++++++++++-------------
 src/modules/raop/raop-sink.c                 |  121 +++++++++--------
 src/pulsecore/sink.c                         |   20 ++
 src/pulsecore/sink.h                         |   31 +++-
 src/pulsecore/source.c                       |   20 ++
 src/pulsecore/source.h                       |   31 +++-
 29 files changed, 912 insertions(+), 699 deletions(-)

New commits:
commit 96e1fb18b2cb1fa3206c5a3dc80d35c54cd236fc
Author: Tanu Kaskinen <tanuk at iki.fi>
Date:   Tue Mar 13 19:40:38 2018 +0200

    replace sink/source SET_STATE handlers with callbacks
    
    There are no behaviour changes, the code from almost all the SET_STATE
    handlers is moved with minimal changes to the newly introduced
    set_state_in_io_thread() callback. The only exception is module-tunnel,
    which has to call pa_sink_render() after pa_sink.thread_info.state has
    been updated. The set_state_in_io_thread() callback is called before
    updating that variable, so moving the SET_STATE handler code to the
    callback isn't possible.
    
    The purpose of this change is to make it easier to get state change
    handling right in modules. Hooking to the SET_STATE messages in modules
    required care in calling pa_sink/source_process_msg() at the right time
    (or not calling it at all, as was the case on resume failures), and
    there were a few bugs (fixed before this patch). Now the core takes care
    of ordering things correctly.
    
    Another motivation for this change is that there was some talk about
    adding a suspend_cause variable to pa_sink/source.thread_info. The
    variable would be updated in the core SET_STATE handler, but that would
    not work with the old design, because in case of resume failures modules
    didn't call the core message handler.

diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c
index bc61ce37..e8bb2baa 100644
--- a/src/modules/alsa/alsa-sink.c
+++ b/src/modules/alsa/alsa-sink.c
@@ -1184,46 +1184,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
             return 0;
         }
-
-        case PA_SINK_MESSAGE_SET_STATE:
-
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
-                case PA_SINK_SUSPENDED: {
-                    pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
-
-                    suspend(u);
-
-                    break;
-                }
-
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING: {
-                    int r;
-
-                    if (u->sink->thread_info.state == PA_SINK_INIT) {
-                        if (build_pollfd(u) < 0)
-                            /* FIXME: This will cause an assertion failure in
-                             * pa_sink_put(), because with the current design
-                             * pa_sink_put() is not allowed to fail. */
-                            return -PA_ERR_IO;
-                    }
-
-                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
-                        if ((r = unsuspend(u)) < 0)
-                            return r;
-                    }
-
-                    break;
-                }
-
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                case PA_SINK_INVALID_STATE:
-                    ;
-            }
-
-            break;
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -1248,6 +1208,54 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t new_stat
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+
+        case PA_SINK_SUSPENDED: {
+            pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+
+            suspend(u);
+
+            break;
+        }
+
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING: {
+            int r;
+
+            if (u->sink->thread_info.state == PA_SINK_INIT) {
+                if (build_pollfd(u) < 0)
+                    /* FIXME: This will cause an assertion failure, because
+                     * with the current design pa_sink_put() is not allowed
+                     * to fail and pa_sink_put() has no fallback code that
+                     * would start the sink suspended if opening the device
+                     * fails. */
+                    return -PA_ERR_IO;
+            }
+
+            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+                if ((r = unsuspend(u)) < 0)
+                    return r;
+            }
+
+            break;
+        }
+
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+        case PA_SINK_INVALID_STATE:
+            break;
+    }
+
+    return 0;
+}
+
 static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
     struct userdata *u = snd_mixer_elem_get_callback_private(elem);
 
@@ -2360,6 +2368,7 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca
     if (u->use_tsched)
         u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     if (u->ucm_context)
         u->sink->set_port = sink_set_port_ucm_cb;
     else
diff --git a/src/modules/alsa/alsa-source.c b/src/modules/alsa/alsa-source.c
index bdcb6424..f4d07ae6 100644
--- a/src/modules/alsa/alsa-source.c
+++ b/src/modules/alsa/alsa-source.c
@@ -1039,46 +1039,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 
             return 0;
         }
-
-        case PA_SOURCE_MESSAGE_SET_STATE:
-
-            switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
-
-                case PA_SOURCE_SUSPENDED: {
-                    pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
-
-                    suspend(u);
-
-                    break;
-                }
-
-                case PA_SOURCE_IDLE:
-                case PA_SOURCE_RUNNING: {
-                    int r;
-
-                    if (u->source->thread_info.state == PA_SOURCE_INIT) {
-                        if (build_pollfd(u) < 0)
-                            /* FIXME: This will cause an assertion failure in
-                             * pa_source_put(), because with the current design
-                             * pa_source_put() is not allowed to fail. */
-                            return -PA_ERR_IO;
-                    }
-
-                    if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
-                        if ((r = unsuspend(u)) < 0)
-                            return r;
-                    }
-
-                    break;
-                }
-
-                case PA_SOURCE_UNLINKED:
-                case PA_SOURCE_INIT:
-                case PA_SOURCE_INVALID_STATE:
-                    ;
-            }
-
-            break;
     }
 
     return pa_source_process_msg(o, code, data, offset, chunk);
@@ -1103,6 +1063,54 @@ static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t ne
     return 0;
 }
 
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+
+        case PA_SOURCE_SUSPENDED: {
+            pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
+
+            suspend(u);
+
+            break;
+        }
+
+        case PA_SOURCE_IDLE:
+        case PA_SOURCE_RUNNING: {
+            int r;
+
+            if (u->source->thread_info.state == PA_SOURCE_INIT) {
+                if (build_pollfd(u) < 0)
+                    /* FIXME: This will cause an assertion failure, because
+                     * with the current design pa_source_put() is not allowed
+                     * to fail and pa_source_put() has no fallback code that
+                     * would start the source suspended if opening the device
+                     * fails. */
+                    return -PA_ERR_IO;
+            }
+
+            if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
+                if ((r = unsuspend(u)) < 0)
+                    return r;
+            }
+
+            break;
+        }
+
+        case PA_SOURCE_UNLINKED:
+        case PA_SOURCE_INIT:
+        case PA_SOURCE_INVALID_STATE:
+            ;
+    }
+
+    return 0;
+}
+
 static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
     struct userdata *u = snd_mixer_elem_get_callback_private(elem);
 
@@ -2036,6 +2044,7 @@ pa_source *pa_alsa_source_new(pa_module *m, pa_modargs *ma, const char*driver, p
     if (u->use_tsched)
         u->source->update_requested_latency = source_update_requested_latency_cb;
     u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
+    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
     if (u->ucm_context)
         u->source->set_port = source_set_port_ucm_cb;
     else
diff --git a/src/modules/bluetooth/module-bluez4-device.c b/src/modules/bluetooth/module-bluez4-device.c
index c6baee84..85eb7986 100644
--- a/src/modules/bluetooth/module-bluez4-device.c
+++ b/src/modules/bluetooth/module-bluez4-device.c
@@ -386,45 +386,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
     switch (code) {
 
-        case PA_SINK_MESSAGE_SET_STATE:
-
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
-                case PA_SINK_SUSPENDED:
-                    /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
-                    if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
-                        break;
-
-                    /* Stop the device if the source is suspended as well */
-                    if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
-                        /* We deliberately ignore whether stopping
-                         * actually worked. Since the stream_fd is
-                         * closed it doesn't really matter */
-                        bt_transport_release(u);
-
-                    break;
-
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING:
-                    if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
-                        break;
-
-                    /* Resume the device if the source was suspended as well */
-                    if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
-                        if (bt_transport_acquire(u, false) < 0)
-                            return -1;
-                        else
-                            setup_stream(u);
-                    }
-                    break;
-
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                case PA_SINK_INVALID_STATE:
-                    ;
-            }
-            break;
-
         case PA_SINK_MESSAGE_GET_LATENCY: {
 
             if (u->read_smoother) {
@@ -451,55 +412,61 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
-/* Run from IO thread */
-static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
-    struct userdata *u = PA_SOURCE(o)->userdata;
-
-    pa_assert(u->source == PA_SOURCE(o));
-    pa_assert(u->transport);
-
-    switch (code) {
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
 
-        case PA_SOURCE_MESSAGE_SET_STATE:
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
 
-            switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
+    switch (new_state) {
 
-                case PA_SOURCE_SUSPENDED:
-                    /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
-                    if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
-                        break;
+        case PA_SINK_SUSPENDED:
+            /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
+            if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
+                break;
 
-                    /* Stop the device if the sink is suspended as well */
-                    if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
-                        bt_transport_release(u);
+            /* Stop the device if the source is suspended as well */
+            if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
+                /* We deliberately ignore whether stopping
+                 * actually worked. Since the stream_fd is
+                 * closed it doesn't really matter */
+                bt_transport_release(u);
 
-                    if (u->read_smoother)
-                        pa_smoother_pause(u->read_smoother, pa_rtclock_now());
-                    break;
+            break;
 
-                case PA_SOURCE_IDLE:
-                case PA_SOURCE_RUNNING:
-                    if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
-                        break;
-
-                    /* Resume the device if the sink was suspended as well */
-                    if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
-                        if (bt_transport_acquire(u, false) < 0)
-                            return -1;
-                        else
-                            setup_stream(u);
-                    }
-                    /* We don't resume the smoother here. Instead we
-                     * wait until the first packet arrives */
-                    break;
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING:
+            if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
+                break;
 
-                case PA_SOURCE_UNLINKED:
-                case PA_SOURCE_INIT:
-                case PA_SOURCE_INVALID_STATE:
-                    ;
+            /* Resume the device if the source was suspended as well */
+            if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
+                if (bt_transport_acquire(u, false) < 0)
+                    return -1;
+                else
+                    setup_stream(u);
             }
             break;
 
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+        case PA_SINK_INVALID_STATE:
+            break;
+    }
+
+    return 0;
+}
+
+/* Run from IO thread */
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+    struct userdata *u = PA_SOURCE(o)->userdata;
+
+    pa_assert(u->source == PA_SOURCE(o));
+    pa_assert(u->transport);
+
+    switch (code) {
+
         case PA_SOURCE_MESSAGE_GET_LATENCY: {
             int64_t wi, ri;
 
@@ -519,6 +486,53 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
     return pa_source_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+
+        case PA_SOURCE_SUSPENDED:
+            /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
+            if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+                break;
+
+            /* Stop the device if the sink is suspended as well */
+            if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
+                bt_transport_release(u);
+
+            if (u->read_smoother)
+                pa_smoother_pause(u->read_smoother, pa_rtclock_now());
+            break;
+
+        case PA_SOURCE_IDLE:
+        case PA_SOURCE_RUNNING:
+            if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
+                break;
+
+            /* Resume the device if the sink was suspended as well */
+            if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
+                if (bt_transport_acquire(u, false) < 0)
+                    return -1;
+                else
+                    setup_stream(u);
+            }
+            /* We don't resume the smoother here. Instead we
+             * wait until the first packet arrives */
+            break;
+
+        case PA_SOURCE_UNLINKED:
+        case PA_SOURCE_INIT:
+        case PA_SOURCE_INVALID_STATE:
+            break;
+    }
+
+    return 0;
+}
+
 /* Called from main thread context */
 static int device_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct bluetooth_msg *u = BLUETOOTH_MSG(obj);
@@ -1591,6 +1605,7 @@ static int add_sink(struct userdata *u) {
 
         u->sink->userdata = u;
         u->sink->parent.process_msg = sink_process_msg;
+        u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
         u->sink->set_port = sink_set_port_cb;
     }
 
@@ -1663,6 +1678,7 @@ static int add_source(struct userdata *u) {
 
         u->source->userdata = u;
         u->source->parent.process_msg = source_process_msg;
+        u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
         u->source->set_port = source_set_port_cb;
     }
 
diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
index b83f0eaf..5e189ba2 100644
--- a/src/modules/bluetooth/module-bluez5-device.c
+++ b/src/modules/bluetooth/module-bluez5-device.c
@@ -891,48 +891,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 
     switch (code) {
 
-        case PA_SOURCE_MESSAGE_SET_STATE:
-
-            switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
-
-                case PA_SOURCE_SUSPENDED:
-                    /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
-                    if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
-                        break;
-
-                    /* Stop the device if the sink is suspended as well */
-                    if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
-                        transport_release(u);
-
-                    if (u->read_smoother)
-                        pa_smoother_pause(u->read_smoother, pa_rtclock_now());
-
-                    break;
-
-                case PA_SOURCE_IDLE:
-                case PA_SOURCE_RUNNING:
-                    if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
-                        break;
-
-                    /* Resume the device if the sink was suspended as well */
-                    if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
-                        if (!setup_transport_and_stream(u))
-                            return -1;
-                    }
-
-                    /* We don't resume the smoother here. Instead we
-                     * wait until the first packet arrives */
-
-                    break;
-
-                case PA_SOURCE_UNLINKED:
-                case PA_SOURCE_INIT:
-                case PA_SOURCE_INVALID_STATE:
-                    break;
-            }
-
-            break;
-
         case PA_SOURCE_MESSAGE_GET_LATENCY: {
             int64_t wi, ri;
 
@@ -956,6 +914,53 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
     return pa_source_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+
+        case PA_SOURCE_SUSPENDED:
+            /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
+            if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+                break;
+
+            /* Stop the device if the sink is suspended as well */
+            if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
+                transport_release(u);
+
+            if (u->read_smoother)
+                pa_smoother_pause(u->read_smoother, pa_rtclock_now());
+
+            break;
+
+        case PA_SOURCE_IDLE:
+        case PA_SOURCE_RUNNING:
+            if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
+                break;
+
+            /* Resume the device if the sink was suspended as well */
+            if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state))
+                if (!setup_transport_and_stream(u))
+                    return -1;
+
+            /* We don't resume the smoother here. Instead we
+             * wait until the first packet arrives */
+
+            break;
+
+        case PA_SOURCE_UNLINKED:
+        case PA_SOURCE_INIT:
+        case PA_SOURCE_INVALID_STATE:
+            break;
+    }
+
+    return 0;
+}
+
 /* Run from main thread */
 static void source_set_volume_cb(pa_source *s) {
     uint16_t gain;
@@ -1044,6 +1049,7 @@ static int add_source(struct userdata *u) {
 
     u->source->userdata = u;
     u->source->parent.process_msg = source_process_msg;
+    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
 
     if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) {
         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
@@ -1061,45 +1067,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
     switch (code) {
 
-        case PA_SINK_MESSAGE_SET_STATE:
-
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
-                case PA_SINK_SUSPENDED:
-                    /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
-                    if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
-                        break;
-
-                    /* Stop the device if the source is suspended as well */
-                    if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
-                        /* We deliberately ignore whether stopping
-                         * actually worked. Since the stream_fd is
-                         * closed it doesn't really matter */
-                        transport_release(u);
-
-                    break;
-
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING:
-                    if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
-                        break;
-
-                    /* Resume the device if the source was suspended as well */
-                    if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
-                        if (!setup_transport_and_stream(u))
-                            return -1;
-                    }
-
-                    break;
-
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                case PA_SINK_INVALID_STATE:
-                    break;
-            }
-
-            break;
-
         case PA_SINK_MESSAGE_GET_LATENCY: {
             int64_t wi, ri;
 
@@ -1124,6 +1091,50 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+
+        case PA_SINK_SUSPENDED:
+            /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
+            if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
+                break;
+
+            /* Stop the device if the source is suspended as well */
+            if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
+                /* We deliberately ignore whether stopping
+                 * actually worked. Since the stream_fd is
+                 * closed it doesn't really matter */
+                transport_release(u);
+
+            break;
+
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING:
+            if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
+                break;
+
+            /* Resume the device if the source was suspended as well */
+            if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+                if (!setup_transport_and_stream(u))
+                    return -1;
+
+            break;
+
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+        case PA_SINK_INVALID_STATE:
+            break;
+    }
+
+    return 0;
+}
+
 /* Run from main thread */
 static void sink_set_volume_cb(pa_sink *s) {
     uint16_t gain;
@@ -1213,6 +1224,7 @@ static int add_sink(struct userdata *u) {
 
     u->sink->userdata = u;
     u->sink->parent.process_msg = sink_process_msg;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
 
     if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) {
         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
diff --git a/src/modules/echo-cancel/module-echo-cancel.c b/src/modules/echo-cancel/module-echo-cancel.c
index 7af2f4b2..893c41ee 100644
--- a/src/modules/echo-cancel/module-echo-cancel.c
+++ b/src/modules/echo-cancel/module-echo-cancel.c
@@ -458,19 +458,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
-
-        case PA_SINK_MESSAGE_SET_STATE: {
-            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
-            /* When set to running or idle for the first time, request a rewind
-             * of the master sink to make sure we are heard immediately */
-            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
-                pa_log_debug("Requesting rewind due to state change.");
-                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
-            }
-            break;
-        }
-
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -526,6 +513,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    /* When set to running or idle for the first time, request a rewind
+     * of the master sink to make sure we are heard immediately */
+    if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+        pa_log_debug("Requesting rewind due to state change.");
+        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+    }
+
+    return 0;
+}
+
 /* Called from source I/O thread context */
 static void source_update_requested_latency_cb(pa_source *s) {
     struct userdata *u;
@@ -1926,6 +1930,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg_cb;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
index 22800a8b..bbd416b6 100644
--- a/src/modules/module-combine-sink.c
+++ b/src/modules/module-combine-sink.c
@@ -718,6 +718,25 @@ static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+    bool running;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    running = new_state == PA_SINK_RUNNING;
+    pa_atomic_store(&u->thread_info.running, running);
+
+    if (running)
+        pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
+    else
+        pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
+
+    return 0;
+}
+
 /* Called from IO context */
 static void update_max_request(struct userdata *u) {
     size_t max_request = 0;
@@ -859,19 +878,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
     switch (code) {
 
-        case PA_SINK_MESSAGE_SET_STATE: {
-            bool running = (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING);
-
-            pa_atomic_store(&u->thread_info.running, running);
-
-            if (running)
-                pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
-            else
-                pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
-
-            break;
-        }
-
         case PA_SINK_MESSAGE_GET_LATENCY: {
             pa_usec_t x, y, c;
             int64_t *delay = data;
@@ -1426,6 +1432,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency;
     u->sink->userdata = u;
 
diff --git a/src/modules/module-equalizer-sink.c b/src/modules/module-equalizer-sink.c
index efe95b3f..36029b38 100644
--- a/src/modules/module-equalizer-sink.c
+++ b/src/modules/module-equalizer-sink.c
@@ -267,18 +267,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
             //+ pa_bytes_to_usec(u->latency * fs, ss)
             return 0;
         }
-
-        case PA_SINK_MESSAGE_SET_STATE: {
-            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
-            /* When set to running or idle for the first time, request a rewind
-             * of the master sink to make sure we are heard immediately */
-            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
-                pa_log_debug("Requesting rewind due to state change.");
-                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
-            }
-            break;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -299,6 +287,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    /* When set to running or idle for the first time, request a rewind
+     * of the master sink to make sure we are heard immediately */
+    if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+        pa_log_debug("Requesting rewind due to state change.");
+        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+    }
+
+    return 0;
+}
+
 /* Called from I/O thread context */
 static void sink_request_rewind_cb(pa_sink *s) {
     struct userdata *u;
@@ -1230,6 +1235,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg_cb;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-esound-sink.c b/src/modules/module-esound-sink.c
index d93ad9c5..9fea2da7 100644
--- a/src/modules/module-esound-sink.c
+++ b/src/modules/module-esound-sink.c
@@ -141,32 +141,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
     switch (code) {
 
-        case PA_SINK_MESSAGE_SET_STATE:
-
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
-                case PA_SINK_SUSPENDED:
-                    pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
-
-                    pa_smoother_pause(u->smoother, pa_rtclock_now());
-                    break;
-
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING:
-
-                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
-                        pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
-
-                    break;
-
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                case PA_SINK_INVALID_STATE:
-                    ;
-            }
-
-            break;
-
         case PA_SINK_MESSAGE_GET_LATENCY: {
             pa_usec_t w, r;
 
@@ -194,6 +168,38 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+
+        case PA_SINK_SUSPENDED:
+            pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+
+            pa_smoother_pause(u->smoother, pa_rtclock_now());
+            break;
+
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING:
+
+            if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
+                pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
+
+            break;
+
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+        case PA_SINK_INVALID_STATE:
+            ;
+    }
+
+    return 0;
+}
+
 static void thread_func(void *userdata) {
     struct userdata *u = userdata;
     int write_type = 0;
@@ -611,6 +617,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->userdata = u;
 
     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index a2db68e1..de866d96 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c
@@ -374,18 +374,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
         connect_control_ports(u);
 
         return 0;
-
-        case PA_SINK_MESSAGE_SET_STATE: {
-            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
-            /* When set to running or idle for the first time, request a rewind
-             * of the master sink to make sure we are heard immediately */
-            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
-                pa_log_debug("Requesting rewind due to state change.");
-                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
-            }
-            break;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -406,6 +394,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    /* When set to running or idle for the first time, request a rewind
+     * of the master sink to make sure we are heard immediately */
+    if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+        pa_log_debug("Requesting rewind due to state change.");
+        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+    }
+
+    return 0;
+}
+
 /* Called from I/O thread context */
 static void sink_request_rewind_cb(pa_sink *s) {
     struct userdata *u;
@@ -1298,6 +1303,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg_cb;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c
index 3ace082d..16b0b687 100644
--- a/src/modules/module-null-sink.c
+++ b/src/modules/module-null-sink.c
@@ -89,15 +89,6 @@ static int sink_process_msg(
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
-        case PA_SINK_MESSAGE_SET_STATE:
-
-            if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
-                if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data)))
-                    u->timestamp = pa_rtclock_now();
-            }
-
-            break;
-
         case PA_SINK_MESSAGE_GET_LATENCY: {
             pa_usec_t now;
 
@@ -111,6 +102,21 @@ static int sink_process_msg(
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
+        if (PA_SINK_IS_OPENED(new_state))
+            u->timestamp = pa_rtclock_now();
+    }
+
+    return 0;
+}
+
 static void sink_update_requested_latency_cb(pa_sink *s) {
     struct userdata *u;
     size_t nbytes;
@@ -297,6 +303,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->userdata = u;
 
diff --git a/src/modules/module-null-source.c b/src/modules/module-null-source.c
index 41f17bd9..ae67206a 100644
--- a/src/modules/module-null-source.c
+++ b/src/modules/module-null-source.c
@@ -89,13 +89,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
     struct userdata *u = PA_SOURCE(o)->userdata;
 
     switch (code) {
-        case PA_SOURCE_MESSAGE_SET_STATE:
-
-            if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING)
-                u->timestamp = pa_rtclock_now();
-
-            break;
-
         case PA_SOURCE_MESSAGE_GET_LATENCY: {
             pa_usec_t now;
 
@@ -109,6 +102,19 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
     return pa_source_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    if (new_state == PA_SOURCE_RUNNING)
+        u->timestamp = pa_rtclock_now();
+
+    return 0;
+}
+
 static void source_update_requested_latency_cb(pa_source *s) {
     struct userdata *u;
 
@@ -229,6 +235,7 @@ int pa__init(pa_module*m) {
     u->latency_time = latency_time;
 
     u->source->parent.process_msg = source_process_msg;
+    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
     u->source->update_requested_latency = source_update_requested_latency_cb;
     u->source->userdata = u;
 
diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c
index 995785e1..b2378059 100644
--- a/src/modules/module-pipe-sink.c
+++ b/src/modules/module-pipe-sink.c
@@ -110,24 +110,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
-        case PA_SINK_MESSAGE_SET_STATE:
-            if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
-                if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data)))
-                    u->timestamp = pa_rtclock_now();
-            } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) {
-                if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) {
-                    /* Clear potential FIFO error flag */
-                    u->fifo_error = false;
-
-                    /* Continuously dropping data (clear counter on entering suspended state. */
-                    if (u->bytes_dropped != 0) {
-                        pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped);
-                        u->bytes_dropped = 0;
-                    }
-                }
-            }
-            break;
-
         case PA_SINK_MESSAGE_GET_LATENCY:
             if (u->use_system_clock_for_timing) {
                 pa_usec_t now;
@@ -153,6 +135,32 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
+        if (PA_SINK_IS_OPENED(new_state))
+            u->timestamp = pa_rtclock_now();
+    } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) {
+        if (new_state == PA_SINK_SUSPENDED) {
+            /* Clear potential FIFO error flag */
+            u->fifo_error = false;
+
+            /* Continuously dropping data (clear counter on entering suspended state. */
+            if (u->bytes_dropped != 0) {
+                pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped);
+                u->bytes_dropped = 0;
+            }
+        }
+    }
+
+    return 0;
+}
+
 static void sink_update_requested_latency_cb(pa_sink *s) {
     struct userdata *u;
     size_t nbytes;
@@ -505,6 +513,7 @@ int pa__init(pa_module *m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     if (u->use_system_clock_for_timing)
         u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->userdata = u;
diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c
index ec669879..56e7a85f 100644
--- a/src/modules/module-remap-sink.c
+++ b/src/modules/module-remap-sink.c
@@ -94,18 +94,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
-
-        case PA_SINK_MESSAGE_SET_STATE: {
-            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
-            /* When set to running or idle for the first time, request a rewind
-             * of the master sink to make sure we are heard immediately */
-            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
-                pa_log_debug("Requesting rewind due to state change.");
-                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
-            }
-            break;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -126,6 +114,23 @@ static int sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_s
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    /* When set to running or idle for the first time, request a rewind
+     * of the master sink to make sure we are heard immediately */
+    if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+        pa_log_debug("Requesting rewind due to state change.");
+        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+    }
+
+    return 0;
+}
+
 /* Called from I/O thread context */
 static void sink_request_rewind(pa_sink *s) {
     struct userdata *u;
@@ -411,6 +416,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency;
     u->sink->request_rewind = sink_request_rewind;
     u->sink->userdata = u;
diff --git a/src/modules/module-sine-source.c b/src/modules/module-sine-source.c
index f4c29738..39fb71ab 100644
--- a/src/modules/module-sine-source.c
+++ b/src/modules/module-sine-source.c
@@ -87,13 +87,6 @@ static int source_process_msg(
 
     switch (code) {
 
-        case PA_SOURCE_MESSAGE_SET_STATE:
-
-            if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING)
-                u->timestamp = pa_rtclock_now();
-
-            break;
-
         case PA_SOURCE_MESSAGE_GET_LATENCY: {
             pa_usec_t now, left_to_fill;
 
@@ -109,6 +102,19 @@ static int source_process_msg(
     return pa_source_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    if (new_state == PA_SOURCE_RUNNING)
+        u->timestamp = pa_rtclock_now();
+
+    return 0;
+}
+
 static void source_update_requested_latency_cb(pa_source *s) {
     struct userdata *u;
 
@@ -257,6 +263,7 @@ int pa__init(pa_module*m) {
     }
 
     u->source->parent.process_msg = source_process_msg;
+    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
     u->source->update_requested_latency = source_update_requested_latency_cb;
     u->source->userdata = u;
 
diff --git a/src/modules/module-solaris.c b/src/modules/module-solaris.c
index a4960b8b..e68f2a93 100644
--- a/src/modules/module-solaris.c
+++ b/src/modules/module-solaris.c
@@ -390,51 +390,57 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
         case PA_SINK_MESSAGE_GET_LATENCY:
             *((int64_t*) data) = sink_get_latency(u, &PA_SINK(o)->sample_spec);
             return 0;
+    }
 
-        case PA_SINK_MESSAGE_SET_STATE:
+    return pa_sink_process_msg(o, code, data, offset, chunk);
+}
 
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
 
-                case PA_SINK_SUSPENDED:
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
 
-                    pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+    switch (new_state) {
 
-                    pa_smoother_pause(u->smoother, pa_rtclock_now());
+        case PA_SINK_SUSPENDED:
 
-                    if (!u->source || u->source_suspended)
-                        suspend(u);
+            pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
 
-                    u->sink_suspended = true;
-                    break;
+            pa_smoother_pause(u->smoother, pa_rtclock_now());
 
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING:
-
-                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
-                        pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
-
-                        if (!u->source || u->source_suspended) {
-                            bool mute;
-                            if (unsuspend(u) < 0)
-                                return -1;
-                            u->sink->get_volume(u->sink);
-                            if (u->sink->get_mute(u->sink, &mute) >= 0)
-                                pa_sink_set_mute(u->sink, mute, false);
-                        }
-                        u->sink_suspended = false;
-                    }
-                    break;
+            if (!u->source || u->source_suspended)
+                suspend(u);
 
-                case PA_SINK_INVALID_STATE:
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                    ;
-            }
+            u->sink_suspended = true;
+            break;
+
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING:
+
+            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+                pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
 
+                if (!u->source || u->source_suspended) {
+                    bool mute;
+                    if (unsuspend(u) < 0)
+                        return -1;
+                    u->sink->get_volume(u->sink);
+                    if (u->sink->get_mute(u->sink, &mute) >= 0)
+                        pa_sink_set_mute(u->sink, mute, false);
+                }
+                u->sink_suspended = false;
+            }
             break;
+
+        case PA_SINK_INVALID_STATE:
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+            ;
     }
 
-    return pa_sink_process_msg(o, code, data, offset, chunk);
+    return 0;
 }
 
 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
@@ -445,45 +451,51 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
         case PA_SOURCE_MESSAGE_GET_LATENCY:
             *((pa_usec_t*) data) = source_get_latency(u, &PA_SOURCE(o)->sample_spec);
             return 0;
+    }
 
-        case PA_SOURCE_MESSAGE_SET_STATE:
+    return pa_source_process_msg(o, code, data, offset, chunk);
+}
 
-            switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
 
-                case PA_SOURCE_SUSPENDED:
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
 
-                    pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
+    switch (new_state) {
 
-                    if (!u->sink || u->sink_suspended)
-                        suspend(u);
+        case PA_SOURCE_SUSPENDED:
 
-                    u->source_suspended = true;
-                    break;
+            pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
 
-                case PA_SOURCE_IDLE:
-                case PA_SOURCE_RUNNING:
+            if (!u->sink || u->sink_suspended)
+                suspend(u);
 
-                    if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
-                        if (!u->sink || u->sink_suspended) {
-                            if (unsuspend(u) < 0)
-                                return -1;
-                            u->source->get_volume(u->source);
-                        }
-                        u->source_suspended = false;
-                    }
-                    break;
+            u->source_suspended = true;
+            break;
 
-                case PA_SOURCE_UNLINKED:
-                case PA_SOURCE_INIT:
-                case PA_SOURCE_INVALID_STATE:
-                    ;
+        case PA_SOURCE_IDLE:
+        case PA_SOURCE_RUNNING:
 
+            if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
+                if (!u->sink || u->sink_suspended) {
+                    if (unsuspend(u) < 0)
+                        return -1;
+                    u->source->get_volume(u->source);
+                }
+                u->source_suspended = false;
             }
             break;
 
+        case PA_SOURCE_UNLINKED:
+        case PA_SOURCE_INIT:
+        case PA_SOURCE_INVALID_STATE:
+            ;
+
     }
 
-    return pa_source_process_msg(o, code, data, offset, chunk);
+    return 0;
 }
 
 static void sink_set_volume(pa_sink *s) {
@@ -960,6 +972,7 @@ int pa__init(pa_module *m) {
 
         u->source->userdata = u;
         u->source->parent.process_msg = source_process_msg;
+        u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
 
         pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
         pa_source_set_rtpoll(u->source, u->rtpoll);
@@ -1003,6 +1016,7 @@ int pa__init(pa_module *m) {
         pa_assert(u->sink);
         u->sink->userdata = u;
         u->sink->parent.process_msg = sink_process_msg;
+        u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
 
         pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
         pa_sink_set_rtpoll(u->sink, u->rtpoll);
diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index 8a67b81f..31390337 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -429,28 +429,37 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
             *((int64_t*) data) = remote_latency;
             return 0;
         }
-        case PA_SINK_MESSAGE_SET_STATE:
-            if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
-                break;
+    }
+    return pa_sink_process_msg(o, code, data, offset, chunk);
+}
 
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-                case PA_SINK_SUSPENDED: {
-                    cork_stream(u, true);
-                    break;
-                }
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING: {
-                    cork_stream(u, false);
-                    break;
-                }
-                case PA_SINK_INVALID_STATE:
-                case PA_SINK_INIT:
-                case PA_SINK_UNLINKED:
-                    break;
-            }
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
+        return 0;
+
+    switch (new_state) {
+        case PA_SINK_SUSPENDED: {
+            cork_stream(u, true);
+            break;
+        }
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING: {
+            cork_stream(u, false);
+            break;
+        }
+        case PA_SINK_INVALID_STATE:
+        case PA_SINK_INIT:
+        case PA_SINK_UNLINKED:
             break;
     }
-    return pa_sink_process_msg(o, code, data, offset, chunk);
+
+    return 0;
 }
 
 int pa__init(pa_module *m) {
@@ -545,6 +554,7 @@ int pa__init(pa_module *m) {
     pa_sink_new_data_done(&sink_data);
     u->sink->userdata = u;
     u->sink->parent.process_msg = sink_process_msg_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
 
diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c
index 7ad07711..d0a5414a 100644
--- a/src/modules/module-tunnel-source-new.c
+++ b/src/modules/module-tunnel-source-new.c
@@ -428,28 +428,37 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t
 
             return 0;
         }
-        case PA_SOURCE_MESSAGE_SET_STATE:
-            if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
-                break;
+    }
+    return pa_source_process_msg(o, code, data, offset, chunk);
+}
 
-            switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
-                case PA_SOURCE_SUSPENDED: {
-                    cork_stream(u, true);
-                    break;
-                }
-                case PA_SOURCE_IDLE:
-                case PA_SOURCE_RUNNING: {
-                    cork_stream(u, false);
-                    break;
-                }
-                case PA_SOURCE_INVALID_STATE:
-                case PA_SOURCE_INIT:
-                case PA_SOURCE_UNLINKED:
-                    break;
-            }
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
+        return 0;
+
+    switch (new_state) {
+        case PA_SOURCE_SUSPENDED: {
+            cork_stream(u, true);
+            break;
+        }
+        case PA_SOURCE_IDLE:
+        case PA_SOURCE_RUNNING: {
+            cork_stream(u, false);
+            break;
+        }
+        case PA_SOURCE_INVALID_STATE:
+        case PA_SOURCE_INIT:
+        case PA_SOURCE_UNLINKED:
             break;
     }
-    return pa_source_process_msg(o, code, data, offset, chunk);
+
+    return 0;
 }
 
 int pa__init(pa_module *m) {
@@ -541,6 +550,7 @@ int pa__init(pa_module *m) {
     pa_source_new_data_done(&source_data);
     u->source->userdata = u;
     u->source->parent.process_msg = source_process_msg_cb;
+    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
     u->source->update_requested_latency = source_update_requested_latency_cb;
 
     pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
diff --git a/src/modules/module-virtual-sink.c b/src/modules/module-virtual-sink.c
index ca6ce569..68ad2007 100644
--- a/src/modules/module-virtual-sink.c
+++ b/src/modules/module-virtual-sink.c
@@ -106,18 +106,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
-
-        case PA_SINK_MESSAGE_SET_STATE: {
-            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
-            /* When set to running or idle for the first time, request a rewind
-             * of the master sink to make sure we are heard immediately */
-            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
-                pa_log_debug("Requesting rewind due to state change.");
-                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
-            }
-            break;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -138,6 +126,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    /* When set to running or idle for the first time, request a rewind
+     * of the master sink to make sure we are heard immediately */
+    if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+        pa_log_debug("Requesting rewind due to state change.");
+        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+    }
+
+    return 0;
+}
+
 /* Called from I/O thread context */
 static void sink_request_rewind_cb(pa_sink *s) {
     struct userdata *u;
@@ -556,6 +561,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg_cb;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-virtual-surround-sink.c b/src/modules/module-virtual-surround-sink.c
index 00780d8b..7c5e246c 100644
--- a/src/modules/module-virtual-surround-sink.c
+++ b/src/modules/module-virtual-surround-sink.c
@@ -134,18 +134,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
-
-        case PA_SINK_MESSAGE_SET_STATE: {
-            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
-            /* When set to running or idle for the first time, request a rewind
-             * of the master sink to make sure we are heard immediately */
-            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
-                pa_log_debug("Requesting rewind due to state change.");
-                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
-            }
-            break;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -166,6 +154,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p
     return 0;
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    /* When set to running or idle for the first time, request a rewind
+     * of the master sink to make sure we are heard immediately */
+    if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+        pa_log_debug("Requesting rewind due to state change.");
+        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+    }
+
+    return 0;
+}
+
 /* Called from I/O thread context */
 static void sink_request_rewind_cb(pa_sink *s) {
     struct userdata *u;
@@ -730,6 +735,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg_cb;
     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/oss/module-oss.c b/src/modules/oss/module-oss.c
index 7d1b9f52..d2551bcf 100644
--- a/src/modules/oss/module-oss.c
+++ b/src/modules/oss/module-oss.c
@@ -643,8 +643,6 @@ fail:
 /* Called from IO context */
 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
-    bool do_trigger = false, quick = true;
-    pa_sink_state_t new_state;
 
     switch (code) {
 
@@ -662,68 +660,73 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
             return 0;
         }
+    }
 
-        case PA_SINK_MESSAGE_SET_STATE:
-            new_state = PA_PTR_TO_UINT(data);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
+}
 
-            switch (new_state) {
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+    bool do_trigger = false;
+    bool quick = true;
 
-                case PA_SINK_SUSPENDED:
-                    pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
 
-                    if (!u->source || u->source_suspended)
-                        suspend(u);
+    switch (new_state) {
 
-                    do_trigger = true;
+        case PA_SINK_SUSPENDED:
+            pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
 
-                    u->sink_suspended = true;
-                    break;
+            if (!u->source || u->source_suspended)
+                suspend(u);
 
-                case PA_SINK_IDLE:
-                case PA_SINK_RUNNING:
+            do_trigger = true;
 
-                    if (u->sink->thread_info.state == PA_SINK_INIT) {
-                        do_trigger = true;
-                        quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state);
-                    }
+            u->sink_suspended = true;
+            break;
 
-                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+        case PA_SINK_IDLE:
+        case PA_SINK_RUNNING:
 
-                        if (!u->source || u->source_suspended) {
-                            if (unsuspend(u) < 0)
-                                return -1;
-                            quick = false;
-                        }
+            if (u->sink->thread_info.state == PA_SINK_INIT) {
+                do_trigger = true;
+                quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state);
+            }
 
-                        do_trigger = true;
+            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
 
-                        u->out_mmap_current = 0;
-                        u->out_mmap_saved_nfrags = 0;
+                if (!u->source || u->source_suspended) {
+                    if (unsuspend(u) < 0)
+                        return -1;
+                    quick = false;
+                }
 
-                        u->sink_suspended = false;
-                    }
+                do_trigger = true;
 
-                    break;
+                u->out_mmap_current = 0;
+                u->out_mmap_saved_nfrags = 0;
 
-                case PA_SINK_INVALID_STATE:
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                    ;
+                u->sink_suspended = false;
             }
 
             break;
+
+        case PA_SINK_INVALID_STATE:
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+            ;
     }
 
     if (do_trigger)
         trigger(u, new_state, u->source ? u->source->thread_info.state : PA_SOURCE_INVALID_STATE, quick);
 
-    return pa_sink_process_msg(o, code, data, offset, chunk);
+    return 0;
 }
 
 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SOURCE(o)->userdata;
-    bool do_trigger = false, quick = true;
-    pa_source_state_t new_state;
 
     switch (code) {
 
@@ -740,61 +743,68 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             *((int64_t*) data) = (int64_t)r;
             return 0;
         }
+    }
 
-        case PA_SOURCE_MESSAGE_SET_STATE:
-            new_state = PA_PTR_TO_UINT(data);
+    return pa_source_process_msg(o, code, data, offset, chunk);
+}
 
-            switch (new_state) {
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+    struct userdata *u;
+    bool do_trigger = false;
+    bool quick = true;
 
-                case PA_SOURCE_SUSPENDED:
-                    pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
 
-                    if (!u->sink || u->sink_suspended)
-                        suspend(u);
+    switch (new_state) {
 
-                    do_trigger = true;
+        case PA_SOURCE_SUSPENDED:
+            pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
 
-                    u->source_suspended = true;
-                    break;
+            if (!u->sink || u->sink_suspended)
+                suspend(u);
 
-                case PA_SOURCE_IDLE:
-                case PA_SOURCE_RUNNING:
+            do_trigger = true;
 
-                    if (u->source->thread_info.state == PA_SOURCE_INIT) {
-                        do_trigger = true;
-                        quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state);
-                    }
+            u->source_suspended = true;
+            break;
 
-                    if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
+        case PA_SOURCE_IDLE:
+        case PA_SOURCE_RUNNING:
 
-                        if (!u->sink || u->sink_suspended) {
-                            if (unsuspend(u) < 0)
-                                return -1;
-                            quick = false;
-                        }
+            if (u->source->thread_info.state == PA_SOURCE_INIT) {
+                do_trigger = true;
+                quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state);
+            }
 
-                        do_trigger = true;
+            if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
 
-                        u->in_mmap_current = 0;
-                        u->in_mmap_saved_nfrags = 0;
+                if (!u->sink || u->sink_suspended) {
+                    if (unsuspend(u) < 0)
+                        return -1;
+                    quick = false;
+                }
 
-                        u->source_suspended = false;
-                    }
-                    break;
+                do_trigger = true;
 
-                case PA_SOURCE_UNLINKED:
-                case PA_SOURCE_INIT:
-                case PA_SOURCE_INVALID_STATE:
-                    ;
+                u->in_mmap_current = 0;
+                u->in_mmap_saved_nfrags = 0;
 
+                u->source_suspended = false;
             }
             break;
+
+        case PA_SOURCE_UNLINKED:
+        case PA_SOURCE_INIT:
+        case PA_SOURCE_INVALID_STATE:
+            ;
     }
 
     if (do_trigger)
         trigger(u, u->sink ? u->sink->thread_info.state : PA_SINK_INVALID_STATE, new_state, quick);
 
-    return pa_source_process_msg(o, code, data, offset, chunk);
+    return 0;
 }
 
 static void sink_get_volume(pa_sink *s) {
@@ -1334,6 +1344,7 @@ int pa__init(pa_module*m) {
         }
 
         u->source->parent.process_msg = source_process_msg;
+        u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
         u->source->userdata = u;
 
         pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
@@ -1403,6 +1414,7 @@ int pa__init(pa_module*m) {
         }
 
         u->sink->parent.process_msg = sink_process_msg;
+        u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
         u->sink->userdata = u;
 
         pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
diff --git a/src/modules/raop/raop-sink.c b/src/modules/raop/raop-sink.c
index 936129cf..baa34664 100644
--- a/src/modules/raop/raop-sink.c
+++ b/src/modules/raop/raop-sink.c
@@ -136,64 +136,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     pa_assert(u->raop);
 
     switch (code) {
-        case PA_SINK_MESSAGE_SET_STATE: {
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-                case PA_SINK_SUSPENDED: {
-                    pa_log_debug("RAOP: SUSPENDED");
-
-                    pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
-
-                    /* Issue a TEARDOWN if we are still connected */
-                    if (pa_raop_client_is_alive(u->raop)) {
-                        pa_raop_client_teardown(u->raop);
-                    }
-
-                    break;
-                }
-
-                case PA_SINK_IDLE: {
-                    pa_log_debug("RAOP: IDLE");
-
-                    /* Issue a FLUSH if we're comming from running state */
-                    if (u->sink->thread_info.state == PA_SINK_RUNNING) {
-                        pa_rtpoll_set_timer_disabled(u->rtpoll);
-                        pa_raop_client_flush(u->raop);
-                    }
-
-                    break;
-                }
-
-                case PA_SINK_RUNNING: {
-                    pa_usec_t now;
-
-                    pa_log_debug("RAOP: RUNNING");
-
-                    now = pa_rtclock_now();
-                    pa_smoother_reset(u->smoother, now, false);
-
-                    if (!pa_raop_client_is_alive(u->raop)) {
-                        /* Connecting will trigger a RECORD and start steaming */
-                        pa_raop_client_announce(u->raop);
-                    } else if (!pa_raop_client_can_stream(u->raop)) {
-                        /* RECORD alredy sent, simply start streaming */
-                        pa_raop_client_stream(u->raop);
-                        pa_rtpoll_set_timer_absolute(u->rtpoll, now);
-                        u->write_count = 0;
-                        u->start = now;
-                    }
-
-                    break;
-                }
-
-                case PA_SINK_UNLINKED:
-                case PA_SINK_INIT:
-                case PA_SINK_INVALID_STATE:
-                    break;
-            }
-
-            break;
-        }
-
         case PA_SINK_MESSAGE_GET_LATENCY: {
             int64_t r = 0;
 
@@ -278,6 +220,68 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+    struct userdata *u;
+
+    pa_assert(s);
+    pa_assert_se(u = s->userdata);
+
+    switch (new_state) {
+        case PA_SINK_SUSPENDED:
+            pa_log_debug("RAOP: SUSPENDED");
+
+            pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+
+            /* Issue a TEARDOWN if we are still connected */
+            if (pa_raop_client_is_alive(u->raop)) {
+                pa_raop_client_teardown(u->raop);
+            }
+
+            break;
+
+        case PA_SINK_IDLE:
+            pa_log_debug("RAOP: IDLE");
+
+            /* Issue a FLUSH if we're comming from running state */
+            if (u->sink->thread_info.state == PA_SINK_RUNNING) {
+                pa_rtpoll_set_timer_disabled(u->rtpoll);
+                pa_raop_client_flush(u->raop);
+            }
+
+            break;
+
+        case PA_SINK_RUNNING: {
+            pa_usec_t now;
+
+            pa_log_debug("RAOP: RUNNING");
+
+            now = pa_rtclock_now();
+            pa_smoother_reset(u->smoother, now, false);
+
+            if (!pa_raop_client_is_alive(u->raop)) {
+                /* Connecting will trigger a RECORD and start steaming */
+                pa_raop_client_announce(u->raop);
+            } else if (!pa_raop_client_can_stream(u->raop)) {
+                /* RECORD alredy sent, simply start streaming */
+                pa_raop_client_stream(u->raop);
+                pa_rtpoll_set_timer_absolute(u->rtpoll, now);
+                u->write_count = 0;
+                u->start = now;
+            }
+
+            break;
+        }
+
+        case PA_SINK_UNLINKED:
+        case PA_SINK_INIT:
+        case PA_SINK_INVALID_STATE:
+            break;
+    }
+
+    return 0;
+}
+
 static void sink_set_volume_cb(pa_sink *s) {
     struct userdata *u = s->userdata;
     pa_cvolume hw;
@@ -696,6 +700,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
     pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
     u->sink->userdata = u;
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 6549515b..2c933493 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -151,6 +151,7 @@ static void reset_callbacks(pa_sink *s) {
     pa_assert(s);
 
     s->set_state_in_main_thread = NULL;
+    s->set_state_in_io_thread = NULL;
     s->get_volume = NULL;
     s->set_volume = NULL;
     s->write_volume = NULL;
@@ -2850,6 +2851,13 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
                 (s->thread_info.state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
                 (PA_SINK_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SINK_SUSPENDED);
 
+            if (s->set_state_in_io_thread) {
+                int r;
+
+                if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0)
+                    return r;
+            }
+
             s->thread_info.state = PA_PTR_TO_UINT(userdata);
 
             if (s->thread_info.state == PA_SINK_SUSPENDED) {
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index 0caeb550..e1ea5249 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -124,19 +124,31 @@ struct pa_sink {
 
     bool set_mute_in_progress;
 
-    /* Called when the main loop requests a state change. Called from
-     * main loop context. If returns -1 the state change will be
-     * inhibited. This will also be called even if only the suspend cause
+    /* Callbacks for doing things when the sink state and/or suspend cause is
+     * changed. It's fine to set either or both of the callbacks to NULL if the
+     * implementation doesn't have anything to do on state or suspend cause
      * changes.
      *
-     * s->state and s->suspend_cause haven't been updated yet when this is
-     * called, so the callback can get the old state through those variables.
+     * set_state_in_main_thread() is called first. The callback is allowed to
+     * report failure if and only if the sink changes its state from
+     * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow
+     * failure also when changing state from INIT to IDLE or RUNNING, but
+     * currently that will crash pa_sink_put().) If
+     * set_state_in_main_thread() fails, set_state_in_io_thread() won't be
+     * called.
      *
-     * If set_state_in_main_thread() is successful, the IO thread will be
-     * notified with the SET_STATE message. The message handler is allowed to
-     * fail, in which case the old state is restored, and
-     * set_state_in_main_thread() is called again. */
+     * If set_state_in_main_thread() is successful (or not set), then
+     * set_state_in_io_thread() is called. Again, failure is allowed if and
+     * only if the sink changes state from SUSPENDED to IDLE or RUNNING. If
+     * set_state_in_io_thread() fails, then set_state_in_main_thread() is
+     * called again, this time with the state parameter set to SUSPENDED and
+     * the suspend_cause parameter set to 0.
+     *
+     * pa_sink.state, pa_sink.thread_info.state and pa_sink.suspend_cause
+     * are updated only after all the callback calls. In case of failure, the
+     * state is set to SUSPENDED and the suspend cause is set to 0. */
     int (*set_state_in_main_thread)(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+    int (*set_state_in_io_thread)(pa_sink *s, pa_sink_state_t state); /* may be NULL */
 
     /* Sink drivers that support hardware volume may set this
      * callback. This is called when the current volume needs to be
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index ad8e5e36..dd56eb08 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -142,6 +142,7 @@ static void reset_callbacks(pa_source *s) {
     pa_assert(s);
 
     s->set_state_in_main_thread = NULL;
+    s->set_state_in_io_thread = NULL;
     s->get_volume = NULL;
     s->set_volume = NULL;
     s->write_volume = NULL;
@@ -2224,6 +2225,13 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_
                 (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
                 (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
 
+            if (s->set_state_in_io_thread) {
+                int r;
+
+                if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0)
+                    return r;
+            }
+
             s->thread_info.state = PA_PTR_TO_UINT(userdata);
 
             if (suspend_change) {
diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h
index d60e8a1a..c4fda796 100644
--- a/src/pulsecore/source.h
+++ b/src/pulsecore/source.h
@@ -125,19 +125,31 @@ struct pa_source {
 
     bool set_mute_in_progress;
 
-    /* Called when the main loop requests a state change. Called from
-     * main loop context. If returns -1 the state change will be
-     * inhibited. This will also be called even if only the suspend cause
+    /* Callbacks for doing things when the source state and/or suspend cause is
+     * changed. It's fine to set either or both of the callbacks to NULL if the
+     * implementation doesn't have anything to do on state or suspend cause
      * changes.
      *
-     * s->state and s->suspend_cause haven't been updated yet when this is
-     * called, so the callback can get the old state through those variables.
+     * set_state_in_main_thread() is called first. The callback is allowed to
+     * report failure if and only if the source changes its state from
+     * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow
+     * failure also when changing state from INIT to IDLE or RUNNING, but
+     * currently that will crash pa_source_put().) If
+     * set_state_in_main_thread() fails, set_state_in_io_thread() won't be
+     * called.
      *
-     * If set_state_in_main_thread() is successful, the IO thread will be
-     * notified with the SET_STATE message. The message handler is allowed to
-     * fail, in which case the old state is restored, and
-     * set_state_in_main_thread() is called again. */
-    int (*set_state_in_main_thread)(pa_source *source, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+     * If set_state_in_main_thread() is successful (or not set), then
+     * set_state_in_io_thread() is called. Again, failure is allowed if and
+     * only if the source changes state from SUSPENDED to IDLE or RUNNING. If
+     * set_state_in_io_thread() fails, then set_state_in_main_thread() is
+     * called again, this time with the state parameter set to SUSPENDED and
+     * the suspend_cause parameter set to 0.
+     *
+     * pa_source.state, pa_source.thread_info.state and pa_source.suspend_cause
+     * are updated only after all the callback calls. In case of failure, the
+     * state is set to SUSPENDED and the suspend cause is set to 0. */
+    int (*set_state_in_main_thread)(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+    int (*set_state_in_io_thread)(pa_source *s, pa_source_state_t state); /* may be NULL */
 
     /* Called when the volume is queried. Called from main loop
      * context. If this is NULL a PA_SOURCE_MESSAGE_GET_VOLUME message

commit f9a32974bfda2c18f1d655fd57353c23eac9fc9e
Author: Tanu Kaskinen <tanuk at iki.fi>
Date:   Tue Mar 13 19:40:37 2018 +0200

    oss: don't fail resume if trigger() fails
    
    The previous code made the SET_STATE message fail if trigger() failed.
    However, trigger() was called after pa_sink/source_process_msg(), which
    meant that the main thread that sent the SET_STATE thought that resuming
    failed, but nothing was undone in the IO thread, so in the IO thread
    things seemed as if the sink/source was successfully resumed. (I don't
    use OSS myself, so I don't know what kind of practical problems this
    could cause).
    
    Unless some complex undo logic is implemented, I believe it's best to
    ignore all failures in trigger(). Most error cases were already ignored,
    and the only one that wasn't ignored doesn't seem too serious.
    
    I also moved trigger() to happen before pa_sink/source_process_msg(),
    which made it necessary to add new state parameters to trigger(). The
    reason for this move is that I want to move the SET_STATE handler code
    into a separate callback, and if things are done both before and after
    pa_sink/source_process_msg(), that makes things more complicated.
    
    The previous code checked the return value of
    pa_sink/source_process_msg() before calling trigger(), but that was
    unnecessary, since pa_sink/source_process_msg() never fails when
    processing the SET_STATE messages.

diff --git a/src/modules/oss/module-oss.c b/src/modules/oss/module-oss.c
index fb978b5e..7d1b9f52 100644
--- a/src/modules/oss/module-oss.c
+++ b/src/modules/oss/module-oss.c
@@ -154,20 +154,23 @@ static const char* const valid_modargs[] = {
     NULL
 };
 
-static int trigger(struct userdata *u, bool quick) {
+/* Sink and source states are passed as arguments, because this is called
+ * during state changes, and we need the new state, but thread_info.state
+ * has not yet been updated. */
+static void trigger(struct userdata *u, pa_sink_state_t sink_state, pa_source_state_t source_state, bool quick) {
     int enable_bits = 0, zero = 0;
 
     pa_assert(u);
 
     if (u->fd < 0)
-        return 0;
+        return;
 
     pa_log_debug("trigger");
 
-    if (u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+    if (u->source && PA_SOURCE_IS_OPENED(source_state))
         enable_bits |= PCM_ENABLE_INPUT;
 
-    if (u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state))
+    if (u->sink && PA_SINK_IS_OPENED(sink_state))
         enable_bits |= PCM_ENABLE_OUTPUT;
 
     pa_log_debug("trigger: %i", enable_bits);
@@ -204,21 +207,20 @@ static int trigger(struct userdata *u, bool quick) {
              * register the fd as ready.
              */
 
-            if (u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
+            if (u->source && PA_SOURCE_IS_OPENED(source_state)) {
                 uint8_t *buf = pa_xnew(uint8_t, u->in_fragment_size);
 
-                if (pa_read(u->fd, buf, u->in_fragment_size, NULL) < 0) {
+                /* XXX: Shouldn't this be done only when resuming the source?
+                 * Currently this code path is executed also when resuming the
+                 * sink while the source is already running. */
+
+                if (pa_read(u->fd, buf, u->in_fragment_size, NULL) < 0)
                     pa_log("pa_read() failed: %s", pa_cstrerror(errno));
-                    pa_xfree(buf);
-                    return -1;
-                }
 
                 pa_xfree(buf);
             }
         }
     }
-
-    return 0;
 }
 
 static void mmap_fill_memblocks(struct userdata *u, unsigned n) {
@@ -641,8 +643,8 @@ fail:
 /* Called from IO context */
 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
-    int ret;
     bool do_trigger = false, quick = true;
+    pa_sink_state_t new_state;
 
     switch (code) {
 
@@ -662,8 +664,9 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
         }
 
         case PA_SINK_MESSAGE_SET_STATE:
+            new_state = PA_PTR_TO_UINT(data);
 
-            switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
+            switch (new_state) {
 
                 case PA_SINK_SUSPENDED:
                     pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
@@ -709,23 +712,18 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             }
 
             break;
-
     }
 
-    ret = pa_sink_process_msg(o, code, data, offset, chunk);
+    if (do_trigger)
+        trigger(u, new_state, u->source ? u->source->thread_info.state : PA_SOURCE_INVALID_STATE, quick);
 
-    if (ret >= 0 && do_trigger) {
-        if (trigger(u, quick) < 0)
-            return -1;
-    }
-
-    return ret;
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 
 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SOURCE(o)->userdata;
-    int ret;
-    int do_trigger = false, quick = true;
+    bool do_trigger = false, quick = true;
+    pa_source_state_t new_state;
 
     switch (code) {
 
@@ -744,8 +742,10 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
         }
 
         case PA_SOURCE_MESSAGE_SET_STATE:
+            new_state = PA_PTR_TO_UINT(data);
+
+            switch (new_state) {
 
-            switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
                 case PA_SOURCE_SUSPENDED:
                     pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
 
@@ -789,17 +789,12 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 
             }
             break;
-
     }
 
-    ret = pa_source_process_msg(o, code, data, offset, chunk);
-
-    if (ret >= 0 && do_trigger) {
-        if (trigger(u, quick) < 0)
-            return -1;
-    }
+    if (do_trigger)
+        trigger(u, u->sink ? u->sink->thread_info.state : PA_SINK_INVALID_STATE, new_state, quick);
 
-    return ret;
+    return pa_source_process_msg(o, code, data, offset, chunk);
 }
 
 static void sink_get_volume(pa_sink *s) {

commit ac2846cf0a18df4e61424743fba704f5d381d766
Author: Tanu Kaskinen <tanuk at iki.fi>
Date:   Tue Mar 13 19:40:36 2018 +0200

    sink, source: rename set_state() to set_state_in_main_thread()
    
    There will be a new callback named set_state_in_io_thread(). It seems
    like a good idea to have a similar name for the main thread variant.

diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c
index 5c8ccf31..bc61ce37 100644
--- a/src/modules/alsa/alsa-sink.c
+++ b/src/modules/alsa/alsa-sink.c
@@ -1230,7 +1230,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
     pa_sink_state_t old_state;
     struct userdata *u;
 
@@ -2359,7 +2359,7 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca
     u->sink->parent.process_msg = sink_process_msg;
     if (u->use_tsched)
         u->sink->update_requested_latency = sink_update_requested_latency_cb;
-    u->sink->set_state = sink_set_state_cb;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     if (u->ucm_context)
         u->sink->set_port = sink_set_port_ucm_cb;
     else
diff --git a/src/modules/alsa/alsa-source.c b/src/modules/alsa/alsa-source.c
index fec6c4e0..bdcb6424 100644
--- a/src/modules/alsa/alsa-source.c
+++ b/src/modules/alsa/alsa-source.c
@@ -1085,7 +1085,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 }
 
 /* Called from main context */
-static int source_set_state_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
+static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
     pa_source_state_t old_state;
     struct userdata *u;
 
@@ -2035,7 +2035,7 @@ pa_source *pa_alsa_source_new(pa_module *m, pa_modargs *ma, const char*driver, p
     u->source->parent.process_msg = source_process_msg;
     if (u->use_tsched)
         u->source->update_requested_latency = source_update_requested_latency_cb;
-    u->source->set_state = source_set_state_cb;
+    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
     if (u->ucm_context)
         u->source->set_port = source_set_port_ucm_cb;
     else
diff --git a/src/modules/echo-cancel/module-echo-cancel.c b/src/modules/echo-cancel/module-echo-cancel.c
index 8e416563..7af2f4b2 100644
--- a/src/modules/echo-cancel/module-echo-cancel.c
+++ b/src/modules/echo-cancel/module-echo-cancel.c
@@ -477,7 +477,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 }
 
 /* Called from main context */
-static int source_set_state_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
+static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_source_assert_ref(s);
@@ -502,7 +502,7 @@ static int source_set_state_cb(pa_source *s, pa_source_state_t state, pa_suspend
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -1875,7 +1875,7 @@ int pa__init(pa_module*m) {
     }
 
     u->source->parent.process_msg = source_process_msg_cb;
-    u->source->set_state = source_set_state_cb;
+    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
     u->source->update_requested_latency = source_update_requested_latency_cb;
     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
     if (!u->use_volume_sharing) {
@@ -1925,7 +1925,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg_cb;
-    u->sink->set_state = sink_set_state_cb;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/macosx/module-coreaudio-device.c b/src/modules/macosx/module-coreaudio-device.c
index f9ef7c5a..149109d4 100644
--- a/src/modules/macosx/module-coreaudio-device.c
+++ b/src/modules/macosx/module-coreaudio-device.c
@@ -353,7 +353,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
     return pa_source_process_msg(o, code, data, offset, chunk);;
 }
 
-static int ca_sink_set_state(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int ca_sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     coreaudio_sink *sink = s->userdata;
 
     switch (state) {
@@ -498,7 +498,7 @@ static int ca_device_create_sink(pa_module *m, AudioBuffer *buf, int channel_idx
 
     sink->parent.process_msg = sink_process_msg;
     sink->userdata = ca_sink;
-    sink->set_state = ca_sink_set_state;
+    sink->set_state_in_main_thread = ca_sink_set_state_in_main_thread;
 
     pa_sink_set_asyncmsgq(sink, u->thread_mq.inq);
     pa_sink_set_rtpoll(sink, u->rtpoll);
@@ -511,7 +511,7 @@ static int ca_device_create_sink(pa_module *m, AudioBuffer *buf, int channel_idx
     return 0;
 }
 
-static int ca_source_set_state(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
+static int ca_source_set_state_in_main_thread(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
     coreaudio_source *source = s->userdata;
 
     switch (state) {
@@ -632,7 +632,7 @@ static int ca_device_create_source(pa_module *m, AudioBuffer *buf, int channel_i
 
     source->parent.process_msg = source_process_msg;
     source->userdata = ca_source;
-    source->set_state = ca_source_set_state;
+    source->set_state_in_main_thread = ca_source_set_state_in_main_thread;
 
     pa_source_set_asyncmsgq(source, u->thread_mq.inq);
     pa_source_set_rtpoll(source, u->rtpoll);
diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
index 7a80028a..22800a8b 100644
--- a/src/modules/module-combine-sink.c
+++ b/src/modules/module-combine-sink.c
@@ -680,7 +680,7 @@ static void unsuspend(struct userdata *u) {
 }
 
 /* Called from main context */
-static int sink_set_state(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(sink);
@@ -1425,7 +1425,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
-    u->sink->set_state = sink_set_state;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency;
     u->sink->userdata = u;
 
diff --git a/src/modules/module-equalizer-sink.c b/src/modules/module-equalizer-sink.c
index bcc8dafe..efe95b3f 100644
--- a/src/modules/module-equalizer-sink.c
+++ b/src/modules/module-equalizer-sink.c
@@ -285,7 +285,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -1229,7 +1229,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg_cb;
-    u->sink->set_state = sink_set_state_cb;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index 4d5cd68f..a2db68e1 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c
@@ -392,7 +392,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -1297,7 +1297,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg_cb;
-    u->sink->set_state = sink_set_state_cb;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c
index f063576f..ec669879 100644
--- a/src/modules/module-remap-sink.c
+++ b/src/modules/module-remap-sink.c
@@ -112,7 +112,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 }
 
 /* Called from main context */
-static int sink_set_state(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -410,7 +410,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg;
-    u->sink->set_state = sink_set_state;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread;
     u->sink->update_requested_latency = sink_update_requested_latency;
     u->sink->request_rewind = sink_request_rewind;
     u->sink->userdata = u;
diff --git a/src/modules/module-remap-source.c b/src/modules/module-remap-source.c
index 88eccc22..8901eb90 100644
--- a/src/modules/module-remap-source.c
+++ b/src/modules/module-remap-source.c
@@ -108,7 +108,7 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t
 }
 
 /* Called from main context */
-static int source_set_state_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
+static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_source_assert_ref(s);
@@ -367,7 +367,7 @@ int pa__init(pa_module*m) {
     }
 
     u->source->parent.process_msg = source_process_msg_cb;
-    u->source->set_state = source_set_state_cb;
+    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
     u->source->update_requested_latency = source_update_requested_latency_cb;
 
     u->source->userdata = u;
diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index 1db79ef6..a9f26ad7 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -568,7 +568,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 }
 
 /* Called from main context */
-static int sink_set_state(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
     pa_sink_assert_ref(s);
     u = s->userdata;
@@ -670,7 +670,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 }
 
 /* Called from main context */
-static int source_set_state(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
+static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
     pa_source_assert_ref(s);
     u = s->userdata;
@@ -2156,7 +2156,7 @@ int pa__init(pa_module*m) {
 
     u->sink->parent.process_msg = sink_process_msg;
     u->sink->userdata = u;
-    u->sink->set_state = sink_set_state;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
 
@@ -2199,7 +2199,7 @@ int pa__init(pa_module*m) {
     }
 
     u->source->parent.process_msg = source_process_msg;
-    u->source->set_state = source_set_state;
+    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
     u->source->userdata = u;
 
 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
diff --git a/src/modules/module-virtual-sink.c b/src/modules/module-virtual-sink.c
index 5fa4ce4d..ca6ce569 100644
--- a/src/modules/module-virtual-sink.c
+++ b/src/modules/module-virtual-sink.c
@@ -124,7 +124,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -555,7 +555,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg_cb;
-    u->sink->set_state = sink_set_state_cb;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/modules/module-virtual-source.c b/src/modules/module-virtual-source.c
index c002ae84..c40ffb61 100644
--- a/src/modules/module-virtual-source.c
+++ b/src/modules/module-virtual-source.c
@@ -111,7 +111,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -194,7 +194,7 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t
 }
 
 /* Called from main context */
-static int source_set_state_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
+static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_source_assert_ref(s);
@@ -579,7 +579,7 @@ int pa__init(pa_module*m) {
     }
 
     u->source->parent.process_msg = source_process_msg_cb;
-    u->source->set_state = source_set_state_cb;
+    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
     u->source->update_requested_latency = source_update_requested_latency_cb;
     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
     if (!use_volume_sharing) {
@@ -667,7 +667,7 @@ int pa__init(pa_module*m) {
         u->sink->parent.process_msg = sink_process_msg_cb;
         u->sink->update_requested_latency = sink_update_requested_latency_cb;
         u->sink->request_rewind = sink_request_rewind_cb;
-        u->sink->set_state = sink_set_state_cb;
+        u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
         u->sink->userdata = u;
 
         pa_sink_set_asyncmsgq(u->sink, master->asyncmsgq);
diff --git a/src/modules/module-virtual-surround-sink.c b/src/modules/module-virtual-surround-sink.c
index 876d618e..00780d8b 100644
--- a/src/modules/module-virtual-surround-sink.c
+++ b/src/modules/module-virtual-surround-sink.c
@@ -152,7 +152,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 }
 
 /* Called from main context */
-static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
+static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
     struct userdata *u;
 
     pa_sink_assert_ref(s);
@@ -729,7 +729,7 @@ int pa__init(pa_module*m) {
     }
 
     u->sink->parent.process_msg = sink_process_msg_cb;
-    u->sink->set_state = sink_set_state_cb;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
     u->sink->update_requested_latency = sink_update_requested_latency_cb;
     u->sink->request_rewind = sink_request_rewind_cb;
     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index f19e8b09..6549515b 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -150,7 +150,7 @@ void pa_sink_new_data_done(pa_sink_new_data *data) {
 static void reset_callbacks(pa_sink *s) {
     pa_assert(s);
 
-    s->set_state = NULL;
+    s->set_state_in_main_thread = NULL;
     s->get_volume = NULL;
     s->set_volume = NULL;
     s->write_volume = NULL;
@@ -427,9 +427,9 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t
      * cause, or it might just add unnecessary complexity, given that the
      * current approach of not setting any suspend cause works well enough. */
 
-    if (s->set_state) {
-        ret = s->set_state(s, state, suspend_cause);
-        /* set_state() is allowed to fail only when resuming. */
+    if (s->set_state_in_main_thread) {
+        ret = s->set_state_in_main_thread(s, state, suspend_cause);
+        /* set_state_in_main_thread() is allowed to fail only when resuming. */
         pa_assert(ret >= 0 || resuming);
     }
 
@@ -438,8 +438,8 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t
             /* SET_STATE is allowed to fail only when resuming. */
             pa_assert(resuming);
 
-            if (s->set_state)
-                s->set_state(s, PA_SINK_SUSPENDED, 0);
+            if (s->set_state_in_main_thread)
+                s->set_state_in_main_thread(s, PA_SINK_SUSPENDED, 0);
         }
 
     if (suspend_cause_changed) {
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index b7e21f9f..0caeb550 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -132,10 +132,11 @@ struct pa_sink {
      * s->state and s->suspend_cause haven't been updated yet when this is
      * called, so the callback can get the old state through those variables.
      *
-     * If set_state() is successful, the IO thread will be notified with the
-     * SET_STATE message. The message handler is allowed to fail, in which
-     * case the old state is restored, and set_state() is called again. */
-    int (*set_state)(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+     * If set_state_in_main_thread() is successful, the IO thread will be
+     * notified with the SET_STATE message. The message handler is allowed to
+     * fail, in which case the old state is restored, and
+     * set_state_in_main_thread() is called again. */
+    int (*set_state_in_main_thread)(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
 
     /* Sink drivers that support hardware volume may set this
      * callback. This is called when the current volume needs to be
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 7ea75ff0..ad8e5e36 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -141,7 +141,7 @@ void pa_source_new_data_done(pa_source_new_data *data) {
 static void reset_callbacks(pa_source *s) {
     pa_assert(s);
 
-    s->set_state = NULL;
+    s->set_state_in_main_thread = NULL;
     s->get_volume = NULL;
     s->set_volume = NULL;
     s->write_volume = NULL;
@@ -381,9 +381,9 @@ static int source_set_state(pa_source *s, pa_source_state_t state, pa_suspend_ca
      * cause, or it might just add unnecessary complexity, given that the
      * current approach of not setting any suspend cause works well enough. */
 
-    if (s->set_state) {
-        ret = s->set_state(s, state, suspend_cause);
-        /* set_state() is allowed to fail only when resuming. */
+    if (s->set_state_in_main_thread) {
+        ret = s->set_state_in_main_thread(s, state, suspend_cause);
+        /* set_state_in_main_thread() is allowed to fail only when resuming. */
         pa_assert(ret >= 0 || resuming);
     }
 
@@ -392,8 +392,8 @@ static int source_set_state(pa_source *s, pa_source_state_t state, pa_suspend_ca
             /* SET_STATE is allowed to fail only when resuming. */
             pa_assert(resuming);
 
-            if (s->set_state)
-                s->set_state(s, PA_SOURCE_SUSPENDED, 0);
+            if (s->set_state_in_main_thread)
+                s->set_state_in_main_thread(s, PA_SOURCE_SUSPENDED, 0);
         }
 
     if (suspend_cause_changed) {
diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h
index ea314725..d60e8a1a 100644
--- a/src/pulsecore/source.h
+++ b/src/pulsecore/source.h
@@ -133,10 +133,11 @@ struct pa_source {
      * s->state and s->suspend_cause haven't been updated yet when this is
      * called, so the callback can get the old state through those variables.
      *
-     * If set_state() is successful, the IO thread will be notified with the
-     * SET_STATE message. The message handler is allowed to fail, in which
-     * case the old state is restored, and set_state() is called again. */
-    int (*set_state)(pa_source *source, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+     * If set_state_in_main_thread() is successful, the IO thread will be
+     * notified with the SET_STATE message. The message handler is allowed to
+     * fail, in which case the old state is restored, and
+     * set_state_in_main_thread() is called again. */
+    int (*set_state_in_main_thread)(pa_source *source, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
 
     /* Called when the volume is queried. Called from main loop
      * context. If this is NULL a PA_SOURCE_MESSAGE_GET_VOLUME message



More information about the pulseaudio-commits mailing list