[Spice-devel] [PATCH 10/10] server/main_channel: use red_channel (most code is pipe send/marshall separation)

Alon Levy alevy at redhat.com
Wed Jan 12 21:01:43 PST 2011


---
 server/main_channel.c |  778 ++++++++++++++++++++++++++++---------------------
 1 files changed, 441 insertions(+), 337 deletions(-)

diff --git a/server/main_channel.c b/server/main_channel.c
index e42f173..f1fb4c6 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -38,6 +38,7 @@
 #include "common/messages.h"
 #include "reds.h"
 #include "main_channel.h"
+#include "red_channel.h"
 #include "generated_marshallers.h"
 
 #define ZERO_BUF_SIZE 4096
@@ -55,34 +56,67 @@ static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
 
 typedef struct RedsOutItem RedsOutItem;
 struct RedsOutItem {
-    RingItem link;
-    SpiceMarshaller *m;
-    SpiceDataHeader *header;
+    PipeItem base;
 };
 
-typedef struct RedsOutgoingData {
-    Ring pipe;
-    RedsOutItem *item;
-    int vec_size;
-    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
-    struct iovec *vec;
-} RedsOutgoingData;
-
-// TODO - remove and use red_channel.h
-typedef struct IncomingHandler {
-    spice_parse_channel_func_t parser;
+typedef struct PingPipeItem {
+    PipeItem base;
+    int size;
+} PingPipeItem;
+
+typedef struct MouseModePipeItem {
+    PipeItem base;
+    int current_mode;
+    int is_client_mouse_allowed;
+} MouseModePipeItem;
+
+typedef struct TokensPipeItem {
+    PipeItem base;
+    int tokens;
+} TokensPipeItem;
+
+typedef struct AgentDataPipeItem {
+    PipeItem base;
+    uint8_t* data;
+    size_t len;
+    spice_marshaller_item_free_func free_data;
     void *opaque;
-    int shut;
-    uint8_t buf[RECEIVE_BUF_SIZE];
-    uint32_t end_pos;
-    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
-} IncomingHandler;
+} AgentDataPipeItem;
+
+typedef struct InitPipeItem {
+    PipeItem base;
+    int connection_id;
+    int display_channels_hint;
+    int current_mouse_mode;
+    int is_client_mouse_allowed;
+    int multi_media_time;
+    int ram_hint;
+} InitPipeItem;
+
+typedef struct NotifyPipeItem {
+    PipeItem base;
+    uint8_t *mess;
+    int mess_len;
+} NotifyPipeItem;
+
+typedef struct MigrateBeginPipeItem {
+    PipeItem base;
+    int port;
+    int sport;
+    char *host;
+    uint16_t cert_pub_key_type;
+    uint32_t cert_pub_key_len;
+    uint8_t *cert_pub_key;
+} MigrateBeginPipeItem;
+
+typedef struct MultiMediaTimePipeItem {
+    PipeItem base;
+    int time;
+} MultiMediaTimePipeItem;
 
 typedef struct MainChannel {
-    RedsStreamContext *peer;
-    IncomingHandler in_handler;
-    RedsOutgoingData outgoing;
-    uint64_t serial; //migrate me
+    RedChannel base;
+    uint8_t recv_buf[RECEIVE_BUF_SIZE];
     uint32_t ping_id;
     uint32_t net_test_id;
     int net_test_stage;
@@ -98,45 +132,13 @@ enum NetTestStage {
 static uint64_t latency = 0;
 uint64_t bitrate_per_sec = ~0;
 
-static void main_channel_out_item_free(RedsOutItem *item);
-
-static void main_reset_outgoing(MainChannel *main_chan)
-{
-    RedsOutgoingData *outgoing = &main_chan->outgoing;
-    RingItem *ring_item;
-
-    if (outgoing->item) {
-        main_channel_out_item_free(outgoing->item);
-        outgoing->item = NULL;
-    }
-    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
-        RedsOutItem *out_item = (RedsOutItem *)ring_item;
-        ring_remove(ring_item);
-        main_channel_out_item_free(out_item);
-    }
-    outgoing->vec_size = 0;
-    outgoing->vec = outgoing->vec_buf;
-}
-
-// ALON from reds_disconnect
 static void main_disconnect(MainChannel *main_chan)
 {
-    if (!main_chan || !main_chan->peer) {
-        return;
-    }
-    main_reset_outgoing(main_chan);
-    core->watch_remove(main_chan->peer->watch);
-    main_chan->peer->watch = NULL;
-    main_chan->peer->cb_free(main_chan->peer);
-    main_chan->peer = NULL;
-    main_chan->in_handler.shut = TRUE;
-    main_chan->serial = 0;
     main_chan->ping_id = 0;
     main_chan->net_test_id = 0;
     main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
-    main_chan->in_handler.end_pos = 0;
+    red_channel_destroy(&main_chan->base);
 
-    // TODO: Should probably reset these on the ping start, not here
     latency = 0;
     bitrate_per_sec = ~0;
 }
@@ -149,233 +151,199 @@ void main_channel_start_net_test(Channel *channel)
         return;
     }
 
-    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES) &&
-                            main_channel_push_ping(channel, 0) &&
-                            main_channel_push_ping(channel, NET_TEST_BYTES)) {
+    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)
+        && main_channel_push_ping(channel, 0)
+        && main_channel_push_ping(channel, NET_TEST_BYTES)) {
         main_chan->net_test_id = main_chan->ping_id - 2;
         main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
     }
 }
 
-static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+static RedsOutItem *main_pipe_item_new(MainChannel *main_chan, int type)
 {
-    for (;;) {
-        uint8_t *buf = handler->buf;
-        uint32_t pos = handler->end_pos;
-        uint8_t *end = buf + pos;
-        SpiceDataHeader *header;
-        int n;
-        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
-        if (n <= 0) {
-            if (n == 0) {
-                return -1;
-            }
-            switch (errno) {
-            case EAGAIN:
-                return 0;
-            case EINTR:
-                break;
-            case EPIPE:
-                return -1;
-            default:
-                red_printf("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            pos += n;
-            end = buf + pos;
-            while (buf + sizeof(SpiceDataHeader) <= end &&
-                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
-                uint8_t *data = (uint8_t *)(header+1);
-                size_t parsed_size;
-                uint8_t *parsed;
-                message_destructor_t parsed_free;
-
-                buf += sizeof(SpiceDataHeader) + header->size;
-                parsed = handler->parser(data, data + header->size, header->type,
-                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-                if (parsed == NULL) {
-                    red_printf("failed to parse message type %d", header->type);
-                    return -1;
-                }
-                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
-                parsed_free(parsed);
-                if (handler->shut) {
-                    return -1;
-                }
-            }
-            memmove(handler->buf, buf, (handler->end_pos = end - buf));
-        }
-    }
+    RedsOutItem *item = spice_malloc(sizeof(RedsOutItem));
+
+    red_channel_pipe_item_init(&main_chan->base, &item->base, type);
+    return item;
 }
 
-static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
+static MouseModePipeItem *main_mouse_mode_item_new(MainChannel *main_chan,
+    int current_mode, int is_client_mouse_allowed)
 {
-    RedsOutItem *item;
-
-    item = spice_new(RedsOutItem, 1);
-    ring_item_init(&item->link);
+    MouseModePipeItem *item = spice_malloc(sizeof(MouseModePipeItem));
 
-    item->m = spice_marshaller_new();
-    item->header = (SpiceDataHeader *)
-        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
-    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_MOUSE_MODE);
+    item->current_mode = current_mode;
+    item->is_client_mouse_allowed = is_client_mouse_allowed;
+    return item;
+}
 
-    item->header->serial = ++main_chan->serial;
-    item->header->type = type;
-    item->header->sub_list = 0;
+static PingPipeItem *main_ping_item_new(MainChannel *channel, int size)
+{
+    PingPipeItem *item = spice_malloc(sizeof(PingPipeItem));
 
+    red_channel_pipe_item_init(&channel->base, &item->base, SPICE_MSG_PING);
+    item->size = size;
     return item;
 }
 
-static void main_channel_out_item_free(RedsOutItem *item)
+static TokensPipeItem *main_tokens_item_new(MainChannel *main_chan, int tokens)
 {
-    spice_marshaller_destroy(item->m);
-    free(item);
+    TokensPipeItem *item = spice_malloc(sizeof(TokensPipeItem));
+
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_AGENT_TOKEN);
+    item->tokens = tokens;
+    return item;
 }
 
-static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
+static AgentDataPipeItem *main_agent_data_item_new(MainChannel *channel,
+           uint8_t* data, size_t len,
+           spice_marshaller_item_free_func free_data, void *opaque)
 {
-    struct iovec *now = vec;
+    AgentDataPipeItem *item = spice_malloc(sizeof(AgentDataPipeItem));
 
-    while (skip && skip >= now->iov_len) {
-        skip -= now->iov_len;
-        --*vec_size;
-        now++;
-    }
-    now->iov_base = (uint8_t *)now->iov_base + skip;
-    now->iov_len -= skip;
-    return now;
+    red_channel_pipe_item_init(&channel->base, &item->base, SPICE_MSG_MAIN_AGENT_DATA);
+    item->data = data;
+    item->len = len;
+    item->free_data = free_data;
+    item->opaque = opaque;
+    return item;
 }
 
-static int main_channel_send_data(MainChannel *main_chan)
+static InitPipeItem *main_init_item_new(MainChannel *main_chan,
+    int connection_id, int display_channels_hint, int current_mouse_mode,
+    int is_client_mouse_allowed, int multi_media_time,
+    int ram_hint)
 {
-    RedsOutgoingData *outgoing = &main_chan->outgoing;
-    int n;
-
-    if (!outgoing->item) {
-        return TRUE;
-    }
+    InitPipeItem *item = spice_malloc(sizeof(InitPipeItem));
 
-    ASSERT(outgoing->vec_size);
-    for (;;) {
-        if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
-            switch (errno) {
-            case EAGAIN:
-                core->watch_update_mask(main_chan->peer->watch,
-                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
-                return FALSE;
-            case EINTR:
-                break;
-            case EPIPE:
-                reds_disconnect();
-                return FALSE;
-            default:
-                red_printf("%s", strerror(errno));
-                reds_disconnect();
-                return FALSE;
-            }
-        } else {
-            outgoing->vec = main_channel_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
-            if (!outgoing->vec_size) {
-                main_channel_out_item_free(outgoing->item);
-                outgoing->item = NULL;
-                outgoing->vec = outgoing->vec_buf;
-                return TRUE;
-            }
-        }
-    }
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_INIT);
+    item->connection_id = connection_id;
+    item->display_channels_hint = display_channels_hint;
+    item->current_mouse_mode = current_mouse_mode;
+    item->is_client_mouse_allowed = is_client_mouse_allowed;
+    item->multi_media_time = multi_media_time;
+    item->ram_hint = ram_hint;
+    return item;
 }
 
-static void main_channel_push(MainChannel *main_chan)
+static NotifyPipeItem *main_notify_item_new(MainChannel *main_chan,
+                                        uint8_t *mess, const int mess_len)
 {
-    RedsOutgoingData *outgoing = &main_chan->outgoing;
-    RingItem *ring_item;
-    RedsOutItem *item;
+    NotifyPipeItem *item = spice_malloc(sizeof(NotifyPipeItem));
 
-    for (;;) {
-        if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
-            return;
-        }
-        ring_remove(ring_item);
-        outgoing->item = item = (RedsOutItem *)ring_item;
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_NOTIFY);
+    item->mess = mess;
+    item->mess_len = mess_len;
+    return item;
+}
 
-        spice_marshaller_flush(item->m);
-        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
+static MigrateBeginPipeItem *main_migrate_begin_item_new(
+    MainChannel *main_chan, int port, int sport,
+    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
+    uint8_t *cert_pub_key)
+{
+    MigrateBeginPipeItem *item = spice_malloc(sizeof(MigrateBeginPipeItem));
 
-        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
-                                                         outgoing->vec_buf,
-                                                         REDS_MAX_SEND_IOVEC, 0);
-        main_channel_send_data(main_chan);
-    }
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    item->port = port;
+    item->sport = sport;
+    item->host = host;
+    item->cert_pub_key_type = cert_pub_key_type;
+    item->cert_pub_key_len = cert_pub_key_len;
+    item->cert_pub_key = cert_pub_key;
+    return item;
 }
 
-static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
+static MultiMediaTimePipeItem *main_multi_media_time_item_new(
+    MainChannel *main_chan, int time)
 {
-    ring_add(&main_chan->outgoing.pipe, &item->link);
-    main_channel_push(main_chan);
+    MultiMediaTimePipeItem *item;
+
+    item = spice_malloc(sizeof(MultiMediaTimePipeItem));
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
+    item->time = time;
+    return item;
 }
 
 static void main_channel_push_channels(MainChannel *main_chan)
 {
-    SpiceMsgChannels* channels_info;
     RedsOutItem *item;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
-    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_channels(MainChannel *main_chan)
+{
+    SpiceMsgChannels* channels_info;
+
+    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels)
+                            + reds_num_of_channels() * sizeof(SpiceChannelId));
     reds_fill_channels(channels_info);
-    spice_marshall_msg_main_channels_list(item->m, channels_info);
+    spice_marshall_msg_main_channels_list(
+        main_chan->base.send_data.marshaller, channels_info);
     free(channels_info);
-    main_channel_push_pipe_item(main_chan, item);
 }
 
 int main_channel_push_ping(Channel *channel, int size)
 {
-    struct timespec time_space;
-    RedsOutItem *item;
-    SpiceMsgPing ping;
     MainChannel *main_chan = channel->data;
-
-    if (!main_chan) {
+    PingPipeItem *item;
+    
+    if (main_chan == NULL) {
         return FALSE;
     }
-    item = new_out_item(main_chan, SPICE_MSG_PING);
+    item = main_ping_item_new(main_chan, size);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+    return TRUE;
+}
+
+static void main_channel_marshall_ping(MainChannel *main_chan, int size)
+{
+    struct timespec time_space;
+    SpiceMsgPing ping;
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
+
     ping.id = ++main_chan->ping_id;
     clock_gettime(CLOCK_MONOTONIC, &time_space);
     ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
-    spice_marshall_msg_ping(item->m, &ping);
+    spice_marshall_msg_ping(m, &ping);
 
     while (size > 0) {
         int now = MIN(ZERO_BUF_SIZE, size);
         size -= now;
-        spice_marshaller_add_ref(item->m, zero_page, now);
+        spice_marshaller_add_ref(m, zero_page, now);
     }
+}
 
-    main_channel_push_pipe_item(main_chan, item);
+void main_channel_push_mouse_mode(Channel *channel, int current_mode,
+                                  int is_client_mouse_allowed)
+{
+    MainChannel *main_chan = channel->data;
+    MouseModePipeItem *item;
 
-    return TRUE;
+    item = main_mouse_mode_item_new(main_chan, current_mode,
+                                    is_client_mouse_allowed);
+    red_channel_pipe_add(&main_chan->base, &item->base);
 }
 
-void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
+static void main_channel_marshall_mouse_mode(MainChannel *main_chan, int current_mode, int is_client_mouse_allowed)
 {
     SpiceMsgMainMouseMode mouse_mode;
-    RedsOutItem *item;
-    MainChannel *main_chan;
-
-    if (!channel) {
-        return;
-    }
-    main_chan = channel->data;
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
     mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
     if (is_client_mouse_allowed) {
         mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
     }
     mouse_mode.current_mode = current_mode;
-
-    spice_marshall_msg_main_mouse_mode(item->m, &mouse_mode);
-
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_mouse_mode(main_chan->base.send_data.marshaller,
+                                       &mouse_mode);
 }
 
 void main_channel_push_agent_connected(Channel *channel)
@@ -383,186 +351,319 @@ void main_channel_push_agent_connected(Channel *channel)
     RedsOutItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
-    main_channel_push_pipe_item(main_chan, item);
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
+    red_channel_pipe_add(&main_chan->base, &item->base);
 }
 
 void main_channel_push_agent_disconnected(Channel *channel)
 {
-    SpiceMsgMainAgentDisconnect disconnect;
     RedsOutItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_agent_disconnected(MainChannel *main_chan)
+{
+    SpiceMsgMainAgentDisconnect disconnect;
+
     disconnect.error_code = SPICE_LINK_ERR_OK;
-    spice_marshall_msg_main_agent_disconnected(item->m, &disconnect);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_agent_disconnected(
+        main_chan->base.send_data.marshaller, &disconnect);
 }
 
 void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
 {
-    SpiceMsgMainAgentTokens tokens;
-    RedsOutItem *item;
     MainChannel *main_chan = channel->data;
+    TokensPipeItem *item = main_tokens_item_new(main_chan, num_tokens);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_tokens(MainChannel *main_chan, uint32_t num_tokens)
+{
+    SpiceMsgMainAgentTokens tokens;
 
-    if (!main_chan) {
-        return;
-    }
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
     tokens.num_tokens = num_tokens;
-    spice_marshall_msg_main_agent_token(item->m, &tokens);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_agent_token(
+        main_chan->base.send_data.marshaller, &tokens);
 }
 
 void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
            spice_marshaller_item_free_func free_data, void *opaque)
 {
-    RedsOutItem *item;
     MainChannel *main_chan = channel->data;
+    AgentDataPipeItem *item;
+
+    item = main_agent_data_item_new(main_chan, data, len, free_data, opaque);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
-    spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
-    main_channel_push_pipe_item(main_chan, item);
+static void main_channel_marshall_agent_data(MainChannel *main_chan,
+                                  AgentDataPipeItem *item)
+{
+    spice_marshaller_add_ref_full(main_chan->base.send_data.marshaller,
+        item->data, item->len, item->free_data, item->opaque);
 }
 
 static void main_channel_push_migrate_data_item(MainChannel *main_chan)
 {
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
-    SpiceMarshaller *m = item->m;
+    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE_DATA);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_migrate_data_item(MainChannel *main_chan)
+{
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
     MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
 
     reds_marshall_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
-    data->serial = main_chan->serial;
+    data->serial = red_channel_get_message_serial(&main_chan->base);
     data->ping_id = main_chan->ping_id;
-    main_channel_push_pipe_item(main_chan, item);
 }
 
-static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
+static void main_channel_receive_migrate_data(MainChannel *main_chan,
+                                  MainMigrateData *data, uint8_t *end)
 {
-    main_chan->serial = data->serial;
+    red_channel_set_message_serial(&main_chan->base, data->serial);
     main_chan->ping_id = data->ping_id;
 }
 
-void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
-    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+void main_channel_push_init(Channel *channel, int connection_id,
+    int display_channels_hint, int current_mouse_mode,
+    int is_client_mouse_allowed, int multi_media_time,
     int ram_hint)
 {
-    RedsOutItem *item;
-    SpiceMsgMainInit init;
+    InitPipeItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
-    init.session_id = connection_id;
-    init.display_channels_hint = display_channels_hint;
-    init.current_mouse_mode = current_mouse_mode;
+    item = main_init_item_new(main_chan,
+             connection_id, display_channels_hint, current_mouse_mode,
+             is_client_mouse_allowed, multi_media_time, ram_hint);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_init(MainChannel *main_chan,
+                                       InitPipeItem *item)
+{
+    SpiceMsgMainInit init;
+
+    init.session_id = item->connection_id;
+    init.display_channels_hint = item->display_channels_hint;
+    init.current_mouse_mode = item->current_mouse_mode;
     init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
-    if (is_client_mouse_allowed) {
+    if (item->is_client_mouse_allowed) {
         init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
     }
     init.agent_connected = reds_has_vdagent();
     init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
-    init.multi_media_time = multi_media_time;
-    init.ram_hint = ram_hint;
-    spice_marshall_msg_main_init(item->m, &init);
-    main_channel_push_pipe_item(main_chan, item);
+    init.multi_media_time = item->multi_media_time;
+    init.ram_hint = item->ram_hint;
+    spice_marshall_msg_main_init(main_chan->base.send_data.marshaller, &init);
 }
 
 void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
 {
-    // TODO possible free-then-use bug - caller frees mess after this, but is that pointer being
-    // used by spice_marshaller?
-    RedsOutItem *item;
-    SpiceMsgNotify notify;
     MainChannel *main_chan = channel->data;
+    NotifyPipeItem *item = main_notify_item_new(main_chan, mess, mess_len);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
 
-    item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
-    notify.time_stamp = get_time_stamp();
+static void main_channel_marshall_notify(MainChannel *main_chan, NotifyPipeItem *item)
+{
+    SpiceMsgNotify notify;
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
+
+    notify.time_stamp = get_time_stamp(); // TODO - move to main_new_notify_item
     notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
     notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
     notify.what = SPICE_WARN_GENERAL;
-    notify.message_len = mess_len;
-    spice_marshall_msg_notify(item->m, &notify);
-    spice_marshaller_add(item->m, mess, mess_len + 1);
-    main_channel_push_pipe_item(main_chan, item);
+    notify.message_len = item->mess_len;
+    spice_marshall_msg_notify(m, &notify);
+    spice_marshaller_add(m, item->mess, item->mess_len + 1);
 }
 
-void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
-    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport,
+    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
+    uint8_t *cert_pub_key)
 {
     MainChannel *main_chan = channel->data;
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    MigrateBeginPipeItem *item = main_migrate_begin_item_new(main_chan, port,
+        sport, host, cert_pub_key_type, cert_pub_key_len, cert_pub_key);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_migrate_begin(MainChannel *main_chan,
+    MigrateBeginPipeItem *item)
+{
     SpiceMsgMainMigrationBegin migrate;
 
-    migrate.port = port;
-    migrate.sport = sport;
-    migrate.host_size = strlen(host) + 1;
-    migrate.host_data = (uint8_t *)host;
-    migrate.pub_key_type = cert_pub_key_type;
-    migrate.pub_key_size = cert_pub_key_len;
-    migrate.pub_key_data = cert_pub_key;
-    spice_marshall_msg_main_migrate_begin(item->m, &migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    migrate.port = item->port;
+    migrate.sport = item->sport;
+    migrate.host_size = strlen(item->host) + 1;
+    migrate.host_data = (uint8_t *)item->host;
+    migrate.pub_key_type = item->cert_pub_key_type;
+    migrate.pub_key_size = item->cert_pub_key_len;
+    migrate.pub_key_data = item->cert_pub_key;
+    spice_marshall_msg_main_migrate_begin(main_chan->base.send_data.marshaller,
+                                          &migrate);
 }
 
 void main_channel_push_migrate(Channel *channel)
 {
-    RedsOutItem *item;
-    SpiceMsgMigrate migrate;
     MainChannel *main_chan = channel->data;
+    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_migrate(MainChannel *main_chan)
+{
+    SpiceMsgMigrate migrate;
 
-    item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
     migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
-    spice_marshall_msg_migrate(item->m, &migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_migrate(main_chan->base.send_data.marshaller, &migrate);
 }
 
 void main_channel_push_migrate_cancel(Channel *channel)
 {
     MainChannel *main_chan = channel->data;
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
+    RedsOutItem *item = main_pipe_item_new(main_chan,
+                                           SPICE_MSG_MAIN_MIGRATE_CANCEL);
 
-    main_channel_push_pipe_item(main_chan, item);
+    red_channel_pipe_add(&main_chan->base, &item->base);
 }
 
 void main_channel_push_multi_media_time(Channel *channel, int time)
 {
-    SpiceMsgMainMultiMediaTime time_mes;
-    RedsOutItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
-    time_mes.time = time;
-    spice_marshall_msg_main_multi_media_time(item->m, &time_mes);
-    main_channel_push_pipe_item(main_chan, item);
+    MultiMediaTimePipeItem *item =
+        main_multi_media_time_item_new(main_chan, time);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static PipeItem *main_migrate_switch_item_new(MainChannel *main_chan)
+{
+    PipeItem *item = spice_malloc(sizeof(*item));
+
+    red_channel_pipe_item_init(&main_chan->base, item,
+                               SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+    return item;
 }
 
 void main_channel_push_migrate_switch(Channel *channel)
 {
+    MainChannel *main_chan = channel->data;
+
+    red_channel_pipe_add(&main_chan->base,
+        main_migrate_switch_item_new(main_chan));
+}
+
+static void main_channel_marshall_migrate_switch(MainChannel *main_chan)
+{
     SpiceMsgMainMigrationSwitchHost migrate;
-    RedsOutItem *item;
-    MainChannel *main_chan;
-    
-    if (!channel) {
-        return;
-    }
-    main_chan = channel->data;
+
     red_printf("");
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+
     reds_fill_mig_switch(&migrate);
-    spice_marshall_msg_main_migrate_switch_host(item->m, &migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_migrate_switch_host(
+        main_chan->base.send_data.marshaller, &migrate);
+
     reds_mig_release();
 }
 
-static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
+static void main_channel_marshall_multi_media_time(MainChannel *main_chan,
+    MultiMediaTimePipeItem *item)
+{
+    SpiceMsgMainMultiMediaTime time_mes;
+
+    time_mes.time = item->time;
+    spice_marshall_msg_main_multi_media_time(
+        main_chan->base.send_data.marshaller, &time_mes);
+}
+
+static void main_channel_send_item(RedChannel *channel, PipeItem *base)
+{
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
+
+    red_channel_reset_send_data(channel);
+    red_channel_init_send_data(channel, base->type, base);
+    switch (base->type) {
+        case SPICE_MSG_MAIN_CHANNELS_LIST:
+            main_channel_marshall_channels(main_chan);
+            break;
+        case SPICE_MSG_PING:
+            main_channel_marshall_ping(main_chan,
+                SPICE_CONTAINEROF(base, PingPipeItem, base)->size);
+            break;
+        case SPICE_MSG_MAIN_MOUSE_MODE:
+            {
+                MouseModePipeItem *item =
+                    SPICE_CONTAINEROF(base, MouseModePipeItem, base);
+                main_channel_marshall_mouse_mode(main_chan,
+                    item->current_mode, item->is_client_mouse_allowed);
+                break;
+            }
+        case SPICE_MSG_MAIN_AGENT_DISCONNECTED:
+            main_channel_marshall_agent_disconnected(main_chan);
+            break;
+        case SPICE_MSG_MAIN_AGENT_TOKEN:
+            main_channel_marshall_tokens(main_chan,
+                SPICE_CONTAINEROF(base, TokensPipeItem, base)->tokens);
+            break;
+        case SPICE_MSG_MAIN_AGENT_DATA:
+            main_channel_marshall_agent_data(main_chan,
+                SPICE_CONTAINEROF(base, AgentDataPipeItem, base));
+            break;
+        case SPICE_MSG_MIGRATE_DATA:
+            main_channel_marshall_migrate_data_item(main_chan);
+            break;
+        case SPICE_MSG_MAIN_INIT:
+            main_channel_marshall_init(main_chan,
+                SPICE_CONTAINEROF(base, InitPipeItem, base));
+            break;
+        case SPICE_MSG_NOTIFY:
+            main_channel_marshall_notify(main_chan,
+                SPICE_CONTAINEROF(base, NotifyPipeItem, base));
+            break;
+        case SPICE_MSG_MIGRATE:
+            main_channel_marshall_migrate(main_chan);
+            break;
+        case SPICE_MSG_MAIN_MIGRATE_BEGIN:
+            main_channel_marshall_migrate_begin(main_chan,
+                SPICE_CONTAINEROF(base, MigrateBeginPipeItem, base));
+            break;
+        case SPICE_MSG_MAIN_MULTI_MEDIA_TIME:
+            main_channel_marshall_multi_media_time(main_chan,
+                SPICE_CONTAINEROF(base, MultiMediaTimePipeItem, base));
+            break;
+        case SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST:
+            main_channel_marshall_migrate_switch(main_chan);
+            break;
+    };
+    red_channel_begin_send_message(channel);
+}
+
+static void main_channel_release_pipe_item(RedChannel *channel,
+    PipeItem *base, int item_pushed)
+{
+    free(base);
+}
+
+static int main_channel_handle_parsed(RedChannel *channel, size_t size, uint32_t type, void *message)
 {
-    MainChannel *main_chan = opaque;
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
 
     switch (type) {
     case SPICE_MSGC_MAIN_AGENT_START:
         red_printf("agent start");
         if (!main_chan) {
-            return;
+            return FALSE;
         }
         reds_on_main_agent_start(main_chan);
         break;
@@ -650,28 +751,29 @@ static void main_channel_handle_message(void *opaque, size_t size, uint32_t type
     default:
         red_printf("unexpected type %d", type);
     }
+    return TRUE;
 }
 
-static void main_channel_event(int fd, int event, void *data)
+static void main_channel_on_error(RedChannel *channel)
 {
-    MainChannel *main_chan = data;
+    reds_disconnect();
+}
 
-    if (event & SPICE_WATCH_EVENT_READ) {
-        if (handle_incoming(main_chan->peer, &main_chan->in_handler)) {
-            main_disconnect(main_chan);
-            reds_disconnect();
-        }
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        RedsOutgoingData *outgoing = &main_chan->outgoing;
-        if (main_channel_send_data(main_chan)) {
-            main_channel_push(main_chan);
-            if (!outgoing->item && main_chan->peer) {
-                core->watch_update_mask(main_chan->peer->watch,
-                                        SPICE_WATCH_EVENT_READ);
-            }
-        }
-    }
+static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
+{
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
+
+    return main_chan->recv_buf;
+}
+
+static void main_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
+                                               uint8_t *msg)
+{
+}
+
+static int main_channel_config_socket(RedChannel *channel)
+{
+    return TRUE;
 }
 
 static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
@@ -679,41 +781,44 @@ static void main_channel_link(Channel *channel, RedsStreamContext *peer, int mig
                         uint32_t *caps)
 {
     MainChannel *main_chan;
-
-    main_chan = spice_malloc0(sizeof(MainChannel));
+    red_printf("");
+    ASSERT(channel->data == NULL);
+
+    main_chan = (MainChannel*)red_channel_create_parser(
+        sizeof(*main_chan), peer, core, migration, FALSE /* handle_acks */
+        ,main_channel_config_socket
+        ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
+        ,main_channel_handle_parsed
+        ,main_channel_alloc_msg_rcv_buf
+        ,main_channel_release_msg_rcv_buf
+        ,main_channel_send_item
+        ,main_channel_release_pipe_item
+        ,main_channel_on_error
+        ,main_channel_on_error);
+    ASSERT(main_chan);
     channel->data = main_chan;
-    main_chan->peer = peer;
-    main_chan->in_handler.shut = FALSE;
-    main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
-    main_chan->in_handler.opaque = main_chan;
-    main_chan->in_handler.handle_message = main_channel_handle_message;
-    ring_init(&main_chan->outgoing.pipe);
-    main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
-    peer->watch = core->watch_add(peer->socket,
-                                  SPICE_WATCH_EVENT_READ,
-                                  main_channel_event, main_chan);
 }
 
 int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
 {
     MainChannel *main_chan = channel->data;
 
-    return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
+    return main_chan ? getsockname(main_chan->base.peer->socket, sa, salen) : -1;
 }
 
 int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
 {
     MainChannel *main_chan = channel->data;
 
-    return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
+    return main_chan ? getpeername(main_chan->base.peer->socket, sa, salen) : -1;
 }
 
 void main_channel_close(Channel *channel)
 {
     MainChannel *main_chan = channel->data;
 
-    if (main_chan && main_chan->peer) {
-        close(main_chan->peer->socket);
+    if (main_chan && main_chan->base.peer) {
+        close(main_chan->base.peer->socket);
     }
 }
 
@@ -722,9 +827,8 @@ static void main_channel_shutdown(Channel *channel)
     MainChannel *main_chan = channel->data;
 
     if (main_chan != NULL) {
-        main_disconnect(main_chan); // TODO - really here? reset peer etc.
+        main_disconnect(main_chan);
     }
-    free(main_chan);
 }
 
 static void main_channel_migrate()
-- 
1.7.3.4



More information about the Spice-devel mailing list