[pulseaudio-discuss] [PATCH] virtual sources and sinks: Don't double attach a sink input or source output on filter load

Georg Chini georg at chini.tk
Tue May 9 14:42:19 UTC 2017


When a filter is loaded and module-switch-on-connect is present, switch-on-connect
will make the filter the default sink or source and move streams from the old
default to the filter. This is done from the sink/source put hook, so streams
are moved to the filter before the module init function of the filter calls
pa_sink_input_put() or pa_source_output_put(). The move succeeds because the
asyncmsgq already points to the queue of the master sink or source. When the sink
input or source output is then attached to the master sink or source, the attach
callback will call pa_{sink,source}_attach_within_thread(). These functions assume
that all streams are detached. Because streams were already moved to the filter by
switch-on-connect, this assumption triggers an assertion failure in
pa_{sink_input,source_output}_attach().

This patch fixes the problem by reversing the order of the pa_{sink,source}_put()
calls and the pa_{sink_input,source_output}_put() calls, creating the sink input
or source output corked and uncorking it only after the filter sink or source has
been put. The initial rewind that is requested for the master sink is moved from
the sink input state change callback to the PA_SINK_MESSAGE_SET_STATE case of the
filter sink's message handler. The order of the unlink calls is swapped as well,
so that the filter does not appear to be moving during module unload.

The patch also seems to improve the user experience: on my system, moving a stream
to the filter sink now happens without any audible interruption.

The patch has only been tested with module-echo-cancel.

Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=100065
---
 src/modules/echo-cancel/module-echo-cancel.c | 87 ++++++++++++++++++++--------
 src/modules/module-ladspa-sink.c             | 62 ++++++++++++--------
 src/modules/module-remap-sink.c              | 63 +++++++++++---------
 src/modules/module-remap-source.c            | 35 +++++++----
 src/modules/module-virtual-sink.c            | 62 ++++++++++++--------
 src/modules/module-virtual-source.c          | 34 ++++++++---
 src/modules/module-virtual-surround-sink.c   | 62 ++++++++++++--------
 7 files changed, 261 insertions(+), 144 deletions(-)

diff --git a/src/modules/echo-cancel/module-echo-cancel.c b/src/modules/echo-cancel/module-echo-cancel.c
index 04984f32..8c630f67 100644
--- a/src/modules/echo-cancel/module-echo-cancel.c
+++ b/src/modules/echo-cancel/module-echo-cancel.c
@@ -459,6 +459,18 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
+
+        case PA_SINK_MESSAGE_SET_STATE: {
+            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
+
+            /* When set to running or idle for the first time, request a rewind
+             * of the master sink to make sure we are heard immediately */
+            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+                pa_log_debug("Requesting rewind due to state change.");
+                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+            }
+        }
+
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -883,6 +895,9 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
+    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        return;
+
     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
         pa_log("Push when no link?");
         return;
@@ -959,6 +974,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return -1;
+
     if (u->sink->thread_info.rewind_requested)
         pa_sink_process_rewind(u->sink, 0);
 
@@ -986,6 +1004,10 @@ static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes)
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
+    /* If the source is not yet linked, there is nothing to rewind */
+    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        return;
+
     pa_source_process_rewind(u->source, nbytes);
 
     /* go back on read side, we need to use older sink data for this */
@@ -1005,6 +1027,10 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
+    /* If the sink is not yet linked, there is nothing to rewind */
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return;
+
     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
 
     pa_sink_process_rewind(u->sink, nbytes);
@@ -1248,7 +1274,8 @@ static void source_output_attach_cb(pa_source_output *o) {
 
     pa_log_debug("Source output %d attach", o->index);
 
-    pa_source_attach_within_thread(u->source);
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_attach_within_thread(u->source);
 
     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
             o->source->thread_info.rtpoll,
@@ -1286,7 +1313,8 @@ static void sink_input_attach_cb(pa_sink_input *i) {
             PA_RTPOLL_LATE,
             u->asyncmsgq);
 
-    pa_sink_attach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_attach_within_thread(u->sink);
 }
 
 /* Called from source I/O thread context. */
@@ -1297,7 +1325,8 @@ static void source_output_detach_cb(pa_source_output *o) {
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
-    pa_source_detach_within_thread(u->source);
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_detach_within_thread(u->source);
     pa_source_set_rtpoll(u->source, NULL);
 
     pa_log_debug("Source output %d detach", o->index);
@@ -1315,7 +1344,8 @@ static void sink_input_detach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_detach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_detach_within_thread(u->sink);
 
     pa_sink_set_rtpoll(u->sink, NULL);
 
@@ -1345,14 +1375,6 @@ static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t s
     pa_assert_se(u = i->userdata);
 
     pa_log_debug("Sink input %d state %d", i->index, state);
-
-    /* If we are added for the first time, ask for a rewinding so that
-     * we are heard right-away. */
-    if (PA_SINK_INPUT_IS_LINKED(state) &&
-        i->thread_info.state == PA_SINK_INPUT_INIT && i->sink) {
-        pa_log_debug("Requesting rewind due to state change.");
-        pa_sink_input_request_rewind(i, 0, false, true, true);
-    }
 }
 
 /* Called from main context. */
@@ -1365,11 +1387,12 @@ static void source_output_kill_cb(pa_source_output *o) {
 
     u->dead = true;
 
-    /* The order here matters! We first kill the source output, followed
-     * by the source. That means the source callbacks must be protected
-     * against an unconnected source output! */
-    pa_source_output_unlink(u->source_output);
+    /* The order here matters! We first kill the source so that streams
+     * can properly be moved away while the source output is still connected
+     * to the master. */
+    pa_source_output_cork(u->source_output, true);
     pa_source_unlink(u->source);
+    pa_source_output_unlink(u->source_output);
 
     pa_source_output_unref(u->source_output);
     u->source_output = NULL;
@@ -1391,11 +1414,12 @@ static void sink_input_kill_cb(pa_sink_input *i) {
 
     u->dead = true;
 
-    /* The order here matters! We first kill the sink input, followed
-     * by the sink. That means the sink callbacks must be protected
-     * against an unconnected sink input! */
-    pa_sink_input_unlink(u->sink_input);
+    /* The order here matters! We first kill the sink so that streams
+     * can properly be moved away while the sink input is still connected
+     * to the master. */
+    pa_sink_input_cork(u->sink_input, true);
     pa_sink_unlink(u->sink);
+    pa_sink_input_unlink(u->sink_input);
 
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
@@ -1910,6 +1934,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_output_ss);
     pa_source_output_new_data_set_channel_map(&source_output_data, &source_output_map);
+    source_output_data.flags |= PA_SOURCE_OUTPUT_START_CORKED;
 
     if (autoloaded)
         source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
@@ -1947,7 +1972,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
-    sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
+    sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;
 
     if (autoloaded)
         sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;
@@ -2035,11 +2060,18 @@ int pa__init(pa_module*m) {
     pa_sink_set_latency_range(u->sink, blocksize_usec, blocksize_usec * MAX_LATENCY_BLOCKS);
     pa_sink_input_set_requested_latency(u->sink_input, blocksize_usec * MAX_LATENCY_BLOCKS);
 
+    /* The order here is important. The input/output must be put first,
+     * otherwise streams might attach to the sink/source before the
+     * sink input or source output is attached to the master. */
+    pa_sink_input_put(u->sink_input);
+    pa_source_output_put(u->source_output);
+
     pa_sink_put(u->sink);
     pa_source_put(u->source);
 
-    pa_sink_input_put(u->sink_input);
-    pa_source_output_put(u->source_output);
+    pa_source_output_cork(u->source_output, false);
+    pa_sink_input_cork(u->sink_input, false);
+
     pa_modargs_free(ma);
 
     return 0;
@@ -2081,9 +2113,9 @@ void pa__done(pa_module*m) {
         u->core->mainloop->time_free(u->time_event);
 
     if (u->source_output)
-        pa_source_output_unlink(u->source_output);
+        pa_source_output_cork(u->source_output, true);
     if (u->sink_input)
-        pa_sink_input_unlink(u->sink_input);
+        pa_sink_input_cork(u->sink_input, true);
 
     if (u->source)
         pa_source_unlink(u->source);
@@ -2091,6 +2123,11 @@ void pa__done(pa_module*m) {
         pa_sink_unlink(u->sink);
 
     if (u->source_output)
+        pa_source_output_unlink(u->source_output);
+    if (u->sink_input)
+        pa_sink_input_unlink(u->sink_input);
+
+    if (u->source_output)
         pa_source_output_unref(u->source_output);
     if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index 549ceca6..59ce1b0b 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c
@@ -373,6 +373,17 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
         connect_control_ports(u);
 
         return 0;
+
+        case PA_SINK_MESSAGE_SET_STATE: {
+            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
+
+            /* When set to running or idle for the first time, request a rewind
+             * of the master sink to make sure we are heard immediately */
+            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+                pa_log_debug("Requesting rewind due to state change.");
+                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+            }
+        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -453,6 +464,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return -1;
+
     /* Hmm, process any rewind request that might be queued up */
     pa_sink_process_rewind(u->sink, 0);
 
@@ -505,6 +519,10 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
+    /* If the sink is not yet linked, there is nothing to rewind */
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return;
+
     if (u->sink->thread_info.rewind_nbytes > 0) {
         size_t max_rewrite;
 
@@ -583,7 +601,8 @@ static void sink_input_detach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_detach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_detach_within_thread(u->sink);
 
     pa_sink_set_rtpoll(u->sink, NULL);
 }
@@ -604,7 +623,8 @@ static void sink_input_attach_cb(pa_sink_input *i) {
      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
 
-    pa_sink_attach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_attach_within_thread(u->sink);
 }
 
 /* Called from main context */
@@ -614,11 +634,12 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    /* The order here matters! We first kill the sink input, followed
-     * by the sink. That means the sink callbacks must be protected
-     * against an unconnected sink input! */
-    pa_sink_input_unlink(u->sink_input);
+    /* The order here matters! We first kill the sink so that streams
+     * can properly be moved away while the sink input is still connected
+     * to the master. */
+    pa_sink_input_cork(u->sink_input, true);
     pa_sink_unlink(u->sink);
+    pa_sink_input_unlink(u->sink_input);
 
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
@@ -629,22 +650,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_module_unload_request(u->module, true);
 }
 
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
-    struct userdata *u;
-
-    pa_sink_input_assert_ref(i);
-    pa_assert_se(u = i->userdata);
-
-    /* If we are added for the first time, ask for a rewinding so that
-     * we are heard right-away. */
-    if (PA_SINK_INPUT_IS_LINKED(state) &&
-            i->thread_info.state == PA_SINK_INPUT_INIT && i->sink) {
-        pa_log_debug("Requesting rewind due to state change.");
-        pa_sink_input_request_rewind(i, 0, false, true, true);
-    }
-}
-
 /* Called from main context */
 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
     struct userdata *u;
@@ -1301,6 +1306,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
+    sink_input_data.flags |= PA_SINK_INPUT_START_CORKED;
 
     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
     pa_sink_input_new_data_done(&sink_input_data);
@@ -1317,7 +1323,6 @@ int pa__init(pa_module*m) {
     u->sink_input->kill = sink_input_kill_cb;
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
-    u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->may_move_to = sink_input_may_move_to_cb;
     u->sink_input->moving = sink_input_moving_cb;
     u->sink_input->mute_changed = sink_input_mute_changed_cb;
@@ -1329,8 +1334,12 @@ int pa__init(pa_module*m) {
     u->memblockq = pa_memblockq_new("module-ladspa-sink memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0, &ss, 1, 1, 0, &silence);
     pa_memblock_unref(silence.memblock);
 
-    pa_sink_put(u->sink);
+    /* The order here is important. The input must be put first,
+     * otherwise streams might attach to the sink before the sink
+     * input is attached to the master. */
     pa_sink_input_put(u->sink_input);
+    pa_sink_put(u->sink);
+    pa_sink_input_cork(u->sink_input, false);
 
 #ifdef HAVE_DBUS
     dbus_init(u);
@@ -1375,12 +1384,15 @@ void pa__done(pa_module*m) {
 #endif
 
     if (u->sink_input)
-        pa_sink_input_unlink(u->sink_input);
+        pa_sink_input_cork(u->sink_input, true);
 
     if (u->sink)
         pa_sink_unlink(u->sink);
 
     if (u->sink_input)
+        pa_sink_input_unlink(u->sink_input);
+
+    if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
 
     if (u->sink)
diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c
index 37f4f56c..48760291 100644
--- a/src/modules/module-remap-sink.c
+++ b/src/modules/module-remap-sink.c
@@ -96,6 +96,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
+
+        case PA_SINK_MESSAGE_SET_STATE: {
+            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
+
+            /* When set to running or idle for the first time, request a rewind
+             * of the master sink to make sure we are heard immediately */
+            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+                pa_log_debug("Requesting rewind due to state change.");
+                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+            }
+        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -155,6 +166,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return -1;
+
     /* Hmm, process any rewind request that might be queued up */
     pa_sink_process_rewind(u->sink, 0);
 
@@ -170,6 +184,10 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
+    /* If the sink is not yet linked, there is nothing to rewind */
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return;
+
     if (u->sink->thread_info.rewind_nbytes > 0) {
         amount = PA_MIN(u->sink->thread_info.rewind_nbytes, nbytes);
         u->sink->thread_info.rewind_nbytes = 0;
@@ -227,7 +245,8 @@ static void sink_input_detach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_detach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_detach_within_thread(u->sink);
 
     pa_sink_set_rtpoll(u->sink, NULL);
 }
@@ -248,7 +267,8 @@ static void sink_input_attach_cb(pa_sink_input *i) {
      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
 
-    pa_sink_attach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_attach_within_thread(u->sink);
 }
 
 /* Called from main context */
@@ -258,11 +278,12 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    /* The order here matters! We first kill the sink input, followed
-     * by the sink. That means the sink callbacks must be protected
-     * against an unconnected sink input! */
-    pa_sink_input_unlink(u->sink_input);
+    /* The order here matters! We first kill the sink so that streams
+     * can properly be moved away while the sink input is still connected
+     * to the master. */
+    pa_sink_input_cork(u->sink_input, true);
     pa_sink_unlink(u->sink);
+    pa_sink_input_unlink(u->sink_input);
 
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
@@ -273,22 +294,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_module_unload_request(u->module, true);
 }
 
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
-    struct userdata *u;
-
-    pa_sink_input_assert_ref(i);
-    pa_assert_se(u = i->userdata);
-
-    /* If we are added for the first time, ask for a rewinding so that
-     * we are heard right-away. */
-    if (PA_SINK_INPUT_IS_LINKED(state) &&
-        i->thread_info.state == PA_SINK_INPUT_INIT && i->sink) {
-        pa_log_debug("Requesting rewind due to state change.");
-        pa_sink_input_request_rewind(i, 0, false, true, true);
-    }
-}
-
 /* Called from main context */
 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
     struct userdata *u;
@@ -423,7 +428,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &stream_map);
-    sink_input_data.flags = (remix ? 0 : PA_SINK_INPUT_NO_REMIX);
+    sink_input_data.flags = (remix ? 0 : PA_SINK_INPUT_NO_REMIX) | PA_SINK_INPUT_START_CORKED;
     sink_input_data.resample_method = resample_method;
 
     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
@@ -441,14 +446,17 @@ int pa__init(pa_module*m) {
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
     u->sink_input->kill = sink_input_kill_cb;
-    u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->moving = sink_input_moving_cb;
     u->sink_input->userdata = u;
 
     u->sink->input_to_master = u->sink_input;
 
-    pa_sink_put(u->sink);
+    /* The order here is important. The input must be put first,
+     * otherwise streams might attach to the sink before the sink
+     * input is attached to the master. */
     pa_sink_input_put(u->sink_input);
+    pa_sink_put(u->sink);
+    pa_sink_input_cork(u->sink_input, false);
 
     pa_modargs_free(ma);
 
@@ -484,12 +492,15 @@ void pa__done(pa_module*m) {
      * destruction order! */
 
     if (u->sink_input)
-        pa_sink_input_unlink(u->sink_input);
+        pa_sink_input_cork(u->sink_input, true);
 
     if (u->sink)
         pa_sink_unlink(u->sink);
 
     if (u->sink_input)
+        pa_sink_input_unlink(u->sink_input);
+
+    if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
 
     if (u->sink)
diff --git a/src/modules/module-remap-source.c b/src/modules/module-remap-source.c
index 0bdeb381..8ac2f66f 100644
--- a/src/modules/module-remap-source.c
+++ b/src/modules/module-remap-source.c
@@ -151,6 +151,9 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
+    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        return;
+
     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
         pa_log("push when no link?");
         return;
@@ -167,7 +170,9 @@ static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes)
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
-    pa_source_process_rewind(u->source, nbytes);
+    /* If the source is not yet linked, there is nothing to rewind */
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_process_rewind(u->source, nbytes);
 }
 
 /* Called from output thread context */
@@ -178,7 +183,8 @@ static void source_output_detach_cb(pa_source_output *o) {
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
-    pa_source_detach_within_thread(u->source);
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_detach_within_thread(u->source);
 
     pa_source_set_rtpoll(u->source, NULL);
 }
@@ -196,7 +202,8 @@ static void source_output_attach_cb(pa_source_output *o) {
     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
 
-    pa_source_attach_within_thread(u->source);
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_attach_within_thread(u->source);
 }
 
 /* Called from main thread */
@@ -207,11 +214,12 @@ static void source_output_kill_cb(pa_source_output *o) {
     pa_assert_ctl_context();
     pa_assert_se(u = o->userdata);
 
-    /* The order here matters! We first kill the source output, followed
-     * by the source. That means the source callbacks must be protected
-     * against an unconnected source output! */
-    pa_source_output_unlink(u->source_output);
+    /* The order here matters! We first kill the source so that streams
+     * can properly be moved away while the source output is still connected
+     * to the master. */
+    pa_source_output_cork(u->source_output, true);
     pa_source_unlink(u->source);
+    pa_source_output_unlink(u->source_output);
 
     pa_source_output_unref(u->source_output);
     u->source_output = NULL;
@@ -368,7 +376,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
     pa_source_output_new_data_set_channel_map(&source_output_data, &stream_map);
-    source_output_data.flags = remix ? 0 : PA_SOURCE_OUTPUT_NO_REMIX;
+    source_output_data.flags = (remix ? 0 : PA_SOURCE_OUTPUT_NO_REMIX) | PA_SOURCE_OUTPUT_START_CORKED;
     source_output_data.resample_method = resample_method;
 
     pa_source_output_new(&u->source_output, m->core, &source_output_data);
@@ -388,8 +396,12 @@ int pa__init(pa_module*m) {
 
     u->source->output_from_master = u->source_output;
 
-    pa_source_put(u->source);
+    /* The order here is important. The output must be put first,
+     * otherwise streams might attach to the source before the
+     * source output is attached to the master. */
     pa_source_output_put(u->source_output);
+    pa_source_put(u->source);
+    pa_source_output_cork(u->source_output, false);
 
     pa_modargs_free(ma);
 
@@ -425,12 +437,15 @@ void pa__done(pa_module*m) {
      * destruction order! */
 
     if (u->source_output)
-        pa_source_output_unlink(u->source_output);
+        pa_source_output_cork(u->source_output, true);
 
     if (u->source)
         pa_source_unlink(u->source);
 
     if (u->source_output)
+        pa_source_output_unlink(u->source_output);
+
+    if (u->source_output)
         pa_source_output_unref(u->source_output);
 
     if (u->source)
diff --git a/src/modules/module-virtual-sink.c b/src/modules/module-virtual-sink.c
index 4fa4a56e..18dcea81 100644
--- a/src/modules/module-virtual-sink.c
+++ b/src/modules/module-virtual-sink.c
@@ -108,6 +108,17 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
+
+        case PA_SINK_MESSAGE_SET_STATE: {
+            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
+
+            /* When set to running or idle for the first time, request a rewind
+             * of the master sink to make sure we are heard immediately */
+            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+                pa_log_debug("Requesting rewind due to state change.");
+                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+            }
+        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -203,6 +214,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return -1;
+
     /* Hmm, process any rewind request that might be queued up */
     pa_sink_process_rewind(u->sink, 0);
 
@@ -271,6 +285,10 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
+    /* If the sink is not yet linked, there is nothing to rewind */
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return;
+
     if (u->sink->thread_info.rewind_nbytes > 0) {
         size_t max_rewrite;
 
@@ -346,7 +364,8 @@ static void sink_input_detach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_detach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_detach_within_thread(u->sink);
 
     pa_sink_set_rtpoll(u->sink, NULL);
 }
@@ -374,7 +393,8 @@ static void sink_input_attach_cb(pa_sink_input *i) {
      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
 
-    pa_sink_attach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_attach_within_thread(u->sink);
 }
 
 /* Called from main context */
@@ -384,11 +404,12 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    /* The order here matters! We first kill the sink input, followed
-     * by the sink. That means the sink callbacks must be protected
-     * against an unconnected sink input! */
-    pa_sink_input_unlink(u->sink_input);
+    /* The order here matters! We first kill the sink so that streams
+     * can properly be moved away while the sink input is still connected
+     * to the master. */
+    pa_sink_input_cork(u->sink_input, true);
     pa_sink_unlink(u->sink);
+    pa_sink_input_unlink(u->sink_input);
 
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
@@ -399,22 +420,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_module_unload_request(u->module, true);
 }
 
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
-    struct userdata *u;
-
-    pa_sink_input_assert_ref(i);
-    pa_assert_se(u = i->userdata);
-
-    /* If we are added for the first time, ask for a rewinding so that
-     * we are heard right-away. */
-    if (PA_SINK_INPUT_IS_LINKED(state) &&
-        i->thread_info.state == PA_SINK_INPUT_INIT && i->sink) {
-        pa_log_debug("Requesting rewind due to state change.");
-        pa_sink_input_request_rewind(i, 0, false, true, true);
-    }
-}
-
 /* Called from main context */
 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
     struct userdata *u;
@@ -576,6 +581,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
+    sink_input_data.flags |= PA_SINK_INPUT_START_CORKED;
 
     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
     pa_sink_input_new_data_done(&sink_input_data);
@@ -592,7 +598,6 @@ int pa__init(pa_module*m) {
     u->sink_input->kill = sink_input_kill_cb;
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
-    u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->moving = sink_input_moving_cb;
     u->sink_input->volume_changed = use_volume_sharing ? NULL : sink_input_volume_changed_cb;
     u->sink_input->mute_changed = sink_input_mute_changed_cb;
@@ -606,8 +611,12 @@ int pa__init(pa_module*m) {
 
     /* (9) INITIALIZE ANYTHING ELSE YOU NEED HERE */
 
-    pa_sink_put(u->sink);
+    /* The order here is important. The input must be put first,
+     * otherwise streams might attach to the sink before the sink
+     * input is attached to the master. */
     pa_sink_input_put(u->sink_input);
+    pa_sink_put(u->sink);
+    pa_sink_input_cork(u->sink_input, false);
 
     pa_modargs_free(ma);
 
@@ -643,12 +652,15 @@ void pa__done(pa_module*m) {
      * destruction order! */
 
     if (u->sink_input)
-        pa_sink_input_unlink(u->sink_input);
+        pa_sink_input_cork(u->sink_input, true);
 
     if (u->sink)
         pa_sink_unlink(u->sink);
 
     if (u->sink_input)
+        pa_sink_input_unlink(u->sink_input);
+
+    if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
 
     if (u->sink)
diff --git a/src/modules/module-virtual-source.c b/src/modules/module-virtual-source.c
index 42aefd05..9ab7fa9c 100644
--- a/src/modules/module-virtual-source.c
+++ b/src/modules/module-virtual-source.c
@@ -263,6 +263,9 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
+    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        return;
+
     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
         pa_log("push when no link?");
         return;
@@ -356,6 +359,10 @@ static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes)
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
+    /* If the source is not yet linked, there is nothing to rewind */
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_process_rewind(u->source, nbytes);
+
     /* FIXME, no idea what I am doing here */
 #if 0
     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
@@ -376,7 +383,8 @@ static void source_output_attach_cb(pa_source_output *o) {
     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
 
-    pa_source_attach_within_thread(u->source);
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_attach_within_thread(u->source);
 }
 
 /* Called from output thread context */
@@ -387,7 +395,8 @@ static void source_output_detach_cb(pa_source_output *o) {
     pa_source_output_assert_io_context(o);
     pa_assert_se(u = o->userdata);
 
-    pa_source_detach_within_thread(u->source);
+    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
+        pa_source_detach_within_thread(u->source);
     pa_source_set_rtpoll(u->source, NULL);
 }
 
@@ -419,11 +428,12 @@ static void source_output_kill_cb(pa_source_output *o) {
     pa_assert_ctl_context();
     pa_assert_se(u = o->userdata);
 
-    /* The order here matters! We first kill the source output, followed
-     * by the source. That means the source callbacks must be protected
-     * against an unconnected source output! */
-    pa_source_output_unlink(u->source_output);
+    /* The order here matters! We first kill the source so that streams
+     * can properly be moved away while the source output is still connected
+     * to the master. */
+    pa_source_output_cork(u->source_output, true);
     pa_source_unlink(u->source);
+    pa_source_output_unlink(u->source_output);
 
     pa_source_output_unref(u->source_output);
     u->source_output = NULL;
@@ -585,6 +595,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
     pa_source_output_new_data_set_channel_map(&source_output_data, &map);
+    source_output_data.flags |= PA_SOURCE_OUTPUT_START_CORKED;
 
     pa_source_output_new(&u->source_output, m->core, &source_output_data);
     pa_source_output_new_data_done(&source_output_data);
@@ -603,8 +614,12 @@ int pa__init(pa_module*m) {
 
     u->source->output_from_master = u->source_output;
 
-    pa_source_put(u->source);
+    /* The order here is important. The output must be put first,
+     * otherwise streams might attach to the source before the
+     * source output is attached to the master. */
     pa_source_output_put(u->source_output);
+    pa_source_put(u->source);
+    pa_source_output_cork(u->source_output, false);
 
     /* Create optional uplink sink */
     pa_sink_new_data_init(&sink_data);
@@ -694,12 +709,15 @@ void pa__done(pa_module*m) {
      * destruction order! */
 
     if (u->source_output)
-        pa_source_output_unlink(u->source_output);
+        pa_source_output_cork(u->source_output, true);
 
     if (u->source)
         pa_source_unlink(u->source);
 
     if (u->source_output)
+        pa_source_output_unlink(u->source_output);
+
+    if (u->source_output)
         pa_source_output_unref(u->source_output);
 
     if (u->source)
diff --git a/src/modules/module-virtual-surround-sink.c b/src/modules/module-virtual-surround-sink.c
index 23c6bdc5..153d03ab 100644
--- a/src/modules/module-virtual-surround-sink.c
+++ b/src/modules/module-virtual-surround-sink.c
@@ -136,6 +136,17 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
             return 0;
+
+        case PA_SINK_MESSAGE_SET_STATE: {
+            pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
+
+            /* When set to running or idle for the first time, request a rewind
+             * of the master sink to make sure we are heard immediately */
+            if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+                pa_log_debug("Requesting rewind due to state change.");
+                pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+            }
+        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -233,6 +244,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return -1;
+
     /* Hmm, process any rewind request that might be queued up */
     pa_sink_process_rewind(u->sink, 0);
 
@@ -300,6 +314,10 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
+    /* If the sink is not yet linked, there is nothing to rewind */
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        return;
+
     if (u->sink->thread_info.rewind_nbytes > 0) {
         size_t max_rewrite;
 
@@ -370,7 +388,8 @@ static void sink_input_detach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_detach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_detach_within_thread(u->sink);
 
     pa_sink_set_rtpoll(u->sink, NULL);
 }
@@ -393,7 +412,8 @@ static void sink_input_attach_cb(pa_sink_input *i) {
      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i) * u->sink_fs / u->fs);
 
-    pa_sink_attach_within_thread(u->sink);
+    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
+        pa_sink_attach_within_thread(u->sink);
 }
 
 /* Called from main context */
@@ -403,11 +423,12 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    /* The order here matters! We first kill the sink input, followed
-     * by the sink. That means the sink callbacks must be protected
-     * against an unconnected sink input! */
-    pa_sink_input_unlink(u->sink_input);
+    /* The order here matters! We first kill the sink so that streams
+     * can properly be moved away while the sink input is still connected
+     * to the master. */
+    pa_sink_input_cork(u->sink_input, true);
     pa_sink_unlink(u->sink);
+    pa_sink_input_unlink(u->sink_input);
 
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
@@ -418,22 +439,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_module_unload_request(u->module, true);
 }
 
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
-    struct userdata *u;
-
-    pa_sink_input_assert_ref(i);
-    pa_assert_se(u = i->userdata);
-
-    /* If we are added for the first time, ask for a rewinding so that
-     * we are heard right-away. */
-    if (PA_SINK_INPUT_IS_LINKED(state) &&
-        i->thread_info.state == PA_SINK_INPUT_INIT && i->sink) {
-        pa_log_debug("Requesting rewind due to state change.");
-        pa_sink_input_request_rewind(i, 0, false, true, true);
-    }
-}
-
 /* Called from main context */
 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
     struct userdata *u;
@@ -750,6 +755,7 @@ int pa__init(pa_module*m) {
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_input_ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_input_map);
+    sink_input_data.flags |= PA_SINK_INPUT_START_CORKED;
 
     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
     pa_sink_input_new_data_done(&sink_input_data);
@@ -766,7 +772,6 @@ int pa__init(pa_module*m) {
     u->sink_input->kill = sink_input_kill_cb;
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
-    u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->may_move_to = sink_input_may_move_to_cb;
     u->sink_input->moving = sink_input_moving_cb;
     u->sink_input->volume_changed = use_volume_sharing ? NULL : sink_input_volume_changed_cb;
@@ -865,8 +870,12 @@ int pa__init(pa_module*m) {
     u->input_buffer = pa_xmalloc0(u->hrir_samples * u->sink_fs);
     u->input_buffer_offset = 0;
 
-    pa_sink_put(u->sink);
+    /* The order here is important. The input must be put first,
+     * otherwise streams might attach to the sink before the sink
+     * input is attached to the master. */
     pa_sink_input_put(u->sink_input);
+    pa_sink_put(u->sink);
+    pa_sink_input_cork(u->sink_input, false);
 
     pa_modargs_free(ma);
     return 0;
@@ -907,12 +916,15 @@ void pa__done(pa_module*m) {
      * destruction order! */
 
     if (u->sink_input)
-        pa_sink_input_unlink(u->sink_input);
+        pa_sink_input_cork(u->sink_input, true);
 
     if (u->sink)
         pa_sink_unlink(u->sink);
 
     if (u->sink_input)
+        pa_sink_input_unlink(u->sink_input);
+
+    if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
 
     if (u->sink)
-- 
2.11.0



More information about the pulseaudio-discuss mailing list