[Spice-devel] [PATCH 01/10] server: split main_channel from reds

Hans de Goede hdegoede at redhat.com
Wed Jan 12 23:43:35 PST 2011


Ack.

On 01/13/2011 06:01 AM, Alon Levy wrote:
> ---
>   server/Makefile.am    |    1 +
>   server/main_channel.c |  745 +++++++++++++++++++++++++++++++++++++++
>   server/main_channel.h |   78 +++++
>   server/reds.c         |  917 +++++++++++--------------------------------------
>   server/reds.h         |   22 ++
>   5 files changed, 1044 insertions(+), 719 deletions(-)
>   create mode 100644 server/main_channel.c
>   create mode 100644 server/main_channel.h
>
> diff --git a/server/Makefile.am b/server/Makefile.am
> index ab66ba7..d265bfb 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -114,6 +114,7 @@ libspice_server_la_SOURCES =			\
>   	red_parse_qxl.c				\
>   	red_parse_qxl.h				\
>   	reds.c					\
> +	main_channel.c				\
>   	inputs_channel.c			\
>   	reds.h					\
>   	stat.h					\
> diff --git a/server/main_channel.c b/server/main_channel.c
> new file mode 100644
> index 0000000..7dff2b4
> --- /dev/null
> +++ b/server/main_channel.c
> @@ -0,0 +1,745 @@
> +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
> +/*
> +   Copyright (C) 2009 Red Hat, Inc.
> +
> +   This library is free software; you can redistribute it and/or
> +   modify it under the terms of the GNU Lesser General Public
> +   License as published by the Free Software Foundation; either
> +   version 2.1 of the License, or (at your option) any later version.
> +
> +   This library is distributed in the hope that it will be useful,
> +   but WITHOUT ANY WARRANTY; without even the implied warranty of
> +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +   Lesser General Public License for more details.
> +
> +   You should have received a copy of the GNU Lesser General Public
> +   License along with this library; if not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#include<stdint.h>
> +#include<stdio.h>
> +#include<unistd.h>
> +#include<sys/socket.h>
> +#include<netinet/in.h>
> +#include<netinet/tcp.h>
> +#include<arpa/inet.h>
> +#include<netdb.h>
> +#include<limits.h>
> +#include<time.h>
> +#include<pthread.h>
> +#include<sys/mman.h>
> +#include<fcntl.h>
> +#include<errno.h>
> +#include<ctype.h>
> +
> +#include "server/red_common.h"
> +#include "server/demarshallers.h"
> +#include "common/ring.h"
> +#include "common/messages.h"
> +#include "reds.h"
> +#include "main_channel.h"
> +#include "generated_marshallers.h"
> +
> +#define ZERO_BUF_SIZE 4096
> +
> +// approximate max receive message size for main channel
> +#define RECEIVE_BUF_SIZE \
> +    (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
> +
> +#define REDS_MAX_SEND_IOVEC 100
> +
> +#define NET_TEST_WARMUP_BYTES 0
> +#define NET_TEST_BYTES (1024 * 250)
> +
> +static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
> +
> +typedef struct RedsOutItem RedsOutItem;
> +struct RedsOutItem {
> +    RingItem link;
> +    SpiceMarshaller *m;
> +    SpiceDataHeader *header;
> +};
> +
> +typedef struct RedsOutgoingData {
> +    Ring pipe;
> +    RedsOutItem *item;
> +    int vec_size;
> +    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
> +    struct iovec *vec;
> +} RedsOutgoingData;
> +
> +// TODO - remove and use red_channel.h
> +typedef struct IncomingHandler {
> +    spice_parse_channel_func_t parser;
> +    void *opaque;
> +    int shut;
> +    uint8_t buf[RECEIVE_BUF_SIZE];
> +    uint32_t end_pos;
> +    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
> +} IncomingHandler;
> +
> +typedef struct MainChannel {
> +    RedsStreamContext *peer;
> +    IncomingHandler in_handler;
> +    RedsOutgoingData outgoing;
> +    uint64_t serial; //migrate me
> +    uint32_t ping_id;
> +    uint32_t net_test_id;
> +    int net_test_stage;
> +} MainChannel;
> +
> +enum NetTestStage {
> +    NET_TEST_STAGE_INVALID,
> +    NET_TEST_STAGE_WARMUP,
> +    NET_TEST_STAGE_LATENCY,
> +    NET_TEST_STAGE_RATE,
> +};
> +
> +static uint64_t latency = 0;
> +uint64_t bitrate_per_sec = ~0;
> +
> +static void main_channel_out_item_free(RedsOutItem *item);
> +
> +static void main_reset_outgoing(MainChannel *main_chan)
> +{
> +    RedsOutgoingData *outgoing =&main_chan->outgoing;
> +    RingItem *ring_item;
> +
> +    if (outgoing->item) {
> +        main_channel_out_item_free(outgoing->item);
> +        outgoing->item = NULL;
> +    }
> +    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
> +        RedsOutItem *out_item = (RedsOutItem *)ring_item;
> +        ring_remove(ring_item);
> +        main_channel_out_item_free(out_item);
> +    }
> +    outgoing->vec_size = 0;
> +    outgoing->vec = outgoing->vec_buf;
> +}
> +
> +// ALON from reds_disconnect
> +static void main_disconnect(MainChannel *main_chan)
> +{
> +    if (!main_chan || !main_chan->peer) {
> +        return;
> +    }
> +    main_reset_outgoing(main_chan);
> +    core->watch_remove(main_chan->peer->watch);
> +    main_chan->peer->watch = NULL;
> +    main_chan->peer->cb_free(main_chan->peer);
> +    main_chan->peer = NULL;
> +    main_chan->in_handler.shut = TRUE;
> +    main_chan->serial = 0;
> +    main_chan->ping_id = 0;
> +    main_chan->net_test_id = 0;
> +    main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
> +    main_chan->in_handler.end_pos = 0;
> +
> +    // TODO: Should probably reset these on the ping start, not here
> +    latency = 0;
> +    bitrate_per_sec = ~0;
> +}
> +
> +void main_channel_start_net_test(Channel *channel)
> +{
> +    MainChannel *main_chan = channel->data;
> +
> +    if (!main_chan || main_chan->net_test_id) {
> +        return;
> +    }
> +
> +    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)&&
> +                            main_channel_push_ping(channel, 0)&&
> +                            main_channel_push_ping(channel, NET_TEST_BYTES)) {
> +        main_chan->net_test_id = main_chan->ping_id - 2;
> +        main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
> +    }
> +}
> +
> +static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
> +{
> +    for (;;) {
> +        uint8_t *buf = handler->buf;
> +        uint32_t pos = handler->end_pos;
> +        uint8_t *end = buf + pos;
> +        SpiceDataHeader *header;
> +        int n;
> +        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
> +        if (n<= 0) {
> +            if (n == 0) {
> +                return -1;
> +            }
> +            switch (errno) {
> +            case EAGAIN:
> +                return 0;
> +            case EINTR:
> +                break;
> +            case EPIPE:
> +                return -1;
> +            default:
> +                red_printf("%s", strerror(errno));
> +                return -1;
> +            }
> +        } else {
> +            pos += n;
> +            end = buf + pos;
> +            while (buf + sizeof(SpiceDataHeader)<= end&&
> +                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size<= end) {
> +                uint8_t *data = (uint8_t *)(header+1);
> +                size_t parsed_size;
> +                uint8_t *parsed;
> +                message_destructor_t parsed_free;
> +
> +                buf += sizeof(SpiceDataHeader) + header->size;
> +                parsed = handler->parser(data, data + header->size, header->type,
> +                                         SPICE_VERSION_MINOR,&parsed_size,&parsed_free);
> +                if (parsed == NULL) {
> +                    red_printf("failed to parse message type %d", header->type);
> +                    return -1;
> +                }
> +                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
> +                parsed_free(parsed);
> +                if (handler->shut) {
> +                    return -1;
> +                }
> +            }
> +            memmove(handler->buf, buf, (handler->end_pos = end - buf));
> +        }
> +    }
> +}
> +
> +static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
> +{
> +    RedsOutItem *item;
> +
> +    item = spice_new(RedsOutItem, 1);
> +    ring_item_init(&item->link);
> +
> +    item->m = spice_marshaller_new();
> +    item->header = (SpiceDataHeader *)
> +        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
> +    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
> +
> +    item->header->serial = ++main_chan->serial;
> +    item->header->type = type;
> +    item->header->sub_list = 0;
> +
> +    return item;
> +}
> +
> +static void main_channel_out_item_free(RedsOutItem *item)
> +{
> +    spice_marshaller_destroy(item->m);
> +    free(item);
> +}
> +
> +static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
> +{
> +    struct iovec *now = vec;
> +
> +    while (skip&&  skip>= now->iov_len) {
> +        skip -= now->iov_len;
> +        --*vec_size;
> +        now++;
> +    }
> +    now->iov_base = (uint8_t *)now->iov_base + skip;
> +    now->iov_len -= skip;
> +    return now;
> +}
> +
> +static int main_channel_send_data(MainChannel *main_chan)
> +{
> +    RedsOutgoingData *outgoing =&main_chan->outgoing;
> +    int n;
> +
> +    if (!outgoing->item) {
> +        return TRUE;
> +    }
> +
> +    ASSERT(outgoing->vec_size);
> +    for (;;) {
> +        if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
> +            switch (errno) {
> +            case EAGAIN:
> +                core->watch_update_mask(main_chan->peer->watch,
> +                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
> +                return FALSE;
> +            case EINTR:
> +                break;
> +            case EPIPE:
> +                reds_disconnect();
> +                return FALSE;
> +            default:
> +                red_printf("%s", strerror(errno));
> +                reds_disconnect();
> +                return FALSE;
> +            }
> +        } else {
> +            outgoing->vec = main_channel_iovec_skip(outgoing->vec, n,&outgoing->vec_size);
> +            if (!outgoing->vec_size) {
> +                main_channel_out_item_free(outgoing->item);
> +                outgoing->item = NULL;
> +                outgoing->vec = outgoing->vec_buf;
> +                return TRUE;
> +            }
> +        }
> +    }
> +}
> +
> +static void main_channel_push(MainChannel *main_chan)
> +{
> +    RedsOutgoingData *outgoing =&main_chan->outgoing;
> +    RingItem *ring_item;
> +    RedsOutItem *item;
> +
> +    for (;;) {
> +        if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
> +            return;
> +        }
> +        ring_remove(ring_item);
> +        outgoing->item = item = (RedsOutItem *)ring_item;
> +
> +        spice_marshaller_flush(item->m);
> +        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
> +
> +        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
> +                                                         outgoing->vec_buf,
> +                                                         REDS_MAX_SEND_IOVEC, 0);
> +        main_channel_send_data(main_chan);
> +    }
> +}
> +
> +static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
> +{
> +    ring_add(&main_chan->outgoing.pipe,&item->link);
> +    main_channel_push(main_chan);
> +}
> +
> +static void main_channel_push_channels(MainChannel *main_chan)
> +{
> +    SpiceMsgChannels* channels_info;
> +    RedsOutItem *item;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
> +    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
> +    reds_fill_channels(channels_info);
> +    spice_marshall_msg_main_channels_list(item->m, channels_info);
> +    free(channels_info);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +int main_channel_push_ping(Channel *channel, int size)
> +{
> +    struct timespec time_space;
> +    RedsOutItem *item;
> +    SpiceMsgPing ping;
> +    MainChannel *main_chan = channel->data;
> +
> +    if (!main_chan) {
> +        return FALSE;
> +    }
> +    item = new_out_item(main_chan, SPICE_MSG_PING);
> +    ping.id = ++main_chan->ping_id;
> +    clock_gettime(CLOCK_MONOTONIC,&time_space);
> +    ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
> +    spice_marshall_msg_ping(item->m,&ping);
> +
> +    while (size>  0) {
> +        int now = MIN(ZERO_BUF_SIZE, size);
> +        size -= now;
> +        spice_marshaller_add_ref(item->m, zero_page, now);
> +    }
> +
> +    main_channel_push_pipe_item(main_chan, item);
> +
> +    return TRUE;
> +}
> +
> +void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
> +{
> +    SpiceMsgMainMouseMode mouse_mode;
> +    RedsOutItem *item;
> +    MainChannel *main_chan;
> +
> +    if (!channel) {
> +        return;
> +    }
> +    main_chan = channel->data;
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
> +    mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
> +    if (is_client_mouse_allowed) {
> +        mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
> +    }
> +    mouse_mode.current_mode = current_mode;
> +
> +    spice_marshall_msg_main_mouse_mode(item->m,&mouse_mode);
> +
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_agent_connected(Channel *channel)
> +{
> +    RedsOutItem *item;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_agent_disconnected(Channel *channel)
> +{
> +    SpiceMsgMainAgentDisconnect disconnect;
> +    RedsOutItem *item;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
> +    disconnect.error_code = SPICE_LINK_ERR_OK;
> +    spice_marshall_msg_main_agent_disconnected(item->m,&disconnect);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
> +{
> +    SpiceMsgMainAgentTokens tokens;
> +    RedsOutItem *item;
> +    MainChannel *main_chan = channel->data;
> +
> +    if (!main_chan) {
> +        return;
> +    }
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
> +    tokens.num_tokens = num_tokens;
> +    spice_marshall_msg_main_agent_token(item->m,&tokens);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
> +           spice_marshaller_item_free_func free_data, void *opaque)
> +{
> +    RedsOutItem *item;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
> +    spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +static void main_channel_push_migrate_data_item(MainChannel *main_chan)
> +{
> +    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
> +    SpiceMarshaller *m = item->m;
> +    MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
> +
> +    reds_push_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
> +    data->serial = main_chan->serial;
> +    data->ping_id = main_chan->ping_id;
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
> +{
> +    main_chan->serial = data->serial;
> +    main_chan->ping_id = data->ping_id;
> +}
> +
> +void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
> +    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
> +    int ram_hint)
> +{
> +    RedsOutItem *item;
> +    SpiceMsgMainInit init;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
> +    init.session_id = connection_id;
> +    init.display_channels_hint = display_channels_hint;
> +    init.current_mouse_mode = current_mouse_mode;
> +    init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
> +    if (is_client_mouse_allowed) {
> +        init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
> +    }
> +    init.agent_connected = reds_has_vdagent();
> +    init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
> +    init.multi_media_time = multi_media_time;
> +    init.ram_hint = ram_hint;
> +    spice_marshall_msg_main_init(item->m,&init);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
> +{
> +    // TODO: possible use-after-free - the caller frees mess after this call; does
> +    // spice_marshaller still hold on to that pointer?
> +    RedsOutItem *item;
> +    SpiceMsgNotify notify;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
> +    notify.time_stamp = get_time_stamp();
> +    notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
> +    notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
> +    notify.what = SPICE_WARN_GENERAL;
> +    notify.message_len = mess_len;
> +    spice_marshall_msg_notify(item->m,&notify);
> +    spice_marshaller_add(item->m, mess, mess_len + 1);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
> +    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
> +{
> +    MainChannel *main_chan = channel->data;
> +    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
> +    SpiceMsgMainMigrationBegin migrate;
> +
> +    migrate.port = port;
> +    migrate.sport = sport;
> +    migrate.host_size = strlen(host) + 1;
> +    migrate.host_data = (uint8_t *)host;
> +    migrate.pub_key_type = cert_pub_key_type;
> +    migrate.pub_key_size = cert_pub_key_len;
> +    migrate.pub_key_data = cert_pub_key;
> +    spice_marshall_msg_main_migrate_begin(item->m,&migrate);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_migrate(Channel *channel)
> +{
> +    RedsOutItem *item;
> +    SpiceMsgMigrate migrate;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
> +    migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
> +    spice_marshall_msg_migrate(item->m,&migrate);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_migrate_cancel(Channel *channel)
> +{
> +    MainChannel *main_chan = channel->data;
> +    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
> +
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_multi_media_time(Channel *channel, int time)
> +{
> +    SpiceMsgMainMultiMediaTime time_mes;
> +    RedsOutItem *item;
> +    MainChannel *main_chan = channel->data;
> +
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
> +    time_mes.time = time;
> +    spice_marshall_msg_main_multi_media_time(item->m,&time_mes);
> +    main_channel_push_pipe_item(main_chan, item);
> +}
> +
> +void main_channel_push_migrate_switch(Channel *channel)
> +{
> +    SpiceMsgMainMigrationSwitchHost migrate;
> +    RedsOutItem *item;
> +    MainChannel *main_chan;
> +
> +    if (!channel) {
> +        return;
> +    }
> +    main_chan = channel->data;
> +    red_printf("");
> +    item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
> +    reds_fill_mig_switch(&migrate);
> +    spice_marshall_msg_main_migrate_switch_host(item->m,&migrate);
> +    main_channel_push_pipe_item(main_chan, item);
> +    reds_mig_release();
> +}
> +
> +static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
> +{
> +    MainChannel *main_chan = opaque;
> +
> +    switch (type) {
> +    case SPICE_MSGC_MAIN_AGENT_START:
> +        red_printf("agent start");
> +        if (!main_chan) {
> +            return;
> +        }
> +        reds_on_main_agent_start(main_chan);
> +        break;
> +    case SPICE_MSGC_MAIN_AGENT_DATA: {
> +        reds_on_main_agent_data(message, size);
> +        break;
> +    }
> +    case SPICE_MSGC_MAIN_AGENT_TOKEN:
> +        break;
> +    case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
> +        main_channel_push_channels(main_chan);
> +        break;
> +    case SPICE_MSGC_MAIN_MIGRATE_CONNECTED:
> +        red_printf("connected");
> +        reds_on_main_migrate_connected();
> +        break;
> +    case SPICE_MSGC_MAIN_MIGRATE_CONNECT_ERROR:
> +        red_printf("mig connect error");
> +        reds_on_main_migrate_connect_error();
> +        break;
> +    case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST:
> +        reds_on_main_mouse_mode_request(message, size);
> +        break;
> +    case SPICE_MSGC_PONG: {
> +        SpiceMsgPing *ping = (SpiceMsgPing *)message;
> +        uint64_t roundtrip;
> +        struct timespec ts;
> +
> +        clock_gettime(CLOCK_MONOTONIC,&ts);
> +        roundtrip = ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL - ping->timestamp;
> +
> +        if (ping->id == main_chan->net_test_id) {
> +            switch (main_chan->net_test_stage) {
> +            case NET_TEST_STAGE_WARMUP:
> +                main_chan->net_test_id++;
> +                main_chan->net_test_stage = NET_TEST_STAGE_LATENCY;
> +                break;
> +            case NET_TEST_STAGE_LATENCY:
> +                main_chan->net_test_id++;
> +                main_chan->net_test_stage = NET_TEST_STAGE_RATE;
> +                latency = roundtrip;
> +                break;
> +            case NET_TEST_STAGE_RATE:
> +                main_chan->net_test_id = 0;
> +                if (roundtrip<= latency) {
> +                    // high load on the client or server probably produced incorrect values
> +                    latency = 0;
> +                    red_printf("net test: invalid values, latency %lu roundtrip %lu. assuming high"
> +                               "bandwidth", latency, roundtrip);
> +                    break;
> +                }
> +                bitrate_per_sec = (uint64_t)(NET_TEST_BYTES * 8) * 1000000 / (roundtrip - latency);
> +                red_printf("net test: latency %f ms, bitrate %lu bps (%f Mbps)%s",
> +                           (double)latency / 1000,
> +                           bitrate_per_sec,
> +                           (double)bitrate_per_sec / 1024 / 1024,
> +                           IS_LOW_BANDWIDTH() ? " LOW BANDWIDTH" : "");
> +                main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
> +                break;
> +            default:
> +                red_printf("invalid net test stage, ping id %d test id %d stage %d",
> +                           ping->id,
> +                           main_chan->net_test_id,
> +                           main_chan->net_test_stage);
> +            }
> +            break;
> +        }
> +#ifdef RED_STATISTICS
> +        reds_update_stat_value(roundtrip);
> +#endif
> +        break;
> +    }
> +    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
> +        main_channel_push_migrate_data_item(main_chan);
> +        break;
> +    case SPICE_MSGC_MIGRATE_DATA: {
> +            MainMigrateData *data = (MainMigrateData *)message;
> +            uint8_t *end = ((uint8_t *)message) + size;
> +            main_channel_receive_migrate_data(main_chan, data, end);
> +            reds_on_main_receive_migrate_data(data, end);
> +            break;
> +        }
> +    case SPICE_MSGC_DISCONNECTING:
> +        break;
> +    default:
> +        red_printf("unexpected type %d", type);
> +    }
> +}
> +
> +static void main_channel_event(int fd, int event, void *data)
> +{
> +    MainChannel *main_chan = data;
> +
> +    if (event&  SPICE_WATCH_EVENT_READ) {
> +        if (handle_incoming(main_chan->peer,&main_chan->in_handler)) {
> +            main_disconnect(main_chan);
> +            reds_disconnect();
> +        }
> +    }
> +    if (event&  SPICE_WATCH_EVENT_WRITE) {
> +        RedsOutgoingData *outgoing =&main_chan->outgoing;
> +        if (main_channel_send_data(main_chan)) {
> +            main_channel_push(main_chan);
> +            if (!outgoing->item&&  main_chan->peer) {
> +                core->watch_update_mask(main_chan->peer->watch,
> +                                        SPICE_WATCH_EVENT_READ);
> +            }
> +        }
> +    }
> +}
> +
> +static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
> +                        int num_common_caps, uint32_t *common_caps, int num_caps,
> +                        uint32_t *caps)
> +{
> +    MainChannel *main_chan;
> +
> +    main_chan = spice_malloc0(sizeof(MainChannel));
> +    channel->data = main_chan;
> +    main_chan->peer = peer;
> +    main_chan->in_handler.shut = FALSE;
> +    main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
> +    main_chan->in_handler.opaque = main_chan;
> +    main_chan->in_handler.handle_message = main_channel_handle_message;
> +    ring_init(&main_chan->outgoing.pipe);
> +    main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
> +    peer->watch = core->watch_add(peer->socket,
> +                                  SPICE_WATCH_EVENT_READ,
> +                                  main_channel_event, main_chan);
> +}
> +
> +int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
> +{
> +    MainChannel *main_chan = channel->data;
> +
> +    return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
> +}
> +
> +int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
> +{
> +    MainChannel *main_chan = channel->data;
> +
> +    return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
> +}
> +
> +void main_channel_close(Channel *channel)
> +{
> +    MainChannel *main_chan = channel->data;
> +
> +    if (main_chan&&  main_chan->peer) {
> +        close(main_chan->peer->socket);
> +    }
> +}
> +
> +static void main_channel_shutdown(Channel *channel)
> +{
> +    MainChannel *main_chan = channel->data;
> +
> +    if (main_chan != NULL) {
> +        main_disconnect(main_chan); // TODO - really here? reset peer etc.
> +    }
> +    free(main_chan);
> +}
> +
> +static void main_channel_migrate()
> +{
> +}
> +
> +Channel* main_channel_init(void)
> +{
> +    Channel *channel;
> +
> +    channel = spice_new0(Channel, 1);
> +    channel->type = SPICE_CHANNEL_MAIN;
> +    channel->link = main_channel_link;
> +    channel->shutdown = main_channel_shutdown;
> +    channel->migrate = main_channel_migrate;
> +    return channel;
> +}
> +
> diff --git a/server/main_channel.h b/server/main_channel.h
> new file mode 100644
> index 0000000..db95dc2
> --- /dev/null
> +++ b/server/main_channel.h
> @@ -0,0 +1,78 @@
> +/*
> +   Copyright (C) 2009 Red Hat, Inc.
> +
> +   This library is free software; you can redistribute it and/or
> +   modify it under the terms of the GNU Lesser General Public
> +   License as published by the Free Software Foundation; either
> +   version 2.1 of the License, or (at your option) any later version.
> +
> +   This library is distributed in the hope that it will be useful,
> +   but WITHOUT ANY WARRANTY; without even the implied warranty of
> +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +   Lesser General Public License for more details.
> +
> +   You should have received a copy of the GNU Lesser General Public
> +   License along with this library; if not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#ifndef __MAIN_CHANNEL_H__
> +#define __MAIN_CHANNEL_H__
> +
> +#include<stdint.h>
> +#include<spice/vd_agent.h>
> +#include "common/marshaller.h"
> +
> +/* This is a temporary measure for reds/main split - should not be in a header,
> + * but private (although only reds.c includes main_channel.h) */
> +struct MainMigrateData {
> +    uint32_t version;
> +    uint32_t serial;
> +    uint32_t ping_id;
> +
> +    uint32_t agent_connected;
> +    uint32_t client_agent_started;
> +    uint32_t num_client_tokens;
> +    uint32_t send_tokens;
> +
> +    uint32_t read_state;
> +    VDIChunkHeader vdi_chunk_header;
> +    uint32_t recive_len;
> +    uint32_t message_recive_len;
> +    uint32_t read_buf_len;
> +
> +    uint32_t write_queue_size;
> +};
> +
> +Channel *main_channel_init();
> +void main_channel_close(Channel *channel); // only closes the socket; does not destroy the channel
> +int main_channel_push_ping(Channel *channel, int size);
> +void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed);
> +void main_channel_push_agent_connected(Channel *channel);
> +void main_channel_push_agent_disconnected(Channel *channel);
> +void main_channel_push_tokens(Channel *channel, uint32_t num_tokens);
> +void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
> +           spice_marshaller_item_free_func free_data, void *opaque);
> +void main_channel_start_net_test(Channel *channel);
> +// TODO: huge. Consider making a reds_* interface for these functions
> +// and calling from main.
> +void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
> +    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
> +    int ram_hint);
> +void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len);
> +// TODO: consider exporting RedsMigSpice from reds.c
> +void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
> +    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key);
> +void main_channel_push_migrate(Channel *channel);
> +void main_channel_push_migrate_switch(Channel *channel);
> +void main_channel_push_migrate_cancel(Channel *channel);
> +void main_channel_push_multi_media_time(Channel *channel, int time);
> +int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen);
> +int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen);
> +
> +// TODO: These defines are used to calculate the receive buffer size, and also by reds.c.
> +// Other options: move them to a reds_main_consts.h, or duplicate the defines.
> +#define REDS_AGENT_WINDOW_SIZE 10
> +#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
> +
> +#endif
> +
> diff --git a/server/reds.c b/server/reds.c
> index ba6f552..d6397e5 100644
> --- a/server/reds.c
> +++ b/server/reds.c
> @@ -46,6 +46,7 @@
>   #include<spice/vd_agent.h>
>
>   #include "inputs_channel.h"
> +#include "main_channel.h"
>   #include "red_common.h"
>   #include "red_dispatcher.h"
>   #include "snd_worker.h"
> @@ -74,14 +75,8 @@ static SpiceCharDeviceInstance *vdagent = NULL;
>   #define REDS_MIG_ABORT 2
>   #define REDS_MIG_DIFF_VERSION 3
>
> -#define REDS_AGENT_WINDOW_SIZE 10
>   #define REDS_TOKENS_TO_SEND 5
> -#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
>   #define REDS_VDI_PORT_NUM_RECEIVE_BUFFS 5
> -#define REDS_MAX_SEND_IOVEC 100
> -
> -#define NET_TEST_WARMUP_BYTES 0
> -#define NET_TEST_BYTES (1024 * 250)
>
>   static int spice_port = -1;
>   static int spice_secure_port = -1;
> @@ -109,24 +104,6 @@ static void openssl_init();
>   #define MM_TIME_DELTA 400 /*ms*/
>   #define VDI_PORT_WRITE_RETRY_TIMEOUT 100 /*ms*/
>
> -// approximate max receive message size for main channel
> -#define RECEIVE_BUF_SIZE \
> -    (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
> -
> -#define SCROLL_LOCK_SCAN_CODE 0x46
> -#define NUM_LOCK_SCAN_CODE 0x45
> -#define CAPS_LOCK_SCAN_CODE 0x3a
> -
> -// TODO - remove and use red_channel.h
> -typedef struct IncomingHandler {
> -    spice_parse_channel_func_t parser;
> -    void *opaque;
> -    int shut;
> -    uint8_t buf[RECEIVE_BUF_SIZE];
> -    uint32_t end_pos;
> -    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
> -} IncomingHandler;
> -
>
>   typedef struct TicketAuthentication {
>       char password[SPICE_MAX_PASSWORD_LENGTH];
> @@ -147,13 +124,6 @@ typedef struct MonitorMode {
>       uint32_t y_res;
>   } MonitorMode;
>
> -typedef struct RedsOutItem RedsOutItem;
> -struct RedsOutItem {
> -    RingItem link;
> -    SpiceMarshaller *m;
> -    SpiceDataHeader *header;
> -};
> -
>   typedef struct VDIReadBuf {
>       RingItem link;
>       int len;
> @@ -193,21 +163,6 @@ typedef struct VDIPortState {
>       int client_agent_started;
>   } VDIPortState;
>
> -typedef struct RedsOutgoingData {
> -    Ring pipe;
> -    RedsOutItem *item;
> -    int vec_size;
> -    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
> -    struct iovec *vec;
> -} RedsOutgoingData;
> -
> -enum NetTestStage {
> -    NET_TEST_STAGE_INVALID,
> -    NET_TEST_STAGE_WARMUP,
> -    NET_TEST_STAGE_LATENCY,
> -    NET_TEST_STAGE_RATE,
> -};
> -
>   #ifdef RED_STATISTICS
>
>   #define REDS_MAX_STAT_NODES 100
> @@ -230,12 +185,11 @@ typedef struct RedsState {
>       int secure_listen_socket;
>       SpiceWatch *listen_watch;
>       SpiceWatch *secure_listen_watch;
> -    RedsStreamContext *peer;
>       int disconnecting;
> -    uint32_t link_id;
> -    uint64_t serial; //migrate me
>       VDIPortState agent_state;
>       int pending_mouse_event;
> +    uint32_t link_id;
> +    Channel *main_channel;
>
>       int mig_wait_connect;
>       int mig_wait_disconnect;
> @@ -243,8 +197,6 @@ typedef struct RedsState {
>       int mig_target;
>       RedsMigSpice *mig_spice;
>       int num_of_channels;
> -    IncomingHandler in_handler;
> -    RedsOutgoingData outgoing;
>       Channel *channels;
>       int mouse_mode;
>       int is_client_mouse_allowed;
> @@ -266,15 +218,9 @@ typedef struct RedsState {
>       SpiceTimer *ping_timer;
>       int ping_interval;
>   #endif
> -    uint32_t ping_id;
> -    uint32_t net_test_id;
> -    int net_test_stage;
>       int peer_minor_version;
>   } RedsState;
>
> -uint64_t bitrate_per_sec = ~0;
> -static uint64_t latency = 0;
> -
>   static RedsState *reds = NULL;
>
>   typedef struct AsyncRead {
> @@ -336,13 +282,6 @@ struct ChannelSecurityOptions {
>       ChannelSecurityOptions *next;
>   };
>
> -#define ZERO_BUF_SIZE 4096
> -
> -static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
> -
> -static void reds_push();
> -static void reds_out_item_free(RedsOutItem *item);
> -
>   static ChannelSecurityOptions *channels_security = NULL;
>   static int default_channel_security =
>       SPICE_CHANNEL_SECURITY_NONE | SPICE_CHANNEL_SECURITY_SSL;
> @@ -481,20 +420,6 @@ static inline void reds_release_link(RedLinkInfo *link)
>       peer->cb_free(peer);
>   }
>
> -static struct iovec *reds_iovec_skip(struct iovec vec[], int skip, int *vec_size)
> -{
> -    struct iovec *now = vec;
> -
> -    while (skip&&  skip>= now->iov_len) {
> -        skip -= now->iov_len;
> -        --*vec_size;
> -        now++;
> -    }
> -    now->iov_base = (uint8_t *)now->iov_base + skip;
> -    now->iov_len -= skip;
> -    return now;
> -}
> -
>   #ifdef RED_STATISTICS
>
>   void insert_stat_node(StatNodeRef parent, StatNodeRef ref)
> @@ -598,8 +523,10 @@ void stat_remove_counter(uint64_t *counter)
>       stat_remove((SpiceStatNode *)(counter - offsetof(SpiceStatNode, value)));
>   }
>
> -static void reds_update_stat_value(RedsStatValue* stat_value, uint32_t value)
> +void reds_update_stat_value(uint32_t value)
>   {
> +    RedsStatValue *stat_value =&reds->roundtrip_stat;
> +
>       stat_value->value = value;
>       stat_value->min = (stat_value->count ? MIN(stat_value->min, value) : value);
>       stat_value->max = MAX(stat_value->max, value);
> @@ -685,33 +612,20 @@ static void reds_reset_vdp()
>       state->client_agent_started = FALSE;
>   }
>
> -static void reds_reset_outgoing()
> +int reds_main_channel_connected()
>   {
> -    RedsOutgoingData *outgoing =&reds->outgoing;
> -    RingItem *ring_item;
> -
> -    if (outgoing->item) {
> -        reds_out_item_free(outgoing->item);
> -        outgoing->item = NULL;
> -    }
> -    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
> -        RedsOutItem *out_item = (RedsOutItem *)ring_item;
> -        ring_remove(ring_item);
> -        reds_out_item_free(out_item);
> -    }
> -    outgoing->vec_size = 0;
> -    outgoing->vec = outgoing->vec_buf;
> +    return !!reds->main_channel;
>   }
>
>   void reds_disconnect()
>   {
> -    if (!reds->peer || reds->disconnecting) {
> +    if (!reds_main_channel_connected() || reds->disconnecting) {
>           return;
>       }
>
>       red_printf("");
>       reds->disconnecting = TRUE;
> -    reds_reset_outgoing();
> +    reds->link_id = 0;
>
>       if (reds->agent_state.connected) {
>           SpiceCharDeviceInterface *sif;
> @@ -724,178 +638,32 @@ void reds_disconnect()
>       }
>
>       reds_shatdown_channels();
> -    core->watch_remove(reds->peer->watch);
> -    reds->peer->watch = NULL;
> -    reds->peer->cb_free(reds->peer);
> -    reds->peer = NULL;
> -    reds->in_handler.shut = TRUE;
> -    reds->link_id = 0;
> -    reds->serial = 0;
> -    reds->ping_id = 0;
> -    reds->net_test_id = 0;
> -    reds->net_test_stage = NET_TEST_STAGE_INVALID;
> -    reds->in_handler.end_pos = 0;
> -
> -    bitrate_per_sec = ~0;
> -    latency = 0;
> -
> +    reds->main_channel->shutdown(reds->main_channel);
> +    reds->main_channel = NULL;
>       reds_mig_cleanup();
>       reds->disconnecting = FALSE;
>   }
>
>   static void reds_mig_disconnect()
>   {
> -    if (reds->peer) {
> +    if (reds_main_channel_connected()) {
>           reds_disconnect();
>       } else {
>           reds_mig_cleanup();
>       }
>   }
>
> -static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
> -{
> -    for (;;) {
> -        uint8_t *buf = handler->buf;
> -        uint32_t pos = handler->end_pos;
> -        uint8_t *end = buf + pos;
> -        SpiceDataHeader *header;
> -        int n;
> -        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
> -        if (n<= 0) {
> -            if (n == 0) {
> -                return -1;
> -            }
> -            switch (errno) {
> -            case EAGAIN:
> -                return 0;
> -            case EINTR:
> -                break;
> -            case EPIPE:
> -                return -1;
> -            default:
> -                red_printf("%s", strerror(errno));
> -                return -1;
> -            }
> -        } else {
> -            pos += n;
> -            end = buf + pos;
> -            while (buf + sizeof(SpiceDataHeader)<= end&&
> -                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size<= end) {
> -                uint8_t *data = (uint8_t *)(header+1);
> -                size_t parsed_size;
> -                uint8_t *parsed;
> -                message_destructor_t parsed_free;
> -
> -
> -                buf += sizeof(SpiceDataHeader) + header->size;
> -                parsed = handler->parser(data, data + header->size, header->type,
> -                                         SPICE_VERSION_MINOR,&parsed_size,&parsed_free);
> -                if (parsed == NULL) {
> -                    red_printf("failed to parse message type %d", header->type);
> -                    return -1;
> -                }
> -                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
> -                parsed_free(parsed);
> -                if (handler->shut) {
> -                    return -1;
> -                }
> -            }
> -            memmove(handler->buf, buf, (handler->end_pos = end - buf));
> -        }
> -    }
> -}
> -
> -static RedsOutItem *new_out_item(uint32_t type)
> -{
> -    RedsOutItem *item;
> -
> -    item = spice_new(RedsOutItem, 1);
> -    ring_item_init(&item->link);
> -
> -    item->m = spice_marshaller_new();
> -    item->header = (SpiceDataHeader *)
> -        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
> -    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
> -
> -    item->header->serial = ++reds->serial;
> -    item->header->type = type;
> -    item->header->sub_list = 0;
> -
> -    return item;
> -}
> -
> -static void reds_out_item_free(RedsOutItem *item)
> -{
> -    spice_marshaller_destroy(item->m);
> -    free(item);
> -}
> -
> -static void reds_push_pipe_item(RedsOutItem *item)
> -{
> -    ring_add(&reds->outgoing.pipe,&item->link);
> -    reds_push();
> -}
> -
> -static void reds_send_channels()
> -{
> -    SpiceMsgChannels* channels_info;
> -    RedsOutItem *item;
> -    Channel *channel;
> -    int i;
> -
> -    item = new_out_item(SPICE_MSG_MAIN_CHANNELS_LIST);
> -    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds->num_of_channels * sizeof(SpiceChannelId));
> -    channels_info->num_of_channels = reds->num_of_channels;
> -    channel = reds->channels;
> -
> -    for (i = 0; i<  reds->num_of_channels; i++) {
> -        ASSERT(channel);
> -        channels_info->channels[i].type = channel->type;
> -        channels_info->channels[i].id = channel->id;
> -        channel = channel->next;
> -    }
> -    spice_marshall_msg_main_channels_list(item->m, channels_info);
> -    free(channels_info);
> -    reds_push_pipe_item(item);
> -}
> -
> -static int send_ping(int size)
> -{
> -    struct timespec time_space;
> -    RedsOutItem *item;
> -    SpiceMsgPing ping;
> -
> -    if (!reds->peer) {
> -        return FALSE;
> -    }
> -    item = new_out_item(SPICE_MSG_PING);
> -    ping.id = ++reds->ping_id;
> -    clock_gettime(CLOCK_MONOTONIC,&time_space);
> -    ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
> -    spice_marshall_msg_ping(item->m,&ping);
> -
> -    while (size>  0) {
> -        int now = MIN(ZERO_BUF_SIZE, size);
> -        size -= now;
> -        spice_marshaller_add_ref(item->m, zero_page, now);
> -    }
> -
> -    reds_push_pipe_item(item);
> -
> -    return TRUE;
> -}
> -
>   #ifdef RED_STATISTICS
>
>   static void do_ping_client(const char *opt, int has_interval, int interval)
>   {
> -    if (!reds->peer) {
> +    if (!reds_main_channel_connected()) {
>           red_printf("not connected to peer");
>           return;
>       }
>
>       if (!opt) {
> -        send_ping(0);
> +        main_channel_push_ping(reds->main_channel, 0);
>       } else if (!strcmp(opt, "on")) {
>           if (has_interval&&  interval>  0) {
>               reds->ping_interval = interval * 1000;
> @@ -910,7 +678,7 @@ static void do_ping_client(const char *opt, int has_interval, int interval)
>
>   static void ping_timer_cb()
>   {
> -    if (!reds->peer) {
> +    if (!reds_main_channel_connected()) {
>           red_printf("not connected to peer, ping off");
>           core->timer_cancel(reds->ping_timer);
>           return;
> @@ -921,27 +689,6 @@ static void ping_timer_cb()
>
>   #endif
>
> -static void reds_send_mouse_mode()
> -{
> -    SpiceMsgMainMouseMode mouse_mode;
> -    RedsOutItem *item;
> -
> -    if (!reds->peer) {
> -        return;
> -    }
> -
> -    item = new_out_item(SPICE_MSG_MAIN_MOUSE_MODE);
> -    mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
> -    if (reds->is_client_mouse_allowed) {
> -        mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
> -    }
> -    mouse_mode.current_mode = reds->mouse_mode;
> -
> -    spice_marshall_msg_main_mouse_mode(item->m,&mouse_mode);
> -
> -    reds_push_pipe_item(item);
> -}
> -
>   int reds_get_mouse_mode(void)
>   {
>       return reds->mouse_mode;
> @@ -954,7 +701,7 @@ static void reds_set_mouse_mode(uint32_t mode)
>       }
>       reds->mouse_mode = mode;
>       red_dispatcher_set_mouse_mode(reds->mouse_mode);
> -    reds_send_mouse_mode();
> +    main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
>   }
>
>   int reds_get_agent_mouse(void)
> @@ -978,26 +725,7 @@ static void reds_update_mouse_mode()
>           reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
>           return;
>       }
> -    reds_send_mouse_mode();
> -}
> -
> -static void reds_send_agent_connected()
> -{
> -    RedsOutItem *item;
> -
> -    item = new_out_item(SPICE_MSG_MAIN_AGENT_CONNECTED);
> -    reds_push_pipe_item(item);
> -}
> -
> -static void reds_send_agent_disconnected()
> -{
> -    SpiceMsgMainAgentDisconnect disconnect;
> -    RedsOutItem *item;
> -
> -    item = new_out_item(SPICE_MSG_MAIN_AGENT_DISCONNECTED);
> -    disconnect.error_code = SPICE_LINK_ERR_OK;
> -    spice_marshall_msg_main_agent_disconnected(item->m,&disconnect);
> -    reds_push_pipe_item(item);
> +    main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
>   }
>
>   static void reds_agent_remove()
> @@ -1008,7 +736,7 @@ static void reds_agent_remove()
>       vdagent = NULL;
>       reds_update_mouse_mode();
>
> -    if (!reds->peer || !sin) {
> +    if (!reds_main_channel_connected() || !sin) {
>           return;
>       }
>
> @@ -1024,27 +752,15 @@ static void reds_agent_remove()
>       }
>
>       reds_reset_vdp();
> -    reds_send_agent_disconnected();
> +    main_channel_push_agent_disconnected(reds->main_channel);
>   }
>
> -static void reds_send_tokens()
> +static void reds_push_tokens()
>   {
> -    SpiceMsgMainAgentTokens tokens;
> -    RedsOutItem *item;
> -
> -    if (!reds->peer) {
> -        return;
> -    }
> -
> -    item = new_out_item(SPICE_MSG_MAIN_AGENT_TOKEN);
> -    tokens.num_tokens = reds->agent_state.num_tokens;
> -    reds->agent_state.num_client_tokens += tokens.num_tokens;
> +    reds->agent_state.num_client_tokens += reds->agent_state.num_tokens;
>       ASSERT(reds->agent_state.num_client_tokens<= REDS_AGENT_WINDOW_SIZE);
> +    main_channel_push_tokens(reds->main_channel, reds->agent_state.num_tokens);
>       reds->agent_state.num_tokens = 0;
> -
> -    spice_marshall_msg_main_agent_token(item->m,&tokens);
> -
> -    reds_push_pipe_item(item);
>   }
>
>   static int write_to_vdi_port();
> @@ -1122,16 +838,12 @@ void vdi_read_buf_release(uint8_t *data, void *opaque)
>   static void dispatch_vdi_port_data(int port, VDIReadBuf *buf)
>   {
>       VDIPortState *state =&reds->agent_state;
> -    RedsOutItem *item;
>
>       switch (port) {
>       case VDP_CLIENT_PORT: {
>           if (reds->agent_state.connected) {
> -            item = new_out_item(SPICE_MSG_MAIN_AGENT_DATA);
> -
> -            spice_marshaller_add_ref_full(item->m, buf->data, buf->len,
> -                                          vdi_read_buf_release, buf);
> -            reds_push_pipe_item(item);
> +            main_channel_push_agent_data(reds->main_channel, buf->data, buf->len,
> +                                         vdi_read_buf_release, buf);
>           } else {
>               red_printf("throwing away, no client: %d", buf->len);
>               vdi_read_buf_release(buf->data, buf);
> @@ -1281,28 +993,110 @@ static void add_token()
>       VDIPortState *state =&reds->agent_state;
>
>       if (++state->num_tokens == REDS_TOKENS_TO_SEND) {
> -        reds_send_tokens();
> +        reds_push_tokens();
>       }
>   }
>
> -typedef struct MainMigrateData {
> -    uint32_t version;
> -    uint32_t serial;
> -    uint32_t ping_id;
> +int reds_num_of_channels()
> +{
> +    return reds ? reds->num_of_channels : 0;
> +}
>
> -    uint32_t agent_connected;
> -    uint32_t client_agent_started;
> -    uint32_t num_client_tokens;
> -    uint32_t send_tokens;
> +void reds_fill_channels(SpiceMsgChannels *channels_info)
> +{
> +    Channel *channel;
> +    int i;
>
> -    uint32_t read_state;
> -    VDIChunkHeader vdi_chunk_header;
> -    uint32_t recive_len;
> -    uint32_t message_recive_len;
> -    uint32_t read_buf_len;
> +    channels_info->num_of_channels = reds->num_of_channels;
> +    channel = reds->channels;
> +    for (i = 0; i<  reds->num_of_channels; i++) {
> +        ASSERT(channel);
> +        channels_info->channels[i].type = channel->type;
> +        channels_info->channels[i].id = channel->id;
> +        channel = channel->next;
> +    }
> +}
>
> -    uint32_t write_queue_size;
> -} MainMigrateData;
> +void reds_on_main_agent_start()
> +{
> +    reds->agent_state.client_agent_started = TRUE;
> +}
> +
> +void reds_on_main_agent_data(void *message, size_t size)
> +{
> +    RingItem *ring_item;
> +    VDAgentExtBuf *buf;
> +
> +    if (!reds->agent_state.num_client_tokens) {
> +        red_printf("token violation");
> +        reds_disconnect();
> +        return;
> +    }
> +    --reds->agent_state.num_client_tokens;
> +
> +    if (!vdagent) {
> +        add_token();
> +        return;
> +    }
> +
> +    if (!reds->agent_state.client_agent_started) {
> +        red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
> +        add_token();
> +        return;
> +    }
> +
> +    if (size>  SPICE_AGENT_MAX_DATA_SIZE) {
> +        red_printf("invalid agent message");
> +        reds_disconnect();
> +        return;
> +    }
> +
> +    if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
> +        red_printf("no agent free bufs");
> +        reds_disconnect();
> +        return;
> +    }
> +    ring_remove(ring_item);
> +    buf = (VDAgentExtBuf *)ring_item;
> +    buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
> +    buf->base.write_len = size + sizeof(VDIChunkHeader);
> +    buf->base.chunk_header.size = size;
> +    memcpy(buf->buf, message, size);
> +    ring_add(&reds->agent_state.write_queue, ring_item);
> +    write_to_vdi_port();
> +}
> +
> +void reds_on_main_migrate_connected()
> +{
> +    if (reds->mig_wait_connect) {
> +        reds_mig_cleanup();
> +    }
> +}
> +
> +void reds_on_main_migrate_connect_error()
> +{
> +    if (reds->mig_wait_connect) {
> +        reds_mig_cleanup();
> +    }
> +}
> +
> +void reds_on_main_mouse_mode_request(void *message, size_t size)
> +{
> +    switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
> +    case SPICE_MOUSE_MODE_CLIENT:
> +        if (reds->is_client_mouse_allowed) {
> +            reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
> +        } else {
> +            red_printf("client mouse is disabled");
> +        }
> +        break;
> +    case SPICE_MOUSE_MODE_SERVER:
> +        reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
> +        break;
> +    default:
> +        red_printf("unsupported mouse mode");
> +    }
> +}
>
>   #define MAIN_CHANNEL_MIG_DATA_VERSION 1
>
> @@ -1311,20 +1105,13 @@ typedef struct WriteQueueInfo {
>       uint32_t len;
>   } WriteQueueInfo;
>
> -static void main_channel_push_migrate_data_item()
> +void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data)
>   {
> -    RedsOutItem *item;
> -    MainMigrateData *data;
>       VDIPortState *state =&reds->agent_state;
>       int buf_index;
>       RingItem *now;
>
> -    item = new_out_item(SPICE_MSG_MIGRATE_DATA);
> -
> -    data = (MainMigrateData *)spice_marshaller_reserve_space(item->m, sizeof(MainMigrateData));
>       data->version = MAIN_CHANNEL_MIG_DATA_VERSION;
> -    data->serial = reds->serial;
> -    data->ping_id = reds->ping_id;
>
>       data->agent_connected = !!state->connected;
>       data->client_agent_started = state->client_agent_started;
> @@ -1340,7 +1127,7 @@ static void main_channel_push_migrate_data_item()
>           data->read_buf_len = state->current_read_buf->len;
>
>           if (data->read_buf_len - data->recive_len) {
> -            spice_marshaller_add_ref(item->m,
> +            spice_marshaller_add_ref(m,
>                                        state->current_read_buf->data,
>                                        data->read_buf_len - data->recive_len);
>           }
> @@ -1357,7 +1144,7 @@ static void main_channel_push_migrate_data_item()
>           WriteQueueInfo *queue_info;
>
>           queue_info = (WriteQueueInfo *)
> -            spice_marshaller_reserve_space(item->m,
> +            spice_marshaller_reserve_space(m,
>                                              data->write_queue_size * sizeof(queue_info[0]));
>
>           buf_index = 0;
> @@ -1366,15 +1153,13 @@ static void main_channel_push_migrate_data_item()
>               VDIPortBuf *buf = (VDIPortBuf *)now;
>               queue_info[buf_index].port = buf->chunk_header.port;
>               queue_info[buf_index++].len = buf->write_len;
> -            spice_marshaller_add_ref(item->m, buf->now, buf->write_len);
> +            spice_marshaller_add_ref(m, buf->now, buf->write_len);
>           }
>       }
> -
> -    reds_push_pipe_item((RedsOutItem *)item);
>   }
>
>
> -static int main_channel_restore_vdi_read_state(MainMigrateData *data, uint8_t **in_pos,
> +static int reds_main_channel_restore_vdi_read_state(MainMigrateData *data, uint8_t **in_pos,
>                                                  uint8_t *end)
>   {
>       VDIPortState *state =&reds->agent_state;
> @@ -1459,7 +1244,7 @@ static void free_tmp_internal_buf(VDIPortBuf *buf)
>       free(buf);
>   }
>
> -static int main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos, uint8_t *end)
> +static int reds_main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos, uint8_t *end)
>   {
>       VDIPortState *state =&reds->agent_state;
>       WriteQueueInfo *inf;
> @@ -1530,7 +1315,7 @@ static int main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos,
>       return TRUE;
>   }
>
> -static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end)
> +void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
>   {
>       VDIPortState *state =&reds->agent_state;
>       uint8_t *pos;
> @@ -1541,9 +1326,6 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
>           return;
>       }
>
> -    reds->serial = data->serial;
> -    reds->ping_id = data->ping_id;
> -
>       state->num_client_tokens = data->num_client_tokens;
>       ASSERT(state->num_client_tokens + data->write_queue_size<= REDS_AGENT_WINDOW_SIZE +
>                                                                   REDS_NUM_INTERNAL_AGENT_MESSAGES);
> @@ -1551,19 +1333,19 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
>
>       if (!data->agent_connected) {
>           if (state->connected) {
> -            reds_send_agent_connected();
> +            main_channel_push_agent_connected(reds->main_channel);
>           }
>           return;
>       }
>
>       if (!state->connected) {
> -        reds_send_agent_disconnected();
> +        main_channel_push_agent_disconnected(reds->main_channel);
>           return;
>       }
>
>       if (state->plug_generation>  1) {
> -        reds_send_agent_disconnected();
> -        reds_send_agent_connected();
> +        main_channel_push_agent_disconnected(reds->main_channel);
> +        main_channel_push_agent_connected(reds->main_channel);
>           return;
>       }
>
> @@ -1571,245 +1353,15 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
>
>       pos = (uint8_t *)(data + 1);
>
> -    if (!main_channel_restore_vdi_read_state(data,&pos, end)) {
> +    if (!reds_main_channel_restore_vdi_read_state(data,&pos, end)) {
>           return;
>       }
>
> -    main_channel_restore_vdi_wqueue(data, pos, end);
> +    reds_main_channel_restore_vdi_wqueue(data, pos, end);
>       ASSERT(state->num_client_tokens + state->num_tokens == REDS_AGENT_WINDOW_SIZE);
> -}
> -
> -static void reds_main_handle_message(void *opaque, size_t size, uint32_t type, void *message)
> -{
> -    switch (type) {
> -    case SPICE_MSGC_MAIN_AGENT_START:
> -        red_printf("agent start");
> -        if (!reds->peer) {
> -            return;
> -        }
> -        reds->agent_state.client_agent_started = TRUE;
> -        break;
> -    case SPICE_MSGC_MAIN_AGENT_DATA: {
> -        RingItem *ring_item;
> -        VDAgentExtBuf *buf;
> -
> -        if (!reds->agent_state.num_client_tokens) {
> -            red_printf("token violation");
> -            reds_disconnect();
> -            break;
> -        }
> -        --reds->agent_state.num_client_tokens;
> -
> -        if (!vdagent) {
> -            add_token();
> -            break;
> -        }
> -
> -        if (!reds->agent_state.client_agent_started) {
> -            red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
> -            add_token();
> -            break;
> -        }
> -
> -        if (size>  SPICE_AGENT_MAX_DATA_SIZE) {
> -            red_printf("invalid agent message");
> -            reds_disconnect();
> -            break;
> -        }
> -
> -        if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
> -            red_printf("no agent free bufs");
> -            reds_disconnect();
> -            break;
> -        }
> -        ring_remove(ring_item);
> -        buf = (VDAgentExtBuf *)ring_item;
> -        buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
> -        buf->base.write_len = size + sizeof(VDIChunkHeader);
> -        buf->base.chunk_header.size = size;
> -        memcpy(buf->buf, message, size);
> -        ring_add(&reds->agent_state.write_queue, ring_item);
> -        write_to_vdi_port();
> -        break;
> -    }
> -    case SPICE_MSGC_MAIN_AGENT_TOKEN:
> -        break;
> -    case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
> -        reds_send_channels();
> -        break;
> -    case SPICE_MSGC_MAIN_MIGRATE_CONNECTED:
> -        red_printf("connected");
> -        if (reds->mig_wait_connect) {
> -            reds_mig_cleanup();
> -        }
> -        break;
> -    case SPICE_MSGC_MAIN_MIGRATE_CONNECT_ERROR:
> -        red_printf("mig connect error");
> -        if (reds->mig_wait_connect) {
> -            reds_mig_cleanup();
> -        }
> -        break;
> -    case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST: {
> -        switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
> -        case SPICE_MOUSE_MODE_CLIENT:
> -            if (reds->is_client_mouse_allowed) {
> -                reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
> -            } else {
> -                red_printf("client mouse is disabled");
> -            }
> -            break;
> -        case SPICE_MOUSE_MODE_SERVER:
> -            reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
> -            break;
> -        default:
> -            red_printf("unsupported mouse mode");
> -        }
> -        break;
> -    }
> -    case SPICE_MSGC_PONG: {
> -        SpiceMsgPing *ping = (SpiceMsgPing *)message;
> -        uint64_t roundtrip;
> -        struct timespec ts;
> -
> -        clock_gettime(CLOCK_MONOTONIC,&ts);
> -        roundtrip = ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL - ping->timestamp;
> -
> -        if (ping->id == reds->net_test_id) {
> -            switch (reds->net_test_stage) {
> -            case NET_TEST_STAGE_WARMUP:
> -                reds->net_test_id++;
> -                reds->net_test_stage = NET_TEST_STAGE_LATENCY;
> -                break;
> -            case NET_TEST_STAGE_LATENCY:
> -                reds->net_test_id++;
> -                reds->net_test_stage = NET_TEST_STAGE_RATE;
> -                latency = roundtrip;
> -                break;
> -            case NET_TEST_STAGE_RATE:
> -                reds->net_test_id = 0;
> -                if (roundtrip<= latency) {
> -                    // probably high load on client or server result with incorrect values
> -                    latency = 0;
> -                    red_printf("net test: invalid values, latency %lu roundtrip %lu. assuming high"
> -                               "bandwidth", latency, roundtrip);
> -                    break;
> -                }
> -                bitrate_per_sec = (uint64_t)(NET_TEST_BYTES * 8) * 1000000 / (roundtrip - latency);
> -                red_printf("net test: latency %f ms, bitrate %lu bps (%f Mbps)%s",
> -                           (double)latency / 1000,
> -                           bitrate_per_sec,
> -                           (double)bitrate_per_sec / 1024 / 1024,
> -                           IS_LOW_BANDWIDTH() ? " LOW BANDWIDTH" : "");
> -                reds->net_test_stage = NET_TEST_STAGE_INVALID;
> -                break;
> -            default:
> -                red_printf("invalid net test stage, ping id %d test id %d stage %d",
> -                           ping->id,
> -                           reds->net_test_id,
> -                           reds->net_test_stage);
> -            }
> -            break;
> -        }
> -#ifdef RED_STATISTICS
> -        reds_update_stat_value(&reds->roundtrip_stat, roundtrip);
> -#endif
> -        break;
> -    }
> -    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
> -        main_channel_push_migrate_data_item();
> -        break;
> -    case SPICE_MSGC_MIGRATE_DATA:
> -        main_channel_recive_migrate_data((MainMigrateData *)message,
> -                                         ((uint8_t *)message) + size);
> -        reds->mig_target = FALSE;
> -        while (write_to_vdi_port() || read_from_vdi_port());
> -        break;
> -    case SPICE_MSGC_DISCONNECTING:
> -        break;
> -    default:
> -        red_printf("unexpected type %d", type);
> -    }
> -}
> -
> -static int reds_send_data()
> -{
> -    RedsOutgoingData *outgoing =&reds->outgoing;
> -    int n;
> -
> -    if (!outgoing->item) {
> -        return TRUE;
> -    }
>
> -    ASSERT(outgoing->vec_size);
> -    for (;;) {
> -        if ((n = reds->peer->cb_writev(reds->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
> -            switch (errno) {
> -            case EAGAIN:
> -                core->watch_update_mask(reds->peer->watch,
> -                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
> -                return FALSE;
> -            case EINTR:
> -                break;
> -            case EPIPE:
> -                reds_disconnect();
> -                return FALSE;
> -            default:
> -                red_printf("%s", strerror(errno));
> -                reds_disconnect();
> -                return FALSE;
> -            }
> -        } else {
> -            outgoing->vec = reds_iovec_skip(outgoing->vec, n,&outgoing->vec_size);
> -            if (!outgoing->vec_size) {
> -                reds_out_item_free(outgoing->item);
> -                outgoing->item = NULL;
> -                outgoing->vec = outgoing->vec_buf;
> -                return TRUE;
> -            }
> -        }
> -    }
> -}
> -
> -static void reds_push()
> -{
> -    RedsOutgoingData *outgoing =&reds->outgoing;
> -    RingItem *ring_item;
> -    RedsOutItem *item;
> -
> -    for (;;) {
> -        if (!reds->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
> -            return;
> -        }
> -        ring_remove(ring_item);
> -        outgoing->item = item = (RedsOutItem *)ring_item;
> -
> -        spice_marshaller_flush(item->m);
> -        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
> -
> -        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
> -                                                         outgoing->vec_buf,
> -                                                         REDS_MAX_SEND_IOVEC, 0);
> -        reds_send_data();
> -    }
> -}
> -
> -static void reds_main_event(int fd, int event, void *data)
> -{
> -    if (event&  SPICE_WATCH_EVENT_READ) {
> -        if (handle_incoming(reds->peer,&reds->in_handler)) {
> -            reds_disconnect();
> -        }
> -    }
> -    if (event&  SPICE_WATCH_EVENT_WRITE) {
> -        RedsOutgoingData *outgoing =&reds->outgoing;
> -        if (reds_send_data()) {
> -            reds_push();
> -            if (!outgoing->item&&  reds->peer) {
> -                core->watch_update_mask(reds->peer->watch,
> -                                        SPICE_WATCH_EVENT_READ);
> -            }
> -        }
> -    }
> +    reds->mig_target = FALSE;
> +    while (write_to_vdi_port() || read_from_vdi_port());
>   }
>
>   static int sync_write(RedsStreamContext *peer, void *in_buf, size_t n)
> @@ -1920,40 +1472,33 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
>       sync_write(link->peer,&error, sizeof(error));
>   }
>
> -static void reds_start_net_test()
> -{
> -    if (!reds->peer || reds->net_test_id) {
> -        return;
> -    }
> -
> -    if (send_ping(NET_TEST_WARMUP_BYTES)&&  send_ping(0)&&  send_ping(NET_TEST_BYTES)) {
> -        reds->net_test_id = reds->ping_id - 2;
> -        reds->net_test_stage = NET_TEST_STAGE_WARMUP;
> -    }
> -}
> -
> +// TODO: now that main is a separate channel this should
> +// actually be joined with reds_handle_other_links, and become reds_handle_link
>   static void reds_handle_main_link(RedLinkInfo *link)
>   {
> +    RedsStreamContext *peer;
> +    SpiceLinkMess *link_mess;
> +    uint32_t *caps;
>       uint32_t connection_id;
>
>       red_printf("");
> -
> +    link_mess = link->link_mess;
>       reds_disconnect();
>
> -    if (!link->link_mess->connection_id) {
> +    if (!link_mess->connection_id) {
>           reds_send_link_result(link, SPICE_LINK_ERR_OK);
>           while((connection_id = rand()) == 0);
>           reds->agent_state.num_tokens = 0;
>           memcpy(&(reds->taTicket),&taTicket, sizeof(reds->taTicket));
>           reds->mig_target = FALSE;
>       } else {
> -        if (link->link_mess->connection_id != reds->link_id) {
> +        if (link_mess->connection_id != reds->link_id) {
>               reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
>               reds_release_link(link);
>               return;
>           }
>           reds_send_link_result(link, SPICE_LINK_ERR_OK);
> -        connection_id = link->link_mess->connection_id;
> +        connection_id = link_mess->connection_id;
>           reds->mig_target = TRUE;
>       }
>
> @@ -1961,11 +1506,18 @@ static void reds_handle_main_link(RedLinkInfo *link)
>       reds->mig_inprogress = FALSE;
>       reds->mig_wait_connect = FALSE;
>       reds->mig_wait_disconnect = FALSE;
> -    reds->peer = link->peer;
> -    reds->in_handler.shut = FALSE;
>
>       reds_show_new_channel(link, connection_id);
> +    peer = link->peer;
> +    link->link_mess = NULL;
>       __reds_release_link(link);
> +    caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
> +    reds->main_channel = main_channel_init();
> +    reds->main_channel->link(reds->main_channel, peer, reds->mig_target, link_mess->num_common_caps,
> +                  link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
> +                  link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
> +    free(link_mess);
> +
>       if (vdagent) {
>           SpiceCharDeviceInterface *sif;
>           sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
> @@ -1975,32 +1527,15 @@ static void reds_handle_main_link(RedLinkInfo *link)
>           }
>           reds->agent_state.plug_generation++;
>       }
> -    reds->peer->watch = core->watch_add(reds->peer->socket,
> -                                        SPICE_WATCH_EVENT_READ,
> -                                        reds_main_event, NULL);
>
>       if (!reds->mig_target) {
> -        RedsOutItem *item;
> -        SpiceMsgMainInit init;
> -
> -        item = new_out_item(SPICE_MSG_MAIN_INIT);
> -        init.session_id = connection_id;
> -        init.display_channels_hint = red_dispatcher_count();
> -        init.current_mouse_mode = reds->mouse_mode;
> -        init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
> -        if (reds->is_client_mouse_allowed) {
> -            init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
> -        }
> -        init.agent_connected = !!vdagent;
> -        init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
>           reds->agent_state.num_client_tokens = REDS_AGENT_WINDOW_SIZE;
> -        init.multi_media_time = reds_get_mm_time() - MM_TIME_DELTA;
> -        init.ram_hint = red_dispatcher_qxl_ram_size();
> -
> -        spice_marshall_msg_main_init(item->m,&init);
> +        main_channel_push_init(reds->main_channel, connection_id, red_dispatcher_count(),
> +            reds->mouse_mode, reds->is_client_mouse_allowed,
> +            reds_get_mm_time() - MM_TIME_DELTA,
> +            red_dispatcher_qxl_ram_size());
>
> -        reds_push_pipe_item(item);
> -        reds_start_net_test();
> +        main_channel_start_net_test(reds->main_channel);
>           /* Now that we have a client, forward any pending agent data */
>           while (read_from_vdi_port());
>       }
> @@ -2064,23 +1599,9 @@ static void reds_handle_other_links(RedLinkInfo *link)
>       reds_send_link_result(link, SPICE_LINK_ERR_OK);
>       reds_show_new_channel(link, reds->link_id);
>       if (link_mess->channel_type == SPICE_CHANNEL_INPUTS&&  !link->peer->ssl) {
> -        RedsOutItem *item;
> -        SpiceMsgNotify notify;
>           char *mess = "keyboard channel is insecure";
>           const int mess_len = strlen(mess);
> -
> -        item = new_out_item(SPICE_MSG_NOTIFY);
> -
> -        notify.time_stamp = get_time_stamp();
> -        notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
> -        notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
> -        notify.what = SPICE_WARN_GENERAL;
> -        notify.message_len = mess_len;
> -
> -        spice_marshall_msg_notify(item->m,&notify);
> -        spice_marshaller_add(item->m, (uint8_t *)mess, mess_len + 1);
> -
> -        reds_push_pipe_item(item);
> +        main_channel_push_notify(reds->main_channel, (uint8_t*)mess, mess_len);
>       }
>       peer = link->peer;
>       link->link_mess = NULL;
> @@ -2681,9 +2202,7 @@ static void reds_init_ssl()
>
>   static void reds_exit()
>   {
> -    if (reds->peer) {
> -        close(reds->peer->socket);
> -    }
> +    main_channel_close(reds->main_channel);
>   #ifdef RED_STATISTICS
>       shm_unlink(reds->stat_shm_name);
>       free(reds->stat_shm_name);
> @@ -2725,7 +2244,7 @@ enum {
>
>   static inline void on_activating_ticketing()
>   {
> -    if (!ticketing_enabled&&  reds->peer) {
> +    if (!ticketing_enabled&&  reds_main_channel_connected()) {
>           red_printf("disconnecting");
>           reds_disconnect();
>       }
> @@ -2778,7 +2297,7 @@ typedef struct RedsMigCertPubKeyInfo {
>       uint32_t len;
>   } RedsMigCertPubKeyInfo;
>
> -static void reds_mig_release(void)
> +void reds_mig_release(void)
>   {
>       if (reds->mig_spice) {
>           free(reds->mig_spice->cert_subject);
> @@ -2791,22 +2310,10 @@ static void reds_mig_release(void)
>   static void reds_mig_continue(void)
>   {
>       RedsMigSpice *s = reds->mig_spice;
> -    SpiceMsgMainMigrationBegin migrate;
> -    RedsOutItem *item;
>
>       red_printf("");
> -    item = new_out_item(SPICE_MSG_MAIN_MIGRATE_BEGIN);
> -
> -    migrate.port = s->port;
> -    migrate.sport = s->sport;
> -    migrate.host_size = strlen(s->host) + 1;
> -    migrate.host_data = (uint8_t *)s->host;
> -    migrate.pub_key_type = s->cert_pub_key_type;
> -    migrate.pub_key_size = s->cert_pub_key_len;
> -    migrate.pub_key_data = s->cert_pub_key;
> -    spice_marshall_msg_main_migrate_begin(item->m,&migrate);
> -
> -    reds_push_pipe_item(item);
> +    main_channel_push_migrate_begin(reds->main_channel, s->port, s->sport,
> +        s->host, s->cert_pub_key_type, s->cert_pub_key_len, s->cert_pub_key);
>
>       reds_mig_release();
>
> @@ -2828,7 +2335,7 @@ static void reds_mig_started(void)
>           core->watch_update_mask(reds->secure_listen_watch, 0);
>       }
>
> -    if (reds->peer == NULL) {
> +    if (!reds_main_channel_connected()) {
>           red_printf("not connected to peer");
>           goto error;
>       }
> @@ -2849,8 +2356,6 @@ error:
>
>   static void reds_mig_finished(int completed)
>   {
> -    RedsOutItem *item;
> -
>       red_printf("");
>       if (reds->listen_watch != NULL) {
>           core->watch_update_mask(reds->listen_watch, SPICE_WATCH_EVENT_READ);
> @@ -2860,7 +2365,7 @@ static void reds_mig_finished(int completed)
>           core->watch_update_mask(reds->secure_listen_watch, SPICE_WATCH_EVENT_READ);
>       }
>
> -    if (reds->peer == NULL) {
> +    if (!reds_main_channel_connected()) {
>           red_printf("no peer connected");
>           return;
>       }
> @@ -2868,53 +2373,45 @@ static void reds_mig_finished(int completed)
>
>       if (completed) {
>           Channel *channel;
> -        SpiceMsgMigrate migrate;
>
>           reds->mig_wait_disconnect = TRUE;
>           core->timer_start(reds->mig_timer, MIGRATE_TIMEOUT);
>
> -        item = new_out_item(SPICE_MSG_MIGRATE);
> -        migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
> -        spice_marshall_msg_migrate(item->m,&migrate);
> -
> -        reds_push_pipe_item(item);
> +        // TODO: so now that main channel is separate, how exactly does migration of it work?
> +        //  - it can have an empty migrate - that seems ok
> +        //  - I can try to fill its migrate, then move stuff from reds.c there, but a lot of data
> +        //    is in reds state right now.
> +        main_channel_push_migrate(reds->main_channel);
>           channel = reds->channels;
>           while (channel) {
>               channel->migrate(channel);
>               channel = channel->next;
>           }
>       } else {
> -        item = new_out_item(SPICE_MSG_MAIN_MIGRATE_CANCEL);
> -        reds_push_pipe_item(item);
> +        main_channel_push_migrate_cancel(reds->main_channel);
>           reds_mig_cleanup();
>       }
>   }
>
> -static void reds_mig_switch(void)
> +void reds_mig_switch(void)
>   {
> -    RedsMigSpice *s = reds->mig_spice;
> -    SpiceMsgMainMigrationSwitchHost migrate;
> -    RedsOutItem *item;
> -
> -    red_printf("");
> -    item = new_out_item(SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
> +    main_channel_push_migrate_switch(reds->main_channel);
> +}
>
> -    migrate.port = s->port;
> -    migrate.sport = s->sport;
> -    migrate.host_size = strlen(s->host) + 1;
> -    migrate.host_data = (uint8_t *)s->host;
> +void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate)
> +{
> +    RedsMigSpice *s = reds->mig_spice;
> +    migrate->port = s->port;
> +    migrate->sport = s->sport;
> +    migrate->host_size = strlen(s->host) + 1;
> +    migrate->host_data = (uint8_t *)s->host;
>       if (s->cert_subject) {
> -        migrate.cert_subject_size = strlen(s->cert_subject) + 1;
> -        migrate.cert_subject_data = (uint8_t *)s->cert_subject;
> +        migrate->cert_subject_size = strlen(s->cert_subject) + 1;
> +        migrate->cert_subject_data = (uint8_t *)s->cert_subject;
>       } else {
> -        migrate.cert_subject_size = 0;
> -        migrate.cert_subject_data = NULL;
> +        migrate->cert_subject_size = 0;
> +        migrate->cert_subject_data = NULL;
>       }
> -    spice_marshall_msg_main_migrate_switch_host(item->m,&migrate);
> -
> -    reds_push_pipe_item(item);
> -
> -    reds_mig_release();
>   }
>
>   static void migrate_timout(void *opaque)
> @@ -2938,18 +2435,11 @@ void reds_update_mm_timer(uint32_t mm_time)
>
>   void reds_enable_mm_timer()
>   {
> -    SpiceMsgMainMultiMediaTime time_mes;
> -    RedsOutItem *item;
> -
>       core->timer_start(reds->mm_timer, MM_TIMER_GRANULARITY_MS);
> -    if (!reds->peer) {
> +    if (!reds_main_channel_connected()) {
>           return;
>       }
> -
> -    item = new_out_item(SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
> -    time_mes.time = reds_get_mm_time() - MM_TIME_DELTA;
> -    spice_marshall_msg_main_multi_media_time(item->m,&time_mes);
> -    reds_push_pipe_item(item);
> +    main_channel_push_multi_media_time(reds->main_channel, reds_get_mm_time() - MM_TIME_DELTA);
>   }
>
>   void reds_desable_mm_timer()
> @@ -2970,7 +2460,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
>
>       vdagent = sin;
>       reds_update_mouse_mode();
> -    if (!reds->peer) {
> +    if (!reds_main_channel_connected()) {
>           return;
>       }
>       sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
> @@ -2984,7 +2474,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
>           return;
>       }
>
> -    reds_send_agent_connected();
> +    main_channel_push_agent_connected(reds->main_channel);
>   }
>
>   __visible__ void spice_server_char_device_wakeup(SpiceCharDeviceInstance* sin)
> @@ -3260,12 +2750,6 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
>       core = core_interface;
>       reds->listen_socket = -1;
>       reds->secure_listen_socket = -1;
> -    reds->peer = NULL;
> -    reds->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
> -    reds->in_handler.handle_message = reds_main_handle_message;
> -    ring_init(&reds->outgoing.pipe);
> -    reds->outgoing.vec = reds->outgoing.vec_buf;
> -
>       init_vd_agent_resources();
>
>       if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
> @@ -3317,6 +2801,7 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
>       if (reds->secure_listen_socket != -1) {
>           reds_init_ssl();
>       }
> +    reds->main_channel = NULL;
>       inputs_init();
>
>   #ifdef USE_SMARTCARD
> @@ -3416,7 +2901,7 @@ __visible__ int spice_server_set_ticket(SpiceServer *s,
>   {
>       ASSERT(reds == s);
>
> -    if (reds->peer) {
> +    if (reds_main_channel_connected()) {
>           if (fail_if_connected) {
>               return -1;
>           }
> @@ -3549,10 +3034,7 @@ __visible__ int spice_server_set_channel_security(SpiceServer *s, const char *ch
>   __visible__ int spice_server_get_sock_info(SpiceServer *s, struct sockaddr *sa, socklen_t *salen)
>   {
>       ASSERT(reds == s);
> -    if (!reds->peer) {
> -        return -1;
> -    }
> -    if (getsockname(reds->peer->socket, sa, salen)<  0) {
> +    if (main_channel_getsockname(reds->main_channel, sa, salen)<  0) {
>           return -1;
>       }
>       return 0;
> @@ -3561,10 +3043,7 @@ __visible__ int spice_server_get_sock_info(SpiceServer *s, struct sockaddr *sa,
>   __visible__ int spice_server_get_peer_info(SpiceServer *s, struct sockaddr *sa, socklen_t *salen)
>   {
>       ASSERT(reds == s);
> -    if (!reds->peer) {
> -        return -1;
> -    }
> -    if (getpeername(reds->peer->socket, sa, salen)<  0) {
> +    if (main_channel_getpeername(reds->main_channel, sa, salen)<  0) {
>           return -1;
>       }
>       return 0;
> @@ -3658,7 +3137,7 @@ __visible__ int spice_server_migrate_client_state(SpiceServer *s)
>   {
>       ASSERT(reds == s);
>
> -    if (!reds->peer) {
> +    if (!reds_main_channel_connected()) {
>           return SPICE_MIGRATE_CLIENT_NONE;
>       } else if (reds->mig_wait_connect) {
>           return SPICE_MIGRATE_CLIENT_WAITING;
> diff --git a/server/reds.h b/server/reds.h
> index e440804..e1a5ab7 100644
> --- a/server/reds.h
> +++ b/server/reds.h
> @@ -22,6 +22,9 @@
>   #include<openssl/ssl.h>
>   #include<sys/uio.h>
>   #include<spice/vd_agent.h>
> +#include "common/marshaller.h"
> +#include "common/messages.h"
> +#include "spice.h"
>
>   #define __visible__ __attribute__ ((visibility ("default")))
>
> @@ -92,5 +95,24 @@ extern uint64_t bitrate_per_sec;
>   // Temporary measures to make splitting reds.c to inputs_channel.c easier
>   void reds_disconnect(void);
>
> +// Temporary (?) for splitting main channel
> +typedef struct MainMigrateData MainMigrateData;
> +void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data);
> +void reds_fill_channels(SpiceMsgChannels *channels_info);
> +void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate);
> +void reds_mig_release(void);
> +int reds_num_of_channels(void);
> +#ifdef RED_STATISTICS
> +void reds_update_stat_value(uint32_t value);
> +#endif
> +
> +// callbacks from main channel messages
> +void reds_on_main_agent_start();
> +void reds_on_main_agent_data(void *message, size_t size);
> +void reds_on_main_migrate_connected();
> +void reds_on_main_migrate_connect_error();
> +void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end);
> +void reds_on_main_mouse_mode_request(void *message, size_t size);
> +
>   #endif
>


More information about the Spice-devel mailing list