[pulseaudio-commits] r2398 - /branches/coling/airtunes/src/modules/module-raop-sink.c

svnmailer-noreply at 0pointer.de svnmailer-noreply at 0pointer.de
Sun May 11 05:21:33 PDT 2008


Author: coling
Date: Sun May 11 14:21:32 2008
New Revision: 2398

URL: http://0pointer.de/cgi-bin/viewcvs.cgi?rev=2398&root=pulseaudio&view=rev
Log:
A very rough first version of the sink.
I can actually play music to my airport now (woot).
Still very rough around the edges, and I need to handle disconnects etc., but it's all good progress :)

Added:
    branches/coling/airtunes/src/modules/module-raop-sink.c
      - copied, changed from r2340, branches/coling/airtunes/src/modules/module-esound-sink.c

Copied: branches/coling/airtunes/src/modules/module-raop-sink.c (from r2340, branches/coling/airtunes/src/modules/module-esound-sink.c)
URL: http://0pointer.de/cgi-bin/viewcvs.cgi/branches/coling/airtunes/src/modules/module-raop-sink.c?p2=branches/coling/airtunes/src/modules/module-raop-sink.c&p1=branches/coling/airtunes/src/modules/module-esound-sink.c&r1=2340&r2=2398&rev=2398&root=pulseaudio&view=diff
==============================================================================
--- branches/coling/airtunes/src/modules/module-esound-sink.c (original)
+++ branches/coling/airtunes/src/modules/module-raop-sink.c Sun May 11 14:21:32 2008
@@ -3,7 +3,7 @@
 /***
   This file is part of PulseAudio.
 
-  Copyright 2004-2006 Lennart Poettering
+  Copyright 2008 Colin Guthrie
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as published
@@ -54,7 +54,6 @@
 #include <pulsecore/modargs.h>
 #include <pulsecore/log.h>
 #include <pulsecore/socket-client.h>
-#include <pulsecore/esound.h>
 #include <pulsecore/authkey.h>
 #include <pulsecore/thread-mq.h>
 #include <pulsecore/thread.h>
@@ -62,10 +61,14 @@
 #include <pulsecore/rtclock.h>
 #include <pulsecore/socket-util.h>
 
-#include "module-esound-sink-symdef.h"
-
-PA_MODULE_AUTHOR("Lennart Poettering");
-PA_MODULE_DESCRIPTION("ESOUND Sink");
+#include "module-raop-sink-symdef.h"
+#include "rtp.h"
+#include "sdp.h"
+#include "sap.h"
+#include "raop_client.h"
+
+PA_MODULE_AUTHOR("Colin Guthrie");
+PA_MODULE_DESCRIPTION("RAOP Sink (Apple Airtunes)");
 PA_MODULE_VERSION(PACKAGE_VERSION);
 PA_MODULE_LOAD_ONCE(FALSE);
 PA_MODULE_USAGE(
@@ -75,7 +78,7 @@
         "channels=<number of channels> "
         "rate=<sample rate>");
 
-#define DEFAULT_SINK_NAME "esound_out"
+#define DEFAULT_SINK_NAME "airtunes"
 
 struct userdata {
     pa_core *core;
@@ -87,7 +90,8 @@
     pa_rtpoll_item *rtpoll_item;
     pa_thread *thread;
 
-    pa_memchunk memchunk;
+    pa_memchunk raw_memchunk;
+    pa_memchunk encoded_memchunk;
 
     void *write_data;
     size_t write_length, write_index;
@@ -95,33 +99,25 @@
     void *read_data;
     size_t read_length, read_index;
 
-    enum {
-        STATE_AUTH,
-        STATE_LATENCY,
-        STATE_PREPARE,
-        STATE_RUNNING,
-        STATE_DEAD
-    } state;
-
     pa_usec_t latency;
 
-    esd_format_t format;
+    /*esd_format_t format;*/
     int32_t rate;
 
     pa_smoother *smoother;
     int fd;
 
     int64_t offset;
-
-    pa_iochannel *io;
-    pa_socket_client *client;
+    int64_t encoding_overhead;
+    double encoding_ratio;
+
+    pa_raop_client *raop;
 
     size_t block_size;
 };
 
 static const char* const valid_modargs[] = {
     "server",
-    "cookie",
     "rate",
     "format",
     "channels",
@@ -167,7 +163,7 @@
             pa_usec_t w, r;
 
             r = pa_smoother_get(u->smoother, pa_rtclock_usec());
-            w = pa_bytes_to_usec(u->offset + u->memchunk.length, &u->sink->sample_spec);
+            w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
 
             *((pa_usec_t*) data) = w > r ? w - r : 0;
             break;
@@ -219,14 +215,26 @@
                     ssize_t l;
                     void *p;
 
-                    if (u->memchunk.length <= 0)
-                        pa_sink_render(u->sink, u->block_size, &u->memchunk);
-
-                    pa_assert(u->memchunk.length > 0);
-
-                    p = pa_memblock_acquire(u->memchunk.memblock);
-                    l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
-                    pa_memblock_release(u->memchunk.memblock);
+                    if (u->raw_memchunk.length <= 0) {
+                        /* Grab unencoded data */
+                        pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
+                    }
+                    pa_assert(u->raw_memchunk.length > 0);
+
+                    if (u->encoded_memchunk.length <= 0) {
+                        /* Encode it */
+                        size_t rl = u->raw_memchunk.length;
+                        if (u->encoded_memchunk.memblock)
+                            pa_memblock_unref(u->encoded_memchunk.memblock);
+                        u->encoded_memchunk = pa_raop_client_encode_sample(u->raop, u->core->mempool, &u->raw_memchunk);
+                        u->encoding_overhead += (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
+                        u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
+                    }
+                    pa_assert(u->encoded_memchunk.length > 0);
+
+                    p = pa_memblock_acquire(u->encoded_memchunk.memblock);
+                    l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
+                    pa_memblock_release(u->encoded_memchunk.memblock);
 
                     pa_assert(l != 0);
 
@@ -248,17 +256,17 @@
                     } else {
                         u->offset += l;
 
-                        u->memchunk.index += l;
-                        u->memchunk.length -= l;
-
-                        if (u->memchunk.length <= 0) {
-                            pa_memblock_unref(u->memchunk.memblock);
-                            pa_memchunk_reset(&u->memchunk);
+                        u->encoded_memchunk.index += l;
+                        u->encoded_memchunk.length -= l;
+
+                        if (u->encoded_memchunk.length <= 0) {
+                            pa_memblock_unref(u->encoded_memchunk.memblock);
+                            pa_memchunk_reset(&u->encoded_memchunk);
                         }
 
                         pollfd->revents = 0;
 
-                        if (u->memchunk.length > 0)
+                        if (u->encoded_memchunk.length > 0)
 
                             /* OK, we wrote less that we asked for,
                              * hence we can assume that the socket
@@ -325,179 +333,16 @@
     pa_log_debug("Thread shutting down");
 }
 
-static int do_write(struct userdata *u) {
-    ssize_t r;
-    pa_assert(u);
-
-    if (!pa_iochannel_is_writable(u->io))
-        return 0;
-
-    if (u->write_data) {
-        pa_assert(u->write_index < u->write_length);
-
-        if ((r = pa_iochannel_write(u->io, (uint8_t*) u->write_data + u->write_index, u->write_length - u->write_index)) <= 0) {
-            pa_log("write() failed: %s", pa_cstrerror(errno));
-            return -1;
-        }
-
-        u->write_index += r;
-        pa_assert(u->write_index <= u->write_length);
-
-        if (u->write_index == u->write_length) {
-            pa_xfree(u->write_data);
-            u->write_data = NULL;
-            u->write_index = u->write_length = 0;
-        }
-    }
-
-    if (!u->write_data && u->state == STATE_PREPARE) {
-        /* OK, we're done with sending all control data we need to, so
-         * let's hand the socket over to the IO thread now */
-
-        pa_assert(u->fd < 0);
-        u->fd = pa_iochannel_get_send_fd(u->io);
-
-        pa_iochannel_set_noclose(u->io, TRUE);
-        pa_iochannel_free(u->io);
-        u->io = NULL;
-
-        pa_make_tcp_socket_low_delay(u->fd);
-
-        pa_log_debug("Connection authenticated, handing fd to IO thread...");
-
-        pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
-        u->state = STATE_RUNNING;
-    }
-
-    return 0;
-}
-
-static int handle_response(struct userdata *u) {
-    pa_assert(u);
-
-    switch (u->state) {
-
-        case STATE_AUTH:
-            pa_assert(u->read_length == sizeof(int32_t));
-
-            /* Process auth data */
-            if (!*(int32_t*) u->read_data) {
-                pa_log("Authentication failed: %s", pa_cstrerror(errno));
-                return -1;
-            }
-
-            /* Request latency data */
-            pa_assert(!u->write_data);
-            *(int32_t*) (u->write_data = pa_xmalloc(u->write_length = sizeof(int32_t))) = ESD_PROTO_LATENCY;
-
-            u->write_index = 0;
-            u->state = STATE_LATENCY;
-
-            /* Space for next response */
-            pa_assert(u->read_length >= sizeof(int32_t));
-            u->read_index = 0;
-            u->read_length = sizeof(int32_t);
-
-            break;
-
-        case STATE_LATENCY: {
-            int32_t *p;
-            pa_assert(u->read_length == sizeof(int32_t));
-
-            /* Process latency info */
-            u->latency = (pa_usec_t) ((double) (*(int32_t*) u->read_data) * 1000000 / 44100);
-            if (u->latency > 10000000) {
-                pa_log_warn("Invalid latency information received from server");
-                u->latency = 0;
-            }
-
-            /* Create stream */
-            pa_assert(!u->write_data);
-            p = u->write_data = pa_xmalloc0(u->write_length = sizeof(int32_t)*3+ESD_NAME_MAX);
-            *(p++) = ESD_PROTO_STREAM_PLAY;
-            *(p++) = u->format;
-            *(p++) = u->rate;
-            pa_strlcpy((char*) p, "PulseAudio Tunnel", ESD_NAME_MAX);
-
-            u->write_index = 0;
-            u->state = STATE_PREPARE;
-
-            /* Don't read any further */
-            pa_xfree(u->read_data);
-            u->read_data = NULL;
-            u->read_index = u->read_length = 0;
-
-            break;
-        }
-
-        default:
-            pa_assert_not_reached();
-    }
-
-    return 0;
-}
-
-static int do_read(struct userdata *u) {
-    pa_assert(u);
-
-    if (!pa_iochannel_is_readable(u->io))
-        return 0;
-
-    if (u->state == STATE_AUTH || u->state == STATE_LATENCY) {
-        ssize_t r;
-
-        if (!u->read_data)
-            return 0;
-
-        pa_assert(u->read_index < u->read_length);
-
-        if ((r = pa_iochannel_read(u->io, (uint8_t*) u->read_data + u->read_index, u->read_length - u->read_index)) <= 0) {
-            pa_log("read() failed: %s", r < 0 ? pa_cstrerror(errno) : "EOF");
-            return -1;
-        }
-
-        u->read_index += r;
-        pa_assert(u->read_index <= u->read_length);
-
-        if (u->read_index == u->read_length)
-            return handle_response(u);
-    }
-
-    return 0;
-}
-
-static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void*userdata) {
+static void on_connection(PA_GCC_UNUSED int fd, void*userdata) {
     struct userdata *u = userdata;
     pa_assert(u);
 
-    if (do_read(u) < 0 || do_write(u) < 0) {
-
-        if (u->io) {
-            pa_iochannel_free(u->io);
-            u->io = NULL;
-        }
-
-       pa_module_unload_request(u->module);
-    }
-}
-
-static void on_connection(PA_GCC_UNUSED pa_socket_client *c, pa_iochannel*io, void *userdata) {
-    struct userdata *u = userdata;
-
-    pa_socket_client_unref(u->client);
-    u->client = NULL;
-
-    if (!io) {
-        pa_log("Connection failed: %s", pa_cstrerror(errno));
-        pa_module_unload_request(u->module);
-        return;
-    }
-
-    pa_assert(!u->io);
-    u->io = io;
-    pa_iochannel_set_callback(u->io, io_callback, u);
-
-    pa_log_debug("Connection established, authenticating ...");
+    pa_assert(u->fd < 0);
+    u->fd = fd;
+
+    pa_log_debug("Connection authenticated, handing fd to IO thread...");
+
+    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
 }
 
 int pa__init(pa_module*m) {
@@ -506,8 +351,6 @@
     pa_sample_spec ss;
     pa_modargs *ma = NULL;
     char *t;
-    const char *espeaker;
-    uint32_t key;
 
     pa_assert(m);
 
@@ -522,9 +365,9 @@
         goto fail;
     }
 
-    if ((ss.format != PA_SAMPLE_U8 && ss.format != PA_SAMPLE_S16NE) ||
+    if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
         (ss.channels > 2)) {
-        pa_log("esound sample type support is limited to mono/stereo and U8 or S16NE sample data");
+        pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
         goto fail;
     }
 
@@ -534,24 +377,27 @@
     m->userdata = u;
     u->fd = -1;
     u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE);
-    pa_memchunk_reset(&u->memchunk);
+    pa_memchunk_reset(&u->raw_memchunk);
+    pa_memchunk_reset(&u->encoded_memchunk);
     u->offset = 0;
+    u->encoding_overhead = 0;
+    u->encoding_ratio = 1.0;
 
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
     u->rtpoll = pa_rtpoll_new();
     pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
     u->rtpoll_item = NULL;
 
-    u->format =
+    /*u->format =
         (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
-        (ss.channels == 2 ? ESD_STEREO : ESD_MONO);
+        (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
     u->rate = ss.rate;
     u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
 
     u->read_data = u->write_data = NULL;
     u->read_index = u->write_index = u->read_length = u->write_length = 0;
 
-    u->state = STATE_AUTH;
+    /*u->state = STATE_AUTH;*/
     u->latency = 0;
 
     if (!(u->sink = pa_sink_new(m->core, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, NULL))) {
@@ -567,31 +413,20 @@
     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
     pa_sink_set_rtpoll(u->sink, u->rtpoll);
 
-    if (!(espeaker = getenv("ESPEAKER")))
-        espeaker = ESD_UNIX_SOCKET_NAME;
-
-    if (!(u->client = pa_socket_client_new_string(u->core->mainloop, p = pa_modargs_get_value(ma, "server", espeaker), ESD_DEFAULT_PORT))) {
+    if (!(p = pa_modargs_get_value(ma, "server", NULL))) {
+        pa_log("No server argument given.");
+        goto fail;
+    }
+
+    if (!(u->raop = pa_raop_client_new(u->core->mainloop, p))) {
         pa_log("Failed to connect to server.");
         goto fail;
     }
 
-    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Esound sink '%s'", p));
+    pa_raop_client_set_callback(u->raop, on_connection, u);
+    pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Airtunes sink '%s'", p));
     pa_xfree(t);
 
-    pa_socket_client_set_callback(u->client, on_connection, u);
-
-    /* Prepare the initial request */
-    u->write_data = pa_xmalloc(u->write_length = ESD_KEY_LEN + sizeof(int32_t));
-    if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", ".esd_auth"), u->write_data, ESD_KEY_LEN) < 0) {
-        pa_log("Failed to load cookie");
-        goto fail;
-    }
-
-    key = ESD_ENDIAN_KEY;
-    memcpy((uint8_t*) u->write_data + ESD_KEY_LEN, &key, sizeof(key));
-
-    /* Reserve space for the response */
-    u->read_data = pa_xmalloc(u->read_length = sizeof(int32_t));
 
     if (!(u->thread = pa_thread_new(thread_func, u))) {
         pa_log("Failed to create thread.");
@@ -633,20 +468,20 @@
     if (u->sink)
         pa_sink_unref(u->sink);
 
-    if (u->io)
-        pa_iochannel_free(u->io);
-
     if (u->rtpoll_item)
         pa_rtpoll_item_free(u->rtpoll_item);
 
     if (u->rtpoll)
         pa_rtpoll_free(u->rtpoll);
 
-    if (u->memchunk.memblock)
-        pa_memblock_unref(u->memchunk.memblock);
-
-    if (u->client)
-        pa_socket_client_unref(u->client);
+    if (u->raw_memchunk.memblock)
+        pa_memblock_unref(u->raw_memchunk.memblock);
+
+    if (u->encoded_memchunk.memblock)
+        pa_memblock_unref(u->encoded_memchunk.memblock);
+
+    if (u->raop)
+        pa_raop_client_free(u->raop);
 
     pa_xfree(u->read_data);
     pa_xfree(u->write_data);




More information about the pulseaudio-commits mailing list