[pulseaudio-discuss] [PATCH] module-tunnel-sink-new: add a rewrite of module-tunnel using libpulse

Tanu Kaskinen tanu.kaskinen at linux.intel.com
Tue Aug 20 01:11:09 PDT 2013


On Mon, 2013-08-19 at 07:41 +0200, Alexander Couzens wrote:
> Old module-tunnel shares duplicated functionality with libpulse because
> it is implementing pulse protocol again.
> module-tunnel-sink-new uses libpulse to connect to the remote server
> 
> Signed-off-by: Alexander Couzens <lynxis at fe80.eu>
> ---
>  src/Makefile.am                      |   6 +
>  src/modules/module-tunnel-sink-new.c | 526 +++++++++++++++++++++++++++++++++++
>  2 files changed, 532 insertions(+)
>  create mode 100644 src/modules/module-tunnel-sink-new.c
> 
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 6de6e96..27477e9 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -1097,6 +1097,7 @@ modlibexec_LTLIBRARIES += \
>  		module-remap-sink.la \
>  		module-remap-source.la \
>  		module-ladspa-sink.la \
> +		module-tunnel-sink-new.la \
>  		module-tunnel-sink.la \
>  		module-tunnel-source.la \
>  		module-position-event-sounds.la \
> @@ -1368,6 +1369,7 @@ SYMDEF_FILES = \
>  		module-ladspa-sink-symdef.h \
>  		module-equalizer-sink-symdef.h \
>  		module-match-symdef.h \
> +		module-tunnel-sink-new-symdef.h \
>  		module-tunnel-sink-symdef.h \
>  		module-tunnel-source-symdef.h \
>  		module-null-sink-symdef.h \
> @@ -1638,6 +1640,10 @@ module_match_la_SOURCES = modules/module-match.c
>  module_match_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_match_la_LIBADD = $(MODULE_LIBADD)
>  
> +module_tunnel_sink_new_la_SOURCES = modules/module-tunnel-sink-new.c
> +module_tunnel_sink_new_la_LDFLAGS = $(MODULE_LDFLAGS)
> +module_tunnel_sink_new_la_LIBADD = $(MODULE_LIBADD)
> +
>  module_tunnel_sink_la_SOURCES = modules/module-tunnel.c
>  module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS)
>  module_tunnel_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
> diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
> new file mode 100644
> index 0000000..18448c2
> --- /dev/null
> +++ b/src/modules/module-tunnel-sink-new.c
> @@ -0,0 +1,526 @@
> +/***
> +    This file is part of PulseAudio.
> +
> +    Copyright 2013 Alexander Couzens
> +
> +    PulseAudio is free software; you can redistribute it and/or modify
> +    it under the terms of the GNU Lesser General Public License as published
> +    by the Free Software Foundation; either version 2.1 of the License,
> +    or (at your option) any later version.
> +
> +    PulseAudio is distributed in the hope that it will be useful, but
> +    WITHOUT ANY WARRANTY; without even the implied warranty of
> +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +    General Public License for more details.
> +
> +    You should have received a copy of the GNU Lesser General Public License
> +    along with PulseAudio; if not, write to the Free Software
> +    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
> +    USA.
> +***/
> +
> +#ifdef HAVE_CONFIG_H
> +#include <config.h>
> +#endif
> +
> +#include <pulse/context.h>
> +#include <pulse/timeval.h>
> +#include <pulse/xmalloc.h>
> +#include <pulse/stream.h>
> +#include <pulse/mainloop.h>
> +#include <pulse/introspect.h>
> +#include <pulse/error.h>
> +
> +#include <pulsecore/core.h>
> +#include <pulsecore/core-util.h>
> +#include <pulsecore/i18n.h>
> +#include <pulsecore/sink.h>
> +#include <pulsecore/modargs.h>
> +#include <pulsecore/log.h>
> +#include <pulsecore/thread.h>
> +#include <pulsecore/thread-mq.h>
> +#include <pulsecore/poll.h>
> +#include <pulsecore/proplist-util.h>
> +
> +#include "module-tunnel-sink-new-symdef.h"
> +
> +PA_MODULE_AUTHOR("Alexander Couzens");
> +PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
> +PA_MODULE_VERSION(PACKAGE_VERSION);
> +PA_MODULE_LOAD_ONCE(false);
> +PA_MODULE_USAGE(
> +        "server=<address> "
> +        "sink=<name of the remote sink> "
> +        "sink_name=<name for the local sink> "
> +        "sink_properties=<properties for the local sink> "
> +        "format=<sample format> "
> +        "channels=<number of channels> "
> +        "rate=<sample rate> "
> +        "channel_map=<channel map>"
> +        );
> +
> +#define TUNNEL_THREAD_FAILED_MAINLOOP 1
> +
> +static void stream_state_cb(pa_stream *stream, void *userdata);
> +static void stream_buffer_attr_cb(pa_stream *stream, void *userdata);
> +static void context_state_cb(pa_context *c, void *userdata);
> +static void sink_update_requested_latency_cb(pa_sink *s);
> +
> +struct userdata {
> +    pa_module *module;
> +    pa_sink *sink;
> +    pa_thread *thread;
> +    pa_thread_mq thread_mq;
> +    pa_mainloop *thread_mainloop;
> +    pa_mainloop_api *thread_mainloop_api;
> +
> +    pa_context *context;
> +    pa_stream *stream;
> +
> +    pa_buffer_attr bufferattr;

I think you don't really need this field in userdata.

> +
> +    bool connected;
> +
> +    char *remote_server;
> +    char *remote_sink_name;
> +};
> +
> +static const char* const valid_modargs[] = {
> +    "sink_name",
> +    "sink_properties",
> +    "server",
> +    "sink",
> +    "format",
> +    "channels",
> +    "rate",
> +    "channel_map",
> +   /* "cookie", unimplemented */
> +   /* "reconnect", reconnect if server comes back again - unimplemented */
> +    NULL,
> +};
> +
> +static pa_proplist* tunnel_new_proplist(struct userdata *u) {
> +    pa_proplist *proplist = pa_proplist_new();
> +    pa_assert(proplist);
> +    pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
> +    pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
> +    pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
> +    pa_init_proplist(proplist);
> +
> +    return proplist;
> +}
> +
> +static void thread_func(void *userdata) {
> +    struct userdata *u = userdata;
> +    pa_proplist *proplist;
> +    pa_memchunk memchunk;
> +    pa_operation *operation;

This could be moved nearer to the pa_stream_cork() call, which is the
only place where this is needed. memchunk could also be moved closer to
where it's used.

> +
> +    pa_assert(u);
> +
> +    pa_log_debug("Thread starting up");
> +    pa_thread_mq_install(&u->thread_mq);
> +
> +    pa_memchunk_reset(&memchunk);

This is not necessary, because pa_sink_render_full() will anyway
overwrite all of the memchunk fields.

> +
> +    proplist = tunnel_new_proplist(u);
> +
> +    u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
> +                                              "PulseAudio",
> +                                              proplist);
> +    pa_proplist_free(proplist);
> +
> +    if (!u->context) {
> +        pa_log("Failed to create libpulse context");
> +        goto fail;
> +    }
> +
> +    pa_context_set_state_callback(u->context, context_state_cb, u);
> +    if (pa_context_connect(u->context,
> +                           u->remote_server,
> +                           PA_CONTEXT_NOAUTOSPAWN,
> +                           NULL) < 0) {
> +        pa_log("Failed to connect libpulse context");
> +        goto fail;
> +    }
> +
> +    for (;;) {
> +        int ret;
> +        const void *p;
> +
> +        size_t writable = 0;
> +
> +        if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
> +            if (ret == 0)
> +                goto finish;
> +            else
> +                goto fail;
> +        }
> +
> +        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
> +            pa_sink_process_rewind(u->sink, 0);
> +
> +        if (u->connected &&
> +                pa_stream_get_state(u->stream) == PA_STREAM_READY &&
> +                PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
> +            /* TODO: use IS_RUNNING + cork stream */
> +
> +            if (pa_stream_is_corked(u->stream)) {
> +                if((operation = pa_stream_cork(u->stream, 0, NULL, NULL))) {
> +                    pa_operation_unref(operation);
> +                }
> +            } else {
> +                writable = pa_stream_writable_size(u->stream);
> +                if (writable > 0) {
> +                    pa_sink_render_full(u->sink, writable, &memchunk);
> +
> +                    pa_assert(memchunk.length > 0);
> +
> +                    /* we have new data to write */
> +                    p = (const uint8_t *) pa_memblock_acquire(memchunk.memblock);

I don't see the point of the cast.

> +                    /* TODO: ZERO COPY! */
> +                    ret = pa_stream_write(u->stream,
> +                                         ((uint8_t*) p + memchunk.index),

The outer parentheses aren't useful in my opinion. And the indentation
is still not quite right (one space too little).

> +                                         memchunk.length,
> +                                         NULL,     /**< A cleanup routine for the data or NULL to request an internal copy */
> +                                         0,        /** offset */
> +                                         PA_SEEK_RELATIVE);
> +                    pa_memblock_release(memchunk.memblock);
> +                    pa_memblock_unref(memchunk.memblock);
> +                    pa_memchunk_reset(&memchunk);

This reset is unnecessary too.

> +
> +                    if (ret != 0) {
> +                        pa_log_error("Could not write data into the stream ... ret = %i", ret);
> +                        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
> +                    }
> +                }
> +            }
> +        }
> +    }
> +fail:
> +    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->module->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
> +    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
> +
> +finish:
> +    if (memchunk.memblock)
> +        pa_memblock_unref(memchunk.memblock);

This is unnecessary, the memblock never needs unreffing here.

> +
> +    if (u->stream) {
> +        pa_stream_disconnect(u->stream);
> +        pa_stream_unref(u->stream);
> +        u->stream = NULL;
> +    }
> +
> +    if (u->context) {
> +        pa_context_disconnect(u->context);
> +        pa_context_unref(u->context);
> +        u->context = NULL;
> +    }
> +
> +    pa_log_debug("Thread shutting down");
> +}
> +
> +static void stream_state_cb(pa_stream *stream, void *userdata) {
> +    struct userdata *u = userdata;
> +
> +    pa_assert(u);
> +
> +    switch (pa_stream_get_state(stream)) {
> +        case PA_STREAM_FAILED:
> +            pa_log_error("Stream failed.");
> +            u->connected = false;
> +            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
> +            break;
> +        case PA_STREAM_TERMINATED:
> +            pa_log_debug("Stream terminated.");
> +            break;
> +        case PA_STREAM_READY:
> +            /* just call our sink callback to ensure stream latency */
> +            sink_update_requested_latency_cb(u->sink);

sink_update_requested_latency_cb() will unconditionally try to set
tlength to the current requested latency. That doesn't match the logic I
requested you to implement:

* If the requested latency doesn't change between the stream creation
and the stream becoming ready, then the sink latency should be set to
whatever tlength the server assigned for our stream (actually I now
realized that there's no way or need to "set the sink latency" - what
you need to do instead is to set the sink max_request). If the server
decided to assign a different tlength than what we requested, it has a
reason for it, and there's no point trying to override that decision.
pa_stream_set_buffer_attr() must not be called in this case.

* If the requested latency changed between the stream creation and the
stream becoming ready, pa_stream_set_buffer_attr() should be called, but
only if the tlength that the server assigned is different than what
pa_sink_get_requested_latency_within_thread() returns (if the assigned
tlength is the same as the sink requested latency, then calling
pa_stream_set_buffer_attr() is of course redundant and unnecessary
overhead).

> +        default:
> +            break;
> +    }
> +}
> +
> +static void stream_buffer_attr_cb(pa_stream *stream, void *userdata) {
> +    struct userdata *u = userdata;
> +    const pa_buffer_attr *attr;
> +    pa_assert(u);
> +
> +    attr = pa_stream_get_buffer_attr(u->stream);
> +    u->bufferattr = *attr;

pa_sink_set_max_request_within_thread() should be called here.

> +}
> +
> +static void context_state_cb(pa_context *c, void *userdata) {
> +    struct userdata *u = userdata;
> +    pa_assert(u);
> +
> +    switch (pa_context_get_state(c)) {
> +        case PA_CONTEXT_UNCONNECTED:
> +        case PA_CONTEXT_CONNECTING:
> +        case PA_CONTEXT_AUTHORIZING:
> +        case PA_CONTEXT_SETTING_NAME:
> +            break;
> +        case PA_CONTEXT_READY: {
> +            pa_proplist *proplist;
> +            const char *username = pa_get_user_name_malloc();
> +            const char *hostname = pa_get_host_name_malloc();
> +            /* TODO: old tunnel say 'Null-Output' */
> +            char *stream_name = pa_sprintf_malloc("%s for %s@%s", "Tunnel", username, hostname);

You did no changes here. The TODO comment is still confusing, "Tunnel"
is unnecessarily inserted via substitution and the string is still not
translatable.

> +
> +            pa_log_debug("Connection successful. Creating stream.");
> +            pa_assert(!u->stream);
> +
> +            proplist = tunnel_new_proplist(u);
> +            pa_proplist_sets(proplist, PA_PROP_MEDIA_ROLE, "abstract");

So you wanted to set the media role anyway, even though I said it's best
to leave it unspecified? Perhaps you misunderstood what I meant what I
meant by it - now that I read my message again, it's possible to
understand it so that I wanted to leave the "abstract" role undocumented
(which is true). However, my wish regarding this code was that you would
not set any media role, because setting the role to "abstract" doesn't
really have any benefit compared to leaving the property unset.

> +
> +            u->stream = pa_stream_new_with_proplist(u->context,
> +                                                    stream_name,
> +                                                    &u->sink->sample_spec,
> +                                                    &u->sink->channel_map,
> +                                                    proplist);
> +            pa_proplist_free(proplist);
> +            pa_xfree(stream_name);
> +
> +            if (!u->stream) {
> +                pa_log_error("Could not create a stream.");
> +                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
> +                return;
> +            }
> +
> +            pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
> +            pa_stream_set_buffer_attr_callback(u->stream, stream_buffer_attr_cb, userdata);
> +            if (pa_stream_connect_playback(u->stream,
> +                                           u->remote_sink_name,
> +                                           &u->bufferattr,
> +                                           PA_STREAM_INTERPOLATE_TIMING || PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE,

"||" should be "|".

> +                                           NULL,
> +                                           NULL) < 0) {
> +                pa_log_error("Could not connect stream.");
> +                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
> +            }
> +            u->connected = true;
> +            break;
> +        }
> +        case PA_CONTEXT_FAILED:
> +            pa_log_debug("Context failed with %s.", pa_strerror(pa_context_errno(u->context)));

A colon would be better than "with", like this: "Context failed: %s"

> +            u->connected = false;
> +            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
> +            break;
> +        case PA_CONTEXT_TERMINATED:
> +            pa_log_debug("Context terminated.");
> +            u->connected = false;
> +            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
> +            break;
> +        default:
> +            break;
> +    }
> +}
> +
> +static void sink_update_requested_latency_cb(pa_sink *s) {
> +    struct userdata *u;
> +    pa_operation *operation;
> +    size_t nbytes;
> +    pa_usec_t block_usec;
> +
> +    pa_sink_assert_ref(s);
> +    pa_assert_se(u = s->userdata);
> +
> +    block_usec = pa_sink_get_requested_latency_within_thread(s);
> +
> +    if (block_usec == (pa_usec_t) -1)
> +        block_usec = s->thread_info.max_latency;
> +
> +    nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
> +    pa_sink_set_max_rewind_within_thread(s, 0);

Setting max_rewind isn't necessary here, because it's already 0. (I
should have said this already in the previous review, but I didn't
realize it at that time.)

Why did you remove the pa_sink_set_max_request_within_thread() call?

> +
> +    if (block_usec != (pa_usec_t) -1) {
> +        u->bufferattr.tlength = nbytes;
> +    }

>From the previous review:

"block_usec is never -1, because if
pa_sink_get_requested_latency_within_thread() returns -1, then you set
block_usec to s->thread_info.max_latency."

> +
> +    if (u->stream && (pa_stream_get_state(u->stream) == PA_STREAM_READY)) {
> +        if((operation = pa_stream_set_buffer_attr(u->stream, &u->bufferattr, NULL, NULL)))

Missing space after "if".

I looked at the stream.c code, and it seems that the buffer attr
callback is not called when you request the change yourself via
pa_stream_set_buffer_attr(). For that reason you need to provide a
callback here, so that you know what attributes the server assigns (it's
not guaranteed that the server does exactly what you ask it to do).

Also, I think the pa_buffer_attr struct that you pass to
pa_stream_set_buffer_attr() should have all other fields initialized to
-1 except tlength. With the current code they are -1 during the
initialization of the module, but in stream_buffer_attr_cb() all fields
get set to something else than -1, and that something else may not be
appropriate later when you want to update tlength. (If you remove
bufferattr from userdata as I suggest, this problem should take care of
itself.)

> +            pa_operation_unref(operation);
> +    }
> +}
> +
> +static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
> +    struct userdata *u = PA_SINK(o)->userdata;
> +
> +    switch (code) {
> +        case PA_SINK_MESSAGE_GET_LATENCY: {
> +            int negative;
> +            pa_usec_t remote_latency;
> +
> +            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
> +                *((pa_usec_t*) data) = 0;
> +                return 0;
> +            }
> +
> +            if (!u->stream) {
> +                *((pa_usec_t*) data) = 0;
> +                return 0;
> +            }
> +
> +            if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
> +                *((pa_usec_t*) data) = 0;
> +                return 0;
> +            }
> +
> +            if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
> +                *((pa_usec_t*) data) = 0;
> +                return 0;
> +            }
> +
> +            *((pa_usec_t*) data) =
> +                remote_latency;

Unnecessary line wrapping.

> +            return 0;
> +        }
> +    }
> +    return pa_sink_process_msg(o, code, data, offset, chunk);
> +}
> +
> +int pa__init(pa_module *m) {
> +    struct userdata *u = NULL;
> +    pa_modargs *ma = NULL;
> +    pa_sink_new_data sink_data;
> +    pa_sample_spec ss;
> +    pa_channel_map map;
> +    const char *remote_server = NULL;
> +    const char *sink_name = NULL;
> +    char *default_sink_name = NULL;
> +
> +    pa_assert(m);
> +
> +    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
> +        pa_log("Failed to parse module arguments.");
> +        goto fail;
> +    }
> +
> +    ss = m->core->default_sample_spec;
> +    map = m->core->default_channel_map;
> +    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
> +        pa_log("Invalid sample format specification or channel map");
> +        goto fail;
> +    }
> +
> +    remote_server = pa_modargs_get_value(ma, "server", NULL);
> +    if (!remote_server) {
> +        pa_log("No server given!");
> +        goto fail;
> +    }
> +
> +    u = pa_xnew0(struct userdata, 1);
> +    u->module = m;
> +    m->userdata = u;
> +    u->remote_server = pa_xstrdup(remote_server);
> +    u->thread_mainloop = pa_mainloop_new();
> +    if (u->thread_mainloop == NULL) {
> +        pa_log("Failed to create mainloop");
> +        goto fail;
> +    }
> +    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
> +
> +    u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
> +
> +    u->bufferattr.maxlength = (uint32_t) -1;
> +    u->bufferattr.minreq = (uint32_t) -1;
> +    u->bufferattr.prebuf = (uint32_t) -1;
> +    u->bufferattr.tlength = (uint32_t) -1;

As I said in the previous review (perhaps not clearly enough), tlength
should be initialized to max_latency of the sink, to be consistent with
the case when the requested latency changes to -1 (in that case you set
tlength to max_latency).

-- 
Tanu



More information about the pulseaudio-discuss mailing list