[Spice-devel] [spice-server v3 07/10] sound: Use RedChannelClient to receive/send data

Christophe Fergeau cfergeau at redhat.com
Thu Jan 26 11:13:18 UTC 2017


SndChannelClient now has far fewer fields, because the code to
read/write from/to the client is reused from RedChannelClient
instead of creating a fake RedChannelClient just to make the
system happy.

One of the differences between the old sound code and all other
RedChannelClient objects was that the sound channel doesn't use
a queue, while RedChannelClient uses RedPipeItem objects. This was
the main reason why RedChannelClient was not used. To implement
the old behaviour a "persistent_pipe_item" is used. This RedPipeItem
(only one!) will be queued to RedChannelClient to signal that we
have data to send. The {playback,record}_channel_send_item functions
will then send the messages to the client using RedChannelClient
functions. For this reason snd_reset_send_data is replaced by a call
to red_channel_client_init_send_data and snd_begin_send_message is
replaced by red_channel_client_begin_send_message.

Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
Signed-off-by: Christophe Fergeau <cfergeau at redhat.com>
---
 server/sound.c | 659 +++++++++++++++++++++------------------------------------
 1 file changed, 245 insertions(+), 414 deletions(-)

diff --git a/server/sound.c b/server/sound.c
index f27a53d..2bb5688 100644
--- a/server/sound.c
+++ b/server/sound.c
@@ -82,8 +82,6 @@ typedef struct AudioFrameContainer AudioFrameContainer;
 typedef struct SpicePlaybackState PlaybackChannel;
 typedef struct SpiceRecordState RecordChannel;
 
-typedef void (*snd_channel_send_messages_proc)(void *in_channel);
-typedef int (*snd_channel_handle_message_proc)(SndChannelClient *client, size_t size, uint32_t type, void *message);
 typedef void (*snd_channel_on_message_done_proc)(SndChannelClient *client);
 typedef void (*snd_channel_cleanup_channel_proc)(SndChannelClient *client);
 
@@ -91,37 +89,30 @@ typedef void (*snd_channel_cleanup_channel_proc)(SndChannelClient *client);
 
 /* Connects an audio client to a Spice client */
 struct SndChannelClient {
-    RedsStream *stream;
-    spice_parse_channel_func_t parser;
     int refs;
 
     RedChannelClient *channel_client;
 
     int active;
     int client_active;
-    int blocked;
 
     uint32_t command;
 
-    struct {
-        uint64_t serial;
-        uint32_t size;
-        uint32_t pos;
-    } send_data;
+    /* we don't expect very big messages so don't allocate too many
+     * bytes, data will be cached in RecordChannelClient::samples */
+    uint8_t receive_buf[SND_CODEC_MAX_FRAME_BYTES + 64];
+    RedPipeItem persistent_pipe_item;
 
-    struct {
-        uint8_t buf[SND_RECEIVE_BUF_SIZE];
-        uint8_t *message_start;
-        uint8_t *now;
-        uint8_t *end;
-    } receive_data;
-
-    snd_channel_send_messages_proc send_messages;
-    snd_channel_handle_message_proc handle_message;
     snd_channel_on_message_done_proc on_message_done;
     snd_channel_cleanup_channel_proc cleanup;
 };
 
+
+enum {
+    RED_PIPE_ITEM_PERSISTENT = RED_PIPE_ITEM_TYPE_CHANNEL_BASE,
+};
+
+
 struct AudioFrame {
     uint32_t time;
     uint32_t samples[SND_CODEC_MAX_FRAME_SIZE];
@@ -226,10 +217,10 @@ struct RecordChannelClient {
 /* A list of all Spice{Playback,Record}State objects */
 static SndChannel *snd_channels;
 
-static void snd_receive(SndChannelClient *client);
 static void snd_playback_start(SndChannel *channel);
 static void snd_record_start(SndChannel *channel);
 static void snd_playback_alloc_frames(PlaybackChannelClient *playback);
+static void snd_send(SndChannelClient * client);
 
 static SndChannelClient *snd_channel_unref(SndChannelClient *client)
 {
@@ -241,6 +232,16 @@ static SndChannelClient *snd_channel_unref(SndChannelClient *client)
     return client;
 }
 
+static SndChannelClient *snd_channel_client_from_dummy(RedChannelClient *dummy)
+{
+    SndChannelClient *sound_client;
+
+    g_assert(IS_DUMMY_CHANNEL_CLIENT(dummy));
+    sound_client =  g_object_get_data(G_OBJECT(dummy), "sound-channel-client");
+
+    return sound_client;
+}
+
 static RedsState* snd_channel_get_server(SndChannelClient *client)
 {
     g_return_val_if_fail(client != NULL, NULL);
@@ -250,16 +251,14 @@ static RedsState* snd_channel_get_server(SndChannelClient *client)
 static void snd_disconnect_channel(SndChannelClient *client)
 {
     SndChannel *channel;
-    RedsState *reds;
     RedChannel *red_channel;
     uint32_t type;
 
-    if (!client || !client->stream) {
+    if (!client || !red_channel_client_is_connected(client->channel_client)) {
         spice_debug("not connected");
         return;
     }
     red_channel = red_channel_client_get_channel(client->channel_client);
-    reds = snd_channel_get_server(client);
     g_object_get(red_channel, "channel-type", &type, NULL);
     spice_debug("SndChannelClient=%p rcc=%p type=%d",
                  client, client->channel_client, type);
@@ -267,10 +266,6 @@ static void snd_disconnect_channel(SndChannelClient *client)
     client->cleanup(client);
     red_channel_client_disconnect(channel->connection->channel_client);
     channel->connection->channel_client = NULL;
-    reds_core_watch_remove(reds, client->stream->watch);
-    client->stream->watch = NULL;
-    reds_stream_free(client->stream);
-    client->stream = NULL;
     snd_channel_unref(client);
     channel->connection = NULL;
 }
@@ -290,6 +285,7 @@ static void snd_playback_on_message_done(SndChannelClient *client)
         playback_client->in_progress = NULL;
         if (playback_client->pending_frame) {
             client->command |= SND_PLAYBACK_PCM_MASK;
+            snd_send(client);
         }
     }
 }
@@ -298,63 +294,6 @@ static void snd_record_on_message_done(SndChannelClient *client)
 {
 }
 
-static int snd_send_data(SndChannelClient *client)
-{
-    uint32_t n;
-    RedChannelClient *rcc = client->channel_client;
-    SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
-
-    if (!client) {
-        return FALSE;
-    }
-
-    if (!(n = client->send_data.size - client->send_data.pos)) {
-        return TRUE;
-    }
-
-    RedsState *reds = snd_channel_get_server(client);
-    for (;;) {
-        struct iovec vec[IOV_MAX];
-        int vec_size;
-
-        if (!n) {
-            client->on_message_done(client);
-
-            if (client->blocked) {
-                client->blocked = FALSE;
-                reds_core_watch_update_mask(reds, client->stream->watch, SPICE_WATCH_EVENT_READ);
-            }
-            break;
-        }
-
-        vec_size = spice_marshaller_fill_iovec(m,
-                                               vec, IOV_MAX, client->send_data.pos);
-        n = reds_stream_writev(client->stream, vec, vec_size);
-        if (n == -1) {
-            switch (errno) {
-            case EAGAIN:
-                client->blocked = TRUE;
-                reds_core_watch_update_mask(reds, client->stream->watch, SPICE_WATCH_EVENT_READ |
-                                                                 SPICE_WATCH_EVENT_WRITE);
-                return FALSE;
-            case EINTR:
-                break;
-            case EPIPE:
-                snd_disconnect_channel(client);
-                return FALSE;
-            default:
-                spice_printerr("%s", strerror(errno));
-                snd_disconnect_channel(client);
-                return FALSE;
-            }
-        } else {
-            client->send_data.pos += n;
-        }
-        n = client->send_data.size - client->send_data.pos;
-    }
-    return TRUE;
-}
-
 static int snd_record_handle_write(RecordChannelClient *record_client, size_t size, void *message)
 {
     SpiceMsgcRecordPacket *packet;
@@ -400,30 +339,24 @@ static int snd_record_handle_write(RecordChannelClient *record_client, size_t si
     return TRUE;
 }
 
-static int snd_playback_handle_message(SndChannelClient *client, size_t size, uint32_t type, void *message)
+static int
+playback_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message)
 {
-    if (!client) {
-        return FALSE;
-    }
-
     switch (type) {
     case SPICE_MSGC_DISCONNECTING:
         break;
     default:
-        spice_printerr("invalid message type %u", type);
-        return FALSE;
+        return red_channel_client_handle_message(rcc, size, type, message);
     }
     return TRUE;
 }
 
-static int snd_record_handle_message(SndChannelClient *client, size_t size, uint32_t type, void *message)
+static int
+record_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message)
 {
-    RecordChannelClient *record_client = (RecordChannelClient *)client;
-    RedChannelClient *rcc = client->channel_client;
+    SndChannelClient *snd_client = snd_channel_client_from_dummy(rcc);
+    RecordChannelClient *record_client = SPICE_CONTAINEROF(snd_client, RecordChannelClient, base);
 
-    if (!client) {
-        return FALSE;
-    }
     switch (type) {
     case SPICE_MSGC_RECORD_DATA:
         return snd_record_handle_write(record_client, size, message);
@@ -459,156 +392,23 @@ static int snd_record_handle_message(SndChannelClient *client, size_t size, uint
     case SPICE_MSGC_DISCONNECTING:
         break;
     default:
-        spice_printerr("invalid message type %u", type);
-        return FALSE;
+        return red_channel_client_handle_message(rcc, size, type, message);
     }
     return TRUE;
 }
 
-static void snd_receive(SndChannelClient *client)
-{
-    SpiceDataHeaderOpaque *header;
-
-    if (!client) {
-        return;
-    }
-
-    header = &client->channel_client->incoming.header;
-
-    for (;;) {
-        ssize_t n;
-        n = client->receive_data.end - client->receive_data.now;
-        spice_warn_if_fail(n > 0);
-        n = reds_stream_read(client->stream, client->receive_data.now, n);
-        if (n <= 0) {
-            if (n == 0) {
-                snd_disconnect_channel(client);
-                return;
-            }
-            spice_assert(n == -1);
-            switch (errno) {
-            case EAGAIN:
-                return;
-            case EINTR:
-                break;
-            case EPIPE:
-                snd_disconnect_channel(client);
-                return;
-            default:
-                spice_printerr("%s", strerror(errno));
-                snd_disconnect_channel(client);
-                return;
-            }
-        } else {
-            client->receive_data.now += n;
-            for (;;) {
-                uint8_t *msg_start = client->receive_data.message_start;
-                uint8_t *data = msg_start + header->header_size;
-                size_t parsed_size;
-                uint8_t *parsed;
-                message_destructor_t parsed_free;
-
-                header->data = msg_start;
-                n = client->receive_data.now - msg_start;
-
-                if (n < header->header_size ||
-                    n < header->header_size + header->get_msg_size(header)) {
-                    break;
-                }
-                parsed = client->parser((void *)data, data + header->get_msg_size(header),
-                                         header->get_msg_type(header),
-                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-                if (parsed == NULL) {
-                    spice_printerr("failed to parse message type %d", header->get_msg_type(header));
-                    snd_disconnect_channel(client);
-                    return;
-                }
-                if (!client->handle_message(client, parsed_size,
-                                             header->get_msg_type(header), parsed)) {
-                    free(parsed);
-                    snd_disconnect_channel(client);
-                    return;
-                }
-                parsed_free(parsed);
-                client->receive_data.message_start = msg_start + header->header_size +
-                                                     header->get_msg_size(header);
-            }
-            if (client->receive_data.now == client->receive_data.message_start) {
-                client->receive_data.now = client->receive_data.buf;
-                client->receive_data.message_start = client->receive_data.buf;
-            } else if (client->receive_data.now == client->receive_data.end) {
-                memcpy(client->receive_data.buf, client->receive_data.message_start, n);
-                client->receive_data.now = client->receive_data.buf + n;
-                client->receive_data.message_start = client->receive_data.buf;
-            }
-        }
-    }
-}
-
-static void snd_event(int fd, int event, void *data)
-{
-    SndChannelClient *client = data;
-
-    if (event & SPICE_WATCH_EVENT_READ) {
-        snd_receive(client);
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        client->send_messages(client);
-    }
-}
-
-static inline int snd_reset_send_data(SndChannelClient *client, uint16_t verb)
-{
-    SpiceDataHeaderOpaque *header;
-    RedChannelClient *rcc = client->channel_client;
-    SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
-
-    if (!client) {
-        return FALSE;
-    }
-
-    header = &client->channel_client->priv->send_data.header;
-    spice_marshaller_reset(m);
-    header->data = spice_marshaller_reserve_space(m, header->header_size);
-    spice_marshaller_set_base(m, header->header_size);
-    client->send_data.pos = 0;
-    header->set_msg_size(header, 0);
-    header->set_msg_type(header, verb);
-    client->send_data.serial++;
-    if (!client->channel_client->priv->is_mini_header) {
-        header->set_msg_serial(header, client->send_data.serial);
-        header->set_msg_sub_list(header, 0);
-    }
-
-    return TRUE;
-}
-
-static int snd_begin_send_message(SndChannelClient *client)
-{
-    SpiceDataHeaderOpaque *header = &client->channel_client->priv->send_data.header;
-    RedChannelClient *rcc = client->channel_client;
-    SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
-
-    spice_marshaller_flush(m);
-    client->send_data.size = spice_marshaller_get_total_size(m);
-    header->set_msg_size(header, client->send_data.size - header->header_size);
-    return snd_send_data(client);
-}
-
 static int snd_channel_send_migrate(SndChannelClient *client)
 {
     RedChannelClient *rcc = client->channel_client;
     SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
     SpiceMsgMigrate migrate;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_MIGRATE)) {
-        return FALSE;
-    }
-    spice_debug(NULL);
+    red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE);
     migrate.flags = 0;
     spice_marshall_msg_migrate(m, &migrate);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_playback_send_migrate(PlaybackChannelClient *client)
@@ -625,22 +425,21 @@ static int snd_send_volume(SndChannelClient *client, uint32_t cap, int msg)
     SndChannel *channel = SND_CHANNEL(red_channel_client_get_channel(rcc));
     SpiceVolumeState *st = &channel->volume;
 
-    if (!red_channel_client_test_remote_cap(client->channel_client, cap)) {
-        return TRUE;
+    if (!red_channel_client_test_remote_cap(rcc, cap)) {
+        return FALSE;
     }
 
     vol = alloca(sizeof (SpiceMsgAudioVolume) +
                  st->volume_nchannels * sizeof (uint16_t));
-    if (!snd_reset_send_data(client, msg)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, msg);
     vol->nchannels = st->volume_nchannels;
     for (c = 0; c < st->volume_nchannels; ++c) {
         vol->volume[c] = st->volume[c];
     }
     spice_marshall_SpiceMsgAudioVolume(m, vol);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_playback_send_volume(PlaybackChannelClient *playback_client)
@@ -657,17 +456,16 @@ static int snd_send_mute(SndChannelClient *client, uint32_t cap, int msg)
     SndChannel *channel = SND_CHANNEL(red_channel_client_get_channel(rcc));
     SpiceVolumeState *st = &channel->volume;
 
-    if (!red_channel_client_test_remote_cap(client->channel_client, cap)) {
-        return TRUE;
-    }
-
-    if (!snd_reset_send_data(client, msg)) {
+    if (!red_channel_client_test_remote_cap(rcc, cap)) {
         return FALSE;
     }
+
+    red_channel_client_init_send_data(rcc, msg);
     mute.mute = st->mute;
     spice_marshall_SpiceMsgAudioMute(m, &mute);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_playback_send_mute(PlaybackChannelClient *playback_client)
@@ -684,14 +482,14 @@ static int snd_playback_send_latency(PlaybackChannelClient *playback_client)
     SpiceMsgPlaybackLatency latency_msg;
 
     spice_debug("latency %u", playback_client->latency);
-    if (!snd_reset_send_data(client, SPICE_MSG_PLAYBACK_LATENCY)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, SPICE_MSG_PLAYBACK_LATENCY);
     latency_msg.latency_ms = playback_client->latency;
     spice_marshall_msg_playback_latency(m, &latency_msg);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
+
 static int snd_playback_send_start(PlaybackChannelClient *playback_client)
 {
     SndChannelClient *client = (SndChannelClient *)playback_client;
@@ -699,10 +497,7 @@ static int snd_playback_send_start(PlaybackChannelClient *playback_client)
     SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
     SpiceMsgPlaybackStart start;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_PLAYBACK_START)) {
-        return FALSE;
-    }
-
+    red_channel_client_init_send_data(rcc, SPICE_MSG_PLAYBACK_START);
     start.channels = SPICE_INTERFACE_PLAYBACK_CHAN;
     start.frequency = SND_CHANNEL(red_channel_client_get_channel(rcc))->frequency;
     spice_assert(SPICE_INTERFACE_PLAYBACK_FMT == SPICE_INTERFACE_AUDIO_FMT_S16);
@@ -710,18 +505,19 @@ static int snd_playback_send_start(PlaybackChannelClient *playback_client)
     start.time = reds_get_mm_time();
     spice_marshall_msg_playback_start(m, &start);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_playback_send_stop(PlaybackChannelClient *playback_client)
 {
     SndChannelClient *client = (SndChannelClient *)playback_client;
+    RedChannelClient *rcc = client->channel_client;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_PLAYBACK_STOP)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, SPICE_MSG_PLAYBACK_STOP);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_playback_send_ctl(PlaybackChannelClient *playback_client)
@@ -742,9 +538,7 @@ static int snd_record_send_start(RecordChannelClient *record_client)
     SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
     SpiceMsgRecordStart start;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_RECORD_START)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, SPICE_MSG_RECORD_START);
 
     start.channels = SPICE_INTERFACE_RECORD_CHAN;
     start.frequency = SND_CHANNEL(red_channel_client_get_channel(rcc))->frequency;
@@ -752,18 +546,19 @@ static int snd_record_send_start(RecordChannelClient *record_client)
     start.format = SPICE_AUDIO_FMT_S16;
     spice_marshall_msg_record_start(m, &start);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_record_send_stop(RecordChannelClient *record_client)
 {
     SndChannelClient *client = (SndChannelClient *)record_client;
+    RedChannelClient *rcc = client->channel_client;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_RECORD_STOP)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, SPICE_MSG_RECORD_STOP);
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_record_send_ctl(RecordChannelClient *record_client)
@@ -805,10 +600,9 @@ static int snd_playback_send_write(PlaybackChannelClient *playback_client)
     SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
     AudioFrame *frame;
     SpiceMsgPlaybackPacket msg;
+    RedPipeItem *pipe_item = &SND_CHANNEL_CLIENT(playback_client)->persistent_pipe_item;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_PLAYBACK_DATA)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, SPICE_MSG_PLAYBACK_DATA);
 
     frame = playback_client->in_progress;
     msg.time = frame->time;
@@ -816,9 +610,10 @@ static int snd_playback_send_write(PlaybackChannelClient *playback_client)
     spice_marshall_msg_playback_data(m, &msg);
 
     if (playback_client->mode == SPICE_AUDIO_DATA_MODE_RAW) {
-        spice_marshaller_add_by_ref(m, (uint8_t *)frame->samples,
-                                    snd_codec_frame_size(playback_client->codec) *
-                                    sizeof(frame->samples[0]));
+        spice_marshaller_add_by_ref_full(m, (uint8_t *)frame->samples,
+                                         snd_codec_frame_size(playback_client->codec) *
+                                         sizeof(frame->samples[0]),
+                                         marshaller_unref_pipe_item, pipe_item);
     }
     else {
         int n = sizeof(playback_client->encode_buf);
@@ -829,10 +624,12 @@ static int snd_playback_send_write(PlaybackChannelClient *playback_client)
             snd_disconnect_channel(client);
             return FALSE;
         }
-        spice_marshaller_add_by_ref(m, playback_client->encode_buf, n);
+        spice_marshaller_add_by_ref_full(m, playback_client->encode_buf, n,
+                                         marshaller_unref_pipe_item, pipe_item);
     }
 
-    return snd_begin_send_message(client);
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int playback_send_mode(PlaybackChannelClient *playback_client)
@@ -842,114 +639,13 @@ static int playback_send_mode(PlaybackChannelClient *playback_client)
     SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
     SpiceMsgPlaybackMode mode;
 
-    if (!snd_reset_send_data(client, SPICE_MSG_PLAYBACK_MODE)) {
-        return FALSE;
-    }
+    red_channel_client_init_send_data(rcc, SPICE_MSG_PLAYBACK_MODE);
     mode.time = reds_get_mm_time();
     mode.mode = playback_client->mode;
     spice_marshall_msg_playback_mode(m, &mode);
 
-    return snd_begin_send_message(client);
-}
-
-static void snd_playback_send(void* data)
-{
-    PlaybackChannelClient *playback_client = (PlaybackChannelClient*)data;
-    SndChannelClient *client = SND_CHANNEL_CLIENT(playback_client);
-
-    if (!playback_client || !snd_send_data(data)) {
-        return;
-    }
-
-    client->command &= SND_PLAYBACK_MODE_MASK|SND_PLAYBACK_PCM_MASK|
-                       SND_CTRL_MASK|SND_VOLUME_MUTE_MASK|
-                       SND_MIGRATE_MASK|SND_PLAYBACK_LATENCY_MASK;
-    while (client->command) {
-        if (client->command & SND_PLAYBACK_MODE_MASK) {
-            if (!playback_send_mode(playback_client)) {
-                return;
-            }
-            client->command &= ~SND_PLAYBACK_MODE_MASK;
-        }
-        if (client->command & SND_PLAYBACK_PCM_MASK) {
-            spice_assert(!playback_client->in_progress && playback_client->pending_frame);
-            playback_client->in_progress = playback_client->pending_frame;
-            playback_client->pending_frame = NULL;
-            client->command &= ~SND_PLAYBACK_PCM_MASK;
-            if (!snd_playback_send_write(playback_client)) {
-                spice_printerr("snd_send_playback_write failed");
-                return;
-            }
-        }
-        if (client->command & SND_CTRL_MASK) {
-            if (!snd_playback_send_ctl(playback_client)) {
-                return;
-            }
-            client->command &= ~SND_CTRL_MASK;
-        }
-        if (client->command & SND_VOLUME_MASK) {
-            if (!snd_playback_send_volume(playback_client)) {
-                return;
-            }
-            client->command &= ~SND_VOLUME_MASK;
-        }
-        if (client->command & SND_MUTE_MASK) {
-            if (!snd_playback_send_mute(playback_client)) {
-                return;
-            }
-            client->command &= ~SND_MUTE_MASK;
-        }
-        if (client->command & SND_MIGRATE_MASK) {
-            if (!snd_playback_send_migrate(playback_client)) {
-                return;
-            }
-            client->command &= ~SND_MIGRATE_MASK;
-        }
-        if (client->command & SND_PLAYBACK_LATENCY_MASK) {
-            if (!snd_playback_send_latency(playback_client)) {
-                return;
-            }
-            client->command &= ~SND_PLAYBACK_LATENCY_MASK;
-        }
-    }
-}
-
-static void snd_record_send(void* data)
-{
-    RecordChannelClient *record_client = (RecordChannelClient*)data;
-    SndChannelClient *client = SND_CHANNEL_CLIENT(record_client);
-
-    if (!record_client || !snd_send_data(data)) {
-        return;
-    }
-
-    client->command &= SND_CTRL_MASK|SND_VOLUME_MUTE_MASK|SND_MIGRATE_MASK;
-    while (client->command) {
-        if (client->command & SND_CTRL_MASK) {
-            if (!snd_record_send_ctl(record_client)) {
-                return;
-            }
-            client->command &= ~SND_CTRL_MASK;
-        }
-        if (client->command & SND_VOLUME_MASK) {
-            if (!snd_record_send_volume(record_client)) {
-                return;
-            }
-            client->command &= ~SND_VOLUME_MASK;
-        }
-        if (client->command & SND_MUTE_MASK) {
-            if (!snd_record_send_mute(record_client)) {
-                return;
-            }
-            client->command &= ~SND_MUTE_MASK;
-        }
-        if (client->command & SND_MIGRATE_MASK) {
-            if (!snd_record_send_migrate(record_client)) {
-                return;
-            }
-            client->command &= ~SND_MIGRATE_MASK;
-        }
-    }
+    red_channel_client_begin_send_message(rcc);
+    return TRUE;
 }
 
 static int snd_channel_config_socket(RedChannelClient *rcc);
@@ -957,34 +653,16 @@ static int snd_channel_config_socket(RedChannelClient *rcc);
 static SndChannelClient *__new_channel(SndChannel *channel, int size, uint32_t channel_id,
                                        RedClient *red_client,
                                        RedsStream *stream,
-                                       snd_channel_send_messages_proc send_messages,
-                                       snd_channel_handle_message_proc handle_message,
                                        snd_channel_on_message_done_proc on_message_done,
                                        snd_channel_cleanup_channel_proc cleanup,
                                        uint32_t *common_caps, int num_common_caps,
                                        uint32_t *caps, int num_caps)
 {
     SndChannelClient *client;
-    RedsState *reds = red_channel_get_server(RED_CHANNEL(channel));
 
     spice_assert(size >= sizeof(*client));
     client = spice_malloc0(size);
     client->refs = 1;
-    client->parser = spice_get_client_channel_parser(channel_id, NULL);
-    client->stream = stream;
-    client->receive_data.message_start = client->receive_data.buf;
-    client->receive_data.now = client->receive_data.buf;
-    client->receive_data.end = client->receive_data.buf + sizeof(client->receive_data.buf);
-
-    stream->watch = reds_core_watch_add(reds, stream->socket, SPICE_WATCH_EVENT_READ,
-                                        snd_event, client);
-    if (stream->watch == NULL) {
-        spice_printerr("watch_add failed, %s", strerror(errno));
-        goto error2;
-    }
-
-    client->send_messages = send_messages;
-    client->handle_message = handle_message;
     client->on_message_done = on_message_done;
     client->cleanup = cleanup;
 
@@ -994,6 +672,13 @@ static SndChannelClient *__new_channel(SndChannel *channel, int size, uint32_t c
     if (!client->channel_client) {
         goto error2;
     }
+
+    /* SndChannelClient is not yet a RedChannelClient, but we still need to go from our
+     * RedChannelClient implementation (DummyChannelClient) to the SndChannelClient instance
+     * in various vfuncs
+     */
+    g_object_set_data(G_OBJECT(client->channel_client), "sound-channel-client", client);
+
     if (!snd_channel_config_socket(RED_CHANNEL_CLIENT(client->channel_client))) {
         goto error2;
     }
@@ -1005,6 +690,135 @@ error2:
     return NULL;
 }
 
+/* This function is called when the "persistent" item is removed from the
+ * queue. Note that there is no free call, as the item is allocated inside
+ * SndChannelClient.
+ * This is used to have a simple item in the RedChannelClient queue but to
+ * send multiple messages in a row if possible.
+ * During realtime sound transmission you usually don't want to queue too
+ * much data or to have retransmissions, preferring instead to lose some
+ * samples.
+ */
+
+static void snd_persistent_pipe_item_free(struct RedPipeItem *item)
+{
+    SndChannelClient *client = SPICE_CONTAINEROF(item, SndChannelClient, persistent_pipe_item);
+
+    red_pipe_item_init_full(item, RED_PIPE_ITEM_PERSISTENT,
+                            snd_persistent_pipe_item_free);
+
+    if (client->on_message_done) {
+        client->on_message_done(client);
+    }
+}
+
+static void snd_send(SndChannelClient * client)
+{
+    RedChannelClient *rcc = client->channel_client;
+
+    if (!client || !red_channel_client_pipe_is_empty(rcc) || !client->command) {
+        return;
+    }
+    // just append a dummy item and push!
+    red_pipe_item_init_full(&client->persistent_pipe_item, RED_PIPE_ITEM_PERSISTENT,
+                            snd_persistent_pipe_item_free);
+    red_channel_client_pipe_add_push(rcc, &client->persistent_pipe_item);
+}
+
+static void playback_channel_send_item(RedChannelClient *rcc, G_GNUC_UNUSED RedPipeItem *item)
+{
+    SndChannelClient *client = snd_channel_client_from_dummy(rcc);
+    PlaybackChannelClient *playback_client = SPICE_CONTAINEROF(client, PlaybackChannelClient, base);
+
+    client->command &= SND_PLAYBACK_MODE_MASK|SND_PLAYBACK_PCM_MASK|
+                       SND_CTRL_MASK|SND_VOLUME_MUTE_MASK|
+                       SND_MIGRATE_MASK|SND_PLAYBACK_LATENCY_MASK;
+    while (client->command) {
+        if (client->command & SND_PLAYBACK_MODE_MASK) {
+            client->command &= ~SND_PLAYBACK_MODE_MASK;
+            if (playback_send_mode(playback_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_PLAYBACK_PCM_MASK) {
+            spice_assert(!playback_client->in_progress && playback_client->pending_frame);
+            playback_client->in_progress = playback_client->pending_frame;
+            playback_client->pending_frame = NULL;
+            client->command &= ~SND_PLAYBACK_PCM_MASK;
+            if (snd_playback_send_write(playback_client)) {
+                break;
+            }
+            spice_printerr("snd_send_playback_write failed");
+        }
+        if (client->command & SND_CTRL_MASK) {
+            client->command &= ~SND_CTRL_MASK;
+            if (snd_playback_send_ctl(playback_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_VOLUME_MASK) {
+            client->command &= ~SND_VOLUME_MASK;
+            if (snd_playback_send_volume(playback_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_MUTE_MASK) {
+            client->command &= ~SND_MUTE_MASK;
+            if (snd_playback_send_mute(playback_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_MIGRATE_MASK) {
+            client->command &= ~SND_MIGRATE_MASK;
+            if (snd_playback_send_migrate(playback_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_PLAYBACK_LATENCY_MASK) {
+            client->command &= ~SND_PLAYBACK_LATENCY_MASK;
+            if (snd_playback_send_latency(playback_client)) {
+                break;
+            }
+        }
+    }
+    snd_send(client);
+}
+
+static void record_channel_send_item(RedChannelClient *rcc, G_GNUC_UNUSED RedPipeItem *item)
+{
+    SndChannelClient *client = snd_channel_client_from_dummy(rcc);
+    RecordChannelClient *record_client = SPICE_CONTAINEROF(client, RecordChannelClient, base);
+
+    client->command &= SND_CTRL_MASK|SND_VOLUME_MUTE_MASK|SND_MIGRATE_MASK;
+    while (client->command) {
+        if (client->command & SND_CTRL_MASK) {
+            client->command &= ~SND_CTRL_MASK;
+            if (snd_record_send_ctl(record_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_VOLUME_MASK) {
+            client->command &= ~SND_VOLUME_MASK;
+            if (snd_record_send_volume(record_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_MUTE_MASK) {
+            client->command &= ~SND_MUTE_MASK;
+            if (snd_record_send_mute(record_client)) {
+                break;
+            }
+        }
+        if (client->command & SND_MIGRATE_MASK) {
+            client->command &= ~SND_MIGRATE_MASK;
+            if (snd_record_send_migrate(record_client)) {
+                break;
+            }
+        }
+    }
+    snd_send(client);
+}
+
 static int snd_channel_config_socket(RedChannelClient *rcc)
 {
     int delay_val;
@@ -1056,20 +870,31 @@ static int snd_channel_config_socket(RedChannelClient *rcc)
 
 static void snd_channel_on_disconnect(RedChannelClient *rcc)
 {
-    g_assert_not_reached();
+    SndChannel *channel = SND_CHANNEL(red_channel_client_get_channel(rcc));
+    if (channel->connection && rcc == RED_CHANNEL_CLIENT(channel->connection)) {
+        channel->connection = NULL; // only forget the connection that is disconnecting
+    }
 }
 
 static uint8_t*
 snd_channel_client_alloc_recv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size)
 {
-    g_assert_not_reached();
+    SndChannelClient *client = snd_channel_client_from_dummy(rcc);
+    // A message larger than receive_buf gets its own heap buffer; this should never happen
+    if (size > sizeof(client->receive_buf)) {
+        return spice_malloc(size);
+    }
+    return client->receive_buf; // otherwise hand out client->receive_buf itself
 }
 
 static void
 snd_channel_client_release_recv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size,
                                     uint8_t *msg)
 {
-    g_assert_not_reached();
+    SndChannelClient *client = snd_channel_client_from_dummy(rcc);
+    if (msg != client->receive_buf) { // receive_buf itself is never freed here
+        free(msg);
+    }
 }
 
 static void snd_disconnect_channel_client(RedChannelClient *rcc)
@@ -1141,7 +966,7 @@ static void snd_playback_start(SndChannel *channel)
     client->active = TRUE;
     if (!client->client_active) {
         snd_set_command(client, SND_CTRL_MASK);
-        snd_playback_send(client);
+        snd_send(client);
     } else {
         client->command &= ~SND_CTRL_MASK;
     }
@@ -1165,7 +990,7 @@ SPICE_GNUC_VISIBLE void spice_server_playback_stop(SpicePlaybackInstance *sin)
     client->active = FALSE;
     if (client->client_active) {
         snd_set_command(client, SND_CTRL_MASK);
-        snd_playback_send(client);
+        snd_send(client);
     } else {
         client->command &= ~SND_CTRL_MASK;
         client->command &= ~SND_PLAYBACK_PCM_MASK;
@@ -1231,7 +1056,7 @@ SPICE_GNUC_VISIBLE void spice_server_playback_put_samples(SpicePlaybackInstance
     frame->time = reds_get_mm_time();
     playback_client->pending_frame = frame;
     snd_set_command(SND_CHANNEL_CLIENT(playback_client), SND_PLAYBACK_PCM_MASK);
-    snd_playback_send(SND_CHANNEL_CLIENT(playback_client));
+    snd_send(SND_CHANNEL_CLIENT(playback_client));
 }
 
 void snd_set_playback_latency(RedClient *client, uint32_t latency)
@@ -1250,7 +1075,7 @@ void snd_set_playback_latency(RedClient *client, uint32_t latency)
 
                 playback->latency = latency;
                 snd_set_command(now->connection, SND_PLAYBACK_LATENCY_MASK);
-                snd_playback_send(now->connection);
+                snd_send(now->connection);
             } else {
                 spice_debug("client doesn't not support SPICE_PLAYBACK_CAP_LATENCY");
             }
@@ -1327,8 +1152,6 @@ static void snd_set_playback_peer(RedChannel *red_channel, RedClient *client, Re
                                                                    SPICE_CHANNEL_PLAYBACK,
                                                                    client,
                                                                    stream,
-                                                                   snd_playback_send,
-                                                                   snd_playback_handle_message,
                                                                    snd_playback_on_message_done,
                                                                    snd_playback_cleanup,
                                                                    common_caps, num_common_caps,
@@ -1363,7 +1186,7 @@ static void snd_set_playback_peer(RedChannel *red_channel, RedClient *client, Re
     if (channel->active) {
         snd_playback_start(channel);
     }
-    snd_playback_send(channel->connection);
+    snd_send(channel->connection);
 }
 
 static void snd_record_migrate_channel_client(RedChannelClient *rcc)
@@ -1377,7 +1200,7 @@ static void snd_record_migrate_channel_client(RedChannelClient *rcc)
     if (channel->connection) {
         spice_assert(channel->connection->channel_client == rcc);
         snd_set_command(channel->connection, SND_MIGRATE_MASK);
-        snd_record_send(channel->connection);
+        snd_send(channel->connection);
     }
 }
 
@@ -1427,7 +1250,7 @@ static void snd_record_start(SndChannel *channel)
     client->active = TRUE;
     if (!client->client_active) {
         snd_set_command(client, SND_CTRL_MASK);
-        snd_record_send(client);
+        snd_send(client);
     } else {
         client->command &= ~SND_CTRL_MASK;
     }
@@ -1449,7 +1272,7 @@ SPICE_GNUC_VISIBLE void spice_server_record_stop(SpiceRecordInstance *sin)
     client->active = FALSE;
     if (client->client_active) {
         snd_set_command(client, SND_CTRL_MASK);
-        snd_record_send(client);
+        snd_send(client);
     } else {
         client->command &= ~SND_CTRL_MASK;
     }
@@ -1560,8 +1383,6 @@ static void snd_set_record_peer(RedChannel *red_channel, RedClient *client, Reds
                                                                SPICE_CHANNEL_RECORD,
                                                                client,
                                                                stream,
-                                                               snd_record_send,
-                                                               snd_record_handle_message,
                                                                snd_record_on_message_done,
                                                                snd_record_cleanup,
                                                                common_caps, num_common_caps,
@@ -1575,7 +1396,7 @@ static void snd_set_record_peer(RedChannel *red_channel, RedClient *client, Reds
     if (channel->active) {
         snd_record_start(channel);
     }
-    snd_record_send(channel->connection);
+    snd_send(channel->connection);
 }
 
 static void snd_playback_migrate_channel_client(RedChannelClient *rcc)
@@ -1590,7 +1411,7 @@ static void snd_playback_migrate_channel_client(RedChannelClient *rcc)
     if (channel->connection) {
         spice_assert(channel->connection->channel_client == rcc);
         snd_set_command(channel->connection, SND_MIGRATE_MASK);
-        snd_playback_send(channel->connection);
+        snd_send(channel->connection);
     }
 }
 
@@ -1662,8 +1483,13 @@ static void
 playback_channel_class_init(PlaybackChannelClass *klass)
 {
     GObjectClass *object_class = G_OBJECT_CLASS(klass);
+    RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
 
     object_class->constructed = playback_channel_constructed;
+    // RedChannel hooks: message parser, parsed-message handler, pipe item sender
+    channel_class->parser = spice_get_client_channel_parser(SPICE_CHANNEL_PLAYBACK, NULL);
+    channel_class->handle_parsed = playback_channel_handle_parsed;
+    channel_class->send_item = playback_channel_send_item;
 }
 
 void snd_attach_playback(RedsState *reds, SpicePlaybackInstance *sin)
@@ -1708,8 +1534,13 @@ static void
 record_channel_class_init(RecordChannelClass *klass)
 {
     GObjectClass *object_class = G_OBJECT_CLASS(klass);
+    RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
 
     object_class->constructed = record_channel_constructed;
+    // RedChannel hooks: message parser, parsed-message handler, pipe item sender
+    channel_class->parser = spice_get_client_channel_parser(SPICE_CHANNEL_RECORD, NULL);
+    channel_class->handle_parsed = record_channel_handle_parsed;
+    channel_class->send_item = record_channel_send_item;
 }
 
 void snd_attach_record(RedsState *reds, SpiceRecordInstance *sin)
-- 
2.9.3



More information about the Spice-devel mailing list