[pulseaudio-commits] r1871 - /branches/lennart/src/modules/module-combine.c

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Wed Sep 19 15:21:56 PDT 2007


Author: lennart
Date: Thu Sep 20 00:21:55 2007
New Revision: 1871

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1871&root=pulseaudio&view=rev
Log:
render new data always in the master sink's thread, fixing missing locking

Modified:
    branches/lennart/src/modules/module-combine.c

Modified: branches/lennart/src/modules/module-combine.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-combine.c?rev=1871&root=pulseaudio&r1=1870&r2=1871&view=diff
==============================================================================
--- branches/lennart/src/modules/module-combine.c (original)
+++ branches/lennart/src/modules/module-combine.c Thu Sep 20 00:21:55 2007
@@ -87,8 +87,9 @@
     pa_sink *sink;
     pa_sink_input *sink_input;
 
-    pa_asyncmsgq *asyncmsgq;
-    pa_rtpoll_item *rtpoll_item;
+    pa_asyncmsgq *inq,    /* Message queue from the master to this sink input */
+                 *outq;   /* Message queue from this sink input to the master */
+    pa_rtpoll_item *inq_rtpoll_item, *outq_rtpoll_item;
     
     pa_memblockq *memblockq;
 
@@ -106,8 +107,6 @@
     pa_thread_mq thread_mq;
     pa_rtpoll *rtpoll;
 
-    pa_mutex *mutex;
-    
     struct output *master;
 
     pa_time_event *time_event; 
@@ -134,7 +133,8 @@
 
 enum {
     SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
-    SINK_MESSAGE_REMOVE_OUTPUT
+    SINK_MESSAGE_REMOVE_OUTPUT,
+    SINK_MESSAGE_NEED
 };
 
 enum {
@@ -275,9 +275,51 @@
     pa_log_debug("Thread shutting down");
 }
 
+static void render_memblock(struct userdata *u, struct output *o, size_t length) {
+    pa_assert(u);
+    pa_assert(o);
+
+    if (!PA_SINK_OPENED(u->sink->thread_info.state))
+        return;
+
+    /* We are run by the master output (u->master), possibly on behalf
+     * of another output (o). The other output is waiting for us,
+     * hence it is safe to access its mainblockq directly. */
+    
+    /* Maybe there's some data in the requesting output's queue
+     * now? */
+    while (pa_asyncmsgq_process_one(o->inq) > 0)
+        ;
+            
+    /* Ok, now let's prepare some data if we really have to */
+    while (!pa_memblockq_is_readable(o->memblockq)) {
+        struct output *j;
+        pa_memchunk chunk;
+        
+        /* Render data! */
+        pa_sink_render(u->sink, length, &chunk);
+        
+        /* OK, let's send this data to the other threads */
+        for (j = o->userdata->thread_info.outputs; j; j = j->next)
+
+            /* Send to other outputs, which are not the requesting
+             * one, and not the master */
+            
+            if (j != o && j != u->master && j->sink_input)
+                pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+        
+        /* Now push it into the master queue */
+        pa_memblockq_push_align(u->master->memblockq, &chunk);
+
+        /* And into the requesting output's queue */
+        if (o != u->master)
+            pa_memblockq_push_align(o->memblockq, &chunk);
+        
+        pa_memblock_unref(chunk.memblock);
+    }
+}
+
 static void request_memblock(struct output *o, size_t length) {
-    pa_memchunk chunk;
-    
     pa_assert(o);
     pa_sink_input_assert_ref(o->sink_input);
     pa_sink_assert_ref(o->userdata->sink);
@@ -285,7 +327,7 @@
     /* If another thread already prepared some data we received
      * the data over the asyncmsgq, hence let's first process
      * it. */
-    while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
+    while (pa_asyncmsgq_process_one(o->inq) > 0)
         ;
     
     /* Check whether we're now readable */
@@ -293,33 +335,16 @@
         return;
     
     /* OK, we need to prepare new data */
-    pa_mutex_lock(o->userdata->mutex);
-
-    if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) {
-    
-        /* Maybe there's some data now? */
-        while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
-            ;
-        
-        /* Ok, now let's prepare some data if we really have to */
-        while (!pa_memblockq_is_readable(o->memblockq)) {
-            struct output *j;
-            
-            /* Do it! */
-            pa_sink_render(o->userdata->sink, length, &chunk);
-            
-            /* OK, let's send this data to the other threads */
-            for (j = o->userdata->thread_info.outputs; j; j = j->next)
-                if (j != o && j->sink_input)
-                    pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
-            
-            /* And push it into our own queue */
-            pa_memblockq_push_align(o->memblockq, &chunk);
-            pa_memblock_unref(chunk.memblock);
-        }
-    }
-    
-    pa_mutex_unlock(o->userdata->mutex);
+
+    if (o == o->userdata->master)
+        /* OK, we're the master, so let's render some data */
+        render_memblock(o->userdata, o, length);
+
+    else
+        /* We're not the master, we need to ask the master to do the
+         * rendering for us */
+
+        pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, length, NULL);
 }
 
 /* Called from I/O thread context */
@@ -327,8 +352,7 @@
     struct output *o;
 
     pa_sink_input_assert_ref(i);
-    o = i->userdata;
-    pa_assert(o);
+    pa_assert_se(o = i->userdata);
 
     /* If necessary, get some new data */
     request_memblock(o, length);
@@ -342,8 +366,7 @@
 
     pa_sink_input_assert_ref(i);
     pa_assert(length > 0);
-    o = i->userdata;
-    pa_assert(o);
+    pa_assert_se(o = i->userdata);
 
     pa_memblockq_drop(o->memblockq, length);
 }
@@ -353,23 +376,42 @@
     struct output *o;
 
     pa_sink_input_assert_ref(i);
-    o = i->userdata;
-    pa_assert(o);
-
+    pa_assert_se(o = i->userdata);
+
+    pa_assert(!o->inq_rtpoll_item);
+    
     if (o->userdata->master == o) {
+        struct output *k;
+
+        pa_assert(!o->outq_rtpoll_item);
+        
+        /* Set up the queues from the outputs to the master */
+        for (k = o->userdata->thread_info.outputs; k; k = k->next) {
+
+            pa_assert(!k->outq_rtpoll_item);
+
+            if (o == k)
+                continue;
+            
+            k->outq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
+                    i->sink->rtpoll,
+                    PA_RTPOLL_EARLY+1,  /* This one has a slightly lower priority than the normal message handling */
+                    k->outq);
+        }
+        
         /* Calling these two functions here is safe, because both
-         * threads that might access this sink input are known to be
+         * threads that might access this sink are known to be
          * waiting for us. */
         pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq);
         pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll);
         pa_sink_attach_within_thread(o->userdata->sink);
     }
-    
-    pa_assert(!o->rtpoll_item);
-    o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
+
+    /* Set up the queues from the inputs to the master */
+    o->inq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
             i->sink->rtpoll,
             PA_RTPOLL_NORMAL,  /* This one has a lower priority than the normal message handling */
-            o->asyncmsgq);
+            o->inq);
 }
 
 /* Called from I/O thread context */
@@ -377,15 +419,27 @@
     struct output *o;
 
     pa_sink_input_assert_ref(i);
-    o = i->userdata;
-    pa_assert(o);
-
-    pa_assert(o->rtpoll_item);
-    pa_rtpoll_item_free(o->rtpoll_item);
-    o->rtpoll_item = NULL;
-
-    if (o->userdata->master == o)
+    pa_assert_se(o = i->userdata);
+
+    pa_assert(o->inq_rtpoll_item);
+    pa_rtpoll_item_free(o->inq_rtpoll_item);
+    o->inq_rtpoll_item = NULL;
+
+    if (o->userdata->master == o) {
+        struct output *k;
+        
         pa_sink_detach_within_thread(o->userdata->sink);
+
+        for (k = o->userdata->thread_info.outputs; k; k = k->next) {
+
+            if (o == k)
+                continue;
+
+            pa_assert(k->outq_rtpoll_item);
+            pa_rtpoll_item_free(k->outq_rtpoll_item);
+            k->outq_rtpoll_item = NULL;
+        }
+    }
 }
 
 /* Called from main context */
@@ -433,6 +487,7 @@
     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
 }
 
+/* Called from main context */
 static int suspend(struct userdata *u) {
     struct output *o;
     uint32_t idx;
@@ -458,6 +513,7 @@
     return 0;
 }
 
+/* Called from main context */
 static int unsuspend(struct userdata *u) {
     struct output *o;
     uint32_t idx;
@@ -485,12 +541,12 @@
     return 0;
 }
 
+/* Called from main context */
 static int sink_set_state(pa_sink *sink, pa_sink_state_t state) {
     struct userdata *u;
     
     pa_sink_assert_ref(sink);
-    u = sink->userdata;
-    pa_assert(u);
+    pa_assert_se(u = sink->userdata);
 
     /* Please note that in contrast to the ALSA modules we call
      * suspend/unsuspend from main context here! */
@@ -574,6 +630,10 @@
 
         case SINK_MESSAGE_REMOVE_OUTPUT:
             PA_LLIST_REMOVE(struct output, u->thread_info.outputs, (struct output*) data);
+            break;
+
+        case SINK_MESSAGE_NEED:
+            render_memblock(u, data, (size_t) offset);
             break;
     }
     
@@ -765,8 +825,10 @@
 
     o = pa_xnew(struct output, 1);
     o->userdata = u;
-    o->asyncmsgq = pa_asyncmsgq_new(0);
-    o->rtpoll_item = NULL;
+    o->inq = pa_asyncmsgq_new(0);
+    o->outq = pa_asyncmsgq_new(0);
+    o->inq_rtpoll_item = NULL;
+    o->outq_rtpoll_item = NULL;
     o->sink = sink;
     o->sink_input = NULL;
     o->memblockq = pa_memblockq_new(
@@ -809,9 +871,12 @@
         if (o->memblockq)
             pa_memblockq_free(o->memblockq);
 
-        if (o->asyncmsgq)
-            pa_asyncmsgq_unref(o->asyncmsgq);
-
+        if (o->inq)
+            pa_asyncmsgq_unref(o->inq);
+
+        if (o->outq)
+            pa_asyncmsgq_unref(o->outq);
+        
         pa_xfree(o);
     }
 
@@ -947,7 +1012,6 @@
     u->thread_info.master = u->master = NULL;
     u->time_event = NULL; 
     u->adjust_time = DEFAULT_ADJUST_TIME; 
-    u->mutex = pa_mutex_new(FALSE, TRUE);
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = NULL;
     u->thread = NULL;
@@ -1134,14 +1198,20 @@
         pa_sink_input_unref(o->sink_input);
     }
 
-    if (o->rtpoll_item)
-        pa_rtpoll_item_free(o->rtpoll_item);
+    if (o->inq_rtpoll_item)
+        pa_rtpoll_item_free(o->inq_rtpoll_item);
+
+    if (o->outq_rtpoll_item)
+        pa_rtpoll_item_free(o->outq_rtpoll_item);
+
+    if (o->inq)
+        pa_asyncmsgq_unref(o->inq);
+
+    if (o->outq)
+        pa_asyncmsgq_unref(o->outq);
 
     if (o->memblockq)
         pa_memblockq_free(o->memblockq);
-
-    if (o->asyncmsgq)
-        pa_asyncmsgq_unref(o->asyncmsgq);
     
     pa_xfree(o);
 }
@@ -1190,8 +1260,6 @@
     if (u->time_event)
         u->core->mainloop->time_free(u->time_event);
     
-    pa_mutex_free(u->mutex);
-        
     pa_xfree(u);
 }
 




More information about the pulseaudio-commits mailing list