[pulseaudio-commits] [SCM] PulseAudio Sound Server branch, master, updated. v0.9.16-test4-67-g1eeddd8

Lennart Poettering gitmailer-noreply at 0pointer.de
Fri Aug 14 16:17:05 PDT 2009


This is an automated email from the git hooks/post-receive script. It was
generated because of a push to the "PulseAudio Sound Server" repository.

The master branch has been updated
      from  0f2a4ed422530b56b3744efe8055540644c0e774 (commit)

- Log -----------------------------------------------------------------
1eeddd8 combine: warn when the latency of a stream gets too high
e1f3f5e combine: big rework
8947d65 combine: drop adjust_timestamp variable because it is unused
a5b2dee ladspa: name sink after human readable plugin name, not the id string
7638662 module-ladspa: allow moving of sink, forward fixed latency
1b3848e module-remap: allow moving of sink, forward fixed latency
c44f518 ladspa: move LADSPA_Data size check to compile time
fb5205d remap: unify argument order with other modules
d9e4605 hook-list: make use of PA_LLIST_FOREACH
d7d86e3 native-protocol: downgrade volume change log messages
3c271ae core: document difference between IO and main thread view on requested latency
c6080d8 core: don't update latency range if not changed
3f9c67a core: call pa_sink_get_latency_within_thread() instead of going directly via process_msg()
350a2bc core: make fixed latency dynamically changeable
4eb59fb core: move rtpoll to thread_info sub structure
58d441f log: place more rate limit invocations
fd1266c rescure-stream: handle failed moves as well as dying sinks/sources
e4db56b core: split of FAIL_ON_SUSPEND into KILL_ON_SUSPEND and NO_CREATE_ON_SUSPEND
e53d2fc native: handle moving() callback with NULL destination properly
0989be1 core: introduce pa_{sink_input|source_output}_fail_move()
7891f96 module-stream-restore: don't fiddle with sinks/sources/streams that are not fully set up yet
-----------------------------------------------------------------------

Summary of changes:
 src/modules/alsa/alsa-sink.c                    |    3 +-
 src/modules/bluetooth/module-bluetooth-device.c |    4 +-
 src/modules/module-combine.c                    |  492 +++++++++++++----------
 src/modules/module-ladspa-sink.c                |  141 ++++---
 src/modules/module-remap-sink.c                 |  138 ++++---
 src/modules/module-rescue-streams.c             |  173 +++++++--
 src/modules/module-stream-restore.c             |  114 ++++--
 src/modules/rtp/module-rtp-recv.c               |    2 +-
 src/pulsecore/cli-text.c                        |   14 +-
 src/pulsecore/hook-list.c                       |    2 +-
 src/pulsecore/protocol-native.c                 |   28 +-
 src/pulsecore/sink-input.c                      |   28 ++-
 src/pulsecore/sink-input.h                      |   12 +-
 src/pulsecore/sink.c                            |  144 +++++--
 src/pulsecore/sink.h                            |   21 +-
 src/pulsecore/source-output.c                   |   28 ++-
 src/pulsecore/source-output.h                   |   12 +-
 src/pulsecore/source.c                          |  123 +++++--
 src/pulsecore/source.h                          |   17 +-
 19 files changed, 982 insertions(+), 514 deletions(-)

-----------------------------------------------------------------------

commit 7891f964e4a1858ccae744ddff5d33b78f00b4d2
Author: Lennart Poettering <lennart at poettering.net>
Date:   Fri Aug 14 23:55:32 2009 +0200

    module-stream-restore: don't fiddle with sinks/sources/streams that are not fully set up yet

diff --git a/src/modules/module-stream-restore.c b/src/modules/module-stream-restore.c
index 8c0bb6b..727a527 100644
--- a/src/modules/module-stream-restore.c
+++ b/src/modules/module-stream-restore.c
@@ -102,15 +102,16 @@ struct userdata {
     pa_idxset *subscribed;
 };
 
-#define ENTRY_VERSION 2
+#define ENTRY_VERSION 3
 
 struct entry {
     uint8_t version;
-    pa_bool_t muted_valid:1, volume_valid:1, device_valid:1;
+    pa_bool_t muted_valid:1, volume_valid:1, device_valid:1, card_valid:1;
     pa_bool_t muted:1;
     pa_channel_map channel_map;
     pa_cvolume volume;
     char device[PA_NAME_MAX];
+    char card[PA_NAME_MAX];
 } PA_GCC_PACKED;
 
 enum {
@@ -196,11 +197,21 @@ static struct entry* read_entry(struct userdata *u, const char *name) {
         goto fail;
     }
 
+    if (!memchr(e->card, 0, sizeof(e->card))) {
+        pa_log_warn("Database contains entry for stream %s with missing NUL byte in card name", name);
+        goto fail;
+    }
+
     if (e->device_valid && !pa_namereg_is_valid_name(e->device)) {
         pa_log_warn("Invalid device name stored in database for stream %s", name);
         goto fail;
     }
 
+    if (e->card_valid && !pa_namereg_is_valid_name(e->card)) {
+        pa_log_warn("Invalid card name stored in database for stream %s", name);
+        goto fail;
+    }
+
     if (e->volume_valid && !pa_channel_map_valid(&e->channel_map)) {
         pa_log_warn("Invalid channel map stored in database for stream %s", name);
         goto fail;
@@ -252,6 +263,10 @@ static pa_bool_t entries_equal(const struct entry *a, const struct entry *b) {
         (a->device_valid && strncmp(a->device, b->device, sizeof(a->device))))
         return FALSE;
 
+    if (a->card_valid != b->card_valid ||
+        (a->card_valid && strncmp(a->card, b->card, sizeof(a->card))))
+        return FALSE;
+
     if (a->muted_valid != b->muted_valid ||
         (a->muted_valid && (a->muted != b->muted)))
         return FALSE;
@@ -308,6 +323,11 @@ static void subscribe_callback(pa_core *c, pa_subscription_event_type_t t, uint3
         if (sink_input->save_sink) {
             pa_strlcpy(entry.device, sink_input->sink->name, sizeof(entry.device));
             entry.device_valid = TRUE;
+
+            if (sink_input->sink->card) {
+                pa_strlcpy(entry.card, sink_input->sink->card->name, sizeof(entry.card));
+                entry.card_valid = TRUE;
+            }
         }
 
     } else {
@@ -327,6 +347,11 @@ static void subscribe_callback(pa_core *c, pa_subscription_event_type_t t, uint3
         if (source_output->save_source) {
             pa_strlcpy(entry.device, source_output->source->name, sizeof(entry.device));
             entry.device_valid = source_output->save_source;
+
+            if (source_output->source->card) {
+                pa_strlcpy(entry.card, source_output->source->card->name, sizeof(entry.card));
+                entry.card_valid = TRUE;
+            }
         }
     }
 
@@ -368,19 +393,28 @@ static pa_hook_result_t sink_input_new_hook_callback(pa_core *c, pa_sink_input_n
     if (!(name = get_name(new_data->proplist, "sink-input")))
         return PA_HOOK_OK;
 
-    if ((e = read_entry(u, name))) {
+    if (new_data->sink)
+        pa_log_debug("Not restoring device for stream %s, because already set.", name);
+    else if ((e = read_entry(u, name))) {
+        pa_sink *s = NULL;
 
-        if (e->device_valid) {
-            pa_sink *s;
+        if (e->device_valid)
+            s = pa_namereg_get(c, e->device, PA_NAMEREG_SINK);
 
-            if ((s = pa_namereg_get(c, e->device, PA_NAMEREG_SINK))) {
-                if (!new_data->sink) {
-                    pa_log_info("Restoring device for stream %s.", name);
-                    new_data->sink = s;
-                    new_data->save_sink = TRUE;
-                } else
-                    pa_log_debug("Not restoring device for stream %s, because already set.", name);
-            }
+        if (!s && e->card_valid) {
+            pa_card *card;
+
+            if ((card = pa_namereg_get(c, e->card, PA_NAMEREG_CARD)))
+                s = pa_idxset_first(card->sinks, NULL);
+        }
+
+        /* It might happen that a stream and a sink are set up at the
+           same time, in which case we want to make sure we don't
+           interfere with that */
+        if (s && PA_SINK_IS_LINKED(pa_sink_get_state(s))) {
+            pa_log_info("Restoring device for stream %s.", name);
+            new_data->sink = s;
+            new_data->save_sink = TRUE;
         }
 
         pa_xfree(e);
@@ -455,18 +489,28 @@ static pa_hook_result_t source_output_new_hook_callback(pa_core *c, pa_source_ou
     if (!(name = get_name(new_data->proplist, "source-output")))
         return PA_HOOK_OK;
 
-    if ((e = read_entry(u, name))) {
-        pa_source *s;
+    if (new_data->source)
+        pa_log_debug("Not restoring device for stream %s, because already set", name);
+    else if ((e = read_entry(u, name))) {
+        pa_source *s = NULL;
 
-        if (e->device_valid) {
-            if ((s = pa_namereg_get(c, e->device, PA_NAMEREG_SOURCE))) {
-                if (!new_data->source) {
-                    pa_log_info("Restoring device for stream %s.", name);
-                    new_data->source = s;
-                    new_data->save_source = TRUE;
-                } else
-                    pa_log_debug("Not restoring device for stream %s, because already set", name);
-            }
+        if (e->device_valid)
+            s = pa_namereg_get(c, e->device, PA_NAMEREG_SOURCE);
+
+        if (!s && e->card_valid) {
+            pa_card *card;
+
+            if ((card = pa_namereg_get(c, e->card, PA_NAMEREG_CARD)))
+                s = pa_idxset_first(card->sources, NULL);
+        }
+
+        /* It might happen that a stream and a source are set up at the
+           same time, in which case we want to make sure we don't
+           interfere with that */
+        if (s && PA_SOURCE_IS_LINKED(pa_source_get_state(s))) {
+            pa_log_info("Restoring device for stream %s.", name);
+            new_data->source = s;
+            new_data->save_source = TRUE;
         }
 
         pa_xfree(e);
@@ -496,6 +540,12 @@ static pa_hook_result_t sink_put_hook_callback(pa_core *c, pa_sink *sink, struct
         if (si->save_sink)
             continue;
 
+        /* It might happen that a stream and a sink are set up at the
+           same time, in which case we want to make sure we don't
+           interfere with that */
+        if (!PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(si)))
+            continue;
+
         if (!(name = get_name(si->proplist, "sink-input")))
             continue;
 
@@ -534,6 +584,12 @@ static pa_hook_result_t source_put_hook_callback(pa_core *c, pa_source *source,
         if (so->direct_on_input)
             continue;
 
+        /* It might happen that a stream and a source are set up at the
+           same time, in which case we want to make sure we don't
+           interfere with that */
+        if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(so)))
+            continue;
+
         if (!(name = get_name(so->proplist, "source-input")))
             continue;
 
@@ -575,7 +631,9 @@ static pa_hook_result_t sink_unlink_hook_callback(pa_core *c, pa_sink *sink, str
             if (e->device_valid) {
                 pa_sink *d;
 
-                if ((d = pa_namereg_get(c, e->device, PA_NAMEREG_SINK)) && d != sink)
+                if ((d = pa_namereg_get(c, e->device, PA_NAMEREG_SINK)) &&
+                    d != sink &&
+                    PA_SINK_IS_LINKED(pa_sink_get_state(d)))
                     pa_sink_input_move_to(si, d, TRUE);
             }
 
@@ -613,7 +671,9 @@ static pa_hook_result_t source_unlink_hook_callback(pa_core *c, pa_source *sourc
             if (e->device_valid) {
                 pa_source *d;
 
-                if ((d = pa_namereg_get(c, e->device, PA_NAMEREG_SOURCE)) && d != source)
+                if ((d = pa_namereg_get(c, e->device, PA_NAMEREG_SOURCE)) &&
+                    d != source &&
+                    PA_SOURCE_IS_LINKED(pa_source_get_state(d)))
                     pa_source_output_move_to(so, d, TRUE);
             }
 

commit 0989be13f6b5f71872f381fe2b5a7379702f20bc
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:03:50 2009 +0200

    core: introduce pa_{sink_input|source_output}_fail_move()

diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 1f67d0f..3a9915f 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -1318,6 +1318,24 @@ int pa_sink_input_finish_move(pa_sink_input *i, pa_sink *dest, pa_bool_t save) {
 }
 
 /* Called from main context */
+void pa_sink_input_fail_move(pa_sink_input *i) {
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_ctl_context();
+    pa_assert(PA_SINK_INPUT_IS_LINKED(i->state));
+    pa_assert(!i->sink);
+
+    /* Check if someone wants this sink input? */
+    if (pa_hook_fire(&i->core->hooks[PA_CORE_HOOK_SINK_INPUT_MOVE_FAIL], i) == PA_HOOK_STOP)
+        return;
+
+    if (i->moving)
+        i->moving(i, NULL);
+
+    pa_sink_input_kill(i);
+}
+
+/* Called from main context */
 int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, pa_bool_t save) {
     int r;
 
@@ -1341,6 +1359,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, pa_bool_t save) {
     }
 
     if ((r = pa_sink_input_finish_move(i, dest, save)) < 0) {
+        pa_sink_input_fail_move(i);
         pa_sink_input_unref(i);
         return r;
     }
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index cd424e8..9088d6a 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -159,7 +159,9 @@ struct pa_sink_input {
     /* If non-NULL called whenever the sink input is moved to a new
      * sink. Called from main context after the sink input has been
      * detached from the old sink and before it has been attached to
-     * the new sink. */
+     * the new sink. If dest is NULL the move was executed in two
+     * phases and the second one failed; the stream will be destroyed
+     * after this call. */
     void (*moving) (pa_sink_input *i, pa_sink *dest);   /* may be NULL */
 
     /* Supposed to unlink and destroy this stream. Called from main
@@ -337,6 +339,7 @@ pa_bool_t pa_sink_input_may_move_to(pa_sink_input *i, pa_sink *dest); /* may thi
  * new sink */
 int pa_sink_input_start_move(pa_sink_input *i);
 int pa_sink_input_finish_move(pa_sink_input *i, pa_sink *dest, pa_bool_t save);
+void pa_sink_input_fail_move(pa_sink_input *i);
 
 pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i);
 
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index edcf5bd..65c6374 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -649,7 +649,7 @@ void pa_sink_move_all_finish(pa_sink *s, pa_queue *q, pa_bool_t save) {
 
     while ((i = PA_SINK_INPUT(pa_queue_pop(q)))) {
         if (pa_sink_input_finish_move(i, s, save) < 0)
-            pa_sink_input_kill(i);
+            pa_sink_input_fail_move(i);
 
         pa_sink_input_unref(i);
     }
@@ -665,10 +665,8 @@ void pa_sink_move_all_fail(pa_queue *q) {
     pa_assert(q);
 
     while ((i = PA_SINK_INPUT(pa_queue_pop(q)))) {
-        if (pa_hook_fire(&i->core->hooks[PA_CORE_HOOK_SINK_INPUT_MOVE_FAIL], i) == PA_HOOK_OK) {
-            pa_sink_input_kill(i);
-            pa_sink_input_unref(i);
-        }
+        pa_sink_input_fail_move(i);
+        pa_sink_input_unref(i);
     }
 
     pa_queue_free(q, NULL, NULL);
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 5d79dbb..8cb361c 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -847,6 +847,24 @@ int pa_source_output_finish_move(pa_source_output *o, pa_source *dest, pa_bool_t
 }
 
 /* Called from main context */
+void pa_source_output_fail_move(pa_source_output *o) {
+
+    pa_source_output_assert_ref(o);
+    pa_assert_ctl_context();
+    pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(o->state));
+    pa_assert(!o->source);
+
+    /* Check if someone wants this source output? */
+    if (pa_hook_fire(&o->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], o) == PA_HOOK_STOP)
+        return;
+
+    if (o->moving)
+        o->moving(o, NULL);
+
+    pa_source_output_kill(o);
+}
+
+/* Called from main context */
 int pa_source_output_move_to(pa_source_output *o, pa_source *dest, pa_bool_t save) {
     int r;
 
@@ -870,6 +888,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest, pa_bool_t sav
     }
 
     if ((r = pa_source_output_finish_move(o, dest, save)) < 0) {
+        pa_source_output_fail_move(o);
         pa_source_output_unref(o);
         return r;
     }
diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h
index 4bf88ca..6e3475a 100644
--- a/src/pulsecore/source-output.h
+++ b/src/pulsecore/source-output.h
@@ -127,7 +127,9 @@ struct pa_source_output {
     /* If non-NULL called whenever the source output is moved to a new
      * source. Called from main context after the stream was detached
      * from the old source and before it is attached to the new
-     * source. */
+     * source. If dest is NULL the move was executed in two
+     * phases and the second one failed; the stream will be destroyed
+     * after this call. */
     void (*moving) (pa_source_output *o, pa_source *dest);   /* may be NULL */
 
     /* Supposed to unlink and destroy this stream. Called from main
@@ -262,6 +264,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest, pa_bool_t sav
  * new source */
 int pa_source_output_start_move(pa_source_output *o);
 int pa_source_output_finish_move(pa_source_output *o, pa_source *dest, pa_bool_t save);
+void pa_source_output_fail_move(pa_source_output *o);
 
 #define pa_source_output_get_state(o) ((o)->state)
 
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 97a20b9..5731663 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -574,7 +574,7 @@ void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
 
     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
         if (pa_source_output_finish_move(o, s, save) < 0)
-            pa_source_output_kill(o);
+            pa_source_output_fail_move(o);
 
         pa_source_output_unref(o);
     }
@@ -590,10 +590,8 @@ void pa_source_move_all_fail(pa_queue *q) {
     pa_assert(q);
 
     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
-        if (pa_hook_fire(&o->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], o) == PA_HOOK_OK) {
-            pa_source_output_kill(o);
-            pa_source_output_unref(o);
-        }
+        pa_source_output_fail_move(o);
+        pa_source_output_unref(o);
     }
 
     pa_queue_free(q, NULL, NULL);

commit e53d2fc6b57f90d937f2680fa56461d4042de87a
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:05:17 2009 +0200

    native: handle moving() callback with NULL destination properly

diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index 0337220..a612478 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -762,6 +762,7 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,
         return -1;
 
     switch (code) {
+
         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
             pa_tagstruct *t;
             int l = 0;
@@ -1143,7 +1144,6 @@ static void playback_stream_request_bytes(playback_stream *s) {
         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
 }
 
-
 /* Called from main context */
 static void playback_stream_send_killed(playback_stream *p) {
     pa_tagstruct *t;
@@ -1617,6 +1617,9 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
     s = PLAYBACK_STREAM(i->userdata);
     playback_stream_assert_ref(s);
 
+    if (!dest)
+        return;
+
     fix_playback_buffer_attr(s);
     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
@@ -1752,6 +1755,9 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
     s = RECORD_STREAM(o->userdata);
     record_stream_assert_ref(s);
 
+    if (!dest)
+        return;
+
     fix_record_buffer_attr_pre(s);
     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);

commit e4db56bf0763abaaa34796f5b0234b3cd2cf4d3c
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:12:53 2009 +0200

    core: split of FAIL_ON_SUSPEND into KILL_ON_SUSPEND and NO_CREATE_ON_SUSPEND

diff --git a/src/pulsecore/cli-text.c b/src/pulsecore/cli-text.c
index 9395513..ace5e71 100644
--- a/src/pulsecore/cli-text.c
+++ b/src/pulsecore/cli-text.c
@@ -482,7 +482,7 @@ char *pa_source_output_list_to_string(pa_core *c) {
             s,
             "    index: %u\n"
             "\tdriver: <%s>\n"
-            "\tflags: %s%s%s%s%s%s%s%s%s%s\n"
+            "\tflags: %s%s%s%s%s%s%s%s%s%s%s\n"
             "\tstate: %s\n"
             "\tsource: %u <%s>\n"
             "\tcurrent latency: %0.2f ms\n"
@@ -501,7 +501,8 @@ char *pa_source_output_list_to_string(pa_core *c) {
             o->flags & PA_SOURCE_OUTPUT_FIX_RATE ? "FIX_RATE " : "",
             o->flags & PA_SOURCE_OUTPUT_FIX_CHANNELS ? "FIX_CHANNELS " : "",
             o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND ? "DONT_INHIBIT_AUTO_SUSPEND " : "",
-            o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND ? "FAIL_ON_SUSPEND " : "",
+            o->flags & PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND ? "NO_CREATE_ON_SUSPEND " : "",
+            o->flags & PA_SOURCE_OUTPUT_KILL_ON_SUSPEND ? "KILL_ON_SUSPEND " : "",
             state_table[pa_source_output_get_state(o)],
             o->source->index, o->source->name,
             (double) pa_source_output_get_latency(o, NULL) / PA_USEC_PER_MSEC,
@@ -564,7 +565,7 @@ char *pa_sink_input_list_to_string(pa_core *c) {
             s,
             "    index: %u\n"
             "\tdriver: <%s>\n"
-            "\tflags: %s%s%s%s%s%s%s%s%s%s\n"
+            "\tflags: %s%s%s%s%s%s%s%s%s%s%s\n"
             "\tstate: %s\n"
             "\tsink: %u <%s>\n"
             "\tvolume: %s\n"
@@ -587,7 +588,8 @@ char *pa_sink_input_list_to_string(pa_core *c) {
             i->flags & PA_SINK_INPUT_FIX_RATE ? "FIX_RATE " : "",
             i->flags & PA_SINK_INPUT_FIX_CHANNELS ? "FIX_CHANNELS " : "",
             i->flags & PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND ? "DONT_INHIBIT_AUTO_SUSPEND " : "",
-            i->flags & PA_SINK_INPUT_FAIL_ON_SUSPEND ? "FAIL_ON_SUSPEND " : "",
+            i->flags & PA_SINK_INPUT_NO_CREATE_ON_SUSPEND ? "NO_CREATE_SUSPEND " : "",
+            i->flags & PA_SINK_INPUT_KILL_ON_SUSPEND ? "KILL_ON_SUSPEND " : "",
             state_table[pa_sink_input_get_state(i)],
             i->sink->index, i->sink->name,
             pa_cvolume_snprint(cv, sizeof(cv), &v),
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index a612478..b6989ae 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -1957,7 +1957,7 @@ static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, u
         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
-        (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
+        (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0);
 
     /* Only since protocol version 15 there's a seperate muted_set
      * flag. For older versions we synthesize it here */
@@ -2213,7 +2213,7 @@ static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uin
         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
-        (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
+        (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0);
 
     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
     pa_proplist_free(p);
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 3a9915f..1b3ea92 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -221,7 +221,7 @@ int pa_sink_input_new(
     if ((r = pa_hook_fire(&core->hooks[PA_CORE_HOOK_SINK_INPUT_FIXATE], data)) < 0)
         return r;
 
-    if ((flags & PA_SINK_INPUT_FAIL_ON_SUSPEND) &&
+    if ((flags & PA_SINK_INPUT_NO_CREATE_ON_SUSPEND) &&
         pa_sink_get_state(data->sink) == PA_SINK_SUSPENDED) {
         pa_log_warn("Failed to create sink input: sink is suspended.");
         return -PA_ERR_BADSTATE;
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index 9088d6a..c1f8082 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -58,7 +58,8 @@ typedef enum pa_sink_input_flags {
     PA_SINK_INPUT_FIX_RATE = 64,
     PA_SINK_INPUT_FIX_CHANNELS = 128,
     PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND = 256,
-    PA_SINK_INPUT_FAIL_ON_SUSPEND = 512
+    PA_SINK_INPUT_NO_CREATE_ON_SUSPEND = 512,
+    PA_SINK_INPUT_KILL_ON_SUSPEND = 1024
 } pa_sink_input_flags_t;
 
 struct pa_sink_input {
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 65c6374..90c9d85 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -398,9 +398,9 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
 
         /* We're suspending or resuming, tell everyone about it */
 
-        for (i = PA_SINK_INPUT(pa_idxset_first(s->inputs, &idx)); i; i = PA_SINK_INPUT(pa_idxset_next(s->inputs, &idx)))
+        PA_IDXSET_FOREACH(i, s->inputs, idx)
             if (s->state == PA_SINK_SUSPENDED &&
-                (i->flags & PA_SINK_INPUT_FAIL_ON_SUSPEND))
+                (i->flags & PA_SINK_INPUT_KILL_ON_SUSPEND))
                 pa_sink_input_kill(i);
             else if (i->suspend)
                 i->suspend(i, state == PA_SINK_SUSPENDED);
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 8cb361c..2b3a0c5 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -167,7 +167,7 @@ int pa_source_output_new(
     if ((r = pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_FIXATE], data)) < 0)
         return r;
 
-    if ((flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND) &&
+    if ((flags & PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND) &&
         pa_source_get_state(data->source) == PA_SOURCE_SUSPENDED) {
         pa_log("Failed to create source output: source is suspended.");
         return -PA_ERR_BADSTATE;
diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h
index 6e3475a..b78a02b 100644
--- a/src/pulsecore/source-output.h
+++ b/src/pulsecore/source-output.h
@@ -55,7 +55,8 @@ typedef enum pa_source_output_flags {
     PA_SOURCE_OUTPUT_FIX_RATE = 64,
     PA_SOURCE_OUTPUT_FIX_CHANNELS = 128,
     PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND = 256,
-    PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND = 512
+    PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND = 512,
+    PA_SOURCE_OUTPUT_KILL_ON_SUSPEND = 1024
 } pa_source_output_flags_t;
 
 struct pa_source_output {
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 5731663..a44275c 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -336,15 +336,14 @@ static int source_set_state(pa_source *s, pa_source_state_t state) {
 
         /* We're suspending or resuming, tell everyone about it */
 
-        for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx)))
+        PA_IDXSET_FOREACH(o, s->outputs, idx)
             if (s->state == PA_SOURCE_SUSPENDED &&
-                (o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND))
+                (o->flags & PA_SOURCE_OUTPUT_KILL_ON_SUSPEND))
                 pa_source_output_kill(o);
             else if (o->suspend)
                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
     }
 
-
     return 0;
 }
 

commit fd1266c666f62f1c19bff6c1ab3397300e25ffed
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:15:18 2009 +0200

    rescure-stream: handle failed moves as well as dying sinks/sources

diff --git a/src/modules/module-rescue-streams.c b/src/modules/module-rescue-streams.c
index c23fece..82f693f 100644
--- a/src/modules/module-rescue-streams.c
+++ b/src/modules/module-rescue-streams.c
@@ -45,13 +45,43 @@ static const char* const valid_modargs[] = {
 };
 
 struct userdata {
-    pa_hook_slot *sink_slot, *source_slot;
+    pa_hook_slot
+        *sink_unlink_slot,
+        *source_unlink_slot,
+        *sink_input_move_fail_slot,
+        *source_output_move_fail_slot;
 };
 
-static pa_hook_result_t sink_hook_callback(pa_core *c, pa_sink *sink, void* userdata) {
+static pa_sink* find_evacuation_sink(pa_core *c, pa_sink_input *i, pa_sink *skip) {
+    pa_sink *target, *def;
+    uint32_t idx;
+
+    pa_assert(c);
+    pa_assert(i);
+
+    def = pa_namereg_get_default_sink(c);
+
+    if (def && def != skip && pa_sink_input_may_move_to(i, def))
+        return def;
+
+    PA_IDXSET_FOREACH(target, c->sinks, idx) {
+        if (target == def)
+            continue;
+
+        if (target == skip)
+            continue;
+
+        if (pa_sink_input_may_move_to(i, target))
+            return target;
+    }
+
+    pa_log_debug("No evacuation sink found.");
+    return NULL;
+}
+
+static pa_hook_result_t sink_unlink_hook_callback(pa_core *c, pa_sink *sink, void* userdata) {
     pa_sink_input *i;
     uint32_t idx;
-    pa_sink *target;
 
     pa_assert(c);
     pa_assert(sink);
@@ -65,21 +95,12 @@ static pa_hook_result_t sink_hook_callback(pa_core *c, pa_sink *sink, void* user
         return PA_HOOK_OK;
     }
 
-    if (!(target = pa_namereg_get_default_sink(c)) || target == sink) {
-
-        PA_IDXSET_FOREACH(target, c->sinks, idx)
-            if (target != sink)
-                break;
-
-        if (!target) {
-            pa_log_debug("No evacuation sink found.");
-            return PA_HOOK_OK;
-        }
-    }
+    PA_IDXSET_FOREACH(i, sink->inputs, idx) {
+        pa_sink *target;
 
-    pa_assert(target != sink);
+        if (!(target = find_evacuation_sink(c, i, sink)))
+            continue;
 
-    PA_IDXSET_FOREACH(i, sink->inputs, idx) {
         if (pa_sink_input_move_to(i, target, FALSE) < 0)
             pa_log_info("Failed to move sink input %u \"%s\" to %s.", i->index,
                         pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_APPLICATION_NAME)), target->name);
@@ -91,9 +112,63 @@ static pa_hook_result_t sink_hook_callback(pa_core *c, pa_sink *sink, void* user
     return PA_HOOK_OK;
 }
 
-static pa_hook_result_t source_hook_callback(pa_core *c, pa_source *source, void* userdata) {
+static pa_hook_result_t sink_input_move_fail_hook_callback(pa_core *c, pa_sink_input *i, void *userdata) {
+    pa_sink *target;
+
+    pa_assert(c);
+    pa_assert(i);
+
+    /* There's no point in doing anything if the core is shut down anyway */
+    if (c->state == PA_CORE_SHUTDOWN)
+        return PA_HOOK_OK;
+
+    if (!(target = find_evacuation_sink(c, i, NULL)))
+        return PA_HOOK_OK;
+
+    if (pa_sink_input_finish_move(i, target, FALSE) < 0) {
+        pa_log_info("Failed to move sink input %u \"%s\" to %s.", i->index,
+                        pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_APPLICATION_NAME)), target->name);
+        return PA_HOOK_OK;
+
+    } else {
+        pa_log_info("Sucessfully moved sink input %u \"%s\" to %s.", i->index,
+                    pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_APPLICATION_NAME)), target->name);
+        return PA_HOOK_STOP;
+    }
+}
+
+static pa_source* find_evacuation_source(pa_core *c, pa_source_output *o, pa_source *skip) {
+    pa_source *target, *def;
+    uint32_t idx;
+
+    pa_assert(c);
+    pa_assert(o);
+
+    def = pa_namereg_get_default_source(c);
+
+    if (def && def != skip && pa_source_output_may_move_to(o, def))
+        return def;
+
+    PA_IDXSET_FOREACH(target, c->sources, idx) {
+        if (target == def)
+            continue;
+
+        if (target == skip)
+            continue;
+
+        if (!target->monitor_of != !skip->monitor_of)
+            continue;
+
+        if (pa_source_output_may_move_to(o, target))
+            return target;
+    }
+
+    pa_log_debug("No evacuation source found.");
+    return NULL;
+}
+
+static pa_hook_result_t source_unlink_hook_callback(pa_core *c, pa_source *source, void* userdata) {
     pa_source_output *o;
-    pa_source *target;
     uint32_t idx;
 
     pa_assert(c);
@@ -108,21 +183,12 @@ static pa_hook_result_t source_hook_callback(pa_core *c, pa_source *source, void
         return PA_HOOK_OK;
     }
 
-    if (!(target = pa_namereg_get_default_source(c)) || target == source) {
-
-        PA_IDXSET_FOREACH(target, c->sources, idx)
-            if (target != source && !target->monitor_of == !source->monitor_of)
-                break;
-
-        if (!target) {
-            pa_log_info("No evacuation source found.");
-            return PA_HOOK_OK;
-        }
-    }
+    PA_IDXSET_FOREACH(o, source->outputs, idx) {
+        pa_source *target;
 
-    pa_assert(target != source);
+        if (!(target = find_evacuation_source(c, o, source)))
+            continue;
 
-    PA_IDXSET_FOREACH(o, source->outputs, idx) {
         if (pa_source_output_move_to(o, target, FALSE) < 0)
             pa_log_info("Failed to move source output %u \"%s\" to %s.", o->index,
                         pa_strnull(pa_proplist_gets(o->proplist, PA_PROP_APPLICATION_NAME)), target->name);
@@ -134,6 +200,31 @@ static pa_hook_result_t source_hook_callback(pa_core *c, pa_source *source, void
     return PA_HOOK_OK;
 }
 
+static pa_hook_result_t source_output_move_fail_hook_callback(pa_core *c, pa_source_output *i, void *userdata) {
+    pa_source *target;
+
+    pa_assert(c);
+    pa_assert(i);
+
+    /* There's no point in doing anything if the core is shut down anyway */
+    if (c->state == PA_CORE_SHUTDOWN)
+        return PA_HOOK_OK;
+
+    if (!(target = find_evacuation_source(c, i, NULL)))
+        return PA_HOOK_OK;
+
+    /* Try to complete the failed move on the evacuation source instead */
+    if (pa_source_output_finish_move(i, target, FALSE) < 0) {
+        pa_log_info("Failed to move source output %u \"%s\" to %s.", i->index,
+                        pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_APPLICATION_NAME)), target->name);
+        return PA_HOOK_OK;
+    }
+
+    pa_log_info("Successfully moved source output %u \"%s\" to %s.", i->index,
+                pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_APPLICATION_NAME)), target->name);
+    return PA_HOOK_STOP;
+}
+
 int pa__init(pa_module*m) {
     pa_modargs *ma;
     struct userdata *u;
@@ -148,8 +239,11 @@ int pa__init(pa_module*m) {
     m->userdata = u = pa_xnew(struct userdata, 1);
 
     /* A little bit later than module-stream-restore, module-intended-roles... */
-    u->sink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_LATE+20, (pa_hook_cb_t) sink_hook_callback, u);
-    u->source_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], PA_HOOK_LATE+20, (pa_hook_cb_t) source_hook_callback, u);
+    u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_LATE+20, (pa_hook_cb_t) sink_unlink_hook_callback, u);
+    u->source_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], PA_HOOK_LATE+20, (pa_hook_cb_t) source_unlink_hook_callback, u);
+
+    u->sink_input_move_fail_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_INPUT_MOVE_FAIL], PA_HOOK_LATE+20, (pa_hook_cb_t) sink_input_move_fail_hook_callback, u);
+    u->source_output_move_fail_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], PA_HOOK_LATE+20, (pa_hook_cb_t) source_output_move_fail_hook_callback, u);
 
     pa_modargs_free(ma);
     return 0;
@@ -163,10 +257,15 @@ void pa__done(pa_module*m) {
     if (!(u = m->userdata))
         return;
 
-    if (u->sink_slot)
-        pa_hook_slot_free(u->sink_slot);
-    if (u->source_slot)
-        pa_hook_slot_free(u->source_slot);
+    if (u->sink_unlink_slot)
+        pa_hook_slot_free(u->sink_unlink_slot);
+    if (u->source_unlink_slot)
+        pa_hook_slot_free(u->source_unlink_slot);
+
+    if (u->sink_input_move_fail_slot)
+        pa_hook_slot_free(u->sink_input_move_fail_slot);
+    if (u->source_output_move_fail_slot)
+        pa_hook_slot_free(u->source_output_move_fail_slot);
 
     pa_xfree(u);
 }

commit 58d441f7ea7994d5a0e8bc5397e2986707eb466b
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:16:25 2009 +0200

    log: place more rate limit invocations

diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c
index 1c38430..a91b4b8 100644
--- a/src/modules/alsa/alsa-sink.c
+++ b/src/modules/alsa/alsa-sink.c
@@ -1292,7 +1292,8 @@ static void thread_func(void *userdata) {
                      * we have filled the buffer at least once
                      * completely.*/
 
-                    pa_log_debug("Cutting sleep time for the initial iterations by half.");
+                    if (pa_log_ratelimit())
+                        pa_log_debug("Cutting sleep time for the initial iterations by half.");
                     sleep_usec /= 2;
                 }
 
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index b6989ae..b578746 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -1345,7 +1345,9 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
 
             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
-                pa_log_warn("Failed to push data into queue");
+
+                if (pa_log_ratelimit())
+                    pa_log_warn("Failed to push data into queue");
                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
             }

commit 4eb59fb90e474a81f2d626bc4fc7db083fafed7a
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:26:00 2009 +0200

    core: move rtpoll to thread_info sub structure

diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 16de689..9271655 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -455,12 +455,12 @@ static void sink_input_attach_cb(pa_sink_input *i) {
     pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
 
     o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
-            i->sink->rtpoll,
+            i->sink->thread_info.rtpoll,
             PA_RTPOLL_LATE,  /* This one is not that important, since we check for data in _peek() anyway. */
             o->inq);
 
     o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
-            i->sink->rtpoll,
+            i->sink->thread_info.rtpoll,
             PA_RTPOLL_EARLY,
             o->outq);
 }
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index 5caf827..c195c04 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -361,7 +361,7 @@ static void sink_input_attach(pa_sink_input *i) {
     pa_assert_se(s = i->userdata);
 
     pa_assert(!s->rtpoll_item);
-    s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
+    s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
 
     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
     p->fd = s->rtp_context.fd;
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 90c9d85..1f9a979 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -262,7 +262,6 @@ pa_sink* pa_sink_new(
     s->userdata = NULL;
 
     s->asyncmsgq = NULL;
-    s->rtpoll = NULL;
 
     /* As a minor optimization we just steal the list instead of
      * copying it here */
@@ -295,6 +294,7 @@ pa_sink* pa_sink_new(
             &s->sample_spec,
             0);
 
+    s->thread_info.rtpoll = NULL;
     s->thread_info.inputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
     s->thread_info.soft_volume =  s->soft_volume;
     s->thread_info.soft_muted = s->muted;
@@ -421,7 +421,6 @@ void pa_sink_put(pa_sink* s) {
 
     /* The following fields must be initialized properly when calling _put() */
     pa_assert(s->asyncmsgq);
-    pa_assert(s->rtpoll);
     pa_assert(s->thread_info.min_latency <= s->thread_info.max_latency);
 
     /* Generally, flags should be initialized via pa_sink_new(). As a
@@ -563,12 +562,12 @@ void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q) {
         pa_source_set_asyncmsgq(s->monitor_source, q);
 }
 
-/* Called from main context */
+/* Called from IO context, or before _put() from main context */
 void pa_sink_set_rtpoll(pa_sink *s, pa_rtpoll *p) {
     pa_sink_assert_ref(s);
-    pa_assert_ctl_context();
+    pa_sink_assert_io_context(s);
 
-    s->rtpoll = p;
+    s->thread_info.rtpoll = p;
 
     if (s->monitor_source)
         pa_source_set_rtpoll(s->monitor_source, p);
@@ -1184,7 +1183,7 @@ pa_usec_t pa_sink_get_latency_within_thread(pa_sink *s) {
 
     o = PA_MSGOBJECT(s);
 
-    /* We probably should make this a proper vtable callback instead of going through process_msg() */
+    /* FIXME: We probably should make this a proper vtable callback instead of going through process_msg() */
 
     if (o->process_msg(o, PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
         return -1;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index 1303396..33145df 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -102,7 +102,6 @@ struct pa_sink {
     pa_bool_t save_muted:1;
 
     pa_asyncmsgq *asyncmsgq;
-    pa_rtpoll *rtpoll;
 
     pa_memchunk silence;
 
@@ -156,6 +155,8 @@ struct pa_sink {
         pa_sink_state_t state;
         pa_hashmap *inputs;
 
+        pa_rtpoll *rtpoll;
+
         pa_cvolume soft_volume;
         pa_bool_t soft_muted:1;
 
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index a44275c..028d679 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -232,7 +232,6 @@ pa_source* pa_source_new(
     s->userdata = NULL;
 
     s->asyncmsgq = NULL;
-    s->rtpoll = NULL;
 
     /* As a minor optimization we just steal the list instead of
      * copying it here */
@@ -265,6 +264,7 @@ pa_source* pa_source_new(
             &s->sample_spec,
             0);
 
+    s->thread_info.rtpoll = NULL;
     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
     s->thread_info.soft_volume = s->soft_volume;
     s->thread_info.soft_muted = s->muted;
@@ -356,7 +356,6 @@ void pa_source_put(pa_source *s) {
 
     /* The following fields must be initialized properly when calling _put() */
     pa_assert(s->asyncmsgq);
-    pa_assert(s->rtpoll);
     pa_assert(s->thread_info.min_latency <= s->thread_info.max_latency);
 
     /* Generally, flags should be initialized via pa_source_new(). As
@@ -465,18 +464,18 @@ static void source_free(pa_object *o) {
 
 /* Called from main context */
 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
-    pa_assert_ctl_context();
     pa_source_assert_ref(s);
+    pa_assert_ctl_context();
 
     s->asyncmsgq = q;
 }
 
 /* Called from main context */
 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
-    pa_assert_ctl_context();
     pa_source_assert_ref(s);
+    pa_source_assert_io_context(s);
 
-    s->rtpoll = p;
+    s->thread_info.rtpoll = p;
 }
 
 /* Called from main context */
diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h
index 001122b..6c0a290 100644
--- a/src/pulsecore/source.h
+++ b/src/pulsecore/source.h
@@ -90,7 +90,6 @@ struct pa_source {
     pa_bool_t save_muted:1;
 
     pa_asyncmsgq *asyncmsgq;
-    pa_rtpoll *rtpoll;
 
     pa_memchunk silence;
 
@@ -140,6 +139,8 @@ struct pa_source {
         pa_source_state_t state;
         pa_hashmap *outputs;
 
+        pa_rtpoll *rtpoll;
+
         pa_cvolume soft_volume;
         pa_bool_t soft_muted:1;
 

commit 350a2bc846559bb274ba70f928bb42a9472050bf
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:48:14 2009 +0200

    core: make fixed latency dynamically changeable
    
    This of course makes the name 'fixed' a bit of a misnomer. However the
    definitions are now like this:
    
    fixed latency: the latency may change during runtime, but is solely
    controlled by the backend, the client has no influence.
    
    dynamic latency: the latency may change during runtime, influenced by
    the requests of the clients.
    
    i.e. fixed vs. dynamic is from the perspective of the client.

diff --git a/src/modules/bluetooth/module-bluetooth-device.c b/src/modules/bluetooth/module-bluetooth-device.c
index e682997..93b14a1 100644
--- a/src/modules/bluetooth/module-bluetooth-device.c
+++ b/src/modules/bluetooth/module-bluetooth-device.c
@@ -881,7 +881,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
                 *((pa_usec_t*) data) = wi > ri ? wi - ri : 0;
             }
 
-            *((pa_usec_t*) data) += u->sink->fixed_latency;
+            *((pa_usec_t*) data) += u->sink->thread_info.fixed_latency;
             return 0;
         }
     }
@@ -943,7 +943,7 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
             wi = pa_smoother_get(u->read_smoother, pa_rtclock_now());
             ri = pa_bytes_to_usec(u->read_index, &u->sample_spec);
 
-            *((pa_usec_t*) data) = (wi > ri ? wi - ri : 0) + u->source->fixed_latency;
+            *((pa_usec_t*) data) = (wi > ri ? wi - ri : 0) + u->source->thread_info.fixed_latency;
             return 0;
         }
 
diff --git a/src/pulsecore/cli-text.c b/src/pulsecore/cli-text.c
index ace5e71..a553099 100644
--- a/src/pulsecore/cli-text.c
+++ b/src/pulsecore/cli-text.c
@@ -296,7 +296,7 @@ char *pa_sink_list_to_string(pa_core *c) {
             pa_strbuf_printf(
                     s,
                     "\tfixed latency: %0.2f ms\n",
-                    (double) pa_sink_get_requested_latency(sink) / PA_USEC_PER_MSEC);
+                    (double) pa_sink_get_fixed_latency(sink) / PA_USEC_PER_MSEC);
 
         if (sink->card)
             pa_strbuf_printf(s, "\tcard: %u <%s>\n", sink->card->index, sink->card->name);
@@ -415,7 +415,7 @@ char *pa_source_list_to_string(pa_core *c) {
             pa_strbuf_printf(
                     s,
                     "\tfixed latency: %0.2f ms\n",
-                    (double) pa_source_get_requested_latency(source) / PA_USEC_PER_MSEC);
+                    (double) pa_source_get_fixed_latency(source) / PA_USEC_PER_MSEC);
 
         if (source->monitor_of)
             pa_strbuf_printf(s, "\tmonitor_of: %u\n", source->monitor_of->index);
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 1b3ea92..f6d9ac7 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -114,6 +114,7 @@ static void reset_callbacks(pa_sink_input *i) {
     i->update_max_request = NULL;
     i->update_sink_requested_latency = NULL;
     i->update_sink_latency_range = NULL;
+    i->update_sink_fixed_latency = NULL;
     i->attach = NULL;
     i->detach = NULL;
     i->suspend = NULL;
@@ -851,13 +852,13 @@ pa_usec_t pa_sink_input_set_requested_latency_within_thread(pa_sink_input *i, pa
     pa_sink_input_assert_io_context(i);
 
     if (!(i->sink->flags & PA_SINK_DYNAMIC_LATENCY))
-        usec = i->sink->fixed_latency;
+        usec = i->sink->thread_info.fixed_latency;
 
     if (usec != (pa_usec_t) -1)
         usec = PA_CLAMP(usec, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
 
     i->thread_info.requested_sink_latency = usec;
-    pa_sink_invalidate_requested_latency(i->sink);
+    pa_sink_invalidate_requested_latency(i->sink, TRUE);
 
     return usec;
 }
@@ -877,7 +878,7 @@ pa_usec_t pa_sink_input_set_requested_latency(pa_sink_input *i, pa_usec_t usec)
 
     if (i->sink) {
         if (!(i->sink->flags & PA_SINK_DYNAMIC_LATENCY))
-            usec = i->sink->fixed_latency;
+            usec = pa_sink_get_fixed_latency(i->sink);
 
         if (usec != (pa_usec_t) -1) {
             pa_usec_t min_latency, max_latency;
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index c1f8082..c182083 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -138,6 +138,10 @@ struct pa_sink_input {
      * from IO context. */
     void (*update_sink_latency_range) (pa_sink_input *i); /* may be NULL */
 
+    /* Called whenever the fixed latency of the sink changes, if there
+     * is one. Called from IO context. */
+    void (*update_sink_fixed_latency) (pa_sink_input *i); /* may be NULL */
+
     /* If non-NULL this function is called when the input is first
      * connected to a sink or when the rtpoll/asyncmsgq fields
      * change. You usually don't need to implement this function
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 1f9a979..fd95f75 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -256,8 +256,6 @@ pa_sink* pa_sink_new(
     s->muted = data->muted;
     s->refresh_volume = s->refresh_muted = FALSE;
 
-    s->fixed_latency = flags & PA_SINK_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
-
     reset_callbacks(s);
     s->userdata = NULL;
 
@@ -307,6 +305,7 @@ pa_sink* pa_sink_new(
     s->thread_info.requested_latency = 0;
     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
+    s->thread_info.fixed_latency = flags & PA_SINK_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
 
     pa_assert_se(pa_idxset_put(core->sinks, s, &s->index) >= 0);
 
@@ -349,6 +348,7 @@ pa_sink* pa_sink_new(
     s->monitor_source->monitor_of = s;
 
     pa_source_set_latency_range(s->monitor_source, s->thread_info.min_latency, s->thread_info.max_latency);
+    pa_source_set_fixed_latency(s->monitor_source, s->thread_info.fixed_latency);
     pa_source_set_max_rewind(s->monitor_source, s->thread_info.max_rewind);
 
     return s;
@@ -438,11 +438,11 @@ void pa_sink_put(pa_sink* s) {
 
     pa_assert((s->flags & PA_SINK_HW_VOLUME_CTRL) || (s->base_volume == PA_VOLUME_NORM && s->flags & PA_SINK_DECIBEL_VOLUME));
     pa_assert(!(s->flags & PA_SINK_DECIBEL_VOLUME) || s->n_volume_steps == PA_VOLUME_NORM+1);
-    pa_assert(!(s->flags & PA_SINK_DYNAMIC_LATENCY) == (s->fixed_latency != 0));
+    pa_assert(!(s->flags & PA_SINK_DYNAMIC_LATENCY) == (s->thread_info.fixed_latency != 0));
     pa_assert(!(s->flags & PA_SINK_LATENCY) == !(s->monitor_source->flags & PA_SOURCE_LATENCY));
     pa_assert(!(s->flags & PA_SINK_DYNAMIC_LATENCY) == !(s->monitor_source->flags & PA_SOURCE_DYNAMIC_LATENCY));
 
-    pa_assert(s->monitor_source->fixed_latency == s->fixed_latency);
+    pa_assert(s->monitor_source->thread_info.fixed_latency == s->thread_info.fixed_latency);
     pa_assert(s->monitor_source->thread_info.min_latency == s->thread_info.min_latency);
     pa_assert(s->monitor_source->thread_info.max_latency == s->thread_info.max_latency);
 
@@ -1748,7 +1748,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
             if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index)))
                 pa_sink_input_unref(i);
 
-            pa_sink_invalidate_requested_latency(s);
+            pa_sink_invalidate_requested_latency(s, TRUE);
             pa_sink_request_rewind(s, (size_t) -1);
 
             /* In flat volume mode we need to update the volume as
@@ -1794,7 +1794,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
             if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index)))
                 pa_sink_input_unref(i);
 
-            pa_sink_invalidate_requested_latency(s);
+            pa_sink_invalidate_requested_latency(s, TRUE);
 
             pa_log_debug("Requesting rewind due to started move");
             pa_sink_request_rewind(s, (size_t) -1);
@@ -1946,6 +1946,16 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
             return 0;
         }
 
+        case PA_SINK_MESSAGE_GET_FIXED_LATENCY:
+
+            *((pa_usec_t*) userdata) = s->thread_info.fixed_latency;
+            return 0;
+
+        case PA_SINK_MESSAGE_SET_FIXED_LATENCY:
+
+            pa_sink_set_fixed_latency_within_thread(s, (pa_usec_t) offset);
+            return 0;
+
         case PA_SINK_MESSAGE_GET_MAX_REWIND:
 
             *((size_t*) userdata) = s->thread_info.max_rewind;
@@ -2082,13 +2092,12 @@ pa_usec_t pa_sink_get_requested_latency_within_thread(pa_sink *s) {
     pa_sink_assert_io_context(s);
 
     if (!(s->flags & PA_SINK_DYNAMIC_LATENCY))
-        return PA_CLAMP(s->fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
+        return PA_CLAMP(s->thread_info.fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
 
     if (s->thread_info.requested_latency_valid)
         return s->thread_info.requested_latency;
 
-    while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)))
-
+    PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state)
         if (i->thread_info.requested_sink_latency != (pa_usec_t) -1 &&
             (result == (pa_usec_t) -1 || result > i->thread_info.requested_sink_latency))
             result = i->thread_info.requested_sink_latency;
@@ -2190,18 +2199,18 @@ void pa_sink_set_max_request(pa_sink *s, size_t max_request) {
 }
 
 /* Called from IO thread */
-void pa_sink_invalidate_requested_latency(pa_sink *s) {
+void pa_sink_invalidate_requested_latency(pa_sink *s, pa_bool_t dynamic) {
     pa_sink_input *i;
     void *state = NULL;
 
     pa_sink_assert_ref(s);
     pa_sink_assert_io_context(s);
 
-    if (!(s->flags & PA_SINK_DYNAMIC_LATENCY))
+    if ((s->flags & PA_SINK_DYNAMIC_LATENCY))
+        s->thread_info.requested_latency_valid = FALSE;
+    else if (dynamic)
         return;
 
-    s->thread_info.requested_latency_valid = FALSE;
-
     if (PA_SINK_IS_LINKED(s->thread_info.state)) {
 
         if (s->update_requested_latency)
@@ -2295,16 +2304,20 @@ void pa_sink_set_latency_range_within_thread(pa_sink *s, pa_usec_t min_latency,
                 i->update_sink_latency_range(i);
     }
 
-    pa_sink_invalidate_requested_latency(s);
+    pa_sink_invalidate_requested_latency(s, FALSE);
 
     pa_source_set_latency_range_within_thread(s->monitor_source, min_latency, max_latency);
 }
 
-/* Called from main thread, before the sink is put */
+/* Called from main thread */
 void pa_sink_set_fixed_latency(pa_sink *s, pa_usec_t latency) {
     pa_sink_assert_ref(s);
     pa_assert_ctl_context();
-    pa_assert(pa_sink_get_state(s) == PA_SINK_INIT);
+
+    if (s->flags & PA_SINK_DYNAMIC_LATENCY) {
+        pa_assert(latency == 0);
+        return;
+    }
 
     if (latency < ABSOLUTE_MIN_LATENCY)
         latency = ABSOLUTE_MIN_LATENCY;
@@ -2312,10 +2325,64 @@ void pa_sink_set_fixed_latency(pa_sink *s, pa_usec_t latency) {
     if (latency > ABSOLUTE_MAX_LATENCY)
         latency = ABSOLUTE_MAX_LATENCY;
 
-    s->fixed_latency = latency;
+    if (PA_SINK_IS_LINKED(s->state))
+        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_SET_FIXED_LATENCY, NULL, (int64_t) latency, NULL) == 0);
+    else
+        s->thread_info.fixed_latency = latency;
+
     pa_source_set_fixed_latency(s->monitor_source, latency);
 }
 
+/* Called from main thread. Returns the sink's fixed latency, or 0 for dynamic latency sinks. */
+pa_usec_t pa_sink_get_fixed_latency(pa_sink *s) {
+    pa_usec_t latency;
+
+    pa_sink_assert_ref(s);
+    pa_assert_ctl_context();
+
+    if (s->flags & PA_SINK_DYNAMIC_LATENCY) /* dynamic latency sinks carry no fixed latency */
+        return 0;
+
+    if (PA_SINK_IS_LINKED(s->state)) /* linked: query via a GET_FIXED_LATENCY message instead of reading thread_info directly */
+        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_FIXED_LATENCY, &latency, 0, NULL) == 0);
+    else
+        latency = s->thread_info.fixed_latency;
+
+    return latency;
+}
+
+/* Called from IO thread. Stores a new fixed latency in thread_info and announces it to the inputs and the monitor source. */
+void pa_sink_set_fixed_latency_within_thread(pa_sink *s, pa_usec_t latency) {
+    pa_sink_assert_ref(s);
+    pa_sink_assert_io_context(s);
+
+    if (s->flags & PA_SINK_DYNAMIC_LATENCY) { /* dynamic latency sinks carry no fixed latency; only 0 is accepted */
+        pa_assert(latency == 0);
+        return;
+    }
+
+    pa_assert(latency >= ABSOLUTE_MIN_LATENCY); /* the value is expected to be clamped already */
+    pa_assert(latency <= ABSOLUTE_MAX_LATENCY);
+
+    if (s->thread_info.fixed_latency == latency) /* unchanged: nothing to announce */
+        return;
+
+    s->thread_info.fixed_latency = latency;
+
+    if (PA_SINK_IS_LINKED(s->thread_info.state)) {
+        pa_sink_input *i;
+        void *state = NULL;
+
+        PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state) /* the callback is optional per input */
+            if (i->update_sink_fixed_latency)
+                i->update_sink_fixed_latency(i);
+    }
+
+    pa_sink_invalidate_requested_latency(s, FALSE); /* FALSE: do not return early on fixed latency sinks */
+
+    pa_source_set_fixed_latency_within_thread(s->monitor_source, latency);
+}
+
 /* Called from main context */
 size_t pa_sink_get_max_rewind(pa_sink *s) {
     size_t r;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index 33145df..55bca7f 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -105,8 +105,6 @@ struct pa_sink {
 
     pa_memchunk silence;
 
-    pa_usec_t fixed_latency; /* for sinks with PA_SINK_DYNAMIC_LATENCY this is 0 */
-
     pa_hashmap *ports;
     pa_device_port *active_port;
 
@@ -160,6 +158,9 @@ struct pa_sink {
         pa_cvolume soft_volume;
         pa_bool_t soft_muted:1;
 
+        /* The requested latency is used for dynamic latency
+         * sinks. For fixed latency sinks it is always identical to
+         * the fixed_latency. See below. */
         pa_bool_t requested_latency_valid:1;
         pa_usec_t requested_latency;
 
@@ -175,8 +176,15 @@ struct pa_sink {
         size_t rewind_nbytes;
         pa_bool_t rewind_requested;
 
+        /* Both dynamic and fixed latencies will be clamped to this
+         * range. */
         pa_usec_t min_latency; /* we won't go below this latency */
         pa_usec_t max_latency; /* An upper limit for the latencies */
+
+        /* 'Fixed' simply means that the latency is exclusively
+         * decided on by the sink, and the clients have no influence
+         * in changing it */
+        pa_usec_t fixed_latency; /* for sinks with PA_SINK_DYNAMIC_LATENCY this is 0 */
     } thread_info;
 
     void *userdata;
@@ -202,6 +210,8 @@ typedef enum pa_sink_message {
     PA_SINK_MESSAGE_DETACH,
     PA_SINK_MESSAGE_SET_LATENCY_RANGE,
     PA_SINK_MESSAGE_GET_LATENCY_RANGE,
+    PA_SINK_MESSAGE_SET_FIXED_LATENCY,
+    PA_SINK_MESSAGE_GET_FIXED_LATENCY,
     PA_SINK_MESSAGE_GET_MAX_REWIND,
     PA_SINK_MESSAGE_GET_MAX_REQUEST,
     PA_SINK_MESSAGE_SET_MAX_REWIND,
@@ -282,6 +292,7 @@ pa_bool_t pa_device_init_intended_roles(pa_proplist *p);
 pa_usec_t pa_sink_get_latency(pa_sink *s);
 pa_usec_t pa_sink_get_requested_latency(pa_sink *s);
 void pa_sink_get_latency_range(pa_sink *s, pa_usec_t *min_latency, pa_usec_t *max_latency);
+pa_usec_t pa_sink_get_fixed_latency(pa_sink *s);
 
 size_t pa_sink_get_max_rewind(pa_sink *s);
 size_t pa_sink_get_max_request(pa_sink *s);
@@ -333,12 +344,13 @@ void pa_sink_set_max_rewind_within_thread(pa_sink *s, size_t max_rewind);
 void pa_sink_set_max_request_within_thread(pa_sink *s, size_t max_request);
 
 void pa_sink_set_latency_range_within_thread(pa_sink *s, pa_usec_t min_latency, pa_usec_t max_latency);
+void pa_sink_set_fixed_latency_within_thread(pa_sink *s, pa_usec_t latency);
 
 /*** To be called exclusively by sink input drivers, from IO context */
 
 void pa_sink_request_rewind(pa_sink*s, size_t nbytes);
 
-void pa_sink_invalidate_requested_latency(pa_sink *s);
+void pa_sink_invalidate_requested_latency(pa_sink *s, pa_bool_t dynamic);
 
 pa_usec_t pa_sink_get_latency_within_thread(pa_sink *s);
 
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 2b3a0c5..3803a6c 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -84,6 +84,7 @@ static void reset_callbacks(pa_source_output *o) {
     o->update_max_rewind = NULL;
     o->update_source_requested_latency = NULL;
     o->update_source_latency_range = NULL;
+    o->update_source_fixed_latency = NULL;
     o->attach = NULL;
     o->detach = NULL;
     o->suspend = NULL;
@@ -561,13 +562,13 @@ pa_usec_t pa_source_output_set_requested_latency_within_thread(pa_source_output
     pa_source_output_assert_io_context(o);
 
     if (!(o->source->flags & PA_SOURCE_DYNAMIC_LATENCY))
-        usec = o->source->fixed_latency;
+        usec = o->source->thread_info.fixed_latency;
 
     if (usec != (pa_usec_t) -1)
         usec = PA_CLAMP(usec, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
 
     o->thread_info.requested_source_latency = usec;
-    pa_source_invalidate_requested_latency(o->source);
+    pa_source_invalidate_requested_latency(o->source, TRUE);
 
     return usec;
 }
@@ -587,7 +588,7 @@ pa_usec_t pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t
 
     if (o->source) {
         if (!(o->source->flags & PA_SOURCE_DYNAMIC_LATENCY))
-            usec = o->source->fixed_latency;
+            usec = pa_source_get_fixed_latency(o->source);
 
         if (usec != (pa_usec_t) -1) {
             pa_usec_t min_latency, max_latency;
diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h
index b78a02b..a70a3fd 100644
--- a/src/pulsecore/source-output.h
+++ b/src/pulsecore/source-output.h
@@ -109,6 +109,10 @@ struct pa_source_output {
      * from IO context. */
     void (*update_source_latency_range) (pa_source_output *o); /* may be NULL */
 
+    /* Called whenever the fixed latency of the source changes, if there
+     * is one. Called from IO context. */
+    void (*update_source_fixed_latency) (pa_source_output *i); /* may be NULL */
+
     /* If non-NULL this function is called when the output is first
      * connected to a source. Called from IO thread context */
     void (*attach) (pa_source_output *o);           /* may be NULL */
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 028d679..8970d8e 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -226,8 +226,6 @@ pa_source* pa_source_new(
     s->muted = data->muted;
     s->refresh_volume = s->refresh_muted = FALSE;
 
-    s->fixed_latency = flags & PA_SOURCE_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
-
     reset_callbacks(s);
     s->userdata = NULL;
 
@@ -274,6 +272,7 @@ pa_source* pa_source_new(
     s->thread_info.requested_latency = 0;
     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
+    s->thread_info.fixed_latency = flags & PA_SOURCE_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
 
     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
 
@@ -370,7 +369,7 @@ void pa_source_put(pa_source *s) {
 
     pa_assert((s->flags & PA_SOURCE_HW_VOLUME_CTRL) || (s->base_volume == PA_VOLUME_NORM && s->flags & PA_SOURCE_DECIBEL_VOLUME));
     pa_assert(!(s->flags & PA_SOURCE_DECIBEL_VOLUME) || s->n_volume_steps == PA_VOLUME_NORM+1);
-    pa_assert(!(s->flags & PA_SOURCE_DYNAMIC_LATENCY) == (s->fixed_latency != 0));
+    pa_assert(!(s->flags & PA_SOURCE_DYNAMIC_LATENCY) == (s->thread_info.fixed_latency != 0));
 
     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
 
@@ -1037,7 +1036,7 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_
             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
                 pa_source_output_unref(o);
 
-            pa_source_invalidate_requested_latency(s);
+            pa_source_invalidate_requested_latency(s, TRUE);
 
             return 0;
         }
@@ -1117,6 +1116,16 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_
             return 0;
         }
 
+        case PA_SOURCE_MESSAGE_GET_FIXED_LATENCY:
+
+            *((pa_usec_t*) userdata) = s->thread_info.fixed_latency;
+            return 0;
+
+        case PA_SOURCE_MESSAGE_SET_FIXED_LATENCY:
+
+            pa_source_set_fixed_latency_within_thread(s, (pa_usec_t) offset);
+            return 0;
+
         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
 
             *((size_t*) userdata) = s->thread_info.max_rewind;
@@ -1223,13 +1232,12 @@ pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
     pa_source_assert_io_context(s);
 
     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
-        return PA_CLAMP(s->fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
+        return PA_CLAMP(s->thread_info.fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
 
     if (s->thread_info.requested_latency_valid)
         return s->thread_info.requested_latency;
 
-    while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
-
+    PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
             result = o->thread_info.requested_source_latency;
@@ -1292,18 +1300,18 @@ void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
 }
 
 /* Called from IO thread */
-void pa_source_invalidate_requested_latency(pa_source *s) {
+void pa_source_invalidate_requested_latency(pa_source *s, pa_bool_t dynamic) {
     pa_source_output *o;
     void *state = NULL;
 
     pa_source_assert_ref(s);
     pa_source_assert_io_context(s);
 
-    if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
+    if ((s->flags & PA_SOURCE_DYNAMIC_LATENCY))
+        s->thread_info.requested_latency_valid = FALSE;
+    else if (dynamic)
         return;
 
-    s->thread_info.requested_latency_valid = FALSE;
-
     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
 
         if (s->update_requested_latency)
@@ -1315,7 +1323,7 @@ void pa_source_invalidate_requested_latency(pa_source *s) {
     }
 
     if (s->monitor_of)
-        pa_sink_invalidate_requested_latency(s->monitor_of);
+        pa_sink_invalidate_requested_latency(s->monitor_of, dynamic);
 }
 
 /* Called from main thread */
@@ -1375,8 +1383,6 @@ void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t
 
 /* Called from IO thread, and from main thread before pa_source_put() is called */
 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
-    void *state = NULL;
-
     pa_source_assert_ref(s);
     pa_source_assert_io_context(s);
 
@@ -1390,18 +1396,23 @@ void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_laten
               (s->flags & PA_SOURCE_DYNAMIC_LATENCY) ||
               s->monitor_of);
 
+    if (s->thread_info.min_latency == min_latency &&
+        s->thread_info.max_latency == max_latency)
+        return;
+
     s->thread_info.min_latency = min_latency;
     s->thread_info.max_latency = max_latency;
 
     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
         pa_source_output *o;
+        void *state = NULL;
 
-        while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
+        PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
             if (o->update_source_latency_range)
                 o->update_source_latency_range(o);
     }
 
-    pa_source_invalidate_requested_latency(s);
+    pa_source_invalidate_requested_latency(s, FALSE);
 }
 
 /* Called from main thread, before the source is put */
@@ -1409,7 +1420,10 @@ void pa_source_set_fixed_latency(pa_source *s, pa_usec_t latency) {
     pa_source_assert_ref(s);
     pa_assert_ctl_context();
 
-    pa_assert(pa_source_get_state(s) == PA_SOURCE_INIT);
+    if (s->flags & PA_SOURCE_DYNAMIC_LATENCY) {
+        pa_assert(latency == 0);
+        return;
+    }
 
     if (latency < ABSOLUTE_MIN_LATENCY)
         latency = ABSOLUTE_MIN_LATENCY;
@@ -1417,7 +1431,58 @@ void pa_source_set_fixed_latency(pa_source *s, pa_usec_t latency) {
     if (latency > ABSOLUTE_MAX_LATENCY)
         latency = ABSOLUTE_MAX_LATENCY;
 
-    s->fixed_latency = latency;
+    if (PA_SOURCE_IS_LINKED(s->state))
+        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_FIXED_LATENCY, NULL, (int64_t) latency, NULL) == 0);
+    else
+        s->thread_info.fixed_latency = latency;
+}
+
+/* Called from main thread */
+pa_usec_t pa_source_get_fixed_latency(pa_source *s) {
+    pa_usec_t latency;
+
+    pa_source_assert_ref(s);
+    pa_assert_ctl_context();
+
+    if (s->flags & PA_SOURCE_DYNAMIC_LATENCY)
+        return 0;
+
+    if (PA_SOURCE_IS_LINKED(s->state))
+        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_FIXED_LATENCY, &latency, 0, NULL) == 0);
+    else
+        latency = s->thread_info.fixed_latency;
+
+    return latency;
+}
+
+/* Called from IO thread */
+void pa_source_set_fixed_latency_within_thread(pa_source *s, pa_usec_t latency) {
+    pa_source_assert_ref(s);
+    pa_source_assert_io_context(s);
+
+    if (s->flags & PA_SOURCE_DYNAMIC_LATENCY) {
+        pa_assert(latency == 0);
+        return;
+    }
+
+    pa_assert(latency >= ABSOLUTE_MIN_LATENCY);
+    pa_assert(latency <= ABSOLUTE_MAX_LATENCY);
+
+    if (s->thread_info.fixed_latency == latency)
+        return;
+
+    s->thread_info.fixed_latency = latency;
+
+    if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
+        pa_source_output *o;
+        void *state = NULL;
+
+        PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
+            if (o->update_source_fixed_latency)
+                o->update_source_fixed_latency(o);
+    }
+
+    pa_source_invalidate_requested_latency(s, FALSE);
 }
 
 /* Called from main thread */
diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h
index 6c0a290..bb085a0 100644
--- a/src/pulsecore/source.h
+++ b/src/pulsecore/source.h
@@ -93,8 +93,6 @@ struct pa_source {
 
     pa_memchunk silence;
 
-    pa_usec_t fixed_latency; /* for sources with PA_SOURCE_DYNAMIC_LATENCY this is 0 */
-
     pa_hashmap *ports;
     pa_device_port *active_port;
 
@@ -153,7 +151,9 @@ struct pa_source {
 
         pa_usec_t min_latency; /* we won't go below this latency */
         pa_usec_t max_latency; /* An upper limit for the latencies */
-    } thread_info;
+
+        pa_usec_t fixed_latency; /* for sources with PA_SOURCE_DYNAMIC_LATENCY this is 0 */
+ } thread_info;
 
     void *userdata;
 };
@@ -175,6 +175,8 @@ typedef enum pa_source_message {
     PA_SOURCE_MESSAGE_DETACH,
     PA_SOURCE_MESSAGE_SET_LATENCY_RANGE,
     PA_SOURCE_MESSAGE_GET_LATENCY_RANGE,
+    PA_SOURCE_MESSAGE_SET_FIXED_LATENCY,
+    PA_SOURCE_MESSAGE_GET_FIXED_LATENCY,
     PA_SOURCE_MESSAGE_GET_MAX_REWIND,
     PA_SOURCE_MESSAGE_SET_MAX_REWIND,
     PA_SOURCE_MESSAGE_MAX
@@ -250,6 +252,7 @@ int pa_source_sync_suspend(pa_source *s);
 pa_usec_t pa_source_get_latency(pa_source *s);
 pa_usec_t pa_source_get_requested_latency(pa_source *s);
 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency);
+pa_usec_t pa_source_get_fixed_latency(pa_source *s);
 
 size_t pa_source_get_max_rewind(pa_source *s);
 
@@ -259,6 +262,7 @@ int pa_source_suspend_all(pa_core *c, pa_bool_t suspend, pa_suspend_cause_t caus
 
 void pa_source_set_volume(pa_source *source, const pa_cvolume *volume, pa_bool_t save);
 const pa_cvolume *pa_source_get_volume(pa_source *source, pa_bool_t force_refresh);
+
 void pa_source_set_mute(pa_source *source, pa_bool_t mute, pa_bool_t save);
 pa_bool_t pa_source_get_mute(pa_source *source, pa_bool_t force_refresh);
 
@@ -290,11 +294,13 @@ void pa_source_detach_within_thread(pa_source *s);
 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s);
 
 void pa_source_set_max_rewind_within_thread(pa_source *s, size_t max_rewind);
+
 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency);
+void pa_source_set_fixed_latency_within_thread(pa_source *s, pa_usec_t latency);
 
 /*** To be called exclusively by source output drivers, from IO context */
 
-void pa_source_invalidate_requested_latency(pa_source *s);
+void pa_source_invalidate_requested_latency(pa_source *s, pa_bool_t dynamic);
 pa_usec_t pa_source_get_latency_within_thread(pa_source *s);
 
 #define pa_source_assert_io_context(s) \

commit 3f9c67a7fb959acd35228f1e7455baf2aacc793b
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:52:50 2009 +0200

    core: call pa_sink_get_latency_within_thread() instead of going directly via process_msg()

diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index fd95f75..e826889 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -1770,10 +1770,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
                 size_t sink_nbytes, total_nbytes;
 
                 /* Get the latency of the sink */
-                if (!(s->flags & PA_SINK_LATENCY) ||
-                    PA_MSGOBJECT(s)->process_msg(PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
-                    usec = 0;
-
+                usec = pa_sink_get_latency_within_thread(s);
                 sink_nbytes = pa_usec_to_bytes(usec, &s->sample_spec);
                 total_nbytes = sink_nbytes + pa_memblockq_get_length(i->thread_info.render_memblockq);
 
@@ -1832,10 +1829,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
                 size_t nbytes;
 
                 /* Get the latency of the sink */
-                if (!(s->flags & PA_SINK_LATENCY) ||
-                    PA_MSGOBJECT(s)->process_msg(PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
-                    usec = 0;
-
+                usec = pa_sink_get_latency_within_thread(s);
                 nbytes = pa_usec_to_bytes(usec, &s->sample_spec);
 
                 if (nbytes > 0)

commit c6080d8c61df4991b96f4f144e58848f6c440440
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:54:02 2009 +0200

    core: don't update latency range if not changed

diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index e826889..77908c9 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -2273,8 +2273,6 @@ void pa_sink_get_latency_range(pa_sink *s, pa_usec_t *min_latency, pa_usec_t *ma
 
 /* Called from IO thread */
 void pa_sink_set_latency_range_within_thread(pa_sink *s, pa_usec_t min_latency, pa_usec_t max_latency) {
-    void *state = NULL;
-
     pa_sink_assert_ref(s);
     pa_sink_assert_io_context(s);
 
@@ -2287,11 +2285,16 @@ void pa_sink_set_latency_range_within_thread(pa_sink *s, pa_usec_t min_latency,
                max_latency == ABSOLUTE_MAX_LATENCY) ||
               (s->flags & PA_SINK_DYNAMIC_LATENCY));
 
+    if (s->thread_info.min_latency == min_latency &&
+        s->thread_info.max_latency == max_latency)
+        return;
+
     s->thread_info.min_latency = min_latency;
     s->thread_info.max_latency = max_latency;
 
     if (PA_SINK_IS_LINKED(s->thread_info.state)) {
         pa_sink_input *i;
+        void *state = NULL;
 
         PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state)
             if (i->update_sink_latency_range)

commit 3c271ae0605fcf1b6ca9ddfb21bda54a783e9926
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:54:25 2009 +0200

    core: document difference between IO and main thread view on requested latency

diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 77908c9..c1589f2 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -1917,6 +1917,9 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
             pa_usec_t *usec = userdata;
             *usec = pa_sink_get_requested_latency_within_thread(s);
 
+            /* Yes, that's right, the IO thread will see -1 when no
+             * explicit requested latency is configured, the main
+             * thread will see max_latency */
             if (*usec == (pa_usec_t) -1)
                 *usec = s->thread_info.max_latency;
 

commit d7d86e32ddea61e93e39f55a9f7e91b8d696dfab
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:54:51 2009 +0200

    native-protocol: downgrade volume change log messages

diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index b578746..280707e 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -1131,6 +1131,12 @@ static void playback_stream_request_bytes(playback_stream *s) {
 
     m = pa_memblockq_pop_missing(s->memblockq);
 
+    /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu)", */
+    /*        (unsigned long) m, */
+    /*        pa_memblockq_get_tlength(s->memblockq), */
+    /*        pa_memblockq_get_minreq(s->memblockq), */
+    /*        pa_memblockq_get_length(s->memblockq)); */
+
     if (m <= 0)
         return;
 
@@ -3381,13 +3387,13 @@ static void command_set_volume(
     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
 
     if (sink) {
-        pa_log("Client %s changes volume of sink %s.", client_name, sink->name);
+        pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE, TRUE);
     } else if (source) {
-        pa_log("Client %s changes volume of sink %s.", client_name, source->name);
+        pa_log_debug("Client %s changes volume of sink %s.", client_name, source->name);
         pa_source_set_volume(source, &volume, TRUE);
     } else if (si) {
-        pa_log("Client %s changes volume of sink %s.",
+        pa_log_debug("Client %s changes volume of sink %s.",
                      client_name,
                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);

commit d9e4605e09e01cc64e3d37452ea0b5c2783a0085
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:55:31 2009 +0200

    hook-list: make use of PA_LLIST_FOREACH

diff --git a/src/pulsecore/hook-list.c b/src/pulsecore/hook-list.c
index a00116d..d9b9917 100644
--- a/src/pulsecore/hook-list.c
+++ b/src/pulsecore/hook-list.c
@@ -97,7 +97,7 @@ pa_hook_result_t pa_hook_fire(pa_hook *hook, void *data) {
 
     hook->n_firing ++;
 
-    for (slot = hook->slots; slot; slot = slot->next) {
+    PA_LLIST_FOREACH(slot, hook->slots) {
         if (slot->dead)
             continue;
 

commit fb5205daac937e98736db1448fe7c8d84f3e78c4
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:57:36 2009 +0200

    remap: unify argument order with other modules

diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c
index 0b7b9b8..45f4e2a 100644
--- a/src/modules/module-remap-sink.c
+++ b/src/modules/module-remap-sink.c
@@ -1,7 +1,7 @@
 /***
   This file is part of PulseAudio.
 
-  Copyright 2004-2008 Lennart Poettering
+  Copyright 2004-2009 Lennart Poettering
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
@@ -48,8 +48,8 @@ PA_MODULE_USAGE(
         "master=<name of sink to remap> "
         "master_channel_map=<channel map> "
         "format=<sample format> "
-        "channels=<number of channels> "
         "rate=<sample rate> "
+        "channels=<number of channels> "
         "channel_map=<channel map> "
         "remix=<remix channels?>");
 

commit c44f518eb9d36fd73ada6d49d51bbb6de389e7b7
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:58:19 2009 +0200

    ladspa: move LADSPA_Data size check to compile time

diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index b26330c..2433499 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c
@@ -395,7 +395,7 @@ int pa__init(pa_module*m) {
 
     pa_assert(m);
 
-    pa_assert(sizeof(LADSPA_Data) == sizeof(float));
+    pa_assert_cc(sizeof(LADSPA_Data) == sizeof(float));
 
     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
         pa_log("Failed to parse module arguments.");

commit 1b3848ebd768632f8ca8baedeb53feaf381847c4
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 00:59:26 2009 +0200

    module-remap: allow moving of sink, forward fixed latency
    
    This is a bigger change reworking a number of things:
    
    - We now allow moving of the remap sink between backend sinks like any
      other stream.
    
    - We forward the fixed latency parameter of the underlying sinks the
      same way as the dynamic latency.

diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c
index 45f4e2a..7b4c9bb 100644
--- a/src/modules/module-remap-sink.c
+++ b/src/modules/module-remap-sink.c
@@ -54,10 +54,9 @@ PA_MODULE_USAGE(
         "remix=<remix channels?>");
 
 struct userdata {
-    pa_core *core;
     pa_module *module;
 
-    pa_sink *sink, *master;
+    pa_sink *sink;
     pa_sink_input *sink_input;
 };
 
@@ -80,19 +79,24 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
     switch (code) {
 
-        case PA_SINK_MESSAGE_GET_LATENCY: {
-            pa_usec_t usec = 0;
+        case PA_SINK_MESSAGE_GET_LATENCY:
 
-            /* Get the latency of the master sink */
-            if (PA_MSGOBJECT(u->master)->process_msg(PA_MSGOBJECT(u->master), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
-                usec = 0;
+            /* The sink is _put() before the sink input is, so let's
+             * make sure we don't access it yet */
+            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
+                !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
+                *((pa_usec_t*) data) = 0;
+                return 0;
+            }
 
-            /* Add the latency internal to our sink input on top */
-            usec += pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->master->sample_spec);
+            *((pa_usec_t*) data) =
+                /* Get the latency of the master sink */
+                pa_sink_get_latency_within_thread(u->sink_input->sink) +
+
+                /* Add the latency internal to our sink input on top */
+                pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
-            *((pa_usec_t*) data) = usec;
             return 0;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -105,12 +109,11 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
     pa_sink_assert_ref(s);
     pa_assert_se(u = s->userdata);
 
-    if (PA_SINK_IS_LINKED(state) &&
-        u->sink_input &&
-        PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
-
-        pa_sink_input_cork(u->sink_input, state == PA_SINK_SUSPENDED);
+    if (!PA_SINK_IS_LINKED(state) ||
+        !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
+        return 0;
 
+    pa_sink_input_cork(u->sink_input, state == PA_SINK_SUSPENDED);
     return 0;
 }
 
@@ -121,6 +124,10 @@ static void sink_request_rewind(pa_sink *s) {
     pa_sink_assert_ref(s);
     pa_assert_se(u = s->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
+        !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
+        return;
+
     pa_sink_input_request_rewind(u->sink_input, s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
 }
 
@@ -131,6 +138,10 @@ static void sink_update_requested_latency(pa_sink *s) {
     pa_sink_assert_ref(s);
     pa_assert_se(u = s->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
+        !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
+        return;
+
     /* Just hand this one over to the master sink */
     pa_sink_input_set_requested_latency_within_thread(
             u->sink_input,
@@ -145,9 +156,6 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state))
-        return -1;
-
     /* Hmm, process any rewind request that might be queued up */
     pa_sink_process_rewind(u->sink, 0);
 
@@ -163,9 +171,6 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state))
-        return;
-
     if (u->sink->thread_info.rewind_nbytes > 0) {
         amount = PA_MIN(u->sink->thread_info.rewind_nbytes, nbytes);
         u->sink->thread_info.rewind_nbytes = 0;
@@ -181,9 +186,6 @@ static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
-
     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
 }
 
@@ -194,9 +196,6 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
-
     pa_sink_set_max_request_within_thread(u->sink, nbytes);
 }
 
@@ -207,24 +206,28 @@ static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
-
     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
 }
 
 /* Called from I/O thread context */
-static void sink_input_detach_cb(pa_sink_input *i) {
+static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
     struct userdata *u;
 
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
+    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
+}
+
+/* Called from I/O thread context */
+static void sink_input_detach_cb(pa_sink_input *i) {
+    struct userdata *u;
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_se(u = i->userdata);
 
     pa_sink_detach_within_thread(u->sink);
-    pa_sink_set_asyncmsgq(u->sink, NULL);
+
     pa_sink_set_rtpoll(u->sink, NULL);
 }
 
@@ -235,14 +238,13 @@ static void sink_input_attach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
+    pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
+    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
+    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
+    pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
+    pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
 
-    pa_sink_set_asyncmsgq(u->sink, i->sink->asyncmsgq);
-    pa_sink_set_rtpoll(u->sink, i->sink->rtpoll);
     pa_sink_attach_within_thread(u->sink);
-
-    pa_sink_set_latency_range_within_thread(u->sink, u->master->thread_info.min_latency, u->master->thread_info.max_latency);
 }
 
 /* Called from main context */
@@ -252,14 +254,18 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_unlink(u->sink);
+    /* The order here matters! We first kill the sink input, followed
+     * by the sink. That means the sink callbacks must be protected
+     * against an unconnected sink input! */
     pa_sink_input_unlink(u->sink_input);
+    pa_sink_unlink(u->sink);
 
-    pa_sink_unref(u->sink);
-    u->sink = NULL;
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
 
+    pa_sink_unref(u->sink);
+    u->sink = NULL;
+
     pa_module_unload_request(u->module, TRUE);
 }
 
@@ -289,6 +295,16 @@ static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
     return u->sink != dest;
 }
 
+/* Called from main context */
+static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
+    struct userdata *u;
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_se(u = i->userdata);
+
+    pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
+}
+
 int pa__init(pa_module*m) {
     struct userdata *u;
     pa_sample_spec ss;
@@ -339,12 +355,8 @@ int pa__init(pa_module*m) {
     }
 
     u = pa_xnew0(struct userdata, 1);
-    u->core = m->core;
     u->module = m;
     m->userdata = u;
-    u->master = master;
-    u->sink = NULL;
-    u->sink_input = NULL;
 
     /* Create sink */
     pa_sink_new_data_init(&sink_data);
@@ -365,7 +377,7 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
-    u->sink = pa_sink_new(m->core, &sink_data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
+    u->sink = pa_sink_new(m->core, &sink_data, master->flags & (PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY));
     pa_sink_new_data_done(&sink_data);
 
     if (!u->sink) {
@@ -380,19 +392,18 @@ int pa__init(pa_module*m) {
     u->sink->userdata = u;
 
     pa_sink_set_asyncmsgq(u->sink, master->asyncmsgq);
-    pa_sink_set_rtpoll(u->sink, master->rtpoll);
 
     /* Create sink input */
     pa_sink_input_new_data_init(&sink_input_data);
     sink_input_data.driver = __FILE__;
     sink_input_data.module = m;
-    sink_input_data.sink = u->master;
+    sink_input_data.sink = master;
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Remapped Stream");
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &stream_map);
 
-    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data, PA_SINK_INPUT_DONT_MOVE | (remix ? 0 : PA_SINK_INPUT_NO_REMIX));
+    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data, (remix ? 0 : PA_SINK_INPUT_NO_REMIX));
     pa_sink_input_new_data_done(&sink_input_data);
 
     if (!u->sink_input)
@@ -403,11 +414,13 @@ int pa__init(pa_module*m) {
     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
     u->sink_input->update_max_request = sink_input_update_max_request_cb;
     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
+    u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
     u->sink_input->kill = sink_input_kill_cb;
     u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->may_move_to = sink_input_may_move_to_cb;
+    u->sink_input->moving = sink_input_moving_cb;
     u->sink_input->userdata = u;
 
     pa_sink_put(u->sink);
@@ -443,15 +456,20 @@ void pa__done(pa_module*m) {
     if (!(u = m->userdata))
         return;
 
-    if (u->sink) {
-        pa_sink_unlink(u->sink);
-        pa_sink_unref(u->sink);
-    }
+    /* See comments in sink_input_kill_cb() above regarding
+     * destruction order! */
 
-    if (u->sink_input) {
+    if (u->sink_input)
         pa_sink_input_unlink(u->sink_input);
+
+    if (u->sink)
+        pa_sink_unlink(u->sink);
+
+    if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
-    }
+
+    if (u->sink)
+        pa_sink_unref(u->sink);
 
     pa_xfree(u);
 }

commit 763866280adf3bd50463b0e316af7a7c4fa5aaf9
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 01:01:52 2009 +0200

    module-ladspa: allow moving of sink, forward fixed latency

diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index 2433499..e838be3 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c
@@ -64,10 +64,9 @@ PA_MODULE_USAGE(
 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
 
 struct userdata {
-    pa_core *core;
     pa_module *module;
 
-    pa_sink *sink, *master;
+    pa_sink *sink;
     pa_sink_input *sink_input;
 
     const LADSPA_Descriptor *descriptor;
@@ -105,19 +104,26 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
     switch (code) {
 
-        case PA_SINK_MESSAGE_GET_LATENCY: {
-            pa_usec_t usec = 0;
+        case PA_SINK_MESSAGE_GET_LATENCY:
 
-            /* Get the latency of the master sink */
-            if (PA_MSGOBJECT(u->master)->process_msg(PA_MSGOBJECT(u->master), PA_SINK_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
-                usec = 0;
+            /* The sink is _put() before the sink input is, so let's
+             * make sure we don't access it in that time. Also, the
+             * sink input is first shut down, the sink second. */
+            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
+                !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
+                *((pa_usec_t*) data) = 0;
+                return 0;
+            }
+
+            *((pa_usec_t*) data) =
 
-            /* Add the latency internal to our sink input on top */
-            usec += pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->master->sample_spec);
+                /* Get the latency of the master sink */
+                pa_sink_get_latency_within_thread(u->sink_input->sink) +
+
+                /* Add the latency internal to our sink input on top */
+                pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
 
-            *((pa_usec_t*) data) = usec;
             return 0;
-        }
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -130,12 +136,11 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
     pa_sink_assert_ref(s);
     pa_assert_se(u = s->userdata);
 
-    if (PA_SINK_IS_LINKED(state) &&
-        u->sink_input &&
-        PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
-
-        pa_sink_input_cork(u->sink_input, state == PA_SINK_SUSPENDED);
+    if (!PA_SINK_IS_LINKED(state) ||
+        !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
+        return 0;
 
+    pa_sink_input_cork(u->sink_input, state == PA_SINK_SUSPENDED);
     return 0;
 }
 
@@ -146,6 +151,10 @@ static void sink_request_rewind(pa_sink *s) {
     pa_sink_assert_ref(s);
     pa_assert_se(u = s->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
+        !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
+        return;
+
     /* Just hand this one over to the master sink */
     pa_sink_input_request_rewind(u->sink_input, s->thread_info.rewind_nbytes + pa_memblockq_get_length(u->memblockq), TRUE, FALSE, FALSE);
 }
@@ -157,6 +166,10 @@ static void sink_update_requested_latency(pa_sink *s) {
     pa_sink_assert_ref(s);
     pa_assert_se(u = s->userdata);
 
+    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
+        !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
+        return;
+
     /* Just hand this one over to the master sink */
     pa_sink_input_set_requested_latency_within_thread(
             u->sink_input,
@@ -175,9 +188,6 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     pa_assert(chunk);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state))
-        return -1;
-
     /* Hmm, process any rewind request that might be queued up */
     pa_sink_process_rewind(u->sink, 0);
 
@@ -228,9 +238,6 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state))
-        return;
-
     if (u->sink->thread_info.rewind_nbytes > 0) {
         size_t max_rewrite;
 
@@ -266,9 +273,6 @@ static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
-
     pa_memblockq_set_maxrewind(u->memblockq, nbytes);
     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
 }
@@ -280,9 +284,6 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
-
     pa_sink_set_max_request_within_thread(u->sink, nbytes);
 }
 
@@ -293,24 +294,28 @@ static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
-
     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
 }
 
 /* Called from I/O thread context */
-static void sink_input_detach_cb(pa_sink_input *i) {
+static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
     struct userdata *u;
 
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
+    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
+}
+
+/* Called from I/O thread context */
+static void sink_input_detach_cb(pa_sink_input *i) {
+    struct userdata *u;
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_se(u = i->userdata);
 
     pa_sink_detach_within_thread(u->sink);
-    pa_sink_set_asyncmsgq(u->sink, NULL);
+
     pa_sink_set_rtpoll(u->sink, NULL);
 }
 
@@ -321,14 +326,13 @@ static void sink_input_attach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    if (!u->sink || !PA_SINK_IS_LINKED(u->sink->thread_info.state))
-        return;
+    pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
+    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
+    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
+    pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
+    pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
 
-    pa_sink_set_asyncmsgq(u->sink, i->sink->asyncmsgq);
-    pa_sink_set_rtpoll(u->sink, i->sink->rtpoll);
     pa_sink_attach_within_thread(u->sink);
-
-    pa_sink_set_latency_range_within_thread(u->sink, u->master->thread_info.min_latency, u->master->thread_info.max_latency);
 }
 
 /* Called from main context */
@@ -338,14 +342,18 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(u = i->userdata);
 
-    pa_sink_unlink(u->sink);
+    /* The order here matters! We first kill the sink input, followed
+     * by the sink. That means the sink callbacks must be protected
+     * against an unconnected sink input! */
     pa_sink_input_unlink(u->sink_input);
+    pa_sink_unlink(u->sink);
 
-    pa_sink_unref(u->sink);
-    u->sink = NULL;
     pa_sink_input_unref(u->sink_input);
     u->sink_input = NULL;
 
+    pa_sink_unref(u->sink);
+    u->sink = NULL;
+
     pa_module_unload_request(u->module, TRUE);
 }
 
@@ -375,6 +383,16 @@ static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
     return u->sink != dest;
 }
 
+/* Called from main context */
+static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
+    struct userdata *u;
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_se(u = i->userdata);
+
+    pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
+}
+
 int pa__init(pa_module*m) {
     struct userdata *u;
     pa_sample_spec ss;
@@ -428,12 +446,8 @@ int pa__init(pa_module*m) {
     cdata = pa_modargs_get_value(ma, "control", NULL);
 
     u = pa_xnew0(struct userdata, 1);
-    u->core = m->core;
     u->module = m;
     m->userdata = u;
-    u->master = master;
-    u->sink = NULL;
-    u->sink_input = NULL;
     u->memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0, pa_frame_size(&ss), 1, 1, 0, NULL);
 
     if (!(e = getenv("LADSPA_PATH")))
@@ -717,7 +731,7 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
-    u->sink = pa_sink_new(m->core, &sink_data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
+    u->sink = pa_sink_new(m->core, &sink_data, master->flags & (PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY));
     pa_sink_new_data_done(&sink_data);
 
     if (!u->sink) {
@@ -732,19 +746,18 @@ int pa__init(pa_module*m) {
     u->sink->userdata = u;
 
     pa_sink_set_asyncmsgq(u->sink, master->asyncmsgq);
-    pa_sink_set_rtpoll(u->sink, master->rtpoll);
 
     /* Create sink input */
     pa_sink_input_new_data_init(&sink_input_data);
     sink_input_data.driver = __FILE__;
     sink_input_data.module = m;
-    sink_input_data.sink = u->master;
+    sink_input_data.sink = master;
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "LADSPA Stream");
     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
     pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
 
-    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data, PA_SINK_INPUT_DONT_MOVE);
+    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data, 0);
     pa_sink_input_new_data_done(&sink_input_data);
 
     if (!u->sink_input)
@@ -755,11 +768,13 @@ int pa__init(pa_module*m) {
     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
     u->sink_input->update_max_request = sink_input_update_max_request_cb;
     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
+    u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
     u->sink_input->kill = sink_input_kill_cb;
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
     u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->may_move_to = sink_input_may_move_to_cb;
+    u->sink_input->moving = sink_input_moving_cb;
     u->sink_input->userdata = u;
 
     pa_sink_put(u->sink);
@@ -800,15 +815,20 @@ void pa__done(pa_module*m) {
     if (!(u = m->userdata))
         return;
 
-    if (u->sink) {
-        pa_sink_unlink(u->sink);
-        pa_sink_unref(u->sink);
-    }
+    /* See comments in sink_input_kill_cb() above regarding
+     * destruction order! */
 
-    if (u->sink_input) {
+    if (u->sink_input)
         pa_sink_input_unlink(u->sink_input);
+
+    if (u->sink)
+        pa_sink_unlink(u->sink);
+
+    if (u->sink_input)
         pa_sink_input_unref(u->sink_input);
-    }
+
+    if (u->sink)
+        pa_sink_unref(u->sink);
 
     for (c = 0; c < u->channels; c++)
         if (u->handle[c]) {

commit a5b2dee03c08b72f4b7d27d9c7ac304d98e0513c
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 01:02:16 2009 +0200

    ladspa: name sink after human readable plugin name, not the id string

diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c
index e838be3..3c6e349 100644
--- a/src/modules/module-ladspa-sink.c
+++ b/src/modules/module-ladspa-sink.c
@@ -711,11 +711,10 @@ int pa__init(pa_module*m) {
     sink_data.module = m;
     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
         sink_data.name = pa_sprintf_malloc("%s.ladspa", master->name);
-    sink_data.namereg_fail = FALSE;
     pa_sink_new_data_set_sample_spec(&sink_data, &ss);
     pa_sink_new_data_set_channel_map(&sink_data, &map);
     z = pa_proplist_gets(master->proplist, PA_PROP_DEVICE_DESCRIPTION);
-    pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "LADSPA Plugin %s on %s", label, z ? z : master->name);
+    pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "LADSPA Plugin %s on %s", d->Name, z ? z : master->name);
     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, master->name);
     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
     pa_proplist_sets(sink_data.proplist, "device.ladspa.module", plugin);

commit 8947d6551586d239be206f90adca2f6dace667a2
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 01:04:21 2009 +0200

    combine: drop adjust_timestamp variable because it is unused

diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 9271655..325b898 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -125,8 +125,6 @@ struct userdata {
 
     pa_resample_method_t resample_method;
 
-    struct timeval adjust_timestamp;
-
     pa_usec_t block_usec;
 
     pa_idxset* outputs; /* managed in main context */
@@ -833,14 +831,11 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
     pa_assert(sink);
     pa_assert(u->sink);
 
-    o = pa_xnew(struct output, 1);
+    o = pa_xnew0(struct output, 1);
     o->userdata = u;
     o->inq = pa_asyncmsgq_new(0);
     o->outq = pa_asyncmsgq_new(0);
-    o->inq_rtpoll_item_write = o->inq_rtpoll_item_read = NULL;
-    o->outq_rtpoll_item_write = o->outq_rtpoll_item_read = NULL;
     o->sink = sink;
-    o->sink_input = NULL;
     o->memblockq = pa_memblockq_new(
             0,
             MEMBLOCKQ_MAXLENGTH,
@@ -1029,18 +1024,14 @@ int pa__init(pa_module*m) {
         }
     }
 
-    m->userdata = u = pa_xnew(struct userdata, 1);
+    m->userdata = u = pa_xnew0(struct userdata, 1);
     u->core = m->core;
     u->module = m;
-    u->sink = NULL;
-    u->time_event = NULL;
     u->adjust_time = DEFAULT_ADJUST_TIME;
     u->rtpoll = pa_rtpoll_new();
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
-    u->thread = NULL;
     u->resample_method = resample_method;
     u->outputs = pa_idxset_new(NULL, NULL);
-    memset(&u->adjust_timestamp, 0, sizeof(u->adjust_timestamp));
     u->sink_put_slot = u->sink_unlink_slot = u->sink_state_changed_slot = NULL;
     PA_LLIST_HEAD_INIT(struct output, u->thread_info.active_outputs);
     pa_atomic_store(&u->thread_info.running, FALSE);
@@ -1095,7 +1086,6 @@ int pa__init(pa_module*m) {
         pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
     }
 
-
     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
     pa_sink_new_data_done(&data);
 

commit e1f3f5e0bf3d788ff69d24cd40c465eaaf6e9385
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 01:07:37 2009 +0200

    combine: big rework

diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 325b898..04c0d4d 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -92,6 +92,8 @@ struct output {
     pa_sink *sink;
     pa_sink_input *sink_input;
 
+    pa_bool_t ignore_state_change;
+
     pa_asyncmsgq *inq,    /* Message queue from the sink thread to this sink input */
                  *outq;   /* Message queue from this sink input to the sink thread */
     pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
@@ -99,9 +101,12 @@ struct output {
 
     pa_memblockq *memblockq;
 
+    /* For communication of the stream latencies to the main thread */
     pa_usec_t total_latency;
 
+    /* For communication of the stream parameters to the sink thread */
     pa_atomic_t max_request;
+    pa_atomic_t requested_latency;
 
     PA_LLIST_FIELDS(struct output);
 };
@@ -144,13 +149,16 @@ enum {
     SINK_MESSAGE_REMOVE_OUTPUT,
     SINK_MESSAGE_NEED,
     SINK_MESSAGE_UPDATE_LATENCY,
-    SINK_MESSAGE_UPDATE_MAX_REQUEST
+    SINK_MESSAGE_UPDATE_MAX_REQUEST,
+    SINK_MESSAGE_UPDATE_REQUESTED_LATENCY
 };
 
 enum {
     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
 };
 
+static void output_disable(struct output *o);
+static void output_enable(struct output *o);
 static void output_free(struct output *o);
 static int output_create_sink_input(struct output *o);
 
@@ -170,7 +178,7 @@ static void adjust_rates(struct userdata *u) {
     if (!PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)))
         return;
 
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+    PA_IDXSET_FOREACH(o, u->outputs, idx) {
         pa_usec_t sink_latency;
 
         if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
@@ -187,6 +195,8 @@ static void adjust_rates(struct userdata *u) {
 
         avg_total_latency += o->total_latency;
         n++;
+
+        pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC);
     }
 
     if (min_total_latency == (pa_usec_t) -1)
@@ -201,7 +211,7 @@ static void adjust_rates(struct userdata *u) {
 
     base_rate = u->sink->sample_spec.rate;
 
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+    PA_IDXSET_FOREACH(o, u->outputs, idx) {
         uint32_t r = base_rate;
 
         if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
@@ -213,10 +223,10 @@ static void adjust_rates(struct userdata *u) {
             r += (uint32_t) ((((double) (o->total_latency - target_latency))/(double)u->adjust_time)*(double)r/PA_USEC_PER_SEC);
 
         if (r < (uint32_t) (base_rate*0.9) || r > (uint32_t) (base_rate*1.1)) {
-            pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", pa_proplist_gets(o->sink_input->proplist, PA_PROP_MEDIA_NAME), base_rate, r);
+            pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->sink->name, base_rate, r);
             pa_sink_input_set_rate(o->sink_input, base_rate);
         } else {
-            pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", pa_proplist_gets(o->sink_input->proplist, PA_PROP_MEDIA_NAME), r, (double) r / base_rate, (float) o->total_latency);
+            pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.0f usec.", o->sink_input->sink->name, r, (double) r / base_rate, (float) o->total_latency);
             pa_sink_input_set_rate(o->sink_input, r);
         }
     }
@@ -353,18 +363,15 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
         u->thread_info.counter += chunk.length;
 
         /* OK, let's send this data to the other threads */
-        for (j = u->thread_info.active_outputs; j; j = j->next)
-
-            /* Send to other outputs, which are not the requesting
-             * one */
+        PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
+            if (j == o)
+                continue;
 
-            if (j != o)
-                pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+            pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+        }
 
         /* And place it directly into the requesting output's queue */
-        if (o)
-            pa_memblockq_push_align(o->memblockq, &chunk);
-
+        pa_memblockq_push_align(o->memblockq, &chunk);
         pa_memblock_unref(chunk.memblock);
     }
 }
@@ -400,10 +407,18 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk
     /* If necessary, get some new data */
     request_memblock(o, nbytes);
 
+    /* pa_log("%s q size is %u + %u (%u/%u)", */
+    /*        i->sink->name, */
+    /*        pa_memblockq_get_nblocks(o->memblockq), */
+    /*        pa_memblockq_get_nblocks(i->thread_info.render_memblockq), */
+    /*        pa_memblockq_get_maxrewind(o->memblockq), */
+    /*        pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)); */
+
     if (pa_memblockq_peek(o->memblockq, chunk) < 0)
         return -1;
 
     pa_memblockq_drop(o->memblockq, chunk->length);
+
     return 0;
 }
 
@@ -438,13 +453,35 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
         return;
 
     pa_atomic_store(&o->max_request, (int) nbytes);
-
     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
 }
 
+/* Called from thread context */
+static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
+    struct output *o;
+    pa_usec_t c;
+
+    pa_assert(i);
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_se(o = i->userdata);
+
+    c = pa_sink_get_requested_latency_within_thread(i->sink);
+
+    if (c == (pa_usec_t) -1)
+        c = i->sink->thread_info.max_latency;
+
+    if (pa_atomic_load(&o->requested_latency) == (int) c)
+        return;
+
+    pa_atomic_store(&o->requested_latency, (int) c);
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
+}
+
 /* Called from I/O thread context */
 static void sink_input_attach_cb(pa_sink_input *i) {
     struct output *o;
+    pa_usec_t c;
 
     pa_sink_input_assert_ref(i);
     pa_assert_se(o = i->userdata);
@@ -461,6 +498,16 @@ static void sink_input_attach_cb(pa_sink_input *i) {
             i->sink->thread_info.rtpoll,
             PA_RTPOLL_EARLY,
             o->outq);
+
+    pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
+
+    pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i));
+
+    c = pa_sink_get_requested_latency_within_thread(i->sink);
+    pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c));
+
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
+    pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
 }
 
 /* Called from I/O thread context */
@@ -470,14 +517,15 @@ static void sink_input_detach_cb(pa_sink_input *i) {
     pa_sink_input_assert_ref(i);
     pa_assert_se(o = i->userdata);
 
-    /* Shut down the queue from the sink thread to us */
-    pa_assert(o->inq_rtpoll_item_read && o->outq_rtpoll_item_write);
-
-    pa_rtpoll_item_free(o->inq_rtpoll_item_read);
-    o->inq_rtpoll_item_read = NULL;
+    if (o->inq_rtpoll_item_read) {
+        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
+        o->inq_rtpoll_item_read = NULL;
+    }
 
-    pa_rtpoll_item_free(o->outq_rtpoll_item_write);
-    o->outq_rtpoll_item_write = NULL;
+    if (o->outq_rtpoll_item_write) {
+        pa_rtpoll_item_free(o->outq_rtpoll_item_write);
+        o->outq_rtpoll_item_write = NULL;
+    }
 }
 
 /* Called from main context */
@@ -491,20 +539,6 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     output_free(o);
 }
 
-/* Called from IO thread context */
-static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
-    struct userdata *u;
-
-    pa_sink_input_assert_ref(i);
-    pa_assert_se(u = i->userdata);
-
-    /* If we are added for the first time, ask for a rewinding so that
-     * we are heard right-away. */
-    if (PA_SINK_INPUT_IS_LINKED(state) &&
-        i->thread_info.state == PA_SINK_INPUT_INIT)
-        pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
-}
-
 /* Called from thread context */
 static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct output *o = PA_SINK_INPUT(obj)->userdata;
@@ -535,37 +569,6 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
 }
 
 /* Called from main context */
-static void disable_output(struct output *o) {
-    pa_assert(o);
-
-    if (!o->sink_input)
-        return;
-
-    pa_sink_input_unlink(o->sink_input);
-    pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
-    pa_sink_input_unref(o->sink_input);
-    o->sink_input = NULL;
-}
-
-/* Called from main context */
-static void enable_output(struct output *o) {
-    pa_assert(o);
-
-    if (o->sink_input)
-        return;
-
-    if (output_create_sink_input(o) >= 0) {
-
-        pa_memblockq_flush_write(o->memblockq);
-
-        pa_sink_input_put(o->sink_input);
-
-        if (o->userdata->sink && PA_SINK_IS_LINKED(pa_sink_get_state(o->userdata->sink)))
-            pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
-    }
-}
-
-/* Called from main context */
 static void suspend(struct userdata *u) {
     struct output *o;
     uint32_t idx;
@@ -573,8 +576,8 @@ static void suspend(struct userdata *u) {
     pa_assert(u);
 
     /* Let's suspend by unlinking all streams */
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
-        disable_output(o);
+    PA_IDXSET_FOREACH(o, u->outputs, idx)
+        output_disable(o);
 
     pa_log_info("Device suspended...");
 }
@@ -587,13 +590,8 @@ static void unsuspend(struct userdata *u) {
     pa_assert(u);
 
     /* Let's resume */
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
-
-        pa_sink_suspend(o->sink, FALSE, PA_SUSPEND_IDLE);
-
-        if (PA_SINK_IS_OPENED(pa_sink_get_state(o->sink)))
-            enable_output(o);
-    }
+    PA_IDXSET_FOREACH(o, u->outputs, idx)
+        output_enable(o);
 
     pa_log_info("Resumed successfully...");
 }
@@ -637,7 +635,13 @@ static void update_max_request(struct userdata *u) {
     size_t max_request = 0;
     struct output *o;
 
-    for (o = u->thread_info.active_outputs; o; o = o->next) {
+    pa_assert(u);
+    pa_sink_assert_io_context(u->sink);
+
+    /* Collects the max_request values of all streams and sets the
+     * largest one locally */
+
+    PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
         size_t mr = (size_t) pa_atomic_load(&o->max_request);
 
         if (mr > max_request)
@@ -650,6 +654,67 @@ static void update_max_request(struct userdata *u) {
     pa_sink_set_max_request_within_thread(u->sink, max_request);
 }
 
+/* Called from IO context */
+static void update_fixed_latency(struct userdata *u) {
+    pa_usec_t fixed_latency = 0;
+    struct output *o;
+
+    pa_assert(u);
+    pa_sink_assert_io_context(u->sink);
+
+    /* Collects the requested_latency values of all streams and sets
+     * the largest one as fixed_latency locally */
+
+    PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
+        pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency);
+
+        if (rl > fixed_latency)
+            fixed_latency = rl;
+    }
+
+    if (fixed_latency <= 0)
+        fixed_latency = u->block_usec;
+
+    pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency);
+}
+
+/* Called from thread context of the io thread */
+static void output_add_within_thread(struct output *o) {
+    pa_assert(o);
+    pa_sink_assert_io_context(o->sink);
+
+    PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
+
+    pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
+
+    o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+            o->userdata->rtpoll,
+            PA_RTPOLL_EARLY-1,  /* This item is very important */
+            o->outq);
+    o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+            o->userdata->rtpoll,
+            PA_RTPOLL_EARLY,
+            o->inq);
+}
+
+/* Called from thread context of the io thread */
+static void output_remove_within_thread(struct output *o) {
+    pa_assert(o);
+    pa_sink_assert_io_context(o->sink);
+
+    PA_LLIST_REMOVE(struct output, o->userdata->thread_info.active_outputs, o);
+
+    if (o->outq_rtpoll_item_read) {
+        pa_rtpoll_item_free(o->outq_rtpoll_item_read);
+        o->outq_rtpoll_item_read = NULL;
+    }
+
+    if (o->inq_rtpoll_item_write) {
+        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
+        o->inq_rtpoll_item_write = NULL;
+    }
+}
+
 /* Called from thread context of the io thread */
 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
@@ -682,42 +747,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             return 0;
         }
 
-        case SINK_MESSAGE_ADD_OUTPUT: {
-            struct output *op = data;
-
-            PA_LLIST_PREPEND(struct output, u->thread_info.active_outputs, op);
-
-            pa_assert(!op->outq_rtpoll_item_read && !op->inq_rtpoll_item_write);
-
-            op->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
-                    u->rtpoll,
-                    PA_RTPOLL_EARLY-1,  /* This item is very important */
-                    op->outq);
-            op->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
-                    u->rtpoll,
-                    PA_RTPOLL_EARLY,
-                    op->inq);
-
+        case SINK_MESSAGE_ADD_OUTPUT:
+            output_add_within_thread(data);
             update_max_request(u);
+            update_fixed_latency(u);
             return 0;
-        }
-
-        case SINK_MESSAGE_REMOVE_OUTPUT: {
-            struct output *op = data;
-
-            PA_LLIST_REMOVE(struct output, u->thread_info.active_outputs, op);
-
-            pa_assert(op->outq_rtpoll_item_read && op->inq_rtpoll_item_write);
-
-            pa_rtpoll_item_free(op->outq_rtpoll_item_read);
-            op->outq_rtpoll_item_read = NULL;
-
-            pa_rtpoll_item_free(op->inq_rtpoll_item_write);
-            op->inq_rtpoll_item_write = NULL;
 
+        case SINK_MESSAGE_REMOVE_OUTPUT:
+            output_remove_within_thread(data);
             update_max_request(u);
+            update_fixed_latency(u);
             return 0;
-        }
 
         case SINK_MESSAGE_NEED:
             render_memblock(u, (struct output*) data, (size_t) offset);
@@ -739,10 +779,13 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
         }
 
         case SINK_MESSAGE_UPDATE_MAX_REQUEST:
-
             update_max_request(u);
             break;
-    }
+
+        case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY:
+            update_fixed_latency(u);
+            break;
+}
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
@@ -765,7 +808,7 @@ static void update_description(struct userdata *u) {
 
     t = pa_xstrdup("Simultaneous output to");
 
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx)) {
+    PA_IDXSET_FOREACH(o, u->outputs, idx) {
         char *e;
 
         if (first) {
@@ -800,7 +843,7 @@ static int output_create_sink_input(struct output *o) {
     data.module = o->userdata->module;
     data.resample_method = o->userdata->resample_method;
 
-    pa_sink_input_new(&o->sink_input, o->userdata->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE);
+    pa_sink_input_new(&o->sink_input, o->userdata->core, &data, PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE|PA_SINK_INPUT_NO_CREATE_ON_SUSPEND);
 
     pa_sink_input_new_data_done(&data);
 
@@ -810,9 +853,9 @@ static int output_create_sink_input(struct output *o) {
     o->sink_input->parent.process_msg = sink_input_process_msg;
     o->sink_input->pop = sink_input_pop_cb;
     o->sink_input->process_rewind = sink_input_process_rewind_cb;
-    o->sink_input->state_change = sink_input_state_change_cb;
     o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
     o->sink_input->update_max_request = sink_input_update_max_request_cb;
+    o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
     o->sink_input->attach = sink_input_attach_cb;
     o->sink_input->detach = sink_input_detach_cb;
     o->sink_input->kill = sink_input_kill_cb;
@@ -823,9 +866,9 @@ static int output_create_sink_input(struct output *o) {
     return 0;
 }
 
+/* Called from main context */
 static struct output *output_new(struct userdata *u, pa_sink *sink) {
     struct output *o;
-    pa_sink_state_t state;
 
     pa_assert(u);
     pa_assert(sink);
@@ -845,84 +888,135 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
             0,
             0,
             NULL);
-    pa_atomic_store(&o->max_request, 0);
-    PA_LLIST_INIT(struct output, o);
 
     pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
+    update_description(u);
 
-    state = pa_sink_get_state(u->sink);
-
-    if (state != PA_SINK_INIT)
-        pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
-    else {
-        /* If the sink is not yet started, we need to do the activation ourselves */
-        PA_LLIST_PREPEND(struct output, u->thread_info.active_outputs, o);
-
-        o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
-                u->rtpoll,
-                PA_RTPOLL_EARLY-1,  /* This item is very important */
-                o->outq);
-        o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
-                u->rtpoll,
-                PA_RTPOLL_EARLY,
-                o->inq);
-    }
+    return o;
+}
 
-    if (PA_SINK_IS_OPENED(state) || state == PA_SINK_INIT) {
-        pa_sink_suspend(sink, FALSE, PA_SUSPEND_IDLE);
+/* Called from main context */
+static void output_free(struct output *o) {
+    pa_assert(o);
 
-        if (PA_SINK_IS_OPENED(pa_sink_get_state(sink)))
-            if (output_create_sink_input(o) < 0)
-                goto fail;
-    }
+    output_disable(o);
 
-    update_description(u);
+    pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
+    update_description(o->userdata);
 
-    return o;
+    if (o->inq_rtpoll_item_read)
+        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
+    if (o->inq_rtpoll_item_write)
+        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
 
-fail:
+    if (o->outq_rtpoll_item_read)
+        pa_rtpoll_item_free(o->outq_rtpoll_item_read);
+    if (o->outq_rtpoll_item_write)
+        pa_rtpoll_item_free(o->outq_rtpoll_item_write);
 
-    if (o) {
-        pa_idxset_remove_by_data(u->outputs, o, NULL);
+    if (o->inq)
+        pa_asyncmsgq_unref(o->inq);
 
-        if (o->sink_input) {
-            pa_sink_input_unlink(o->sink_input);
-            pa_sink_input_unref(o->sink_input);
-        }
+    if (o->outq)
+        pa_asyncmsgq_unref(o->outq);
+
+    if (o->memblockq)
+        pa_memblockq_free(o->memblockq);
 
-        if (o->memblockq)
-            pa_memblockq_free(o->memblockq);
+    pa_xfree(o);
+}
 
-        if (o->inq)
-            pa_asyncmsgq_unref(o->inq);
+/* Called from main context */
+static void output_enable(struct output *o) {
+    pa_assert(o);
+
+    if (o->sink_input)
+        return;
+
+    /* This might cause the sink to be resumed. The state change hook
+     * of the sink might hence be called from here, which might then
+     * cause us to be called in a loop. Make sure that state changes
+     * for this output don't cause this loop by setting a flag here */
+    o->ignore_state_change = TRUE;
+
+    if (output_create_sink_input(o) >= 0) {
 
-        if (o->outq)
-            pa_asyncmsgq_unref(o->outq);
+        if (pa_sink_get_state(o->sink) != PA_SINK_INIT) {
 
-        pa_xfree(o);
+            /* First we register the output. That means that the sink
+             * will start to pass data to this output. */
+            pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
+
+            /* Then we enable the sink input. That means that the sink
+             * is now asked for new data. */
+            pa_sink_input_put(o->sink_input);
+
+        } else
+            /* Hmm the sink is not yet started, do things right here */
+            output_add_within_thread(o);
     }
 
-    return NULL;
+    o->ignore_state_change = FALSE;
 }
 
+/* Called from main context */
+static void output_disable(struct output *o) {
+    pa_assert(o);
+
+    if (!o->sink_input)
+        return;
+
+    /* First we disable the sink input. That means that the sink is
+     * not asked for new data anymore  */
+    pa_sink_input_unlink(o->sink_input);
+
+    /* Then we unregister the output. That means that the sink doesn't
+     * pass any further data to this output */
+    pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
+
+    /* Now deallocate the stream */
+    pa_sink_input_unref(o->sink_input);
+    o->sink_input = NULL;
+
+    /* Finally, drop all queued data */
+    pa_memblockq_flush_write(o->memblockq);
+    pa_asyncmsgq_flush(o->inq, FALSE);
+    pa_asyncmsgq_flush(o->outq, FALSE);
+}
+
+/* Called from main context */
+static void output_verify(struct output *o) {
+    pa_assert(o);
+
+    if (PA_SINK_IS_OPENED(pa_sink_get_state(o->userdata->sink)))
+        output_enable(o);
+    else
+        output_disable(o);
+}
+
+/* Called from main context */
 static pa_bool_t is_suitable_sink(struct userdata *u, pa_sink *s) {
     const char *t;
 
     pa_sink_assert_ref(s);
 
+    if (s == u->sink)
+        return FALSE;
+
     if (!(s->flags & PA_SINK_HARDWARE))
         return FALSE;
 
-    if (s == u->sink)
+    if (!(s->flags & PA_SINK_LATENCY))
         return FALSE;
 
     if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
-        if (strcmp(t, "sound"))
+        if (!pa_streq(t, "sound"))
             return FALSE;
 
     return TRUE;
 }
 
+/* Called from main context */
 static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
     struct output *o;
 
@@ -935,18 +1029,17 @@ static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata
         return PA_HOOK_OK;
 
     pa_log_info("Configuring new sink: %s", s->name);
-
     if (!(o = output_new(u, s))) {
         pa_log("Failed to create sink input on sink '%s'.", s->name);
         return PA_HOOK_OK;
     }
 
-    if (o->sink_input)
-        pa_sink_input_put(o->sink_input);
+    output_verify(o);
 
     return PA_HOOK_OK;
 }
 
+/* Called from main context */
 static struct output* find_output(struct userdata *u, pa_sink *s) {
     struct output *o;
     uint32_t idx;
@@ -957,13 +1050,14 @@ static struct output* find_output(struct userdata *u, pa_sink *s) {
     if (u->sink == s)
         return NULL;
 
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
+    PA_IDXSET_FOREACH(o, u->outputs, idx)
         if (o->sink == s)
             return o;
 
     return NULL;
 }
 
+/* Called from main context */
 static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
     struct output *o;
 
@@ -975,26 +1069,25 @@ static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userd
         return PA_HOOK_OK;
 
     pa_log_info("Unconfiguring sink: %s", s->name);
-
     output_free(o);
 
     return PA_HOOK_OK;
 }
 
+/* Called from main context */
 static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
     struct output *o;
-    pa_sink_state_t state;
 
     if (!(o = find_output(u, s)))
         return PA_HOOK_OK;
 
-    state = pa_sink_get_state(s);
-
-    if (PA_SINK_IS_OPENED(state) && PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)) && !o->sink_input)
-        enable_output(o);
+    /* This state change might be triggered because we are creating a
+     * stream here, in that case we don't want to create it a second
+     * time here and enter a loop */
+    if (o->ignore_state_change)
+        return PA_HOOK_OK;
 
-    if (state == PA_SINK_SUSPENDED && o->sink_input)
-        disable_output(o);
+    output_verify(o);
 
     return PA_HOOK_OK;
 }
@@ -1139,7 +1232,7 @@ int pa__init(pa_module*m) {
 
         /* We're in automatic mode, we add every sink that matches our needs  */
 
-        for (s = pa_idxset_first(m->core->sinks, &idx); s; s = pa_idxset_next(m->core->sinks, &idx)) {
+        PA_IDXSET_FOREACH(s, m->core->sinks, idx) {
 
             if (!is_suitable_sink(u, s))
                 continue;
@@ -1164,9 +1257,8 @@ int pa__init(pa_module*m) {
     /* Activate the sink and the sink inputs */
     pa_sink_put(u->sink);
 
-    for (o = pa_idxset_first(u->outputs, &idx); o; o = pa_idxset_next(u->outputs, &idx))
-        if (o->sink_input)
-            pa_sink_input_put(o->sink_input);
+    PA_IDXSET_FOREACH(o, u->outputs, idx)
+        output_verify(o);
 
     if (u->adjust_time > 0)
         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time * PA_USEC_PER_SEC, time_callback, u);
@@ -1185,37 +1277,6 @@ fail:
     return -1;
 }
 
-static void output_free(struct output *o) {
-    pa_assert(o);
-
-    disable_output(o);
-
-    pa_assert_se(pa_idxset_remove_by_data(o->userdata->outputs, o, NULL));
-
-    update_description(o->userdata);
-
-    if (o->inq_rtpoll_item_read)
-        pa_rtpoll_item_free(o->inq_rtpoll_item_read);
-    if (o->inq_rtpoll_item_write)
-        pa_rtpoll_item_free(o->inq_rtpoll_item_write);
-
-    if (o->outq_rtpoll_item_read)
-        pa_rtpoll_item_free(o->outq_rtpoll_item_read);
-    if (o->outq_rtpoll_item_write)
-        pa_rtpoll_item_free(o->outq_rtpoll_item_write);
-
-    if (o->inq)
-        pa_asyncmsgq_unref(o->inq);
-
-    if (o->outq)
-        pa_asyncmsgq_unref(o->outq);
-
-    if (o->memblockq)
-        pa_memblockq_free(o->memblockq);
-
-    pa_xfree(o);
-}
-
 void pa__done(pa_module*m) {
     struct userdata *u;
     struct output *o;

commit 1eeddd84d2ff2482dd4a6d2dd43dc8a315ba72a4
Author: Lennart Poettering <lennart at poettering.net>
Date:   Sat Aug 15 01:16:57 2009 +0200

    combine: warn when the latency of a stream gets too high

diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 04c0d4d..155b928 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -197,6 +197,9 @@ static void adjust_rates(struct userdata *u) {
         n++;
 
         pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC);
+
+        if (o->total_latency > 10*PA_USEC_PER_SEC)
+            pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
     }
 
     if (min_total_latency == (pa_usec_t) -1)

-- 
hooks/post-receive
PulseAudio Sound Server



More information about the pulseaudio-commits mailing list