[pulseaudio-commits] r1924 - /branches/lennart/src/pulsecore/protocol-native.c

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Mon Oct 1 09:43:00 PDT 2007


Author: lennart
Date: Mon Oct  1 18:42:59 2007
New Revision: 1924

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1924&root=pulseaudio&view=rev
Log:
update native protocol to make use of pa_memblockq_pop_missing

Modified:
    branches/lennart/src/pulsecore/protocol-native.c

Modified: branches/lennart/src/pulsecore/protocol-native.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/lennart/src/pulsecore/protocol-native.c?rev=1924&root=pulseaudio&r1=1923&r2=1924&view=diff
==============================================================================
--- branches/lennart/src/pulsecore/protocol-native.c (original)
+++ branches/lennart/src/pulsecore/protocol-native.c Mon Oct  1 18:42:59 2007
@@ -80,7 +80,7 @@
 
     connection *connection;
     uint32_t index;
-    
+
     pa_source_output *source_output;
     pa_memblockq *memblockq;
     size_t fragment_size;
@@ -92,10 +92,10 @@
 
 typedef struct playback_stream {
     output_stream parent;
-    
+
     connection *connection;
     uint32_t index;
-    
+
     pa_sink_input *sink_input;
     pa_memblockq *memblockq;
     int drain_request;
@@ -104,7 +104,7 @@
     int underrun;
 
     pa_atomic_t missing;
-    size_t last_missing;
+    size_t minreq;
 
     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
     int64_t read_index, write_index;
@@ -113,10 +113,10 @@
 
 typedef struct upload_stream {
     output_stream parent;
-    
+
     connection *connection;
     uint32_t index;
-    
+
     pa_memchunk memchunk;
     size_t length;
     char *name;
@@ -126,7 +126,7 @@
 
 struct connection {
     pa_msgobject parent;
-    
+
     int authorized;
     uint32_t version;
     pa_protocol_native *protocol;
@@ -299,7 +299,7 @@
 
     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
-    
+
     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
@@ -360,7 +360,7 @@
         const char *name, size_t length) {
 
     upload_stream *s;
-    
+
     pa_assert(c);
     pa_assert(ss);
     pa_assert(name);
@@ -376,7 +376,7 @@
     s->length = length;
 
     pa_idxset_put(c->output_streams, s, &s->index);
-    
+
     return s;
 }
 
@@ -394,7 +394,7 @@
 
     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
     s->connection = NULL;
-    record_stream_unref(s);    
+    record_stream_unref(s);
 }
 
 static void record_stream_free(pa_object *o) {
@@ -402,7 +402,7 @@
     pa_assert(s);
 
     record_stream_unlink(s);
-    
+
     pa_memblockq_free(s->memblockq);
     pa_xfree(s);
 }
@@ -413,11 +413,11 @@
 
     if (!s->connection)
         return -1;
-    
+
     switch (code) {
-        
+
         case RECORD_STREAM_MESSAGE_POST_DATA:
-            
+
             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
 /*                 pa_log_warn("Failed to push data into output queue."); */
                 return -1;
@@ -512,7 +512,7 @@
 
     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
     s->connection = NULL;
-    playback_stream_unref(s);    
+    playback_stream_unref(s);
 }
 
 static void playback_stream_free(pa_object* o) {
@@ -520,7 +520,7 @@
     pa_assert(s);
 
     playback_stream_unlink(s);
-    
+
     pa_memblockq_free(s->memblockq);
     pa_xfree(s);
 }
@@ -535,23 +535,26 @@
     switch (code) {
         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
             pa_tagstruct *t;
-            int32_t l = 0;
+            uint32_t l = 0;
 
             for (;;) {
                 int32_t k;
-                
+
                 if ((k = pa_atomic_load(&s->missing)) <= 0)
                     break;
 
                 l += k;
-                
+
+                if (l < s->minreq)
+                    break;
+
                 if (pa_atomic_sub(&s->missing, k) <= k)
                     break;
             }
 
-            if (l <= 0)
+            if (l < s->minreq)
                 break;
-            
+
             t = pa_tagstruct_new(NULL, 0);
             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
@@ -689,16 +692,16 @@
     *tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq);
     *prebuf = (uint32_t) pa_memblockq_get_prebuf(s->memblockq);
     *minreq = (uint32_t) pa_memblockq_get_minreq(s->memblockq);
-    *missing = (uint32_t) pa_memblockq_missing(s->memblockq);
-    
+    *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
+
+    s->minreq = pa_memblockq_get_minreq(s->memblockq);
     pa_atomic_store(&s->missing, 0);
-    s->last_missing = *missing;
     s->drain_request = 0;
 
     pa_idxset_put(c->output_streams, s, &s->index);
 
     pa_sink_input_put(s->sink_input);
-    
+
     return s;
 }
 
@@ -708,9 +711,9 @@
 
     if (!c->protocol)
         return -1;
-    
+
     switch (code) {
-        
+
         case CONNECTION_MESSAGE_REVOKE:
             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
             break;
@@ -751,7 +754,7 @@
         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
         c->auth_timeout_event = NULL;
     }
-    
+
     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
     c->protocol = NULL;
     connection_unref(c);
@@ -759,11 +762,11 @@
 
 static void connection_free(pa_object *o) {
     connection *c = CONNECTION(o);
-    
+
     pa_assert(c);
 
     connection_unlink(c);
-    
+
     pa_idxset_free(c->record_streams, NULL, NULL);
     pa_idxset_free(c->output_streams, NULL, NULL);
 
@@ -776,23 +779,19 @@
 
 /* Called from thread context */
 static void request_bytes(playback_stream *s) {
-    size_t new_missing, delta, previous_missing;
-    size_t minreq;
+    size_t m, previous_missing;
 
     playback_stream_assert_ref(s);
 
-    new_missing = pa_memblockq_missing(s->memblockq);
-    delta = new_missing > s->last_missing ? new_missing - s->last_missing : 0;
-    s->last_missing = new_missing;
-
-    if (delta <= 0)
-        return;
-
-/*     pa_log("request_bytes(%u)", delta); */
-    minreq = pa_memblockq_get_minreq(s->memblockq);
-
-    previous_missing = pa_atomic_add(&s->missing, delta);
-    if (previous_missing < minreq && previous_missing+delta >= minreq) {
+    m = pa_memblockq_pop_missing(s->memblockq);
+
+    if (m <= 0)
+        return;
+
+/*     pa_log("request_bytes(%u)", m); */
+
+    previous_missing = pa_atomic_add(&s->missing, m);
+    if (previous_missing < s->minreq && previous_missing+m >= s->minreq) {
         pa_assert(pa_thread_mq_get());
         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
     }
@@ -821,7 +820,7 @@
                 schunk.length = r->fragment_size;
 
             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
-            
+
             pa_memblockq_drop(r->memblockq, schunk.length);
             pa_memblock_unref(schunk.memblock);
 
@@ -865,13 +864,15 @@
 
     switch (code) {
 
-        case SINK_INPUT_MESSAGE_SEEK: 
+        case SINK_INPUT_MESSAGE_SEEK:
             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
             request_bytes(s);
             return 0;
 
         case SINK_INPUT_MESSAGE_POST_DATA: {
             pa_assert(chunk);
+
+/*             pa_log("sink input post: %u", chunk->length); */
 
             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
 
@@ -904,7 +905,7 @@
         case SINK_INPUT_MESSAGE_FLUSH:
         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
         case SINK_INPUT_MESSAGE_TRIGGER: {
-            
+
             pa_sink_input *isync;
             void (*func)(pa_memblockq *bq);
 
@@ -912,11 +913,11 @@
                 case SINK_INPUT_MESSAGE_FLUSH:
                     func = pa_memblockq_flush;
                     break;
-                    
+
                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
                     func = pa_memblockq_prebuf_force;
                     break;
-                    
+
                 case SINK_INPUT_MESSAGE_TRIGGER:
                     func = pa_memblockq_prebuf_disable;
                     break;
@@ -924,7 +925,7 @@
                 default:
                     pa_assert_not_reached();
             }
-            
+
             func(s->memblockq);
             s->underrun = 0;
             request_bytes(s);
@@ -943,17 +944,17 @@
                 ssync->underrun = 0;
                 request_bytes(ssync);
             }
-            
+
             return 0;
         }
 
-        case SINK_INPUT_MESSAGE_UPDATE_LATENCY: 
+        case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
 
             s->read_index = pa_memblockq_get_read_index(s->memblockq);
             s->write_index = pa_memblockq_get_write_index(s->memblockq);
             s->resampled_chunk_length = s->sink_input->thread_info.resampled_chunk.memblock ? s->sink_input->thread_info.resampled_chunk.length : 0;
             return 0;
-            
+
         case PA_SINK_INPUT_MESSAGE_SET_STATE:
 
             pa_memblockq_prebuf_force(s->memblockq);
@@ -993,7 +994,7 @@
         return -1;
     }
 
-/*     pa_log("peek: %u", chunk->length);     */
+/*     pa_log("peek: %u", chunk->length); */
 
     request_bytes(s);
 
@@ -1018,7 +1019,7 @@
 
     request_bytes(s);
 
-/*     pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq));   */
+/*     pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */
 }
 
 static void sink_input_kill_cb(pa_sink_input *i) {
@@ -1103,7 +1104,7 @@
     pa_sink *sink = NULL;
     pa_cvolume volume;
     int corked;
-    
+
     connection_assert_ref(c);
     pa_assert(t);
 
@@ -1156,7 +1157,7 @@
     pa_tagstruct_putu32(reply, missing);
 
 /*     pa_log("initial request is %u", missing); */
-    
+
     if (c->version >= 9) {
         /* Since 0.9 we support sending the buffer metrics back to the client */
 
@@ -1185,25 +1186,25 @@
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 
     switch (command) {
-        
+
         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
             playback_stream *s;
             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
                 return;
             }
-            
+
             playback_stream_unlink(s);
             break;
         }
-            
+
         case PA_COMMAND_DELETE_RECORD_STREAM: {
             record_stream *s;
             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
                 return;
             }
-            
+
             record_stream_unlink(s);
             break;
         }
@@ -1215,7 +1216,7 @@
                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
                 return;
             }
-            
+
             upload_stream_unlink(s);
             break;
         }
@@ -1294,7 +1295,7 @@
 
     connection_assert_ref(c);
     pa_assert(t);
-    
+
     if (!pa_tagstruct_eof(t)) {
         protocol_error(c);
         return;
@@ -1427,7 +1428,7 @@
     connection *c = CONNECTION(userdata);
     const char *name;
     uint32_t idx = PA_IDXSET_INVALID;
-    
+
     connection_assert_ref(c);
     pa_assert(t);
 
@@ -1532,12 +1533,12 @@
     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
     CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY)
-    
+
     reply = reply_new(tag);
-    
+
     latency = pa_sink_get_latency(s->sink_input->sink);
-    latency += pa_bytes_to_usec(s->resampled_chunk_length, &s->sink_input->sample_spec); 
-    
+    latency += pa_bytes_to_usec(s->resampled_chunk_length, &s->sink_input->sample_spec);
+
     pa_tagstruct_put_usec(reply, latency);
 
     pa_tagstruct_put_usec(reply, 0);
@@ -1792,7 +1793,7 @@
     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
     pa_tagstruct_puts(t, s->driver);
     if (c->version >= 11)
-        pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));    
+        pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
 }
 
 static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) {
@@ -1815,7 +1816,7 @@
 static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) {
     pa_assert(t);
     pa_assert(e);
-    
+
     pa_tagstruct_putu32(t, e->index);
     pa_tagstruct_puts(t, e->name);
     pa_tagstruct_put_cvolume(t, &e->volume);
@@ -2073,7 +2074,7 @@
     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
 
     switch (command) {
-        
+
         case PA_COMMAND_SET_SINK_VOLUME:
             if (idx != PA_INVALID_INDEX)
                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
@@ -2087,7 +2088,7 @@
             else
                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
             break;
-            
+
         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
             break;
@@ -2139,7 +2140,7 @@
     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || !name || (*name && pa_utf8_valid(name)), tag, PA_ERR_INVALID);
 
     switch (command) {
-        
+
         case PA_COMMAND_SET_SINK_MUTE:
 
             if (idx != PA_INVALID_INDEX)
@@ -2227,7 +2228,7 @@
         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
             break;
-            
+
         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
             break;
@@ -2275,7 +2276,7 @@
 
     connection_assert_ref(c);
     pa_assert(t);
-    
+
     if (pa_tagstruct_getu32(t, &idx) < 0 ||
         !pa_tagstruct_eof(t)) {
         protocol_error(c);
@@ -2405,7 +2406,7 @@
 
     connection_assert_ref(c);
     pa_assert(t);
-    
+
     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_gets(t, &argument) < 0 ||
         !pa_tagstruct_eof(t)) {
@@ -2562,7 +2563,7 @@
 static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     connection *c = CONNECTION(userdata);
     pa_tagstruct *reply;
-    
+
     connection_assert_ref(c);
     pa_assert(t);
 
@@ -2680,7 +2681,7 @@
 
             if (idx != PA_INVALID_INDEX)
                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
-            else 
+            else
                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
 
             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
@@ -2693,9 +2694,9 @@
     } else {
 
         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
-        
+
         if (idx == PA_INVALID_INDEX && name && !*name) {
-            
+
             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
                 return;
@@ -2708,7 +2709,7 @@
                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
             else
                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
-            
+
             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
 
             if (pa_source_suspend(source, b) < 0) {
@@ -2739,7 +2740,7 @@
 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
     connection *c = CONNECTION(userdata);
     output_stream *stream;
-    
+
     pa_assert(p);
     pa_assert(chunk);
     connection_assert_ref(c);
@@ -2752,12 +2753,12 @@
 
     if (playback_stream_isinstance(stream)) {
         playback_stream *ps = PLAYBACK_STREAM(stream);
-        
+
         if (seek != PA_SEEK_RELATIVE || offset != 0)
             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
-        
+
         pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
-        
+
     } else {
         upload_stream *u = UPLOAD_STREAM(stream);
         size_t l;
@@ -2799,7 +2800,7 @@
 
 static void pstream_die_callback(pa_pstream *p, void *userdata) {
     connection *c = CONNECTION(userdata);
-    
+
     pa_assert(p);
     connection_assert_ref(c);
 
@@ -2818,7 +2819,7 @@
 
 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
     pa_thread_mq *q;
-    
+
     if (!(q = pa_thread_mq_get()))
         pa_pstream_send_revoke(p, block_id);
     else
@@ -2827,7 +2828,7 @@
 
 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
     pa_thread_mq *q;
-    
+
     if (!(q = pa_thread_mq_get()))
         pa_pstream_send_release(p, block_id);
     else
@@ -2838,7 +2839,7 @@
 
 static void client_kill_cb(pa_client *c) {
     pa_assert(c);
-    
+
     connection_unlink(CONNECTION(c->userdata));
 }
 
@@ -2846,7 +2847,7 @@
 
 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
     connection *c = CONNECTION(userdata);
-    
+
     pa_assert(m);
     pa_assert(tv);
     connection_assert_ref(c);
@@ -3075,7 +3076,7 @@
         pa_iochannel *io,
         pa_module *m,
         pa_modargs *ma) {
-    
+
     pa_protocol_native *p;
 
     if (!(p = protocol_new_internal(core, m, ma)))




More information about the pulseaudio-commits mailing list