[Spice-devel] [PATCH 10/10] server/main_channel: use red_channel (most code is pipe send/marshall separation)

Hans de Goede hdegoede at redhat.com
Wed Jan 12 23:59:06 PST 2011


Ack.

On 01/13/2011 06:01 AM, Alon Levy wrote:
> ---
>   server/main_channel.c |  778 ++++++++++++++++++++++++++++---------------------
>   1 files changed, 441 insertions(+), 337 deletions(-)
>
> diff --git a/server/main_channel.c b/server/main_channel.c
> index e42f173..f1fb4c6 100644
> --- a/server/main_channel.c
> +++ b/server/main_channel.c
> @@ -38,6 +38,7 @@
>   #include "common/messages.h"
>   #include "reds.h"
>   #include "main_channel.h"
> +#include "red_channel.h"
>   #include "generated_marshallers.h"
>
>   #define ZERO_BUF_SIZE 4096
> @@ -55,34 +56,67 @@ static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
>
>   typedef struct RedsOutItem RedsOutItem;
>   struct RedsOutItem {
> -    RingItem link;
> -    SpiceMarshaller *m;
> -    SpiceDataHeader *header;
> +    PipeItem base;
>   };
>
> -typedef struct RedsOutgoingData {
> -    Ring pipe;
> -    RedsOutItem *item;
> -    int vec_size;
> -    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
> -    struct iovec *vec;
> -} RedsOutgoingData;
> -
> -// TODO - remove and use red_channel.h
> -typedef struct IncomingHandler {
> -    spice_parse_channel_func_t parser;
> +typedef struct PingPipeItem {
> +    PipeItem base;
> +    int size;
> +} PingPipeItem;
> +
> +typedef struct MouseModePipeItem {
> +    PipeItem base;
> +    int current_mode;
> +    int is_client_mouse_allowed;
> +} MouseModePipeItem;
> +
> +typedef struct TokensPipeItem {
> +    PipeItem base;
> +    int tokens;
> +} TokensPipeItem;
> +
> +typedef struct AgentDataPipeItem {
> +    PipeItem base;
> +    uint8_t* data;
> +    size_t len;
> +    spice_marshaller_item_free_func free_data;
>       void *opaque;
> -    int shut;
> -    uint8_t buf[RECEIVE_BUF_SIZE];
> -    uint32_t end_pos;
> -    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
> -} IncomingHandler;
> +} AgentDataPipeItem;
> +
> +typedef struct InitPipeItem {
> +    PipeItem base;
> +    int connection_id;
> +    int display_channels_hint;
> +    int current_mouse_mode;
> +    int is_client_mouse_allowed;
> +    int multi_media_time;
> +    int ram_hint;
> +} InitPipeItem;
> +
> +typedef struct NotifyPipeItem {
> +    PipeItem base;
> +    uint8_t *mess;
> +    int mess_len;
> +} NotifyPipeItem;
> +
> +typedef struct MigrateBeginPipeItem {
> +    PipeItem base;
> +    int port;
> +    int sport;
> +    char *host;
> +    uint16_t cert_pub_key_type;
> +    uint32_t cert_pub_key_len;
> +    uint8_t *cert_pub_key;
> +} MigrateBeginPipeItem;
> +
> +typedef struct MultiMediaTimePipeItem {
> +    PipeItem base;
> +    int time;
> +} MultiMediaTimePipeItem;
>
>   typedef struct MainChannel {
> -    RedsStreamContext *peer;
> -    IncomingHandler in_handler;
> -    RedsOutgoingData outgoing;
> -    uint64_t serial; //migrate me
> +    RedChannel base;
> +    uint8_t recv_buf[RECEIVE_BUF_SIZE];
>       uint32_t ping_id;
>       uint32_t net_test_id;
>       int net_test_stage;
> @@ -98,45 +132,13 @@ enum NetTestStage {
>   static uint64_t latency = 0;
>   uint64_t bitrate_per_sec = ~0;
>
> -static void main_channel_out_item_free(RedsOutItem *item);
> -
> -static void main_reset_outgoing(MainChannel *main_chan)
> -{
> -    RedsOutgoingData *outgoing =&main_chan->outgoing;
> -    RingItem *ring_item;
> -
> -    if (outgoing->item) {
> -        main_channel_out_item_free(outgoing->item);
> -        outgoing->item = NULL;
> -    }
> -    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
> -        RedsOutItem *out_item = (RedsOutItem *)ring_item;
> -        ring_remove(ring_item);
> -        main_channel_out_item_free(out_item);
> -    }
> -    outgoing->vec_size = 0;
> -    outgoing->vec = outgoing->vec_buf;
> -}
> -
> -// ALON from reds_disconnect
>   static void main_disconnect(MainChannel *main_chan)
>   {
> -    if (!main_chan || !main_chan->peer) {
> -        return;
> -    }
> -    main_reset_outgoing(main_chan);
> -    core->watch_remove(main_chan->peer->watch);
> -    main_chan->peer->watch = NULL;
> -    main_chan->peer->cb_free(main_chan->peer);
> -    main_chan->peer = NULL;
> -    main_chan->in_handler.shut = TRUE;
> -    main_chan->serial = 0;
>       main_chan->ping_id = 0;
>       main_chan->net_test_id = 0;
>       main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
> -    main_chan->in_handler.end_pos = 0;
> +    red_channel_destroy(&main_chan->base);
>
> -    // TODO: Should probably reset these on the ping start, not here
>       latency = 0;
>       bitrate_per_sec = ~0;
>   }
> @@ -149,233 +151,199 @@ void main_channel_start_net_test(Channel *channel)
>           return;
>       }
>
> -    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)&&
> -                            main_channel_push_ping(channel, 0)&&
> -                            main_channel_push_ping(channel, NET_TEST_BYTES)) {
> +    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)
> +        && main_channel_push_ping(channel, 0)
> +        && main_channel_push_ping(channel, NET_TEST_BYTES)) {
>           main_chan->net_test_id = main_chan->ping_id - 2;
>           main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
>       }
>   }
>
> -static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
> +static RedsOutItem *main_pipe_item_new(MainChannel *main_chan, int type)
>   {
> -    for (;;) {
> -        uint8_t *buf = handler->buf;
> -        uint32_t pos = handler->end_pos;
> -        uint8_t *end = buf + pos;
> -        SpiceDataHeader *header;
> -        int n;
> -        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
> -        if (n<= 0) {
> -            if (n == 0) {
> -                return -1;
> -            }
> -            switch (errno) {
> -            case EAGAIN:
> -                return 0;
> -            case EINTR:
> -                break;
> -            case EPIPE:
> -                return -1;
> -            default:
> -                red_printf("%s", strerror(errno));
> -                return -1;
> -            }
> -        } else {
> -            pos += n;
> -            end = buf + pos;
> -            while (buf + sizeof(SpiceDataHeader)<= end&&
> -                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size<= end) {
> -                uint8_t *data = (uint8_t *)(header+1);
> -                size_t parsed_size;
> -                uint8_t *parsed;
> -                message_destructor_t parsed_free;
> -
> -                buf += sizeof(SpiceDataHeader) + header->size;
> -                parsed = handler->parser(data, data + header->size, header->type,
> -                                         SPICE_VERSION_MINOR,&parsed_size,&parsed_free);
> -                if (parsed == NULL) {
> -                    red_printf("failed to parse message type %d", header->type);
> -                    return -1;
> -                }
> -                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
> -                parsed_free(parsed);
> -                if (handler->shut) {
> -                    return -1;
> -                }
> -            }
> -            memmove(handler->buf, buf, (handler->end_pos = end - buf));
> -        }
> -    }
> +    RedsOutItem *item = spice_malloc(sizeof(RedsOutItem));
> +
> +    red_channel_pipe_item_init(&main_chan->base,&item->base, type);
> +    return item;
>   }
>
> -static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
> +static MouseModePipeItem *main_mouse_mode_item_new(MainChannel *main_chan,
> +    int current_mode, int is_client_mouse_allowed)
>   {
> -    RedsOutItem *item;
> -
> -    item = spice_new(RedsOutItem, 1);
> -    ring_item_init(&item->link);
> +    MouseModePipeItem *item = spice_malloc(sizeof(MouseModePipeItem));
>
> -    item->m = spice_marshaller_new();
> -    item->header = (SpiceDataHeader *)
> -        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
> -    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
> +    red_channel_pipe_item_init(&main_chan->base,&item->base,
> +                               SPICE_MSG_MAIN_MOUSE_MODE);
> +    item->current_mode = current_mode;
> +    item->is_client_mouse_allowed = is_client_mouse_allowed;
> +    return item;
> +}
>
> -    item->header->serial = ++main_chan->serial;
> -    item->header->type = type;
> -    item->header->sub_list = 0;
> +static PingPipeItem *main_ping_item_new(MainChannel *channel, int size)
> +{
> +    PingPipeItem *item = spice_malloc(sizeof(PingPipeItem));
>
> +    red_channel_pipe_item_init(&channel->base,&item->base, SPICE_MSG_PING);
> +    item->size = size;
>       return item;
>   }
>
> -static void main_channel_out_item_free(RedsOutItem *item)
> +static TokensPipeItem *main_tokens_item_new(MainChannel *main_chan, int tokens)
>   {
> -    spice_marshaller_destroy(item->m);
> -    free(item);
> +    TokensPipeItem *item = spice_malloc(sizeof(TokensPipeItem));
> +
> +    red_channel_pipe_item_init(&main_chan->base,&item->base,
> +                               SPICE_MSG_MAIN_AGENT_TOKEN);
> +    item->tokens = tokens;
> +    return item;
>   }
>
> -static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
> +static AgentDataPipeItem *main_agent_data_item_new(MainChannel *channel,
> +           uint8_t* data, size_t len,
> +           spice_marshaller_item_free_func free_data, void *opaque)
>   {
> -    struct iovec *now = vec;
> +    AgentDataPipeItem *item = spice_malloc(sizeof(AgentDataPipeItem));
>
> -    while (skip&&  skip>= now->iov_len) {
> -        skip -= now->iov_len;
> -        --*vec_size;
> -        now++;
> -    }
> -    now->iov_base = (uint8_t *)now->iov_base + skip;
> -    now->iov_len -= skip;
> -    return now;
> +    red_channel_pipe_item_init(&channel->base,&item->base, SPICE_MSG_MAIN_AGENT_DATA);
> +    item->data = data;
> +    item->len = len;
> +    item->free_data = free_data;
> +    item->opaque = opaque;
> +    return item;
>   }
>
> -static int main_channel_send_data(MainChannel *main_chan)
> +static InitPipeItem *main_init_item_new(MainChannel *main_chan,
> +    int connection_id, int display_channels_hint, int current_mouse_mode,
> +    int is_client_mouse_allowed, int multi_media_time,
> +    int ram_hint)
>   {
> -    RedsOutgoingData *outgoing =&main_chan->outgoing;
> -    int n;
> -
> -    if (!outgoing->item) {
> -        return TRUE;
> -    }
> +    InitPipeItem *item = spice_malloc(sizeof(InitPipeItem));
>
> -    ASSERT(outgoing->vec_size);
> -    for (;;) {
> -        if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
> -            switch (errno) {
> -            case EAGAIN:
> -                core->watch_update_mask(main_chan->peer->watch,
> -                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
> -                return FALSE;
> -            case EINTR:
> -                break;
> -            case EPIPE:
> -                reds_disconnect();
> -                return FALSE;
> -            default:
> -                red_printf("%s", strerror(errno));
> -                reds_disconnect();
> -                return FALSE;
> -            }
> -        } else {
> -            outgoing->vec = main_channel_iovec_skip(outgoing->vec, n,&outgoing->vec_size);
> -            if (!outgoing->vec_size) {
> -                main_channel_out_item_free(outgoing->item);
> -                outgoing->item = NULL;
> -                outgoing->vec = outgoing->vec_buf;
> -                return TRUE;
> -            }
> -        }
> -    }
> +    red_channel_pipe_item_init(&main_chan->base,&item->base,
> +                               SPICE_MSG_MAIN_INIT);
> +    item->connection_id = connection_id;
> +    item->display_channels_hint = display_channels_hint;
> +    item->current_mouse_mode = current_mouse_mode;
> +    item->is_client_mouse_allowed = is_client_mouse_allowed;
> +    item->multi_media_time = multi_media_time;
> +    item->ram_hint = ram_hint;
> +    return item;
>   }
>
> -static void main_channel_push(MainChannel *main_chan)
> +static NotifyPipeItem *main_notify_item_new(MainChannel *main_chan,
> +                                        uint8_t *mess, const int mess_len)
>   {
> -    RedsOutgoingData *outgoing =&main_chan->outgoing;
> -    RingItem *ring_item;
> -    RedsOutItem *item;
> +    NotifyPipeItem *item = spice_malloc(sizeof(NotifyPipeItem));
>
> -    for (;;) {
> -        if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
> -            return;
> -        }
> -        ring_remove(ring_item);
> -        outgoing->item = item = (RedsOutItem *)ring_item;
> +    red_channel_pipe_item_init(&main_chan->base,&item->base,
> +                               SPICE_MSG_NOTIFY);
> +    item->mess = mess;
> +    item->mess_len = mess_len;
> +    return item;
> +}
>
> -        spice_marshaller_flush(item->m);
> -        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
> +static MigrateBeginPipeItem *main_migrate_begin_item_new(
> +    MainChannel *main_chan, int port, int sport,
> +    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
> +    uint8_t *cert_pub_key)
> +{
> +    MigrateBeginPipeItem *item = spice_malloc(sizeof(MigrateBeginPipeItem));
>
> -        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
> -                                                         outgoing->vec_buf,
> -                                                         REDS_MAX_SEND_IOVEC, 0);
> -        main_channel_send_data(main_chan);
> -    }
> +    red_channel_pipe_item_init(&main_chan->base,&item->base,
> +                               SPICE_MSG_MAIN_MIGRATE_BEGIN);
> +    item->port = port;
> +    item->sport = sport;
> +    item->host = host;
> +    item->cert_pub_key_type = cert_pub_key_type;
> +    item->cert_pub_key_len = cert_pub_key_len;
> +    item->cert_pub_key = cert_pub_key;
> +    return item;
>   }
>
> -static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
> +static MultiMediaTimePipeItem *main_multi_media_time_item_new(
> +    MainChannel *main_chan, int time)
>   {
> -    ring_add(&main_chan->outgoing.pipe,&item->link);
> -    main_channel_push(main_chan);
> +    MultiMediaTimePipeItem *item;
> +
> +    item = spice_malloc(sizeof(MultiMediaTimePipeItem));
> +    red_channel_pipe_item_init(&main_chan->base,&item->base,
> +                               SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
> +    item->time = time;
> +    return item;
>   }
>
>   static void main_channel_push_channels(MainChannel *main_chan)
>   {
> -    SpiceMsgChannels* channels_info;
>       RedsOutItem *item;
>
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
> -    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
> +    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_channels(MainChannel *main_chan)
> +{
> +    SpiceMsgChannels* channels_info;
> +
> +    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels)
> +                            + reds_num_of_channels() * sizeof(SpiceChannelId));
>       reds_fill_channels(channels_info);
> -    spice_marshall_msg_main_channels_list(item->m, channels_info);
> +    spice_marshall_msg_main_channels_list(
> +        main_chan->base.send_data.marshaller, channels_info);
>       free(channels_info);
> -    main_channel_push_pipe_item(main_chan, item);
>   }
>
>   int main_channel_push_ping(Channel *channel, int size)
>   {
> -    struct timespec time_space;
> -    RedsOutItem *item;
> -    SpiceMsgPing ping;
>       MainChannel *main_chan = channel->data;
> -
> -    if (!main_chan) {
> +    PingPipeItem *item;
> +
> +    if (main_chan == NULL) {
>           return FALSE;
>       }
> -    item = new_out_item(main_chan, SPICE_MSG_PING);
> +    item = main_ping_item_new(main_chan, size);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +    return TRUE;
> +}
> +
> +static void main_channel_marshall_ping(MainChannel *main_chan, int size)
> +{
> +    struct timespec time_space;
> +    SpiceMsgPing ping;
> +    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
> +
>       ping.id = ++main_chan->ping_id;
>       clock_gettime(CLOCK_MONOTONIC,&time_space);
>       ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
> -    spice_marshall_msg_ping(item->m,&ping);
> +    spice_marshall_msg_ping(m,&ping);
>
>       while (size>  0) {
>           int now = MIN(ZERO_BUF_SIZE, size);
>           size -= now;
> -        spice_marshaller_add_ref(item->m, zero_page, now);
> +        spice_marshaller_add_ref(m, zero_page, now);
>       }
> +}
>
> -    main_channel_push_pipe_item(main_chan, item);
> +void main_channel_push_mouse_mode(Channel *channel, int current_mode,
> +                                  int is_client_mouse_allowed)
> +{
> +    MainChannel *main_chan = channel->data;
> +    MouseModePipeItem *item;
>
> -    return TRUE;
> +    item = main_mouse_mode_item_new(main_chan, current_mode,
> +                                    is_client_mouse_allowed);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
>   }
>
> -void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
> +static void main_channel_marshall_mouse_mode(MainChannel *main_chan, int current_mode, int is_client_mouse_allowed)
>   {
>       SpiceMsgMainMouseMode mouse_mode;
> -    RedsOutItem *item;
> -    MainChannel *main_chan;
> -
> -    if (!channel) {
> -        return;
> -    }
> -    main_chan = channel->data;
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
>       mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
>       if (is_client_mouse_allowed) {
>           mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
>       }
>       mouse_mode.current_mode = current_mode;
> -
> -    spice_marshall_msg_main_mouse_mode(item->m,&mouse_mode);
> -
> -    main_channel_push_pipe_item(main_chan, item);
> +    spice_marshall_msg_main_mouse_mode(main_chan->base.send_data.marshaller,
> +                                       &mouse_mode);
>   }
>
>   void main_channel_push_agent_connected(Channel *channel)
> @@ -383,186 +351,319 @@ void main_channel_push_agent_connected(Channel *channel)
>       RedsOutItem *item;
>       MainChannel *main_chan = channel->data;
>
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
> -    main_channel_push_pipe_item(main_chan, item);
> +    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
>   }
>
>   void main_channel_push_agent_disconnected(Channel *channel)
>   {
> -    SpiceMsgMainAgentDisconnect disconnect;
>       RedsOutItem *item;
>       MainChannel *main_chan = channel->data;
>
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
> +    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_agent_disconnected(MainChannel *main_chan)
> +{
> +    SpiceMsgMainAgentDisconnect disconnect;
> +
>       disconnect.error_code = SPICE_LINK_ERR_OK;
> -    spice_marshall_msg_main_agent_disconnected(item->m,&disconnect);
> -    main_channel_push_pipe_item(main_chan, item);
> +    spice_marshall_msg_main_agent_disconnected(
> +        main_chan->base.send_data.marshaller,&disconnect);
>   }
>
>   void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
>   {
> -    SpiceMsgMainAgentTokens tokens;
> -    RedsOutItem *item;
>       MainChannel *main_chan = channel->data;
> +    TokensPipeItem *item = main_tokens_item_new(main_chan, num_tokens);
> +
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_tokens(MainChannel *main_chan, uint32_t num_tokens)
> +{
> +    SpiceMsgMainAgentTokens tokens;
>
> -    if (!main_chan) {
> -        return;
> -    }
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
>       tokens.num_tokens = num_tokens;
> -    spice_marshall_msg_main_agent_token(item->m,&tokens);
> -    main_channel_push_pipe_item(main_chan, item);
> +    spice_marshall_msg_main_agent_token(
> +        main_chan->base.send_data.marshaller,&tokens);
>   }
>
>   void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
>              spice_marshaller_item_free_func free_data, void *opaque)
>   {
> -    RedsOutItem *item;
>       MainChannel *main_chan = channel->data;
> +    AgentDataPipeItem *item;
> +
> +    item = main_agent_data_item_new(main_chan, data, len, free_data, opaque);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
>
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
> -    spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
> -    main_channel_push_pipe_item(main_chan, item);
> +static void main_channel_marshall_agent_data(MainChannel *main_chan,
> +                                  AgentDataPipeItem *item)
> +{
> +    spice_marshaller_add_ref_full(main_chan->base.send_data.marshaller,
> +        item->data, item->len, item->free_data, item->opaque);
>   }
>
>   static void main_channel_push_migrate_data_item(MainChannel *main_chan)
>   {
> -    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
> -    SpiceMarshaller *m = item->m;
> +    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE_DATA);
> +
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_migrate_data_item(MainChannel *main_chan)
> +{
> +    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
>       MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
>
>       reds_marshall_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
> -    data->serial = main_chan->serial;
> +    data->serial = red_channel_get_message_serial(&main_chan->base);
>       data->ping_id = main_chan->ping_id;
> -    main_channel_push_pipe_item(main_chan, item);
>   }
>
> -static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
> +static void main_channel_receive_migrate_data(MainChannel *main_chan,
> +                                  MainMigrateData *data, uint8_t *end)
>   {
> -    main_chan->serial = data->serial;
> +    red_channel_set_message_serial(&main_chan->base, data->serial);
>       main_chan->ping_id = data->ping_id;
>   }
>
> -void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
> -    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
> +void main_channel_push_init(Channel *channel, int connection_id,
> +    int display_channels_hint, int current_mouse_mode,
> +    int is_client_mouse_allowed, int multi_media_time,
>       int ram_hint)
>   {
> -    RedsOutItem *item;
> -    SpiceMsgMainInit init;
> +    InitPipeItem *item;
>       MainChannel *main_chan = channel->data;
>
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
> -    init.session_id = connection_id;
> -    init.display_channels_hint = display_channels_hint;
> -    init.current_mouse_mode = current_mouse_mode;
> +    item = main_init_item_new(main_chan,
> +             connection_id, display_channels_hint, current_mouse_mode,
> +             is_client_mouse_allowed, multi_media_time, ram_hint);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_init(MainChannel *main_chan,
> +                                       InitPipeItem *item)
> +{
> +    SpiceMsgMainInit init;
> +
> +    init.session_id = item->connection_id;
> +    init.display_channels_hint = item->display_channels_hint;
> +    init.current_mouse_mode = item->current_mouse_mode;
>       init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
> -    if (is_client_mouse_allowed) {
> +    if (item->is_client_mouse_allowed) {
>           init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
>       }
>       init.agent_connected = reds_has_vdagent();
>       init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
> -    init.multi_media_time = multi_media_time;
> -    init.ram_hint = ram_hint;
> -    spice_marshall_msg_main_init(item->m,&init);
> -    main_channel_push_pipe_item(main_chan, item);
> +    init.multi_media_time = item->multi_media_time;
> +    init.ram_hint = item->ram_hint;
> +    spice_marshall_msg_main_init(main_chan->base.send_data.marshaller,&init);
>   }
>
>   void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
>   {
> -    // TODO possible free-then-use bug - caller frees mess after this, but is that pointer being
> -    // used by spice_marshaller?
> -    RedsOutItem *item;
> -    SpiceMsgNotify notify;
>       MainChannel *main_chan = channel->data;
> +    NotifyPipeItem *item = main_notify_item_new(main_chan, mess, mess_len);
> +
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
>
> -    item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
> -    notify.time_stamp = get_time_stamp();
> +static void main_channel_marshall_notify(MainChannel *main_chan, NotifyPipeItem *item)
> +{
> +    SpiceMsgNotify notify;
> +    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
> +
> +    notify.time_stamp = get_time_stamp(); // TODO - move to main_new_notify_item
>       notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
>       notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
>       notify.what = SPICE_WARN_GENERAL;
> -    notify.message_len = mess_len;
> -    spice_marshall_msg_notify(item->m,&notify);
> -    spice_marshaller_add(item->m, mess, mess_len + 1);
> -    main_channel_push_pipe_item(main_chan, item);
> +    notify.message_len = item->mess_len;
> +    spice_marshall_msg_notify(m,&notify);
> +    spice_marshaller_add(m, item->mess, item->mess_len + 1);
>   }
>
> -void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
> -    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
> +void main_channel_push_migrate_begin(Channel *channel, int port, int sport,
> +    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
> +    uint8_t *cert_pub_key)
>   {
>       MainChannel *main_chan = channel->data;
> -    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
> +    MigrateBeginPipeItem *item = main_migrate_begin_item_new(main_chan, port,
> +        sport, host, cert_pub_key_type, cert_pub_key_len, cert_pub_key);
> +
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_migrate_begin(MainChannel *main_chan,
> +    MigrateBeginPipeItem *item)
> +{
>       SpiceMsgMainMigrationBegin migrate;
>
> -    migrate.port = port;
> -    migrate.sport = sport;
> -    migrate.host_size = strlen(host) + 1;
> -    migrate.host_data = (uint8_t *)host;
> -    migrate.pub_key_type = cert_pub_key_type;
> -    migrate.pub_key_size = cert_pub_key_len;
> -    migrate.pub_key_data = cert_pub_key;
> -    spice_marshall_msg_main_migrate_begin(item->m,&migrate);
> -    main_channel_push_pipe_item(main_chan, item);
> +    migrate.port = item->port;
> +    migrate.sport = item->sport;
> +    migrate.host_size = strlen(item->host) + 1;
> +    migrate.host_data = (uint8_t *)item->host;
> +    migrate.pub_key_type = item->cert_pub_key_type;
> +    migrate.pub_key_size = item->cert_pub_key_len;
> +    migrate.pub_key_data = item->cert_pub_key;
> +    spice_marshall_msg_main_migrate_begin(main_chan->base.send_data.marshaller,
> +                                          &migrate);
>   }
>
>   void main_channel_push_migrate(Channel *channel)
>   {
> -    RedsOutItem *item;
> -    SpiceMsgMigrate migrate;
>       MainChannel *main_chan = channel->data;
> +    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE);
> +
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static void main_channel_marshall_migrate(MainChannel *main_chan)
> +{
> +    SpiceMsgMigrate migrate;
>
> -    item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
>       migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
> -    spice_marshall_msg_migrate(item->m,&migrate);
> -    main_channel_push_pipe_item(main_chan, item);
> +    spice_marshall_msg_migrate(main_chan->base.send_data.marshaller,&migrate);
>   }
>
>   void main_channel_push_migrate_cancel(Channel *channel)
>   {
>       MainChannel *main_chan = channel->data;
> -    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
> +    RedsOutItem *item = main_pipe_item_new(main_chan,
> +                                           SPICE_MSG_MAIN_MIGRATE_CANCEL);
>
> -    main_channel_push_pipe_item(main_chan, item);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
>   }
>
>   void main_channel_push_multi_media_time(Channel *channel, int time)
>   {
> -    SpiceMsgMainMultiMediaTime time_mes;
> -    RedsOutItem *item;
>       MainChannel *main_chan = channel->data;
>
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
> -    time_mes.time = time;
> -    spice_marshall_msg_main_multi_media_time(item->m,&time_mes);
> -    main_channel_push_pipe_item(main_chan, item);
> +    MultiMediaTimePipeItem *item =
> +        main_multi_media_time_item_new(main_chan, time);
> +    red_channel_pipe_add(&main_chan->base,&item->base);
> +}
> +
> +static PipeItem *main_migrate_switch_item_new(MainChannel *main_chan)
> +{
> +    PipeItem *item = spice_malloc(sizeof(*item));
> +
> +    red_channel_pipe_item_init(&main_chan->base, item,
> +                               SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
> +    return item;
>   }
>
>   void main_channel_push_migrate_switch(Channel *channel)
>   {
> +    MainChannel *main_chan = channel->data;
> +
> +    red_channel_pipe_add(&main_chan->base,
> +        main_migrate_switch_item_new(main_chan));
> +}
> +
> +static void main_channel_marshall_migrate_switch(MainChannel *main_chan)
> +{
>       SpiceMsgMainMigrationSwitchHost migrate;
> -    RedsOutItem *item;
> -    MainChannel *main_chan;
> -
> -    if (!channel) {
> -        return;
> -    }
> -    main_chan = channel->data;
> +
>       red_printf("");
> -    item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
> +
>       reds_fill_mig_switch(&migrate);
> -    spice_marshall_msg_main_migrate_switch_host(item->m,&migrate);
> -    main_channel_push_pipe_item(main_chan, item);
> +    spice_marshall_msg_main_migrate_switch_host(
> +        main_chan->base.send_data.marshaller,&migrate);
> +
>       reds_mig_release();
>   }
>
> -static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
> +static void main_channel_marshall_multi_media_time(MainChannel *main_chan,
> +    MultiMediaTimePipeItem *item)
> +{
> +    SpiceMsgMainMultiMediaTime time_mes;
> +
> +    time_mes.time = item->time;
> +    spice_marshall_msg_main_multi_media_time(
> +        main_chan->base.send_data.marshaller,&time_mes);
> +}
> +
> +static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> +{
> +    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> +
> +    red_channel_reset_send_data(channel);
> +    red_channel_init_send_data(channel, base->type, base);
> +    switch (base->type) {
> +        case SPICE_MSG_MAIN_CHANNELS_LIST:
> +            main_channel_marshall_channels(main_chan);
> +            break;
> +        case SPICE_MSG_PING:
> +            main_channel_marshall_ping(main_chan,
> +                SPICE_CONTAINEROF(base, PingPipeItem, base)->size);
> +            break;
> +        case SPICE_MSG_MAIN_MOUSE_MODE:
> +            {
> +                MouseModePipeItem *item =
> +                    SPICE_CONTAINEROF(base, MouseModePipeItem, base);
> +                main_channel_marshall_mouse_mode(main_chan,
> +                    item->current_mode, item->is_client_mouse_allowed);
> +                break;
> +            }
> +        case SPICE_MSG_MAIN_AGENT_DISCONNECTED:
> +            main_channel_marshall_agent_disconnected(main_chan);
> +            break;
> +        case SPICE_MSG_MAIN_AGENT_TOKEN:
> +            main_channel_marshall_tokens(main_chan,
> +                SPICE_CONTAINEROF(base, TokensPipeItem, base)->tokens);
> +            break;
> +        case SPICE_MSG_MAIN_AGENT_DATA:
> +            main_channel_marshall_agent_data(main_chan,
> +                SPICE_CONTAINEROF(base, AgentDataPipeItem, base));
> +            break;
> +        case SPICE_MSG_MIGRATE_DATA:
> +            main_channel_marshall_migrate_data_item(main_chan);
> +            break;
> +        case SPICE_MSG_MAIN_INIT:
> +            main_channel_marshall_init(main_chan,
> +                SPICE_CONTAINEROF(base, InitPipeItem, base));
> +            break;
> +        case SPICE_MSG_NOTIFY:
> +            main_channel_marshall_notify(main_chan,
> +                SPICE_CONTAINEROF(base, NotifyPipeItem, base));
> +            break;
> +        case SPICE_MSG_MIGRATE:
> +            main_channel_marshall_migrate(main_chan);
> +            break;
> +        case SPICE_MSG_MAIN_MIGRATE_BEGIN:
> +            main_channel_marshall_migrate_begin(main_chan,
> +                SPICE_CONTAINEROF(base, MigrateBeginPipeItem, base));
> +            break;
> +        case SPICE_MSG_MAIN_MULTI_MEDIA_TIME:
> +            main_channel_marshall_multi_media_time(main_chan,
> +                SPICE_CONTAINEROF(base, MultiMediaTimePipeItem, base));
> +            break;
> +        case SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST:
> +            main_channel_marshall_migrate_switch(main_chan);
> +            break;
> +    };
> +    red_channel_begin_send_message(channel);
> +}
> +
> +static void main_channel_release_pipe_item(RedChannel *channel,
> +    PipeItem *base, int item_pushed)
> +{
> +    free(base);
> +}
> +
> +static int main_channel_handle_parsed(RedChannel *channel, size_t size, uint32_t type, void *message)
>   {
> -    MainChannel *main_chan = opaque;
> +    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
>
>       switch (type) {
>       case SPICE_MSGC_MAIN_AGENT_START:
>           red_printf("agent start");
>           if (!main_chan) {
> -            return;
> +            return FALSE;
>           }
>           reds_on_main_agent_start(main_chan);
>           break;
> @@ -650,28 +751,29 @@ static void main_channel_handle_message(void *opaque, size_t size, uint32_t type
>       default:
>           red_printf("unexpected type %d", type);
>       }
> +    return TRUE;
>   }
>
> -static void main_channel_event(int fd, int event, void *data)
> +static void main_channel_on_error(RedChannel *channel)
>   {
> -    MainChannel *main_chan = data;
> +    reds_disconnect();
> +}
>
> -    if (event & SPICE_WATCH_EVENT_READ) {
> -        if (handle_incoming(main_chan->peer, &main_chan->in_handler)) {
> -            main_disconnect(main_chan);
> -            reds_disconnect();
> -        }
> -    }
> -    if (event & SPICE_WATCH_EVENT_WRITE) {
> -        RedsOutgoingData *outgoing = &main_chan->outgoing;
> -        if (main_channel_send_data(main_chan)) {
> -            main_channel_push(main_chan);
> -            if (!outgoing->item && main_chan->peer) {
> -                core->watch_update_mask(main_chan->peer->watch,
> -                                        SPICE_WATCH_EVENT_READ);
> -            }
> -        }
> -    }
> +static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> +{
> +    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> +
> +    return main_chan->recv_buf;
> +}
> +
> +static void main_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> +                                               uint8_t *msg)
> +{
> +}
> +
> +static int main_channel_config_socket(RedChannel *channel)
> +{
> +    return TRUE;
>   }
>
>   static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
> @@ -679,41 +781,44 @@ static void main_channel_link(Channel *channel, RedsStreamContext *peer, int mig
>                           uint32_t *caps)
>   {
>       MainChannel *main_chan;
> -
> -    main_chan = spice_malloc0(sizeof(MainChannel));
> +    red_printf("");
> +    ASSERT(channel->data == NULL);
> +
> +    main_chan = (MainChannel*)red_channel_create_parser(
> +        sizeof(*main_chan), peer, core, migration, FALSE /* handle_acks */
> +        ,main_channel_config_socket
> +        ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
> +        ,main_channel_handle_parsed
> +        ,main_channel_alloc_msg_rcv_buf
> +        ,main_channel_release_msg_rcv_buf
> +        ,main_channel_send_item
> +        ,main_channel_release_pipe_item
> +        ,main_channel_on_error
> +        ,main_channel_on_error);
> +    ASSERT(main_chan);
>       channel->data = main_chan;
> -    main_chan->peer = peer;
> -    main_chan->in_handler.shut = FALSE;
> -    main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
> -    main_chan->in_handler.opaque = main_chan;
> -    main_chan->in_handler.handle_message = main_channel_handle_message;
> -    ring_init(&main_chan->outgoing.pipe);
> -    main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
> -    peer->watch = core->watch_add(peer->socket,
> -                                  SPICE_WATCH_EVENT_READ,
> -                                  main_channel_event, main_chan);
>   }
>
>   int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
>   {
>       MainChannel *main_chan = channel->data;
>
> -    return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
> +    return main_chan ? getsockname(main_chan->base.peer->socket, sa, salen) : -1;
>   }
>
>   int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
>   {
>       MainChannel *main_chan = channel->data;
>
> -    return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
> +    return main_chan ? getpeername(main_chan->base.peer->socket, sa, salen) : -1;
>   }
>
>   void main_channel_close(Channel *channel)
>   {
>       MainChannel *main_chan = channel->data;
>
> -    if (main_chan && main_chan->peer) {
> -        close(main_chan->peer->socket);
> +    if (main_chan && main_chan->base.peer) {
> +        close(main_chan->base.peer->socket);
>       }
>   }
>
> @@ -722,9 +827,8 @@ static void main_channel_shutdown(Channel *channel)
>       MainChannel *main_chan = channel->data;
>
>       if (main_chan != NULL) {
> -        main_disconnect(main_chan); // TODO - really here? reset peer etc.
> +        main_disconnect(main_chan);
>       }
> -    free(main_chan);
>   }
>
>   static void main_channel_migrate()


More information about the Spice-devel mailing list