[pulseaudio-commits] r1822 - in /branches/lennart/src: modules/ pulsecore/

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Fri Sep 14 14:51:06 PDT 2007


Author: lennart
Date: Fri Sep 14 23:51:05 2007
New Revision: 1822

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1822&root=pulseaudio&view=rev
Log:
simplify rt loops a bit by moving more code into pa_rtpoll. It is now possible to attach "work" functions to a pa_rtpoll_item, which will be called in each loop iteration. This allows us to hide the message processing in the RT loops and to drop the separate sink_input->process hooks. Basically, only the driver-specific code remains in the RT loops.

Modified:
    branches/lennart/src/modules/module-alsa-sink.c
    branches/lennart/src/modules/module-alsa-source.c
    branches/lennart/src/modules/module-combine.c
    branches/lennart/src/modules/module-null-sink.c
    branches/lennart/src/modules/module-oss.c
    branches/lennart/src/modules/module-pipe-sink.c
    branches/lennart/src/modules/module-pipe-source.c
    branches/lennart/src/pulsecore/asyncmsgq.c
    branches/lennart/src/pulsecore/asyncmsgq.h
    branches/lennart/src/pulsecore/rtpoll.c
    branches/lennart/src/pulsecore/rtpoll.h
    branches/lennart/src/pulsecore/sink-input.c
    branches/lennart/src/pulsecore/sink-input.h
    branches/lennart/src/pulsecore/sink.c
    branches/lennart/src/pulsecore/source-output.c
    branches/lennart/src/pulsecore/source-output.h
    branches/lennart/src/pulsecore/source.c
    branches/lennart/src/pulsecore/thread-mq.c
    branches/lennart/src/pulsecore/thread-mq.h

Modified: branches/lennart/src/modules/module-alsa-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-alsa-sink.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-alsa-sink.c (original)
+++ branches/lennart/src/modules/module-alsa-sink.c Fri Sep 14 23:51:05 2007
@@ -630,24 +630,13 @@
             }
         }
 
-        /* Now give the sink inputs some to time to process their data */
-        if ((ret = pa_sink_process_inputs(u->sink)) < 0)
+        /* Hmm, nothing to do. Let's sleep */
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        if (ret > 0)
-            continue;
-
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
+
+        if (ret == 0)
             goto finish;
-        if (ret > 0)
-            continue;
-        
-        /* Hmm, nothing to do. Let's sleep */
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
-            goto fail;
-        }
-
+        
         /* Tell ALSA about this and process its response */
         if (PA_SINK_OPENED(u->sink->thread_info.state)) {
             struct pollfd *pollfd;
@@ -676,8 +665,8 @@
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 

Modified: branches/lennart/src/modules/module-alsa-source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-alsa-source.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-alsa-source.c (original)
+++ branches/lennart/src/modules/module-alsa-source.c Fri Sep 14 23:51:05 2007
@@ -612,24 +612,13 @@
             }
         }
 
-        /* Now give the source outputs some to time to process their data */
-        if ((ret = pa_source_process_outputs(u->source)) < 0)
+        /* Hmm, nothing to do. Let's sleep */
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        if (ret > 0)
-            continue;
-        
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
+
+        if (ret == 0)
             goto finish;
-        if (ret > 0)
-            continue;
-
-        /* Hmm, nothing to do. Let's sleep */
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
-            goto fail;
-        }
-
+        
         /* Tell ALSA about this and process its response */
         if (PA_SOURCE_OPENED(u->source->thread_info.state)) {
             struct pollfd *pollfd;
@@ -658,8 +647,8 @@
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 

Modified: branches/lennart/src/modules/module-combine.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-combine.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-combine.c (original)
+++ branches/lennart/src/modules/module-combine.c Fri Sep 14 23:51:05 2007
@@ -139,6 +139,10 @@
     SINK_MESSAGE_REMOVE_OUTPUT
 };
 
+enum {
+    SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX
+};
+
 static void output_free(struct output *o);
 static int output_create_sink_input(struct userdata *u, struct output *o);
 static int update_master(struct userdata *u, struct output *o);
@@ -255,28 +259,17 @@
         } else
             pa_rtpoll_set_timer_disabled(u->rtpoll);
 
-        /* Now give the sink inputs some to time to process their data */
-        if ((ret = pa_sink_process_inputs(u->sink)) < 0)
+        /* Hmm, nothing to do. Let's sleep */
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        if (ret > 0)
-            continue;
-
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
+
+        if (ret == 0)
             goto finish;
-        if (ret > 0)
-            continue;
-
-        /* Hmm, nothing to do. Let's sleep */
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
-            goto fail;
-        }
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 
@@ -294,10 +287,8 @@
     /* If another thread already prepared some data we received
      * the data over the asyncmsgq, hence let's first process
      * it. */
-    while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
-        pa_memblockq_push_align(o->memblockq, &chunk);
-        pa_asyncmsgq_done(o->asyncmsgq, 0);
-    }
+    while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
+        ;
     
     /* Check whether we're now readable */
     if (pa_memblockq_is_readable(o->memblockq))
@@ -309,10 +300,8 @@
     if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) {
     
         /* Maybe there's some data now? */
-        while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
-            pa_memblockq_push_align(o->memblockq, &chunk);
-            pa_asyncmsgq_done(o->asyncmsgq, 0);
-        }
+        while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
+            ;
         
         /* Ok, now let's prepare some data if we really have to */
         while (!pa_memblockq_is_readable(o->memblockq)) {
@@ -324,7 +313,7 @@
             /* OK, let's send this data to the other threads */
             for (j = o->userdata->thread_info.outputs; j; j = j->next)
                 if (j != o && j->sink_input)
-                    pa_asyncmsgq_post(j->asyncmsgq, NULL, 0, NULL, 0, &chunk, NULL);
+                    pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
             
             /* And push it into our own queue */
             pa_memblockq_push_align(o->memblockq, &chunk);
@@ -362,69 +351,41 @@
 }
 
 /* Called from I/O thread context */
-static int sink_input_process_cb(pa_sink_input *i) {
-    struct output *o;
-    pa_memchunk chunk;
-    int r = 0;
-    
+static void sink_input_attach_cb(pa_sink_input *i) {
+    struct output *o;
+
     pa_sink_input_assert_ref(i);
     o = i->userdata;
     pa_assert(o);
 
-    /* Move all data in the asyncmsgq into our memblockq */
-    
-    while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
-        if (PA_SINK_OPENED(i->sink->thread_info.state))
-            pa_memblockq_push_align(o->memblockq, &chunk);
-        pa_asyncmsgq_done(o->asyncmsgq, 0);
-    }
-
-    /* If the sink is suspended, flush our queue */
-    if (!PA_SINK_OPENED(i->sink->thread_info.state))
-        pa_memblockq_flush(o->memblockq);
-
-    if (o == o->userdata->thread_info.master) {
-        pa_mutex_lock(o->userdata->mutex);
-        r = pa_sink_process_inputs(o->userdata->sink);
-        pa_mutex_unlock(o->userdata->mutex);
-    }
-    
-    return r;
+    pa_assert(!o->rtpoll_item);
+    o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
+            i->sink->rtpoll,
+            PA_RTPOLL_NORMAL,  /* This one has a lower priority than the normal message handling */
+            o->asyncmsgq);
 }
 
 /* Called from I/O thread context */
-static void sink_input_attach_cb(pa_sink_input *i) {
+static void sink_input_detach_cb(pa_sink_input *i) {
     struct output *o;
 
     pa_sink_input_assert_ref(i);
     o = i->userdata;
     pa_assert(o);
 
-    pa_assert(!o->rtpoll_item);
-    o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(i->sink->rtpoll, PA_RTPOLL_NORMAL, o->asyncmsgq);
-}
-
-/* Called from I/O thread context */
-static void sink_input_detach_cb(pa_sink_input *i) {
+    pa_assert(o->rtpoll_item);
+    pa_rtpoll_item_free(o->rtpoll_item);
+    o->rtpoll_item = NULL;
+}
+
+/* Called from main context */
+static void sink_input_kill_cb(pa_sink_input *i) {
     struct output *o;
 
     pa_sink_input_assert_ref(i);
     o = i->userdata;
     pa_assert(o);
 
-    pa_assert(o->rtpoll_item);
-    pa_rtpoll_item_free(o->rtpoll_item);
-    o->rtpoll_item = NULL;
-}
-
-/* Called from main context */
-static void sink_input_kill_cb(pa_sink_input *i) {
-    struct output *o;
-
-    pa_sink_input_assert_ref(i);
-    o = i->userdata;
-    pa_assert(o);
-
     pa_sink_input_unlink(o->sink_input);
     pa_sink_input_unref(o->sink_input);
     o->sink_input = NULL;
@@ -448,6 +409,15 @@
             break;
         }
 
+        case SINK_INPUT_MESSAGE_POST: {
+
+            if (PA_SINK_OPENED(o->sink_input->sink->thread_info.state))
+                pa_memblockq_push_align(o->memblockq, chunk);
+            else
+                pa_memblockq_flush(o->memblockq);
+            
+            break;
+        }
     }
     
     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -784,7 +754,6 @@
     o->sink_input->parent.process_msg = sink_input_process_msg;
     o->sink_input->peek = sink_input_peek_cb;
     o->sink_input->drop = sink_input_drop_cb;
-    o->sink_input->process = sink_input_process_cb;
     o->sink_input->attach = sink_input_attach_cb;
     o->sink_input->detach = sink_input_detach_cb;
     o->sink_input->kill = sink_input_kill_cb;

Modified: branches/lennart/src/modules/module-null-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-null-sink.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-null-sink.c (original)
+++ branches/lennart/src/modules/module-null-sink.c Fri Sep 14 23:51:05 2007
@@ -145,28 +145,17 @@
         } else
             pa_rtpoll_set_timer_disabled(u->rtpoll);
 
-        /* Now give the sink inputs some to time to process their data */
-        if ((ret = pa_sink_process_inputs(u->sink)) < 0)
+        /* Hmm, nothing to do. Let's sleep */
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        if (ret > 0)
-            continue;
-
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
+        
+        if (ret == 0)
             goto finish;
-        if (ret > 0)
-            continue;
-
-        /* Hmm, nothing to do. Let's sleep */
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
-            goto fail;
-        }
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 

Modified: branches/lennart/src/modules/module-oss.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-oss.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-oss.c (original)
+++ branches/lennart/src/modules/module-oss.c Fri Sep 14 23:51:05 2007
@@ -999,28 +999,6 @@
 
 /*         pa_log("loop2"); */
 
-        /* Now give the sink inputs some to time to process their data */
-        if (u->sink) {
-            if ((ret = pa_sink_process_inputs(u->sink)) < 0)
-                goto fail;
-            if (ret > 0)
-                continue;
-        }
-
-        /* Now give the source outputs some to time to process their data */
-        if (u->source) {
-            if ((ret = pa_source_process_outputs(u->source)) < 0)
-                goto fail;
-            if (ret > 0)
-                continue;
-        }
-        
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
-            goto finish;
-        if (ret > 0)
-            continue;
-
         if (u->fd >= 0) {
             struct pollfd *pollfd;
 
@@ -1031,11 +1009,12 @@
         }
         
         /* Hmm, nothing to do. Let's sleep */
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        }
-
+
+        if (ret == 0)
+            goto finish;
+        
         if (u->fd >= 0) {
             struct pollfd *pollfd;
             
@@ -1052,8 +1031,8 @@
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 

Modified: branches/lennart/src/modules/module-pipe-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-pipe-sink.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-pipe-sink.c (original)
+++ branches/lennart/src/modules/module-pipe-sink.c Fri Sep 14 23:51:05 2007
@@ -126,8 +126,8 @@
     pa_rtpoll_install(u->rtpoll);
 
     for (;;) {
+        struct pollfd *pollfd;
         int ret;
-        struct pollfd *pollfd;
 
         pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
 
@@ -170,36 +170,26 @@
             }
         }
 
-        /* Now give the sink inputs some to time to process their data */
-        if ((ret = pa_sink_process_inputs(u->sink)) < 0)
-            goto fail;
-        if (ret > 0)
-            continue;
-
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
-            goto finish;
-        if (ret > 0)
-            continue;
-        
         /* Hmm, nothing to do. Let's sleep */
         pollfd->events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0;
 
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        }
+
+        if (ret == 0)
+            goto finish;
 
         pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+        
         if (pollfd->revents & ~POLLOUT) {
             pa_log("FIFO shutdown.");
             goto fail;
         }
-    }
+    } 
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 

Modified: branches/lennart/src/modules/module-pipe-source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/modules/module-pipe-source.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/modules/module-pipe-source.c (original)
+++ branches/lennart/src/modules/module-pipe-source.c Fri Sep 14 23:51:05 2007
@@ -149,26 +149,15 @@
             }
         }
 
-        /* Now give the source outputs some to time to process their data */
-        if ((ret = pa_source_process_outputs(u->source)) < 0)
-            goto fail;
-        if (ret > 0)
-            continue;
-
-        /* Check whether there is a message for us to process */
-        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
-            goto finish;
-        if (ret > 0)
-            continue;
-
         /* Hmm, nothing to do. Let's sleep */
         pollfd->events = u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0;
 
-        if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
-            pa_log("poll() failed: %s", pa_cstrerror(errno));
+        if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
             goto fail;
-        }
-
+
+        if (ret == 0)
+            goto finish;
+        
         pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
         if (pollfd->revents & ~POLLIN) {
             pa_log("FIFO shutdown.");
@@ -177,8 +166,8 @@
     }
 
 fail:
-    /* We have to continue processing messages until we receive the
-     * SHUTDOWN message */
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 

Modified: branches/lennart/src/pulsecore/asyncmsgq.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/asyncmsgq.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/asyncmsgq.c (original)
+++ branches/lennart/src/pulsecore/asyncmsgq.c Fri Sep 14 23:51:05 2007
@@ -248,6 +248,27 @@
     return 0;
 }
 
+int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
+    pa_msgobject *object;
+    int code;
+    void *data;
+    pa_memchunk chunk;
+    int64_t offset;
+    int ret;
+
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+    
+    if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
+        return 0;
+    
+    pa_asyncmsgq_ref(a);
+    ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+    pa_asyncmsgq_done(a, ret);
+    pa_asyncmsgq_unref(a);
+    
+    return 1;
+}
+
 int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
     pa_assert(PA_REFCNT_VALUE(a) > 0);
 

Modified: branches/lennart/src/pulsecore/asyncmsgq.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/asyncmsgq.h?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/asyncmsgq.h (original)
+++ branches/lennart/src/pulsecore/asyncmsgq.h Fri Sep 14 23:51:05 2007
@@ -65,6 +65,7 @@
 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
 void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
+int pa_asyncmsgq_process_one(pa_asyncmsgq *a);
 
 /* Just for the reading side */
 int pa_asyncmsgq_get_fd(pa_asyncmsgq *q);

Modified: branches/lennart/src/pulsecore/rtpoll.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/rtpoll.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/rtpoll.c (original)
+++ branches/lennart/src/pulsecore/rtpoll.c Fri Sep 14 23:51:05 2007
@@ -53,7 +53,7 @@
     pa_usec_t period;
 
     int scan_for_dead;
-    int running, installed, rebuild_needed;
+    int running, installed, rebuild_needed, quit;
 
 #ifdef HAVE_PPOLL
     int rtsig;
@@ -76,6 +76,7 @@
     struct pollfd *pollfd;
     unsigned n_pollfd;
 
+    int (*work_cb)(pa_rtpoll_item *i);
     int (*before_cb)(pa_rtpoll_item *i);
     void (*after_cb)(pa_rtpoll_item *i);
     void *userdata;
@@ -134,6 +135,7 @@
     p->installed = 0;
     p->scan_for_dead = 0;
     p->rebuild_needed = 0;
+    p->quit = 0;
     
     PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
 
@@ -288,7 +290,6 @@
 int pa_rtpoll_run(pa_rtpoll *p, int wait) {
     pa_rtpoll_item *i;
     int r = 0;
-    int saved_errno = 0;
     struct timespec timeout;
     
     pa_assert(p);
@@ -297,6 +298,7 @@
     
     p->running = 1;
 
+    /* First, let's do some work */
     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
         int k;
         
@@ -306,12 +308,31 @@
         if (!i->before_cb)
             continue;
 
-        if ((k = i->before_cb(i)) != 0) {
+        if (p->quit)
+            goto finish;
+        
+        if ((k = i->work_cb(i)) != 0) {
+            if (k < 0)
+                r = k;
+            
+            goto finish;
+        }
+    }
+
+    /* Now let's prepare for entering the sleep */
+    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
+        int k = 0;
+        
+        if (i->dead)
+            continue;
+        
+        if (!i->before_cb)
+            continue;
+
+        if (p->quit || (k = i->before_cb(i)) != 0) {
 
             /* Hmm, this one doesn't let us enter the poll, so rewind everything */
 
-            reset_all_revents(p);
-            
             for (i = i->prev; i; i = i->prev) {
                 
                 if (i->dead)
@@ -334,7 +355,7 @@
         rtpoll_rebuild(p);
 
     /* Calculate timeout */
-    if (!wait) {
+    if (!wait || p->quit) {
         timeout.tv_sec = 0;
         timeout.tv_nsec = 0;
     } else if (p->timer_enabled) {
@@ -362,13 +383,14 @@
         r = poll(p->pollfd, p->n_pollfd_used, p->timer_enabled > 0 ? (timeout.tv_sec*1000) + (timeout.tv_nsec / 1000000) : -1);
 #endif
 
-    if (r < 0)
+    if (r < 0) {
         reset_all_revents(p);
     
-    if (r < 0 && (errno == EAGAIN || errno == EINTR))
-        r = 0;
-
-    saved_errno = r < 0 ? errno : 0;
+        if (errno == EAGAIN || errno == EINTR)
+            r = 0;
+        else
+            pa_log_error("poll(): %s", pa_cstrerror(errno));
+    }
 
     if (p->timer_enabled) {
         if (p->period > 0) {
@@ -385,6 +407,7 @@
             p->timer_enabled = 0;
     }
 
+    /* Let's tell everyone that we left the sleep */
     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
 
         if (i->dead)
@@ -413,10 +436,7 @@
         }
     }
 
-    if (saved_errno != 0)
-        errno = saved_errno;
-
-    return r;
+    return r < 0 ? r : !p->quit;
 }
 
 static void update_timer(pa_rtpoll *p) {
@@ -528,6 +548,7 @@
     i->userdata = NULL;
     i->before_cb = NULL;
     i->after_cb = NULL;
+    i->work_cb = NULL;
 
     for (j = p->items; j; j = j->next) {
         if (prio <= j->priority)
@@ -585,6 +606,13 @@
     i->after_cb = after_cb;
 }
 
+void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
+    pa_assert(i);
+    pa_assert(i->priority < PA_RTPOLL_NEVER);
+
+    i->work_cb = work_cb;
+}
+
 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
     pa_assert(i);
 
@@ -649,6 +677,32 @@
     pa_asyncmsgq_after_poll(i->userdata);
 }
 
+static int asyncmsgq_work(pa_rtpoll_item *i) {
+    pa_msgobject *object;
+    int code;
+    void *data;
+    pa_memchunk chunk;
+    int64_t offset;
+
+    pa_assert(i);
+
+    if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
+        int ret;
+        
+        if (!object && code == PA_MESSAGE_SHUTDOWN) {
+            pa_asyncmsgq_done(i->userdata, 0);
+            pa_rtpoll_quit(i->rtpoll);
+            return 1;
+        }
+
+        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+        pa_asyncmsgq_done(i->userdata, ret);
+        return 1;
+    } 
+
+    return 0;
+}
+
 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
     pa_rtpoll_item *i;
     struct pollfd *pollfd;
@@ -664,7 +718,14 @@
     
     i->before_cb = asyncmsgq_before;
     i->after_cb = asyncmsgq_after;
+    i->work_cb = asyncmsgq_work;
     i->userdata = q;
 
     return i;
 }
+
+void pa_rtpoll_quit(pa_rtpoll *p) {
+    pa_assert(p);
+
+    p->quit = 1;
+}

Modified: branches/lennart/src/pulsecore/rtpoll.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/rtpoll.h?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/rtpoll.h (original)
+++ branches/lennart/src/pulsecore/rtpoll.h Fri Sep 14 23:51:05 2007
@@ -69,7 +69,9 @@
 
 /* Sleep on the rtpoll until the time event, or any of the fd events
  * is triggered. If "wait" is 0 we don't sleep but only update the
- * struct pollfd. */
+ * struct pollfd. Returns negative on error, positive if the loop
+ * should continue to run, 0 when the loop should be terminated
+ * cleanly. */
 int pa_rtpoll_run(pa_rtpoll *f, int wait);
 
 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, const struct timespec *ts);
@@ -86,18 +88,30 @@
  * using the pointer and don't save the result anywhere */
 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds);
 
+/* Set the callback that shall be called when there's time to do some work: If the
+ * callback returns a value > 0, the poll is skipped and the next
+ * iteraton of the loop will start immediately. */
+void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i));
+
 /* Set the callback that shall be called immediately before entering
- * the sleeping poll: If the callback returns a negative value, the
- * poll is skipped. */
+ * the sleeping poll: If the callback returns a value > 0, the poll is
+ * skipped and the next iteraton of the loop will start
+ * immediately.. */
 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i));
 
 /* Set the callback that shall be called immediately after having
  * entered the sleeping poll */
 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i));
+
+
 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata);
 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i);
 
 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s);
 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
 
+/* Requests the loop to exit. Will cause the next iteration of
+ * pa_rtpoll_run() to return 0 */
+void pa_rtpoll_quit(pa_rtpoll *p);
+
 #endif

Modified: branches/lennart/src/pulsecore/sink-input.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/sink-input.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/sink-input.c (original)
+++ branches/lennart/src/pulsecore/sink-input.c Fri Sep 14 23:51:05 2007
@@ -194,7 +194,6 @@
     
     i->peek = NULL;
     i->drop = NULL;
-    i->process = NULL;
     i->kill = NULL;
     i->get_latency = NULL;
     i->attach = NULL;
@@ -272,7 +271,6 @@
     
     i->peek = NULL;
     i->drop = NULL;
-    i->process = NULL;
     i->kill = NULL;
     i->get_latency = NULL;
     i->attach = NULL;

Modified: branches/lennart/src/pulsecore/sink-input.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/sink-input.h?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/sink-input.h (original)
+++ branches/lennart/src/pulsecore/sink-input.h Fri Sep 14 23:51:05 2007
@@ -89,13 +89,6 @@
     /* Drops the specified number of bytes, usually called right after
      * peek(), but not necessarily. Called from IO thread context. */
     void (*drop) (pa_sink_input *i, size_t length);
-
-    /* If non-NULL this function is called in each IO event loop and
-     * can be used to do additional processing even when the device is
-     * suspended and peek() is never called. Should return 1 when
-     * "some work" has been done and the IO event loop should be
-     * reiterated immediately. Called from IO thread context. */
-    int (*process) (pa_sink_input *i);           /* may be NULL */
 
     /* If non-NULL this function is called when the input is first
      * connected to a sink. Called from IO thread context */

Modified: branches/lennart/src/pulsecore/sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/sink.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/sink.c (original)
+++ branches/lennart/src/pulsecore/sink.c Fri Sep 14 23:51:05 2007
@@ -922,20 +922,3 @@
     return ret;
 }
 
-int pa_sink_process_inputs(pa_sink *s) {
-    pa_sink_input *i;
-    void *state = NULL;
-    int r;
-    
-    pa_sink_assert_ref(s);
-
-    if (!PA_SINK_LINKED(s->thread_info.state))
-        return 0;
-    
-    while ((i = PA_SINK_INPUT(pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))))
-        if (i->process)
-            if ((r = i->process(i)))
-                return r;
-
-    return 0;
-}

Modified: branches/lennart/src/pulsecore/source-output.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/source-output.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/source-output.c (original)
+++ branches/lennart/src/pulsecore/source-output.c Fri Sep 14 23:51:05 2007
@@ -148,7 +148,6 @@
     o->channel_map = data->channel_map;
 
     o->push = NULL;
-    o->process = NULL;
     o->kill = NULL;
     o->get_latency = NULL;
     o->detach = NULL;
@@ -204,7 +203,6 @@
     pa_source_update_status(o->source);
 
     o->push = NULL;
-    o->process = NULL;
     o->kill = NULL;
     o->get_latency = NULL;
     o->attach = NULL;

Modified: branches/lennart/src/pulsecore/source-output.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/source-output.h?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/source-output.h (original)
+++ branches/lennart/src/pulsecore/source-output.h Fri Sep 14 23:51:05 2007
@@ -72,13 +72,6 @@
     /* Pushes a new memchunk into the output. Called from IO thread
      * context. */
     void (*push)(pa_source_output *o, const pa_memchunk *chunk);
-
-    /* If non-NULL this function is called in each IO event loop and
-     * can be used to do additional processing even when the device is
-     * suspended and peek() is never called. Should return 1 when
-     * "some work" has been done and the IO event loop should be
-     * reiterated immediately. Called from IO thread context. */
-    int (*process) (pa_source_output *o);           /* may be NULL */
 
     /* If non-NULL this function is called when the output is first
      * connected to a source. Called from IO thread context */

Modified: branches/lennart/src/pulsecore/source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/source.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/source.c (original)
+++ branches/lennart/src/pulsecore/source.c Fri Sep 14 23:51:05 2007
@@ -504,21 +504,3 @@
 
     return ret;
 }
-
-int pa_source_process_outputs(pa_source *s) {
-    pa_source_output *o;
-    void *state = NULL;
-    int r;
-    
-    pa_source_assert_ref(s);
-
-    if (!PA_SOURCE_LINKED(s->state))
-        return 0;
-    
-    while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
-        if (o->process)
-            if ((r = o->process(o)))
-                return r;
-
-    return 0;
-}

Modified: branches/lennart/src/pulsecore/thread-mq.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/thread-mq.c?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/thread-mq.c (original)
+++ branches/lennart/src/pulsecore/thread-mq.c Fri Sep 14 23:51:05 2007
@@ -110,28 +110,3 @@
 pa_thread_mq *pa_thread_mq_get(void) {
     return PA_STATIC_TLS_GET(thread_mq);
 }
-
-int pa_thread_mq_process(pa_thread_mq *q) {
-    pa_msgobject *object;
-    int code;
-    void *data;
-    pa_memchunk chunk;
-    int64_t offset;
-
-    pa_assert(q);
-
-    if (pa_asyncmsgq_get(q->inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
-        int ret;
-        
-        if (!object && code == PA_MESSAGE_SHUTDOWN) {
-            pa_asyncmsgq_done(q->inq, 0);
-            return -1;
-        }
-
-        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
-        pa_asyncmsgq_done(q->inq, ret);
-        return 1;
-    } 
-
-    return 0;
-}

Modified: branches/lennart/src/pulsecore/thread-mq.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/thread-mq.h?rev=1822&root=pulseaudio&r1=1821&r2=1822&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/thread-mq.h (original)
+++ branches/lennart/src/pulsecore/thread-mq.h Fri Sep 14 23:51:05 2007
@@ -43,9 +43,6 @@
 /* Install the specified pa_thread_mq object for the current thread */
 void pa_thread_mq_install(pa_thread_mq *q);
 
-/* Dispatched queued events on the thread side. */
-int pa_thread_mq_process(pa_thread_mq *q);
-
 /* Return the pa_thread_mq object that is set for the current thread */
 pa_thread_mq *pa_thread_mq_get(void);
 




More information about the pulseaudio-commits mailing list