[pulseaudio-commits] r1871 - /branches/lennart/src/modules/module-combine.c
svnmailer-noreply at 0pointer.de
svnmailer-noreply at 0pointer.de
Wed Sep 19 15:21:56 PDT 2007
Author: lennart
Date: Thu Sep 20 00:21:55 2007
New Revision: 1871
URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1871&root=pulseaudio&view=rev
Log:
render new data always in the master sink's thread, fixing missing locking
Modified:
branches/lennart/src/modules/module-combine.c
Modified: branches/lennart/src/modules/module-combine.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-combine.c?rev=1871&root=pulseaudio&r1=1870&r2=1871&view=diff
==============================================================================
--- branches/lennart/src/modules/module-combine.c (original)
+++ branches/lennart/src/modules/module-combine.c Thu Sep 20 00:21:55 2007
@@ -87,8 +87,9 @@
pa_sink *sink;
pa_sink_input *sink_input;
- pa_asyncmsgq *asyncmsgq;
- pa_rtpoll_item *rtpoll_item;
+ pa_asyncmsgq *inq, /* Message queue from the master to this sink input */
+ *outq; /* Message queue from this sink input to the master */
+ pa_rtpoll_item *inq_rtpoll_item, *outq_rtpoll_item;
pa_memblockq *memblockq;
@@ -106,8 +107,6 @@
pa_thread_mq thread_mq;
pa_rtpoll *rtpoll;
- pa_mutex *mutex;
-
struct output *master;
pa_time_event *time_event;
@@ -134,7 +133,8 @@
enum {
SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
- SINK_MESSAGE_REMOVE_OUTPUT
+ SINK_MESSAGE_REMOVE_OUTPUT,
+ SINK_MESSAGE_NEED
};
enum {
@@ -275,9 +275,51 @@
pa_log_debug("Thread shutting down");
}
+static void render_memblock(struct userdata *u, struct output *o, size_t length) {
+ pa_assert(u);
+ pa_assert(o);
+
+ if (!PA_SINK_OPENED(u->sink->thread_info.state))
+ return;
+
+ /* We are run by the master output (u->master), possibly on behalf
+ * of another output (o). The other output is waiting for us,
+ * hence it is safe to access its mainblockq directly. */
+
+ /* Maybe there's some data in the requesting output's queue
+ * now? */
+ while (pa_asyncmsgq_process_one(o->inq) > 0)
+ ;
+
+ /* Ok, now let's prepare some data if we really have to */
+ while (!pa_memblockq_is_readable(o->memblockq)) {
+ struct output *j;
+ pa_memchunk chunk;
+
+ /* Render data! */
+ pa_sink_render(u->sink, length, &chunk);
+
+ /* OK, let's send this data to the other threads */
+ for (j = o->userdata->thread_info.outputs; j; j = j->next)
+
+ /* Send to other outputs, which are not the requesting
+ * one, and not the master */
+
+ if (j != o && j != u->master && j->sink_input)
+ pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+
+ /* Now push it into the master queue */
+ pa_memblockq_push_align(u->master->memblockq, &chunk);
+
+ /* And into the requesting output's queue */
+ if (o != u->master)
+ pa_memblockq_push_align(o->memblockq, &chunk);
+
+ pa_memblock_unref(chunk.memblock);
+ }
+}
+
static void request_memblock(struct output *o, size_t length) {
- pa_memchunk chunk;
-
pa_assert(o);
pa_sink_input_assert_ref(o->sink_input);
pa_sink_assert_ref(o->userdata->sink);
@@ -285,7 +327,7 @@
/* If another thread already prepared some data we received
* the data over the asyncmsgq, hence let's first process
* it. */
- while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
+ while (pa_asyncmsgq_process_one(o->inq) > 0)
;
/* Check whether we're now readable */
@@ -293,33 +335,16 @@
return;
/* OK, we need to prepare new data */
- pa_mutex_lock(o->userdata->mutex);
-
- if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) {
-
- /* Maybe there's some data now? */
- while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
- ;
-
- /* Ok, now let's prepare some data if we really have to */
- while (!pa_memblockq_is_readable(o->memblockq)) {
- struct output *j;
-
- /* Do it! */
- pa_sink_render(o->userdata->sink, length, &chunk);
-
- /* OK, let's send this data to the other threads */
- for (j = o->userdata->thread_info.outputs; j; j = j->next)
- if (j != o && j->sink_input)
- pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
-
- /* And push it into our own queue */
- pa_memblockq_push_align(o->memblockq, &chunk);
- pa_memblock_unref(chunk.memblock);
- }
- }
-
- pa_mutex_unlock(o->userdata->mutex);
+
+ if (o == o->userdata->master)
+ /* OK, we're the master, so let's render some data */
+ render_memblock(o->userdata, o, length);
+
+ else
+ /* We're not the master, we need to ask the master to do the
+ * rendering for us */
+
+ pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, length, NULL);
}
/* Called from I/O thread context */
@@ -327,8 +352,7 @@
struct output *o;
pa_sink_input_assert_ref(i);
- o = i->userdata;
- pa_assert(o);
+ pa_assert_se(o = i->userdata);
/* If necessary, get some new data */
request_memblock(o, length);
@@ -342,8 +366,7 @@
pa_sink_input_assert_ref(i);
pa_assert(length > 0);
- o = i->userdata;
- pa_assert(o);
+ pa_assert_se(o = i->userdata);
pa_memblockq_drop(o->memblockq, length);
}
@@ -353,23 +376,42 @@
struct output *o;
pa_sink_input_assert_ref(i);
- o = i->userdata;
- pa_assert(o);
-
+ pa_assert_se(o = i->userdata);
+
+ pa_assert(!o->inq_rtpoll_item);
+
if (o->userdata->master == o) {
+ struct output *k;
+
+ pa_assert(!o->outq_rtpoll_item);
+
+ /* Set up the queues from the outputs to the master */
+ for (k = o->userdata->thread_info.outputs; k; k = k->next) {
+
+ pa_assert(!k->outq_rtpoll_item);
+
+ if (o == k)
+ continue;
+
+ k->outq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
+ i->sink->rtpoll,
+ PA_RTPOLL_EARLY+1, /* This one has a slightly lower priority than the normal message handling */
+ k->outq);
+ }
+
/* Calling these two functions here is safe, because both
- * threads that might access this sink input are known to be
+ * threads that might access this sink are known to be
* waiting for us. */
pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq);
pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll);
pa_sink_attach_within_thread(o->userdata->sink);
}
-
- pa_assert(!o->rtpoll_item);
- o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
+
+ /* Set up the queues from the inputs to the master */
+ o->inq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
i->sink->rtpoll,
PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */
- o->asyncmsgq);
+ o->inq);
}
/* Called from I/O thread context */
@@ -377,15 +419,27 @@
struct output *o;
pa_sink_input_assert_ref(i);
- o = i->userdata;
- pa_assert(o);
-
- pa_assert(o->rtpoll_item);
- pa_rtpoll_item_free(o->rtpoll_item);
- o->rtpoll_item = NULL;
-
- if (o->userdata->master == o)
+ pa_assert_se(o = i->userdata);
+
+ pa_assert(o->inq_rtpoll_item);
+ pa_rtpoll_item_free(o->inq_rtpoll_item);
+ o->inq_rtpoll_item = NULL;
+
+ if (o->userdata->master == o) {
+ struct output *k;
+
pa_sink_detach_within_thread(o->userdata->sink);
+
+ for (k = o->userdata->thread_info.outputs; k; k = k->next) {
+
+ if (o == k)
+ continue;
+
+ pa_assert(k->outq_rtpoll_item);
+ pa_rtpoll_item_free(k->outq_rtpoll_item);
+ k->outq_rtpoll_item = NULL;
+ }
+ }
}
/* Called from main context */
@@ -433,6 +487,7 @@
return pa_sink_input_process_msg(obj, code, data, offset, chunk);
}
+/* Called from main context */
static int suspend(struct userdata *u) {
struct output *o;
uint32_t idx;
@@ -458,6 +513,7 @@
return 0;
}
+/* Called from main context */
static int unsuspend(struct userdata *u) {
struct output *o;
uint32_t idx;
@@ -485,12 +541,12 @@
return 0;
}
+/* Called from main context */
static int sink_set_state(pa_sink *sink, pa_sink_state_t state) {
struct userdata *u;
pa_sink_assert_ref(sink);
- u = sink->userdata;
- pa_assert(u);
+ pa_assert_se(u = sink->userdata);
/* Please note that in contrast to the ALSA modules we call
* suspend/unsuspend from main context here! */
@@ -574,6 +630,10 @@
case SINK_MESSAGE_REMOVE_OUTPUT:
PA_LLIST_REMOVE(struct output, u->thread_info.outputs, (struct output*) data);
+ break;
+
+ case SINK_MESSAGE_NEED:
+ render_memblock(u, data, (size_t) offset);
break;
}
@@ -765,8 +825,10 @@
o = pa_xnew(struct output, 1);
o->userdata = u;
- o->asyncmsgq = pa_asyncmsgq_new(0);
- o->rtpoll_item = NULL;
+ o->inq = pa_asyncmsgq_new(0);
+ o->outq = pa_asyncmsgq_new(0);
+ o->inq_rtpoll_item = NULL;
+ o->outq_rtpoll_item = NULL;
o->sink = sink;
o->sink_input = NULL;
o->memblockq = pa_memblockq_new(
@@ -809,9 +871,12 @@
if (o->memblockq)
pa_memblockq_free(o->memblockq);
- if (o->asyncmsgq)
- pa_asyncmsgq_unref(o->asyncmsgq);
-
+ if (o->inq)
+ pa_asyncmsgq_unref(o->inq);
+
+ if (o->outq)
+ pa_asyncmsgq_unref(o->outq);
+
pa_xfree(o);
}
@@ -947,7 +1012,6 @@
u->thread_info.master = u->master = NULL;
u->time_event = NULL;
u->adjust_time = DEFAULT_ADJUST_TIME;
- u->mutex = pa_mutex_new(FALSE, TRUE);
pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = NULL;
u->thread = NULL;
@@ -1134,14 +1198,20 @@
pa_sink_input_unref(o->sink_input);
}
- if (o->rtpoll_item)
- pa_rtpoll_item_free(o->rtpoll_item);
+ if (o->inq_rtpoll_item)
+ pa_rtpoll_item_free(o->inq_rtpoll_item);
+
+ if (o->outq_rtpoll_item)
+ pa_rtpoll_item_free(o->outq_rtpoll_item);
+
+ if (o->inq)
+ pa_asyncmsgq_unref(o->inq);
+
+ if (o->outq)
+ pa_asyncmsgq_unref(o->outq);
if (o->memblockq)
pa_memblockq_free(o->memblockq);
-
- if (o->asyncmsgq)
- pa_asyncmsgq_unref(o->asyncmsgq);
pa_xfree(o);
}
@@ -1190,8 +1260,6 @@
if (u->time_event)
u->core->mainloop->time_free(u->time_event);
- pa_mutex_free(u->mutex);
-
pa_xfree(u);
}
More information about the pulseaudio-commits
mailing list