[pulseaudio-commits] r2308 - in /branches/glitch-free/src: modules/ pulsecore/

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Wed Apr 23 11:26:49 PDT 2008


Author: lennart
Date: Wed Apr 23 20:26:48 2008
New Revision: 2308

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=2308&root=pulseaudio&view=rev
Log:
Big pile of interdependent changes:
* Fix a deadlock when an asyncq overflows and an RT thread needed to wait until space became available again while the main thread was waiting for an operation to complete and thus didn't free any new items. Now, if the asyncq overflows, queue those items temporarily, and return immediately. Then, when the queue becomes writable again, flush it.
* Modify pa_thread_mq_init() to also set up pa_rtpoll events properly for the MQ
* Some more pa_bool_t'ization
* Unify more common code between alsa-sink and alsa-source
* The upper limit for the tsched watermark is max_use minus one frame
* make module-alsa-source work
* make the alsa modules use pa_alsa_build_pollfd() now
* fix detection of dB scale for alsa-source

Modified:
    branches/glitch-free/src/modules/module-alsa-sink.c
    branches/glitch-free/src/modules/module-alsa-source.c
    branches/glitch-free/src/modules/module-null-sink.c
    branches/glitch-free/src/modules/module-pipe-sink.c
    branches/glitch-free/src/modules/module-pipe-source.c
    branches/glitch-free/src/pulsecore/asyncmsgq.c
    branches/glitch-free/src/pulsecore/asyncmsgq.h
    branches/glitch-free/src/pulsecore/asyncq.c
    branches/glitch-free/src/pulsecore/asyncq.h
    branches/glitch-free/src/pulsecore/rtpoll.c
    branches/glitch-free/src/pulsecore/rtpoll.h
    branches/glitch-free/src/pulsecore/thread-mq.c
    branches/glitch-free/src/pulsecore/thread-mq.h

Modified: branches/glitch-free/src/modules/module-alsa-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/modules/module-alsa-sink.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/modules/module-alsa-sink.c (original)
+++ branches/glitch-free/src/modules/module-alsa-sink.c Wed Apr 23 20:26:48 2008
@@ -131,7 +131,6 @@
     int64_t frame_index;
 
     snd_pcm_sframes_t hwbuf_unused_frames;
-    snd_pcm_sframes_t avail_min_frames;
 };
 
 static void fix_tsched_watermark(struct userdata *u) {
@@ -140,8 +139,61 @@
 
     max_use = u->hwbuf_size - u->hwbuf_unused_frames * u->frame_size;
 
-    if (u->tsched_watermark >= max_use/2)
-        u->tsched_watermark = max_use/2;
+    if (u->tsched_watermark >= max_use-u->frame_size)
+        u->tsched_watermark = max_use-u->frame_size;
+}
+
+static int try_recover(struct userdata *u, const char *call, int err) {
+    pa_assert(u);
+    pa_assert(call);
+    pa_assert(err < 0);
+
+    pa_log_debug("%s: %s", call, snd_strerror(err));
+
+    if (err == -EAGAIN) {
+        pa_log_debug("%s: EAGAIN", call);
+        return 1;
+    }
+
+    if (err == -EPIPE)
+        pa_log_debug("%s: Buffer underrun!", call);
+
+    if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
+        u->first = TRUE;
+        return 0;
+    }
+
+    pa_log("%s: %s", call, snd_strerror(err));
+    return -1;
+}
+
+static void check_left_to_play(struct userdata *u, snd_pcm_sframes_t n) {
+    size_t left_to_play;
+
+    if (u->first)
+        return;
+
+    if (n*u->frame_size < u->hwbuf_size)
+        left_to_play = u->hwbuf_size - (n*u->frame_size);
+    else
+        left_to_play = 0;
+
+    if (left_to_play > 0)
+        pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
+    else {
+        pa_log_info("Underrun!");
+
+        if (u->use_tsched) {
+            size_t old_watermark = u->tsched_watermark;
+
+            u->tsched_watermark *= 2;
+            fix_tsched_watermark(u);
+
+            if (old_watermark != u->tsched_watermark)
+                pa_log_notice("Increasing wakeup watermark to %0.2f ms",
+                              (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
+        }
+    }
 }
 
 static int mmap_write(struct userdata *u) {
@@ -154,10 +206,9 @@
         pa_memchunk chunk;
         void *p;
         snd_pcm_sframes_t n;
-        int err;
+        int err, r;
         const snd_pcm_channel_area_t *areas;
         snd_pcm_uframes_t offset, frames;
-        size_t left_to_play;
 
         snd_pcm_hwsync(u->pcm_handle);
 
@@ -166,24 +217,15 @@
 
         if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
 
-            pa_log_debug("snd_pcm_avail_update: %s", snd_strerror(n));
-
-            if (err == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (n == -EPIPE)
-                pa_log_debug("snd_pcm_avail_update: Buffer underrun!");
-
-            if ((err = snd_pcm_recover(u->pcm_handle, n, 1)) == 0) {
-                u->first = TRUE;
-                continue;
-            }
-
-            pa_log("snd_pcm_recover: %s", snd_strerror(err));
-            return -1;
-        }
+
+            return r;
+        }
+
+        check_left_to_play(u, n);
 
         /* We only use part of the buffer that matches our
          * dynamically requested latency */
@@ -191,44 +233,23 @@
         if (PA_UNLIKELY(n <= u->hwbuf_unused_frames))
             return work_done;
 
-        if (n*u->frame_size < u->hwbuf_size)
-            left_to_play = u->hwbuf_size - (n*u->frame_size);
-        else
-            left_to_play = 0;
-
-        pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
-
-        if (left_to_play <= 0 && !u->first) {
-            u->tsched_watermark *= 2;
-            fix_tsched_watermark(u);
-            pa_log_notice("Underrun! Increasing wakeup watermark to %0.2f ms",
-                          (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
-        }
-
         frames = n = n - u->hwbuf_unused_frames;
 
-        pa_log_debug("%llu frames to write", (unsigned long long) frames);
+        pa_log_debug("%lu frames to write", (unsigned long) frames);
 
         if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) {
 
-            pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err));
-
-            if (err == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (err == -EPIPE)
-                pa_log_debug("snd_pcm_mmap_begin: Buffer underrun!");
-
-            if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
-                u->first = TRUE;
-                continue;
-            }
-
-            pa_log("Failed to write data to DSP: %s", snd_strerror(err));
-            return -1;
-        }
+
+            return r;
+        }
+
+        /* Make sure that if these memblocks need to be copied they will fit into one slot */
+        if (frames > pa_mempool_block_size_max(u->sink->core->mempool)/u->frame_size)
+            frames = pa_mempool_block_size_max(u->sink->core->mempool)/u->frame_size;
 
         /* Check these are multiples of 8 bit */
         pa_assert((areas[0].first & 7) == 0);
@@ -240,7 +261,7 @@
 
         p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
 
-        chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, 1);
+        chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
         chunk.length = pa_memblock_get_length(chunk.memblock);
         chunk.index = 0;
 
@@ -252,30 +273,19 @@
 
         if (PA_UNLIKELY((err = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {
 
-            pa_log_debug("snd_pcm_mmap_commit: %s", snd_strerror(err));
-
-            if (err == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_mmap_commit", err)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (err == -EPIPE)
-                pa_log_debug("snd_pcm_mmap_commit: Buffer underrun!");
-
-            if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
-                u->first = TRUE;
-                continue;
-            }
-
-            pa_log("Failed to write data to DSP: %s", snd_strerror(err));
-            return -1;
+
+            return r;
         }
 
         work_done = 1;
 
         u->frame_index += frames;
 
-        pa_log_debug("wrote %llu frames", (unsigned long long) frames);
+        pa_log_debug("wrote %lu frames", (unsigned long) frames);
 
         if (PA_LIKELY(frames >= (snd_pcm_uframes_t) n))
             return work_done;
@@ -283,10 +293,7 @@
 }
 
 static int unix_write(struct userdata *u) {
-    snd_pcm_status_t *status;
     int work_done = 0;
-
-    snd_pcm_status_alloca(&status);
 
     pa_assert(u);
     pa_sink_assert_ref(u->sink);
@@ -294,28 +301,28 @@
     for (;;) {
         void *p;
         snd_pcm_sframes_t n, frames;
-        int err;
+        int r;
 
         snd_pcm_hwsync(u->pcm_handle);
-        snd_pcm_avail_update(u->pcm_handle);
-
-        if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) {
-            pa_log("Failed to query DSP status data: %s", snd_strerror(err));
-            return -1;
-        }
-
-        if (PA_UNLIKELY(snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size))
-            pa_log_debug("Buffer underrun!");
-
-        n = snd_pcm_status_get_avail(status);
-
-        /* We only use part of the buffer that matches our
-         * dynamically requested latency */
+
+        if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
+
+            if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0)
+                continue;
+            else if (r > 0)
+                return work_done;
+
+            return r;
+        }
+
+        check_left_to_play(u, n);
 
         if (PA_UNLIKELY(n <= u->hwbuf_unused_frames))
             return work_done;
 
         n -= u->hwbuf_unused_frames;
+
+        pa_log_debug("%lu frames to write", (unsigned long) frames);
 
         if (u->memchunk.length <= 0)
             pa_sink_render(u->sink, n * u->frame_size, &u->memchunk);
@@ -335,21 +342,12 @@
 
         if (PA_UNLIKELY(frames < 0)) {
 
-            if (frames == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_writei", n)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (frames == -EPIPE)
-                pa_log_debug("snd_pcm_avail_update: Buffer underrun!");
-
-            if ((frames = snd_pcm_recover(u->pcm_handle, frames, 1)) == 0) {
-                u->first = TRUE;
-                continue;
-            }
-
-            pa_log("Failed to write data to DSP: %s", snd_strerror(frames));
-            return -1;
+
+            return r;
         }
 
         u->memchunk.index += frames * u->frame_size;
@@ -363,6 +361,8 @@
         work_done = 1;
 
         u->frame_index += frames;
+
+        pa_log_debug("wrote %lu frames", (unsigned long) frames);
 
         if (PA_LIKELY(frames >= n))
             return work_done;
@@ -399,8 +399,8 @@
         return;
     }
 
-
     frames = u->frame_index - delay;
+
 /*     pa_log_debug("frame_index = %llu, delay = %llu, p = %llu", (unsigned long long) u->frame_index, (unsigned long long) delay, (unsigned long long) frames); */
 
 /*     snd_pcm_status_get_tstamp(status, &timestamp); */
@@ -422,7 +422,7 @@
     now1 = pa_rtclock_usec();
     now2 = pa_smoother_get(u->smoother, now1);
 
-    delay = (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->sink->sample_spec) - now2;
+    delay = (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->sink->sample_spec) - (int64_t) now2;
 
     if (delay > 0)
         r = (pa_usec_t) delay;
@@ -434,28 +434,14 @@
 }
 
 static int build_pollfd(struct userdata *u) {
-    int err;
-    struct pollfd *pollfd;
-    int n;
-
     pa_assert(u);
     pa_assert(u->pcm_handle);
-
-    if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) {
-        pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n));
-        return -1;
-    }
 
     if (u->alsa_rtpoll_item)
         pa_rtpoll_item_free(u->alsa_rtpoll_item);
 
-    u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, n);
-    pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL);
-
-    if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) {
-        pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err));
+    if (!(u->alsa_rtpoll_item = pa_alsa_build_pollfd(u->pcm_handle, u->rtpoll)))
         return -1;
-    }
 
     return 0;
 }
@@ -491,7 +477,7 @@
     if (usec == (pa_usec_t) -1)
         usec = pa_bytes_to_usec(u->hwbuf_size, &u->sink->sample_spec);
 
-/*     pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
+    pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
 
     wm = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
 
@@ -505,25 +491,27 @@
         usec /= 2;
     }
 
-/*     pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
+    pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
 
     return usec;
 }
 
 static int update_sw_params(struct userdata *u) {
+    snd_pcm_uframes_t avail_min;
     int err;
-    pa_usec_t latency;
 
     pa_assert(u);
 
     /* Use the full buffer if noone asked us for anything specific */
     u->hwbuf_unused_frames = 0;
 
-    if (u->use_tsched)
+    if (u->use_tsched) {
+        pa_usec_t latency;
+
         if ((latency = pa_sink_get_requested_latency_within_thread(u->sink)) != (pa_usec_t) -1) {
             size_t b;
 
-            pa_log_debug("latency set to %llu", (unsigned long long) latency);
+            pa_log_debug("latency set to %0.2f", (double) latency / PA_USEC_PER_MSEC);
 
             b = pa_usec_to_bytes(latency, &u->sink->sample_spec);
 
@@ -538,23 +526,23 @@
 
             fix_tsched_watermark(u);
         }
+    }
 
     pa_log_debug("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames);
 
     /* We need at last one frame in the used part of the buffer */
-    u->avail_min_frames = u->hwbuf_unused_frames + 1;
+    avail_min = u->hwbuf_unused_frames + 1;
 
     if (u->use_tsched) {
         pa_usec_t usec;
 
         usec = hw_sleep_time(u);
-
-        u->avail_min_frames += (pa_usec_to_bytes(usec, &u->sink->sample_spec) / u->frame_size);
-    }
-
-    pa_log_debug("setting avail_min=%lu", (unsigned long) u->avail_min_frames);
-
-    if ((err = pa_alsa_set_sw_params(u->pcm_handle, u->avail_min_frames)) < 0) {
+        avail_min += pa_usec_to_bytes(usec, &u->sink->sample_spec);
+    }
+
+    pa_log_debug("setting avail_min=%lu", (unsigned long) avail_min);
+
+    if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) {
         pa_log("Failed to set software parameters: %s", snd_strerror(err));
         return err;
     }
@@ -678,14 +666,6 @@
             }
 
             break;
-
-/*         case PA_SINK_MESSAGE_ADD_INPUT: */
-/*         case PA_SINK_MESSAGE_REMOVE_INPUT: */
-/*         case PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER: { */
-/*             int r = pa_sink_process_msg(o, code, data, offset, chunk); */
-/*             update_hwbuf_unused_frames(u); */
-/*             return r; */
-/*         } */
     }
 
     return pa_sink_process_msg(o, code, data, offset, chunk);
@@ -1091,10 +1071,9 @@
     u->use_mmap = use_mmap;
     u->use_tsched = use_tsched;
     u->first = TRUE;
-    pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = pa_rtpoll_new();
+    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
     u->alsa_rtpoll_item = NULL;
-    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
 
     u->smoother = pa_smoother_new(DEFAULT_TSCHED_BUFFER_USEC*2, DEFAULT_TSCHED_BUFFER_USEC*2, TRUE);
     usec = pa_rtclock_usec();
@@ -1238,7 +1217,6 @@
     u->nfragments = nfrags;
     u->hwbuf_size = u->fragment_size * nfrags;
     u->hwbuf_unused_frames = 0;
-    u->avail_min_frames = 0;
     u->tsched_watermark = tsched_watermark;
     u->frame_index = 0;
     u->hw_dB_supported = FALSE;
@@ -1246,12 +1224,10 @@
     u->hw_volume_min = u->hw_volume_max = 0;
 
     if (use_tsched)
-        if (u->tsched_watermark >= u->hwbuf_size/2)
-            u->tsched_watermark = pa_frame_align(u->hwbuf_size/2, &ss);
+        fix_tsched_watermark(u);
 
     u->sink->thread_info.max_rewind = use_tsched ? u->hwbuf_size : 0;
     u->sink->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss);
-
     if (!use_tsched)
         u->sink->min_latency = u->sink->max_latency;
 
@@ -1325,7 +1301,7 @@
                     u->sink->get_volume = sink_get_volume_cb;
                     u->sink->set_volume = sink_set_volume_cb;
                     u->sink->flags |= PA_SINK_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SINK_DECIBEL_VOLUME : 0);
-                    pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using");
+                    pa_log_info("Using hardware volume control. Hardware dB scale %s.", u->hw_dB_supported ? "supported" : "not supported");
 
                 } else if (mixer_reset) {
                     pa_log_info("Using software volume control. Trying to reset sound card to 0 dB.");

Modified: branches/glitch-free/src/modules/module-alsa-source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/modules/module-alsa-source.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/modules/module-alsa-source.c (original)
+++ branches/glitch-free/src/modules/module-alsa-source.c Wed Apr 23 20:26:48 2008
@@ -127,7 +127,69 @@
 
     pa_smoother *smoother;
     int64_t frame_index;
+
+    snd_pcm_sframes_t hwbuf_unused_frames;
 };
+
+static void fix_tsched_watermark(struct userdata *u) {
+    size_t max_use;
+    pa_assert(u);
+
+    max_use = u->hwbuf_size - u->hwbuf_unused_frames * u->frame_size;
+
+    if (u->tsched_watermark >= max_use-u->frame_size)
+        u->tsched_watermark = max_use-u->frame_size;
+}
+
+static int try_recover(struct userdata *u, const char *call, int err) {
+    pa_assert(u);
+    pa_assert(call);
+    pa_assert(err < 0);
+
+    pa_log_debug("%s: %s", call, snd_strerror(err));
+
+    if (err == -EAGAIN) {
+        pa_log_debug("%s: EAGAIN", call);
+        return 1;
+    }
+
+    if (err == -EPIPE)
+        pa_log_debug("%s: Buffer overrun!", call);
+
+    if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
+        snd_pcm_start(u->pcm_handle);
+        return 0;
+    }
+
+    pa_log("%s: %s", call, snd_strerror(err));
+    return -1;
+}
+
+static void check_left_to_record(struct userdata *u, snd_pcm_sframes_t n) {
+    size_t left_to_record;
+
+    if (n*u->frame_size < u->hwbuf_size)
+        left_to_record = u->hwbuf_size - (n*u->frame_size);
+    else
+        left_to_record = 0;
+
+    if (left_to_record > 0)
+        pa_log_debug("%0.2f ms left to record", (double) pa_bytes_to_usec(left_to_record, &u->source->sample_spec) / PA_USEC_PER_MSEC);
+    else {
+        pa_log_info("Overrun!");
+
+        if (u->use_tsched) {
+            size_t old_watermark = u->tsched_watermark;
+
+            u->tsched_watermark *= 2;
+            fix_tsched_watermark(u);
+
+            if (old_watermark != u->tsched_watermark)
+                pa_log_notice("Increasing wakeup watermark to %0.2f ms",
+                              (double) pa_bytes_to_usec(u->tsched_watermark, &u->source->sample_spec) / PA_USEC_PER_MSEC);
+        }
+    }
+}
 
 static int mmap_read(struct userdata *u) {
     int work_done = 0;
@@ -137,7 +199,7 @@
 
     for (;;) {
         snd_pcm_sframes_t n;
-        int err;
+        int err, r;
         const snd_pcm_channel_area_t *areas;
         snd_pcm_uframes_t offset, frames;
         pa_memchunk chunk;
@@ -147,50 +209,36 @@
 
         if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
 
-            pa_log_debug("snd_pcm_avail_update: %s", snd_strerror(n));
-
-            if (err == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_avail_update", err)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (n == -EPIPE)
-                pa_log_debug("snd_pcm_avail_update: Buffer underrun!");
-
-            if ((err = snd_pcm_recover(u->pcm_handle, n, 1)) == 0) {
-                snd_pcm_start(u->pcm_handle);
-                continue;
-            }
-
-            pa_log("snd_pcm_recover: %s", snd_strerror(err));
-            return -1;
-        }
+
+            return r;
+        }
+
+        check_left_to_record(u, n);
 
         if (PA_UNLIKELY(n <= 0))
             return work_done;
 
         frames = n;
 
+        pa_log_debug("%lu frames to read", (unsigned long) frames);
+
         if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) {
 
-            pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err));
-
-            if (err == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (err == -EPIPE)
-                pa_log_debug("snd_pcm_mmap_begin: Buffer underrun!");
-
-            if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
-                snd_pcm_start(u->pcm_handle);
-                continue;
-            }
-
-            pa_log("Failed to write data to DSP: %s", snd_strerror(err));
-            return -1;
-        }
+
+            return r;
+        }
+
+        /* Make sure that if these memblocks need to be copied they will fit into one slot */
+        if (frames > pa_mempool_block_size_max(u->source->core->mempool)/u->frame_size)
+            frames = pa_mempool_block_size_max(u->source->core->mempool)/u->frame_size;
 
         /* Check these are multiples of 8 bit */
         pa_assert((areas[0].first & 7) == 0);
@@ -202,42 +250,28 @@
 
         p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
 
-        chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, 1);
+        chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
         chunk.length = pa_memblock_get_length(chunk.memblock);
         chunk.index = 0;
 
         pa_source_post(u->source, &chunk);
-
-        /* FIXME: Maybe we can do something to keep this memory block
-         * a little bit longer around? */
         pa_memblock_unref_fixed(chunk.memblock);
 
         if (PA_UNLIKELY((err = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {
 
-            pa_log_debug("snd_pcm_mmap_commit: %s", snd_strerror(err));
-
-            if (err == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_mmap_commit", err)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (err == -EPIPE)
-                pa_log_debug("snd_pcm_mmap_commit: Buffer underrun!");
-
-            if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
-                snd_pcm_start(u->pcm_handle);
-                continue;
-            }
-
-            pa_log("Failed to write data to DSP: %s", snd_strerror(err));
-            return -1;
+
+            return r;
         }
 
         work_done = 1;
 
         u->frame_index += frames;
 
-        pa_log_debug("read %llu frames", (unsigned long long) frames);
+        pa_log_debug("read %lu frames", (unsigned long) frames);
 
         if (PA_LIKELY(frames >= (snd_pcm_uframes_t) n))
             return work_done;
@@ -245,10 +279,7 @@
 }
 
 static int unix_read(struct userdata *u) {
-    snd_pcm_status_t *status;
     int work_done = 0;
-
-    snd_pcm_status_alloca(&status);
 
     pa_assert(u);
     pa_source_assert_ref(u->source);
@@ -256,20 +287,22 @@
     for (;;) {
         void *p;
         snd_pcm_sframes_t n, frames;
-        int err;
+        int r;
         pa_memchunk chunk;
 
         snd_pcm_hwsync(u->pcm_handle);
 
-        if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) {
-            pa_log("Failed to query DSP status data: %s", snd_strerror(err));
-            return -1;
-        }
-
-        if (PA_UNLIKELY(snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size))
-            pa_log_debug("Buffer overrun!");
-
-        n = snd_pcm_status_get_avail(status);
+        if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
+
+            if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0)
+                continue;
+            else if (r > 0)
+                return work_done;
+
+            return r;
+        }
+
+        check_left_to_record(u, n);
 
         if (PA_UNLIKELY(n <= 0))
             return work_done;
@@ -280,6 +313,8 @@
 
         if (frames > n)
             frames = n;
+
+        pa_log_debug("%lu frames to read", (unsigned long) n);
 
         p = pa_memblock_acquire(chunk.memblock);
         frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, frames);
@@ -290,20 +325,12 @@
         if (PA_UNLIKELY(frames < 0)) {
             pa_memblock_unref(chunk.memblock);
 
-            if (frames == -EAGAIN) {
-                pa_log_debug("EAGAIN");
+            if ((r = try_recover(u, "snd_pcm_readi", n)) == 0)
+                continue;
+            else if (r > 0)
                 return work_done;
-            }
-
-            if (frames == -EPIPE)
-                pa_log_debug("snd_pcm_avail_update: Buffer overrun!");
-
-            if ((frames = snd_pcm_recover(u->pcm_handle, frames, 1)) == 0)
-                snd_pcm_start(u->pcm_handle);
-                continue;
-
-            pa_log("Failed to read data from DSP: %s", snd_strerror(frames));
-            return -1;
+
+            return r;
         }
 
         chunk.index = 0;
@@ -315,6 +342,8 @@
         work_done = 1;
 
         u->frame_index += frames;
+
+        pa_log_debug("read %lu frames", (unsigned long) frames);
 
         if (PA_LIKELY(frames >= n))
             return work_done;
@@ -344,46 +373,37 @@
 
     now1 = pa_rtclock_usec();
     now2 = pa_bytes_to_usec(frames * u->frame_size, &u->source->sample_spec);
+
     pa_smoother_put(u->smoother, now1, now2);
 }
 
 static pa_usec_t source_get_latency(struct userdata *u) {
     pa_usec_t r = 0;
     int64_t delay;
-
-    pa_assert(u);
-
-    delay = pa_smoother_get(u->smoother, pa_rtclock_usec()) - u->frame_index;
+    pa_usec_t now1, now2;
+
+    pa_assert(u);
+
+    now1 = pa_rtclock_usec();
+    now2 = pa_smoother_get(u->smoother, now1);
+
+    delay = (int64_t) now2 - pa_bytes_to_usec(u->frame_index * u->frame_size, &u->source->sample_spec);
 
     if (delay > 0)
-        r = pa_bytes_to_usec(delay * u->frame_size, &u->source->sample_spec);
+        r = (pa_usec_t) delay;
 
     return r;
 }
 
 static int build_pollfd(struct userdata *u) {
-    int err;
-    struct pollfd *pollfd;
-    int n;
-
     pa_assert(u);
     pa_assert(u->pcm_handle);
-
-    if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) {
-        pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n));
-        return -1;
-    }
 
     if (u->alsa_rtpoll_item)
         pa_rtpoll_item_free(u->alsa_rtpoll_item);
 
-    u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, n);
-    pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL);
-
-    if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) {
-        pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err));
+    if (!(u->alsa_rtpoll_item = pa_alsa_build_pollfd(u->pcm_handle, u->rtpoll)))
         return -1;
-    }
 
     return 0;
 }
@@ -418,7 +438,7 @@
     if (usec == (pa_usec_t) -1)
         usec = pa_bytes_to_usec(u->hwbuf_size, &u->source->sample_spec);
 
-    pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
+/*     pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
 
     wm = pa_bytes_to_usec(u->tsched_watermark, &u->source->sample_spec);
 
@@ -427,29 +447,55 @@
     else
         usec /= 2;
 
-    pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
+/*     pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
 
     return usec;
 }
 
 static int update_sw_params(struct userdata *u) {
-    size_t avail_min;
+    snd_pcm_uframes_t avail_min;
     int err;
 
     pa_assert(u);
+
+    /* Use the full buffer if noone asked us for anything specific */
+    u->hwbuf_unused_frames = 0;
+
+    if (u->use_tsched) {
+        pa_usec_t latency;
+
+        if ((latency = pa_source_get_requested_latency_within_thread(u->source)) != (pa_usec_t) -1) {
+            size_t b;
+
+            pa_log_debug("latency set to %0.2f", (double) latency / PA_USEC_PER_MSEC);
+
+            b = pa_usec_to_bytes(latency, &u->source->sample_spec);
+
+            /* We need at least one sample in our buffer */
+
+            if (PA_UNLIKELY(b < u->frame_size))
+                b = u->frame_size;
+
+            u->hwbuf_unused_frames =
+                PA_LIKELY(b < u->hwbuf_size) ?
+                ((u->hwbuf_size - b) / u->frame_size) : 0;
+
+            fix_tsched_watermark(u);
+        }
+    }
+
+    pa_log_debug("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames);
+
+    avail_min = 1;
 
     if (u->use_tsched) {
         pa_usec_t usec;
 
         usec = hw_sleep_time(u);
-
-        avail_min = pa_usec_to_bytes(usec, &u->source->sample_spec);
-
-        if (avail_min <= 0)
-            avail_min = 1;
-
-    } else
-        avail_min = 1;
+        avail_min += pa_usec_to_bytes(usec, &u->source->sample_spec);
+    }
+
+    pa_log_debug("setting avail_min=%lu", (unsigned long) avail_min);
 
     if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) {
         pa_log("Failed to set software parameters: %s", snd_strerror(err));
@@ -649,7 +695,6 @@
         long alsa_vol;
         pa_volume_t vol;
 
-
         pa_assert(snd_mixer_selem_has_capture_channel(u->mixer_elem, u->mixer_map[i]));
 
         vol = PA_MIN(s->volume.values[i], PA_VOLUME_NORM);
@@ -670,7 +715,6 @@
             u->hw_dB_supported = FALSE;
         }
 
-
         alsa_vol = (long) roundf(((float) vol * (u->hw_volume_max - u->hw_volume_min)) / PA_VOLUME_NORM) + u->hw_volume_min;
         alsa_vol = PA_CLAMP_UNLIKELY(alsa_vol, u->hw_volume_min, u->hw_volume_max);
 
@@ -744,6 +788,8 @@
 
     for (;;) {
         int ret;
+
+        pa_log_debug("loop");
 
         /* Read some data and pass it to the sources */
         if (PA_SOURCE_OPENED(u->source->thread_info.state)) {
@@ -756,6 +802,8 @@
 
             if (work_done < 0)
                 goto fail;
+
+            pa_log_debug("work_done = %i", work_done);
 
             if (work_done)
                 update_smoother(u);
@@ -806,8 +854,7 @@
             }
 
             if (revents & (POLLERR|POLLNVAL|POLLHUP)) {
-
-                if (pa_alsa_recover_from_poll(u->pcm_handle, revents))
+                if (pa_alsa_recover_from_poll(u->pcm_handle, revents) < 0)
                     goto fail;
 
                 snd_pcm_start(u->pcm_handle);
@@ -910,10 +957,9 @@
     m->userdata = u;
     u->use_mmap = use_mmap;
     u->use_tsched = use_tsched;
-    pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = pa_rtpoll_new();
+    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
     u->alsa_rtpoll_item = NULL;
-    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
 
     u->smoother = pa_smoother_new(DEFAULT_TSCHED_WATERMARK_USEC, DEFAULT_TSCHED_WATERMARK_USEC, TRUE);
     pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
@@ -951,7 +997,7 @@
 
     if (use_mmap && !b) {
         pa_log_info("Device doesn't support mmap(), falling back to UNIX read/write mode.");
-        u->use_mmap = use_mmap = b;
+        u->use_mmap = use_mmap = FALSE;
     }
 
     if (use_tsched && (!b || !d)) {
@@ -1030,7 +1076,7 @@
     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->device_name);
     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_BUFFERING_BUFFER_SIZE, "%lu", (unsigned long) (period_frames * frame_size * nfrags));
     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE, "%lu", (unsigned long) (period_frames * frame_size));
-    pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap_rewrite" : (u->use_mmap ? "mmap" : "serial"));
+    pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap+timer" : (u->use_mmap ? "mmap" : "serial"));
 
     u->source = pa_source_new(m->core, &data, PA_SOURCE_HARDWARE|PA_SOURCE_LATENCY);
     pa_source_new_data_done(&data);
@@ -1052,14 +1098,17 @@
     u->fragment_size = frag_size = period_frames * frame_size;
     u->nfragments = nfrags;
     u->hwbuf_size = u->fragment_size * nfrags;
+    u->hwbuf_unused_frames = 0;
     u->tsched_watermark = tsched_watermark;
     u->frame_index = 0;
     u->hw_dB_supported = FALSE;
     u->hw_dB_min = u->hw_dB_max = 0;
     u->hw_volume_min = u->hw_volume_max = 0;
 
+    if (use_tsched)
+        fix_tsched_watermark(u);
+
     u->source->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss);
-
     if (!use_tsched)
         u->source->min_latency = u->source->max_latency;
 
@@ -1095,7 +1144,7 @@
                     pa_log_info("Device has less than 4 volume levels. Falling back to software volume control.");
                     suitable = FALSE;
 
-                } else if (snd_mixer_selem_get_playback_dB_range(u->mixer_elem, &u->hw_dB_min, &u->hw_dB_max) >= 0) {
+                } else if (snd_mixer_selem_get_capture_dB_range(u->mixer_elem, &u->hw_dB_min, &u->hw_dB_max) >= 0) {
 
                     pa_log_info("Volume ranges from %0.2f dB to %0.2f dB.", u->hw_dB_min/100.0, u->hw_dB_max/100.0);
 
@@ -1122,7 +1171,7 @@
                     u->source->get_volume = source_get_volume_cb;
                     u->source->set_volume = source_set_volume_cb;
                     u->source->flags |= PA_SOURCE_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SOURCE_DECIBEL_VOLUME : 0);
-                    pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using");
+                    pa_log_info("Using hardware volume control. Hardware dB scale %s.", u->hw_dB_supported ? "supported" : "not supported");
 
                 } else if (mixer_reset) {
                     pa_log_info("Using software volume control. Trying to reset sound card to 0 dB.");

Modified: branches/glitch-free/src/modules/module-null-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/modules/module-null-sink.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/modules/module-null-sink.c (original)
+++ branches/glitch-free/src/modules/module-null-sink.c Wed Apr 23 20:26:48 2008
@@ -188,9 +188,8 @@
     u->core = m->core;
     u->module = m;
     m->userdata = u;
-    pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = pa_rtpoll_new();
-    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
+    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
 
     pa_sink_new_data_init(&data);
     data.driver = __FILE__;

Modified: branches/glitch-free/src/modules/module-pipe-sink.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/modules/module-pipe-sink.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/modules/module-pipe-sink.c (original)
+++ branches/glitch-free/src/modules/module-pipe-sink.c Wed Apr 23 20:26:48 2008
@@ -227,9 +227,8 @@
     u->module = m;
     m->userdata = u;
     pa_memchunk_reset(&u->memchunk);
-    pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = pa_rtpoll_new();
-    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
+    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
 
     u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
 

Modified: branches/glitch-free/src/modules/module-pipe-source.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/modules/module-pipe-source.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/modules/module-pipe-source.c (original)
+++ branches/glitch-free/src/modules/module-pipe-source.c Wed Apr 23 20:26:48 2008
@@ -204,9 +204,8 @@
     u->module = m;
     m->userdata = u;
     pa_memchunk_reset(&u->memchunk);
-    pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = pa_rtpoll_new();
-    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
+    pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
 
     u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
 

Modified: branches/glitch-free/src/pulsecore/asyncmsgq.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/asyncmsgq.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/asyncmsgq.c (original)
+++ branches/glitch-free/src/pulsecore/asyncmsgq.c Wed Apr 23 20:26:48 2008
@@ -136,7 +136,7 @@
 
     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
     pa_mutex_lock(a->mutex);
-    pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
+    pa_asyncq_post(a->asyncq, i);
     pa_mutex_unlock(a->mutex);
 }
 
@@ -163,7 +163,7 @@
 
     /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
     pa_mutex_lock(a->mutex);
-    pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
+    pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
     pa_mutex_unlock(a->mutex);
 
     pa_semaphore_wait(i.semaphore);
@@ -174,7 +174,7 @@
     return i.ret;
 }
 
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) {
     pa_assert(PA_REFCNT_VALUE(a) > 0);
     pa_assert(!a->current);
 
@@ -276,22 +276,40 @@
     return 1;
 }
 
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
-    pa_assert(PA_REFCNT_VALUE(a) > 0);
-
-    return pa_asyncq_get_fd(a->asyncq);
-}
-
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
-    pa_assert(PA_REFCNT_VALUE(a) > 0);
-
-    return pa_asyncq_before_poll(a->asyncq);
-}
-
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
-    pa_assert(PA_REFCNT_VALUE(a) > 0);
-
-    pa_asyncq_after_poll(a->asyncq);
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    return pa_asyncq_read_fd(a->asyncq);
+}
+
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    return pa_asyncq_read_before_poll(a->asyncq);
+}
+
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    pa_asyncq_read_after_poll(a->asyncq);
+}
+
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    return pa_asyncq_write_fd(a->asyncq);
+}
+
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    pa_asyncq_write_before_poll(a->asyncq);
+}
+
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
+    pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+    pa_asyncq_write_after_poll(a->asyncq);
 }
 
 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {

Modified: branches/glitch-free/src/pulsecore/asyncmsgq.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/asyncmsgq.h?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/asyncmsgq.h (original)
+++ branches/glitch-free/src/pulsecore/asyncmsgq.h Wed Apr 23 20:26:48 2008
@@ -62,15 +62,20 @@
 void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
 int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk);
 
-int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, int wait);
+int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, pa_bool_t wait);
 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
 void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
 int pa_asyncmsgq_process_one(pa_asyncmsgq *a);
 
-/* Just for the reading side */
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *q);
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a);
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a);
+/* For the reading side */
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *q);
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a);
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a);
+
+/* For the write side */
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *q);
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a);
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a);
 
 #endif

Modified: branches/glitch-free/src/pulsecore/asyncq.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/asyncq.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/asyncq.c (original)
+++ branches/glitch-free/src/pulsecore/asyncq.c Wed Apr 23 20:26:48 2008
@@ -33,6 +33,8 @@
 #include <pulsecore/thread.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/core-util.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/flist.h>
 #include <pulse/xmalloc.h>
 
 #include "asyncq.h"
@@ -51,12 +53,23 @@
 #define _Y do { } while(0)
 #endif
 
+struct localq {
+    void *data;
+    PA_LLIST_FIELDS(struct localq);
+};
+
 struct pa_asyncq {
     unsigned size;
     unsigned read_idx;
     unsigned write_idx;
     pa_fdsem *read_fdsem, *write_fdsem;
+
+    PA_LLIST_HEAD(struct localq, localq);
+    struct localq *last_localq;
+    pa_bool_t waiting_for_post;
 };
+
+PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
 
 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
 
@@ -79,6 +92,10 @@
     l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
 
     l->size = size;
+
+    PA_LLIST_HEAD_INIT(struct localq, l->localq);
+    l->last_localq = NULL;
+    l->waiting_for_post = FALSE;
 
     if (!(l->read_fdsem = pa_fdsem_new())) {
         pa_xfree(l);
@@ -95,6 +112,7 @@
 }
 
 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
+    struct localq *q;
     pa_assert(l);
 
     if (free_cb) {
@@ -102,6 +120,16 @@
 
         while ((p = pa_asyncq_pop(l, 0)))
             free_cb(p);
+    }
+
+    while ((q = l->localq)) {
+        if (free_cb)
+            free_cb(q->data);
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
     }
 
     pa_fdsem_free(l->read_fdsem);
@@ -109,7 +137,7 @@
     pa_xfree(l);
 }
 
-int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
+static int push(pa_asyncq*l, void *p, pa_bool_t wait) {
     int idx;
     pa_atomic_ptr_t *cells;
 
@@ -141,7 +169,63 @@
     return 0;
 }
 
-void* pa_asyncq_pop(pa_asyncq*l, int wait) {
+static pa_bool_t flush_postq(pa_asyncq *l) {
+    struct localq *q;
+
+    pa_assert(l);
+
+    while ((q = l->last_localq)) {
+
+        if (push(l, q->data, FALSE) < 0)
+            return FALSE;
+
+        l->last_localq = q->prev;
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
+    return TRUE;
+}
+
+int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) {
+    pa_assert(l);
+
+    if (!flush_postq(l))
+        return -1;
+
+    return push(l, p, wait);
+}
+
+void pa_asyncq_post(pa_asyncq*l, void *p) {
+    struct localq *q;
+
+    pa_assert(l);
+    pa_assert(p);
+
+    if (pa_asyncq_push(l, p, FALSE) >= 0)
+        return;
+
+    /* OK, we couldn't push anything in the queue. So let's queue it
+     * locally and push it later */
+
+    pa_log("q overrun, queuing locally");
+
+    if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
+        q = pa_xnew(struct localq, 1);
+
+    q->data = p;
+    PA_LLIST_PREPEND(struct localq, l->localq, q);
+
+    if (!l->last_localq)
+        l->last_localq = q;
+
+    return;
+}
+
+void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) {
     int idx;
     void *ret;
     pa_atomic_ptr_t *cells;
@@ -178,13 +262,13 @@
     return ret;
 }
 
-int pa_asyncq_get_fd(pa_asyncq *q) {
+int pa_asyncq_read_fd(pa_asyncq *q) {
     pa_assert(q);
 
     return pa_fdsem_get(q->write_fdsem);
 }
 
-int pa_asyncq_before_poll(pa_asyncq *l) {
+int pa_asyncq_read_before_poll(pa_asyncq *l) {
     int idx;
     pa_atomic_ptr_t *cells;
 
@@ -206,8 +290,38 @@
     return 0;
 }
 
-void pa_asyncq_after_poll(pa_asyncq *l) {
+void pa_asyncq_read_after_poll(pa_asyncq *l) {
     pa_assert(l);
 
     pa_fdsem_after_poll(l->write_fdsem);
 }
+
+int pa_asyncq_write_fd(pa_asyncq *q) {
+    pa_assert(q);
+
+    return pa_fdsem_get(q->read_fdsem);
+}
+
+void pa_asyncq_write_before_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    for (;;) {
+
+        if (flush_postq(l))
+            break;
+
+        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
+            l->waiting_for_post = TRUE;
+            break;
+        }
+    }
+}
+
+void pa_asyncq_write_after_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    if (l->waiting_for_post) {
+        pa_fdsem_after_poll(l->read_fdsem);
+        l->waiting_for_post = FALSE;
+    }
+}

Modified: branches/glitch-free/src/pulsecore/asyncq.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/asyncq.h?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/asyncq.h (original)
+++ branches/glitch-free/src/pulsecore/asyncq.h Wed Apr 23 20:26:48 2008
@@ -26,6 +26,7 @@
 
 #include <sys/types.h>
 #include <pulse/def.h>
+#include <pulsecore/macro.h>
 
 /* A simple, asynchronous, lock-free (if requested also wait-free)
  * queue. Not multiple-reader/multiple-writer safe. If that is
@@ -46,11 +47,21 @@
 pa_asyncq* pa_asyncq_new(unsigned size);
 void pa_asyncq_free(pa_asyncq* q, pa_free_cb_t free_cb);
 
-void* pa_asyncq_pop(pa_asyncq *q, int wait);
-int pa_asyncq_push(pa_asyncq *q, void *p, int wait);
+void* pa_asyncq_pop(pa_asyncq *q, pa_bool_t wait);
+int pa_asyncq_push(pa_asyncq *q, void *p, pa_bool_t wait);
 
-int pa_asyncq_get_fd(pa_asyncq *q);
-int pa_asyncq_before_poll(pa_asyncq *a);
-void pa_asyncq_after_poll(pa_asyncq *a);
+/* Similar to pa_asyncq_push(), but if the queue is full, queue the item
+ * locally and flush it later, e.g. in pa_asyncq_write_before_poll() */
+void pa_asyncq_post(pa_asyncq*l, void *p);
+
+/* For the reading side */
+int pa_asyncq_read_fd(pa_asyncq *q);
+int pa_asyncq_read_before_poll(pa_asyncq *a);
+void pa_asyncq_read_after_poll(pa_asyncq *a);
+
+/* For the writing side */
+int pa_asyncq_write_fd(pa_asyncq *q);
+void pa_asyncq_write_before_poll(pa_asyncq *a);
+void pa_asyncq_write_after_poll(pa_asyncq *a);
 
 #endif

Modified: branches/glitch-free/src/pulsecore/rtpoll.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/rtpoll.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/rtpoll.c (original)
+++ branches/glitch-free/src/pulsecore/rtpoll.c Wed Apr 23 20:26:48 2008
@@ -661,23 +661,23 @@
     return i;
 }
 
-static int asyncmsgq_before(pa_rtpoll_item *i) {
-    pa_assert(i);
-
-    if (pa_asyncmsgq_before_poll(i->userdata) < 0)
+static int asyncmsgq_read_before(pa_rtpoll_item *i) {
+    pa_assert(i);
+
+    if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
         return 1; /* 1 means immediate restart of the loop */
 
     return 0;
 }
 
-static void asyncmsgq_after(pa_rtpoll_item *i) {
+static void asyncmsgq_read_after(pa_rtpoll_item *i) {
     pa_assert(i);
 
     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
-    pa_asyncmsgq_after_poll(i->userdata);
-}
-
-static int asyncmsgq_work(pa_rtpoll_item *i) {
+    pa_asyncmsgq_read_after_poll(i->userdata);
+}
+
+static int asyncmsgq_read_work(pa_rtpoll_item *i) {
     pa_msgobject *object;
     int code;
     void *data;
@@ -703,7 +703,7 @@
     return 0;
 }
 
-pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
     pa_rtpoll_item *i;
     struct pollfd *pollfd;
 
@@ -713,17 +713,52 @@
     i = pa_rtpoll_item_new(p, prio, 1);
 
     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
-    pollfd->fd = pa_asyncmsgq_get_fd(q);
+    pollfd->fd = pa_asyncmsgq_read_fd(q);
     pollfd->events = POLLIN;
 
-    i->before_cb = asyncmsgq_before;
-    i->after_cb = asyncmsgq_after;
-    i->work_cb = asyncmsgq_work;
+    i->before_cb = asyncmsgq_read_before;
+    i->after_cb = asyncmsgq_read_after;
+    i->work_cb = asyncmsgq_read_work;
     i->userdata = q;
 
     return i;
 }
 
+static int asyncmsgq_write_before(pa_rtpoll_item *i) {
+    pa_assert(i);
+
+    pa_asyncmsgq_write_before_poll(i->userdata);
+    return 0;
+}
+
+static void asyncmsgq_write_after(pa_rtpoll_item *i) {
+    pa_assert(i);
+
+    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
+    pa_asyncmsgq_write_after_poll(i->userdata);
+}
+
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
+    pa_rtpoll_item *i;
+    struct pollfd *pollfd;
+
+    pa_assert(p);
+    pa_assert(q);
+
+    i = pa_rtpoll_item_new(p, prio, 1);
+
+    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
+    pollfd->fd = pa_asyncmsgq_write_fd(q);
+    pollfd->events = POLLIN;
+
+    i->before_cb = asyncmsgq_write_before;
+    i->after_cb = asyncmsgq_write_after;
+    i->work_cb = NULL;
+    i->userdata = q;
+
+    return i;
+}
+
 void pa_rtpoll_quit(pa_rtpoll *p) {
     pa_assert(p);
 

Modified: branches/glitch-free/src/pulsecore/rtpoll.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/rtpoll.h?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/rtpoll.h (original)
+++ branches/glitch-free/src/pulsecore/rtpoll.h Wed Apr 23 20:26:48 2008
@@ -106,7 +106,8 @@
 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i);
 
 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s);
-pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
 
 /* Requests the loop to exit. Will cause the next iteration of
  * pa_rtpoll_run() to return 0 */

Modified: branches/glitch-free/src/pulsecore/thread-mq.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/thread-mq.c?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/thread-mq.c (original)
+++ branches/glitch-free/src/pulsecore/thread-mq.c Wed Apr 23 20:26:48 2008
@@ -43,15 +43,15 @@
 
 PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq);
 
-static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
+static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
     pa_thread_mq *q = userdata;
     pa_asyncmsgq *aq;
 
-    pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd);
+    pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd);
     pa_assert(events == PA_IO_EVENT_INPUT);
 
     pa_asyncmsgq_ref(aq = q->outq);
-    pa_asyncmsgq_after_poll(aq);
+    pa_asyncmsgq_write_after_poll(aq);
 
     for (;;) {
         pa_msgobject *object;
@@ -68,14 +68,24 @@
             pa_asyncmsgq_done(aq, ret);
         }
 
-        if (pa_asyncmsgq_before_poll(aq) == 0)
+        if (pa_asyncmsgq_read_before_poll(aq) == 0)
             break;
     }
 
     pa_asyncmsgq_unref(aq);
 }
 
-void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) {
+static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
+    pa_thread_mq *q = userdata;
+
+    pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd);
+    pa_assert(events == PA_IO_EVENT_INPUT);
+
+    pa_asyncmsgq_write_after_poll(q->inq);
+    pa_asyncmsgq_write_before_poll(q->inq);
+}
+
+void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) {
     pa_assert(q);
     pa_assert(mainloop);
 
@@ -83,15 +93,22 @@
     pa_assert_se(q->inq = pa_asyncmsgq_new(0));
     pa_assert_se(q->outq = pa_asyncmsgq_new(0));
 
-    pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0);
-    pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q));
+    pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0);
+    pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q));
+
+    pa_asyncmsgq_write_before_poll(q->inq);
+    pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q));
+
+    pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq);
+    pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq);
 }
 
 void pa_thread_mq_done(pa_thread_mq *q) {
     pa_assert(q);
 
-    q->mainloop->io_free(q->io_event);
-    q->io_event = NULL;
+    q->mainloop->io_free(q->read_event);
+    q->mainloop->io_free(q->write_event);
+    q->read_event = q->write_event = NULL;
 
     pa_asyncmsgq_unref(q->inq);
     pa_asyncmsgq_unref(q->outq);

Modified: branches/glitch-free/src/pulsecore/thread-mq.h
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/glitch-free/src/pulsecore/thread-mq.h?rev=2308&root=pulseaudio&r1=2307&r2=2308&view=diff
==============================================================================
--- branches/glitch-free/src/pulsecore/thread-mq.h (original)
+++ branches/glitch-free/src/pulsecore/thread-mq.h Wed Apr 23 20:26:48 2008
@@ -26,6 +26,7 @@
 
 #include <pulse/mainloop-api.h>
 #include <pulsecore/asyncmsgq.h>
+#include <pulsecore/rtpoll.h>
 
 /* Two way communication between a thread and a mainloop. Before the
  * thread is started a pa_pthread_mq should be initialized and than
@@ -34,10 +35,10 @@
 typedef struct pa_thread_mq {
     pa_mainloop_api *mainloop;
     pa_asyncmsgq *inq, *outq;
-    pa_io_event *io_event;
+    pa_io_event *read_event, *write_event;
 } pa_thread_mq;
 
-void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop);
+void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll);
 void pa_thread_mq_done(pa_thread_mq *q);
 
 /* Install the specified pa_thread_mq object for the current thread */




More information about the pulseaudio-commits mailing list