[pulseaudio-commits] r1585 - in /branches/lennart/src: Makefile.am pulsecore/protocol-esound.c

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Mon Aug 6 14:47:54 PDT 2007


Author: lennart
Date: Mon Aug  6 23:47:53 2007
New Revision: 1585

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1585&root=pulseaudio&view=rev
Log:
port esound protocol to new lock-free core

Modified:
    branches/lennart/src/Makefile.am
    branches/lennart/src/pulsecore/protocol-esound.c

Modified: branches/lennart/src/Makefile.am
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/Makefile.am?rev=1585&root=pulseaudio&r1=1584&r2=1585&view=diff
==============================================================================
--- branches/lennart/src/Makefile.am (original)
+++ branches/lennart/src/Makefile.am Mon Aug  6 23:47:53 2007
@@ -723,9 +723,8 @@
 		libstrlist.la \
 		libprotocol-simple.la \
 		libprotocol-http.la \
-		libprotocol-native.la
-
-#		libprotocol-esound.la
+		libprotocol-native.la \
+		libprotocol-esound.la
 =

 # We need to emulate sendmsg/recvmsg to support this on Win32
 if !OS_IS_WIN32
@@ -881,8 +880,8 @@
 		module-http-protocol-tcp.la \
 		module-sine.la \
 		module-native-protocol-tcp.la \
-		module-native-protocol-fd.la
-#		module-esound-protocol-tcp.la \
+		module-native-protocol-fd.la \
+		module-esound-protocol-tcp.la
 #		module-combine.la \
 #		module-tunnel-sink.la \
 #		module-tunnel-source.la \
@@ -900,8 +899,8 @@
 		module-cli-protocol-unix.la \
 		module-simple-protocol-unix.la \
 		module-http-protocol-unix.la \
-		module-native-protocol-unix.la
-#		module-esound-protocol-unix.la =

+		module-native-protocol-unix.la \
+		module-esound-protocol-unix.la =

 endif
 =

 if HAVE_MKFIFO
@@ -910,11 +909,11 @@
 		module-pipe-source.la
 endif
 =

-#if !OS_IS_WIN32
-#modlibexec_LTLIBRARIES +=3D \
-#		module-esound-compat-spawnfd.la \
-#		module-esound-compat-spawnpid.la
-#endif
+if !OS_IS_WIN32
+modlibexec_LTLIBRARIES +=3D \
+		module-esound-compat-spawnfd.la \
+		module-esound-compat-spawnpid.la
+endif
 =

 if HAVE_REGEX
 modlibexec_LTLIBRARIES +=3D \
@@ -940,10 +939,10 @@
 		module-alsa-source.la
 endif
 =

-if HAVE_SOLARIS
-modlibexec_LTLIBRARIES +=3D \
-		module-solaris.la
-endif
+#if HAVE_SOLARIS
+#modlibexec_LTLIBRARIES +=3D \
+#		module-solaris.la
+#endif
 =

 if HAVE_AVAHI
 modlibexec_LTLIBRARIES +=3D \
@@ -974,10 +973,10 @@
 		gconf-helper
 endif
 =

-if OS_IS_WIN32
-modlibexec_LTLIBRARIES +=3D \
-		module-waveout.la
-endif
+#if OS_IS_WIN32
+#modlibexec_LTLIBRARIES +=3D \
+#		module-waveout.la
+#endif
 =

 if HAVE_HAL
 modlibexec_LTLIBRARIES +=3D \
@@ -1099,23 +1098,23 @@
 =

 # EsounD protocol
 =

-#module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
-#module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
-#module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version
-#module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la
-
-#module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
-#module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
-#module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version
-#module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la
-
-#module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c
-#module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version
-#module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la
-
-#module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c
-#module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version
-#module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c
+module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
+module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version
+module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la
+
+module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c
+module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS)
+module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version
+module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la
+
+module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c
+module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version
+module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la
+
+module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c
+module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version
+module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la
 =

 #module_esound_sink_la_SOURCES =3D modules/module-esound-sink.c
 #module_esound_sink_la_LDFLAGS =3D -module -avoid-version
@@ -1201,9 +1200,9 @@
 =

 # Solaris
 =

-module_solaris_la_SOURCES =3D modules/module-solaris.c
-module_solaris_la_LDFLAGS =3D -module -avoid-version
-module_solaris_la_LIBADD =3D $(AM_LIBADD) libiochannel.la
+#module_solaris_la_SOURCES =3D modules/module-solaris.c
+#module_solaris_la_LDFLAGS =3D -module -avoid-version
+#module_solaris_la_LIBADD =3D $(AM_LIBADD) libiochannel.la
 =

 # Avahi
 =

@@ -1228,10 +1227,10 @@
 =

 # Windows waveout
 =

-module_waveout_la_SOURCES =3D modules/module-waveout.c
-module_waveout_la_LDFLAGS =3D -module -avoid-version
-module_waveout_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la -lwinmm
-module_waveout_la_CFLAGS =3D $(AM_CFLAGS)
+#module_waveout_la_SOURCES =3D modules/module-waveout.c
+#module_waveout_la_LDFLAGS =3D -module -avoid-version
+#module_waveout_la_LIBADD =3D $(AM_LIBADD) libpulsecore.la -lwinmm
+#module_waveout_la_CFLAGS =3D $(AM_CFLAGS)
 =

 # Hardware autodetection module
 module_detect_la_SOURCES =3D modules/module-detect.c

Modified: branches/lennart/src/pulsecore/protocol-esound.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/protocol-esound.c?rev=1585&root=pulseaudio&r1=1584&r2=1585&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/protocol-esound.c (original)
+++ branches/lennart/src/pulsecore/protocol-esound.c Mon Aug  6 23:47:53 2007
@@ -29,7 +29,6 @@
 #include <errno.h>
 #include <string.h>
 #include <stdio.h>
-#include <assert.h>
 #include <stdlib.h>
 #include <limits.h>
 =

@@ -53,6 +52,7 @@
 #include <pulsecore/core-util.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/ipacl.h>
+#include <pulsecore/macro.h>
 =

 #include "endianmacros.h"
 =

@@ -77,7 +77,9 @@
 =

 /* This is heavily based on esound's code */
 =

-struct connection {
+typedef struct connection {
+    pa_msgobject parent;
+    =

     uint32_t index;
     int dead;
     pa_protocol_esound *protocol;
@@ -100,6 +102,7 @@
     struct {
         pa_memblock *current_memblock;
         size_t memblock_index, fragment_size;
+        pa_atomic_t missing;
     } playback;
 =

     struct {
@@ -109,46 +112,62 @@
     } scache;
 =

     pa_time_event *auth_timeout_event;
-};
+} connection;
+
+PA_DECLARE_CLASS(connection);
+#define CONNECTION(o) (connection_cast(o))
+static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
 =

 struct pa_protocol_esound {
-    int public;
     pa_module *module;
     pa_core *core;
+    int public;
     pa_socket_server *server;
     pa_idxset *connections;
+    =

     char *sink_name, *source_name;
     unsigned n_player;
     uint8_t esd_key[ESD_KEY_LEN];
     pa_ip_acl *auth_ip_acl;
 };
 =

+enum {
+    SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
+    SINK_INPUT_MESSAGE_DISABLE_PREBUF
+};
+
+enum {
+    CONNECTION_MESSAGE_REQUEST_DATA,
+    CONNECTION_MESSAGE_POST_DATA,
+    CONNECTION_MESSAGE_UNLINK_CONNECTION
+};
+
 typedef struct proto_handler {
     size_t data_length;
-    int (*proc)(struct connection *c, esd_proto_t request, const void *dat=
a, size_t length);
+    int (*proc)(connection *c, esd_proto_t request, const void *data, size=
_t length);
     const char *description;
 } esd_proto_handler_info_t;
 =

-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk,=
 size_t length);
+static void sink_input_drop_cb(pa_sink_input *i, size_t length);
 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk);
 static void sink_input_kill_cb(pa_sink_input *i);
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i);
+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdat=
a, int64_t offset, pa_memchunk *chunk);
 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
 =

 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *=
chunk);
 static void source_output_kill_cb(pa_source_output *o);
 =

-static int esd_proto_connect(struct connection *c, esd_proto_t request, co=
nst void *data, size_t length);
-static int esd_proto_stream_play(struct connection *c, esd_proto_t request=
, const void *data, size_t length);
-static int esd_proto_stream_record(struct connection *c, esd_proto_t reque=
st, const void *data, size_t length);
-static int esd_proto_get_latency(struct connection *c, esd_proto_t request=
, const void *data, size_t length);
-static int esd_proto_server_info(struct connection *c, esd_proto_t request=
, const void *data, size_t length);
-static int esd_proto_all_info(struct connection *c, esd_proto_t request, c=
onst void *data, size_t length);
-static int esd_proto_stream_pan(struct connection *c, esd_proto_t request,=
 const void *data, size_t length);
-static int esd_proto_sample_cache(struct connection *c, esd_proto_t reques=
t, const void *data, size_t length);
-static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t=
 request, const void *data, size_t length);
-static int esd_proto_sample_get_id(struct connection *c, esd_proto_t reque=
st, const void *data, size_t length);
-static int esd_proto_standby_or_resume(struct connection *c, esd_proto_t r=
equest, const void *data, size_t length);
+static int esd_proto_connect(connection *c, esd_proto_t request, const voi=
d *data, size_t length);
+static int esd_proto_stream_play(connection *c, esd_proto_t request, const=
 void *data, size_t length);
+static int esd_proto_stream_record(connection *c, esd_proto_t request, con=
st void *data, size_t length);
+static int esd_proto_get_latency(connection *c, esd_proto_t request, const=
 void *data, size_t length);
+static int esd_proto_server_info(connection *c, esd_proto_t request, const=
 void *data, size_t length);
+static int esd_proto_all_info(connection *c, esd_proto_t request, const vo=
id *data, size_t length);
+static int esd_proto_stream_pan(connection *c, esd_proto_t request, const =
void *data, size_t length);
+static int esd_proto_sample_cache(connection *c, esd_proto_t request, cons=
t void *data, size_t length);
+static int esd_proto_sample_free_or_play(connection *c, esd_proto_t reques=
t, const void *data, size_t length);
+static int esd_proto_sample_get_id(connection *c, esd_proto_t request, con=
st void *data, size_t length);
+static int esd_proto_standby_or_resume(connection *c, esd_proto_t request,=
 const void *data, size_t length);
 =

 /* the big map of protocol handler info */
 static struct proto_handler proto_map[ESD_PROTO_MAX] =3D {
@@ -185,25 +204,56 @@
     { 0,                              esd_proto_get_latency, "get latency"=
 }
 };
 =

-static void connection_free(struct connection *c) {
-    assert(c);
-    pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
-
-    if (c->state =3D=3D ESD_STREAMING_DATA)
-        c->protocol->n_player--;
-
-    pa_client_free(c->client);
+static void connection_unlink(connection *c) {
+    pa_assert(c);
+
+    if (!c->protocol)
+        return;
 =

     if (c->sink_input) {
         pa_sink_input_disconnect(c->sink_input);
         pa_sink_input_unref(c->sink_input);
+        c->sink_input =3D NULL;
     }
 =

     if (c->source_output) {
         pa_source_output_disconnect(c->source_output);
         pa_source_output_unref(c->source_output);
-    }
-
+        c->source_output =3D NULL;
+    }
+
+    if (c->client) {
+        pa_client_free(c->client);
+        c->client =3D NULL;
+    }
+    =

+    if (c->state =3D=3D ESD_STREAMING_DATA)
+        c->protocol->n_player--;
+
+    if (c->io) {
+        pa_iochannel_free(c->io);
+        c->io =3D NULL;
+    }
+
+    if (c->defer_event) {
+        c->protocol->core->mainloop->defer_free(c->defer_event);
+        c->defer_event =3D NULL;
+    }
+
+    if (c->auth_timeout_event) {
+        c->protocol->core->mainloop->time_free(c->auth_timeout_event);
+        c->auth_timeout_event =3D NULL;
+    }
+
+    pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
+    c->protocol =3D NULL;
+    connection_unref(c);
+}
+
+static void connection_free(pa_object *obj) {
+    connection *c =3D CONNECTION(obj);
+    pa_assert(c);
+    =

     if (c->input_memblockq)
         pa_memblockq_free(c->input_memblockq);
     if (c->output_memblockq)
@@ -215,54 +265,44 @@
     pa_xfree(c->read_data);
     pa_xfree(c->write_data);
 =

-    if (c->io)
-        pa_iochannel_free(c->io);
-
-    if (c->defer_event)
-        c->protocol->core->mainloop->defer_free(c->defer_event);
-
     if (c->scache.memchunk.memblock)
         pa_memblock_unref(c->scache.memchunk.memblock);
     pa_xfree(c->scache.name);
 =

-    if (c->auth_timeout_event)
-        c->protocol->core->mainloop->time_free(c->auth_timeout_event);
-
     pa_xfree(c->original_name);
     pa_xfree(c);
 }
 =

-static void connection_write_prepare(struct connection *c, size_t length) {
+static void connection_write_prepare(connection *c, size_t length) {
     size_t t;
-    assert(c);
+    pa_assert(c);
 =

     t =3D c->write_data_length+length;
 =

     if (c->write_data_alloc < t)
         c->write_data =3D pa_xrealloc(c->write_data, c->write_data_alloc =
=3D t);
 =

-    assert(c->write_data);
-}
-
-static void connection_write(struct connection *c, const void *data, size_=
t length) {
+    pa_assert(c->write_data);
+}
+
+static void connection_write(connection *c, const void *data, size_t lengt=
h) {
     size_t i;
-    assert(c);
-
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop=
 && c->protocol->core->mainloop->defer_enable);
+    pa_assert(c);
+
     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
 =

     connection_write_prepare(c, length);
 =

-    assert(c->write_data);
+    pa_assert(c->write_data);
 =

     i =3D c->write_data_length;
     c->write_data_length +=3D length;
 =

-    memcpy((char*)c->write_data + i, data, length);
+    memcpy((uint8_t*) c->write_data + i, data, length);
 }
 =

 static void format_esd2native(int format, int swap_bytes, pa_sample_spec *=
ss) {
-    assert(ss);
+    pa_assert(ss);
 =

     ss->channels =3D ((format & ESD_MASK_CHAN) =3D=3D ESD_STEREO) ? 2 : 1;
     if ((format & ESD_MASK_BITS) =3D=3D ESD_BITS16)
@@ -289,11 +329,13 @@
 =

 /*** esound commands ***/
 =

-static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto=
_t request, const void *data, size_t length) {
+static int esd_proto_connect(connection *c, PA_GCC_UNUSED esd_proto_t requ=
est, const void *data, size_t length) {
     uint32_t ekey;
     int ok;
 =

-    assert(length =3D=3D (ESD_KEY_LEN + sizeof(uint32_t)));
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D (ESD_KEY_LEN + sizeof(uint32_t)));
 =

     if (!c->authorized) {
         if (memcmp(data, c->protocol->esd_key, ESD_KEY_LEN) !=3D 0) {
@@ -316,7 +358,7 @@
     else if (ekey =3D=3D ESD_SWAP_ENDIAN_KEY)
         c->swap_byte_order =3D 1;
     else {
-        pa_log("client sent invalid endian key");
+        pa_log_warn("Client sent invalid endian key");
         return -1;
     }
 =

@@ -325,7 +367,7 @@
     return 0;
 }
 =

-static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_p=
roto_t request, const void *data, size_t length) {
+static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t =
request, const void *data, size_t length) {
     char name[ESD_NAME_MAX], *utf8_name;
     int32_t format, rate;
     pa_sample_spec ss;
@@ -333,15 +375,17 @@
     pa_sink *sink =3D NULL;
     pa_sink_input_new_data sdata;
 =

-    assert(c && length =3D=3D (sizeof(int32_t)*2+ESD_NAME_MAX));
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D (sizeof(int32_t)*2+ESD_NAME_MAX));
 =

     memcpy(&format, data, sizeof(int32_t));
     format =3D MAYBE_INT32_SWAP(c->swap_byte_order, format);
-    data =3D (const char*)data + sizeof(int32_t);
+    data =3D (const char*) data + sizeof(int32_t);
 =

     memcpy(&rate, data, sizeof(int32_t));
     rate =3D MAYBE_INT32_SWAP(c->swap_byte_order, rate);
-    data =3D (const char*)data + sizeof(int32_t);
+    data =3D (const char*) data + sizeof(int32_t);
 =

     ss.rate =3D rate;
     format_esd2native(format, c->swap_byte_order, &ss);
@@ -362,7 +406,7 @@
 =

     c->original_name =3D pa_xstrdup(name);
 =

-    assert(!c->sink_input && !c->input_memblockq);
+    pa_assert(!c->sink_input && !c->input_memblockq);
 =

     pa_sink_input_new_data_init(&sdata);
     sdata.sink =3D sink;
@@ -385,22 +429,26 @@
             l/PLAYBACK_BUFFER_FRAGMENTS,
             NULL);
     pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2);
-    c->playback.fragment_size =3D l/10;
-
+    c->playback.fragment_size =3D l/PLAYBACK_BUFFER_FRAGMENTS;
+
+    c->sink_input->parent.process_msg =3D sink_input_process_msg;
     c->sink_input->peek =3D sink_input_peek_cb;
     c->sink_input->drop =3D sink_input_drop_cb;
     c->sink_input->kill =3D sink_input_kill_cb;
-    c->sink_input->get_latency =3D sink_input_get_latency_cb;
     c->sink_input->userdata =3D c;
 =

     c->state =3D ESD_STREAMING_DATA;
 =

     c->protocol->n_player++;
 =

+    pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_me=
mblockq));
+
+    pa_sink_input_put(c->sink_input);
+    =

     return 0;
 }
 =

-static int esd_proto_stream_record(struct connection *c, esd_proto_t reque=
st, const void *data, size_t length) {
+static int esd_proto_stream_record(connection *c, esd_proto_t request, con=
st void *data, size_t length) {
     char name[ESD_NAME_MAX], *utf8_name;
     int32_t format, rate;
     pa_source *source =3D NULL;
@@ -408,15 +456,17 @@
     size_t l;
     pa_source_output_new_data sdata;
 =

-    assert(c && length =3D=3D (sizeof(int32_t)*2+ESD_NAME_MAX));
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D (sizeof(int32_t)*2+ESD_NAME_MAX));
 =

     memcpy(&format, data, sizeof(int32_t));
     format =3D MAYBE_INT32_SWAP(c->swap_byte_order, format);
-    data =3D (const char*)data + sizeof(int32_t);
+    data =3D (const char*) data + sizeof(int32_t);
 =

     memcpy(&rate, data, sizeof(int32_t));
     rate =3D MAYBE_INT32_SWAP(c->swap_byte_order, rate);
-    data =3D (const char*)data + sizeof(int32_t);
+    data =3D (const char*) data + sizeof(int32_t);
 =

     ss.rate =3D rate;
     format_esd2native(format, c->swap_byte_order, &ss);
@@ -436,7 +486,7 @@
             return -1;
         }
     } else {
-        assert(request =3D=3D ESD_PROTO_STREAM_REC);
+        pa_assert(request =3D=3D ESD_PROTO_STREAM_REC);
 =

         if (c->protocol->source_name) {
             if (!(source =3D pa_namereg_get(c->protocol->core, c->protocol=
->source_name, PA_NAMEREG_SOURCE, 1))) {
@@ -455,7 +505,7 @@
 =

     c->original_name =3D pa_xstrdup(name);
 =

-    assert(!c->output_memblockq && !c->source_output);
+    pa_assert(!c->output_memblockq && !c->source_output);
 =

     pa_source_output_new_data_init(&sdata);
     sdata.source =3D source;
@@ -488,14 +538,18 @@
 =

     c->protocol->n_player++;
 =

+    pa_source_output_put(c->source_output);
+
     return 0;
 }
 =

-static int esd_proto_get_latency(struct connection *c, PA_GCC_UNUSED esd_p=
roto_t request, const void *data, size_t length) {
+static int esd_proto_get_latency(connection *c, PA_GCC_UNUSED esd_proto_t =
request, const void *data, size_t length) {
     pa_sink *sink;
     int32_t latency;
 =

-    assert(c && !data && length =3D=3D 0);
+    connection_ref(c);
+    pa_assert(!data);
+    pa_assert(length =3D=3D 0);
 =

     if (!(sink =3D pa_namereg_get(c->protocol->core, c->protocol->sink_nam=
e, PA_NAMEREG_SINK, 1)))
         latency =3D 0;
@@ -509,12 +563,14 @@
     return 0;
 }
 =

-static int esd_proto_server_info(struct connection *c, PA_GCC_UNUSED esd_p=
roto_t request, const void *data, size_t length) {
+static int esd_proto_server_info(connection *c, PA_GCC_UNUSED esd_proto_t =
request, const void *data, size_t length) {
     int32_t rate =3D 44100, format =3D ESD_STEREO|ESD_BITS16;
     int32_t response;
     pa_sink *sink;
 =

-    assert(c && data && length =3D=3D sizeof(int32_t));
+    connection_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D sizeof(int32_t));
 =

     if ((sink =3D pa_namereg_get(c->protocol->core, c->protocol->sink_name=
, PA_NAMEREG_SINK, 1))) {
         rate =3D sink->sample_spec.rate;
@@ -533,14 +589,16 @@
     return 0;
 }
 =

-static int esd_proto_all_info(struct connection *c, esd_proto_t request, c=
onst void *data, size_t length) {
+static int esd_proto_all_info(connection *c, esd_proto_t request, const vo=
id *data, size_t length) {
     size_t t, k, s;
-    struct connection *conn;
+    connection *conn;
     uint32_t idx =3D PA_IDXSET_INVALID;
     unsigned nsamples;
     char terminator[sizeof(int32_t)*6+ESD_NAME_MAX];
 =

-    assert(c && data && length =3D=3D sizeof(int32_t));
+    connection_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D sizeof(int32_t));
 =

     if (esd_proto_server_info(c, request, data, length) < 0)
         return -1;
@@ -561,7 +619,7 @@
         if (conn->state !=3D ESD_STREAMING_DATA)
             continue;
 =

-        assert(t >=3D k*2+s);
+        pa_assert(t >=3D k*2+s);
 =

         if (conn->sink_input) {
             pa_cvolume volume =3D *pa_sink_input_get_volume(conn->sink_inp=
ut);
@@ -602,7 +660,7 @@
         t -=3D k;
     }
 =

-    assert(t =3D=3D s*(nsamples+1)+k);
+    pa_assert(t =3D=3D s*(nsamples+1)+k);
     t -=3D k;
 =

     connection_write(c, terminator, k);
@@ -615,7 +673,7 @@
             int32_t id, rate, lvolume, rvolume, format, len;
             char name[ESD_NAME_MAX];
 =

-            assert(t >=3D s*2);
+            pa_assert(t >=3D s*2);
 =

             /* id */
             id =3D MAYBE_INT32_SWAP(c->swap_byte_order, (int) (ce->index+1=
));
@@ -653,19 +711,21 @@
         }
     }
 =

-    assert(t =3D=3D s);
+    pa_assert(t =3D=3D s);
 =

     connection_write(c, terminator, s);
 =

     return 0;
 }
 =

-static int esd_proto_stream_pan(struct connection *c, PA_GCC_UNUSED esd_pr=
oto_t request, const void *data, size_t length) {
+static int esd_proto_stream_pan(connection *c, PA_GCC_UNUSED esd_proto_t r=
equest, const void *data, size_t length) {
     int32_t ok;
     uint32_t idx, lvolume, rvolume;
-    struct connection *conn;
-
-    assert(c && data && length =3D=3D sizeof(int32_t)*3);
+    connection *conn;
+
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D sizeof(int32_t)*3);
 =

     memcpy(&idx, data, sizeof(uint32_t));
     idx =3D MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
@@ -694,13 +754,15 @@
     return 0;
 }
 =

-static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_=
proto_t request, const void *data, size_t length) {
+static int esd_proto_sample_cache(connection *c, PA_GCC_UNUSED esd_proto_t=
 request, const void *data, size_t length) {
     pa_sample_spec ss;
     int32_t format, rate, sc_length;
     uint32_t idx;
     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
 =

-    assert(c && data && length =3D=3D (ESD_NAME_MAX+3*sizeof(int32_t)));
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D (ESD_NAME_MAX+3*sizeof(int32_t)));
 =

     memcpy(&format, data, sizeof(int32_t));
     format =3D MAYBE_INT32_SWAP(c->swap_byte_order, format);
@@ -727,12 +789,12 @@
 =

     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
 =

-    assert(!c->scache.memchunk.memblock);
+    pa_assert(!c->scache.memchunk.memblock);
     c->scache.memchunk.memblock =3D pa_memblock_new(c->protocol->core->mem=
pool, sc_length);
     c->scache.memchunk.index =3D 0;
     c->scache.memchunk.length =3D sc_length;
     c->scache.sample_spec =3D ss;
-    assert(!c->scache.name);
+    pa_assert(!c->scache.name);
     c->scache.name =3D pa_xstrdup(name);
 =

     c->state =3D ESD_CACHING_SAMPLE;
@@ -745,12 +807,14 @@
     return 0;
 }
 =

-static int esd_proto_sample_get_id(struct connection *c, PA_GCC_UNUSED esd=
_proto_t request, const void *data, size_t length) {
+static int esd_proto_sample_get_id(connection *c, PA_GCC_UNUSED esd_proto_=
t request, const void *data, size_t length) {
     int32_t ok;
     uint32_t idx;
     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
 =

-    assert(c && data && length =3D=3D ESD_NAME_MAX);
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D ESD_NAME_MAX);
 =

     strcpy(name, SCACHE_PREFIX);
     strncpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
@@ -767,12 +831,14 @@
     return 0;
 }
 =

-static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t=
 request, const void *data, size_t length) {
+static int esd_proto_sample_free_or_play(connection *c, esd_proto_t reques=
t, const void *data, size_t length) {
     int32_t ok;
     const char *name;
     uint32_t idx;
 =

-    assert(c && data && length =3D=3D sizeof(int32_t));
+    connection_assert_ref(c);
+    pa_assert(data);
+    pa_assert(length =3D=3D sizeof(int32_t));
 =

     memcpy(&idx, data, sizeof(uint32_t));
     idx =3D MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
@@ -787,7 +853,7 @@
                 if (pa_scache_play_item(c->protocol->core, name, sink, PA_=
VOLUME_NORM) >=3D 0)
                     ok =3D idx + 1;
         } else {
-            assert(request =3D=3D ESD_PROTO_SAMPLE_FREE);
+            pa_assert(request =3D=3D ESD_PROTO_SAMPLE_FREE);
 =

             if (pa_scache_remove_item(c->protocol->core, name) >=3D 0)
                 ok =3D idx + 1;
@@ -799,8 +865,10 @@
     return 0;
 }
 =

-static int esd_proto_standby_or_resume(struct connection *c, PA_GCC_UNUSED=
 esd_proto_t request, PA_GCC_UNUSED const void *data, PA_GCC_UNUSED size_t =
length) {
+static int esd_proto_standby_or_resume(connection *c, PA_GCC_UNUSED esd_pr=
oto_t request, PA_GCC_UNUSED const void *data, PA_GCC_UNUSED size_t length)=
 {
     int32_t ok;
+
+    connection_assert_ref(c);
 =

     connection_write_prepare(c, sizeof(int32_t) * 2);
 =

@@ -814,20 +882,21 @@
 /*** client callbacks ***/
 =

 static void client_kill_cb(pa_client *c) {
-    assert(c && c->userdata);
-    connection_free(c->userdata);
+    pa_assert(c);
+    =

+    connection_unlink(CONNECTION(c->userdata));
 }
 =

 /*** pa_iochannel callbacks ***/
 =

-static int do_read(struct connection *c) {
-    assert(c && c->io);
+static int do_read(connection *c) {
+    connection_assert_ref(c);
 =

 /*      pa_log("READ");  */
 =

     if (c->state =3D=3D ESD_NEXT_REQUEST) {
         ssize_t r;
-        assert(c->read_data_length < sizeof(c->request));
+        pa_assert(c->read_data_length < sizeof(c->request));
 =

         if ((r =3D pa_iochannel_read(c->io, ((uint8_t*) &c->request) + c->=
read_data_length, sizeof(c->request) - c->read_data_length)) <=3D 0) {
             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"=
);
@@ -862,7 +931,7 @@
             } else {
                 if (c->read_data_alloc < handler->data_length)
                     c->read_data =3D pa_xrealloc(c->read_data, c->read_dat=
a_alloc =3D handler->data_length);
-                assert(c->read_data);
+                pa_assert(c->read_data);
 =

                 c->state =3D ESD_NEEDS_REQDATA;
                 c->read_data_length =3D 0;
@@ -873,18 +942,21 @@
         ssize_t r;
         struct proto_handler *handler =3D proto_map+c->request;
 =

-        assert(handler->proc);
-
-        assert(c->read_data && c->read_data_length < handler->data_length);
+        pa_assert(handler->proc);
+
+        pa_assert(c->read_data && c->read_data_length < handler->data_leng=
th);
 =

         if ((r =3D pa_iochannel_read(c->io, (uint8_t*) c->read_data + c->r=
ead_data_length, handler->data_length - c->read_data_length)) <=3D 0) {
+            if (errno =3D=3D EINTR || errno =3D=3D EAGAIN)
+                return 0;
+            =

             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"=
);
             return -1;
         }
 =

         if ((c->read_data_length +=3D r) >=3D handler->data_length) {
             size_t l =3D c->read_data_length;
-            assert(handler->proc);
+            pa_assert(handler->proc);
 =

             c->state =3D ESD_NEXT_REQUEST;
             c->read_data_length =3D 0;
@@ -896,22 +968,24 @@
         ssize_t r;
         void *p;
 =

-        assert(c->scache.memchunk.memblock);
-        assert(c->scache.name);
-        assert(c->scache.memchunk.index < c->scache.memchunk.length);
+        pa_assert(c->scache.memchunk.memblock);
+        pa_assert(c->scache.name);
+        pa_assert(c->scache.memchunk.index < c->scache.memchunk.length);
 =

         p =3D pa_memblock_acquire(c->scache.memchunk.memblock);
-
-        if ((r =3D pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchun=
k.index, c->scache.memchunk.length-c->scache.memchunk.index)) <=3D 0) {
-            pa_memblock_release(c->scache.memchunk.memblock);
+        r =3D pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.ind=
ex, c->scache.memchunk.length-c->scache.memchunk.index);
+        pa_memblock_release(c->scache.memchunk.memblock);
+        =

+        if (r <=3D 0) {
+            if (errno =3D=3D EINTR || errno =3D=3D EAGAIN)
+                return 0;
+            =

             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"=
);
             return -1;
         }
 =

-        pa_memblock_release(c->scache.memchunk.memblock);
-
         c->scache.memchunk.index +=3D r;
-        assert(c->scache.memchunk.index <=3D c->scache.memchunk.length);
+        pa_assert(c->scache.memchunk.index <=3D c->scache.memchunk.length);
 =

         if (c->scache.memchunk.index =3D=3D c->scache.memchunk.length) {
             uint32_t idx;
@@ -938,11 +1012,11 @@
         size_t l;
         void *p;
 =

-        assert(c->input_memblockq);
+        pa_assert(c->input_memblockq);
 =

 /*         pa_log("STREAMING_DATA"); */
 =

-        if (!(l =3D pa_memblockq_missing(c->input_memblockq)))
+        if (!(l =3D pa_atomic_load(&c->playback.missing)))
             return 0;
 =

         if (l > c->playback.fragment_size)
@@ -956,47 +1030,50 @@
             }
 =

         if (!c->playback.current_memblock) {
-            c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
-            assert(c->playback.current_memblock);
-            assert(pa_memblock_get_length(c->playback.current_memblock) >= l);
+            pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2));
             c->playback.memblock_index = 0;
         }
 =

         p =3D pa_memblock_acquire(c->playback.current_memblock);
-
-        if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l)) <= 0) {
-            pa_memblock_release(c->playback.current_memblock);
+        r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l);
+        pa_memblock_release(c->playback.current_memblock);
+        =

+        if (r <=3D 0) {
+            =

+            if (errno =3D=3D EINTR || errno =3D=3D EAGAIN)
+                return 0;
+
             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
             return -1;
         }
-        pa_memblock_release(c->playback.current_memblock);
 =

         chunk.memblock =3D c->playback.current_memblock;
         chunk.index =3D c->playback.memblock_index;
         chunk.length =3D r;
-        assert(chunk.memblock);
 =

         c->playback.memblock_index +=3D r;
 =

-        assert(c->input_memblockq);
-        pa_memblockq_push_align(c->input_memblockq, &chunk);
-        assert(c->sink_input);
-        pa_sink_notify(c->sink_input->sink);
+        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
+        pa_atomic_sub(&c->playback.missing, r);
     }
 =

     return 0;
 }
 =

-static int do_write(struct connection *c) {
-    assert(c && c->io);
+static int do_write(connection *c) {
+    connection_assert_ref(c);
 =

 /*     pa_log("WRITE"); */
 =

     if (c->write_data_length) {
         ssize_t r;
 =

-        assert(c->write_data_index < c->write_data_length);
+        pa_assert(c->write_data_index < c->write_data_length);
         if ((r = pa_iochannel_write(c->io, (uint8_t*) c->write_data+c->write_data_index, c->write_data_length-c->write_data_index)) < 0) {
+
+            if (errno =3D=3D EINTR || errno =3D=3D EAGAIN)
+                return 0;
+            =

             pa_log("write(): %s", pa_cstrerror(errno));
             return -1;
         }
@@ -1009,37 +1086,36 @@
         ssize_t r;
         void *p;
 =

-        assert(c->output_memblockq);
         if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
             return 0;
 =

-        assert(chunk.memblock);
-        assert(chunk.length);
+        pa_assert(chunk.memblock);
+        pa_assert(chunk.length);
 =

         p =3D pa_memblock_acquire(chunk.memblock);
-
-        if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) {
-            pa_memblock_release(chunk.memblock);
-            pa_memblock_unref(chunk.memblock);
+        r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length);
+        pa_memblock_release(chunk.memblock);
+
+        pa_memblock_unref(chunk.memblock);
+        =

+        if (r < 0) {
+
+            if (errno =3D=3D EINTR || errno =3D=3D EAGAIN)
+                return 0;
+        =

             pa_log("write(): %s", pa_cstrerror(errno));
             return -1;
         }
 =

-        pa_memblock_release(chunk.memblock);
-
-        pa_memblockq_drop(c->output_memblockq, &chunk, r);
-        pa_memblock_unref(chunk.memblock);
-
-        pa_source_notify(c->source_output->source);
+        pa_memblockq_drop(c->output_memblockq, r);
     }
 =

     return 0;
 }
 =

-static void do_work(struct connection *c) {
-    assert(c);
-
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
+static void do_work(connection *c) {
+    connection_assert_ref(c);
+
     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
 =

     if (c->dead)
@@ -1070,117 +1146,188 @@
         pa_iochannel_free(c->io);
         c->io =3D NULL;
 =

-        pa_memblockq_prebuf_disable(c->input_memblockq);
-        pa_sink_notify(c->sink_input->sink);
+        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
     } else
-        connection_free(c);
+        connection_unlink(c);
 }
 =

 static void io_callback(pa_iochannel*io, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(io && c && c->io =3D=3D io);
+    connection *c =3D CONNECTION(userdata);
+
+    connection_assert_ref(c);
+    pa_assert(io);
 =

     do_work(c);
 }
 =

-/*** defer callback ***/
-
 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
-    struct connection *c =3D userdata;
-    assert(a && c && c->defer_event =3D=3D e);
-
-/*     pa_log("DEFER"); */
+    connection *c =3D CONNECTION(userdata);
+
+    connection_assert_ref(c);
+    pa_assert(e);
 =

     do_work(c);
 }
 =

+static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
+    connection *c = CONNECTION(o);
+    connection_assert_ref(c);
+
+    switch (code) {
+        case CONNECTION_MESSAGE_REQUEST_DATA:
+            do_work(c);
+            break;
+            =

+        case CONNECTION_MESSAGE_POST_DATA:
+/*             pa_log("got data %u", chunk->length); */
+            pa_memblockq_push_align(c->output_memblockq, chunk);
+            do_work(c);
+            break;
+
+        case CONNECTION_MESSAGE_UNLINK_CONNECTION:
+            connection_unlink(c);
+            break;
+    }
+
+    return 0;
+}
+
 /*** sink_input callbacks ***/
 =

+static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
+    pa_sink_input *i = PA_SINK_INPUT(o);
+    connection*c;
+
+    pa_sink_input_assert_ref(i);
+    c =3D CONNECTION(i->userdata);
+    connection_assert_ref(c);
+
+    switch (code) {
+
+        case SINK_INPUT_MESSAGE_POST_DATA: {
+            pa_assert(chunk);
+
+            /* New data from the main loop */
+            pa_memblockq_push_align(c->input_memblockq, chunk);
+
+/*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
+            =

+            return 0;
+        }
+
+        case SINK_INPUT_MESSAGE_DISABLE_PREBUF: {
+            pa_memblockq_prebuf_disable(c->input_memblockq);
+            return 0;
+        }
+
+        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
+            pa_usec_t *r = userdata;
+
+            *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
+
+            /* Fall through, the default handler will add in the extra
+             * latency added by the resampler */
+        }
+
+        default:
+            return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
+    }
+}
+
+
 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
-    struct connection*c;
-    assert(i && i->userdata && chunk);
-    c =3D i->userdata;
-
-    if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
-
-        if (c->dead)
-            connection_free(c);
-
-        return -1;
-    }
-
-    return 0;
-}
-
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
-    struct connection*c = i->userdata;
-    assert(i && c && length);
-
-/*     pa_log("DROP"); */
-
-    pa_memblockq_drop(c->input_memblockq, chunk, length);
-
-    /* do something */
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
-
-    if (!c->dead)
-        c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
-
-/*     assert(pa_memblockq_get_length(c->input_memblockq) > 2048); */
+    connection*c;
+    int r;
+    =

+    pa_assert(i);
+    c =3D CONNECTION(i->userdata);
+    connection_assert_ref(c);
+    pa_assert(chunk);
+
+    if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0 && c->dead)
+        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
+
+    return r;
+}
+
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
+    connection*c;
+    size_t old, new;
+
+    pa_assert(i);
+    c =3D CONNECTION(i->userdata);
+    connection_assert_ref(c);
+    pa_assert(length);
+
+    /*     pa_log("DROP"); */
+
+    old = pa_memblockq_missing(c->input_memblockq);
+    pa_memblockq_drop(c->input_memblockq, length);
+    new = pa_memblockq_missing(c->input_memblockq);
+
+    if (new > old) {
+        if (pa_atomic_add(&c->playback.missing, new - old) <= 0)
+            pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
+    }
 }
 =

 static void sink_input_kill_cb(pa_sink_input *i) {
-    assert(i && i->userdata);
-    connection_free((struct connection *) i->userdata);
-}
-
-static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
-    struct connection*c = i->userdata;
-    assert(i && c);
-    return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
+    pa_sink_input_assert_ref(i);
+
+    connection_unlink(CONNECTION(i->userdata));
 }
 =

 /*** source_output callbacks ***/
 =

 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
-    struct connection *c = o->userdata;
-    assert(o && c && chunk);
-
-    pa_memblockq_push(c->output_memblockq, chunk);
-
-    /* do something */
-    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
-
-    if (!c->dead)
-        c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
+    connection *c;
+
+    pa_assert(o);
+    c =3D CONNECTION(o->userdata);
+    pa_assert(c);
+    pa_assert(chunk);
+
+    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
 }
 =

 static void source_output_kill_cb(pa_source_output *o) {
-    assert(o && o->userdata);
-    connection_free((struct connection *) o->userdata);
+    pa_source_output_assert_ref(o);
+
+    connection_unlink(CONNECTION(o->userdata));
 }
 =

 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
-    struct connection*c =3D o->userdata;
-    assert(o && c);
+    connection*c;
+
+    pa_assert(o);
+    c =3D CONNECTION(o->userdata);
+    pa_assert(c);
+
     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
 }
 =

 /*** socket server callback ***/
 =

 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
-    struct connection *c = userdata;
-    assert(m && tv && c && c->auth_timeout_event == e);
+    connection *c = CONNECTION(userdata);
+    =

+    pa_assert(m);
+    pa_assert(tv);
+    connection_assert_ref(c);
+    pa_assert(c->auth_timeout_event =3D=3D e);
 =

     if (!c->authorized)
-        connection_free(c);
+        connection_unlink(c);
 }
 =

 static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
-    struct connection *c;
+    connection *c;
     pa_protocol_esound *p =3D userdata;
     char cname[256], pname[128];
-    assert(s && io && p);
+    =

+    pa_assert(s);
+    pa_assert(io);
+    pa_assert(p);
 =

     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
@@ -1188,16 +1335,16 @@
         return;
     }
 =

-    c =3D pa_xnew(struct connection, 1);
+    c =3D pa_msgobject_new(connection);
+    c->parent.parent.free =3D connection_free;
+    c->parent.process_msg =3D connection_process_msg;
     c->protocol =3D p;
     c->io =3D io;
     pa_iochannel_set_callback(c->io, io_callback, c);
 =

     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
     pa_snprintf(cname, sizeof(cname), "EsounD client (%s)", pname);
-    assert(p->core);
     c->client =3D pa_client_new(p->core, __FILE__, cname);
-    assert(c->client);
     c->client->owner =3D p->module;
     c->client->kill =3D client_kill_cb;
     c->client->userdata =3D c;
@@ -1224,6 +1371,7 @@
     c->playback.current_memblock =3D NULL;
     c->playback.memblock_index =3D 0;
     c->playback.fragment_size =3D 0;
+    pa_atomic_store(&c->playback.missing, 0);
 =

     c->scache.memchunk.length =3D c->scache.memchunk.index =3D 0;
     c->scache.memchunk.memblock =3D NULL;
@@ -1245,7 +1393,6 @@
         c->auth_timeout_event =3D NULL;
 =

     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
-    assert(c->defer_event);
     p->core->mainloop->defer_enable(c->defer_event, 0);
 =

     pa_idxset_put(p->connections, c, &c->index);
@@ -1254,22 +1401,22 @@
 /*** entry points ***/
 =

 pa_protocol_esound* pa_protocol_esound_new(pa_core*core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
-    pa_protocol_esound *p;
+    pa_protocol_esound *p =3D NULL;
     int public =3D 0;
     const char *acl;
 =

-    assert(core);
-    assert(server);
-    assert(m);
-    assert(ma);
-
-    p =3D pa_xnew(pa_protocol_esound, 1);
+    pa_assert(core);
+    pa_assert(server);
+    pa_assert(m);
+    pa_assert(ma);
 =

     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &public) < 0) {
         pa_log("auth-anonymous=3D expects a boolean argument.");
         goto fail;
     }
 =

+    p =3D pa_xnew(pa_protocol_esound, 1);
+
     if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", DEFAULT_COOKIE_FILE), p->esd_key, sizeof(p->esd_key)) < 0)
         goto fail;
 =

@@ -1282,13 +1429,12 @@
     } else
         p->auth_ip_acl =3D NULL;
 =

+    p->core =3D core;
     p->module =3D m;
     p->public =3D public;
     p->server =3D server;
     pa_socket_server_set_callback(p->server, on_connection, p);
-    p->core =3D core;
     p->connections =3D pa_idxset_new(NULL, NULL);
-    assert(p->connections);
 =

     p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
     p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
@@ -1302,17 +1448,20 @@
 }
 =

 void pa_protocol_esound_free(pa_protocol_esound *p) {
-    struct connection *c;
-    assert(p);
+    connection *c;
+    pa_assert(p);
 =

     while ((c =3D pa_idxset_first(p->connections, NULL)))
-        connection_free(c);
-
+        connection_unlink(c);
     pa_idxset_free(p->connections, NULL, NULL);
+
     pa_socket_server_unref(p->server);
 =

     if (p->auth_ip_acl)
         pa_ip_acl_free(p->auth_ip_acl);
 =

+    pa_xfree(p->sink_name);
+    pa_xfree(p->source_name);
+
     pa_xfree(p);
 }




More information about the pulseaudio-commits mailing list