[pulseaudio-commits] r1379 - /trunk/src/pulsecore/pstream.c
svnmailer-noreply at 0pointer.de
svnmailer-noreply at 0pointer.de
Thu Sep 7 12:08:20 PDT 2006
Author: lennart
Date: Thu Sep 7 21:08:19 2006
New Revision: 1379
URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=1379&root=pulseaudio&view=rev
Log:
make pa_stream thread-safe: use new refcounting system, protect access using mutexes
Modified:
trunk/src/pulsecore/pstream.c
Modified: trunk/src/pulsecore/pstream.c
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/trunk/src/pulsecore/pstream.c?rev=1379&root=pulseaudio&r1=1378&r2=1379&view=diff
==============================================================================
--- trunk/src/pulsecore/pstream.c (original)
+++ trunk/src/pulsecore/pstream.c Thu Sep 7 21:08:19 2006
@@ -46,6 +46,8 @@
#include <pulsecore/log.h>
#include <pulsecore/core-scache.h>
#include <pulsecore/creds.h>
+#include <pulsecore/mutex.h>
+#include <pulsecore/refcnt.h>
#include "pstream.h"
@@ -108,12 +110,13 @@
};
struct pa_pstream {
- int ref;
+ PA_REFCNT_DECLARE;
pa_mainloop_api *mainloop;
pa_defer_event *defer_event;
pa_iochannel *io;
pa_queue *send_queue;
+ pa_mutex *mutex;
int dead;
@@ -163,10 +166,13 @@
static void do_something(pa_pstream *p) {
assert(p);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_pstream_ref(p);
+
+ pa_mutex_lock(p->mutex);
+
p->mainloop->defer_enable(p->defer_event, 0);
-
- pa_pstream_ref(p);
if (!p->dead && pa_iochannel_is_readable(p->io)) {
if (do_read(p) < 0)
@@ -179,6 +185,8 @@
goto fail;
}
+ pa_mutex_unlock(p->mutex);
+
pa_pstream_unref(p);
return;
@@ -188,6 +196,8 @@
if (p->die_callback)
p->die_callback(p, p->die_callback_userdata);
+
+ pa_mutex_unlock(p->mutex);
pa_pstream_unref(p);
}
@@ -221,10 +231,12 @@
assert(pool);
p = pa_xnew(pa_pstream, 1);
- p->ref = 1;
+ PA_REFCNT_INIT(p);
p->io = io;
pa_iochannel_set_callback(io, io_callback, p);
p->dead = 0;
+
+ p->mutex = pa_mutex_new(1);
p->mainloop = m;
p->defer_event = m->defer_new(m, defer_callback, p);
@@ -297,6 +309,9 @@
if (p->read.packet)
pa_packet_unref(p->read.packet);
+ if (p->mutex)
+ pa_mutex_free(p->mutex);
+
pa_xfree(p);
}
@@ -304,11 +319,13 @@
struct item_info *i;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
assert(packet);
+ pa_mutex_lock(p->mutex);
+
if (p->dead)
- return;
+ goto finish;
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_PACKET;
@@ -321,18 +338,24 @@
pa_queue_push(p->send_queue, i);
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
size_t length, idx;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
assert(channel != (uint32_t) -1);
assert(chunk);
+ pa_mutex_lock(p->mutex);
+
if (p->dead)
- return;
+ goto finish;
length = chunk->length;
idx = 0;
@@ -363,6 +386,10 @@
}
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
@@ -370,10 +397,12 @@
pa_pstream *p = userdata;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
if (p->dead)
- return;
+ goto finish;
/* pa_log("Releasing block %u", block_id); */
@@ -386,6 +415,10 @@
pa_queue_push(p->send_queue, item);
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
@@ -393,10 +426,12 @@
pa_pstream *p = userdata;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
if (p->dead)
- return;
+ goto finish;
/* pa_log("Revoking block %u", block_id); */
@@ -409,10 +444,15 @@
pa_queue_push(p->send_queue, item);
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
static void prepare_next_write_item(pa_pstream *p) {
assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
if (!(p->write.current = pa_queue_pop(p->send_queue)))
return;
@@ -501,7 +541,9 @@
void *d;
size_t l;
ssize_t r;
- assert(p);
+
+ assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
if (!p->write.current)
prepare_next_write_item(p);
@@ -552,8 +594,10 @@
void *d;
size_t l;
ssize_t r;
- assert(p);
-
+
+ assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
d = (uint8_t*) p->read.descriptor + p->read.index;
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
@@ -782,65 +826,83 @@
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->die_callback = cb;
p->die_callback_userdata = userdata;
-}
-
+ pa_mutex_unlock(p->mutex);
+}
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->drain_callback = cb;
p->drain_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->recieve_packet_callback = cb;
p->recieve_packet_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->recieve_memblock_callback = cb;
p->recieve_memblock_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
int pa_pstream_is_pending(pa_pstream *p) {
- assert(p);
+ int b;
+
+ assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
if (p->dead)
- return 0;
-
- return p->write.current || !pa_queue_is_empty(p->send_queue);
+ b = 0;
+ else
+ b = p->write.current || !pa_queue_is_empty(p->send_queue);
+
+ pa_mutex_unlock(p->mutex);
+
+ return b;
}
void pa_pstream_unref(pa_pstream*p) {
assert(p);
- assert(p->ref >= 1);
-
- if (--p->ref == 0)
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ if (PA_REFCNT_DEC(p) <= 0)
pstream_free(p);
}
pa_pstream* pa_pstream_ref(pa_pstream*p) {
assert(p);
- assert(p->ref >= 1);
-
- p->ref++;
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ PA_REFCNT_INC(p);
return p;
}
void pa_pstream_close(pa_pstream *p) {
assert(p);
+ pa_mutex_lock(p->mutex);
+
p->dead = 1;
if (p->import) {
@@ -868,12 +930,14 @@
p->recieve_packet_callback = NULL;
p->recieve_memblock_callback = NULL;
-
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_use_shm(pa_pstream *p, int enable) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->use_shm = enable;
@@ -888,6 +952,7 @@
pa_memexport_free(p->export);
p->export = NULL;
}
-
- }
-}
+ }
+
+ pa_mutex_unlock(p->mutex);
+}
More information about the pulseaudio-commits
mailing list