[pulseaudio-commits] r1562 - in /branches/lennart/src: ./ modules/ pulsecore/ tests/

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Tue Jul 31 15:44:55 PDT 2007


Author: lennart
Date: Wed Aug  1 00:44:53 2007
New Revision: 1562

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1562&root=pulseaudio&view=rev
Log:
A lot of updates, all necessary to get the native protocol ported:

* add an int64_t argument to pa_asyncmsgq because it is very difficult to pass 64-bit values otherwise
* simplify subclassing in pa_object
* s/drop/unlink/ at some places
* port the native protocol to the lock-free core (not tested, compiles fine)
* move synchronisation of playback streams into pa_sink_input
* add "start_corked" field to pa_sink_input_new_data
* allow casting of NULL values in pa_object

Modified:
    branches/lennart/src/Makefile.am
    branches/lennart/src/modules/module-alsa-sink.c
    branches/lennart/src/modules/module-alsa-source.c
    branches/lennart/src/modules/module-null-sink.c
    branches/lennart/src/modules/module-oss.c
    branches/lennart/src/modules/module-pipe-sink.c
    branches/lennart/src/modules/module-pipe-source.c
    branches/lennart/src/pulsecore/asyncmsgq.c
    branches/lennart/src/pulsecore/asyncmsgq.h
    branches/lennart/src/pulsecore/core.c
    branches/lennart/src/pulsecore/msgobject.c
    branches/lennart/src/pulsecore/msgobject.h
    branches/lennart/src/pulsecore/native-common.h
    branches/lennart/src/pulsecore/object.c
    branches/lennart/src/pulsecore/object.h
    branches/lennart/src/pulsecore/protocol-native.c
    branches/lennart/src/pulsecore/protocol-simple.c
    branches/lennart/src/pulsecore/sink-input.c
    branches/lennart/src/pulsecore/sink-input.h
    branches/lennart/src/pulsecore/sink.c
    branches/lennart/src/pulsecore/sink.h
    branches/lennart/src/pulsecore/sound-file-stream.c
    branches/lennart/src/pulsecore/source-output.c
    branches/lennart/src/pulsecore/source-output.h
    branches/lennart/src/pulsecore/source.c
    branches/lennart/src/pulsecore/source.h
    branches/lennart/src/tests/asyncmsgq-test.c

Modified: branches/lennart/src/Makefile.am
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/Makefile.a=
m?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/Makefile.am (original)
+++ branches/lennart/src/Makefile.am Wed Aug  1 00:44:53 2007
@@ -722,10 +722,10 @@
 		libauthkey-prop.la \
 		libstrlist.la \
 		libprotocol-simple.la \
-		libprotocol-http.la =

+		libprotocol-http.la \
+		libprotocol-native.la
 =

 #		libprotocol-esound.la
-#		libprotocol-native.la
 =

 # We need to emulate sendmsg/recvmsg to support this on Win32
 if !OS_IS_WIN32
@@ -879,11 +879,10 @@
 		module-volume-restore.la \
 		module-rescue-streams.la \
 		module-http-protocol-tcp.la \
-		module-sine.la
-
+		module-sine.la \
+		module-native-protocol-tcp.la \
+		module-native-protocol-fd.la
 #		module-esound-protocol-tcp.la \
-#		module-native-protocol-tcp.la \
-#		module-native-protocol-fd.la \
 #		module-combine.la \
 #		module-tunnel-sink.la \
 #		module-tunnel-source.la \
@@ -899,10 +898,10 @@
 if HAVE_AF_UNIX
 modlibexec_LTLIBRARIES +=3D \
 		module-cli-protocol-unix.la \
-		module-simple-protocol-unix.la
-		module-http-protocol-unix.la
-#		module-esound-protocol-unix.la \
-#		module-native-protocol-unix.la
+		module-simple-protocol-unix.la \
+		module-http-protocol-unix.la \
+		module-native-protocol-unix.la
+#		module-esound-protocol-unix.la =

 endif
 =

 if HAVE_MKFIFO
@@ -1083,20 +1082,20 @@
 =

 # Native protocol
 =

-#module_native_protocol_tcp_la_SOURCES =3D modules/module-protocol-stub.c
-#module_native_protocol_tcp_la_CFLAGS =3D -DUSE_TCP_SOCKETS -DUSE_PROTOCOL=
_NATIVE $(AM_CFLAGS)
-#module_native_protocol_tcp_la_LDFLAGS =3D -module -avoid-version
-#module_native_protocol_tcp_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la lib=
protocol-native.la libsocket-server.la
-
-#module_native_protocol_unix_la_SOURCES =3D modules/module-protocol-stub.c
-#module_native_protocol_unix_la_CFLAGS =3D -DUSE_UNIX_SOCKETS -DUSE_PROTOC=
OL_NATIVE $(AM_CFLAGS)
-#module_native_protocol_unix_la_LDFLAGS =3D -module -avoid-version
-#module_native_protocol_unix_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la li=
bprotocol-native.la libsocket-server.la libsocket-util.la
-
-#module_native_protocol_fd_la_SOURCES =3D modules/module-native-protocol-f=
d.c
-#module_native_protocol_fd_la_CFLAGS =3D $(AM_CFLAGS)
-#module_native_protocol_fd_la_LDFLAGS =3D -module -avoid-version
-#module_native_protocol_fd_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la libp=
rotocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
+module_native_protocol_tcp_la_SOURCES =3D modules/module-protocol-stub.c
+module_native_protocol_tcp_la_CFLAGS =3D -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_=
NATIVE $(AM_CFLAGS)
+module_native_protocol_tcp_la_LDFLAGS =3D -module -avoid-version
+module_native_protocol_tcp_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la libp=
rotocol-native.la libsocket-server.la
+
+module_native_protocol_unix_la_SOURCES =3D modules/module-protocol-stub.c
+module_native_protocol_unix_la_CFLAGS =3D -DUSE_UNIX_SOCKETS -DUSE_PROTOCO=
L_NATIVE $(AM_CFLAGS)
+module_native_protocol_unix_la_LDFLAGS =3D -module -avoid-version
+module_native_protocol_unix_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la lib=
protocol-native.la libsocket-server.la libsocket-util.la
+
+module_native_protocol_fd_la_SOURCES =3D modules/module-native-protocol-fd=
.c
+module_native_protocol_fd_la_CFLAGS =3D $(AM_CFLAGS)
+module_native_protocol_fd_la_LDFLAGS =3D -module -avoid-version
+module_native_protocol_fd_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la libpr=
otocol-native.la libsocket-server.la libsocket-util.la libiochannel.la
 =

 # EsounD protocol
 =


Modified: branches/lennart/src/modules/module-alsa-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/mo=
dule-alsa-sink.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddi=
ff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/modules/module-alsa-sink.c (original)
+++ branches/lennart/src/modules/module-alsa-sink.c Wed Aug  1 00:44:53 2007
@@ -302,7 +302,7 @@
     return -1;
 }
 =

-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memc=
hunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t=
 offset, pa_memchunk *chunk) {
     struct userdata *u =3D PA_SINK(o)->userdata;
 =

     switch (code) {
@@ -347,7 +347,7 @@
             break;
     }
 =

-    return pa_sink_process_msg(o, code, data, chunk);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 =

 static int mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
@@ -510,12 +510,13 @@
         int code;
         void *data;
         pa_memchunk chunk;
+        int64_t offset;
         int r;
 =

 /*         pa_log("loop");     */
         =

         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, =
0) =3D=3D 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset,=
 &chunk, 0) =3D=3D 0) {
             int ret;
 =

 /*             pa_log("processing msg"); */
@@ -525,7 +526,7 @@
                 goto finish;
             }
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         } =

@@ -660,7 +661,7 @@
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 =

 finish:
@@ -893,7 +894,7 @@
         pa_sink_disconnect(u->sink);
 =

     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, N=
ULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0=
, NULL);
         pa_thread_free(u->thread);
     }
 =


Modified: branches/lennart/src/modules/module-alsa-source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/mo=
dule-alsa-source.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3D=
diff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/modules/module-alsa-source.c (original)
+++ branches/lennart/src/modules/module-alsa-source.c Wed Aug  1 00:44:53 2=
007
@@ -290,7 +290,7 @@
     return -1;
 }
 =

-static int source_process_msg(pa_msgobject *o, int code, void *data, pa_me=
mchunk *chunk) {
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64=
_t offset, pa_memchunk *chunk) {
     struct userdata *u =3D PA_SOURCE(o)->userdata;
 =

     switch (code) {
@@ -335,7 +335,7 @@
             break;
     }
 =

-    return pa_source_process_msg(o, code, data, chunk);
+    return pa_source_process_msg(o, code, data, offset, chunk);
 }
 =

 static int mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
@@ -498,12 +498,13 @@
         int code;
         void *data;
         int r;
+        int64_t offset;
         pa_memchunk chunk;
 =

 /*         pa_log("loop");     */
         =

         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, =
0) =3D=3D 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset,=
 &chunk, 0) =3D=3D 0) {
             int ret;
 =

 /*             pa_log("processing msg"); */
@@ -513,7 +514,7 @@
                 goto finish;
             }
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         } =

@@ -634,7 +635,7 @@
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 =

 finish:
@@ -864,7 +865,7 @@
         pa_source_disconnect(u->source);
 =

     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, N=
ULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0=
, NULL);
         pa_thread_free(u->thread);
     }
 =


Modified: branches/lennart/src/modules/module-null-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/mo=
dule-null-sink.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddi=
ff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/modules/module-null-sink.c (original)
+++ branches/lennart/src/modules/module-null-sink.c Wed Aug  1 00:44:53 2007
@@ -83,7 +83,7 @@
     NULL
 };
 =

-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memc=
hunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t=
 offset, pa_memchunk *chunk) {
     struct userdata *u =3D PA_SINK(o)->userdata;
 =

     switch (code) {
@@ -107,7 +107,7 @@
         }
     }
     =

-    return pa_sink_process_msg(o, code, data, chunk);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 =

 static void thread_func(void *userdata) {
@@ -131,9 +131,10 @@
         pa_memchunk chunk;
         int r, timeout;
         struct timeval now;
+        int64_t offset;
 =

         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, =
0) =3D=3D 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset,=
 &chunk, 0) =3D=3D 0) {
             int ret;
 =

             if (!object && code =3D=3D PA_MESSAGE_SHUTDOWN) {
@@ -141,7 +142,7 @@
                 goto finish;
             }
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
@@ -190,7 +191,7 @@
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 =

 finish:
@@ -271,7 +272,7 @@
         pa_sink_disconnect(u->sink);
 =

     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, N=
ULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0=
, NULL);
         pa_thread_free(u->thread);
     }
 =


Modified: branches/lennart/src/modules/module-oss.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/mo=
dule-oss.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/modules/module-oss.c (original)
+++ branches/lennart/src/modules/module-oss.c Wed Aug  1 00:44:53 2007
@@ -581,7 +581,7 @@
     return -1;
 }
 =

-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memc=
hunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t=
 offset, pa_memchunk *chunk) {
     struct userdata *u =3D PA_SINK(o)->userdata;
     int do_trigger =3D 0, ret, quick =3D 1;
 =

@@ -673,7 +673,7 @@
             break;
     }
 =

-    ret =3D pa_sink_process_msg(o, code, data, chunk);
+    ret =3D pa_sink_process_msg(o, code, data, offset, chunk);
 =

     if (do_trigger)
         trigger(u, quick);
@@ -681,7 +681,7 @@
     return ret;
 }
 =

-static int source_process_msg(pa_msgobject *o, int code, void *data, pa_me=
mchunk *chunk) {
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64=
_t offset, pa_memchunk *chunk) {
     struct userdata *u =3D PA_SOURCE(o)->userdata;
     int do_trigger =3D 0, ret, quick =3D 1;
 =

@@ -770,7 +770,7 @@
             break;
     }
 =

-    ret =3D pa_source_process_msg(o, code, data, chunk);
+    ret =3D pa_source_process_msg(o, code, data, offset, chunk);
 =

     if (do_trigger)
         trigger(u, quick);
@@ -807,11 +807,12 @@
         void *data;
         pa_memchunk chunk;
         int r;
+        int64_t offset;
 =

 /*        pa_log("loop");    */
         =

         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, =
0) =3D=3D 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset,=
 &chunk, 0) =3D=3D 0) {
             int ret;
 =

 /*             pa_log("processing msg"); */
@@ -821,7 +822,7 @@
                 goto finish;
             }
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         } =

@@ -1051,7 +1052,7 @@
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 =

 finish:
@@ -1300,9 +1301,9 @@
 =

     /* Read mixer settings */
     if (u->source)
-        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE=
_MESSAGE_GET_VOLUME, &u->source->volume, NULL, NULL);
+        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE=
_MESSAGE_GET_VOLUME, &u->source->volume, 0, NULL, NULL);
     if (u->sink)
-        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MES=
SAGE_GET_VOLUME, &u->sink->volume, NULL, NULL);
+        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MES=
SAGE_GET_VOLUME, &u->sink->volume, 0, NULL, NULL);
 =

     return 0;
 =

@@ -1335,7 +1336,7 @@
         pa_source_disconnect(u->source);
 =

     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, N=
ULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0=
, NULL);
         pa_thread_free(u->thread);
     }
 =


Modified: branches/lennart/src/modules/module-pipe-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/mo=
dule-pipe-sink.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddi=
ff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/modules/module-pipe-sink.c (original)
+++ branches/lennart/src/modules/module-pipe-sink.c Wed Aug  1 00:44:53 2007
@@ -84,7 +84,7 @@
     NULL
 };
 =

-static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_memc=
hunk *chunk) {
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t=
 offset, pa_memchunk *chunk) {
     struct userdata *u =3D PA_SINK(o)->userdata;
 =

     switch (code) {
@@ -103,7 +103,7 @@
         }
     }
     =

-    return pa_sink_process_msg(o, code, data, chunk);
+    return pa_sink_process_msg(o, code, data, offset, chunk);
 }
 =

 static void thread_func(void *userdata) {
@@ -133,9 +133,10 @@
         void *data;
         pa_memchunk chunk;
         int r;
+        int64_t offset;
 =

         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, =
0) =3D=3D 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset,=
 &chunk, 0) =3D=3D 0) {
             int ret;
 =

             if (!object && code =3D=3D PA_MESSAGE_SHUTDOWN) {
@@ -143,7 +144,7 @@
                 goto finish;
             }
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
@@ -224,7 +225,7 @@
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 =

 finish:
@@ -326,7 +327,7 @@
         pa_sink_disconnect(u->sink);
 =

     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, N=
ULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0=
, NULL);
         pa_thread_free(u->thread);
     }
 =


Modified: branches/lennart/src/modules/module-pipe-source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/mo=
dule-pipe-source.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3D=
diff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/modules/module-pipe-source.c (original)
+++ branches/lennart/src/modules/module-pipe-source.c Wed Aug  1 00:44:53 2=
007
@@ -111,9 +111,10 @@
         void *data;
         pa_memchunk chunk;
         int r;
+        int64_t offset;
 =

         /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &chunk, =
0) =3D=3D 0) {
+        if (pa_asyncmsgq_get(u->asyncmsgq, &object, &code, &data, &offset,=
 &chunk, 0) =3D=3D 0) {
             int ret;
 =

             if (!object && code =3D=3D PA_MESSAGE_SHUTDOWN) {
@@ -121,7 +122,7 @@
                 goto finish;
             }
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(u->asyncmsgq, ret);
             continue;
         }
@@ -202,7 +203,7 @@
 fail:
     /* We have to continue processing messages until we receive the
      * SHUTDOWN message */
-    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, NULL, NULL);
+    pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u->core), PA_CORE_M=
ESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->asyncmsgq, PA_MESSAGE_SHUTDOWN);
 =

 finish:
@@ -303,7 +304,7 @@
         pa_source_disconnect(u->source);
 =

     if (u->thread) {
-        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, N=
ULL);
+        pa_asyncmsgq_send(u->asyncmsgq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0=
, NULL);
         pa_thread_free(u->thread);
     }
 =


Modified: branches/lennart/src/pulsecore/asyncmsgq.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
asyncmsgq.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/asyncmsgq.c (original)
+++ branches/lennart/src/pulsecore/asyncmsgq.c Wed Aug  1 00:44:53 2007
@@ -46,6 +46,7 @@
     pa_msgobject *object;
     void *userdata;
     pa_free_cb_t free_cb;
+    int64_t offset;
     pa_memchunk memchunk;
     pa_semaphore *semaphore;
     int ret;
@@ -96,7 +97,7 @@
     pa_xfree(a);
 }
 =

-void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, co=
nst void *userdata, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
+void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, co=
nst void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t =
free_cb) {
     struct asyncmsgq_item *i;
     pa_assert(a);
 =

@@ -107,6 +108,7 @@
     i->object =3D object ? pa_msgobject_ref(object) : NULL;
     i->userdata =3D (void*) userdata;
     i->free_cb =3D free_cb;
+    i->offset =3D offset;
     if (chunk) {
         pa_assert(chunk->memblock);
         i->memchunk =3D *chunk;
@@ -121,7 +123,7 @@
     pa_mutex_unlock(a->mutex);
 }
 =

-int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, con=
st void *userdata, const pa_memchunk *chunk) {
+int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, con=
st void *userdata, int64_t offset, const pa_memchunk *chunk) {
     struct asyncmsgq_item i;
     pa_assert(a);
 =

@@ -130,6 +132,7 @@
     i.userdata =3D (void*) userdata;
     i.free_cb =3D NULL;
     i.ret =3D -1;
+    i.offset =3D offset;
     if (chunk) {
         pa_assert(chunk->memblock);
         i.memchunk =3D *chunk;
@@ -148,7 +151,7 @@
     return i.ret;
 }
 =

-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, vo=
id **userdata, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, vo=
id **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
     pa_assert(a);
     pa_assert(code);
     pa_assert(!a->current);
@@ -163,6 +166,8 @@
     *code =3D a->current->code;
     if (userdata)
         *userdata =3D a->current->userdata;
+    if (offset)
+        *offset =3D a->current->offset;
     if (object) {
         if ((*object =3D a->current->object))
             pa_msgobject_assert_ref(*object);
@@ -207,13 +212,14 @@
     do {
         pa_msgobject *o;
         void *data;
+    int64_t offset;
         pa_memchunk chunk;
         int ret;
 =

-        if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0)
+        if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
             return -1;
 =

-        ret =3D pa_asyncmsgq_dispatch(o, c, data, &chunk);
+        ret =3D pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
         pa_asyncmsgq_done(a, ret);
 =

     } while (c !=3D code);
@@ -239,10 +245,10 @@
     pa_asyncq_after_poll(a->asyncq);
 }
 =

-int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, =
pa_memchunk *memchunk) {
+int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, =
int64_t offset, pa_memchunk *memchunk) {
 =

     if (object)
-        return object->process_msg(object, code, userdata, memchunk);
+        return object->process_msg(object, code, userdata, offset, memchun=
k);
 =

     return 0;
 }

Modified: branches/lennart/src/pulsecore/asyncmsgq.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
asyncmsgq.h?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/asyncmsgq.h (original)
+++ branches/lennart/src/pulsecore/asyncmsgq.h Wed Aug  1 00:44:53 2007
@@ -57,11 +57,11 @@
 pa_asyncmsgq* pa_asyncmsgq_new(size_t size);
 void pa_asyncmsgq_free(pa_asyncmsgq* q);
 =

-void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, co=
nst void *userdata, const pa_memchunk *memchunk, pa_free_cb_t userdata_free=
_cb);
-int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, con=
st void *userdata, const pa_memchunk *memchunk);
+void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, co=
nst void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb=
_t userdata_free_cb);
+int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, con=
st void *userdata, int64_t offset, const pa_memchunk *memchunk);
 =

-int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, vo=
id **userdata, pa_memchunk *memchunk, int wait);
-int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, =
pa_memchunk *memchunk);
+int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, vo=
id **userdata, int64_t *offset, pa_memchunk *memchunk, int wait);
+int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, =
int64_t offset, pa_memchunk *memchunk);
 void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
 =


Modified: branches/lennart/src/pulsecore/core.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
core.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/core.c (original)
+++ branches/lennart/src/pulsecore/core.c Wed Aug  1 00:44:53 2007
@@ -49,9 +49,9 @@
 =

 #include "core.h"
 =

-static PA_DEFINE_CHECK_TYPE(pa_core, core_check_type, pa_msgobject_check_t=
ype);
-
-static int core_process_msg(pa_msgobject *o, int code, void *userdata, pa_=
memchunk *chunk) {
+static PA_DEFINE_CHECK_TYPE(pa_core, pa_msgobject);
+
+static int core_process_msg(pa_msgobject *o, int code, void *userdata, int=
64_t offset, pa_memchunk *chunk) {
     pa_core *c =3D PA_CORE(o);
 =

     pa_core_assert_ref(c);
@@ -79,13 +79,14 @@
         pa_msgobject *object;
         int code;
         void *data;
+        int64_t offset;
         pa_memchunk chunk;
 =

         /* Check whether there is a message for us to process */
-        while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &chun=
k, 0) =3D=3D 0) {
+        while (pa_asyncmsgq_get(c->asyncmsgq, &object, &code, &data, &offs=
et, &chunk, 0) =3D=3D 0) {
             int ret;
 =

-            ret =3D pa_asyncmsgq_dispatch(object, code, data, &chunk);
+            ret =3D pa_asyncmsgq_dispatch(object, code, data, offset, &chu=
nk);
             pa_asyncmsgq_done(c->asyncmsgq, ret);
         }
 =

@@ -116,7 +117,7 @@
         }
     }
 =

-    c =3D pa_msgobject_new(pa_core, core_check_type);
+    c =3D pa_msgobject_new(pa_core);
     c->parent.parent.free =3D core_free;
     c->parent.process_msg =3D core_process_msg;
 =


Modified: branches/lennart/src/pulsecore/msgobject.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
msgobject.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/msgobject.c (original)
+++ branches/lennart/src/pulsecore/msgobject.c Wed Aug  1 00:44:53 2007
@@ -28,15 +28,22 @@
 =

 #include "msgobject.h"
 =

-PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_msgobject_check_type, pa_object_chec=
k_type);
+PA_DEFINE_CHECK_TYPE(pa_msgobject, pa_object);
 =

-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name=
, int (*check_type)(pa_object *o, const char *type_name)) {
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name=
, int (*check_type)(const char *type_name)) {
     pa_msgobject *o;
 =

     pa_assert(size > sizeof(pa_msgobject));
     pa_assert(type_name);
 =

-    o =3D PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type =
? check_type : pa_msgobject_check_type));
+    if (!check_type)
+        check_type =3D pa_msgobject_check_type;
+
+    pa_assert(check_type(type_name));
+    pa_assert(check_type("pa_object"));
+    pa_assert(check_type("pa_msgobject"));
+
+    o =3D PA_MSGOBJECT(pa_object_new_internal(size, type_name, check_type)=
);
     o->process_msg =3D NULL;
     return o;
 }

Modified: branches/lennart/src/pulsecore/msgobject.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
msgobject.h?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/msgobject.h (original)
+++ branches/lennart/src/pulsecore/msgobject.h Wed Aug  1 00:44:53 2007
@@ -37,14 +37,14 @@
 =

 struct pa_msgobject {
     pa_object parent;
-    int (*process_msg)(pa_msgobject *o, int code, void *userdata, pa_memch=
unk *chunk);
+    int (*process_msg)(pa_msgobject *o, int code, void *userdata, int64_t =
offset, pa_memchunk *chunk);
 };
 =

-pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name=
, int (*check_type)(pa_object *o, const char *type_name));
+pa_msgobject *pa_msgobject_new_internal(size_t size, const char *type_name=
, int (*check_type)(const char *type_name));
 =

-int pa_msgobject_check_type(pa_object *o, const char *type);
+int pa_msgobject_check_type(const char *type);
 =

-#define pa_msgobject_new(type, check_type) ((type*) pa_msgobject_new_inter=
nal(sizeof(type), #type, check_type))
+#define pa_msgobject_new(type) ((type*) pa_msgobject_new_internal(sizeof(t=
ype), #type, type##_check_type))
 #define pa_msgobject_free ((void (*) (pa_msgobject* o)) pa_object_free)
 =

 #define PA_MSGOBJECT(o) pa_msgobject_cast(o)

Modified: branches/lennart/src/pulsecore/native-common.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/native-common.h?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/native-common.h (original)
+++ branches/lennart/src/pulsecore/native-common.h Wed Aug  1 00:44:53 2007
@@ -115,6 +115,8 @@
     PA_COMMAND_MOVE_SINK_INPUT,
     PA_COMMAND_MOVE_SOURCE_OUTPUT,
 =

+    PA_COMMAND_SET_SINK_INPUT_MUTE,
+
     PA_COMMAND_MAX
 };
 =


Modified: branches/lennart/src/pulsecore/object.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/object.c?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/object.c (original)
+++ branches/lennart/src/pulsecore/object.c Wed Aug  1 00:44:53 2007
@@ -28,17 +28,23 @@
 =

 #include "object.h"
 =

-pa_object *pa_object_new_internal(size_t size, const char *type_name, int =
(*check_type)(pa_object *o, const char *type_name)) {
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int =
(*check_type)(const char *type_name)) {
     pa_object *o;
 =

     pa_assert(size > sizeof(pa_object));
     pa_assert(type_name);
 =

+    if (!check_type)
+        check_type =3D pa_object_check_type;
+
+    pa_assert(check_type(type_name));
+    pa_assert(check_type("pa_object"));
+    =

     o =3D pa_xmalloc(size);
     PA_REFCNT_INIT(o);
     o->type_name =3D type_name;
     o->free =3D pa_object_free;
-    o->check_type =3D check_type ? check_type : pa_object_check_type;
+    o->check_type =3D check_type;
 =

     return o;
 }
@@ -59,8 +65,7 @@
     }
 }
 =

-int pa_object_check_type(pa_object *o, const char *type_name) {
-    pa_assert(o);
+int pa_object_check_type(const char *type_name) {
     pa_assert(type_name);
     =

     return type_name =3D=3D "pa_object" || strcmp(type_name, "pa_object") =
=3D=3D 0;

Modified: branches/lennart/src/pulsecore/object.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/object.h?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/object.h (original)
+++ branches/lennart/src/pulsecore/object.h Wed Aug  1 00:44:53 2007
@@ -38,20 +38,19 @@
     PA_REFCNT_DECLARE;
     const char *type_name;
     void (*free)(pa_object *o);
-    int (*check_type)(pa_object *o, const char *type_name);
+    int (*check_type)(const char *type_name);
 };
 =

-pa_object *pa_object_new_internal(size_t size, const char *type_name, int =
(*check_type)(pa_object *o, const char *type_name));
-#define pa_object_new(type, check_type) ((type*) pa_object_new_internal(si=
zeof(type), #type, check_type)
+pa_object *pa_object_new_internal(size_t size, const char *type_name, int =
(*check_type)(const char *type_name));
+#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), =
#type, type##_check_type)
 =

 #define pa_object_free ((void (*) (pa_object* o)) pa_xfree)
 =

-int pa_object_check_type(pa_object *o, const char *type);
+int pa_object_check_type(const char *type);
 =

 static inline int pa_object_isinstance(void *o) {
     pa_object *obj =3D (pa_object*) o;
-    pa_assert(obj);
-    return obj->check_type(obj, "pa_object");
+    return obj ? obj->check_type("pa_object") : 0;
 }
 =

 pa_object *pa_object_ref(pa_object *o);
@@ -63,19 +62,18 @@
 =

 static inline pa_object* pa_object_cast(void *o) {
     pa_object *obj =3D (pa_object*) o;
-    pa_assert(obj->check_type(obj, "pa_object"));
+    pa_assert(!obj || obj->check_type("pa_object"));
     return obj;
 }
 =

-#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o))
+#define pa_object_assert_ref(o) pa_assert(pa_object_refcnt(o) > 0)
 =

 #define PA_OBJECT(o) pa_object_cast(o)
 =

 #define PA_DECLARE_CLASS(c)                                             \
     static inline int c##_isinstance(void *o) {                         \
         pa_object *obj =3D (pa_object*) o;                                \
-        pa_assert(obj);                                                 \
-        return obj->check_type(obj, #c);                                \
+        return obj ? obj->check_type(#c) : 1;                           \
     }                                                                   \
     static inline c* c##_cast(void *o) {                                \
         pa_assert(c##_isinstance(o));                                   \
@@ -95,14 +93,13 @@
     }                                                                   \
     struct __stupid_useless_struct_to_allow_trailing_semicolon
 =

-#define PA_DEFINE_CHECK_TYPE(c, func, parent)                           \
-    int func(pa_object *o, const char *type) {                          \
-        pa_assert(o);                                                   \
+#define PA_DEFINE_CHECK_TYPE(c, parent)                                 \
+    int c##_check_type(const char *type) {                              \
         pa_assert(type);                                                \
         if (type =3D=3D #c ||                                             =
  \
             strcmp(type, #c) =3D=3D 0)                                    =
  \
             return 1;                                                   \
-        return parent(o, type);                                         \
+        return parent##_check_type(type);                               \
     }                                                                   \
     struct __stupid_useless_struct_to_allow_trailing_semicolon
 =


Modified: branches/lennart/src/pulsecore/protocol-native.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/protocol-native.c?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/protocol-native.c (original)
+++ branches/lennart/src/pulsecore/protocol-native.c Wed Aug  1 00:44:53 2007
@@ -28,7 +28,6 @@
 =

 #include <string.h>
 #include <stdio.h>
-#include <assert.h>
 #include <stdlib.h>
 #include <unistd.h>
 =

@@ -72,54 +71,57 @@
 =

 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
 =

-struct connection;
+typedef struct connection connection;
 struct pa_protocol_native;
 =

-struct record_stream {
-    struct connection *connection;
+typedef struct record_stream {
+    pa_msgobject parent;
+
+    connection *connection;
     uint32_t index;
+    =

     pa_source_output *source_output;
     pa_memblockq *memblockq;
     size_t fragment_size;
-};
-
-struct playback_stream {
-    int type;
-    struct connection *connection;
+} record_stream;
+
+typedef struct output_stream {
+    pa_msgobject parent;
+} output_stream;
+
+typedef struct playback_stream {
+    output_stream parent;
+    =

+    connection *connection;
     uint32_t index;
+    =

     pa_sink_input *sink_input;
     pa_memblockq *memblockq;
-    size_t requested_bytes;
     int drain_request;
     uint32_t drain_tag;
     uint32_t syncid;
     int underrun;
 =

-    /* Sync group members */
-    PA_LLIST_FIELDS(struct playback_stream);
-};
-
-struct upload_stream {
-    int type;
-    struct connection *connection;
+    pa_atomic_t missing;
+    size_t last_missing;
+} playback_stream;
+
+typedef struct upload_stream {
+    output_stream parent;
+    =

+    connection *connection;
     uint32_t index;
+    =

     pa_memchunk memchunk;
     size_t length;
     char *name;
     pa_sample_spec sample_spec;
     pa_channel_map channel_map;
-};
-
-struct output_stream {
-    int type;
-};
-
-enum {
-    UPLOAD_STREAM,
-    PLAYBACK_STREAM
-};
+} upload_stream;
 =

 struct connection {
+    pa_msgobject parent;
+    =

     int authorized;
     uint32_t version;
     pa_protocol_native *protocol;
@@ -132,10 +134,31 @@
     pa_time_event *auth_timeout_event;
 };
 =

+
+PA_DECLARE_CLASS(record_stream);
+#define RECORD_STREAM(o) (record_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
+
+PA_DECLARE_CLASS(output_stream);
+#define OUTPUT_STREAM(o) (output_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
+
+PA_DECLARE_CLASS(playback_stream);
+#define PLAYBACK_STREAM(o) (playback_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
+
+PA_DECLARE_CLASS(upload_stream);
+#define UPLOAD_STREAM(o) (upload_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
+
+PA_DECLARE_CLASS(connection);
+#define CONNECTION(o) (connection_cast(o))
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
+
 struct pa_protocol_native {
     pa_module *module;
+    pa_core *core;
     int public;
-    pa_core *core;
     pa_socket_server *server;
     pa_idxset *connections;
     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
@@ -146,16 +169,38 @@
     pa_ip_acl *auth_ip_acl;
 };
 =

+enum {
+    SINK_INPUT_MESSAGE_POST_DATA =3D PA_SINK_INPUT_MESSAGE_MAX, /* data fr=
om main loop to sink input */
+    SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
+    SINK_INPUT_MESSAGE_FLUSH,
+    SINK_INPUT_MESSAGE_TRIGGER,
+    SINK_INPUT_MESSAGE_SEEK,
+    SINK_INPUT_MESSAGE_PREBUF_FORCE
+};
+
+enum {
+    PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink=
 input from the main loop */
+    PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
+    PLAYBACK_STREAM_MESSAGE_OVERFLOW,
+    PLAYBACK_STREAM_MESSAGE_DRAIN_ACK
+};
+
+enum {
+    RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to =
main loop */
+};
+
 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk);
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk,=
 size_t length);
+static void sink_input_drop_cb(pa_sink_input *i, size_t length);
 static void sink_input_kill_cb(pa_sink_input *i);
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i);
-
+
+static void send_memblock(connection *c);
 static void request_bytes(struct playback_stream*s);
 =

 static void source_output_kill_cb(pa_source_output *o);
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *=
chunk);
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
+
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdat=
a, int64_t offset, pa_memchunk *chunk);
 =

 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag,=
 pa_tagstruct *t, void *userdata);
 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t comm=
and, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -179,8 +224,7 @@
 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_=
t tag, pa_tagstruct *t, void *userdata);
 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t =
tag, pa_tagstruct *t, void *userdata);
 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t comman=
d, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_flush_playback_stream(pa_pdispatch *pd, uint32_t comma=
nd, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_trigger_or_prebuf_playback_stream(pa_pdispatch *pd, ui=
nt32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatc=
h *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t =
command, uint32_t tag, pa_tagstruct *t, void *userdata);
 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, ui=
nt32_t tag, pa_tagstruct *t, void *userdata);
 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag,=
 pa_tagstruct *t, void *userdata);
@@ -239,12 +283,13 @@
     [PA_COMMAND_SET_SOURCE_VOLUME] =3D command_set_volume,
 =

     [PA_COMMAND_SET_SINK_MUTE] =3D command_set_mute,
+    [PA_COMMAND_SET_SINK_INPUT_MUTE] =3D command_set_mute,
     [PA_COMMAND_SET_SOURCE_MUTE] =3D command_set_mute,
 =

     [PA_COMMAND_CORK_PLAYBACK_STREAM] =3D command_cork_playback_stream,
-    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] =3D command_flush_playback_stream,
-    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] =3D command_trigger_or_prebuf_pla=
yback_stream,
-    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] =3D command_trigger_or_prebuf_play=
back_stream,
+    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] =3D command_trigger_or_flush_or_pre=
buf_playback_stream,
+    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] =3D command_trigger_or_flush_or_p=
rebuf_playback_stream,
+    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] =3D command_trigger_or_flush_or_pr=
ebuf_playback_stream,
 =

     [PA_COMMAND_CORK_RECORD_STREAM] =3D command_cork_record_stream,
     [PA_COMMAND_FLUSH_RECORD_STREAM] =3D command_flush_record_stream,
@@ -269,74 +314,145 @@
 =

 /* structure management */
 =

-static struct upload_stream* upload_stream_new(
-    struct connection *c,
-    const pa_sample_spec *ss,
-    const pa_channel_map *map,
-    const char *name, size_t length) {
-
-    struct upload_stream *s;
-    assert(c && ss && name && length);
-
-    s =3D pa_xnew(struct upload_stream, 1);
-    s->type =3D UPLOAD_STREAM;
+static void upload_stream_unlink(upload_stream *s) {
+    pa_assert(s);
+
+    if (!s->connection)
+        return;
+
+    pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s=
, NULL) =3D=3D s);
+    upload_stream_unref(s);
+    s->connection =3D NULL;
+}
+
+static void upload_stream_free(pa_object *o) {
+    upload_stream *s =3D UPLOAD_STREAM(o);
+    pa_assert(s);
+
+    upload_stream_unlink(s);
+
+    pa_xfree(s->name);
+
+    if (s->memchunk.memblock)
+        pa_memblock_unref(s->memchunk.memblock);
+
+    pa_xfree(s);
+}
+
+static upload_stream* upload_stream_new(
+        connection *c,
+        const pa_sample_spec *ss,
+        const pa_channel_map *map,
+        const char *name, size_t length) {
+
+    upload_stream *s;
+    =

+    pa_assert(c);
+    pa_assert(ss);
+    pa_assert(name);
+    pa_assert(length > 0);
+
+    s =3D pa_msgobject_new(upload_stream);
+    c->parent.parent.free =3D upload_stream_free;
     s->connection =3D c;
     s->sample_spec =3D *ss;
     s->channel_map =3D *map;
     s->name =3D pa_xstrdup(name);
-
-    s->memchunk.memblock =3D NULL;
-    s->memchunk.index =3D 0;
-    s->memchunk.length =3D 0;
-
+    pa_memchunk_reset(&s->memchunk);
     s->length =3D length;
 =

     pa_idxset_put(c->output_streams, s, &s->index);
+    =

     return s;
 }
 =

-static void upload_stream_free(struct upload_stream *o) {
-    assert(o && o->connection);
-
-    pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
-
-    pa_xfree(o->name);
-
-    if (o->memchunk.memblock)
-        pa_memblock_unref(o->memchunk.memblock);
-
-    pa_xfree(o);
-}
-
-static struct record_stream* record_stream_new(
-    struct connection *c,
-    pa_source *source,
-    const pa_sample_spec *ss,
-    const pa_channel_map *map,
-    const char *name,
-    size_t maxlength,
-    size_t fragment_size) {
-
-    struct record_stream *s;
+static void record_stream_unlink(record_stream *s) {
+    pa_assert(s);
+
+    if (!s->connection)
+        return;
+
+    if (s->source_output) {
+        pa_source_output_disconnect(s->source_output);
+        pa_source_output_unref(s->source_output);
+        s->source_output =3D NULL;
+    }
+
+    pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s=
, NULL) =3D=3D s);
+    record_stream_unref(s);    =

+    s->connection =3D NULL;
+}
+
+static void record_stream_free(pa_object *o) {
+    record_stream *s =3D RECORD_STREAM(o);
+    pa_assert(s);
+
+    record_stream_unlink(s);
+    =

+    pa_memblockq_free(s->memblockq);
+    pa_xfree(s);
+}
+
+static int record_stream_process_msg(pa_msgobject *o, int code, void*userd=
ata, int64_t offset, pa_memchunk *chunk) {
+    record_stream *s =3D RECORD_STREAM(o);
+    record_stream_assert_ref(s);
+
+    switch (code) {
+        =

+        case RECORD_STREAM_MESSAGE_POST_DATA:
+            =

+            if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+                pa_log_warn("Failed to push data into output queue.");
+                return -1;
+            }
+
+            if (!pa_pstream_is_pending(s->connection->pstream))
+                send_memblock(s->connection);
+            =

+            pa_pstream_send_memblock(s->connection->pstream, s->index, 0, =
PA_SEEK_RELATIVE, chunk);
+            break;
+    }
+
+    return 0;
+}
+
+static record_stream* record_stream_new(
+        connection *c,
+        pa_source *source,
+        const pa_sample_spec *ss,
+        const pa_channel_map *map,
+        const char *name,
+        size_t *maxlength,
+        size_t fragment_size,
+        int corked) {
+
+    record_stream *s;
     pa_source_output *source_output;
     size_t base;
     pa_source_output_new_data data;
 =

-    assert(c && ss && name && maxlength);
+    pa_assert(c);
+    pa_assert(ss);
+    pa_assert(name);
+    pa_assert(maxlength);
+    pa_assert(*maxlength > 0);
 =

     pa_source_output_new_data_init(&data);
+    data.module =3D c->protocol->module;
+    data.client =3D c->client;
     data.source =3D source;
     data.driver =3D __FILE__;
     data.name =3D name;
+    data.corked =3D corked;
     pa_source_output_new_data_set_sample_spec(&data, ss);
     pa_source_output_new_data_set_channel_map(&data, map);
-    data.module =3D c->protocol->module;
-    data.client =3D c->client;
 =

     if (!(source_output =3D pa_source_output_new(c->protocol->core, &data,=
 0)))
         return NULL;
 =

-    s =3D pa_xnew(struct record_stream, 1);
+    s =3D pa_msgobject_new(record_stream);
+    c->parent.parent.free =3D record_stream_free;
+    c->parent.process_msg =3D record_stream_process_msg;
     s->connection =3D c;
     s->source_output =3D source_output;
     s->source_output->push =3D source_output_push_cb;
@@ -346,58 +462,143 @@
 =

     s->memblockq =3D pa_memblockq_new(
             0,
-            maxlength,
+            *maxlength,
             0,
             base =3D pa_frame_size(ss),
             1,
             0,
             NULL);
-    assert(s->memblockq);
 =

     s->fragment_size =3D (fragment_size/base)*base;
-    if (!s->fragment_size)
+    if (s->fragment_size <=3D 0)
         s->fragment_size =3D base;
+    *maxlength =3D pa_memblockq_get_maxlength(s->memblockq);
 =

     pa_idxset_put(c->record_streams, s, &s->index);
+
+    pa_source_output_put(s->source_output);
     return s;
 }
 =

-static void record_stream_free(struct record_stream* r) {
-    assert(r && r->connection);
-
-    pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
-    pa_source_output_disconnect(r->source_output);
-    pa_source_output_unref(r->source_output);
-    pa_memblockq_free(r->memblockq);
-    pa_xfree(r);
-}
-
-static struct playback_stream* playback_stream_new(
-        struct connection *c,
+static void playback_stream_unlink(playback_stream *s) {
+    pa_assert(s);
+
+    if (!s->connection)
+        return;
+
+    if (s->sink_input) {
+        pa_sink_input_disconnect(s->sink_input);
+        pa_sink_input_unref(s->sink_input);
+        s->sink_input =3D NULL;
+    }
+
+    if (s->drain_request)
+        pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR=
_NOENTITY);
+
+    pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s=
, NULL) =3D=3D s);
+    playback_stream_unref(s);    =

+    s->connection =3D NULL;
+}
+
+static void playback_stream_free(pa_object* o) {
+    playback_stream *s =3D PLAYBACK_STREAM(o);
+    pa_assert(s);
+
+    playback_stream_unlink(s);
+    =

+    pa_memblockq_free(s->memblockq);
+    pa_xfree(s);
+}
+
+static int playback_stream_process_msg(pa_msgobject *o, int code, void*use=
rdata, int64_t offset, pa_memchunk *chunk) {
+    playback_stream *s =3D PLAYBACK_STREAM(o);
+    playback_stream_assert_ref(s);
+
+    switch (code) {
+        case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
+            pa_tagstruct *t;
+            int32_t l;
+
+            if ((l =3D pa_atomic_load(&s->missing)) <=3D 0)
+                break;
+            =

+            pa_assert_se(pa_atomic_sub(&s->missing, l) >=3D l);
+            =

+            t =3D pa_tagstruct_new(NULL, 0);
+            pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
+            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+            pa_tagstruct_putu32(t, s->index);
+            pa_tagstruct_putu32(t, l);
+            pa_pstream_send_tagstruct(s->connection->pstream, t);
+
+     /*     pa_log("Requesting %u bytes", l);  */
+            break;
+        }
+
+        case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
+            pa_tagstruct *t;
+
+            /* Report that we're empty */
+            t =3D pa_tagstruct_new(NULL, 0);
+            pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
+            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+            pa_tagstruct_putu32(t, s->index);
+            pa_pstream_send_tagstruct(s->connection->pstream, t);
+            break;
+        }
+
+        case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
+            pa_tagstruct *t;
+
+            /* Notify the user we're overflowed*/
+            t =3D pa_tagstruct_new(NULL, 0);
+            pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
+            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+            pa_tagstruct_putu32(t, s->index);
+            pa_pstream_send_tagstruct(s->connection->pstream, t);
+            break;
+        }
+
+        case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
+            pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_U=
INT(userdata));
+            break;
+
+    }
+
+    return 0;
+}
+
+static playback_stream* playback_stream_new(
+        connection *c,
         pa_sink *sink,
         const pa_sample_spec *ss,
         const pa_channel_map *map,
         const char *name,
-        size_t maxlength,
-        size_t tlength,
-        size_t prebuf,
-        size_t minreq,
+        size_t *maxlength,
+        size_t *tlength,
+        size_t *prebuf,
+        size_t *minreq,
         pa_cvolume *volume,
-        uint32_t syncid) {
-
-    struct playback_stream *s, *ssync;
+        uint32_t syncid,
+        int corked,
+        size_t *missing) {
+
+    playback_stream *s, *ssync;
     pa_sink_input *sink_input;
     pa_memblock *silence;
     uint32_t idx;
     int64_t start_index;
     pa_sink_input_new_data data;
 =

-    assert(c && ss && name && maxlength);
+    pa_assert(c);
+    pa_assert(ss);
+    pa_assert(name);
+    pa_assert(maxlength);
 =

     /* Find syncid group */
     for (ssync =3D pa_idxset_first(c->output_streams, &idx); ssync; ssync =
=3D pa_idxset_next(c->output_streams, &idx)) {
 =

-        if (ssync->type !=3D PLAYBACK_STREAM)
+        if (!playback_stream_isinstance(ssync))
             continue;
 =

         if (ssync->syncid =3D=3D syncid)
@@ -405,8 +606,13 @@
     }
 =

     /* Synced streams must connect to the same sink */
-    if (ssync)
-        sink =3D ssync->sink_input->sink;
+    if (ssync) {
+
+        if (!sink)
+            sink =3D ssync->sink_input->sink;
+        else if (sink !=3D ssync->sink_input->sink)
+            return NULL;
+    }
 =

     pa_sink_input_new_data_init(&data);
     data.sink =3D sink;
@@ -417,146 +623,136 @@
     pa_sink_input_new_data_set_volume(&data, volume);
     data.module =3D c->protocol->module;
     data.client =3D c->client;
+    data.start_corked =3D corked;
+    data.sync_base =3D ssync ? ssync->sink_input : NULL;
 =

     if (!(sink_input =3D pa_sink_input_new(c->protocol->core, &data, 0)))
         return NULL;
 =

-    s =3D pa_xnew(struct playback_stream, 1);
-    s->type =3D PLAYBACK_STREAM;
+    s =3D pa_msgobject_new(playback_stream);
+    c->parent.parent.free =3D playback_stream_free;
+    c->parent.process_msg =3D playback_stream_process_msg;
     s->connection =3D c;
     s->syncid =3D syncid;
     s->sink_input =3D sink_input;
     s->underrun =3D 1;
 =

+    s->sink_input->parent.process_msg =3D sink_input_process_msg;
     s->sink_input->peek =3D sink_input_peek_cb;
     s->sink_input->drop =3D sink_input_drop_cb;
     s->sink_input->kill =3D sink_input_kill_cb;
-    s->sink_input->get_latency =3D sink_input_get_latency_cb;
     s->sink_input->userdata =3D s;
 =

-    if (ssync) {
-        /* Sync id found, now find head of list */
-        PA_LLIST_FIND_HEAD(struct playback_stream, ssync, &ssync);
-
-        /* Prepend ourselves */
-        PA_LLIST_PREPEND(struct playback_stream, ssync, s);
-
-        /* Set our start index to the current read index of the other groz=
p member(s) */
-        assert(ssync->next);
-        start_index =3D pa_memblockq_get_read_index(ssync->next->memblockq=
);
-    } else {
-        /* This ia a new sync group */
-        PA_LLIST_INIT(struct playback_stream, s);
-        start_index =3D 0;
-    }
+    start_index =3D ssync ? pa_memblockq_get_read_index(ssync->memblockq) =
: 0;
 =

     silence =3D pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
 =

     s->memblockq =3D pa_memblockq_new(
             start_index,
-            maxlength,
-            tlength,
+            *maxlength,
+            *tlength,
             pa_frame_size(ss),
-            prebuf,
-            minreq,
+            *prebuf,
+            *minreq,
             silence);
 =

     pa_memblock_unref(silence);
 =

-    s->requested_bytes =3D 0;
+    *maxlength =3D pa_memblockq_get_maxlength(s->memblockq);
+    *tlength =3D pa_memblockq_get_tlength(s->memblockq);
+    *prebuf =3D pa_memblockq_get_prebuf(s->memblockq);
+    *minreq =3D pa_memblockq_get_minreq(s->memblockq);
+    *missing =3D pa_memblockq_missing(s->memblockq);
+    =

+    pa_atomic_store(&s->missing, 0);
+    s->last_missing =3D *missing;
     s->drain_request =3D 0;
 =

     pa_idxset_put(c->output_streams, s, &s->index);
 =

+    pa_sink_input_put(s->sink_input);
+    =

     return s;
 }
 =

-static void playback_stream_free(struct playback_stream* p) {
-    struct playback_stream *head;
-    assert(p && p->connection);
-
-    if (p->drain_request)
-        pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERR=
_NOENTITY);
-
-    PA_LLIST_FIND_HEAD(struct playback_stream, p, &head);
-    PA_LLIST_REMOVE(struct playback_stream, head, p);
-
-    pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
-    pa_sink_input_disconnect(p->sink_input);
-    pa_sink_input_unref(p->sink_input);
-    pa_memblockq_free(p->memblockq);
-    pa_xfree(p);
-}
-
-static void connection_free(struct connection *c) {
-    struct record_stream *r;
-    struct output_stream *o;
-    assert(c && c->protocol);
-
-    pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
+static void connection_unlink(connection *c) {
+    record_stream *r;
+    output_stream *o;
+
+    pa_assert(c);
+
+    if (!c->protocol)
+        return;
+
     while ((r =3D pa_idxset_first(c->record_streams, NULL)))
-        record_stream_free(r);
+        record_stream_unlink(r);
+
+    while ((o =3D pa_idxset_first(c->output_streams, NULL)))
+        if (playback_stream_isinstance(o))
+            playback_stream_unlink(PLAYBACK_STREAM(o));
+        else
+            upload_stream_unlink(UPLOAD_STREAM(o));
+
+    if (c->subscription)
+        pa_subscription_free(c->subscription);
+
+    if (c->pstream)
+        pa_pstream_close(c->pstream);
+
+    if (c->auth_timeout_event) {
+        c->protocol->core->mainloop->time_free(c->auth_timeout_event);
+        c->auth_timeout_event =3D NULL;
+    }
+    =

+    pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NUL=
L) =3D=3D c);
+    connection_unref(c);
+    c->protocol =3D NULL;
+}
+
+static void connection_free(pa_object *o) {
+    connection *c =3D CONNECTION(o);
+    =

+    pa_assert(c);
+
+    connection_unlink(c);
+    =

     pa_idxset_free(c->record_streams, NULL, NULL);
-
-    while ((o =3D pa_idxset_first(c->output_streams, NULL)))
-        if (o->type =3D=3D PLAYBACK_STREAM)
-            playback_stream_free((struct playback_stream*) o);
-        else
-            upload_stream_free((struct upload_stream*) o);
     pa_idxset_free(c->output_streams, NULL, NULL);
 =

     pa_pdispatch_unref(c->pdispatch);
-    pa_pstream_close(c->pstream);
     pa_pstream_unref(c->pstream);
     pa_client_free(c->client);
 =

-    if (c->subscription)
-        pa_subscription_free(c->subscription);
-
-    if (c->auth_timeout_event)
-        c->protocol->core->mainloop->time_free(c->auth_timeout_event);
-
     pa_xfree(c);
 }
 =

-static void request_bytes(struct playback_stream *s) {
-    pa_tagstruct *t;
-    size_t l;
-    assert(s);
-
-    if (!(l =3D pa_memblockq_missing(s->memblockq)))
-        return;
-
-    if (l <=3D s->requested_bytes)
-        return;
-
-    l -=3D s->requested_bytes;
-
-    if (l < pa_memblockq_get_minreq(s->memblockq))
-        return;
-
-    s->requested_bytes +=3D l;
-
-    t =3D pa_tagstruct_new(NULL, 0);
-    assert(t);
-    pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
-    pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
-    pa_tagstruct_putu32(t, s->index);
-    pa_tagstruct_putu32(t, l);
-    pa_pstream_send_tagstruct(s->connection->pstream, t);
-
-/*     pa_log("Requesting %u bytes", l);  */
-}
-
-static void send_memblock(struct connection *c) {
+static void request_bytes(playback_stream *s) {
+    size_t new_missing, delta, previous_missing;
+
+    playback_stream_assert_ref(s);
+
+    new_missing =3D pa_memblockq_missing(s->memblockq);
+
+    if (new_missing <=3D s->last_missing)
+        return;
+
+    delta =3D new_missing - s->last_missing;
+    s->last_missing =3D new_missing;
+
+    previous_missing =3D pa_atomic_add(&s->missing, delta);
+    if (previous_missing < pa_memblockq_get_minreq(s->memblockq) && previo=
us_missing+delta >=3D pa_memblockq_get_minreq(s->memblockq))
+        pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSG=
OBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
+}
+
+static void send_memblock(connection *c) {
     uint32_t start;
-    struct record_stream *r;
+    record_stream *r;
 =

     start =3D PA_IDXSET_INVALID;
     for (;;) {
         pa_memchunk chunk;
 =

-        if (!(r =3D pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
+        if (!(r =3D RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->=
rrobin_index))))
             return;
 =

         if (start =3D=3D PA_IDXSET_INVALID)
@@ -571,7 +767,8 @@
                 schunk.length =3D r->fragment_size;
 =

             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELA=
TIVE, &schunk);
-            pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
+            =

+            pa_memblockq_drop(r->memblockq, schunk.length);
             pa_memblock_unref(schunk.memblock);
 =

             return;
@@ -579,9 +776,9 @@
     }
 }
 =

-static void send_playback_stream_killed(struct playback_stream *p) {
+static void send_playback_stream_killed(playback_stream *p) {
     pa_tagstruct *t;
-    assert(p);
+    playback_stream_assert_ref(p);
 =

     t =3D pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
@@ -590,9 +787,9 @@
     pa_pstream_send_tagstruct(p->connection->pstream, t);
 }
 =

-static void send_record_stream_killed(struct record_stream *r) {
+static void send_record_stream_killed(record_stream *r) {
     pa_tagstruct *t;
-    assert(r);
+    record_stream_assert_ref(r);
 =

     t =3D pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
@@ -603,22 +800,123 @@
 =

 /*** sinkinput callbacks ***/
 =

+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdat=
a, int64_t offset, pa_memchunk *chunk) {
+    pa_sink_input *i =3D PA_SINK_INPUT(o);
+    playback_stream *s;
+
+    pa_sink_input_assert_ref(i);
+    s =3D PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+
+    switch (code) {
+
+        case SINK_INPUT_MESSAGE_SEEK: =

+            pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdat=
a));
+            return 0;
+
+        case SINK_INPUT_MESSAGE_POST_DATA: {
+            pa_assert(chunk);
+
+            if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+
+                pa_log_warn("Failed to push data into queue");
+                pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq=
, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
+                pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_REL=
ATIVE);
+            }
+
+            s->underrun =3D 0;
+            return 0;
+        }
+
+        case SINK_INPUT_MESSAGE_DRAIN: {
+
+            pa_memblockq_prebuf_disable(s->memblockq);
+
+            if (!pa_memblockq_is_readable(s->memblockq))
+                pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq=
, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NU=
LL);
+            else {
+                s->drain_tag =3D PA_PTR_TO_UINT(userdata);
+                s->drain_request =3D 1;
+            }
+
+            return 0;
+        }
+
+        case SINK_INPUT_MESSAGE_FLUSH:
+        case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+        case SINK_INPUT_MESSAGE_TRIGGER: {
+            =

+            pa_sink_input *isync;
+            void (*func)(pa_memblockq *bq);
+
+            switch  (code) {
+                case SINK_INPUT_MESSAGE_FLUSH:
+                    func =3D pa_memblockq_flush;
+                    break;
+                    =

+                case SINK_INPUT_MESSAGE_PREBUF_FORCE:
+                    func =3D pa_memblockq_prebuf_force;
+                    break;
+                    =

+                case SINK_INPUT_MESSAGE_TRIGGER:
+                    func =3D pa_memblockq_prebuf_disable;
+                    break;
+
+                default:
+                    pa_assert_not_reached();
+            }
+            =

+            func(s->memblockq);
+            s->underrun =3D 0;
+            request_bytes(s);
+
+            /* Do the same for all other members in the sync group */
+            for (isync =3D i->sync_prev; isync; isync =3D isync->sync_prev=
) {
+                playback_stream *ssync =3D PLAYBACK_STREAM(isync->userdata=
);
+                func(ssync->memblockq);
+                ssync->underrun =3D 0;
+                request_bytes(ssync);
+            }
+
+            for (isync =3D i->sync_next; isync; isync =3D isync->sync_next=
) {
+                playback_stream *ssync =3D PLAYBACK_STREAM(isync->userdata=
);
+                func(ssync->memblockq);
+                ssync->underrun =3D 0;
+                request_bytes(ssync);
+            }
+            =

+            return 0;
+        }
+            =

+        case PA_SINK_INPUT_MESSAGE_SET_STATE:
+
+            pa_memblockq_prebuf_force(s->memblockq);
+            break;
+
+        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
+            pa_usec_t *r =3D userdata;
+
+            *r =3D pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq),=
 &i->sample_spec);
+
+            /* Fall through, the default handler will add in the extra
+             * latency added by the resampler */
+            break;
+        }
+    }
+
+    return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
+}
+
 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
-    struct playback_stream *s;
-    assert(i && i->userdata && chunk);
-    s =3D i->userdata;
+    playback_stream *s;
+
+    pa_sink_input_assert_ref(i);
+    s =3D PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+    pa_assert(chunk);
 =

     if (pa_memblockq_get_length(s->memblockq) <=3D 0 && !s->underrun) {
-        pa_tagstruct *t;
-
-        /* Report that we're empty */
-
-        t =3D pa_tagstruct_new(NULL, 0);
-        pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
-        pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
-        pa_tagstruct_putu32(t, s->index);
-        pa_pstream_send_tagstruct(s->connection->pstream, t);
-
+        pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSG=
OBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
         s->underrun =3D 1;
     }
 =

@@ -632,76 +930,78 @@
     return 0;
 }
 =

-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk,=
 size_t length) {
-    struct playback_stream *s;
-    assert(i && i->userdata && length);
-    s =3D i->userdata;
-
-    pa_memblockq_drop(s->memblockq, chunk, length);
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
+    playback_stream *s;
+
+    pa_sink_input_assert_ref(i);
+    s =3D PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+    pa_assert(length > 0);
+
+    pa_memblockq_drop(s->memblockq, length);
+
+    if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
+        s->drain_request =3D 0;
+        pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSG=
OBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag),=
 0, NULL, NULL);
+    }
 =

     request_bytes(s);
 =

-    if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
-        pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
-        s->drain_request =3D 0;
-    }
-
 /*     pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq));   */
 }
 =

 static void sink_input_kill_cb(pa_sink_input *i) {
-    assert(i && i->userdata);
-    send_playback_stream_killed((struct playback_stream *) i->userdata);
-    playback_stream_free((struct playback_stream *) i->userdata);
-}
-
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
-    struct playback_stream *s;
-    assert(i && i->userdata);
-    s =3D i->userdata;
+    playback_stream *s;
+
+    pa_sink_input_assert_ref(i);
+    s =3D PLAYBACK_STREAM(i->userdata);
+    playback_stream_assert_ref(s);
+
+    send_playback_stream_killed(s);
+    playback_stream_unlink(s);
+}
+
+/*** source_output callbacks ***/
+
+static void source_output_push_cb(pa_source_output *o, const pa_memchunk *=
chunk) {
+    record_stream *s;
+
+    pa_source_output_assert_ref(o);
+    s =3D RECORD_STREAM(o->userdata);
+    record_stream_assert_ref(s);
+    pa_assert(chunk);
+
+    pa_asyncmsgq_post(s->connection->protocol->core->asyncmsgq, PA_MSGOBJE=
CT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
+}
+
+static void source_output_kill_cb(pa_source_output *o) {
+    record_stream *s;
+
+    pa_source_output_assert_ref(o);
+    s =3D RECORD_STREAM(o->userdata);
+    record_stream_assert_ref(s);
+
+    send_record_stream_killed(s);
+    record_stream_unlink(s);
+}
+
+static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
+    record_stream *s;
+
+    pa_source_output_assert_ref(o);
+    s =3D RECORD_STREAM(o->userdata);
+    record_stream_assert_ref(s);
 =

     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
 =

-    return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sin=
k_input->sample_spec);
-}
-
-/*** source_output callbacks ***/
-
-static void source_output_push_cb(pa_source_output *o, const pa_memchunk *=
chunk) {
-    struct record_stream *s;
-    assert(o && o->userdata && chunk);
-    s =3D o->userdata;
-
-    if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
-        pa_log_warn("Failed to push data into output queue.");
-        return;
-    }
-
-    if (!pa_pstream_is_pending(s->connection->pstream))
-        send_memblock(s->connection);
-}
-
-static void source_output_kill_cb(pa_source_output *o) {
-    assert(o && o->userdata);
-    send_record_stream_killed((struct record_stream *) o->userdata);
-    record_stream_free((struct record_stream *) o->userdata);
-}
-
-static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
-    struct record_stream *s;
-    assert(o && o->userdata);
-    s =3D o->userdata;
-
-    /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
-
     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sam=
ple_spec);
 }
 =

 /*** pdispatch callbacks ***/
 =

-static void protocol_error(struct connection *c) {
+static void protocol_error(connection *c) {
     pa_log("protocol error, kicking client");
-    connection_free(c);
+    connection_unlink(c);
 }
 =

 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
@@ -721,9 +1021,9 @@
 }
 =

 static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd,=
 PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userd=
ata) {
-    struct connection *c =3D userdata;
-    struct playback_stream *s;
-    uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid;
+    connection *c =3D CONNECTION(userdata);
+    playback_stream *s;
+    uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid, missi=
ng;
     const char *name, *sink_name;
     pa_sample_spec ss;
     pa_channel_map map;
@@ -731,8 +1031,9 @@
     pa_sink *sink =3D NULL;
     pa_cvolume volume;
     int corked;
-
-    assert(c && t && c->protocol && c->protocol->core);
+    =

+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_get(
             t,
@@ -773,34 +1074,33 @@
         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
     }
 =

-    s =3D playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength=
, prebuf, minreq, &volume, syncid);
+    s =3D playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tleng=
th, &prebuf, &minreq, &volume, syncid, corked, &missing);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
-
-    pa_sink_input_cork(s->sink_input, corked);
 =

     reply =3D reply_new(tag);
     pa_tagstruct_putu32(reply, s->index);
-    assert(s->sink_input);
+    pa_assert(s->sink_input);
     pa_tagstruct_putu32(reply, s->sink_input->index);
-    pa_tagstruct_putu32(reply, s->requested_bytes =3D pa_memblockq_missing=
(s->memblockq));
+    pa_tagstruct_putu32(reply, missing);
 =

     if (c->version >=3D 9) {
         /* Since 0.9 we support sending the buffer metrics back to the client */
 =

-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_maxlength(s=
->memblockq));
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_tlength(s->=
memblockq));
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_prebuf(s->m=
emblockq));
-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_minreq(s->m=
emblockq));
+        pa_tagstruct_putu32(reply, (uint32_t) maxlength);
+        pa_tagstruct_putu32(reply, (uint32_t) tlength);
+        pa_tagstruct_putu32(reply, (uint32_t) prebuf);
+        pa_tagstruct_putu32(reply, (uint32_t) minreq);
     }
 =

     pa_pstream_send_tagstruct(c->pstream, reply);
-    request_bytes(s);
 }
 =

 static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t=
 command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t channel;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &channel) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -810,39 +1110,52 @@
 =

     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 =

-    if (command =3D=3D PA_COMMAND_DELETE_PLAYBACK_STREAM) {
-        struct playback_stream *s;
-        if (!(s =3D pa_idxset_get_by_index(c->output_streams, channel)) ||=
 (s->type !=3D PLAYBACK_STREAM)) {
-            pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
-            return;
+    switch (command) {
+        =

+        case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
+            playback_stream *s;
+            if (!(s =3D pa_idxset_get_by_index(c->output_streams, channel)=
) || !playback_stream_isinstance(s)) {
+                pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+                return;
+            }
+            =

+            playback_stream_unlink(s);
+            break;
         }
-
-        playback_stream_free(s);
-    } else if (command =3D=3D PA_COMMAND_DELETE_RECORD_STREAM) {
-        struct record_stream *s;
-        if (!(s =3D pa_idxset_get_by_index(c->record_streams, channel))) {
-            pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
-            return;
+            =

+        case PA_COMMAND_DELETE_RECORD_STREAM: {
+            record_stream *s;
+            if (!(s =3D pa_idxset_get_by_index(c->record_streams, channel)=
)) {
+                pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+                return;
+            }
+            =

+            record_stream_unlink(s);
+            break;
         }
 =

-        record_stream_free(s);
-    } else {
-        struct upload_stream *s;
-        assert(command =3D=3D PA_COMMAND_DELETE_UPLOAD_STREAM);
-        if (!(s =3D pa_idxset_get_by_index(c->output_streams, channel)) ||=
 (s->type !=3D UPLOAD_STREAM)) {
-            pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
-            return;
+        case PA_COMMAND_DELETE_UPLOAD_STREAM: {
+            upload_stream *s;
+
+            if (!(s =3D pa_idxset_get_by_index(c->output_streams, channel)=
) || !upload_stream_isinstance(s)) {
+                pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
+                return;
+            }
+            =

+            upload_stream_unlink(s);
+            break;
         }
 =

-        upload_stream_free(s);
+        default:
+            pa_assert_not_reached();
     }
 =

     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 =

 static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, P=
A_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdat=
a) {
-    struct connection *c =3D userdata;
-    struct record_stream *s;
+    connection *c =3D CONNECTION(userdata);
+    record_stream *s;
     uint32_t maxlength, fragment_size;
     uint32_t source_index;
     const char *name, *source_name;
@@ -851,7 +1164,9 @@
     pa_tagstruct *reply;
     pa_source *source =3D NULL;
     int corked;
-    assert(c && t && c->protocol && c->protocol->core);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
@@ -882,20 +1197,18 @@
         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
     }
 =

-    s =3D record_stream_new(c, source, &ss, &map, name, maxlength, fragmen=
t_size);
+    s =3D record_stream_new(c, source, &ss, &map, name, &maxlength, fragme=
nt_size, corked);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
-
-    pa_source_output_cork(s->source_output, corked);
 =

     reply =3D reply_new(tag);
     pa_tagstruct_putu32(reply, s->index);
-    assert(s->source_output);
+    pa_assert(s->source_output);
     pa_tagstruct_putu32(reply, s->source_output->index);
 =

     if (c->version >=3D 9) {
         /* Since 0.9 we support sending the buffer metrics back to the client */
 =

-        pa_tagstruct_putu32(reply, (uint32_t) pa_memblockq_get_maxlength(s=
->memblockq));
+        pa_tagstruct_putu32(reply, (uint32_t) maxlength);
         pa_tagstruct_putu32(reply, (uint32_t) s->fragment_size);
     }
 =

@@ -903,9 +1216,11 @@
 }
 =

 static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uin=
t32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(c && t);
-
+    connection *c =3D CONNECTION(userdata);
+
+    connection_assert_ref(c);
+    pa_assert(t);
+    =

     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
         return;
@@ -913,16 +1228,17 @@
 =

     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 =

-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop=
);
     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
 }
 =

 static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uin=
t32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const void*cookie;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < =
0 ||
@@ -1015,9 +1331,11 @@
 }
 =

 static void command_set_client_name(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC=
_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const char *name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1032,10 +1350,12 @@
 }
 =

 static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comman=
d, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const char *name;
     uint32_t idx =3D PA_IDXSET_INVALID;
-    assert(c && t);
+    =

+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1052,7 +1372,7 @@
             idx =3D sink->index;
     } else {
         pa_source *source;
-        assert(command =3D=3D PA_COMMAND_LOOKUP_SOURCE);
+        pa_assert(command =3D=3D PA_COMMAND_LOOKUP_SOURCE);
         if ((source =3D pa_namereg_get(c->protocol->core, name, PA_NAMEREG=
_SOURCE, 1)))
             idx =3D source->index;
     }
@@ -1068,10 +1388,12 @@
 }
 =

 static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, =
PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userda=
ta) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
-    struct playback_stream *s;
-    assert(c && t);
+    playback_stream *s;
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1082,29 +1404,18 @@
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
     s =3D pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type =3D=3D PLAYBACK_STREAM, tag, PA_ERR=
_NOENTITY);
-
-    s->drain_request =3D 0;
-
-    pa_memblockq_prebuf_disable(s->memblockq);
-
-    if (!pa_memblockq_is_readable(s->memblockq)) {
-/*         pa_log("immediate drain: %u", pa_memblockq_get_length(s->memblockq));  */
-        pa_pstream_send_simple_ack(c->pstream, tag);
-    } else {
-/*         pa_log("slow drain triggered");  */
-        s->drain_request =3D 1;
-        s->drain_tag =3D tag;
-
-        pa_sink_notify(s->sink_input->sink);
-    }
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_=
NOENTITY);
+
+    pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink=
_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
 }
 =

 static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uin=
t32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_tagstruct *reply;
     const pa_mempool_stat *stat;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1125,13 +1436,15 @@
 }
 =

 static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, P=
A_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdat=
a) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_tagstruct *reply;
-    struct playback_stream *s;
+    playback_stream *s;
     struct timeval tv, now;
     uint32_t idx;
     pa_usec_t latency;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_timeval(t, &tv) < 0 ||
@@ -1143,13 +1456,13 @@
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
     s =3D pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type =3D=3D PLAYBACK_STREAM, tag, PA_ERR=
_NOENTITY);
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_=
NOENTITY);
 =

     reply =3D reply_new(tag);
 =

     latency =3D pa_sink_get_latency(s->sink_input->sink);
-    if (s->sink_input->resampled_chunk.memblock)
-        latency +=3D pa_bytes_to_usec(s->sink_input->resampled_chunk.lengt=
h, &s->sink_input->sample_spec);
+/*     if (s->sink_input->resampled_chunk.memblock) */  /* FIXME*/ =

+/*         latency +=3D pa_bytes_to_usec(s->sink_input->resampled_chunk.le=
ngth, &s->sink_input->sample_spec); */
     pa_tagstruct_put_usec(reply, latency);
 =

     pa_tagstruct_put_usec(reply, 0);
@@ -1162,12 +1475,14 @@
 }
 =

 static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_=
GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata)=
 {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_tagstruct *reply;
-    struct record_stream *s;
+    record_stream *s;
     struct timeval tv, now;
     uint32_t idx;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_timeval(t, &tv) < 0 ||
@@ -1192,14 +1507,16 @@
 }
 =

 static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, P=
A_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdat=
a) {
-    struct connection *c =3D userdata;
-    struct upload_stream *s;
+    connection *c =3D CONNECTION(userdata);
+    upload_stream *s;
     uint32_t length;
     const char *name;
     pa_sample_spec ss;
     pa_channel_map map;
     pa_tagstruct *reply;
-    assert(c && t && c->protocol && c->protocol->core);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
@@ -1228,11 +1545,13 @@
 }
 =

 static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, P=
A_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdat=
a) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t channel;
-    struct upload_stream *s;
+    upload_stream *s;
     uint32_t idx;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &channel) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1244,23 +1563,25 @@
 =

     s =3D pa_idxset_get_by_index(c->output_streams, channel);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type =3D=3D UPLOAD_STREAM, tag, PA_ERR_N=
OENTITY);
+    CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NO=
ENTITY);
 =

     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s=
->channel_map, &s->memchunk, &idx) < 0)
         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
     else
         pa_pstream_send_simple_ack(c->pstream, tag);
 =

-    upload_stream_free(s);
+    upload_stream_unlink(s);
 }
 =

 static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU=
SED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t sink_index;
     pa_volume_t volume;
     pa_sink *sink;
     const char *name, *sink_name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
         pa_tagstruct_gets(t, &sink_name) < 0 ||
@@ -1291,9 +1612,11 @@
 }
 =

 static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U=
NUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const char *name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1313,7 +1636,9 @@
 }
 =

 static void sink_fill_tagstruct(pa_tagstruct *t, pa_sink *sink) {
-    assert(t && sink);
+    pa_assert(t);
+    pa_sink_assert_ref(sink);
+
     pa_tagstruct_put(
         t,
         PA_TAG_U32, sink->index,
@@ -1321,22 +1646,24 @@
         PA_TAG_STRING, sink->description,
         PA_TAG_SAMPLE_SPEC, &sink->sample_spec,
         PA_TAG_CHANNEL_MAP, &sink->channel_map,
-        PA_TAG_U32, sink->owner ? sink->owner->index : PA_INVALID_INDEX,
-        PA_TAG_CVOLUME, pa_sink_get_volume(sink, PA_MIXER_HARDWARE),
-        PA_TAG_BOOLEAN, pa_sink_get_mute(sink, PA_MIXER_HARDWARE),
+        PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
+        PA_TAG_CVOLUME, pa_sink_get_volume(sink),
+        PA_TAG_BOOLEAN, pa_sink_get_mute(sink),
         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : P=
A_INVALID_INDEX,
         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name :=
 NULL,
         PA_TAG_USEC, pa_sink_get_latency(sink),
         PA_TAG_STRING, sink->driver,
         PA_TAG_U32,
-        (sink->get_hw_volume ? PA_SINK_HW_VOLUME_CTRL : 0) |
-        (sink->get_latency ? PA_SINK_LATENCY : 0) |
+        (sink->get_volume ? PA_SINK_HW_VOLUME_CTRL : 0) |  /* FIXME */
+        (sink->get_latency ? PA_SINK_LATENCY : 0) |        /* FIXME */ =

         (sink->is_hardware ? PA_SINK_HARDWARE : 0),
         PA_TAG_INVALID);
 }
 =

 static void source_fill_tagstruct(pa_tagstruct *t, pa_source *source) {
-    assert(t && source);
+    pa_assert(t);
+    pa_source_assert_ref(source);
+
     pa_tagstruct_put(
         t,
         PA_TAG_U32, source->index,
@@ -1344,22 +1671,24 @@
         PA_TAG_STRING, source->description,
         PA_TAG_SAMPLE_SPEC, &source->sample_spec,
         PA_TAG_CHANNEL_MAP, &source->channel_map,
-        PA_TAG_U32, source->owner ? source->owner->index : PA_INVALID_INDE=
X,
-        PA_TAG_CVOLUME, pa_source_get_volume(source, PA_MIXER_HARDWARE),
-        PA_TAG_BOOLEAN, pa_source_get_mute(source, PA_MIXER_HARDWARE),
+        PA_TAG_U32, source->module ? source->module->index : PA_INVALID_IN=
DEX,
+        PA_TAG_CVOLUME, pa_source_get_volume(source),
+        PA_TAG_BOOLEAN, pa_source_get_mute(source),
         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_IN=
VALID_INDEX,
         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NUL=
L,
         PA_TAG_USEC, pa_source_get_latency(source),
         PA_TAG_STRING, source->driver,
         PA_TAG_U32,
-        (source->get_hw_volume ? PA_SOURCE_HW_VOLUME_CTRL : 0) |
-        (source->get_latency ? PA_SOURCE_LATENCY : 0) |
+        (source->get_volume ? PA_SOURCE_HW_VOLUME_CTRL : 0) |     /* FIXME */
+        (source->get_latency ? PA_SOURCE_LATENCY : 0) |              /* FIXME */
         (source->is_hardware ? PA_SOURCE_HARDWARE : 0),
         PA_TAG_INVALID);
 }
 =

 static void client_fill_tagstruct(pa_tagstruct *t, pa_client *client) {
-    assert(t && client);
+    pa_assert(t);
+    pa_assert(client);
+
     pa_tagstruct_putu32(t, client->index);
     pa_tagstruct_puts(t, client->name);
     pa_tagstruct_putu32(t, client->owner ? client->owner->index : PA_INVAL=
ID_INDEX);
@@ -1367,7 +1696,9 @@
 }
 =

 static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) {
-    assert(t && module);
+    pa_assert(t);
+    pa_assert(module);
+
     pa_tagstruct_putu32(t, module->index);
     pa_tagstruct_puts(t, module->name);
     pa_tagstruct_puts(t, module->argument);
@@ -1376,7 +1707,9 @@
 }
 =

 static void sink_input_fill_tagstruct(pa_tagstruct *t, pa_sink_input *s) {
-    assert(t && s);
+    pa_assert(t);
+    pa_sink_input_assert_ref(s);
+
     pa_tagstruct_putu32(t, s->index);
     pa_tagstruct_puts(t, s->name);
     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX=
);
@@ -1392,7 +1725,9 @@
 }
 =

 static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output=
 *s) {
-    assert(t && s);
+    pa_assert(t);
+    pa_source_output_assert_ref(s);
+
     pa_tagstruct_putu32(t, s->index);
     pa_tagstruct_puts(t, s->name);
     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX=
);
@@ -1407,7 +1742,9 @@
 }
 =

 static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) {
-    assert(t && e);
+    pa_assert(t);
+    pa_assert(e);
+    =

     pa_tagstruct_putu32(t, e->index);
     pa_tagstruct_puts(t, e->name);
     pa_tagstruct_put_cvolume(t, &e->volume);
@@ -1420,7 +1757,7 @@
 }
 =

 static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comm=
and, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
     pa_sink *sink =3D NULL;
     pa_source *source =3D NULL;
@@ -1431,7 +1768,9 @@
     pa_scache_entry *sce =3D NULL;
     const char *name;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         (command !=3D PA_COMMAND_GET_CLIENT_INFO &&
@@ -1466,7 +1805,7 @@
     else if (command =3D=3D PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
         so =3D pa_idxset_get_by_index(c->protocol->core->source_outputs, i=
dx);
     else {
-        assert(command =3D=3D PA_COMMAND_GET_SAMPLE_INFO);
+        pa_assert(command =3D=3D PA_COMMAND_GET_SAMPLE_INFO);
         if (idx !=3D PA_INVALID_INDEX)
             sce =3D pa_idxset_get_by_index(c->protocol->core->scache, idx);
         else
@@ -1497,12 +1836,14 @@
 }
 =

 static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t=
 command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_idxset *i;
     uint32_t idx;
     void *p;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1526,7 +1867,7 @@
     else if (command =3D=3D PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
         i =3D c->protocol->core->source_outputs;
     else {
-        assert(command =3D=3D PA_COMMAND_GET_SAMPLE_INFO_LIST);
+        pa_assert(command =3D=3D PA_COMMAND_GET_SAMPLE_INFO_LIST);
         i =3D c->protocol->core->scache;
     }
 =

@@ -1545,7 +1886,7 @@
             else if (command =3D=3D PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
                 source_output_fill_tagstruct(reply, p);
             else {
-                assert(command =3D=3D PA_COMMAND_GET_SAMPLE_INFO_LIST);
+                pa_assert(command =3D=3D PA_COMMAND_GET_SAMPLE_INFO_LIST);
                 scache_fill_tagstruct(reply, p);
             }
         }
@@ -1555,11 +1896,13 @@
 }
 =

 static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC=
_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_tagstruct *reply;
     char txt[256];
     const char *n;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1587,8 +1930,10 @@
 =

 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e,=
 uint32_t idx, void *userdata) {
     pa_tagstruct *t;
-    struct connection *c =3D userdata;
-    assert(c && core);
+    connection *c =3D CONNECTION(userdata);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     t =3D pa_tagstruct_new(NULL, 0);
     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
@@ -1599,9 +1944,11 @@
 }
 =

 static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE=
D uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_subscription_mask_t m;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &m) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1617,7 +1964,7 @@
 =

     if (m !=3D 0) {
         c->subscription =3D pa_subscription_new(c->protocol->core, m, subs=
cription_cb, c);
-        assert(c->subscription);
+        pa_assert(c->subscription);
     } else
         c->subscription =3D NULL;
 =

@@ -1631,14 +1978,16 @@
         pa_tagstruct *t,
         void *userdata) {
 =

-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
     pa_cvolume volume;
     pa_sink *sink =3D NULL;
     pa_source *source =3D NULL;
     pa_sink_input *si =3D NULL;
     const char *name =3D NULL;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         (command =3D=3D PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t,=
 &name) < 0) ||
@@ -1653,27 +2002,36 @@
     CHECK_VALIDITY(c->pstream, idx !=3D PA_INVALID_INDEX || !name || (*nam=
e && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVA=
LID);
 =

-    if (command =3D=3D PA_COMMAND_SET_SINK_VOLUME) {
-        if (idx !=3D PA_INVALID_INDEX)
-            sink =3D pa_idxset_get_by_index(c->protocol->core->sinks, idx);
-        else
-            sink =3D pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SI=
NK, 1);
-    } else if (command =3D=3D PA_COMMAND_SET_SOURCE_VOLUME) {
-        if (idx !=3D (uint32_t) -1)
-            source =3D pa_idxset_get_by_index(c->protocol->core->sources, =
idx);
-        else
-            source =3D pa_namereg_get(c->protocol->core, name, PA_NAMEREG_=
SOURCE, 1);
-    }  else {
-        assert(command =3D=3D PA_COMMAND_SET_SINK_INPUT_VOLUME);
-        si =3D pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
+    switch (command) {
+        =

+        case PA_COMMAND_SET_SINK_VOLUME:
+            if (idx !=3D PA_INVALID_INDEX)
+                sink =3D pa_idxset_get_by_index(c->protocol->core->sinks, =
idx);
+            else
+                sink =3D pa_namereg_get(c->protocol->core, name, PA_NAMERE=
G_SINK, 1);
+            break;
+
+        case PA_COMMAND_SET_SOURCE_VOLUME:
+            if (idx !=3D PA_INVALID_INDEX)
+                source =3D pa_idxset_get_by_index(c->protocol->core->sourc=
es, idx);
+            else
+                source =3D pa_namereg_get(c->protocol->core, name, PA_NAME=
REG_SOURCE, 1);
+            break;
+            =

+        case PA_COMMAND_SET_SINK_INPUT_VOLUME:
+            si =3D pa_idxset_get_by_index(c->protocol->core->sink_inputs, =
idx);
+            break;
+
+        default:
+            pa_assert_not_reached();
     }
 =

     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
 =

     if (sink)
-        pa_sink_set_volume(sink, PA_MIXER_HARDWARE, &volume);
+        pa_sink_set_volume(sink, &volume);
     else if (source)
-        pa_source_set_volume(source, PA_MIXER_HARDWARE, &volume);
+        pa_source_set_volume(source, &volume);
     else if (si)
         pa_sink_input_set_volume(si, &volume);
 =

@@ -1687,16 +2045,20 @@
         pa_tagstruct *t,
         void *userdata) {
 =

-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
     int mute;
     pa_sink *sink =3D NULL;
     pa_source *source =3D NULL;
+    pa_sink_input *si =3D NULL;
     const char *name =3D NULL;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
-        pa_tagstruct_gets(t, &name) < 0 ||
+        (command =3D=3D PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &=
name) < 0) ||
+        (command =3D=3D PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t,=
 &name) < 0) ||
         pa_tagstruct_get_boolean(t, &mute) ||
         !pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1706,35 +2068,53 @@
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
     CHECK_VALIDITY(c->pstream, idx !=3D PA_INVALID_INDEX || !name || (*nam=
e && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
 =

-    if (command =3D=3D PA_COMMAND_SET_SINK_MUTE) {
-        if (idx !=3D PA_INVALID_INDEX)
-            sink =3D pa_idxset_get_by_index(c->protocol->core->sinks, idx);
-        else
-            sink =3D pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SI=
NK, 1);
-    } else {
-        assert(command =3D=3D PA_COMMAND_SET_SOURCE_MUTE);
-        if (idx !=3D (uint32_t) -1)
-            source =3D pa_idxset_get_by_index(c->protocol->core->sources, =
idx);
-        else
-            source =3D pa_namereg_get(c->protocol->core, name, PA_NAMEREG_=
SOURCE, 1);
-    }
-
-    CHECK_VALIDITY(c->pstream, sink || source, tag, PA_ERR_NOENTITY);
+    switch (command) {
+        =

+        case PA_COMMAND_SET_SINK_MUTE:
+
+            if (idx !=3D PA_INVALID_INDEX)
+                sink =3D pa_idxset_get_by_index(c->protocol->core->sinks, =
idx);
+            else
+                sink =3D pa_namereg_get(c->protocol->core, name, PA_NAMERE=
G_SINK, 1);
+
+            break;
+
+        case PA_COMMAND_SET_SOURCE_MUTE:
+            if (idx !=3D PA_INVALID_INDEX)
+                source =3D pa_idxset_get_by_index(c->protocol->core->sourc=
es, idx);
+            else
+                source =3D pa_namereg_get(c->protocol->core, name, PA_NAME=
REG_SOURCE, 1);
+
+            break;
+
+        case PA_COMMAND_SET_SINK_INPUT_MUTE:
+            si =3D pa_idxset_get_by_index(c->protocol->core->sink_inputs, =
idx);
+            break;
+
+        default:
+            pa_assert_not_reached();
+    }
+
+    CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
 =

     if (sink)
-        pa_sink_set_mute(sink, PA_MIXER_HARDWARE, mute);
+        pa_sink_set_mute(sink, mute);
     else if (source)
-        pa_source_set_mute(source, PA_MIXER_HARDWARE, mute);
+        pa_source_set_mute(source, mute);
+    else if (si)
+        pa_sink_input_set_mute(si, mute);
 =

     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 =

 static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, P=
A_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdat=
a) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
     int b;
-    struct playback_stream *s, *ssync;
-    assert(c && t);
+    playback_stream *s;
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_boolean(t, &b) < 0 ||
@@ -1747,30 +2127,19 @@
     CHECK_VALIDITY(c->pstream, idx !=3D PA_INVALID_INDEX, tag, PA_ERR_INVA=
LID);
     s =3D pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type =3D=3D PLAYBACK_STREAM, tag, PA_ERR=
_NOENTITY);
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_=
NOENTITY);
 =

     pa_sink_input_cork(s->sink_input, b);
-    pa_memblockq_prebuf_force(s->memblockq);
-
-    /* Do the same for all other members in the sync group */
-    for (ssync =3D s->prev; ssync; ssync =3D ssync->prev) {
-        pa_sink_input_cork(ssync->sink_input, b);
-        pa_memblockq_prebuf_force(ssync->memblockq);
-    }
-
-    for (ssync =3D s->next; ssync; ssync =3D ssync->next) {
-        pa_sink_input_cork(ssync->sink_input, b);
-        pa_memblockq_prebuf_force(ssync->memblockq);
-    }
-
     pa_pstream_send_simple_ack(c->pstream, tag);
 }
 =

-static void command_flush_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, =
PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userda=
ta) {
-    struct connection *c =3D userdata;
+static void command_trigger_or_flush_or_prebuf_playback_stream(PA_GCC_UNUS=
ED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagst=
ruct *t, void *userdata) {
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
-    struct playback_stream *s, *ssync;
-    assert(c && t);
+    playback_stream *s;
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1782,75 +2151,36 @@
     CHECK_VALIDITY(c->pstream, idx !=3D PA_INVALID_INDEX, tag, PA_ERR_INVA=
LID);
     s =3D pa_idxset_get_by_index(c->output_streams, idx);
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type =3D=3D PLAYBACK_STREAM, tag, PA_ERR=
_NOENTITY);
-
-    pa_memblockq_flush(s->memblockq);
-    s->underrun =3D 0;
-
-    /* Do the same for all other members in the sync group */
-    for (ssync =3D s->prev; ssync; ssync =3D ssync->prev) {
-        pa_memblockq_flush(ssync->memblockq);
-        ssync->underrun =3D 0;
-    }
-
-    for (ssync =3D s->next; ssync; ssync =3D ssync->next) {
-        pa_memblockq_flush(ssync->memblockq);
-        ssync->underrun =3D 0;
+    CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_=
NOENTITY);
+
+    switch (command) {
+        case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
+            pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT=
(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
+            break;
+            =

+        case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
+            pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT=
(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
+            break;
+
+        case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
+            pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT=
(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
+            break;
+
+        default:
+            pa_assert_not_reached();
     }
 =

     pa_pstream_send_simple_ack(c->pstream, tag);
-    pa_sink_notify(s->sink_input->sink);
-    request_bytes(s);
-
-    for (ssync =3D s->prev; ssync; ssync =3D ssync->prev)
-        request_bytes(ssync);
-
-    for (ssync =3D s->next; ssync; ssync =3D ssync->next)
-        request_bytes(ssync);
-}
-
-static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdi=
spatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, =
void *userdata) {
-    struct connection *c =3D userdata;
+}
+
+static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_=
GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata)=
 {
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
-    struct playback_stream *s;
-    assert(c && t);
-
-    if (pa_tagstruct_getu32(t, &idx) < 0 ||
-        !pa_tagstruct_eof(t)) {
-        protocol_error(c);
-        return;
-    }
-
-    CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
-    CHECK_VALIDITY(c->pstream, idx !=3D PA_INVALID_INDEX, tag, PA_ERR_INVA=
LID);
-    s =3D pa_idxset_get_by_index(c->output_streams, idx);
-    CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-    CHECK_VALIDITY(c->pstream, s->type =3D=3D PLAYBACK_STREAM, tag, PA_ERR=
_NOENTITY);
-
-    switch (command) {
-        case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
-            pa_memblockq_prebuf_force(s->memblockq);
-            break;
-
-        case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
-            pa_memblockq_prebuf_disable(s->memblockq);
-            break;
-
-        default:
-            abort();
-    }
-
-    pa_sink_notify(s->sink_input->sink);
-    pa_pstream_send_simple_ack(c->pstream, tag);
-    request_bytes(s);
-}
-
-static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_=
GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata)=
 {
-    struct connection *c =3D userdata;
-    uint32_t idx;
-    struct record_stream *s;
+    record_stream *s;
     int b;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_get_boolean(t, &b) < 0 ||
@@ -1869,11 +2199,13 @@
 }
 =

 static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA=
_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata=
) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
-    struct record_stream *s;
-    assert(c && t);
-
+    record_stream *s;
+
+    connection_assert_ref(c);
+    pa_assert(t);
+    =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -1889,9 +2221,11 @@
 }
 =

 static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch =
*pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const char *s;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &s) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1907,10 +2241,12 @@
 }
 =

 static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32=
_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
     const char *name;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_gets(t, &name) < 0 ||
@@ -1923,16 +2259,16 @@
     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_IN=
VALID);
 =

     if (command =3D=3D PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
-        struct playback_stream *s;
+        playback_stream *s;
 =

         s =3D pa_idxset_get_by_index(c->output_streams, idx);
         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
-        CHECK_VALIDITY(c->pstream, s->type =3D=3D PLAYBACK_STREAM, tag, PA=
_ERR_NOENTITY);
+        CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_=
ERR_NOENTITY);
 =

         pa_sink_input_set_name(s->sink_input, name);
 =

     } else {
-        struct record_stream *s;
+        record_stream *s;
 =

         s =3D pa_idxset_get_by_index(c->record_streams, idx);
         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
@@ -1944,9 +2280,11 @@
 }
 =

 static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command,=
 uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -1973,7 +2311,7 @@
     } else {
         pa_source_output *s;
 =

-        assert(command =3D=3D PA_COMMAND_KILL_SOURCE_OUTPUT);
+        pa_assert(command =3D=3D PA_COMMAND_KILL_SOURCE_OUTPUT);
 =

         s =3D pa_idxset_get_by_index(c->protocol->core->source_outputs, id=
x);
         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
@@ -1985,12 +2323,14 @@
 }
 =

 static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU=
SED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_module *m;
     const char *name, *argument;
     pa_tagstruct *reply;
-    assert(c && t);
-
+
+    connection_assert_ref(c);
+    pa_assert(t);
+    =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_gets(t, &argument) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -2013,10 +2353,12 @@
 }
 =

 static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U=
NUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx;
     pa_module *m;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -2033,12 +2375,14 @@
 }
 =

 static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN=
USED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const char *name, *module, *argument;
     uint32_t type;
     uint32_t idx;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_getu32(t, &type) < 0 ||
@@ -2066,11 +2410,13 @@
 }
 =

 static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC=
_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const char *name =3D NULL;
     uint32_t type, idx =3D PA_IDXSET_INVALID;
     int r;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if ((pa_tagstruct_getu32(t, &idx) < 0 &&
         (pa_tagstruct_gets(t, &name) < 0 ||
@@ -2095,7 +2441,7 @@
 }
 =

 static void autoload_fill_tagstruct(pa_tagstruct *t, const pa_autoload_ent=
ry *e) {
-    assert(t && e);
+    pa_assert(t && e);
 =

     pa_tagstruct_putu32(t, e->index);
     pa_tagstruct_puts(t, e->name);
@@ -2105,12 +2451,14 @@
 }
 =

 static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_G=
CC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     const pa_autoload_entry *a =3D NULL;
     uint32_t type, idx;
     const char *name;
     pa_tagstruct *reply;
-    assert(c && t);
+
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if ((pa_tagstruct_getu32(t, &idx) < 0 &&
         (pa_tagstruct_gets(t, &name) < 0 ||
@@ -2137,9 +2485,11 @@
 }
 =

 static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd,=
 PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userd=
ata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     pa_tagstruct *reply;
-    assert(c && t);
+    =

+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -2162,12 +2512,12 @@
 }
 =

 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32=
_t tag, pa_tagstruct *t, void *userdata) {
-    struct connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
     uint32_t idx =3D PA_INVALID_INDEX, idx_device =3D PA_INVALID_INDEX;
     const char *name =3D NULL;
 =

-    assert(c);
-    assert(t);
+    connection_assert_ref(c);
+    pa_assert(t);
 =

     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         pa_tagstruct_getu32(t, &idx_device) < 0 ||
@@ -2218,68 +2568,48 @@
     }
 =

     pa_pstream_send_simple_ack(c->pstream, tag);
-
 }
 =

 /*** pstream callbacks ***/
 =

 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, cons=
t pa_creds *creds, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(p && packet && packet->data && c);
+    connection *c =3D CONNECTION(userdata);
+
+    pa_assert(p);
+    pa_assert(packet);
+    connection_assert_ref(c);
 =

     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
         pa_log("invalid packet.");
-        connection_free(c);
+        connection_unlink(c);
     }
 }
 =

 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int=
64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata)=
 {
-    struct connection *c =3D userdata;
-    struct output_stream *stream;
-    assert(p && chunk && userdata);
-
-    if (!(stream =3D pa_idxset_get_by_index(c->output_streams, channel))) {
+    connection *c =3D CONNECTION(userdata);
+    output_stream *stream;
+    =

+    pa_assert(p);
+    pa_assert(chunk);
+    connection_assert_ref(c);
+
+    if (!(stream =3D OUTPUT_STREAM(pa_idxset_get_by_index(c->output_stream=
s, channel)))) {
         pa_log("client sent block for invalid stream.");
         /* Ignoring */
         return;
     }
 =

-    if (stream->type =3D=3D PLAYBACK_STREAM) {
-        struct playback_stream *ps =3D (struct playback_stream*) stream;
-        if (chunk->length >=3D ps->requested_bytes)
-            ps->requested_bytes =3D 0;
-        else
-            ps->requested_bytes -=3D chunk->length;
-
-        pa_memblockq_seek(ps->memblockq, offset, seek);
-
-        if (pa_memblockq_push_align(ps->memblockq, chunk) < 0) {
-            pa_tagstruct *t;
-
-            pa_log_warn("failed to push data into queue");
-
-            /* Pushing this block into the queue failed, so we simulate
-             * it by skipping ahead */
-
-            pa_memblockq_seek(ps->memblockq, chunk->length, PA_SEEK_RELATI=
VE);
-
-            /* Notify the user */
-            t =3D pa_tagstruct_new(NULL, 0);
-            pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
-            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
-            pa_tagstruct_putu32(t, ps->index);
-            pa_pstream_send_tagstruct(p, t);
-        }
-
-        ps->underrun =3D 0;
-
-        pa_sink_notify(ps->sink_input->sink);
-
+    if (playback_stream_isinstance(stream)) {
+        playback_stream *ps =3D PLAYBACK_STREAM(stream);
+        =

+        if (seek !=3D PA_SEEK_RELATIVE || offset !=3D 0)
+            pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJEC=
T(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, N=
ULL, NULL);
+        =

+        pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps=
->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
+        =

     } else {
-        struct upload_stream *u =3D (struct upload_stream*) stream;
+        upload_stream *u =3D UPLOAD_STREAM(stream);
         size_t l;
-
-        assert(u->type =3D=3D UPLOAD_STREAM);
 =

         if (!u->memchunk.memblock) {
             if (u->length =3D=3D chunk->length) {
@@ -2292,7 +2622,7 @@
             }
         }
 =

-        assert(u->memchunk.memblock);
+        pa_assert(u->memchunk.memblock);
 =

         l =3D u->length;
         if (l > chunk->length)
@@ -2317,17 +2647,21 @@
 }
 =

 static void pstream_die_callback(pa_pstream *p, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(p && c);
-    connection_free(c);
-
+    connection *c =3D CONNECTION(userdata);
+    =

+    pa_assert(p);
+    connection_assert_ref(c);
+
+    connection_unlink(c);
 /*    pa_log("connection died.");*/
 }
 =

 =

 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(p && c);
+    connection *c =3D CONNECTION(userdata);
+
+    pa_assert(p);
+    connection_assert_ref(c);
 =

     send_memblock(c);
 }
@@ -2335,25 +2669,32 @@
 /*** client callbacks ***/
 =

 static void client_kill_cb(pa_client *c) {
-    assert(c && c->userdata);
-    connection_free(c->userdata);
+    pa_assert(c);
+    =

+    connection_unlink(CONNECTION(c->userdata));
 }
 =

 /*** socket server callbacks ***/
 =

 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct=
 timeval *tv, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(m && tv && c && c->auth_timeout_event =3D=3D e);
+    connection *c =3D CONNECTION(userdata);
+    =

+    pa_assert(m);
+    pa_assert(tv);
+    connection_assert_ref(c);
+    pa_assert(c->auth_timeout_event =3D=3D e);
 =

     if (!c->authorized)
-        connection_free(c);
+        connection_unlink(c);
 }
 =

 static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *=
io, void *userdata) {
     pa_protocol_native *p =3D userdata;
-    struct connection *c;
+    connection *c;
     char cname[256], pname[128];
-    assert(io && p);
+    =

+    pa_assert(io);
+    pa_assert(p);
 =

     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log_warn("Warning! Too many connections (%u), dropping incoming=
 connection.", MAX_CONNECTIONS);
@@ -2361,7 +2702,8 @@
         return;
     }
 =

-    c =3D pa_xmalloc(sizeof(struct connection));
+    c =3D pa_msgobject_new(connection);
+    c->parent.parent.free =3D connection_free;
 =

     c->authorized =3D !!p->public;
 =

@@ -2382,15 +2724,15 @@
     c->protocol =3D p;
     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
     pa_snprintf(cname, sizeof(cname), "Native client (%s)", pname);
-    assert(p->core);
+    pa_assert(p->core);
     c->client =3D pa_client_new(p->core, __FILE__, cname);
-    assert(c->client);
+    pa_assert(c->client);
     c->client->kill =3D client_kill_cb;
     c->client->userdata =3D c;
     c->client->owner =3D p->module;
 =

     c->pstream =3D pa_pstream_new(p->core->mainloop, io, p->core->mempool);
-    assert(c->pstream);
+    pa_assert(c->pstream);
 =

     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_call=
back, c);
     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_=
callback, c);
@@ -2398,11 +2740,11 @@
     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
 =

     c->pdispatch =3D pa_pdispatch_new(p->core->mainloop, command_table, PA=
_COMMAND_MAX);
-    assert(c->pdispatch);
+    pa_assert(c->pdispatch);
 =

     c->record_streams =3D pa_idxset_new(NULL, NULL);
     c->output_streams =3D pa_idxset_new(NULL, NULL);
-    assert(c->record_streams && c->output_streams);
+    pa_assert(c->record_streams && c->output_streams);
 =

     c->rrobin_index =3D PA_IDXSET_INVALID;
     c->subscription =3D NULL;
@@ -2420,7 +2762,7 @@
 /*** module entry points ***/
 =

 static int load_key(pa_protocol_native*p, const char*fn) {
-    assert(p);
+    pa_assert(p);
 =

     p->auth_cookie_in_property =3D 0;
 =

@@ -2450,8 +2792,8 @@
     int public =3D 0;
     const char *acl;
 =

-    assert(c);
-    assert(ma);
+    pa_assert(c);
+    pa_assert(ma);
 =

     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &public) < 0) {
         pa_log("auth-anonymous=3D expects a boolean argument.");
@@ -2492,7 +2834,7 @@
         goto fail;
 =

     p->connections =3D pa_idxset_new(NULL, NULL);
-    assert(p->connections);
+    pa_assert(p->connections);
 =

     return p;
 =

@@ -2527,11 +2869,11 @@
 }
 =

 void pa_protocol_native_free(pa_protocol_native *p) {
-    struct connection *c;
-    assert(p);
+    connection *c;
+    pa_assert(p);
 =

     while ((c =3D pa_idxset_first(p->connections, NULL)))
-        connection_free(c);
+        connection_unlink(c);
     pa_idxset_free(p->connections, NULL, NULL);
 =

     if (p->server) {
@@ -2563,7 +2905,12 @@
     pa_xfree(p);
 }
 =

-pa_protocol_native* pa_protocol_native_new_iochannel(pa_core*core, pa_ioch=
annel *io, pa_module *m, pa_modargs *ma) {
+pa_protocol_native* pa_protocol_native_new_iochannel(
+        pa_core*core,
+        pa_iochannel *io,
+        pa_module *m,
+        pa_modargs *ma) {
+    =

     pa_protocol_native *p;
 =

     if (!(p =3D protocol_new_internal(core, m, ma)))

Modified: branches/lennart/src/pulsecore/protocol-simple.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/protocol-simple.c?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/protocol-simple.c (original)
+++ branches/lennart/src/pulsecore/protocol-simple.c Wed Aug  1 00:44:53 2007
@@ -67,7 +67,7 @@
 =

 PA_DECLARE_CLASS(connection);
 #define CONNECTION(o) (connection_cast(o))
-static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobjec=
t_check_type);
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
                      =

 struct pa_protocol_simple {
     pa_module *module;
@@ -91,9 +91,9 @@
 };
 =

 enum {
-    MESSAGE_REQUEST_DATA,      /* data requested from sink input from the =
main loop */
-    MESSAGE_POST_DATA,         /* data from source output to main loop */
-    MESSAGE_DROP_CONNECTION    /* Please drop a aconnection now */
+    CONNECTION_MESSAGE_REQUEST_DATA,      /* data requested from sink inpu=
t from the main loop */
+    CONNECTION_MESSAGE_POST_DATA,         /* data from source output to ma=
in loop */
+    CONNECTION_MESSAGE_DROP_CONNECTION    /* Please drop a connection now=
 */
 };
 =

 =

@@ -102,10 +102,40 @@
 #define RECORD_BUFFER_SECONDS (5)
 #define RECORD_BUFFER_FRAGMENTS (100)
 =

+static void connection_unlink(connection *c) {
+    pa_assert(c);
+
+    if (!c->protocol)
+        return;
+    =

+    if (c->sink_input) {
+        pa_sink_input_disconnect(c->sink_input);
+        pa_sink_input_unref(c->sink_input);
+        c->sink_input =3D NULL;
+    }
+
+    if (c->source_output) {
+        pa_source_output_disconnect(c->source_output);
+        pa_source_output_unref(c->source_output);
+        c->source_output =3D NULL;
+    }
+
+    if (c->client) {
+        pa_client_free(c->client);
+        c->client =3D NULL;
+    }
+
+    pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NUL=
L) =3D=3D c);
+    c->protocol =3D NULL;
+    connection_unref(c);
+}
+
 static void connection_free(pa_object *o) {
     connection *c =3D CONNECTION(o);
     pa_assert(c);
 =

+    connection_unref(c);
+    =

     if (c->playback.current_memblock)
         pa_memblock_unref(c->playback.current_memblock);
 =

@@ -119,32 +149,6 @@
     pa_xfree(c);
 }
 =

-static void connection_drop(connection *c) {
-    pa_assert(c);
-    =

-    if (!pa_idxset_remove_by_data(c->protocol->connections, c, NULL))
-        return;
-
-    if (c->sink_input) {
-        pa_sink_input_disconnect(c->sink_input);
-        pa_sink_input_unref(c->sink_input);
-        c->sink_input =3D NULL;
-    }
-
-    if (c->source_output) {
-        pa_source_output_disconnect(c->source_output);
-        pa_source_output_unref(c->source_output);
-        c->source_output =3D NULL;
-    }
-
-    if (c->client) {
-        pa_client_free(c->client);
-        c->client =3D NULL;
-    }
-
-    connection_unref(c);
-}
-
 static int do_read(connection *c) {
     pa_memchunk chunk;
     ssize_t r;
@@ -190,7 +194,7 @@
 =

     c->playback.memblock_index +=3D r;
 =

-    pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink=
_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
+    pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink=
_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
     pa_atomic_sub(&c->playback.missing, r);
 =

     return 0;
@@ -263,28 +267,28 @@
         pa_iochannel_free(c->io);
         c->io =3D NULL;
 =

-        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->=
sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL);
+        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->=
sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
     } else
-        connection_drop(c);
-}
-
-static int connection_process_msg(pa_msgobject *o, int code, void*userdata=
, pa_memchunk *chunk) {
+        connection_unlink(c);
+}
+
+static int connection_process_msg(pa_msgobject *o, int code, void*userdata=
, int64_t offset, pa_memchunk *chunk) {
     connection *c =3D CONNECTION(o);
     connection_assert_ref(c);
 =

     switch (code) {
-        case MESSAGE_REQUEST_DATA:
+        case CONNECTION_MESSAGE_REQUEST_DATA:
             do_work(c);
             break;
             =

-        case MESSAGE_POST_DATA:
+        case CONNECTION_MESSAGE_POST_DATA:
 /*             pa_log("got data %u", chunk->length); */
             pa_memblockq_push_align(c->output_memblockq, chunk);
             do_work(c);
             break;
 =

-        case MESSAGE_DROP_CONNECTION:
-            connection_drop(c);
+        case CONNECTION_MESSAGE_DROP_CONNECTION:
+            connection_unlink(c);
             break;
     }
 =

@@ -294,13 +298,13 @@
 /*** sink_input callbacks ***/
 =

 /* Called from thread context */
-static int sink_input_process_msg(pa_msgobject *o, int code, void *userdat=
a, pa_memchunk *chunk) {
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdat=
a, int64_t offset, pa_memchunk *chunk) {
     pa_sink_input *i =3D PA_SINK_INPUT(o);
     connection*c;
 =

-    pa_assert(i);
-    c =3D i->userdata;
-    pa_assert(c);
+    pa_sink_input_assert_ref(i);
+    c =3D CONNECTION(i->userdata);
+    connection_assert_ref(c);
 =

     switch (code) {
 =

@@ -330,7 +334,7 @@
         }
 =

         default:
-            return pa_sink_input_process_msg(o, code, userdata, chunk);
+            return pa_sink_input_process_msg(o, code, userdata, offset, ch=
unk);
     }
 }
 =

@@ -349,7 +353,7 @@
 /*     pa_log("peeked %u %i", r >=3D 0 ? chunk->length: 0, r); */
 =

     if (c->dead && r < 0)
-        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), M=
ESSAGE_DROP_CONNECTION, NULL, NULL, NULL);
+        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), C=
ONNECTION_MESSAGE_DROP_CONNECTION, NULL, 0, NULL, NULL);
 =

     return r;
 }
@@ -369,19 +373,20 @@
 =

     if (new > old) {
         if (pa_atomic_add(&c->playback.missing, new - old) <=3D 0)
-            pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c=
), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
+            pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c=
), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
     }
 }
 =

 /* Called from main context */
 static void sink_input_kill_cb(pa_sink_input *i) {
-    pa_assert(i);
-
-    connection_drop(CONNECTION(i->userdata));
+    pa_sink_input_assert_ref(i);
+
+    connection_unlink(CONNECTION(i->userdata));
 }
 =

 /*** source_output callbacks ***/
 =

+/* Called from thread context */
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *=
chunk) {
     connection *c;
 =

@@ -390,24 +395,22 @@
     pa_assert(c);
     pa_assert(chunk);
 =

-    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSA=
GE_POST_DATA, NULL, chunk, NULL);
-}
-
+    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNE=
CTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
+}
+
+/* Called from main context */
 static void source_output_kill_cb(pa_source_output *o) {
-    connection*c;
-
-    pa_assert(o);
-    c =3D o->userdata;
-    pa_assert(c);
-
-    connection_drop(c);
-}
-
+    pa_source_output_assert_ref(o);
+
+    connection_unlink(CONNECTION(o->userdata));
+}
+
+/* Called from main context */
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
     connection*c;
 =

     pa_assert(o);
-    c =3D o->userdata;
+    c =3D CONNECTION(o->userdata);
     pa_assert(c);
 =

     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), =
&c->source_output->sample_spec);
@@ -419,16 +422,16 @@
     connection*c;
 =

     pa_assert(client);
-    c =3D client->userdata;
-    pa_assert(c);
-
-    connection_drop(c);
+    c =3D CONNECTION(client->userdata);
+    pa_assert(c);
+
+    connection_unlink(c);
 }
 =

 /*** pa_iochannel callbacks ***/
 =

 static void io_callback(pa_iochannel*io, void *userdata) {
-    connection *c =3D userdata;
+    connection *c =3D CONNECTION(userdata);
 =

     pa_assert(io);
     pa_assert(c);
@@ -453,7 +456,7 @@
         return;
     }
 =

-    c =3D pa_msgobject_new(connection, connection_check_type);
+    c =3D pa_msgobject_new(connection);
     c->parent.parent.free =3D connection_free;
     c->parent.process_msg =3D connection_process_msg;
     c->io =3D io;
@@ -547,7 +550,6 @@
         pa_source_output_put(c->source_output);
     }
 =

-
     pa_iochannel_set_callback(c->io, io_callback, c);
     pa_idxset_put(p->connections, c, NULL);
 =

@@ -555,7 +557,7 @@
 =

 fail:
     if (c)
-        connection_drop(c);
+        connection_unlink(c);
 }
 =

 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server=
 *server, pa_module *m, pa_modargs *ma) {
@@ -618,7 +620,7 @@
 =

     if (p->connections) {
         while((c =3D pa_idxset_first(p->connections, NULL)))
-            connection_drop(c);
+            connection_unlink(c);
 =

         pa_idxset_free(p->connections, NULL, NULL);
     }

Modified: branches/lennart/src/pulsecore/sink-input.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
sink-input.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/sink-input.c (original)
+++ branches/lennart/src/pulsecore/sink-input.c Wed Aug  1 00:44:53 2007
@@ -45,7 +45,7 @@
 #define MOVE_BUFFER_LENGTH (1024*1024)
 #define SILENCE_BUFFER_LENGTH (64*1024)
 =

-static PA_DEFINE_CHECK_TYPE(pa_sink_input, sink_input_check_type, pa_msgob=
ject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_sink_input, pa_msgobject);
 =

 static void sink_input_free(pa_object *o);
 =

@@ -110,6 +110,7 @@
 =

     pa_return_null_if_fail(data->sink);
     pa_return_null_if_fail(pa_sink_get_state(data->sink) !=3D PA_SINK_DISC=
ONNECTED);
+    pa_return_null_if_fail(!data->sync_base || (data->sync_base->sink =3D=
=3D data->sink && pa_sink_input_get_state(data->sync_base) =3D=3D PA_SINK_I=
NPUT_CORKED));
 =

     if (!data->sample_spec_is_set)
         data->sample_spec =3D data->sink->sample_spec;
@@ -161,12 +162,12 @@
         data->resample_method =3D pa_resampler_get_method(resampler);
     }
 =

-    i =3D pa_msgobject_new(pa_sink_input, sink_input_check_type);
+    i =3D pa_msgobject_new(pa_sink_input);
     i->parent.parent.free =3D sink_input_free;
     i->parent.process_msg =3D pa_sink_input_process_msg;
 =

     i->core =3D core;
-    i->state =3D PA_SINK_INPUT_RUNNING;
+    i->state =3D data->start_corked ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT=
_RUNNING;
     i->flags =3D flags;
     i->name =3D pa_xstrdup(data->name);
     i->driver =3D pa_xstrdup(data->driver);
@@ -181,6 +182,16 @@
     i->volume =3D data->volume;
     i->muted =3D data->muted;
 =

+    if (data->sync_base) {
+        i->sync_next =3D data->sync_base->sync_next;
+        i->sync_prev =3D data->sync_base;
+
+        if (data->sync_base->sync_next)
+            data->sync_base->sync_next->sync_prev =3D i;
+        data->sync_base->sync_next =3D i;
+    } else =

+        i->sync_next =3D i->sync_prev =3D NULL;
+    =

     i->peek =3D NULL;
     i->drop =3D NULL;
     i->kill =3D NULL;
@@ -213,6 +224,7 @@
 }
 =

 static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t st=
ate) {
+    pa_sink_input *ssync;
     pa_assert(i);
 =

     if (state =3D=3D PA_SINK_INPUT_DRAINED)
@@ -221,10 +233,15 @@
     if (i->state =3D=3D state)
         return 0;
 =

-    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INP=
UT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INP=
UT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 =

     i->state =3D state;
+    for (ssync =3D i->sync_prev; ssync; ssync =3D ssync->sync_prev)
+        ssync->state =3D state;
+    for (ssync =3D i->sync_next; ssync; ssync =3D ssync->sync_next)
+        ssync->state =3D state;
+    =

     return 0;
 }
 =

@@ -232,10 +249,16 @@
     pa_assert(i);
     pa_return_if_fail(i->state !=3D PA_SINK_INPUT_DISCONNECTED);
 =

-    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_M=
ESSAGE_REMOVE_INPUT, i, NULL);
+    if (i->sync_prev)
+        i->sync_prev->sync_next =3D i->sync_next;
+    if (i->sync_next)
+        i->sync_next->sync_prev =3D i->sync_prev;
+        =

+    i->sync_prev =3D i->sync_next =3D NULL;
+    =

+    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_M=
ESSAGE_REMOVE_INPUT, i, 0, NULL);
     pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL);
     pa_idxset_remove_by_data(i->sink->inputs, i, NULL);
-    pa_sink_input_unref(i);
 =

     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|P=
A_SUBSCRIPTION_EVENT_REMOVE, i->index);
 =

@@ -248,6 +271,7 @@
     i->kill =3D NULL;
     i->get_latency =3D NULL;
     i->underrun =3D NULL;
+    pa_sink_input_unref(i);
 }
 =

 static void sink_input_free(pa_object *o) {
@@ -281,7 +305,7 @@
     i->thread_info.volume =3D i->volume;
     i->thread_info.muted =3D i->muted;
 =

-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_M=
ESSAGE_ADD_INPUT, pa_sink_input_ref(i), NULL, (pa_free_cb_t) pa_sink_input_=
unref);
+    pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_M=
ESSAGE_ADD_INPUT, i, 0, NULL);
     pa_sink_update_status(i->sink);
 =

     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|P=
A_SUBSCRIPTION_EVENT_NEW, i->index);
@@ -299,7 +323,7 @@
 =

     pa_sink_input_assert_ref(i);
 =

-    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INP=
UT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+    if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INP=
UT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0)
         r =3D 0;
 =

     if (i->get_latency)
@@ -509,7 +533,7 @@
 =

     i->volume =3D *volume;
 =

-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_M=
ESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree=
);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_M=
ESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xf=
ree);
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|P=
A_SUBSCRIPTION_EVENT_CHANGE, i->index);
 }
 =

@@ -528,7 +552,7 @@
 =

     i->muted =3D mute;
 =

-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_M=
ESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_M=
ESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|P=
A_SUBSCRIPTION_EVENT_CHANGE, i->index);
 }
 =

@@ -553,7 +577,7 @@
 =

     i->sample_spec.rate =3D rate;
 =

-    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_M=
ESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
+    pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_M=
ESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), 0, NULL, NULL);
 =

     pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|P=
A_SUBSCRIPTION_EVENT_CHANGE, i->index);
     return 0;
@@ -741,7 +765,7 @@
 /*     return 0; */
 }
 =

-int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, p=
a_memchunk *chunk) {
+int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, i=
nt64_t offset, pa_memchunk *chunk) {
     pa_sink_input *i =3D PA_SINK_INPUT(o);
 =

     pa_sink_input_assert_ref(i);
@@ -776,12 +800,28 @@
         }
 =

         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
+            pa_sink_input *ssync;
+            =

             if ((PA_PTR_TO_UINT(userdata) =3D=3D PA_SINK_INPUT_DRAINED || =
PA_PTR_TO_UINT(userdata) =3D=3D PA_SINK_INPUT_RUNNING) &&
                 (i->thread_info.state !=3D PA_SINK_INPUT_DRAINED) && (i->t=
hread_info.state !=3D PA_SINK_INPUT_RUNNING))
                 pa_atomic_store(&i->thread_info.drained, 1);
             =

             i->thread_info.state =3D PA_PTR_TO_UINT(userdata);
 =

+            for (ssync =3D i->thread_info.sync_prev; ssync; ssync =3D ssyn=
c->thread_info.sync_prev) {
+                if ((PA_PTR_TO_UINT(userdata) =3D=3D PA_SINK_INPUT_DRAINED=
 || PA_PTR_TO_UINT(userdata) =3D=3D PA_SINK_INPUT_RUNNING) &&
+                    (ssync->thread_info.state !=3D PA_SINK_INPUT_DRAINED) =
&& (ssync->thread_info.state !=3D PA_SINK_INPUT_RUNNING))
+                    pa_atomic_store(&ssync->thread_info.drained, 1);
+                ssync->thread_info.state =3D PA_PTR_TO_UINT(userdata);
+            }
+            =

+            for (ssync =3D i->thread_info.sync_next; ssync; ssync =3D ssyn=
c->thread_info.sync_next) {
+                if ((PA_PTR_TO_UINT(userdata) =3D=3D PA_SINK_INPUT_DRAINED=
 || PA_PTR_TO_UINT(userdata) =3D=3D PA_SINK_INPUT_RUNNING) &&
+                    (ssync->thread_info.state !=3D PA_SINK_INPUT_DRAINED) =
&& (ssync->thread_info.state !=3D PA_SINK_INPUT_RUNNING))
+                    pa_atomic_store(&ssync->thread_info.drained, 1);
+                ssync->thread_info.state =3D PA_PTR_TO_UINT(userdata);
+            }
+            =

             return 0;
         }
     }

Modified: branches/lennart/src/pulsecore/sink-input.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
sink-input.h?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/sink-input.h (original)
+++ branches/lennart/src/pulsecore/sink-input.h Wed Aug  1 00:44:53 2007
@@ -71,6 +71,8 @@
     pa_sample_spec sample_spec;
     pa_channel_map channel_map;
 =

+    pa_sink_input *sync_prev, *sync_next;
+    =

     pa_cvolume volume;
     int muted;
 =

@@ -97,6 +99,8 @@
         /*         size_t move_silence; */
         pa_memblock *silence_memblock;               /* may be NULL */
 =

+        pa_sink_input *sync_prev, *sync_next;
+        =

         pa_cvolume volume;
         int muted;
     } thread_info;
@@ -133,6 +137,9 @@
     int muted_is_set;
 =

     pa_resample_method_t resample_method;
+
+    int start_corked;
+    pa_sink_input *sync_base;
 } pa_sink_input_new_data;
 =

 pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data=
 *data);
@@ -179,6 +186,6 @@
 =

 int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *v=
olume);
 void pa_sink_input_drop(pa_sink_input *i, size_t length);
-int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, p=
a_memchunk *chunk);
+int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, i=
nt64_t offset, pa_memchunk *chunk);
 =

 #endif

Modified: branches/lennart/src/pulsecore/sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
sink.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/sink.c (original)
+++ branches/lennart/src/pulsecore/sink.c Wed Aug  1 00:44:53 2007
@@ -48,7 +48,7 @@
 #define MAX_MIX_CHANNELS 32
 #define SILENCE_BUFFER_LENGTH (64*1024)
 =

-static PA_DEFINE_CHECK_TYPE(pa_sink, sink_check_type, pa_msgobject_check_t=
ype);
+static PA_DEFINE_CHECK_TYPE(pa_sink, pa_msgobject);
 =

 static void sink_free(pa_object *s);
 =

@@ -80,7 +80,7 @@
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(name && pa_utf8_valid(name) && *name);
 =

-    s =3D pa_msgobject_new(pa_sink, sink_check_type);
+    s =3D pa_msgobject_new(pa_sink);
 =

     if (!(name =3D pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fai=
l))) {
         pa_xfree(s);
@@ -161,7 +161,7 @@
         if ((ret =3D s->set_state(s, state)) < 0)
             return -1;
 =

-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_S=
ET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_S=
ET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 =

     s->state =3D state;
@@ -264,7 +264,7 @@
 void pa_sink_ping(pa_sink *s) {
     pa_sink_assert_ref(s);
 =

-    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING,=
 NULL, NULL, NULL);
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING,=
 NULL, 0, NULL, NULL);
 }
 =

 static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxi=
nfo) {
@@ -530,7 +530,7 @@
     if (s->get_latency)
         return s->get_latency(s);
 =

-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_G=
ET_LATENCY, &usec, NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_G=
ET_LATENCY, &usec, 0, NULL) < 0)
         return 0;
 =

     return usec;
@@ -549,7 +549,7 @@
         s->set_volume =3D NULL;
 =

     if (!s->set_volume)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_S=
ET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_S=
ET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
 =

     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCR=
IPTION_EVENT_CHANGE, s->index);
@@ -566,7 +566,7 @@
         s->get_volume =3D NULL;
 =

     if (!s->get_volume && s->refresh_volume)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_G=
ET_VOLUME, &s->volume, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_G=
ET_VOLUME, &s->volume, 0, NULL);
 =

     if (!pa_cvolume_equal(&old_volume, &s->volume))
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCR=
IPTION_EVENT_CHANGE, s->index);
@@ -585,7 +585,7 @@
         s->set_mute =3D NULL;
 =

     if (!s->set_mute)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_S=
ET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_S=
ET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
 =

     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCR=
IPTION_EVENT_CHANGE, s->index);
@@ -602,7 +602,7 @@
         s->get_mute =3D NULL;
 =

     if (!s->get_mute && s->refresh_mute)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_G=
ET_MUTE, &s->muted, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_G=
ET_MUTE, &s->muted, 0, NULL);
 =

     if (old_muted !=3D s->muted)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCR=
IPTION_EVENT_CHANGE, s->index);
@@ -660,21 +660,58 @@
     return ret;
 }
 =

-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc=
hunk *chunk) {
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t=
 offset, pa_memchunk *chunk) {
     pa_sink *s =3D PA_SINK(o);
     pa_sink_assert_ref(s);
 =

     switch ((pa_sink_message_t) code) {
+        =

         case PA_SINK_MESSAGE_ADD_INPUT: {
             pa_sink_input *i =3D userdata;
             pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->inde=
x), pa_sink_input_ref(i));
+
+            /* Since the caller sleeps in pa_sink_input_put(), we can
+             * safely access data outside of thread_info even though
+             * it is mutable */
+
+            if ((i->thread_info.sync_prev =3D i->sync_prev)) {
+                pa_assert(i->sink =3D=3D i->thread_info.sync_prev->sink);
+                pa_assert(i->sync_prev->sync_next =3D=3D i);
+                i->thread_info.sync_prev->thread_info.sync_next =3D i;
+            }
+
+            if ((i->thread_info.sync_next =3D i->sync_next)) {
+                pa_assert(i->sink =3D=3D i->thread_info.sync_next->sink);
+                pa_assert(i->sync_next->sync_prev =3D=3D i);
+                i->thread_info.sync_next->thread_info.sync_prev =3D i;
+            }
+
             return 0;
         }
 =

         case PA_SINK_MESSAGE_REMOVE_INPUT: {
             pa_sink_input *i =3D userdata;
+
+            /* Since the caller sleeps in pa_sink_input_disconnect(),
+             * we can safely access data outside of thread_info even
+             * though it is mutable */
+
+            pa_assert(!i->thread_info.sync_prev);
+            pa_assert(!i->thread_info.sync_next);
+            =

+            if (i->thread_info.sync_prev) {
+                i->thread_info.sync_prev->thread_info.sync_next =3D i->thr=
ead_info.sync_prev->sync_next;
+                i->thread_info.sync_prev =3D NULL;
+            }
+
+            if (i->thread_info.sync_next) {
+                i->thread_info.sync_next->thread_info.sync_prev =3D i->thr=
ead_info.sync_next->sync_prev;
+                i->thread_info.sync_next =3D NULL;
+            }
+            =

             if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(=
i->index)))
                 pa_sink_input_unref(i);
+            =

             return 0;
         }
 =

@@ -698,6 +735,7 @@
             return 0;
 =

         case PA_SINK_MESSAGE_SET_STATE:
+            =

             s->thread_info.state =3D PA_PTR_TO_UINT(userdata);
             return 0;
 =


Modified: branches/lennart/src/pulsecore/sink.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
sink.h?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/sink.h (original)
+++ branches/lennart/src/pulsecore/sink.h Wed Aug  1 00:44:53 2007
@@ -156,7 +156,7 @@
 void pa_sink_render_into(pa_sink*s, pa_memchunk *target);
 void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target);
 =

-int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc=
hunk *chunk);
+int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t=
 offset, pa_memchunk *chunk);
 =

 static inline int PA_SINK_OPENED(pa_sink_state_t x) {
     return x =3D=3D PA_SINK_RUNNING || x =3D=3D PA_SINK_IDLE;

Modified: branches/lennart/src/pulsecore/sound-file-stream.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
sound-file-stream.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=
=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/sound-file-stream.c (original)
+++ branches/lennart/src/pulsecore/sound-file-stream.c Wed Aug  1 00:44:53 =
2007
@@ -56,7 +56,7 @@
 =

 PA_DECLARE_CLASS(file_stream);
 #define FILE_STREAM(o) (file_stream_cast(o))
-static PA_DEFINE_CHECK_TYPE(file_stream, file_stream_check_type, pa_msgobj=
ect_check_type);
+static PA_DEFINE_CHECK_TYPE(file_stream, pa_msgobject);
 =

 static void file_stream_free(pa_object *o) {
     file_stream *u =3D FILE_STREAM(o);
@@ -85,7 +85,7 @@
     }
 }
 =

-static int file_stream_process_msg(pa_msgobject *o, int code, void*userdat=
a, pa_memchunk *chunk) {
+static int file_stream_process_msg(pa_msgobject *o, int code, void*userdat=
a, int64_t offset, pa_memchunk *chunk) {
     file_stream *u =3D FILE_STREAM(o);
     file_stream_assert_ref(u);
     =

@@ -154,7 +154,7 @@
                 pa_memblock_unref(u->memchunk.memblock);
                 pa_memchunk_reset(&u->memchunk);
                 =

-                pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MES=
SAGE_DROP_FILE_STREAM, NULL, NULL, NULL);
+                pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MES=
SAGE_DROP_FILE_STREAM, NULL, 0, NULL, NULL);
 =

                 sf_close(u->sndfile);
                 u->sndfile =3D NULL;
@@ -224,7 +224,7 @@
     pa_assert(sink);
     pa_assert(fname);
 =

-    u =3D pa_msgobject_new(file_stream, file_stream_check_type);
+    u =3D pa_msgobject_new(file_stream);
     u->parent.parent.free =3D file_stream_free;
     u->parent.process_msg =3D file_stream_process_msg;
     u->core =3D sink->core;

Modified: branches/lennart/src/pulsecore/source-output.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
source-output.c?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/source-output.c (original)
+++ branches/lennart/src/pulsecore/source-output.c Wed Aug  1 00:44:53 2007
@@ -38,7 +38,7 @@
 =

 #include "source-output.h"
 =

-static PA_DEFINE_CHECK_TYPE(pa_source_output, source_output_check_type, pa=
_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_source_output, pa_msgobject);
 =

 static void source_output_free(pa_object* mo);
 =

@@ -130,12 +130,12 @@
         data->resample_method =3D pa_resampler_get_method(resampler);
     }
 =

-    o =3D pa_msgobject_new(pa_source_output, source_output_check_type);
+    o =3D pa_msgobject_new(pa_source_output);
     o->parent.parent.free =3D source_output_free;
     o->parent.process_msg =3D pa_source_output_process_msg;
 =

     o->core =3D core;
-    o->state =3D PA_SOURCE_OUTPUT_RUNNING;
+    o->state =3D data->corked ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT=
_RUNNING;
     o->flags =3D flags;
     o->name =3D pa_xstrdup(data->name);
     o->driver =3D pa_xstrdup(data->driver);
@@ -176,7 +176,7 @@
     if (o->state =3D=3D state)
         return 0;
 =

-    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE=
_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE=
_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 =

     o->state =3D state;
@@ -187,7 +187,7 @@
     pa_assert(o);
     pa_return_if_fail(o->state !=3D PA_SOURCE_OUTPUT_DISCONNECTED);
 =

-    pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SO=
URCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
+    pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SO=
URCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
 =

     pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL);
     pa_idxset_remove_by_data(o->source->outputs, o, NULL);
@@ -225,7 +225,7 @@
 void pa_source_output_put(pa_source_output *o) {
     pa_source_output_assert_ref(o);
 =

-    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SO=
URCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), NULL, (pa_free_cb_t) pa_s=
ource_output_unref);
+    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SO=
URCE_MESSAGE_ADD_OUTPUT, pa_source_output_ref(o), 0, NULL, (pa_free_cb_t) p=
a_source_output_unref);
     pa_source_update_status(o->source);
 =

     pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUT=
PUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
@@ -243,7 +243,7 @@
 =

     pa_source_output_assert_ref(o);
 =

-    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE=
_OUTPUT_MESSAGE_GET_LATENCY, &r, NULL) < 0)
+    if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE=
_OUTPUT_MESSAGE_GET_LATENCY, &r, 0, NULL) < 0)
         r =3D 0;
 =

     if (o->get_latency)
@@ -293,7 +293,7 @@
 =

     o->sample_spec.rate =3D rate;
 =

-    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUT=
PUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), NULL, NULL);
+    pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUT=
PUT_MESSAGE_SET_RATE, PA_UINT_TO_PTR(rate), 0, NULL, NULL);
 =

     pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUT=
PUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index);
     return 0;
@@ -380,7 +380,7 @@
 /*     return 0; */
 }
 =

-int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdat=
a, pa_memchunk* chunk) {
+int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdat=
a, int64_t offset, pa_memchunk* chunk) {
     pa_source_output *o =3D PA_SOURCE_OUTPUT(mo);
 =

     pa_source_output_assert_ref(o);

Modified: branches/lennart/src/pulsecore/source-output.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/=
source-output.h?rev=3D1562&root=3Dpulseaudio&r1=3D1561&r2=3D1562&view=3Ddiff
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D
--- branches/lennart/src/pulsecore/source-output.h (original)
+++ branches/lennart/src/pulsecore/source-output.h Wed Aug  1 00:44:53 2007
@@ -103,6 +103,8 @@
     int channel_map_is_set;
 =

     pa_resample_method_t resample_method;
+
+    int corked;
 } pa_source_output_new_data;
 =

 pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output=
_new_data *data);
@@ -142,6 +144,6 @@
 /* To be used exclusively by the source driver thread */
 =

 void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk);
-int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
 =

 #endif

Modified: branches/lennart/src/pulsecore/source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/source.c?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/source.c (original)
+++ branches/lennart/src/pulsecore/source.c Wed Aug  1 00:44:53 2007
@@ -42,7 +42,7 @@
 =

 #include "source.h"
 =

-static PA_DEFINE_CHECK_TYPE(pa_source, source_check_type, pa_msgobject_check_type);
+static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
 =

 static void source_free(pa_object *o);
 =

@@ -73,7 +73,7 @@
     pa_return_null_if_fail(!driver || pa_utf8_valid(driver));
     pa_return_null_if_fail(pa_utf8_valid(name) && *name);
 =

-    s = pa_msgobject_new(pa_source, source_check_type);
+    s = pa_msgobject_new(pa_source);
 
     if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) {
         pa_xfree(s);
@@ -140,7 +140,7 @@
         if ((ret = s->set_state(s, state)) < 0)
             return -1;
 
-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
         return -1;
 
     s->state = state;
@@ -222,7 +222,7 @@
 void pa_source_ping(pa_source *s) {
     pa_source_assert_ref(s);
 =

-    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, NULL, NULL);
+    pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, 0, NULL, NULL);
 }
 =

 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
@@ -266,7 +266,7 @@
     if (s->get_latency)
         return s->get_latency(s);
 =

-    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, NULL) < 0)
+    if (pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
         return 0;
 =

     return usec;
@@ -285,7 +285,7 @@
         s->set_volume = NULL;
 
     if (!s->set_volume)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), NULL, pa_xfree);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, pa_xnewdup(struct pa_cvolume, volume, 1), 0, NULL, pa_xfree);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -301,7 +301,7 @@
         s->get_volume = NULL;
 
     if (!s->get_volume && s->refresh_volume)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, &s->volume, 0, NULL);
 
     if (!pa_cvolume_equal(&old_volume, &s->volume))
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -320,7 +320,7 @@
         s->set_mute = NULL;
 
     if (!s->set_mute)
-        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), NULL, NULL);
+        pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, PA_UINT_TO_PTR(mute), 0, NULL, NULL);
 
     if (changed)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -337,7 +337,7 @@
         s->get_mute = NULL;
 
     if (!s->get_mute && s->refresh_muted)
-        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, NULL);
+        pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, &s->muted, 0, NULL);
 
     if (old_muted != s->muted)
         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
@@ -384,7 +384,7 @@
     return pa_idxset_size(s->outputs);
 }
 =

-int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, pa_memchunk *chunk) {
+int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
     pa_source *s = PA_SOURCE(object);
     pa_source_assert_ref(s);
 =


Modified: branches/lennart/src/pulsecore/source.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/source.h?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/source.h (original)
+++ branches/lennart/src/pulsecore/source.h Wed Aug  1 00:44:53 2007
@@ -146,7 +146,7 @@
 /* To be used exclusively by the source driver thread */
 =

 void pa_source_post(pa_source*s, const pa_memchunk *b);
-int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
+int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa_memchunk *chunk);
 
 static inline int PA_SOURCE_OPENED(pa_source_state_t x) {
     return x == PA_SOURCE_RUNNING || x == PA_SOURCE_IDLE;

Modified: branches/lennart/src/tests/asyncmsgq-test.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/tests/asyncmsgq-test.c?rev=1562&root=pulseaudio&r1=1561&r2=1562&view=diff
==============================================================================
--- branches/lennart/src/tests/asyncmsgq-test.c (original)
+++ branches/lennart/src/tests/asyncmsgq-test.c Wed Aug  1 00:44:53 2007
@@ -49,7 +49,7 @@
     do {
         int code = 0;
 
-        pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, 1) == 0);
+        pa_assert_se(pa_asyncmsgq_get(q, NULL, &code, NULL, NULL, NULL, 1) == 0);
 =

         switch (code) {
 =

@@ -85,22 +85,22 @@
     pa_assert_se(t = pa_thread_new(the_thread, q));
 =

     printf("Operation A post\n");
-    pa_asyncmsgq_post(q, NULL, OPERATION_A, NULL, NULL, NULL);
+    pa_asyncmsgq_post(q, NULL, OPERATION_A, NULL, 0, NULL, NULL);
 =

     pa_thread_yield();
 =

     printf("Operation B post\n");
-    pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, NULL, NULL);
+    pa_asyncmsgq_post(q, NULL, OPERATION_B, NULL, 0, NULL, NULL);
     =

     pa_thread_yield();
 =

     printf("Operation C send\n");
-    pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, NULL);
+    pa_asyncmsgq_send(q, NULL, OPERATION_C, NULL, 0, NULL);
 =

     pa_thread_yield();
 =

     printf("Quit post\n");
-    pa_asyncmsgq_post(q, NULL, QUIT, NULL, NULL, NULL);
+    pa_asyncmsgq_post(q, NULL, QUIT, NULL, 0, NULL, NULL);
 =

     pa_thread_free(t);
 =





More information about the pulseaudio-commits mailing list