[pulseaudio-commits] 12 commits - PROTOCOL configure.ac src/Makefile.am src/modules src/pulse src/pulsecore src/tests
David Henningsson
diwic at kemper.freedesktop.org
Fri Jun 27 05:44:56 PDT 2014
PROTOCOL | 20 ++
configure.ac | 2
src/Makefile.am | 9 -
src/modules/module-protocol-stub.c | 6
src/modules/module-tunnel.c | 4
src/pulse/context.c | 112 ++++++++++++
src/pulse/internal.h | 3
src/pulsecore/core.c | 14 +
src/pulsecore/core.h | 5
src/pulsecore/creds.h | 13 +
src/pulsecore/iochannel.c | 88 +++++++++-
src/pulsecore/iochannel.h | 3
src/pulsecore/memblock.c | 40 ++++
src/pulsecore/memblock.h | 6
src/pulsecore/native-common.h | 5
src/pulsecore/pdispatch.c | 26 ++-
src/pulsecore/pdispatch.h | 4
src/pulsecore/protocol-native.c | 79 ++++++++-
src/pulsecore/protocol-native.h | 1
src/pulsecore/pstream-util.c | 31 +++
src/pulsecore/pstream-util.h | 1
src/pulsecore/pstream.c | 320 ++++++++++++++++++++++++-------------
src/pulsecore/pstream.h | 9 -
src/pulsecore/shm.c | 12 -
src/pulsecore/shm.h | 2
src/pulsecore/srbchannel.c | 305 +++++++++++++++++++++++++++++++++++
src/pulsecore/srbchannel.h | 62 +++++++
src/tests/memblock-test.c | 4
src/tests/srbchannel-test.c | 138 +++++++++++++++
29 files changed, 1169 insertions(+), 155 deletions(-)
New commits:
commit 1afec0e5a37012655c87b204e343d6548678e329
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri May 30 12:04:21 2014 +0200
tests: Add pstream/srbchannel test
Runs four tests:
1) Small packets, iochannel
2) Big packets, iochannel
3) Small packets, srbchannel
4) Big packets, srbchannel
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/Makefile.am b/src/Makefile.am
index e388bd2..44d9050 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -271,6 +271,7 @@ TESTS_daemon = \
if !OS_IS_WIN32
TESTS_default += \
+ srbchannel-test \
sigbus-test \
usergroup-test
endif
@@ -349,6 +350,11 @@ format_test_CFLAGS = $(AM_CFLAGS) $(LIBCHECK_CFLAGS)
format_test_LDADD = $(AM_LDADD) libpulsecore- at PA_MAJORMINOR@.la libpulse.la libpulsecommon- at PA_MAJORMINOR@.la
format_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS) $(LIBCHECK_LIBS)
+srbchannel_test_SOURCES = tests/srbchannel-test.c
+srbchannel_test_CFLAGS = $(AM_CFLAGS) $(LIBCHECK_CFLAGS)
+srbchannel_test_LDADD = $(AM_LDADD) libpulse.la libpulsecommon- at PA_MAJORMINOR@.la
+srbchannel_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS) $(LIBCHECK_LIBS)
+
get_binary_name_test_SOURCES = tests/get-binary-name-test.c
get_binary_name_test_CFLAGS = $(AM_CFLAGS) $(LIBCHECK_CFLAGS)
get_binary_name_test_LDADD = $(AM_LDADD) libpulse.la libpulsecommon- at PA_MAJORMINOR@.la
diff --git a/src/tests/srbchannel-test.c b/src/tests/srbchannel-test.c
new file mode 100644
index 0000000..78a401b
--- /dev/null
+++ b/src/tests/srbchannel-test.c
@@ -0,0 +1,138 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2014 David Henningsson, Canonical Ltd.
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <unistd.h>
+#include <check.h>
+
+#include <pulse/mainloop.h>
+#include <pulsecore/packet.h>
+#include <pulsecore/pstream.h>
+#include <pulsecore/iochannel.h>
+#include <pulsecore/memblock.h>
+
+int packets_received; /* number of packets packet_received() has seen in the current packet_test() run */
+int packets_checksum; /* running sum of every payload byte received in the current run */
+int packets_length; /* packet length packet_received() expects; set by packet_test() */
+
+/* Receive-packet callback installed on the pstream under test.
+ *
+ * Checks the packet length against the expected packets_length, counts the
+ * packet and adds each payload byte to packets_checksum. The p, ancil and
+ * userdata arguments are not used. */
+static void packet_received(pa_pstream *p, pa_packet *packet, const pa_ancil *ancil, void *userdata) {
+    unsigned int pos = 0;
+
+    fail_unless(packets_length == (int) packet->length);
+    packets_received++;
+
+    while (pos < packet->length) {
+        packets_checksum += packet->data[pos];
+        pos++;
+    }
+}
+
+/* Sends npackets identical packets of plength bytes through p1 and iterates
+ * the mainloop until p2's receive callback has counted all of them, then
+ * verifies that the received byte checksum matches what was sent. */
+static void packet_test(int npackets, int plength, pa_mainloop *ml, pa_pstream *p1, pa_pstream *p2) {
+    pa_packet *packet = pa_packet_new(plength);
+    int n;
+    int per_packet_sum = 0, expected_sum = 0;
+
+    pa_log_info("Sending %d packets of length %d", npackets, plength);
+
+    /* Reset the state that packet_received() updates */
+    packets_length = plength;
+    packets_checksum = 0;
+    packets_received = 0;
+    pa_pstream_set_receive_packet_callback(p2, packet_received, NULL);
+
+    /* Fill the payload with a known pattern and remember its byte sum */
+    for (n = 0; n < plength; n++) {
+        packet->data[n] = n;
+        per_packet_sum += packet->data[n];
+    }
+
+    /* Queue the same packet npackets times, running one mainloop iteration
+     * (second argument 0) after each send */
+    for (n = 0; n < npackets; n++) {
+        expected_sum += per_packet_sum;
+        pa_pstream_send_packet(p1, packet, NULL);
+        pa_mainloop_iterate(ml, 0, NULL);
+    }
+
+    /* Keep iterating (second argument 1) until everything has arrived */
+    while (npackets > packets_received)
+        pa_mainloop_iterate(ml, 1, NULL);
+
+    fail_unless(packets_checksum == expected_sum);
+    pa_log_debug("Correct checksum received (%d)", packets_checksum);
+    pa_packet_unref(packet);
+}
+
+START_TEST (srbchannel_test) {
+ /* Runs the small/big packet tests over pipe-backed iochannels first, then again after attaching an srbchannel to both pstreams. */
+ int pipefd[4]; /* two pipes: [0]/[1] and [2]/[3], filled in by the pipe() calls below */
+
+ pa_mainloop *ml = pa_mainloop_new();
+ pa_mempool *mp = pa_mempool_new(true, 0);
+ pa_iochannel *io1, *io2;
+ pa_pstream *p1, *p2;
+ pa_srbchannel *sr1, *sr2;
+ pa_srbchannel_template srt;
+ /* Cross-connect the pipes: io1 is built from pipefd[2] and pipefd[1], io2 from pipefd[0] and pipefd[3] */
+ fail_unless(pipe(pipefd) == 0);
+ fail_unless(pipe(&pipefd[2]) == 0);
+ io1 = pa_iochannel_new(pa_mainloop_get_api(ml), pipefd[2], pipefd[1]);
+ io2 = pa_iochannel_new(pa_mainloop_get_api(ml), pipefd[0], pipefd[3]);
+ p1 = pa_pstream_new(pa_mainloop_get_api(ml), io1, mp);
+ p2 = pa_pstream_new(pa_mainloop_get_api(ml), io2, mp);
+
+ pa_log_debug("Pipes: fd %d -> %d, %d -> %d", pipefd[1], pipefd[0], pipefd[3], pipefd[2]);
+ /* Tests 1 and 2: small and big packets with only the iochannel attached */
+ packet_test(250, 5, ml, p1, p2);
+ packet_test(10, 1234567, ml, p1, p2);
+
+ pa_log_debug("And now the same thing with srbchannel...");
+ /* Create one srbchannel on mp, export it into srt, and build the peer end (sr2) from that template */
+ sr1 = pa_srbchannel_new(pa_mainloop_get_api(ml), mp);
+ pa_srbchannel_export(sr1, &srt);
+ pa_pstream_set_srbchannel(p1, sr1);
+ sr2 = pa_srbchannel_new_from_template(pa_mainloop_get_api(ml), &srt);
+ pa_pstream_set_srbchannel(p2, sr2);
+ /* Tests 3 and 4: the same two runs with the srbchannel attached */
+ packet_test(250, 5, ml, p1, p2);
+ packet_test(10, 1234567, ml, p1, p2);
+ /* NOTE(review): io1/io2 and sr1/sr2 are not released explicitly; presumably the pstreams own them - confirm */
+ pa_pstream_unref(p1);
+ pa_pstream_unref(p2);
+ pa_mempool_free(mp);
+ pa_mainloop_free(ml);
+}
+END_TEST
+
+
+/* Test program entry point: builds a check suite containing the single
+ * srbchannel test, runs it, and reports any failure through the exit status. */
+int main(int argc, char *argv[]) {
+    Suite *suite;
+    TCase *tcase;
+    SRunner *runner;
+    int nfailed = 0;
+
+    /* Switch to debug logging unless the MAKE_CHECK environment variable is set */
+    if (getenv("MAKE_CHECK") == NULL)
+        pa_log_set_level(PA_LOG_DEBUG);
+
+    suite = suite_create("srbchannel");
+    tcase = tcase_create("srbchannel");
+    tcase_add_test(tcase, srbchannel_test);
+    suite_add_tcase(suite, tcase);
+
+    runner = srunner_create(suite);
+    srunner_run_all(runner, CK_NORMAL);
+    nfailed = srunner_ntests_failed(runner);
+    srunner_free(runner);
+
+    if (nfailed != 0)
+        return EXIT_FAILURE;
+
+    return EXIT_SUCCESS;
+}
commit a476371254eb78071db243f4fb9cecc0c52fabce
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri Apr 25 17:57:18 2014 +0200
protocol-native: Enable srbchannel
The srbchannel is enabled if protocol version >= 30 and
SHM is available. There is also a module parameter
srbchannel=false that can be used for disabling the srbchannel.
The setup is done in these steps:
1) Server receives authentication (like today)
2) Server sends enable_srbchannel to client
3) Server sends memblock to client
4) Client receives enable_srbchannel
5) Client receives memblock
6) Client sends enable_srbchannel back to server
7) Client switches over
8) Server receives enable_srbchannel and switches over
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/modules/module-protocol-stub.c b/src/modules/module-protocol-stub.c
index 3218a01..118351e 100644
--- a/src/modules/module-protocol-stub.c
+++ b/src/modules/module-protocol-stub.c
@@ -120,14 +120,17 @@
# endif
# if defined(HAVE_CREDS) && !defined(USE_TCP_SOCKETS)
-# define MODULE_ARGUMENTS MODULE_ARGUMENTS_COMMON "auth-group", "auth-group-enable",
+# define MODULE_ARGUMENTS MODULE_ARGUMENTS_COMMON "auth-group", "auth-group-enable", "srbchannel",
# define AUTH_USAGE "auth-group=<system group to allow access> auth-group-enable=<enable auth by UNIX group?> "
+# define SRB_USAGE "srbchannel=<enable shared ringbuffer communication channel?> "
# elif defined(USE_TCP_SOCKETS)
# define MODULE_ARGUMENTS MODULE_ARGUMENTS_COMMON "auth-ip-acl",
# define AUTH_USAGE "auth-ip-acl=<IP address ACL to allow access> "
+# define SRB_USAGE
# else
# define MODULE_ARGUMENTS MODULE_ARGUMENTS_COMMON
# define AUTH_USAGE
+# define SRB_USAGE
# endif
PA_MODULE_DESCRIPTION("Native protocol "SOCKET_DESCRIPTION);
@@ -135,6 +138,7 @@
"auth-cookie=<path to cookie file> "
"auth-cookie-enabled=<enable cookie authentication?> "
AUTH_USAGE
+ SRB_USAGE
SOCKET_USAGE);
#elif defined(USE_PROTOCOL_ESOUND)
# include <pulsecore/protocol-esound.h>
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index 86865ee..5f2c35d 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -181,6 +181,7 @@ struct pa_native_connection {
uint32_t rrobin_index;
pa_subscription *subscription;
pa_time_event *auth_timeout_event;
+ pa_srbchannel *srbpending;
};
#define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
@@ -294,6 +295,7 @@ static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag,
static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_port_latency_offset(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = NULL,
@@ -397,6 +399,8 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_SET_PORT_LATENCY_OFFSET] = command_set_port_latency_offset,
+ [PA_COMMAND_ENABLE_SRBCHANNEL] = command_enable_srbchannel,
+
[PA_COMMAND_EXTENSION] = command_extension
};
@@ -1327,6 +1331,9 @@ static void native_connection_unlink(pa_native_connection *c) {
if (c->options)
pa_native_options_unref(c->options);
+ if (c->srbpending)
+ pa_srbchannel_free(c->srbpending);
+
while ((r = pa_idxset_first(c->record_streams, NULL)))
record_stream_unlink(r);
@@ -2578,6 +2585,65 @@ static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_ta
pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
}
+/* Server side: offers the shared ringbuffer channel to the client of c.
+ *
+ * Does nothing (apart from a debug message) when the module option is off,
+ * the client's protocol version is below 30, the pstream has no SHM, or the
+ * core has no rw_mempool. Otherwise a new srbchannel is created and
+ * PA_COMMAND_ENABLE_SRBCHANNEL is sent with the two template fds attached,
+ * followed by the ringbuffer memblock. The channel is parked in
+ * c->srbpending until the client's ack is handled by
+ * command_enable_srbchannel(). */
+static void setup_srbchannel(pa_native_connection *c) {
+    pa_srbchannel_template tmpl;
+    pa_srbchannel *channel;
+    pa_memchunk ringbuffer;
+    pa_tagstruct *cmd;
+    int fds[2];
+
+    if (!c->options->srbchannel) {
+        pa_log_debug("Disabling srbchannel, reason: Disabled by module parameter");
+        return;
+    }
+
+    if (c->version < 30) {
+        pa_log_debug("Disabling srbchannel, reason: Protocol too old");
+        return;
+    }
+
+    if (!pa_pstream_get_shm(c->pstream)) {
+        pa_log_debug("Disabling srbchannel, reason: No SHM support");
+        return;
+    }
+
+    if (!c->protocol->core->rw_mempool) {
+        pa_log_debug("Disabling srbchannel, reason: No rw memory pool");
+        return;
+    }
+
+    pa_log_debug("Enabling srbchannel...");
+    channel = pa_srbchannel_new(c->protocol->core->mainloop, c->protocol->core->rw_mempool);
+    pa_srbchannel_export(channel, &tmpl);
+
+    /* Gather what the client needs: the two fds and the whole ringbuffer block */
+    fds[0] = tmpl.readfd;
+    fds[1] = tmpl.writefd;
+
+    ringbuffer.memblock = tmpl.memblock;
+    ringbuffer.index = 0;
+    ringbuffer.length = pa_memblock_get_length(tmpl.memblock);
+
+    /* Enable command first; the channel pointer doubles as the tag the
+     * client has to echo back */
+    cmd = pa_tagstruct_new(NULL, 0);
+    pa_tagstruct_putu32(cmd, PA_COMMAND_ENABLE_SRBCHANNEL);
+    pa_tagstruct_putu32(cmd, (size_t) channel); /* tag */
+    pa_pstream_send_tagstruct_with_fds(c->pstream, cmd, 2, fds);
+
+    /* ...then the ringbuffer memblock */
+    pa_pstream_send_memblock(c->pstream, 0, 0, 0, &ringbuffer);
+
+    c->srbpending = channel;
+}
+
+/* Handles the client's PA_COMMAND_ENABLE_SRBCHANNEL ack.
+ *
+ * The tag must match the one sent by setup_srbchannel() (the pending
+ * channel's pointer value truncated to 32 bits). On a match the pending
+ * srbchannel is handed over to the pstream and c->srbpending is cleared.
+ * On a mismatch the request is treated as a protocol error and nothing is
+ * switched. */
+static void command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+    pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
+
+    if (tag != (uint32_t) (size_t) c->srbpending) {
+        protocol_error(c);
+        /* Must not fall through: the connection was just flagged as broken
+         * (presumably torn down by protocol_error() - confirm), so neither
+         * c->pstream nor c->srbpending may be used to switch channels. */
+        return;
+    }
+
+    pa_log_debug("Client enabled srbchannel.");
+    pa_pstream_set_srbchannel(c->pstream, c->srbpending);
+    c->srbpending = NULL;
+}
+
static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
const void*cookie;
@@ -2709,6 +2775,8 @@ static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_ta
#else
pa_pstream_send_tagstruct(c->pstream, reply);
#endif
+
+ setup_srbchannel(c);
}
static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
@@ -5017,6 +5085,7 @@ void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_nati
c->protocol = p;
c->options = pa_native_options_ref(o);
c->authorized = false;
+ c->srbpending = NULL;
if (o->auth_anonymous) {
pa_log_info("Client authenticated anonymously.");
@@ -5247,6 +5316,12 @@ int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
pa_assert(PA_REFCNT_VALUE(o) >= 1);
pa_assert(ma);
+ o->srbchannel = true;
+ if (pa_modargs_get_value_boolean(ma, "srbchannel", &o->srbchannel) < 0) {
+ pa_log("srbchannel= expects a boolean argument.");
+ return -1;
+ }
+
if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
pa_log("auth-anonymous= expects a boolean argument.");
return -1;
diff --git a/src/pulsecore/protocol-native.h b/src/pulsecore/protocol-native.h
index 30b99f9..df30d44 100644
--- a/src/pulsecore/protocol-native.h
+++ b/src/pulsecore/protocol-native.h
@@ -44,6 +44,7 @@ typedef struct pa_native_options {
pa_module *module;
bool auth_anonymous;
+ bool srbchannel;
char *auth_group;
pa_ip_acl *auth_ip_acl;
pa_auth_cookie *auth_cookie;
commit 1827991548495f1b2c3f44fe45b51901109f791c
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri Apr 25 17:23:21 2014 +0200
Protocol, client: Add commands to enable srbchannel
This increments protocol version to v30 and adds two new commands
to enable and disable an shm ringbuffer, as well as client side
implementation.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/PROTOCOL b/PROTOCOL
index 850b953..3c08fea 100644
--- a/PROTOCOL
+++ b/PROTOCOL
@@ -351,6 +351,26 @@ New field in all commands that send/receive profile introspection data
The field is added once for every profile.
+## v30, implemented by >= 6.0
+#
+A new protocol mechanism is supported: two ringbuffers in shared memory.
+Pulseaudio fdsem (wrappers around event file descriptors) are used for
+signalling new data.
+The protocol has a new SHM flag telling whether a SHM memblock is writable
+by both sides.
+
+PA_COMMAND_ENABLE_SRBCHANNEL
+First sent from server to client, tells the client to start listening on
+the additional SHM ringbuffer channel.
+This command also has ancillary data (two eventfds attached to it).
+Must be directly followed by a memblock which is the ringbuffer memory.
+When memblock is received by the client, it acks by sending
+PA_COMMAND_ENABLE_SRBCHANNEL back (without ancillary or memblock data).
+
+PA_COMMAND_DISABLE_SRBCHANNEL
+Tells the client to stop listening on the additional SHM ringbuffer channel.
+Acked by client by sending PA_COMMAND_DISABLE_SRBCHANNEL back.
+
#### If you just changed the protocol, read this
## module-tunnel depends on the sink/source/sink-input/source-input protocol
## internals, so if you changed these, you might have broken module-tunnel.
diff --git a/configure.ac b/configure.ac
index 39bb5c3..837e81e 100644
--- a/configure.ac
+++ b/configure.ac
@@ -41,7 +41,7 @@ AC_SUBST(PA_MINOR, pa_minor)
AC_SUBST(PA_MAJORMINOR, pa_major.pa_minor)
AC_SUBST(PA_API_VERSION, 12)
-AC_SUBST(PA_PROTOCOL_VERSION, 29)
+AC_SUBST(PA_PROTOCOL_VERSION, 30)
# The stable ABI for client applications, for the version info x:y:z
# always will hold y=z
diff --git a/src/pulse/context.c b/src/pulse/context.c
index e1cd900..45ed344 100644
--- a/src/pulse/context.c
+++ b/src/pulse/context.c
@@ -69,6 +69,8 @@
#include "context.h"
void pa_command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void pa_command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void pa_command_disable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_REQUEST] = pa_command_request,
@@ -87,7 +89,9 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_RECORD_STREAM_EVENT] = pa_command_stream_event,
[PA_COMMAND_CLIENT_EVENT] = pa_command_client_event,
[PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = pa_command_stream_buffer_attr,
- [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = pa_command_stream_buffer_attr
+ [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = pa_command_stream_buffer_attr,
+ [PA_COMMAND_ENABLE_SRBCHANNEL] = pa_command_enable_srbchannel,
+ [PA_COMMAND_DISABLE_SRBCHANNEL] = pa_command_disable_srbchannel,
};
static void context_free(pa_context *c);
@@ -165,6 +169,9 @@ pa_context *pa_context_new_with_proplist(pa_mainloop_api *mainloop, const char *
c->conf = pa_client_conf_new();
pa_client_conf_load(c->conf, true, true);
+ c->srb_template.readfd = -1;
+ c->srb_template.writefd = -1;
+
if (!(c->mempool = pa_mempool_new(!c->conf->disable_shm, c->conf->shm_size))) {
if (!c->conf->disable_shm)
@@ -206,6 +213,11 @@ static void context_unlink(pa_context *c) {
c->pstream = NULL;
}
+ if (c->srb_template.memblock) {
+ pa_memblock_unref(c->srb_template.memblock);
+ c->srb_template.memblock = NULL;
+ }
+
if (c->client) {
pa_socket_client_unref(c->client);
c->client = NULL;
@@ -331,6 +343,35 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_a
pa_context_unref(c);
}
+/* Client side: finishes the srbchannel setup once the memblock following
+ * PA_COMMAND_ENABLE_SRBCHANNEL has arrived.
+ *
+ * The memblock becomes the ringbuffer memory of c->srb_template. It is
+ * rejected with PA_ERR_PROTOCOL if it is missing, read-only, or one of our
+ * own blocks; in that case nothing else is done. Otherwise the srbchannel is
+ * created from the template, the enable command is acked with the tag saved
+ * in c->srb_setup_tag, and the pstream is switched over. */
+static void handle_srbchannel_memblock(pa_context *c, pa_memblock *memblock) {
+    pa_srbchannel *sr;
+    pa_tagstruct *t;
+
+    pa_assert(c);
+
+    /* Memblock sanity check. Stop here on failure: continuing would take a
+     * reference on (and build a channel from) a block we just rejected. */
+    if (!memblock || pa_memblock_is_read_only(memblock) || pa_memblock_is_ours(memblock)) {
+        pa_context_fail(c, PA_ERR_PROTOCOL);
+        return;
+    }
+
+    /* Create the srbchannel */
+    c->srb_template.memblock = memblock;
+    pa_memblock_ref(memblock);
+    sr = pa_srbchannel_new_from_template(c->mainloop, &c->srb_template);
+
+    /* Ack the enable command */
+    t = pa_tagstruct_new(NULL, 0);
+    pa_tagstruct_putu32(t, PA_COMMAND_ENABLE_SRBCHANNEL);
+    pa_tagstruct_putu32(t, c->srb_setup_tag);
+    pa_pstream_send_tagstruct(c->pstream, t);
+
+    /* ...and switch over */
+    pa_pstream_set_srbchannel(c->pstream, sr);
+}
+
static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
pa_context *c = userdata;
pa_stream *s;
@@ -343,6 +384,12 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
pa_context_ref(c);
+ if (c->srb_template.readfd != -1 && c->srb_template.memblock == NULL) {
+ handle_srbchannel_memblock(c, chunk->memblock);
+ pa_context_unref(c);
+ return;
+ }
+
if ((s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel)))) {
if (chunk->memblock) {
@@ -1362,6 +1409,65 @@ finish:
pa_context_unref(c);
}
+/* Client side handler for PA_COMMAND_ENABLE_SRBCHANNEL.
+ *
+ * Records the two file descriptors that came as ancillary data, plus the
+ * command tag, in the context. The context is failed with PA_ERR_PROTOCOL if
+ * a channel setup is already recorded or if the ancillary data is not
+ * exactly two valid fds. */
+static void pa_command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+    pa_context *c = userdata;
+    const int *ancil_fds;
+    int n_fds;
+
+    pa_assert(pd);
+    pa_assert(command == PA_COMMAND_ENABLE_SRBCHANNEL);
+    pa_assert(t);
+    pa_assert(c);
+    pa_assert(PA_REFCNT_VALUE(c) >= 1);
+
+    /* Currently only one srb channel is supported, might change in future versions */
+    if (c->srb_template.readfd != -1) {
+        pa_context_fail(c, PA_ERR_PROTOCOL);
+        return;
+    }
+
+    ancil_fds = pa_pdispatch_fds(pd, &n_fds);
+    if (!(n_fds == 2 && ancil_fds && ancil_fds[0] != -1 && ancil_fds[1] != -1)) {
+        pa_context_fail(c, PA_ERR_PROTOCOL);
+        return;
+    }
+
+    pa_context_ref(c);
+
+    /* Remember the tag so the ack can echo it, and stash the fds for the
+     * channel that is built once the memblock shows up */
+    c->srb_setup_tag = tag;
+    c->srb_template.readfd = ancil_fds[0];
+    c->srb_template.writefd = ancil_fds[1];
+
+    pa_context_unref(c);
+}
+
+/* Client side handler for PA_COMMAND_DISABLE_SRBCHANNEL.
+ *
+ * Detaches the srbchannel from the pstream, resets the stored template
+ * (fds back to -1, memblock reference dropped) and acks by sending the same
+ * command with the same tag back. */
+static void pa_command_disable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+    pa_context *c = userdata;
+    pa_tagstruct *ack;
+
+    pa_assert(pd);
+    pa_assert(command == PA_COMMAND_DISABLE_SRBCHANNEL);
+    pa_assert(t);
+    pa_assert(c);
+    pa_assert(PA_REFCNT_VALUE(c) >= 1);
+
+    pa_pstream_set_srbchannel(c->pstream, NULL);
+
+    /* Forget the old channel so a later enable command is accepted again */
+    if (c->srb_template.memblock) {
+        pa_memblock_unref(c->srb_template.memblock);
+        c->srb_template.memblock = NULL;
+    }
+    c->srb_template.writefd = -1;
+    c->srb_template.readfd = -1;
+
+    /* Send disable command back again */
+    ack = pa_tagstruct_new(NULL, 0);
+    pa_tagstruct_putu32(ack, PA_COMMAND_DISABLE_SRBCHANNEL);
+    pa_tagstruct_putu32(ack, tag);
+    pa_pstream_send_tagstruct(c->pstream, ack);
+}
+
+
void pa_command_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_context *c = userdata;
pa_proplist *pl = NULL;
diff --git a/src/pulse/internal.h b/src/pulse/internal.h
index c5084d5..fc2b702 100644
--- a/src/pulse/internal.h
+++ b/src/pulse/internal.h
@@ -66,6 +66,9 @@ struct pa_context {
pa_pstream *pstream;
pa_pdispatch *pdispatch;
+ pa_srbchannel_template srb_template;
+ uint32_t srb_setup_tag;
+
pa_hashmap *record_streams, *playback_streams;
PA_LLIST_HEAD(pa_stream, streams);
PA_LLIST_HEAD(pa_operation, operations);
diff --git a/src/pulsecore/native-common.h b/src/pulsecore/native-common.h
index e7d2970..ca1b430 100644
--- a/src/pulsecore/native-common.h
+++ b/src/pulsecore/native-common.h
@@ -176,6 +176,11 @@ enum {
/* Supported since protocol v27 (3.0) */
PA_COMMAND_SET_PORT_LATENCY_OFFSET,
+ /* Supported since protocol v30 (6.0) */
+ /* BOTH DIRECTIONS */
+ PA_COMMAND_ENABLE_SRBCHANNEL,
+ PA_COMMAND_DISABLE_SRBCHANNEL,
+
PA_COMMAND_MAX
};
commit 4931637f822f23888c7477ad5bbfcf87b337db48
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri Apr 25 16:58:03 2014 +0200
pstream: Allow reading/writing through srbchannel
For writing, we prefer writing through the srbchannel if one is available,
and we have no ancil data to send.
For reading, we support reading from both in parallel. This meant replicating
a struct used for reading, so a lot of this patch is just a search/replace in
do_read to use the appropriate channel for reading.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 539c4a2..ceda728 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -109,12 +109,23 @@ struct item_info {
uint32_t block_id;
};
+struct pstream_read { /* receive state for one input channel; pa_pstream holds one for the iochannel (readio) and one for the srbchannel (readsrb) */
+ pa_pstream_descriptor descriptor; /* frame descriptor; filled first, while index < PA_PSTREAM_DESCRIPTOR_SIZE */
+ pa_memblock *memblock; /* allocated for memblock frames that carry no SHM flags; receives the payload */
+ pa_packet *packet; /* allocated for packet frames; its data receives the payload */
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX]; /* receives the payload of frames that reference an SHM memblock */
+ void *data; /* payload destination; when NULL, do_read() acquires memblock and writes there instead */
+ size_t index; /* bytes of the current frame read so far, descriptor included */
+};
+
struct pa_pstream {
PA_REFCNT_DECLARE;
pa_mainloop_api *mainloop;
pa_defer_event *defer_event;
pa_iochannel *io;
+ pa_srbchannel *srb, *srbpending;
+ bool is_srbpending;
pa_queue *send_queue;
@@ -132,14 +143,7 @@ struct pa_pstream {
pa_memchunk memchunk;
} write;
- struct {
- pa_pstream_descriptor descriptor;
- pa_memblock *memblock;
- pa_packet *packet;
- uint32_t shm_info[PA_PSTREAM_SHM_MAX];
- void *data;
- size_t index;
- } read;
+ struct pstream_read readio, readsrb;
bool use_shm;
pa_memimport *import;
@@ -172,7 +176,7 @@ struct pa_pstream {
};
static int do_write(pa_pstream *p);
-static int do_read(pa_pstream *p);
+static int do_read(pa_pstream *p, struct pstream_read *re);
static void do_pstream_read_write(pa_pstream *p) {
pa_assert(p);
@@ -182,8 +186,13 @@ static void do_pstream_read_write(pa_pstream *p) {
p->mainloop->defer_enable(p->defer_event, 0);
+ if (!p->dead && p->srb) {
+ do_write(p);
+ while (!p->dead && do_read(p, &p->readsrb) == 0);
+ }
+
if (!p->dead && pa_iochannel_is_readable(p->io)) {
- if (do_read(p) < 0)
+ if (do_read(p, &p->readio) < 0)
goto fail;
} else if (!p->dead && pa_iochannel_is_hungup(p->io))
goto fail;
@@ -208,6 +217,17 @@ fail:
pa_pstream_unref(p);
}
+/* Callback installed on the srbchannel by check_srbpending(); runs one
+ * read/write pass on the owning pstream.
+ *
+ * Returns true only if, after the pass, the pstream is still referenced by
+ * someone other than this function and is still using this very srbchannel.
+ * NOTE(review): this assumes a false return tells the srbchannel to stop
+ * touching itself because it may have been freed - confirm against the
+ * callback contract in srbchannel.h. */
+static bool srb_callback(pa_srbchannel *srb, void *userdata) {
+    pa_pstream *p = userdata;
+    bool keep_going;
+
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) > 0);
+    pa_assert(p->srb == srb);
+
+    /* Hold our own reference: work done during the pass may drop references
+     * to p, and p has to stay valid for the check below. */
+    pa_pstream_ref(p);
+
+    do_pstream_read_write(p);
+
+    /* Must be evaluated before our reference is released. Comparing against
+     * srb (rather than NULL) also covers the case where the pass replaced
+     * the channel with a different one. */
+    keep_going = PA_REFCNT_VALUE(p) > 1 && p->srb == srb;
+    pa_pstream_unref(p);
+
+    return keep_going;
+}
+
static void io_callback(pa_iochannel*io, void *userdata) {
pa_pstream *p = userdata;
@@ -289,11 +309,17 @@ static void pstream_free(pa_pstream *p) {
if (p->write.memchunk.memblock)
pa_memblock_unref(p->write.memchunk.memblock);
- if (p->read.memblock)
- pa_memblock_unref(p->read.memblock);
+ if (p->readsrb.memblock)
+ pa_memblock_unref(p->readsrb.memblock);
- if (p->read.packet)
- pa_packet_unref(p->read.packet);
+ if (p->readsrb.packet)
+ pa_packet_unref(p->readsrb.packet);
+
+ if (p->readio.memblock)
+ pa_memblock_unref(p->readio.memblock);
+
+ if (p->readio.packet)
+ pa_packet_unref(p->readio.packet);
pa_xfree(p);
}
@@ -556,6 +582,20 @@ static void prepare_next_write_item(pa_pstream *p) {
#endif
}
+/* Performs a requested srbchannel switch, if one is flagged as pending.
+ *
+ * The current channel (if any) is freed and replaced by p->srbpending,
+ * which may be NULL; a non-NULL replacement gets srb_callback installed. */
+static void check_srbpending(pa_pstream *p) {
+    if (p->is_srbpending) {
+        pa_srbchannel *previous = p->srb;
+
+        if (previous)
+            pa_srbchannel_free(previous);
+
+        p->srb = p->srbpending;
+        p->is_srbpending = false;
+
+        if (p->srb)
+            pa_srbchannel_set_callback(p->srb, srb_callback, p);
+    }
+}
+
static int do_write(pa_pstream *p) {
void *d;
size_t l;
@@ -568,8 +608,11 @@ static int do_write(pa_pstream *p) {
if (!p->write.current)
prepare_next_write_item(p);
- if (!p->write.current)
+ if (!p->write.current) {
+ /* The out queue is empty, so switching channels is safe */
+ check_srbpending(p);
return 0;
+ }
if (p->write.minibuf_validsize > 0) {
d = p->write.minibuf + p->write.index;
@@ -606,8 +649,9 @@ static int do_write(pa_pstream *p) {
p->send_ancil_now = false;
} else
#endif
-
- if ((r = pa_iochannel_write(p->io, d, l)) < 0)
+ if (p->srb)
+ r = pa_srbchannel_write(p->srb, d, l);
+ else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
goto fail;
if (release_memblock)
@@ -639,7 +683,7 @@ fail:
return -1;
}
-static int do_read(pa_pstream *p) {
+static int do_read(pa_pstream *p, struct pstream_read *re) {
void *d;
size_t l;
ssize_t r;
@@ -647,23 +691,32 @@ static int do_read(pa_pstream *p) {
pa_assert(p);
pa_assert(PA_REFCNT_VALUE(p) > 0);
- if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
- d = (uint8_t*) p->read.descriptor + p->read.index;
- l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
+ if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
+ d = (uint8_t*) re->descriptor + re->index;
+ l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
} else {
- pa_assert(p->read.data || p->read.memblock);
+ pa_assert(re->data || re->memblock);
- if (p->read.data)
- d = p->read.data;
+ if (re->data)
+ d = re->data;
else {
- d = pa_memblock_acquire(p->read.memblock);
- release_memblock = p->read.memblock;
+ d = pa_memblock_acquire(re->memblock);
+ release_memblock = re->memblock;
}
- d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
- l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
+ d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
+ l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
+ if (re == &p->readsrb) {
+ r = pa_srbchannel_read(p->srb, d, l);
+ if (r == 0) {
+ if (release_memblock)
+ pa_memblock_release(release_memblock);
+ return 1;
+ }
+ }
+ else
#ifdef HAVE_CREDS
{
pa_ancil b;
@@ -689,13 +742,13 @@ static int do_read(pa_pstream *p) {
if (release_memblock)
pa_memblock_release(release_memblock);
- p->read.index += (size_t) r;
+ re->index += (size_t) r;
- if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+ if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
uint32_t flags, length, channel;
/* Reading of frame descriptor complete */
- flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
@@ -706,10 +759,10 @@ static int do_read(pa_pstream *p) {
/* This is a SHM memblock release frame with no payload */
-/* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
pa_assert(p->export);
- pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+ pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
goto frame_done;
@@ -717,24 +770,24 @@ static int do_read(pa_pstream *p) {
/* This is a SHM memblock revoke frame with no payload */
-/* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
pa_assert(p->import);
- pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+ pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
goto frame_done;
}
- length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+ length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
return -1;
}
- pa_assert(!p->read.packet && !p->read.memblock);
+ pa_assert(!re->packet && !re->memblock);
- channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+ channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
if (channel == (uint32_t) -1) {
@@ -744,8 +797,8 @@ static int do_read(pa_pstream *p) {
}
/* Frame is a packet frame */
- p->read.packet = pa_packet_new(length);
- p->read.data = p->read.packet->data;
+ re->packet = pa_packet_new(length);
+ re->data = re->packet->data;
} else {
@@ -756,20 +809,20 @@ static int do_read(pa_pstream *p) {
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
- if (length != sizeof(p->read.shm_info)) {
+ if (length != sizeof(re->shm_info)) {
pa_log_warn("Received SHM memblock frame with invalid frame length.");
return -1;
}
/* Frame is a memblock frame referencing an SHM memblock */
- p->read.data = p->read.shm_info;
+ re->data = re->shm_info;
} else if ((flags & PA_FLAG_SHMMASK) == 0) {
/* Frame is a memblock frame */
- p->read.memblock = pa_memblock_new(p->mempool, length);
- p->read.data = NULL;
+ re->memblock = pa_memblock_new(p->mempool, length);
+ re->data = NULL;
} else {
pa_log_warn("Received memblock frame with invalid flags value.");
@@ -777,74 +830,74 @@ static int do_read(pa_pstream *p) {
}
}
- } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
+ } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
/* Frame payload available */
- if (p->read.memblock && p->receive_memblock_callback) {
+ if (re->memblock && p->receive_memblock_callback) {
/* Is this memblock data? Than pass it to the user */
- l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
+ l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
if (l > 0) {
pa_memchunk chunk;
- chunk.memblock = p->read.memblock;
- chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
+ chunk.memblock = re->memblock;
+ chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
chunk.length = l;
if (p->receive_memblock_callback) {
int64_t offset;
offset = (int64_t) (
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
p->receive_memblock_callback(
p,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->receive_memblock_callback_userdata);
}
/* Drop seek info for following callbacks */
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
}
}
/* Frame complete */
- if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+ if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
- if (p->read.memblock) {
+ if (re->memblock) {
/* This was a memblock frame. We can unref the memblock now */
- pa_memblock_unref(p->read.memblock);
+ pa_memblock_unref(re->memblock);
- } else if (p->read.packet) {
+ } else if (re->packet) {
if (p->receive_packet_callback)
#ifdef HAVE_CREDS
- p->receive_packet_callback(p, p->read.packet, &p->read_ancil, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata);
#else
- p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif
- pa_packet_unref(p->read.packet);
+ pa_packet_unref(re->packet);
} else {
pa_memblock *b;
- uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
pa_assert(p->import);
if (!(b = pa_memimport_get(p->import,
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
!!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
@@ -857,17 +910,17 @@ static int do_read(pa_pstream *p) {
chunk.memblock = b;
chunk.index = 0;
- chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
+ chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
offset = (int64_t) (
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
p->receive_memblock_callback(
p,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->receive_memblock_callback_userdata);
}
@@ -883,10 +936,10 @@ static int do_read(pa_pstream *p) {
return 0;
frame_done:
- p->read.memblock = NULL;
- p->read.packet = NULL;
- p->read.index = 0;
- p->read.data = NULL;
+ re->memblock = NULL;
+ re->packet = NULL;
+ re->index = 0;
+ re->data = NULL;
#ifdef HAVE_CREDS
p->read_ancil.creds_valid = false;
@@ -988,6 +1041,9 @@ void pa_pstream_unlink(pa_pstream *p) {
p->dead = true;
+ while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
+ pa_pstream_set_srbchannel(p, NULL);
+
if (p->import) {
pa_memimport_free(p->import);
p->import = NULL;
@@ -1040,3 +1096,23 @@ bool pa_pstream_get_shm(pa_pstream *p) {
return p->use_shm;
}
+
+/* Requests that pstream p switch to the shared ringbuffer channel srb; passing
+ * NULL requests that the current srbchannel be dropped. The request is only
+ * recorded here (p->srbpending / p->is_srbpending); afterwards
+ * check_srbpending() is called directly when the pstream is dead, otherwise
+ * do_write() is invoked (presumably it performs the switch once it is safe to
+ * do so -- confirm in do_write()).
+ *
+ * A non-NULL srb may only be passed while the pstream still holds a reference.
+ *
+ * NOTE(review): when srb == p->srb the function returns before looking at
+ * is_srbpending, so calling it with NULL while p->srb is NULL and a switch is
+ * still pending changes nothing. The loop in pa_pstream_unlink() appears to
+ * rely on that state never occurring -- confirm. */
+void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
+ pa_assert(p);
+ pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
+
+ /* Nothing to do if srb is already the active channel. */
+ if (srb == p->srb)
+ return;
+
+ /* We can't handle quick switches between srbchannels. */
+ pa_assert(!p->is_srbpending);
+
+ p->srbpending = srb;
+ p->is_srbpending = true;
+
+ /* Switch immediately, if possible. */
+ if (p->dead)
+ check_srbpending(p);
+ else
+ do_write(p);
+}
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index 4961570..8e4056c 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -31,6 +31,7 @@
#include <pulsecore/packet.h>
#include <pulsecore/memblock.h>
#include <pulsecore/iochannel.h>
+#include <pulsecore/srbchannel.h>
#include <pulsecore/memchunk.h>
#include <pulsecore/creds.h>
#include <pulsecore/macro.h>
@@ -66,4 +67,8 @@ bool pa_pstream_is_pending(pa_pstream *p);
void pa_pstream_enable_shm(pa_pstream *p, bool enable);
bool pa_pstream_get_shm(pa_pstream *p);
+/* Enables shared ringbuffer channel. Note that the srbchannel is now owned by the pstream.
+ Setting srb to NULL will free any existing srbchannel. */
+void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb);
+
#endif
commit b06e61652533201f0b58ed339d4435099d180a36
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri Apr 25 14:20:57 2014 +0200
core: Add a second rw mempool
To keep the data and the ringbuffer separate, let's add another
mempool just for the ringbuffer(s). That way, the client can open
the ringbuffer shm file in rw mode and keep the data in ro mode.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/pulsecore/core.c b/src/pulsecore/core.c
index e6f2dfc..b0f2314 100644
--- a/src/pulsecore/core.c
+++ b/src/pulsecore/core.c
@@ -127,6 +127,11 @@ pa_core* pa_core_new(pa_mainloop_api *m, bool shared, size_t shm_size) {
c->mempool = pool;
pa_silence_cache_init(&c->silence_cache);
+ if (shared && !(c->rw_mempool = pa_mempool_new(shared, shm_size)))
+ pa_log_warn("failed to allocate shared writable memory pool.");
+ if (c->rw_mempool)
+ pa_mempool_set_is_remote_writable(c->rw_mempool, true);
+
c->exit_event = NULL;
c->exit_idle_time = -1;
@@ -208,6 +213,8 @@ static void core_free(pa_object *o) {
pa_assert(!c->default_sink);
pa_silence_cache_done(&c->silence_cache);
+ if (c->rw_mempool)
+ pa_mempool_free(c->rw_mempool);
pa_mempool_free(c->mempool);
for (j = 0; j < PA_CORE_HOOK_MAX; j++)
@@ -254,7 +261,6 @@ void pa_core_maybe_vacuum(pa_core *c) {
if (pa_idxset_isempty(c->sink_inputs) && pa_idxset_isempty(c->source_outputs)) {
pa_log_debug("Hmm, no streams around, trying to vacuum.");
- pa_mempool_vacuum(c->mempool);
} else {
pa_sink *si;
pa_source *so;
@@ -271,8 +277,12 @@ void pa_core_maybe_vacuum(pa_core *c) {
return;
pa_log_info("All sinks and sources are suspended, vacuuming memory");
- pa_mempool_vacuum(c->mempool);
}
+
+ pa_mempool_vacuum(c->mempool);
+
+ if (c->rw_mempool)
+ pa_mempool_vacuum(c->rw_mempool);
}
pa_time_event* pa_core_rttime_new(pa_core *c, pa_usec_t usec, pa_time_event_cb_t cb, void *userdata) {
diff --git a/src/pulsecore/core.h b/src/pulsecore/core.h
index b443ce4..1f9df73 100644
--- a/src/pulsecore/core.h
+++ b/src/pulsecore/core.h
@@ -170,7 +170,10 @@ struct pa_core {
PA_LLIST_HEAD(pa_subscription_event, subscription_event_queue);
pa_subscription_event *subscription_event_last;
- pa_mempool *mempool;
+ /* The mempool is used for data we write to, it's readonly for the client.
+ The rw_mempool is used for data writable by both server and client (and
+ can be NULL in some cases). */
+ pa_mempool *mempool, *rw_mempool;
pa_silence_cache silence_cache;
pa_time_event *exit_event;
commit 073128fbc878d0434cc89d73fd80db3d97f62764
Author: Peter Meerwald <pmeerw at pmeerw.net>
Date: Mon May 26 22:18:30 2014 +0200
tests: Adapt memblock-test to changed pa_memimport_get()
The patch 'memblock, pstream: Allow send/receive of remote writable memblocks'
adds an extra parameter to pa_memimport_get().
Change the test program accordingly.
Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net>
Cc: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/tests/memblock-test.c b/src/tests/memblock-test.c
index d46da6c..9581daa 100644
--- a/src/tests/memblock-test.c
+++ b/src/tests/memblock-test.c
@@ -130,7 +130,7 @@ START_TEST (memblock_test) {
pa_log("A: Memory block exported as %u", id);
- mb_b = pa_memimport_get(import_b, id, shm_id, offset, size);
+ mb_b = pa_memimport_get(import_b, id, shm_id, offset, size, false);
fail_unless(mb_b != NULL);
r = pa_memexport_put(export_b, mb_b, &id, &shm_id, &offset, &size);
fail_unless(r >= 0);
@@ -139,7 +139,7 @@ START_TEST (memblock_test) {
pa_log("B: Memory block exported as %u", id);
- mb_c = pa_memimport_get(import_c, id, shm_id, offset, size);
+ mb_c = pa_memimport_get(import_c, id, shm_id, offset, size, false);
fail_unless(mb_c != NULL);
x = pa_memblock_acquire(mb_c);
pa_log_debug("1 data=%s", x);
commit 710c4b39af680c842d57e7b394068f1005fda438
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri Apr 25 15:30:41 2014 +0200
memblock, pstream: Allow send/receive of remote writable memblocks
The shared ringbuffer memblock must be writable by both sides.
This makes it possible to send such a memblock over a pstream without
the "both sides writable" information getting lost.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 8da0fcd..5ef2aa9 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -97,6 +97,7 @@ struct pa_memimport_segment {
pa_shm memory;
pa_memtrap *trap;
unsigned n_blocks;
+ bool writable;
};
/* A collection of multiple segments */
@@ -146,6 +147,7 @@ struct pa_mempool {
pa_shm memory;
size_t block_size;
unsigned n_blocks;
+ bool is_remote_writable;
pa_atomic_t n_init;
@@ -303,6 +305,19 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
}
/* No lock necessary */
+/* Returns the pool's is_remote_writable flag, as last set through
+ * pa_mempool_set_is_remote_writable(). */
+bool pa_mempool_is_remote_writable(pa_mempool *p) {
+ pa_assert(p);
+ return p->is_remote_writable;
+}
+
+/* No lock necessary.
+ * Sets the pool's is_remote_writable flag. Enabling it is only allowed on a
+ * shared pool (asserted); clearing it is always allowed. */
+void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable) {
+ pa_assert(p);
+ pa_assert(!writable || pa_mempool_is_shared(p));
+ p->is_remote_writable = writable;
+}
+
+/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
pa_memblock *b = NULL;
struct mempool_slot *slot;
@@ -416,6 +431,14 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free
}
/* No lock necessary */
+/* Returns true for every memblock whose type is not PA_MEMBLOCK_IMPORTED,
+ * i.e. false only for blocks obtained through a memimport. */
+bool pa_memblock_is_ours(pa_memblock *b) {
+ pa_assert(b);
+ pa_assert(PA_REFCNT_VALUE(b) > 0);
+
+ return b->type != PA_MEMBLOCK_IMPORTED;
+}
+
+/* No lock necessary */
bool pa_memblock_is_read_only(pa_memblock *b) {
pa_assert(b);
pa_assert(PA_REFCNT_VALUE(b) > 0);
@@ -905,7 +928,7 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
/* Should be called locked */
-static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
pa_memimport_segment* seg;
if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
@@ -913,11 +936,12 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
seg = pa_xnew0(pa_memimport_segment, 1);
- if (pa_shm_attach(&seg->memory, shm_id, false) < 0) {
+ if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
pa_xfree(seg);
return NULL;
}
+ seg->writable = writable;
seg->import = i;
seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size);
@@ -973,7 +997,8 @@ void pa_memimport_free(pa_memimport *i) {
}
/* Self-locked */
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+ size_t offset, size_t size, bool writable) {
pa_memblock *b = NULL;
pa_memimport_segment *seg;
@@ -990,9 +1015,14 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
goto finish;
if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
- if (!(seg = segment_attach(i, shm_id)))
+ if (!(seg = segment_attach(i, shm_id, writable)))
goto finish;
+ if (writable != seg->writable) {
+ pa_log("Cannot open segment - writable status changed!");
+ goto finish;
+ }
+
if (offset+size > seg->memory.size)
goto finish;
@@ -1002,7 +1032,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
PA_REFCNT_INIT(b);
b->pool = i->pool;
b->type = PA_MEMBLOCK_IMPORTED;
- b->read_only = true;
+ b->read_only = !writable;
b->is_silence = false;
pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
b->length = size;
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 502f207..d60f3c3 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -104,6 +104,7 @@ function is not multiple caller safe, i.e. needs to be locked
manually if called from more than one thread at the same time. */
void pa_memblock_unref_fixed(pa_memblock*b);
+bool pa_memblock_is_ours(pa_memblock *b);
bool pa_memblock_is_read_only(pa_memblock *b);
bool pa_memblock_is_silence(pa_memblock *b);
bool pa_memblock_ref_is_one(pa_memblock *b);
@@ -125,12 +126,15 @@ const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
void pa_mempool_vacuum(pa_mempool *p);
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
bool pa_mempool_is_shared(pa_mempool *p);
+bool pa_mempool_is_remote_writable(pa_mempool *p);
+void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable);
size_t pa_mempool_block_size_max(pa_mempool *p);
/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
void pa_memimport_free(pa_memimport *i);
-pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
+ size_t offset, size_t size, bool writable);
int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
/* For sending blocks to other nodes */
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 22ea250..539c4a2 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -45,11 +45,12 @@
#include "pstream.h"
/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
-#define PA_FLAG_SHMDATA 0x80000000LU
-#define PA_FLAG_SHMRELEASE 0x40000000LU
-#define PA_FLAG_SHMREVOKE 0xC0000000LU
-#define PA_FLAG_SHMMASK 0xFF000000LU
-#define PA_FLAG_SEEKMASK 0x000000FFLU
+#define PA_FLAG_SHMDATA 0x80000000LU
+#define PA_FLAG_SHMRELEASE 0x40000000LU
+#define PA_FLAG_SHMREVOKE 0xC0000000LU
+#define PA_FLAG_SHMMASK 0xFF000000LU
+#define PA_FLAG_SEEKMASK 0x000000FFLU
+#define PA_FLAG_SHMWRITABLE 0x00800000LU
/* The sequence descriptor header consists of 5 32bit integers: */
enum {
@@ -504,10 +505,15 @@ static void prepare_next_write_item(pa_pstream *p) {
size_t offset, length;
uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
+ pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
+ pa_memexport *current_export;
- pa_assert(p->export);
+ if (p->mempool == current_pool)
+ pa_assert_se(current_export = p->export);
+ else
+ pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
- if (pa_memexport_put(p->export,
+ if (pa_memexport_put(current_export,
p->write.current->chunk.memblock,
&block_id,
&shm_id,
@@ -515,6 +521,8 @@ static void prepare_next_write_item(pa_pstream *p) {
&length) >= 0) {
flags |= PA_FLAG_SHMDATA;
+ if (pa_mempool_is_remote_writable(current_pool))
+ flags |= PA_FLAG_SHMWRITABLE;
send_payload = false;
shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
@@ -527,6 +535,9 @@ static void prepare_next_write_item(pa_pstream *p) {
}
/* else */
/* pa_log_warn("Failed to export memory block."); */
+
+ if (current_export != p->export)
+ pa_memexport_free(current_export);
}
if (send_payload) {
@@ -824,8 +835,8 @@ static int do_read(pa_pstream *p) {
pa_packet_unref(p->read.packet);
} else {
pa_memblock *b;
-
- pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+ uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
pa_assert(p->import);
@@ -833,7 +844,8 @@ static int do_read(pa_pstream *p) {
ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
+ !!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
pa_log_debug("Failed to import memory block.");
commit 613177919f0876c082e4db3b610afcfced5f593a
Author: David Henningsson <david.henningsson at canonical.com>
Date: Fri Apr 25 13:58:26 2014 +0200
shm: Allow to open shm in writable mode
This is preparation for the shm ringbuffer, which needs to be
writable by both sides, because it contains atomic variables that
both sides need to modify.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 9cc02c1..8da0fcd 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -913,7 +913,7 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
seg = pa_xnew0(pa_memimport_segment, 1);
- if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
+ if (pa_shm_attach(&seg->memory, shm_id, false) < 0) {
pa_xfree(seg);
return NULL;
}
diff --git a/src/pulsecore/shm.c b/src/pulsecore/shm.c
index efaee57..075c8bd 100644
--- a/src/pulsecore/shm.c
+++ b/src/pulsecore/shm.c
@@ -290,16 +290,17 @@ void pa_shm_punch(pa_shm *m, size_t offset, size_t size) {
#ifdef HAVE_SHM_OPEN
-int pa_shm_attach_ro(pa_shm *m, unsigned id) {
+int pa_shm_attach(pa_shm *m, unsigned id, bool writable) {
char fn[32];
int fd = -1;
+ int prot;
struct stat st;
pa_assert(m);
segment_name(fn, sizeof(fn), m->id = id);
- if ((fd = shm_open(fn, O_RDONLY, 0)) < 0) {
+ if ((fd = shm_open(fn, writable ? O_RDWR : O_RDONLY, 0)) < 0) {
if (errno != EACCES && errno != ENOENT)
pa_log("shm_open() failed: %s", pa_cstrerror(errno));
goto fail;
@@ -319,7 +320,8 @@ int pa_shm_attach_ro(pa_shm *m, unsigned id) {
m->size = (size_t) st.st_size;
- if ((m->ptr = mmap(NULL, PA_PAGE_ALIGN(m->size), PROT_READ, MAP_SHARED, fd, (off_t) 0)) == MAP_FAILED) {
+ prot = writable ? PROT_READ | PROT_WRITE : PROT_READ;
+ if ((m->ptr = mmap(NULL, PA_PAGE_ALIGN(m->size), prot, MAP_SHARED, fd, (off_t) 0)) == MAP_FAILED) {
pa_log("mmap() failed: %s", pa_cstrerror(errno));
goto fail;
}
@@ -340,7 +342,7 @@ fail:
#else /* HAVE_SHM_OPEN */
-int pa_shm_attach_ro(pa_shm *m, unsigned id) {
+int pa_shm_attach(pa_shm *m, unsigned id, bool writable) {
return -1;
}
@@ -375,7 +377,7 @@ int pa_shm_cleanup(void) {
if (pa_atou(de->d_name + SHM_ID_LEN, &id) < 0)
continue;
- if (pa_shm_attach_ro(&seg, id) < 0)
+ if (pa_shm_attach(&seg, id, false) < 0)
continue;
if (seg.size < SHM_MARKER_SIZE) {
diff --git a/src/pulsecore/shm.h b/src/pulsecore/shm.h
index 9d61551..2238239 100644
--- a/src/pulsecore/shm.h
+++ b/src/pulsecore/shm.h
@@ -35,7 +35,7 @@ typedef struct pa_shm {
} pa_shm;
int pa_shm_create_rw(pa_shm *m, size_t size, bool shared, mode_t mode);
-int pa_shm_attach_ro(pa_shm *m, unsigned id);
+int pa_shm_attach(pa_shm *m, unsigned id, bool writable);
void pa_shm_punch(pa_shm *m, size_t offset, size_t size);
commit 0cd4d3531acc9894275959309967d0bc2adcc8f5
Author: David Henningsson <david.henningsson at canonical.com>
Date: Tue Apr 15 17:20:05 2014 +0200
srbchannel: Add the shared ringbuffer object
An shm ringbuffer that is used for low overhead server-client communication.
Signalling is done through eventfd semaphores - it's based on pa_fdsem to avoid
syscalls if nothing is waiting on the other side.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/Makefile.am b/src/Makefile.am
index c5fea5d..e388bd2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -618,6 +618,7 @@ libpulsecommon_ at PA_MAJORMINOR@_la_SOURCES = \
pulsecore/creds.h \
pulsecore/dynarray.c pulsecore/dynarray.h \
pulsecore/endianmacros.h \
+ pulsecore/fdsem.c pulsecore/fdsem.h \
pulsecore/flist.c pulsecore/flist.h \
pulsecore/g711.c pulsecore/g711.h \
pulsecore/hashmap.c pulsecore/hashmap.h \
@@ -651,6 +652,7 @@ libpulsecommon_ at PA_MAJORMINOR@_la_SOURCES = \
pulsecore/queue.c pulsecore/queue.h \
pulsecore/random.c pulsecore/random.h \
pulsecore/refcnt.h \
+ pulsecore/srbchannel.c pulsecore/srbchannel.h \
pulsecore/sample-util.c pulsecore/sample-util.h \
pulsecore/shm.c pulsecore/shm.h \
pulsecore/bitset.c pulsecore/bitset.h \
@@ -880,7 +882,6 @@ libpulsecore_ at PA_MAJORMINOR@_la_SOURCES = \
pulsecore/core-scache.c pulsecore/core-scache.h \
pulsecore/core-subscribe.c pulsecore/core-subscribe.h \
pulsecore/core.c pulsecore/core.h \
- pulsecore/fdsem.c pulsecore/fdsem.h \
pulsecore/hook-list.c pulsecore/hook-list.h \
pulsecore/ltdl-helper.c pulsecore/ltdl-helper.h \
pulsecore/modargs.c pulsecore/modargs.h \
diff --git a/src/pulsecore/srbchannel.c b/src/pulsecore/srbchannel.c
new file mode 100644
index 0000000..5fe2220
--- /dev/null
+++ b/src/pulsecore/srbchannel.c
@@ -0,0 +1,305 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2014 David Henningsson, Canonical Ltd.
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of the
+ License, or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "srbchannel.h"
+
+#include <pulsecore/atomic.h>
+#include <pulse/xmalloc.h>
+
+/* #define DEBUG_SRBCHANNEL */
+
+/* This ringbuffer might be useful in other contexts too, but
+   right now it's only used inside the srbchannel, so let's keep it here
+   for the time being.
+
+   Byte ringbuffer over a caller-provided memory area. The fill level is kept
+   in an atomic that lives outside this struct (in the srbchannel it points
+   into the shared srbheader), while the read/write positions are plain
+   members of this struct. */
+typedef struct pa_ringbuffer pa_ringbuffer;
+struct pa_ringbuffer {
+    pa_atomic_t *count;         /* Number of bytes currently stored */
+    int capacity;               /* Size of the memory area in bytes */
+    uint8_t *memory;            /* Start of the data area */
+    int readindex, writeindex;  /* Offsets into memory, wrapped modulo capacity;
+                                   peek/drop use readindex, begin/end_write use writeindex */
+};
+
+/* Returns a pointer to the next unread byte and stores in *count how many
+   bytes can be read from there in one contiguous run, i.e. without wrapping
+   past the end of the buffer. Nothing is consumed. */
+static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
+    int stored = pa_atomic_load(r->count);
+
+    /* Clamp to the end of the memory area when the stored data wraps around. */
+    *count = (r->readindex + stored > r->capacity) ? r->capacity - r->readindex : stored;
+
+    return r->memory + r->readindex;
+}
+
+/* Consumes count bytes from the read side and advances the read position.
+   Returns true only if the buffer was completely full before the drop. */
+static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
+    int prior = pa_atomic_sub(r->count, count);
+
+    r->readindex = (r->readindex + count) % r->capacity;
+
+    return prior >= r->capacity;
+}
+
+/* Returns a pointer to the current write position and stores in *count how
+   many bytes may be written there in one contiguous run: the smaller of the
+   space up to the end of the memory area and the free space in the buffer. */
+static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
+    int stored = pa_atomic_load(r->count);
+    int until_end = r->capacity - r->writeindex;
+    int unused = r->capacity - stored;
+
+    *count = PA_MIN(until_end, unused);
+
+    return r->memory + r->writeindex;
+}
+
+/* Commits count bytes that were copied to the position returned by
+   pa_ringbuffer_begin_write(): raises the fill level and advances the write
+   position. */
+static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
+    pa_atomic_add(r->count, count);
+
+    r->writeindex = (r->writeindex + count) % r->capacity;
+}
+
+/* One endpoint of a shared ringbuffer channel: a pair of ringbuffers and a
+   pair of fdsems that all live in a single memblock. */
+struct pa_srbchannel {
+    pa_ringbuffer rb_read, rb_write;    /* Receive and send ringbuffers, from this side's point of view */
+    pa_fdsem *sem_read, *sem_write;     /* We listen on sem_read and signal on sem_write */
+    pa_memblock *memblock;              /* Block holding the srbheader and both data areas; kept acquired until free */
+    void *cb_userdata;                  /* Passed back to callback */
+    pa_srbchannel_cb_t callback;        /* Invoked from srbchannel_rwloop(); may be NULL */
+    pa_io_event *read_event;            /* Mainloop IO event watching the sem_read fd */
+    pa_mainloop_api *mainloop;          /* Used to free read_event */
+};
+
+/* We always listen to sem_read, and always signal on sem_write.
+
+ This means we signal the same semaphore for two scenarios:
+ 1) We have written something to our send buffer, and want the other
+ side to read it
+ 2) We have read something from our receive buffer that was previously
+ completely full, and want the other side to continue writing
+*/
+
+/* Copies up to l bytes from data into the send ringbuffer, one contiguous
+   run at a time, and stops early once the ringbuffer is full. sem_write is
+   posted on every call. Returns the number of bytes actually queued. */
+size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
+    const uint8_t *src = data;
+    size_t total = 0;
+
+    while (l > 0) {
+        int chunk;
+        void *dst = pa_ringbuffer_begin_write(&sr->rb_write, &chunk);
+
+        /* Never copy more than the caller asked for. */
+        if ((size_t) chunk > l)
+            chunk = l;
+
+        if (chunk == 0) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("srbchannel output buffer full");
+#endif
+            break;
+        }
+
+        /* The data must be in place before the fill level is raised. */
+        memcpy(dst, src, chunk);
+        pa_ringbuffer_end_write(&sr->rb_write, chunk);
+
+        src += chunk;
+        total += chunk;
+        l -= chunk;
+    }
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) total);
+#endif
+    pa_fdsem_post(sr->sem_write);
+
+    return total;
+}
+
+/* Copies up to l bytes from the receive ringbuffer into data, one contiguous
+   run at a time, and stops early once the ringbuffer is empty. Whenever a
+   drop empties space in a buffer that was completely full, sem_write is
+   posted so the other side can continue writing. Returns the number of
+   bytes actually read. */
+size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
+    uint8_t *dst = data;
+    size_t total = 0;
+
+    while (l > 0) {
+        int chunk;
+        void *src = pa_ringbuffer_peek(&sr->rb_read, &chunk);
+
+        /* Never copy more than the caller has room for. */
+        if ((size_t) chunk > l)
+            chunk = l;
+
+        if (chunk == 0)
+            break;
+
+        /* Copy out first, then hand the space back to the writer. */
+        memcpy(dst, src, chunk);
+
+        if (pa_ringbuffer_drop(&sr->rb_read, chunk)) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("read from full output buffer, signalling fdsem");
+#endif
+            pa_fdsem_post(sr->sem_write);
+        }
+
+        dst += chunk;
+        total += chunk;
+        l -= chunk;
+    }
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Read %d bytes from srbchannel", (int) total);
+#endif
+
+    return total;
+}
+
+/* This is the memory layout of the ringbuffer shm block. It is followed by
+   read and write ringbuffer memory.
+
+   "read" and "write" are named from the point of view of the side that
+   created the block with pa_srbchannel_new(); the side attaching through
+   pa_srbchannel_new_from_template() swaps the two. */
+struct srbheader {
+    pa_atomic_t read_count;         /* Fill level of the creator's receive ringbuffer */
+    pa_atomic_t write_count;        /* Fill level of the creator's send ringbuffer */
+    pa_fdsem_data read_semdata;     /* Shared state of the fdsem the creator listens on */
+    pa_fdsem_data write_semdata;    /* Shared state of the fdsem the creator signals on */
+    int capacity;                   /* Size in bytes of each of the two ringbuffers */
+    int readbuf_offset;             /* Byte offset from the start of this header to the creator's receive data */
+    int writebuf_offset;            /* Byte offset from the start of this header to the creator's send data */
+    /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
+};
+
+/* Invokes the user callback (if one is set) and then calls
+   pa_fdsem_before_poll() on sem_read, repeating for as long as that call
+   returns a negative value (presumably meaning the semaphore was signalled
+   again in the meantime -- confirm against fdsem). If the callback returns
+   false the loop is left immediately and sr is not touched again, since the
+   callback may have freed it. */
+static void srbchannel_rwloop(pa_srbchannel* sr) {
+    do {
+#ifdef DEBUG_SRBCHANNEL
+        int q;
+        pa_ringbuffer_peek(&sr->rb_read, &q);
+        pa_log("In rw loop from srbchannel, before callback, count = %d", q);
+#endif
+
+        if (sr->callback)
+            if (!sr->callback(sr, sr->cb_userdata)) {
+#ifdef DEBUG_SRBCHANNEL
+                pa_log("Aborting read loop from srbchannel");
+#endif
+                /* Must not reach pa_fdsem_before_poll() below: it would dereference sr. */
+                return;
+            }
+
+#ifdef DEBUG_SRBCHANNEL
+        pa_ringbuffer_peek(&sr->rb_read, &q);
+        pa_log("In rw loop from srbchannel, after callback, count = %d", q);
+#endif
+
+    } while (pa_fdsem_before_poll(sr->sem_read) < 0);
+}
+
+/* Mainloop IO callback registered on the sem_read file descriptor; userdata
+   is the owning srbchannel. Finishes the poll cycle on the semaphore and
+   then runs the read/write loop. */
+static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
+    pa_srbchannel *channel = userdata;
+
+    pa_fdsem_after_poll(channel->sem_read);
+
+    srbchannel_rwloop(channel);
+}
+
+/* Creates a new srbchannel backed by one maximum-sized block from pool p.
+   The block starts with a zeroed srbheader, followed by the receive and the
+   send ringbuffer of equal capacity. Two fdsems are created inside the
+   header and an IO event watching the read semaphore is registered on
+   mainloop m.
+
+   Returns the new channel, or NULL if the pool could not supply a block or
+   one of the semaphores could not be created. */
+pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
+    int capacity;
+    int readfd;
+    struct srbheader *srh;
+    pa_srbchannel* sr;
+
+    pa_assert(m);
+    pa_assert(p);
+
+    sr = pa_xmalloc0(sizeof(pa_srbchannel));
+    sr->mainloop = m;
+
+    /* The pool may be exhausted, in which case no block is returned. */
+    if (!(sr->memblock = pa_memblock_new_pool(p, -1))) {
+        pa_log_debug("Failed to allocate memblock for srbchannel");
+        goto fail;
+    }
+
+    srh = pa_memblock_acquire(sr->memblock);
+    pa_zero(*srh);
+
+    /* Split what is left after the (aligned) header into two halves. */
+    sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
+    srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
+    capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
+    sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
+    srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
+    /* Aligning the write buffer may have moved it; never let the read buffer overlap it. */
+    capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
+    pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
+        (int) pa_memblock_get_length(sr->memblock), capacity);
+
+    srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
+    sr->rb_read.count = &srh->read_count;
+    sr->rb_write.count = &srh->write_count;
+
+    if (!(sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata)))
+        goto fail;
+
+    if (!(sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata)))
+        goto fail;
+
+    readfd = pa_fdsem_get(sr->sem_read);
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Enabling io event on fd %d", readfd);
+#endif
+    sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
+    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
+
+    return sr;
+
+fail:
+    /* pa_srbchannel_free() copes with a partially set up channel. */
+    pa_srbchannel_free(sr);
+    return NULL;
+}
+
+/* Exchanges the read and write roles of the channel: both the two
+   semaphores and the two ringbuffers trade places. */
+static void pa_srbchannel_swap(pa_srbchannel *sr) {
+    pa_fdsem *old_sem_read = sr->sem_read;
+    pa_ringbuffer old_rb_read = sr->rb_read;
+
+    sr->sem_read = sr->sem_write;
+    sr->sem_write = old_sem_read;
+
+    sr->rb_read = sr->rb_write;
+    sr->rb_write = old_rb_read;
+}
+
+/* Attaches to an existing srbchannel described by template t. The memblock
+   in t is referenced and acquired, the ringbuffer geometry is read from the
+   srbheader at its start, and the two fdsems are opened on the file
+   descriptors in t. Read and write are then swapped, so that this endpoint
+   reads what the creating side writes and vice versa.
+
+   Side effect: on success t->readfd and t->writefd are exchanged as well.
+
+   Returns the new channel, or NULL if one of the semaphores could not be
+   opened (t is left untouched in that case).
+
+   NOTE(review): capacity and the buffer offsets are taken from the shared
+   header without being checked against the memblock length. */
+pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t) {
+    int temp;
+    struct srbheader *srh;
+    pa_srbchannel* sr;
+
+    pa_assert(m);
+    pa_assert(t);
+    pa_assert(t->memblock);
+
+    sr = pa_xmalloc0(sizeof(pa_srbchannel));
+    sr->mainloop = m;
+    sr->memblock = t->memblock;
+    pa_memblock_ref(sr->memblock);
+    srh = pa_memblock_acquire(sr->memblock);
+
+    sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
+    sr->rb_read.count = &srh->read_count;
+    sr->rb_write.count = &srh->write_count;
+    sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
+    sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
+
+    if (!(sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd)))
+        goto fail;
+
+    if (!(sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd)))
+        goto fail;
+
+    /* Everything above was set up from the creator's point of view; flip it. */
+    pa_srbchannel_swap(sr);
+    temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Enabling io event on fd %d", t->readfd);
+#endif
+    sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
+    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
+
+    return sr;
+
+fail:
+    /* pa_srbchannel_free() copes with a partially set up channel. */
+    pa_srbchannel_free(sr);
+    return NULL;
+}
+
+/* Fills template t with what another endpoint needs in order to attach to
+   this channel: the file descriptors of both semaphores and the memblock.
+   No extra reference is taken on the memblock. */
+void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) {
+    int fd_read = pa_fdsem_get(sr->sem_read);
+    int fd_write = pa_fdsem_get(sr->sem_write);
+
+    t->memblock = sr->memblock;
+    t->readfd = fd_read;
+    t->writefd = fd_write;
+}
+
+/* Installs (or, with callback == NULL, removes) the function that
+   srbchannel_rwloop() calls. If the new callback is non-NULL the rw loop is
+   run right away, so the callback may be invoked synchronously from within
+   this call. */
+void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
+    /* A callback was already installed: presumably this undoes the
+       pa_fdsem_before_poll() left pending by the previous rw loop -- confirm. */
+    if (sr->callback)
+        pa_fdsem_after_poll(sr->sem_read);
+
+    sr->callback = callback;
+    sr->cb_userdata = userdata;
+
+    if (sr->callback)
+        /* Maybe deferred event? */
+        srbchannel_rwloop(sr);
+}
+
+/* Destroys the channel endpoint: frees the IO event, both fdsems, releases
+   and unreferences the memblock, and finally frees sr itself. Every member
+   is checked for NULL first, so a partially initialised channel is handled. */
+void pa_srbchannel_free(pa_srbchannel *sr)
+{
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Freeing srbchannel");
+#endif
+    pa_assert(sr);
+
+    if (sr->read_event)
+        sr->mainloop->io_free(sr->read_event);
+
+    if (sr->sem_read)
+        pa_fdsem_free(sr->sem_read);
+    if (sr->sem_write)
+        pa_fdsem_free(sr->sem_write);
+
+    if (sr->memblock) {
+        /* Balances the pa_memblock_acquire() done when the channel was set up. */
+        pa_memblock_release(sr->memblock);
+        pa_memblock_unref(sr->memblock);
+    }
+
+    pa_xfree(sr);
+}
diff --git a/src/pulsecore/srbchannel.h b/src/pulsecore/srbchannel.h
new file mode 100644
index 0000000..843bf96
--- /dev/null
+++ b/src/pulsecore/srbchannel.h
@@ -0,0 +1,62 @@
+#ifndef foopulsesrbchannelhfoo
+#define foopulsesrbchannelhfoo
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2014 David Henningsson, Canonical Ltd.
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of the
+ License, or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+***/
+
+#include <pulse/mainloop-api.h>
+#include <pulsecore/fdsem.h>
+#include <pulsecore/memblock.h>
+
+/* An shm ringbuffer that is used for low overhead server-client communication.
+ Signaling is done through eventfd semaphores (pa_fdsem). */
+
+typedef struct pa_srbchannel pa_srbchannel;
+
+typedef struct pa_srbchannel_template { /* Plain description of a srbchannel, filled in by pa_srbchannel_export(). */
+ int readfd, writefd; /* Values taken from pa_fdsem_get() on the read and write fdsems. */
+ pa_memblock *memblock; /* Memblock used by the channel. */
+} pa_srbchannel_template;
+
+pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p);
+/* Note: this creates a srbchannel with swapped read and write. */
+pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t);
+
+void pa_srbchannel_free(pa_srbchannel *sr);
+
+void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t);
+
+size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l);
+size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l);
+
+/* Set the callback function that is called whenever data becomes available for reading.
+ It can also be called if the output buffer was full and can now be written to.
+
+ Return false to abort all processing (e g if the srbchannel has been freed during the callback).
+ Otherwise return true.
+
+ Note that the callback will be called immediately, to be able to process stuff that
+ might already be in the buffer.
+*/
+typedef bool (*pa_srbchannel_cb_t)(pa_srbchannel *sr, void *userdata);
+void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata);
+
+#endif
commit cb484805c1343e4cef78e3f83931e71ffa9cb588
Author: David Henningsson <david.henningsson at canonical.com>
Date: Tue Apr 15 16:12:25 2014 +0200
iochannel/pstream: Support sending file descriptors
This patch adds support to iochannel, pstream and pstream-util
to send file descriptors over a unix pipe.
Currently we don't support writing both creds and fds in the same
packet, it's either one or the other (or neither).
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/pulsecore/iochannel.c b/src/pulsecore/iochannel.c
index e158018..2971c89 100644
--- a/src/pulsecore/iochannel.c
+++ b/src/pulsecore/iochannel.c
@@ -348,6 +348,49 @@ ssize_t pa_iochannel_write_with_creds(pa_iochannel*io, const void*data, size_t l
return r;
}
+ssize_t pa_iochannel_write_with_fds(pa_iochannel*io, const void*data, size_t l, int nfd, const int *fds) { /* Send l bytes of data on io->ofd with nfd descriptors attached as SCM_RIGHTS; returns the sendmsg() result. */
+ ssize_t r;
+ int *msgdata;
+ struct msghdr mh;
+ struct iovec iov;
+ union {
+ struct cmsghdr hdr;
+ uint8_t data[CMSG_SPACE(sizeof(int) * MAX_ANCIL_FDS)]; /* Control buffer sized for the maximum of MAX_ANCIL_FDS descriptors. */
+ } cmsg;
+
+ pa_assert(io);
+ pa_assert(data);
+ pa_assert(l);
+ pa_assert(io->ofd >= 0);
+ pa_assert(fds);
+ pa_assert(nfd > 0);
+ pa_assert(nfd <= MAX_ANCIL_FDS); /* Guards the memcpy into the fixed-size control buffer below. */
+
+ pa_zero(iov);
+ iov.iov_base = (void*) data; /* Cast drops const to fit iov_base. */
+ iov.iov_len = l;
+
+ pa_zero(cmsg);
+ cmsg.hdr.cmsg_level = SOL_SOCKET;
+ cmsg.hdr.cmsg_type = SCM_RIGHTS;
+
+ msgdata = (int*) CMSG_DATA(&cmsg.hdr);
+ memcpy(msgdata, fds, nfd * sizeof(int));
+ cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int) * nfd); /* Length covers only the nfd descriptors actually copied. */
+
+ pa_zero(mh);
+ mh.msg_iov = &iov;
+ mh.msg_iovlen = 1;
+ mh.msg_control = &cmsg;
+ mh.msg_controllen = sizeof(cmsg); /* The whole buffer is passed, even when nfd < MAX_ANCIL_FDS. */
+
+ if ((r = sendmsg(io->ofd, &mh, MSG_NOSIGNAL)) >= 0) { /* On success: clear writable and hungup, then call enable_events(). */
+ io->writable = io->hungup = false;
+ enable_events(io);
+ }
+ return r;
+}
+
ssize_t pa_iochannel_read_with_ancil(pa_iochannel*io, void*data, size_t l, pa_ancil *ancil) {
ssize_t r;
struct msghdr mh;
diff --git a/src/pulsecore/iochannel.h b/src/pulsecore/iochannel.h
index 4da8902..390f798 100644
--- a/src/pulsecore/iochannel.h
+++ b/src/pulsecore/iochannel.h
@@ -57,6 +57,7 @@ ssize_t pa_iochannel_read(pa_iochannel*io, void*data, size_t l);
bool pa_iochannel_creds_supported(pa_iochannel *io);
int pa_iochannel_creds_enable(pa_iochannel *io);
+ssize_t pa_iochannel_write_with_fds(pa_iochannel*io, const void*data, size_t l, int nfd, const int *fds);
ssize_t pa_iochannel_write_with_creds(pa_iochannel*io, const void*data, size_t l, const pa_creds *ucred);
ssize_t pa_iochannel_read_with_ancil(pa_iochannel*io, void*data, size_t l, pa_ancil *ancil);
#endif
diff --git a/src/pulsecore/pstream-util.c b/src/pulsecore/pstream-util.c
index f84f486..ac256e2 100644
--- a/src/pulsecore/pstream-util.c
+++ b/src/pulsecore/pstream-util.c
@@ -28,7 +28,7 @@
#include "pstream-util.h"
-void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_creds *creds) {
+static void pa_pstream_send_tagstruct_with_ancil(pa_pstream *p, pa_tagstruct *t, const pa_ancil *ancil) {
size_t length;
uint8_t *data;
pa_packet *packet;
@@ -38,10 +38,37 @@ void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const
pa_assert_se(data = pa_tagstruct_free_data(t, &length));
pa_assert_se(packet = pa_packet_new_dynamic(data, length));
- pa_pstream_send_packet(p, packet, creds);
+ pa_pstream_send_packet(p, packet, ancil);
pa_packet_unref(packet);
}
+void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_creds *creds) { /* Send t with credentials attached when creds is non-NULL, otherwise without ancillary data. */
+ if (creds) {
+ pa_ancil a;
+
+ a.nfd = 0; /* Credentials only: no file descriptors; a.fds is left unset. */
+ a.creds_valid = true;
+ a.creds = *creds;
+ pa_pstream_send_tagstruct_with_ancil(p, t, &a);
+ }
+ else
+ pa_pstream_send_tagstruct_with_ancil(p, t, NULL); /* NULL ancil: plain send. */
+}
+
+void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds) { /* Send t with nfd file descriptors attached; with nfd <= 0 it is sent without ancillary data. */
+ if (nfd > 0) {
+ pa_ancil a;
+
+ a.nfd = nfd;
+ a.creds_valid = false; /* a.creds is left unset; creds_valid marks it as not present. */
+ pa_assert(nfd <= MAX_ANCIL_FDS); /* a.fds holds MAX_ANCIL_FDS entries; checked before the copy. */
+ memcpy(a.fds, fds, sizeof(int) * nfd);
+ pa_pstream_send_tagstruct_with_ancil(p, t, &a);
+ }
+ else
+ pa_pstream_send_tagstruct_with_ancil(p, t, NULL); /* fds is not read on this path. */
+}
+
void pa_pstream_send_error(pa_pstream *p, uint32_t tag, uint32_t error) {
pa_tagstruct *t;
diff --git a/src/pulsecore/pstream-util.h b/src/pulsecore/pstream-util.h
index ae0d79c..7ea89ba 100644
--- a/src/pulsecore/pstream-util.h
+++ b/src/pulsecore/pstream-util.h
@@ -29,6 +29,7 @@
/* The tagstruct is freed!*/
void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_creds *creds);
+void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds);
#define pa_pstream_send_tagstruct(p, t) pa_pstream_send_tagstruct_with_creds((p), (t), NULL)
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index a3afed5..22ea250 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -94,8 +94,8 @@ struct item_info {
/* packet info */
pa_packet *packet;
#ifdef HAVE_CREDS
- bool with_creds;
- pa_creds creds;
+ bool with_ancil;
+ pa_ancil ancil;
#endif
/* memblock info */
@@ -165,9 +165,8 @@ struct pa_pstream {
pa_mempool *mempool;
#ifdef HAVE_CREDS
- pa_ancil read_ancil;
- pa_creds write_creds;
- bool send_creds_now;
+ pa_ancil read_ancil, write_ancil;
+ bool send_ancil_now;
#endif
};
@@ -298,7 +297,7 @@ static void pstream_free(pa_pstream *p) {
pa_xfree(p);
}
-void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
+void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_ancil *ancil) {
struct item_info *i;
pa_assert(p);
@@ -315,8 +314,13 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
i->packet = pa_packet_ref(packet);
#ifdef HAVE_CREDS
- if ((i->with_creds = !!creds))
- i->creds = *creds;
+ if ((i->with_ancil = !!ancil)) {
+ i->ancil = *ancil;
+ if (ancil->creds_valid)
+ pa_assert(ancil->nfd == 0);
+ else
+ pa_assert(ancil->nfd > 0);
+ }
#endif
pa_queue_push(p->send_queue, i);
@@ -358,7 +362,7 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
i->offset = offset;
i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
- i->with_creds = false;
+ i->with_ancil = false;
#endif
pa_queue_push(p->send_queue, i);
@@ -385,7 +389,7 @@ void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
item->type = PA_PSTREAM_ITEM_SHMRELEASE;
item->block_id = block_id;
#ifdef HAVE_CREDS
- item->with_creds = false;
+ item->with_ancil = false;
#endif
pa_queue_push(p->send_queue, item);
@@ -422,7 +426,7 @@ void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
item->type = PA_PSTREAM_ITEM_SHMREVOKE;
item->block_id = block_id;
#ifdef HAVE_CREDS
- item->with_creds = false;
+ item->with_ancil = false;
#endif
pa_queue_push(p->send_queue, item);
@@ -536,8 +540,8 @@ static void prepare_next_write_item(pa_pstream *p) {
}
#ifdef HAVE_CREDS
- if ((p->send_creds_now = p->write.current->with_creds))
- p->write_creds = p->write.current->creds;
+ if ((p->send_ancil_now = p->write.current->with_ancil))
+ p->write_ancil = p->write.current->ancil;
#endif
}
@@ -579,12 +583,16 @@ static int do_write(pa_pstream *p) {
pa_assert(l > 0);
#ifdef HAVE_CREDS
- if (p->send_creds_now) {
-
- if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
- goto fail;
-
- p->send_creds_now = false;
+ if (p->send_ancil_now) {
+ if (p->write_ancil.creds_valid) {
+ pa_assert(p->write_ancil.nfd == 0);
+ if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil.creds)) < 0)
+ goto fail;
+ }
+ else
+ if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil.nfd, p->write_ancil.fds)) < 0)
+ goto fail;
+ p->send_ancil_now = false;
} else
#endif
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index 9316d92..4961570 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -49,7 +49,7 @@ void pa_pstream_unref(pa_pstream*p);
void pa_pstream_unlink(pa_pstream *p);
-void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds);
+void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_ancil *ancil);
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk);
void pa_pstream_send_release(pa_pstream *p, uint32_t block_id);
void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id);
commit 06bc22b220905b127b02ddd95196d0b2bb4ea246
Author: David Henningsson <david.henningsson at canonical.com>
Date: Tue Apr 15 15:37:44 2014 +0200
iochannel/pstream/pdispatch: Add support for receiving file descriptors
The file descriptors are read from the iochannel just like the creds are.
So instead of passing just creds (and creds_valid), we now pass the
entire pa_ancil struct.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index f0f0e31..193d091 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -1780,14 +1780,14 @@ static void pstream_die_callback(pa_pstream *p, void *userdata) {
}
/* Called from main context */
-static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
+static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_ancil *ancil, void *userdata) {
struct userdata *u = userdata;
pa_assert(p);
pa_assert(packet);
pa_assert(u);
- if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
+ if (pa_pdispatch_run(u->pdispatch, packet, ancil, u) < 0) {
pa_log("Invalid packet");
pa_module_unload_request(u->module, true);
return;
diff --git a/src/pulse/context.c b/src/pulse/context.c
index d908023..e1cd900 100644
--- a/src/pulse/context.c
+++ b/src/pulse/context.c
@@ -316,7 +316,7 @@ static void pstream_die_callback(pa_pstream *p, void *userdata) {
pa_context_fail(c, PA_ERR_CONNECTIONTERMINATED);
}
-static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
+static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_ancil *ancil, void *userdata) {
pa_context *c = userdata;
pa_assert(p);
@@ -325,7 +325,7 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_c
pa_context_ref(c);
- if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0)
+ if (pa_pdispatch_run(c->pdispatch, packet, ancil, c) < 0)
pa_context_fail(c, PA_ERR_PROTOCOL);
pa_context_unref(c);
diff --git a/src/pulsecore/iochannel.c b/src/pulsecore/iochannel.c
index dce6734..e158018 100644
--- a/src/pulsecore/iochannel.c
+++ b/src/pulsecore/iochannel.c
@@ -348,21 +348,26 @@ ssize_t pa_iochannel_write_with_creds(pa_iochannel*io, const void*data, size_t l
return r;
}
-ssize_t pa_iochannel_read_with_creds(pa_iochannel*io, void*data, size_t l, pa_creds *creds, bool *creds_valid) {
+ssize_t pa_iochannel_read_with_ancil(pa_iochannel*io, void*data, size_t l, pa_ancil *ancil) {
ssize_t r;
struct msghdr mh;
struct iovec iov;
union {
struct cmsghdr hdr;
- uint8_t data[CMSG_SPACE(sizeof(struct ucred))];
+ uint8_t data[CMSG_SPACE(sizeof(struct ucred)) + CMSG_SPACE(sizeof(int) * MAX_ANCIL_FDS)];
} cmsg;
pa_assert(io);
pa_assert(data);
pa_assert(l);
pa_assert(io->ifd >= 0);
- pa_assert(creds);
- pa_assert(creds_valid);
+ pa_assert(ancil);
+
+ if (io->ifd_type > 0) {
+ ancil->creds_valid = false;
+ ancil->nfd = 0;
+ return pa_iochannel_read(io, data, l);
+ }
pa_zero(iov);
iov.iov_base = data;
@@ -378,19 +383,34 @@ ssize_t pa_iochannel_read_with_creds(pa_iochannel*io, void*data, size_t l, pa_cr
if ((r = recvmsg(io->ifd, &mh, 0)) >= 0) {
struct cmsghdr *cmh;
- *creds_valid = false;
+ ancil->creds_valid = false;
+ ancil->nfd = 0;
for (cmh = CMSG_FIRSTHDR(&mh); cmh; cmh = CMSG_NXTHDR(&mh, cmh)) {
- if (cmh->cmsg_level == SOL_SOCKET && cmh->cmsg_type == SCM_CREDENTIALS) {
+ if (cmh->cmsg_level != SOL_SOCKET)
+ continue;
+
+ if (cmh->cmsg_type == SCM_CREDENTIALS) {
struct ucred u;
pa_assert(cmh->cmsg_len == CMSG_LEN(sizeof(struct ucred)));
memcpy(&u, CMSG_DATA(cmh), sizeof(struct ucred));
- creds->gid = u.gid;
- creds->uid = u.uid;
- *creds_valid = true;
- break;
+ ancil->creds.gid = u.gid;
+ ancil->creds.uid = u.uid;
+ ancil->creds_valid = true;
+ }
+ else if (cmh->cmsg_type == SCM_RIGHTS) {
+ int nfd = (cmh->cmsg_len - CMSG_LEN(0)) / sizeof(int);
+ if (nfd > MAX_ANCIL_FDS) {
+ int i;
+ pa_log("Trying to receive too many file descriptors!");
+ for (i = 0; i < nfd; i++)
+ pa_close(((int*) CMSG_DATA(cmh))[i]);
+ continue;
+ }
+ memcpy(ancil->fds, CMSG_DATA(cmh), nfd * sizeof(int));
+ ancil->nfd = nfd;
}
}
@@ -398,6 +418,11 @@ ssize_t pa_iochannel_read_with_creds(pa_iochannel*io, void*data, size_t l, pa_cr
enable_events(io);
}
+ if (r == -1 && errno == ENOTSOCK) {
+ io->ifd_type = 1;
+ return pa_iochannel_read_with_ancil(io, data, l, ancil);
+ }
+
return r;
}
diff --git a/src/pulsecore/iochannel.h b/src/pulsecore/iochannel.h
index e95f46f..4da8902 100644
--- a/src/pulsecore/iochannel.h
+++ b/src/pulsecore/iochannel.h
@@ -58,7 +58,7 @@ bool pa_iochannel_creds_supported(pa_iochannel *io);
int pa_iochannel_creds_enable(pa_iochannel *io);
ssize_t pa_iochannel_write_with_creds(pa_iochannel*io, const void*data, size_t l, const pa_creds *ucred);
-ssize_t pa_iochannel_read_with_creds(pa_iochannel*io, void*data, size_t l, pa_creds *ucred, bool *creds_valid);
+ssize_t pa_iochannel_read_with_ancil(pa_iochannel*io, void*data, size_t l, pa_ancil *ancil);
#endif
bool pa_iochannel_is_readable(pa_iochannel*io);
diff --git a/src/pulsecore/pdispatch.c b/src/pulsecore/pdispatch.c
index 1766d6d..483ce6b 100644
--- a/src/pulsecore/pdispatch.c
+++ b/src/pulsecore/pdispatch.c
@@ -216,7 +216,7 @@ struct pa_pdispatch {
PA_LLIST_HEAD(struct reply_info, replies);
pa_pdispatch_drain_cb_t drain_callback;
void *drain_userdata;
- const pa_creds *creds;
+ const pa_ancil *ancil;
bool use_rtclock;
};
@@ -286,7 +286,7 @@ static void run_action(pa_pdispatch *pd, struct reply_info *r, uint32_t command,
pa_pdispatch_unref(pd);
}
-int pa_pdispatch_run(pa_pdispatch *pd, pa_packet*packet, const pa_creds *creds, void *userdata) {
+int pa_pdispatch_run(pa_pdispatch *pd, pa_packet*packet, const pa_ancil *ancil, void *userdata) {
uint32_t tag, command;
pa_tagstruct *ts = NULL;
int ret = -1;
@@ -320,7 +320,7 @@ int pa_pdispatch_run(pa_pdispatch *pd, pa_packet*packet, const pa_creds *creds,
}
#endif
- pd->creds = creds;
+ pd->ancil = ancil;
if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
struct reply_info *r;
@@ -344,7 +344,7 @@ int pa_pdispatch_run(pa_pdispatch *pd, pa_packet*packet, const pa_creds *creds,
ret = 0;
finish:
- pd->creds = NULL;
+ pd->ancil = NULL;
if (ts)
pa_tagstruct_free(ts);
@@ -437,5 +437,21 @@ const pa_creds * pa_pdispatch_creds(pa_pdispatch *pd) {
pa_assert(pd);
pa_assert(PA_REFCNT_VALUE(pd) >= 1);
- return pd->creds;
+ if (pd->ancil && pd->ancil->creds_valid)
+ return &pd->ancil->creds;
+ return NULL;
+}
+
+const int * pa_pdispatch_fds(pa_pdispatch *pd, int *nfd) { /* Return the fds array of the ancillary data held by pd and store its count in *nfd; NULL and 0 when pd holds none. */
+ pa_assert(pd);
+ pa_assert(PA_REFCNT_VALUE(pd) >= 1);
+ pa_assert(nfd);
+
+ if (pd->ancil) { /* pd->ancil is set by pa_pdispatch_run() and reset to NULL when it finishes. */
+ *nfd = pd->ancil->nfd; /* May be 0 even though a non-NULL array is returned. */
+ return pd->ancil->fds;
+ }
+
+ *nfd = 0;
+ return NULL;
 }
diff --git a/src/pulsecore/pdispatch.h b/src/pulsecore/pdispatch.h
index 797ddca..038f90d 100644
--- a/src/pulsecore/pdispatch.h
+++ b/src/pulsecore/pdispatch.h
@@ -41,7 +41,7 @@ pa_pdispatch* pa_pdispatch_new(pa_mainloop_api *m, bool use_rtclock, const pa_pd
void pa_pdispatch_unref(pa_pdispatch *pd);
pa_pdispatch* pa_pdispatch_ref(pa_pdispatch *pd);
-int pa_pdispatch_run(pa_pdispatch *pd, pa_packet*p, const pa_creds *creds, void *userdata);
+int pa_pdispatch_run(pa_pdispatch *pd, pa_packet*p, const pa_ancil *ancil, void *userdata);
void pa_pdispatch_register_reply(pa_pdispatch *pd, uint32_t tag, int timeout, pa_pdispatch_cb_t callback, void *userdata, pa_free_cb_t free_cb);
@@ -54,4 +54,6 @@ void pa_pdispatch_unregister_reply(pa_pdispatch *pd, void *userdata);
const pa_creds * pa_pdispatch_creds(pa_pdispatch *pd);
+const int * pa_pdispatch_fds(pa_pdispatch *pd, int *nfd);
+
#endif
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index 584e1d9..86865ee 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -4816,14 +4816,14 @@ static void command_set_port_latency_offset(pa_pdispatch *pd, uint32_t command,
/*** pstream callbacks ***/
-static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
+static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_ancil *ancil, void *userdata) {
pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
pa_assert(p);
pa_assert(packet);
pa_native_connection_assert_ref(c);
- if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
+ if (pa_pdispatch_run(c->pdispatch, packet, ancil, c) < 0) {
pa_log("invalid packet.");
native_connection_unlink(c);
}
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 2c1444f..a3afed5 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -165,8 +165,9 @@ struct pa_pstream {
pa_mempool *mempool;
#ifdef HAVE_CREDS
- pa_creds read_creds, write_creds;
- bool read_creds_valid, send_creds_now;
+ pa_ancil read_ancil;
+ pa_creds write_creds;
+ bool send_creds_now;
#endif
};
@@ -646,12 +647,20 @@ static int do_read(pa_pstream *p) {
#ifdef HAVE_CREDS
{
- bool b = 0;
+ pa_ancil b;
- if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
+ if ((r = pa_iochannel_read_with_ancil(p->io, d, l, &b)) <= 0)
goto fail;
- p->read_creds_valid = p->read_creds_valid || b;
+ if (b.creds_valid) {
+ p->read_ancil.creds_valid = true;
+ p->read_ancil.creds = b.creds;
+ }
+ if (b.nfd > 0) {
+ pa_assert(b.nfd <= MAX_ANCIL_FDS);
+ p->read_ancil.nfd = b.nfd;
+ memcpy(p->read_ancil.fds, b.fds, sizeof(int) * b.nfd);
+ }
}
#else
if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
@@ -799,7 +808,7 @@ static int do_read(pa_pstream *p) {
if (p->receive_packet_callback)
#ifdef HAVE_CREDS
- p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, p->read.packet, &p->read_ancil, p->receive_packet_callback_userdata);
#else
p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
#endif
@@ -860,7 +869,8 @@ frame_done:
p->read.data = NULL;
#ifdef HAVE_CREDS
- p->read_creds_valid = false;
+ p->read_ancil.creds_valid = false;
+ p->read_ancil.nfd = 0;
#endif
return 0;
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index deb2bc3..9316d92 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -37,7 +37,7 @@
typedef struct pa_pstream pa_pstream;
-typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata);
+typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const pa_ancil *ancil, void *userdata);
typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata);
typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata);
typedef void (*pa_pstream_block_id_cb_t)(pa_pstream *p, uint32_t block_id, void *userdata);
commit cc7a317e858cbd64159e72bcbaf4796992591dc4
Author: David Henningsson <david.henningsson at canonical.com>
Date: Tue Apr 15 15:07:42 2014 +0200
creds: Add struct for ancillary data
To support later patches that add sending/receiving file descriptors,
let's add this struct.
Signed-off-by: David Henningsson <david.henningsson at canonical.com>
diff --git a/src/pulsecore/creds.h b/src/pulsecore/creds.h
index aa1d560..f09b35e 100644
--- a/src/pulsecore/creds.h
+++ b/src/pulsecore/creds.h
@@ -29,8 +29,12 @@
#endif
#include <pulsecore/socket.h>
+#include <stdbool.h>
+
+#define MAX_ANCIL_FDS (2)
typedef struct pa_creds pa_creds;
+typedef struct pa_ancil pa_ancil;
#if defined(SCM_CREDENTIALS)
@@ -41,6 +45,15 @@ struct pa_creds {
uid_t uid;
};
+/* Struct for handling ancillary data, i e, extra data that can be sent together with a message
+ over unix pipes. Supports sending and receiving credentials and file descriptors. */
+struct pa_ancil {
+ pa_creds creds; /* Only meaningful when creds_valid is true. */
+ bool creds_valid; /* True when creds carries credentials. */
+ int nfd; /* Number of entries of fds in use; 0 means no descriptors. */
+ int fds[MAX_ANCIL_FDS]; /* File descriptors, at most MAX_ANCIL_FDS of them. */
+};
+
#else
#undef HAVE_CREDS
#endif
More information about the pulseaudio-commits
mailing list