[Spice-commits] 12 commits - server/inputs_channel.c server/main_channel.c server/main_channel.h server/Makefile.am server/red_channel.c server/red_channel.h server/reds.c server/reds.h tests/migrate.py

Alon Levy alon at kemper.freedesktop.org
Sun Jan 16 02:03:39 PST 2011


 server/Makefile.am      |    1 
 server/inputs_channel.c |  134 +++---
 server/main_channel.c   |  849 ++++++++++++++++++++++++++++++++++++++++++++
 server/main_channel.h   |   78 ++++
 server/red_channel.c    |  107 +----
 server/red_channel.h    |   11 
 server/reds.c           |  926 ++++++++++--------------------------------------
 server/reds.h           |   22 +
 tests/migrate.py        |  158 ++++++++
 9 files changed, 1431 insertions(+), 855 deletions(-)

New commits:
commit f16c2fccdd3862c5603db0b8eb4605284294321c
Author: Alon Levy <alevy at redhat.com>
Date:   Sat Jan 15 20:53:44 2011 +0200

    server/red_channel: fix segfault on red_channel_destroy if peer already removed

diff --git a/server/red_channel.c b/server/red_channel.c
index 40f3a1f..a13ef0e 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -365,7 +365,7 @@ void red_channel_destroy(RedChannel *channel)
 void red_channel_shutdown(RedChannel *channel)
 {
     red_printf("");
-    if (!channel->peer->shutdown) {
+    if (channel->peer && !channel->peer->shutdown) {
         channel->core->watch_update_mask(channel->peer->watch,
                                          SPICE_WATCH_EVENT_READ);
         red_channel_pipe_clear(channel);
commit 2809d847247dbb97e4c8118c99067cc14487f374
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Nov 4 16:03:48 2010 +0200

    server/inputs_channel: use outgoing marshaller in red_channel/RedChannel

diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index e53a634..b7ae55a 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -72,10 +72,18 @@ enum {
 
 typedef struct InputsPipeItem {
     PipeItem base;
-    SpiceMarshaller *m;
-    uint8_t *data;      /* If the marshaller malloced, pointer is here */
 } InputsPipeItem;
 
+typedef struct KeyModifiersPipeItem {
+    PipeItem base;
+    uint8_t modifiers;
+} KeyModifiersPipeItem;
+
+typedef struct InputsInitPipeItem {
+    PipeItem base;
+    uint8_t modifiers;
+} InputsInitPipeItem;
+
 static SpiceKbdInstance *keyboard = NULL;
 static SpiceMouseInstance *mouse = NULL;
 static SpiceTabletInstance *tablet = NULL;
@@ -210,13 +218,22 @@ static uint8_t kbd_get_leds(SpiceKbdInstance *sin)
     return sif->get_leds(sin);
 }
 
-static InputsPipeItem *inputs_pipe_item_new(InputsChannel *channel, int type)
+static InputsPipeItem *inputs_pipe_item_new(InputsChannel *inputs_channel, int type)
 {
     InputsPipeItem *item = spice_malloc(sizeof(InputsPipeItem));
 
-    red_channel_pipe_item_init(&channel->base, &item->base, type);
-    item->m = spice_marshaller_new();
-    item->data = NULL;
+    red_channel_pipe_item_init(&inputs_channel->base, &item->base, type);
+    return item;
+}
+
+static KeyModifiersPipeItem *inputs_key_modifiers_item_new(
+    InputsChannel *inputs_channel, uint8_t modifiers)
+{
+    KeyModifiersPipeItem *item = spice_malloc(sizeof(KeyModifiersPipeItem));
+
+    red_channel_pipe_item_init(&inputs_channel->base, &item->base,
+                               PIPE_ITEM_KEY_MODIFIERS);
+    item->modifiers = modifiers;
     return item;
 }
 
@@ -232,33 +249,43 @@ static void inputs_pipe_add_type(InputsChannel *channel, int type)
 static void inputs_channel_release_pipe_item(RedChannel *channel,
     PipeItem *base, int item_pushed)
 {
-    // All PipeItems we push are InputsPipeItem
-    InputsPipeItem *item = (InputsPipeItem*)base;
-
-    if (item->data) {
-        free(item->data);
-    }
-    spice_marshaller_destroy(item->m);
-    free(item);
+    free(base);
 }
 
 static void inputs_channel_send_item(RedChannel *channel, PipeItem *base)
 {
-    InputsPipeItem *item = SPICE_CONTAINEROF(base, InputsPipeItem, base);
-    SpiceMarshaller *m = item->m;
-    uint8_t *data;
-    size_t len;
-    int free_data;
+    InputsChannel *inputs_channel = (InputsChannel *)channel;
+    SpiceMarshaller *m = inputs_channel->base.send_data.marshaller;
 
     red_channel_reset_send_data(channel);
     red_channel_init_send_data(channel, base->type, base);
-    spice_marshaller_flush(m);
-    // TODO: use spice_marshaller_fill_iovec. Right now we are doing something stupid,
-    // namely copying twice. See reds.c.
-    data = spice_marshaller_linearize(m, 0, &len, &free_data);
-    item->data = (free_data && len > 0) ? data : NULL;
-    if (len > 0) {
-        red_channel_add_buf(channel, data, len);
+    switch (base->type) { /* marshall exactly one message body per pipe item */
+        case PIPE_ITEM_KEY_MODIFIERS:
+        {
+            SpiceMsgInputsKeyModifiers key_modifiers;
+            key_modifiers.modifiers =
+                SPICE_CONTAINEROF(base, KeyModifiersPipeItem, base)->modifiers;
+            spice_marshall_msg_inputs_key_modifiers(m, &key_modifiers);
+            break; /* must not fall through and also marshall an init message */
+        }
+        case PIPE_ITEM_INIT:
+        {
+            SpiceMsgInputsInit inputs_init;
+            inputs_init.keyboard_modifiers =
+                SPICE_CONTAINEROF(base, InputsInitPipeItem, base)->modifiers;
+            spice_marshall_msg_inputs_init(m, &inputs_init);
+            break; /* must not fall through and also marshall a migrate message */
+        }
+        case PIPE_ITEM_MIGRATE:
+        {
+            SpiceMsgMigrate migrate;
+
+            migrate.flags = 0;
+            spice_marshall_msg_migrate(m, &migrate);
+            break;
+        }
+        default:
+            break;
     }
     red_channel_begin_send_message(channel);
 }
@@ -431,10 +458,12 @@ static void inputs_channel_on_outgoing_error(RedChannel *channel)
 
 static void inputs_shutdown(Channel *channel)
 {
-    ASSERT(g_inputs_channel == (InputsChannel *)channel->data);
-    if (g_inputs_channel) {
-        red_channel_shutdown(&g_inputs_channel->base);
-        g_inputs_channel->base.incoming.shut = TRUE;
+    InputsChannel *inputs_channel = (InputsChannel *)channel->data;
+    ASSERT(g_inputs_channel == inputs_channel);
+
+    if (inputs_channel) {
+        red_channel_shutdown(&inputs_channel->base);
+        inputs_channel->base.incoming.shut = TRUE;
         channel->data = NULL;
         g_inputs_channel = NULL;
     }
@@ -442,28 +471,22 @@ static void inputs_shutdown(Channel *channel)
 
 static void inputs_migrate(Channel *channel)
 {
-    InputsChannel *inputs_channel = (InputsChannel *)channel->data;
-    InputsPipeItem *pipe_item;
-    SpiceMarshaller *m;
-    SpiceMsgMigrate migrate;
+    InputsChannel *inputs_channel = channel->data;
+    InputsPipeItem *item;
 
-    ASSERT(g_inputs_channel == inputs_channel);
-    pipe_item = inputs_pipe_item_new(inputs_channel, PIPE_ITEM_MIGRATE);
-    m = pipe_item->m;
-    migrate.flags = 0;
-    spice_marshall_msg_migrate(m, &migrate);
-    red_channel_pipe_add(&inputs_channel->base, &pipe_item->base);
+    ASSERT(g_inputs_channel == (InputsChannel *)channel->data);
+    item = inputs_pipe_item_new(inputs_channel, PIPE_ITEM_MIGRATE);
+    red_channel_pipe_add(&inputs_channel->base, &item->base);
 }
 
-static void inputs_pipe_add_init(InputsChannel *channel)
+static void inputs_pipe_add_init(InputsChannel *inputs_channel)
 {
-    SpiceMsgInputsInit inputs_init;
-    InputsPipeItem *pipe_item = inputs_pipe_item_new(channel, PIPE_ITEM_INIT);
-    SpiceMarshaller *m = pipe_item->m;
+    InputsInitPipeItem *item = spice_malloc(sizeof(InputsInitPipeItem));
 
-    inputs_init.keyboard_modifiers = kbd_get_leds(keyboard);
-    spice_marshall_msg_inputs_init(m, &inputs_init);
-    red_channel_pipe_add(&channel->base, &pipe_item->base);
+    red_channel_pipe_item_init(&inputs_channel->base, &item->base,
+                               PIPE_ITEM_INIT);
+    item->modifiers = kbd_get_leds(keyboard);
+    red_channel_pipe_add(&inputs_channel->base, &item->base);
 }
 
 static int inputs_channel_config_socket(RedChannel *channel)
@@ -509,30 +532,25 @@ static void inputs_link(Channel *channel, RedsStreamContext *peer, int migration
     inputs_pipe_add_init(inputs_channel);
 }
 
-void inputs_send_keyboard_modifiers(uint8_t modifiers)
+static void inputs_push_keyboard_modifiers(uint8_t modifiers)
 {
-    SpiceMsgInputsKeyModifiers key_modifiers;
-    InputsPipeItem *pipe_item;
-    SpiceMarshaller *m;
+    KeyModifiersPipeItem *item;
 
     if (!g_inputs_channel || !red_channel_is_connected(&g_inputs_channel->base)) {
         return;
     }
-    pipe_item = inputs_pipe_item_new(g_inputs_channel, PIPE_ITEM_KEY_MODIFIERS);
-    m = pipe_item->m;
-    key_modifiers.modifiers = modifiers;
-    spice_marshall_msg_inputs_key_modifiers(m, &key_modifiers);
-    red_channel_pipe_add(&g_inputs_channel->base, &pipe_item->base);
+    item = inputs_key_modifiers_item_new(g_inputs_channel, modifiers);
+    red_channel_pipe_add(&g_inputs_channel->base, &item->base);
 }
 
 void inputs_on_keyboard_leds_change(void *opaque, uint8_t leds)
 {
-    inputs_send_keyboard_modifiers(leds);
+    inputs_push_keyboard_modifiers(leds);
 }
 
 static void key_modifiers_sender(void *opaque)
 {
-    inputs_send_keyboard_modifiers(kbd_get_leds(keyboard));
+    inputs_push_keyboard_modifiers(kbd_get_leds(keyboard));
 }
 
 void inputs_init(void)
commit fc3637bf32a56128314f8dc8825d267cbaac1a2a
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Jan 13 07:02:36 2011 +0200

    tests/migrate.py: add a migration test

diff --git a/tests/migrate.py b/tests/migrate.py
new file mode 100644
index 0000000..c01b107
--- /dev/null
+++ b/tests/migrate.py
@@ -0,0 +1,158 @@
+"""
+Spice Migration test
+
+Somewhat stressful test of continuous migration with spice in VGA mode or QXL mode,
+depends on supplying an image in IMAGE variable (if no image is supplied then
+VGA mode since it will just be SeaBIOS).
+
+Dependencies:
+either qmp in python path or running with spice and qemu side by side:
+qemu/QMP/qmp.py
+spice/tests/migrate.py
+
+Will create two temporary unix sockets in /tmp
+Will leave a log file, migrate_log.log, in current directory.
+"""
+
+#
+# start one spiceclient, have two machines (active and target),
+# and repeat:
+#  active wait until it's active
+#  active client_migrate_info
+#  active migrate tcp:localhost:9000
+#  _wait for event of quit
+#  active stop, active<->passive
+# 
+# wait until it's active
+#  command query-status, if running good
+#  if not listen to events until event of running
+
+try:
+    import qmp
+except:
+    import sys
+    sys.path.append("../../qemu/QMP")
+    try:
+        import qmp
+    except:
+        print "can't find qmp"
+        raise SystemExit
+from subprocess import Popen, PIPE
+import os
+import time
+import socket
+import datetime
+import atexit
+
+QMP_1, QMP_2 = "/tmp/migrate_test.1.qmp", "/tmp/migrate_test.2.qmp"
+SPICE_PORT_1, SPICE_PORT_2 = 5911, 6911
+MIGRATE_PORT = 9000
+QEMU = "qemu.upstream"
+LOG_FILENAME = "migrate_log.log"
+IMAGE = "/store/images/f14_regular.qcow2"
+
+qemu_exec = os.popen("which %s" % QEMU).read().strip()
+
+def start_qemu(spice_port, qmp_filename, incoming_port=None):
+    incoming_args = []
+    if incoming_port:
+        incoming_args = ("-incoming tcp::%s" % incoming_port).split()
+    args = ([qemu_exec, "-qmp", "unix:%s,server,nowait" % qmp_filename,
+        "-spice", "disable-ticketing,port=%s" % spice_port]
+        + incoming_args)
+    if os.path.exists(IMAGE):
+        args += ["-m", "512", "-drive",
+                 "file=%s,index=0,media=disk,cache=unsafe" % IMAGE, "-snapshot"]
+    proc = Popen(args, executable=qemu_exec, stdin=PIPE, stdout=PIPE)
+    while not os.path.exists(qmp_filename):  # wait for the QMP socket file to appear
+        time.sleep(0.1)
+    proc.qmp_filename = qmp_filename
+    proc.qmp = qmp.QEMUMonitorProtocol(qmp_filename)
+    while True:  # retry until the QMP connection succeeds
+        try:
+            proc.qmp.connect()
+            break
+        except socket.error:
+            time.sleep(0.1)  # back off instead of busy-spinning on connect errors
+    proc.spice_port = spice_port
+    proc.incoming_port = incoming_port
+    return proc
+
+def start_spicec(spice_port):
+    return Popen(("spicec -h localhost -p %s" % spice_port).split(), executable="spicec")
+
+def wait_active(q, active):  # block until "running" == active or a RESUME/STOP event is seen
+    events = ["RESUME"] if active else ["STOP"]
+    while True:
+        try:
+            ret = q.cmd("query-status")
+        except:
+            # query failed (original note: ValueError) - retry after a short pause
+            time.sleep(0.1)
+            continue
+        if ret and ret.has_key("return"):
+            if ret["return"]["running"] == active:
+                return
+        for e in q.get_events():
+            if e["event"] in events:
+                return  # a plain break here would only leave the for loop
+        time.sleep(0.5)
+
+def wait_for_event(q, event):
+    while True:
+        for e in q.get_events():
+            if e["event"] == event:
+                return
+        time.sleep(0.5)
+
+def cleanup(*args):
+    print "doing cleanup"
+    os.system("killall %s" % qemu_exec)
+    for x in [QMP_1, QMP_2]:
+        if os.path.exists(x):
+            os.unlink(x)
+
+#################### Main #######################
+
+cleanup()
+atexit.register(cleanup)
+
+active = start_qemu(spice_port=SPICE_PORT_1, qmp_filename=QMP_1)
+target = start_qemu(spice_port=SPICE_PORT_2, qmp_filename=QMP_2,
+                    incoming_port=MIGRATE_PORT)
+
+i = 0
+spicec = None
+log = open(LOG_FILENAME, "a+")
+log.write("# "+str(datetime.datetime.now())+"\n")
+
+while True:
+    wait_active(active.qmp, True)
+    wait_active(target.qmp, False)
+    if spicec == None:
+        spicec = start_spicec(spice_port=SPICE_PORT_1)
+        wait_for_event(active.qmp, 'SPICE_INITIALIZED')
+        print "waiting for Enter to start migrations"
+        raw_input()
+    active.qmp.cmd('client_migrate_info', {'protocol':'spice',
+        'hostname':'localhost', 'port':target.spice_port})
+    active.qmp.cmd('migrate', {'uri': 'tcp:localhost:%s' % MIGRATE_PORT})
+    wait_active(active.qmp, False)
+    wait_active(target.qmp, True)
+    wait_for_event(target.qmp, 'SPICE_CONNECTED')
+    dead = active
+    dead.qmp.cmd("quit")
+    dead.qmp.close()
+    dead.wait()
+    new_spice_port = dead.spice_port
+    new_qmp_filename = dead.qmp_filename
+    log.write("# STDOUT dead %s\n" % dead.pid)
+    log.write(dead.stdout.read())
+    del dead
+    active = target
+    target = start_qemu(spice_port=new_spice_port,
+                        qmp_filename=new_qmp_filename,
+                        incoming_port=MIGRATE_PORT)
+    print i
+    i += 1
+
commit 8fabff2a1eef41d61207a1f5c94dbbf6ef10f202
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Nov 4 14:28:57 2010 +0200

    server/main_channel: use red_channel (most code is pipe send/marshall separation)

diff --git a/server/main_channel.c b/server/main_channel.c
index e42f173..f1fb4c6 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -38,6 +38,7 @@
 #include "common/messages.h"
 #include "reds.h"
 #include "main_channel.h"
+#include "red_channel.h"
 #include "generated_marshallers.h"
 
 #define ZERO_BUF_SIZE 4096
@@ -55,34 +56,67 @@ static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
 
 typedef struct RedsOutItem RedsOutItem;
 struct RedsOutItem {
-    RingItem link;
-    SpiceMarshaller *m;
-    SpiceDataHeader *header;
+    PipeItem base;
 };
 
-typedef struct RedsOutgoingData {
-    Ring pipe;
-    RedsOutItem *item;
-    int vec_size;
-    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
-    struct iovec *vec;
-} RedsOutgoingData;
-
-// TODO - remove and use red_channel.h
-typedef struct IncomingHandler {
-    spice_parse_channel_func_t parser;
+typedef struct PingPipeItem {
+    PipeItem base;
+    int size;
+} PingPipeItem;
+
+typedef struct MouseModePipeItem {
+    PipeItem base;
+    int current_mode;
+    int is_client_mouse_allowed;
+} MouseModePipeItem;
+
+typedef struct TokensPipeItem {
+    PipeItem base;
+    int tokens;
+} TokensPipeItem;
+
+typedef struct AgentDataPipeItem {
+    PipeItem base;
+    uint8_t* data;
+    size_t len;
+    spice_marshaller_item_free_func free_data;
     void *opaque;
-    int shut;
-    uint8_t buf[RECEIVE_BUF_SIZE];
-    uint32_t end_pos;
-    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
-} IncomingHandler;
+} AgentDataPipeItem;
+
+typedef struct InitPipeItem {
+    PipeItem base;
+    int connection_id;
+    int display_channels_hint;
+    int current_mouse_mode;
+    int is_client_mouse_allowed;
+    int multi_media_time;
+    int ram_hint;
+} InitPipeItem;
+
+typedef struct NotifyPipeItem {
+    PipeItem base;
+    uint8_t *mess;
+    int mess_len;
+} NotifyPipeItem;
+
+typedef struct MigrateBeginPipeItem {
+    PipeItem base;
+    int port;
+    int sport;
+    char *host;
+    uint16_t cert_pub_key_type;
+    uint32_t cert_pub_key_len;
+    uint8_t *cert_pub_key;
+} MigrateBeginPipeItem;
+
+typedef struct MultiMediaTimePipeItem {
+    PipeItem base;
+    int time;
+} MultiMediaTimePipeItem;
 
 typedef struct MainChannel {
-    RedsStreamContext *peer;
-    IncomingHandler in_handler;
-    RedsOutgoingData outgoing;
-    uint64_t serial; //migrate me
+    RedChannel base;
+    uint8_t recv_buf[RECEIVE_BUF_SIZE];
     uint32_t ping_id;
     uint32_t net_test_id;
     int net_test_stage;
@@ -98,45 +132,13 @@ enum NetTestStage {
 static uint64_t latency = 0;
 uint64_t bitrate_per_sec = ~0;
 
-static void main_channel_out_item_free(RedsOutItem *item);
-
-static void main_reset_outgoing(MainChannel *main_chan)
-{
-    RedsOutgoingData *outgoing = &main_chan->outgoing;
-    RingItem *ring_item;
-
-    if (outgoing->item) {
-        main_channel_out_item_free(outgoing->item);
-        outgoing->item = NULL;
-    }
-    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
-        RedsOutItem *out_item = (RedsOutItem *)ring_item;
-        ring_remove(ring_item);
-        main_channel_out_item_free(out_item);
-    }
-    outgoing->vec_size = 0;
-    outgoing->vec = outgoing->vec_buf;
-}
-
-// ALON from reds_disconnect
 static void main_disconnect(MainChannel *main_chan)
 {
-    if (!main_chan || !main_chan->peer) {
-        return;
-    }
-    main_reset_outgoing(main_chan);
-    core->watch_remove(main_chan->peer->watch);
-    main_chan->peer->watch = NULL;
-    main_chan->peer->cb_free(main_chan->peer);
-    main_chan->peer = NULL;
-    main_chan->in_handler.shut = TRUE;
-    main_chan->serial = 0;
     main_chan->ping_id = 0;
     main_chan->net_test_id = 0;
     main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
-    main_chan->in_handler.end_pos = 0;
+    red_channel_destroy(&main_chan->base);
 
-    // TODO: Should probably reset these on the ping start, not here
     latency = 0;
     bitrate_per_sec = ~0;
 }
@@ -149,233 +151,199 @@ void main_channel_start_net_test(Channel *channel)
         return;
     }
 
-    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES) &&
-                            main_channel_push_ping(channel, 0) &&
-                            main_channel_push_ping(channel, NET_TEST_BYTES)) {
+    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)
+        && main_channel_push_ping(channel, 0)
+        && main_channel_push_ping(channel, NET_TEST_BYTES)) {
         main_chan->net_test_id = main_chan->ping_id - 2;
         main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
     }
 }
 
-static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+static RedsOutItem *main_pipe_item_new(MainChannel *main_chan, int type)
 {
-    for (;;) {
-        uint8_t *buf = handler->buf;
-        uint32_t pos = handler->end_pos;
-        uint8_t *end = buf + pos;
-        SpiceDataHeader *header;
-        int n;
-        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
-        if (n <= 0) {
-            if (n == 0) {
-                return -1;
-            }
-            switch (errno) {
-            case EAGAIN:
-                return 0;
-            case EINTR:
-                break;
-            case EPIPE:
-                return -1;
-            default:
-                red_printf("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            pos += n;
-            end = buf + pos;
-            while (buf + sizeof(SpiceDataHeader) <= end &&
-                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
-                uint8_t *data = (uint8_t *)(header+1);
-                size_t parsed_size;
-                uint8_t *parsed;
-                message_destructor_t parsed_free;
-
-                buf += sizeof(SpiceDataHeader) + header->size;
-                parsed = handler->parser(data, data + header->size, header->type,
-                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-                if (parsed == NULL) {
-                    red_printf("failed to parse message type %d", header->type);
-                    return -1;
-                }
-                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
-                parsed_free(parsed);
-                if (handler->shut) {
-                    return -1;
-                }
-            }
-            memmove(handler->buf, buf, (handler->end_pos = end - buf));
-        }
-    }
+    RedsOutItem *item = spice_malloc(sizeof(RedsOutItem));
+
+    red_channel_pipe_item_init(&main_chan->base, &item->base, type);
+    return item;
 }
 
-static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
+static MouseModePipeItem *main_mouse_mode_item_new(MainChannel *main_chan,
+    int current_mode, int is_client_mouse_allowed)
 {
-    RedsOutItem *item;
-
-    item = spice_new(RedsOutItem, 1);
-    ring_item_init(&item->link);
+    MouseModePipeItem *item = spice_malloc(sizeof(MouseModePipeItem));
 
-    item->m = spice_marshaller_new();
-    item->header = (SpiceDataHeader *)
-        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
-    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_MOUSE_MODE);
+    item->current_mode = current_mode;
+    item->is_client_mouse_allowed = is_client_mouse_allowed;
+    return item;
+}
 
-    item->header->serial = ++main_chan->serial;
-    item->header->type = type;
-    item->header->sub_list = 0;
+static PingPipeItem *main_ping_item_new(MainChannel *channel, int size)
+{
+    PingPipeItem *item = spice_malloc(sizeof(PingPipeItem));
 
+    red_channel_pipe_item_init(&channel->base, &item->base, SPICE_MSG_PING);
+    item->size = size;
     return item;
 }
 
-static void main_channel_out_item_free(RedsOutItem *item)
+static TokensPipeItem *main_tokens_item_new(MainChannel *main_chan, int tokens)
 {
-    spice_marshaller_destroy(item->m);
-    free(item);
+    TokensPipeItem *item = spice_malloc(sizeof(TokensPipeItem));
+
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_AGENT_TOKEN);
+    item->tokens = tokens;
+    return item;
 }
 
-static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
+static AgentDataPipeItem *main_agent_data_item_new(MainChannel *channel,
+           uint8_t* data, size_t len,
+           spice_marshaller_item_free_func free_data, void *opaque)
 {
-    struct iovec *now = vec;
+    AgentDataPipeItem *item = spice_malloc(sizeof(AgentDataPipeItem));
 
-    while (skip && skip >= now->iov_len) {
-        skip -= now->iov_len;
-        --*vec_size;
-        now++;
-    }
-    now->iov_base = (uint8_t *)now->iov_base + skip;
-    now->iov_len -= skip;
-    return now;
+    red_channel_pipe_item_init(&channel->base, &item->base, SPICE_MSG_MAIN_AGENT_DATA);
+    item->data = data;
+    item->len = len;
+    item->free_data = free_data;
+    item->opaque = opaque;
+    return item;
 }
 
-static int main_channel_send_data(MainChannel *main_chan)
+static InitPipeItem *main_init_item_new(MainChannel *main_chan,
+    int connection_id, int display_channels_hint, int current_mouse_mode,
+    int is_client_mouse_allowed, int multi_media_time,
+    int ram_hint)
 {
-    RedsOutgoingData *outgoing = &main_chan->outgoing;
-    int n;
-
-    if (!outgoing->item) {
-        return TRUE;
-    }
+    InitPipeItem *item = spice_malloc(sizeof(InitPipeItem));
 
-    ASSERT(outgoing->vec_size);
-    for (;;) {
-        if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
-            switch (errno) {
-            case EAGAIN:
-                core->watch_update_mask(main_chan->peer->watch,
-                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
-                return FALSE;
-            case EINTR:
-                break;
-            case EPIPE:
-                reds_disconnect();
-                return FALSE;
-            default:
-                red_printf("%s", strerror(errno));
-                reds_disconnect();
-                return FALSE;
-            }
-        } else {
-            outgoing->vec = main_channel_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
-            if (!outgoing->vec_size) {
-                main_channel_out_item_free(outgoing->item);
-                outgoing->item = NULL;
-                outgoing->vec = outgoing->vec_buf;
-                return TRUE;
-            }
-        }
-    }
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_INIT);
+    item->connection_id = connection_id;
+    item->display_channels_hint = display_channels_hint;
+    item->current_mouse_mode = current_mouse_mode;
+    item->is_client_mouse_allowed = is_client_mouse_allowed;
+    item->multi_media_time = multi_media_time;
+    item->ram_hint = ram_hint;
+    return item;
 }
 
-static void main_channel_push(MainChannel *main_chan)
+static NotifyPipeItem *main_notify_item_new(MainChannel *main_chan,
+                                        uint8_t *mess, const int mess_len)
 {
-    RedsOutgoingData *outgoing = &main_chan->outgoing;
-    RingItem *ring_item;
-    RedsOutItem *item;
+    NotifyPipeItem *item = spice_malloc(sizeof(NotifyPipeItem));
 
-    for (;;) {
-        if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
-            return;
-        }
-        ring_remove(ring_item);
-        outgoing->item = item = (RedsOutItem *)ring_item;
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_NOTIFY);
+    item->mess = mess;
+    item->mess_len = mess_len;
+    return item;
+}
 
-        spice_marshaller_flush(item->m);
-        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
+static MigrateBeginPipeItem *main_migrate_begin_item_new(
+    MainChannel *main_chan, int port, int sport,
+    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
+    uint8_t *cert_pub_key)
+{
+    MigrateBeginPipeItem *item = spice_malloc(sizeof(MigrateBeginPipeItem));
 
-        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
-                                                         outgoing->vec_buf,
-                                                         REDS_MAX_SEND_IOVEC, 0);
-        main_channel_send_data(main_chan);
-    }
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    item->port = port;
+    item->sport = sport;
+    item->host = host;
+    item->cert_pub_key_type = cert_pub_key_type;
+    item->cert_pub_key_len = cert_pub_key_len;
+    item->cert_pub_key = cert_pub_key;
+    return item;
 }
 
-static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
+static MultiMediaTimePipeItem *main_multi_media_time_item_new(
+    MainChannel *main_chan, int time)
 {
-    ring_add(&main_chan->outgoing.pipe, &item->link);
-    main_channel_push(main_chan);
+    MultiMediaTimePipeItem *item;
+
+    item = spice_malloc(sizeof(MultiMediaTimePipeItem));
+    red_channel_pipe_item_init(&main_chan->base, &item->base,
+                               SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
+    item->time = time;
+    return item;
 }
 
 static void main_channel_push_channels(MainChannel *main_chan)
 {
-    SpiceMsgChannels* channels_info;
     RedsOutItem *item;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
-    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_channels(MainChannel *main_chan)
+{
+    SpiceMsgChannels* channels_info;
+
+    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels)
+                            + reds_num_of_channels() * sizeof(SpiceChannelId));
     reds_fill_channels(channels_info);
-    spice_marshall_msg_main_channels_list(item->m, channels_info);
+    spice_marshall_msg_main_channels_list(
+        main_chan->base.send_data.marshaller, channels_info);
     free(channels_info);
-    main_channel_push_pipe_item(main_chan, item);
 }
 
 int main_channel_push_ping(Channel *channel, int size)
 {
-    struct timespec time_space;
-    RedsOutItem *item;
-    SpiceMsgPing ping;
     MainChannel *main_chan = channel->data;
-
-    if (!main_chan) {
+    PingPipeItem *item;
+    
+    if (main_chan == NULL) {
         return FALSE;
     }
-    item = new_out_item(main_chan, SPICE_MSG_PING);
+    item = main_ping_item_new(main_chan, size);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+    return TRUE;
+}
+
+static void main_channel_marshall_ping(MainChannel *main_chan, int size)
+{
+    struct timespec time_space;
+    SpiceMsgPing ping;
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
+
     ping.id = ++main_chan->ping_id;
     clock_gettime(CLOCK_MONOTONIC, &time_space);
     ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
-    spice_marshall_msg_ping(item->m, &ping);
+    spice_marshall_msg_ping(m, &ping);
 
     while (size > 0) {
         int now = MIN(ZERO_BUF_SIZE, size);
         size -= now;
-        spice_marshaller_add_ref(item->m, zero_page, now);
+        spice_marshaller_add_ref(m, zero_page, now);
     }
+}
 
-    main_channel_push_pipe_item(main_chan, item);
+void main_channel_push_mouse_mode(Channel *channel, int current_mode,
+                                  int is_client_mouse_allowed)
+{
+    MainChannel *main_chan = channel->data;
+    MouseModePipeItem *item;
 
-    return TRUE;
+    item = main_mouse_mode_item_new(main_chan, current_mode,
+                                    is_client_mouse_allowed);
+    red_channel_pipe_add(&main_chan->base, &item->base);
 }
 
-void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
+static void main_channel_marshall_mouse_mode(MainChannel *main_chan, int current_mode, int is_client_mouse_allowed)
 {
     SpiceMsgMainMouseMode mouse_mode;
-    RedsOutItem *item;
-    MainChannel *main_chan;
-
-    if (!channel) {
-        return;
-    }
-    main_chan = channel->data;
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
     mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
     if (is_client_mouse_allowed) {
         mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
     }
     mouse_mode.current_mode = current_mode;
-
-    spice_marshall_msg_main_mouse_mode(item->m, &mouse_mode);
-
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_mouse_mode(main_chan->base.send_data.marshaller,
+                                       &mouse_mode);
 }
 
 void main_channel_push_agent_connected(Channel *channel)
@@ -383,186 +351,319 @@ void main_channel_push_agent_connected(Channel *channel)
     RedsOutItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
-    main_channel_push_pipe_item(main_chan, item);
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
+    red_channel_pipe_add(&main_chan->base, &item->base);
 }
 
 void main_channel_push_agent_disconnected(Channel *channel)
 {
-    SpiceMsgMainAgentDisconnect disconnect;
     RedsOutItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_agent_disconnected(MainChannel *main_chan)
+{
+    SpiceMsgMainAgentDisconnect disconnect;
+
     disconnect.error_code = SPICE_LINK_ERR_OK;
-    spice_marshall_msg_main_agent_disconnected(item->m, &disconnect);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_agent_disconnected(
+        main_chan->base.send_data.marshaller, &disconnect);
 }
 
 void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
 {
-    SpiceMsgMainAgentTokens tokens;
-    RedsOutItem *item;
     MainChannel *main_chan = channel->data;
+    TokensPipeItem *item = main_tokens_item_new(main_chan, num_tokens);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_tokens(MainChannel *main_chan, uint32_t num_tokens)
+{
+    SpiceMsgMainAgentTokens tokens;
 
-    if (!main_chan) {
-        return;
-    }
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
     tokens.num_tokens = num_tokens;
-    spice_marshall_msg_main_agent_token(item->m, &tokens);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_agent_token(
+        main_chan->base.send_data.marshaller, &tokens);
 }
 
 void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
            spice_marshaller_item_free_func free_data, void *opaque)
 {
-    RedsOutItem *item;
     MainChannel *main_chan = channel->data;
+    AgentDataPipeItem *item;
+
+    item = main_agent_data_item_new(main_chan, data, len, free_data, opaque);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
-    spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
-    main_channel_push_pipe_item(main_chan, item);
+static void main_channel_marshall_agent_data(MainChannel *main_chan,
+                                  AgentDataPipeItem *item)
+{
+    spice_marshaller_add_ref_full(main_chan->base.send_data.marshaller,
+        item->data, item->len, item->free_data, item->opaque);
 }
 
 static void main_channel_push_migrate_data_item(MainChannel *main_chan)
 {
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
-    SpiceMarshaller *m = item->m;
+    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE_DATA);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_migrate_data_item(MainChannel *main_chan)
+{
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
     MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
 
     reds_marshall_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
-    data->serial = main_chan->serial;
+    data->serial = red_channel_get_message_serial(&main_chan->base);
     data->ping_id = main_chan->ping_id;
-    main_channel_push_pipe_item(main_chan, item);
 }
 
-static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
+static void main_channel_receive_migrate_data(MainChannel *main_chan,
+                                  MainMigrateData *data, uint8_t *end)
 {
-    main_chan->serial = data->serial;
+    red_channel_set_message_serial(&main_chan->base, data->serial);
     main_chan->ping_id = data->ping_id;
 }
 
-void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
-    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+void main_channel_push_init(Channel *channel, int connection_id,
+    int display_channels_hint, int current_mouse_mode,
+    int is_client_mouse_allowed, int multi_media_time,
     int ram_hint)
 {
-    RedsOutItem *item;
-    SpiceMsgMainInit init;
+    InitPipeItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
-    init.session_id = connection_id;
-    init.display_channels_hint = display_channels_hint;
-    init.current_mouse_mode = current_mouse_mode;
+    item = main_init_item_new(main_chan,
+             connection_id, display_channels_hint, current_mouse_mode,
+             is_client_mouse_allowed, multi_media_time, ram_hint);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_init(MainChannel *main_chan,
+                                       InitPipeItem *item)
+{
+    SpiceMsgMainInit init;
+
+    init.session_id = item->connection_id;
+    init.display_channels_hint = item->display_channels_hint;
+    init.current_mouse_mode = item->current_mouse_mode;
     init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
-    if (is_client_mouse_allowed) {
+    if (item->is_client_mouse_allowed) {
         init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
     }
     init.agent_connected = reds_has_vdagent();
     init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
-    init.multi_media_time = multi_media_time;
-    init.ram_hint = ram_hint;
-    spice_marshall_msg_main_init(item->m, &init);
-    main_channel_push_pipe_item(main_chan, item);
+    init.multi_media_time = item->multi_media_time;
+    init.ram_hint = item->ram_hint;
+    spice_marshall_msg_main_init(main_chan->base.send_data.marshaller, &init);
 }
 
 void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
 {
-    // TODO possible free-then-use bug - caller frees mess after this, but is that pointer being
-    // used by spice_marshaller?
-    RedsOutItem *item;
-    SpiceMsgNotify notify;
     MainChannel *main_chan = channel->data;
+    NotifyPipeItem *item = main_notify_item_new(main_chan, mess, mess_len);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
 
-    item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
-    notify.time_stamp = get_time_stamp();
+static void main_channel_marshall_notify(MainChannel *main_chan, NotifyPipeItem *item)
+{
+    SpiceMsgNotify notify;
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
+
+    notify.time_stamp = get_time_stamp(); // TODO - move to main_new_notify_item
     notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
     notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
     notify.what = SPICE_WARN_GENERAL;
-    notify.message_len = mess_len;
-    spice_marshall_msg_notify(item->m, &notify);
-    spice_marshaller_add(item->m, mess, mess_len + 1);
-    main_channel_push_pipe_item(main_chan, item);
+    notify.message_len = item->mess_len;
+    spice_marshall_msg_notify(m, &notify);
+    spice_marshaller_add(m, item->mess, item->mess_len + 1);
 }
 
-void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
-    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport,
+    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
+    uint8_t *cert_pub_key)
 {
     MainChannel *main_chan = channel->data;
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    MigrateBeginPipeItem *item = main_migrate_begin_item_new(main_chan, port,
+        sport, host, cert_pub_key_type, cert_pub_key_len, cert_pub_key);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_migrate_begin(MainChannel *main_chan,
+    MigrateBeginPipeItem *item)
+{
     SpiceMsgMainMigrationBegin migrate;
 
-    migrate.port = port;
-    migrate.sport = sport;
-    migrate.host_size = strlen(host) + 1;
-    migrate.host_data = (uint8_t *)host;
-    migrate.pub_key_type = cert_pub_key_type;
-    migrate.pub_key_size = cert_pub_key_len;
-    migrate.pub_key_data = cert_pub_key;
-    spice_marshall_msg_main_migrate_begin(item->m, &migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    migrate.port = item->port;
+    migrate.sport = item->sport;
+    migrate.host_size = strlen(item->host) + 1;
+    migrate.host_data = (uint8_t *)item->host;
+    migrate.pub_key_type = item->cert_pub_key_type;
+    migrate.pub_key_size = item->cert_pub_key_len;
+    migrate.pub_key_data = item->cert_pub_key;
+    spice_marshall_msg_main_migrate_begin(main_chan->base.send_data.marshaller,
+                                          &migrate);
 }
 
 void main_channel_push_migrate(Channel *channel)
 {
-    RedsOutItem *item;
-    SpiceMsgMigrate migrate;
     MainChannel *main_chan = channel->data;
+    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE);
+
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static void main_channel_marshall_migrate(MainChannel *main_chan)
+{
+    SpiceMsgMigrate migrate;
 
-    item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
     migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
-    spice_marshall_msg_migrate(item->m, &migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_migrate(main_chan->base.send_data.marshaller, &migrate);
 }
 
 void main_channel_push_migrate_cancel(Channel *channel)
 {
     MainChannel *main_chan = channel->data;
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
+    RedsOutItem *item = main_pipe_item_new(main_chan,
+                                           SPICE_MSG_MAIN_MIGRATE_CANCEL);
 
-    main_channel_push_pipe_item(main_chan, item);
+    red_channel_pipe_add(&main_chan->base, &item->base);
 }
 
 void main_channel_push_multi_media_time(Channel *channel, int time)
 {
-    SpiceMsgMainMultiMediaTime time_mes;
-    RedsOutItem *item;
     MainChannel *main_chan = channel->data;
 
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
-    time_mes.time = time;
-    spice_marshall_msg_main_multi_media_time(item->m, &time_mes);
-    main_channel_push_pipe_item(main_chan, item);
+    MultiMediaTimePipeItem *item =
+        main_multi_media_time_item_new(main_chan, time);
+    red_channel_pipe_add(&main_chan->base, &item->base);
+}
+
+static PipeItem *main_migrate_switch_item_new(MainChannel *main_chan)
+{
+    PipeItem *item = spice_malloc(sizeof(*item));
+
+    red_channel_pipe_item_init(&main_chan->base, item,
+                               SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+    return item;
 }
 
 void main_channel_push_migrate_switch(Channel *channel)
 {
+    MainChannel *main_chan = channel->data;
+
+    red_channel_pipe_add(&main_chan->base,
+        main_migrate_switch_item_new(main_chan));
+}
+
+static void main_channel_marshall_migrate_switch(MainChannel *main_chan)
+{
     SpiceMsgMainMigrationSwitchHost migrate;
-    RedsOutItem *item;
-    MainChannel *main_chan;
-    
-    if (!channel) {
-        return;
-    }
-    main_chan = channel->data;
+
     red_printf("");
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+
     reds_fill_mig_switch(&migrate);
-    spice_marshall_msg_main_migrate_switch_host(item->m, &migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_migrate_switch_host(
+        main_chan->base.send_data.marshaller, &migrate);
+
     reds_mig_release();
 }
 
-static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
+static void main_channel_marshall_multi_media_time(MainChannel *main_chan,
+    MultiMediaTimePipeItem *item)
+{
+    SpiceMsgMainMultiMediaTime time_mes;
+
+    time_mes.time = item->time;
+    spice_marshall_msg_main_multi_media_time(
+        main_chan->base.send_data.marshaller, &time_mes);
+}
+
+static void main_channel_send_item(RedChannel *channel, PipeItem *base)
+{
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
+
+    red_channel_reset_send_data(channel);
+    red_channel_init_send_data(channel, base->type, base);
+    switch (base->type) {
+        case SPICE_MSG_MAIN_CHANNELS_LIST:
+            main_channel_marshall_channels(main_chan);
+            break;
+        case SPICE_MSG_PING:
+            main_channel_marshall_ping(main_chan,
+                SPICE_CONTAINEROF(base, PingPipeItem, base)->size);
+            break;
+        case SPICE_MSG_MAIN_MOUSE_MODE:
+            {
+                MouseModePipeItem *item =
+                    SPICE_CONTAINEROF(base, MouseModePipeItem, base);
+                main_channel_marshall_mouse_mode(main_chan,
+                    item->current_mode, item->is_client_mouse_allowed);
+                break;
+            }
+        case SPICE_MSG_MAIN_AGENT_DISCONNECTED:
+            main_channel_marshall_agent_disconnected(main_chan);
+            break;
+        case SPICE_MSG_MAIN_AGENT_TOKEN:
+            main_channel_marshall_tokens(main_chan,
+                SPICE_CONTAINEROF(base, TokensPipeItem, base)->tokens);
+            break;
+        case SPICE_MSG_MAIN_AGENT_DATA:
+            main_channel_marshall_agent_data(main_chan,
+                SPICE_CONTAINEROF(base, AgentDataPipeItem, base));
+            break;
+        case SPICE_MSG_MIGRATE_DATA:
+            main_channel_marshall_migrate_data_item(main_chan);
+            break;
+        case SPICE_MSG_MAIN_INIT:
+            main_channel_marshall_init(main_chan,
+                SPICE_CONTAINEROF(base, InitPipeItem, base));
+            break;
+        case SPICE_MSG_NOTIFY:
+            main_channel_marshall_notify(main_chan,
+                SPICE_CONTAINEROF(base, NotifyPipeItem, base));
+            break;
+        case SPICE_MSG_MIGRATE:
+            main_channel_marshall_migrate(main_chan);
+            break;
+        case SPICE_MSG_MAIN_MIGRATE_BEGIN:
+            main_channel_marshall_migrate_begin(main_chan,
+                SPICE_CONTAINEROF(base, MigrateBeginPipeItem, base));
+            break;
+        case SPICE_MSG_MAIN_MULTI_MEDIA_TIME:
+            main_channel_marshall_multi_media_time(main_chan,
+                SPICE_CONTAINEROF(base, MultiMediaTimePipeItem, base));
+            break;
+        case SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST:
+            main_channel_marshall_migrate_switch(main_chan);
+            break;
+    };
+    red_channel_begin_send_message(channel);
+}
+
+static void main_channel_release_pipe_item(RedChannel *channel,
+    PipeItem *base, int item_pushed)
+{
+    free(base);
+}
+
+static int main_channel_handle_parsed(RedChannel *channel, size_t size, uint32_t type, void *message)
 {
-    MainChannel *main_chan = opaque;
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
 
     switch (type) {
     case SPICE_MSGC_MAIN_AGENT_START:
         red_printf("agent start");
         if (!main_chan) {
-            return;
+            return FALSE;
         }
         reds_on_main_agent_start(main_chan);
         break;
@@ -650,28 +751,29 @@ static void main_channel_handle_message(void *opaque, size_t size, uint32_t type
     default:
         red_printf("unexpected type %d", type);
     }
+    return TRUE;
 }
 
-static void main_channel_event(int fd, int event, void *data)
+static void main_channel_on_error(RedChannel *channel)
 {
-    MainChannel *main_chan = data;
+    reds_disconnect();
+}
 
-    if (event & SPICE_WATCH_EVENT_READ) {
-        if (handle_incoming(main_chan->peer, &main_chan->in_handler)) {
-            main_disconnect(main_chan);
-            reds_disconnect();
-        }
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        RedsOutgoingData *outgoing = &main_chan->outgoing;
-        if (main_channel_send_data(main_chan)) {
-            main_channel_push(main_chan);
-            if (!outgoing->item && main_chan->peer) {
-                core->watch_update_mask(main_chan->peer->watch,
-                                        SPICE_WATCH_EVENT_READ);
-            }
-        }
-    }
+static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
+{
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
+
+    return main_chan->recv_buf;
+}
+
+static void main_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
+                                               uint8_t *msg)
+{
+}
+
+static int main_channel_config_socket(RedChannel *channel)
+{
+    return TRUE;
 }
 
 static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
@@ -679,41 +781,44 @@ static void main_channel_link(Channel *channel, RedsStreamContext *peer, int mig
                         uint32_t *caps)
 {
     MainChannel *main_chan;
-
-    main_chan = spice_malloc0(sizeof(MainChannel));
+    red_printf("");
+    ASSERT(channel->data == NULL);
+
+    main_chan = (MainChannel*)red_channel_create_parser(
+        sizeof(*main_chan), peer, core, migration, FALSE /* handle_acks */
+        ,main_channel_config_socket
+        ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
+        ,main_channel_handle_parsed
+        ,main_channel_alloc_msg_rcv_buf
+        ,main_channel_release_msg_rcv_buf
+        ,main_channel_send_item
+        ,main_channel_release_pipe_item
+        ,main_channel_on_error
+        ,main_channel_on_error);
+    ASSERT(main_chan);
     channel->data = main_chan;
-    main_chan->peer = peer;
-    main_chan->in_handler.shut = FALSE;
-    main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
-    main_chan->in_handler.opaque = main_chan;
-    main_chan->in_handler.handle_message = main_channel_handle_message;
-    ring_init(&main_chan->outgoing.pipe);
-    main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
-    peer->watch = core->watch_add(peer->socket,
-                                  SPICE_WATCH_EVENT_READ,
-                                  main_channel_event, main_chan);
 }
 
 int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
 {
     MainChannel *main_chan = channel->data;
 
-    return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
+    return main_chan ? getsockname(main_chan->base.peer->socket, sa, salen) : -1;
 }
 
 int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
 {
     MainChannel *main_chan = channel->data;
 
-    return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
+    return main_chan ? getpeername(main_chan->base.peer->socket, sa, salen) : -1;
 }
 
 void main_channel_close(Channel *channel)
 {
     MainChannel *main_chan = channel->data;
 
-    if (main_chan && main_chan->peer) {
-        close(main_chan->peer->socket);
+    if (main_chan && main_chan->base.peer) {
+        close(main_chan->base.peer->socket);
     }
 }
 
@@ -722,9 +827,8 @@ static void main_channel_shutdown(Channel *channel)
     MainChannel *main_chan = channel->data;
 
     if (main_chan != NULL) {
-        main_disconnect(main_chan); // TODO - really here? reset peer etc.
+        main_disconnect(main_chan);
     }
-    free(main_chan);
 }
 
 static void main_channel_migrate()
commit 444b322cabbf0a8cd7220f055133b3b5eaf3042e
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Nov 4 12:01:48 2010 +0200

    server/red_channel: no need for extra loop

diff --git a/server/red_channel.c b/server/red_channel.c
index b6c13d1..40f3a1f 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -172,7 +172,7 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
             }
         } else {
             handler->pos += n;
-            if (!handler->vec_size && handler->pos == handler->size) { // finished writing data
+            if (handler->pos == handler->size) { // finished writing data
                 handler->on_msg_done(handler->opaque);
                 handler->vec = handler->vec_buf;
                 handler->pos = 0;
commit 90c93eb3c15c28238999c76dc051c0055e1cb3d8
Author: Alon Levy <alevy at redhat.com>
Date:   Wed Nov 3 19:01:31 2010 +0200

    server/red_channel: go marshaller for outgoing (copied from red_worker)

diff --git a/server/red_channel.c b/server/red_channel.c
index 3c1aede..b6c13d1 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -143,21 +143,6 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
     }
 }
 
-static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
-{
-    struct iovec *now = vec;
-
-    while ((skip) && (skip >= now->iov_len)) {
-        skip -= now->iov_len;
-        --*vec_size;
-        now++;
-    }
-
-    now->iov_base = (uint8_t *)now->iov_base + skip;
-    now->iov_len -= skip;
-    return now;
-}
-
 static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
 {
     int n;
@@ -167,9 +152,9 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
         if (!handler->size) {  // nothing to be sent
             return;
         }
-        handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
     }
     for (;;) {
+        handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
         if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
             switch (errno) {
             case EAGAIN:
@@ -187,27 +172,17 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
             }
         } else {
             handler->pos += n;
-            handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size);
-            if (!handler->vec_size) {
-                if (handler->pos == handler->size) { // finished writing data
-                    handler->on_msg_done(handler->opaque);
-                    handler->vec = handler->vec_buf;
-                    handler->pos = 0;
-                    handler->size = 0;
-                    return;
-                } else {
-                    // There wasn't enough place for all the outgoing data in one iovec array.
-                    // Filling the rest of the data.
-                    handler->vec = handler->vec_buf;
-                    handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
-                }
+            if (!handler->vec_size && handler->pos == handler->size) { // finished writing data
+                handler->on_msg_done(handler->opaque);
+                handler->vec = handler->vec_buf;
+                handler->pos = 0;
+                handler->size = 0;
+                return;
             }
         }
     }
 }
 
-static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
-
 static void red_channel_peer_on_error(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
@@ -232,13 +207,16 @@ static void red_channel_peer_on_outgoing_error(void *opaque)
 static int red_channel_peer_get_out_msg_size(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
+
     return channel->send_data.size;
 }
 
-static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
+static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
 {
     RedChannel *channel = (RedChannel *)opaque;
-    red_channel_fill_iovec(channel, vec, vec_size);
+
+    *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
+                                            vec, MAX_SEND_VEC, pos);
 }
 
 static void red_channel_peer_on_out_block(void *opaque)
@@ -254,8 +232,6 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
     channel->send_data.size = 0;
-    channel->send_data.n_bufs = 0;
-    channel->send_data.not_sent_buf_head = 0;
     if (channel->send_data.item) {
         channel->release_item(channel, channel->send_data.item, TRUE);
         channel->send_data.item = NULL;
@@ -298,6 +274,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
 
     channel->migrate = migrate;
     ring_init(&channel->pipe);
+    channel->send_data.marshaller = spice_marshaller_new();
 
     channel->incoming.opaque = channel;
     channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
@@ -328,6 +305,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
     return channel;
 
 error:
+    spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
     peer->cb_free(peer);
 
@@ -380,6 +358,7 @@ void red_channel_destroy(RedChannel *channel)
     red_channel_pipe_clear(channel);
     channel->core->watch_remove(channel->peer->watch);
     channel->peer->cb_free(channel->peer);
+    spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
 }
 
@@ -436,50 +415,30 @@ static void red_channel_event(int fd, int event, void *data)
     }
 }
 
-static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
-{
-    int pos = channel->send_data.n_bufs++;
-    ASSERT(pos < MAX_SEND_BUFS);
-    channel->send_data.bufs[pos].size = size;
-    channel->send_data.bufs[pos].data = data;
-}
-
 void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
 {
-    __red_channel_add_buf(channel, data, size);
-    channel->send_data.header.size += size;
+    spice_marshaller_add_ref(channel->send_data.marshaller, data, size);
+    channel->send_data.header->size += size;
 }
 
 void red_channel_reset_send_data(RedChannel *channel)
 {
-    channel->send_data.n_bufs = 0;
-    channel->send_data.header.size = 0;
-    channel->send_data.header.sub_list = 0;
-    ++channel->send_data.header.serial;
-    __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(SpiceDataHeader));
+    spice_marshaller_reset(channel->send_data.marshaller);
+    channel->send_data.header = (SpiceDataHeader *)
+        spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
+    spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
+    channel->send_data.header->type = 0;
+    channel->send_data.header->size = 0;
+    channel->send_data.header->sub_list = 0;
+    channel->send_data.header->serial = ++channel->send_data.serial;
 }
 
 void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
 {
-    channel->send_data.header.type = msg_type;
+    channel->send_data.header->type = msg_type;
     channel->send_data.item = item;
 }
 
-static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
-{
-    BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
-    ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs);
-    *vec_size = 0;
-    do {
-        vec[*vec_size].iov_base = buf->data;
-        vec[*vec_size].iov_len = buf->size;
-        (*vec_size)++;
-        buf++;
-        channel->send_data.not_sent_buf_head++;
-    } while (((*vec_size) < MAX_SEND_VEC) &&
-             (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
-}
-
 static void red_channel_send(RedChannel *channel)
 {
     red_peer_handle_outgoing(channel->peer, &channel->outgoing);
@@ -487,8 +446,11 @@ static void red_channel_send(RedChannel *channel)
 
 void red_channel_begin_send_message(RedChannel *channel)
 {
-    channel->send_data.size = channel->send_data.header.size + sizeof(SpiceDataHeader);
+    spice_marshaller_flush(channel->send_data.marshaller);
+    channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
+    channel->send_data.header->size =  channel->send_data.size - sizeof(SpiceDataHeader);
     channel->ack_data.messages_window++;
+    channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
     red_channel_send(channel);
 }
 
@@ -514,7 +476,12 @@ static void red_channel_push(RedChannel *channel)
 
 uint64_t red_channel_get_message_serial(RedChannel *channel)
 {
-    return channel->send_data.header.serial;
+    return channel->send_data.serial;
+}
+
+void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
+{
+    channel->send_data.serial = serial;
 }
 
 void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
diff --git a/server/red_channel.h b/server/red_channel.h
index 30adfc6..893a7f8 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -61,7 +61,7 @@ typedef struct IncomingHandler {
 } IncomingHandler;
 
 typedef int (*get_outgoing_msg_size_proc)(void *opaque);
-typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
 typedef void (*on_outgoing_error_proc)(void *opaque);
 typedef void (*on_outgoing_block_proc)(void *opaque);
 typedef void (*on_outgoing_msg_done_proc)(void *opaque);
@@ -125,18 +125,16 @@ struct RedChannel {
     uint32_t pipe_size;
 
     struct {
-        SpiceDataHeader header;
+        SpiceMarshaller *marshaller;
+        SpiceDataHeader *header;
         union {
             SpiceMsgSetAck ack;
             SpiceMsgMigrate migrate;
         } u;
-        uint32_t n_bufs;
-        BufDescriptor bufs[MAX_SEND_BUFS];
         uint32_t size;
-        uint32_t not_sent_buf_head;
-
         PipeItem *item;
         int blocked;
+        uint64_t serial;
     } send_data;
 
     OutgoingHandler outgoing;
@@ -200,6 +198,7 @@ void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem
 void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
 
 uint64_t red_channel_get_message_serial(RedChannel *channel);
+void red_channel_set_message_serial(RedChannel *channel, uint64_t);
 
 /* when sending a msg. should first call red_channel_begin_send_message */
 void red_channel_begin_send_message(RedChannel *channel);
commit e7e667f81db7ce7e0a44b1db067c085bc5b40dc7
Author: Alon Levy <alevy at redhat.com>
Date:   Mon Nov 8 12:37:15 2010 +0200

    server/reds: don't remove agent if it's not connected

diff --git a/server/reds.c b/server/reds.c
index 30a1b0c..97e4623 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -2537,7 +2537,7 @@ static void spice_server_char_device_remove_interface(SpiceBaseInstance *sin)
 
     red_printf("remove CHAR_DEVICE %s", char_device->subtype);
     if (strcmp(char_device->subtype, SUBTYPE_VDAGENT) == 0) {
-        if (vdagent) {
+        if (vdagent && reds->agent_state.connected) {
             reds_agent_remove();
         }
     }
commit 26cd666be57e4a751618d54080942caa31a60e80
Author: Alon Levy <alevy at redhat.com>
Date:   Mon Nov 8 10:12:52 2010 +0200

    server/reds: protect reds_update_mouse_mode when main_channel is disconnected

diff --git a/server/reds.c b/server/reds.c
index d79b02c..30a1b0c 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -725,7 +725,10 @@ static void reds_update_mouse_mode()
         reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
         return;
     }
-    main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
+    if (reds->main_channel) {
+        main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode,
+                                     reds->is_client_mouse_allowed);
+    }
 }
 
 static void reds_agent_remove()
commit 42522e0db98590133fd0370ab8a9bc657084e679
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Nov 4 17:26:13 2010 +0200

    server/reds: don't call close on NULL channel on atexit callback

diff --git a/server/reds.c b/server/reds.c
index 7b4782c..d79b02c 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -2202,7 +2202,9 @@ static void reds_init_ssl()
 
 static void reds_exit()
 {
-    main_channel_close(reds->main_channel);
+    if (reds->main_channel) {
+        main_channel_close(reds->main_channel);
+    }
 #ifdef RED_STATISTICS
     shm_unlink(reds->stat_shm_name);
     free(reds->stat_shm_name);
commit cd6c57e33759fea28ae921755403da5327019970
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Nov 4 16:00:32 2010 +0200

    server/reds: fix possible segfault when accessing vdagent from reds_update_mouse_mode after vdagent set to NULL

diff --git a/server/reds.c b/server/reds.c
index d325e20..7b4782c 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -733,6 +733,8 @@ static void reds_agent_remove()
     SpiceCharDeviceInstance *sin = vdagent;
     SpiceCharDeviceInterface *sif;
 
+    ASSERT(reds->agent_state.connected)
+    reds->agent_state.connected = 0;
     vdagent = NULL;
     reds_update_mouse_mode();
 
@@ -740,9 +742,7 @@ static void reds_agent_remove()
         return;
     }
 
-    ASSERT(reds->agent_state.connected)
     sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base);
-    reds->agent_state.connected = 0;
     if (sif->state) {
         sif->state(sin, reds->agent_state.connected);
     }
commit cdfa261dbb617214d371ae136c5bd005a452dabf
Author: Alon Levy <alevy at redhat.com>
Date:   Thu Nov 4 14:21:08 2010 +0200

    server/reds: s/reds_push_migrate_data_item/reds_marshall_migrate_data_item/

diff --git a/server/main_channel.c b/server/main_channel.c
index 7dff2b4..e42f173 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -431,7 +431,7 @@ static void main_channel_push_migrate_data_item(MainChannel *main_chan)
     SpiceMarshaller *m = item->m;
     MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
 
-    reds_push_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
+    reds_marshall_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
     data->serial = main_chan->serial;
     data->ping_id = main_chan->ping_id;
     main_channel_push_pipe_item(main_chan, item);
diff --git a/server/reds.c b/server/reds.c
index d6397e5..d325e20 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -1105,7 +1105,7 @@ typedef struct WriteQueueInfo {
     uint32_t len;
 } WriteQueueInfo;
 
-void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data)
+void reds_marshall_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data)
 {
     VDIPortState *state = &reds->agent_state;
     int buf_index;
diff --git a/server/reds.h b/server/reds.h
index e1a5ab7..b5dec7e 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -97,7 +97,7 @@ void reds_disconnect(void);
 
 // Temporary (?) for splitting main channel
 typedef struct MainMigrateData MainMigrateData;
-void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data);
+void reds_marshall_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data);
 void reds_fill_channels(SpiceMsgChannels *channels_info);
 void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate);
 void reds_mig_release(void);
commit 685f82a48e1c4aea3344d49786b94b1015423047
Author: Alon Levy <alevy at redhat.com>
Date:   Wed Nov 3 01:06:45 2010 +0200

    server: split main_channel from reds

diff --git a/server/Makefile.am b/server/Makefile.am
index ab66ba7..d265bfb 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -114,6 +114,7 @@ libspice_server_la_SOURCES =			\
 	red_parse_qxl.c				\
 	red_parse_qxl.h				\
 	reds.c					\
+	main_channel.c				\
 	inputs_channel.c			\
 	reds.h					\
 	stat.h					\
diff --git a/server/main_channel.c b/server/main_channel.c
new file mode 100644
index 0000000..7dff2b4
--- /dev/null
+++ b/server/main_channel.c
@@ -0,0 +1,745 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+   Copyright (C) 2009 Red Hat, Inc.
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2.1 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <limits.h>
+#include <time.h>
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <ctype.h>
+
+#include "server/red_common.h"
+#include "server/demarshallers.h"
+#include "common/ring.h"
+#include "common/messages.h"
+#include "reds.h"
+#include "main_channel.h"
+#include "generated_marshallers.h"
+
+#define ZERO_BUF_SIZE 4096
+
+// approximate max receive message size for main channel
+#define RECEIVE_BUF_SIZE \
+    (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
+
+#define REDS_MAX_SEND_IOVEC 100
+
+#define NET_TEST_WARMUP_BYTES 0
+#define NET_TEST_BYTES (1024 * 250)
+
+static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
+
+typedef struct RedsOutItem RedsOutItem;
+struct RedsOutItem {
+    RingItem link;
+    SpiceMarshaller *m;
+    SpiceDataHeader *header;
+};
+
+typedef struct RedsOutgoingData {
+    Ring pipe;
+    RedsOutItem *item;
+    int vec_size;
+    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
+    struct iovec *vec;
+} RedsOutgoingData;
+
+// TODO - remove and use red_channel.h
+typedef struct IncomingHandler {
+    spice_parse_channel_func_t parser;
+    void *opaque;
+    int shut;
+    uint8_t buf[RECEIVE_BUF_SIZE];
+    uint32_t end_pos;
+    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
+} IncomingHandler;
+
+typedef struct MainChannel {
+    RedsStreamContext *peer;
+    IncomingHandler in_handler;
+    RedsOutgoingData outgoing;
+    uint64_t serial; //migrate me
+    uint32_t ping_id;
+    uint32_t net_test_id;
+    int net_test_stage;
+} MainChannel;
+
+enum NetTestStage {
+    NET_TEST_STAGE_INVALID,
+    NET_TEST_STAGE_WARMUP,
+    NET_TEST_STAGE_LATENCY,
+    NET_TEST_STAGE_RATE,
+};
+
+static uint64_t latency = 0;
+uint64_t bitrate_per_sec = ~0;
+
+static void main_channel_out_item_free(RedsOutItem *item);
+
+static void main_reset_outgoing(MainChannel *main_chan)
+{
+    RedsOutgoingData *outgoing = &main_chan->outgoing;
+    RingItem *ring_item;
+
+    if (outgoing->item) {
+        main_channel_out_item_free(outgoing->item);
+        outgoing->item = NULL;
+    }
+    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
+        RedsOutItem *out_item = (RedsOutItem *)ring_item;
+        ring_remove(ring_item);
+        main_channel_out_item_free(out_item);
+    }
+    outgoing->vec_size = 0;
+    outgoing->vec = outgoing->vec_buf;
+}
+
+// ALON from reds_disconnect
+static void main_disconnect(MainChannel *main_chan)
+{
+    if (!main_chan || !main_chan->peer) {
+        return;
+    }
+    main_reset_outgoing(main_chan);
+    core->watch_remove(main_chan->peer->watch);
+    main_chan->peer->watch = NULL;
+    main_chan->peer->cb_free(main_chan->peer);
+    main_chan->peer = NULL;
+    main_chan->in_handler.shut = TRUE;
+    main_chan->serial = 0;
+    main_chan->ping_id = 0;
+    main_chan->net_test_id = 0;
+    main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
+    main_chan->in_handler.end_pos = 0;
+
+    // TODO: Should probably reset these on the ping start, not here
+    latency = 0;
+    bitrate_per_sec = ~0;
+}
+
+void main_channel_start_net_test(Channel *channel)
+{
+    MainChannel *main_chan = channel->data;
+
+    if (!main_chan || main_chan->net_test_id) {
+        return;
+    }
+
+    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES) &&
+                            main_channel_push_ping(channel, 0) &&
+                            main_channel_push_ping(channel, NET_TEST_BYTES)) {
+        main_chan->net_test_id = main_chan->ping_id - 2;
+        main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
+    }
+}
+
+static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+{
+    for (;;) {
+        uint8_t *buf = handler->buf;
+        uint32_t pos = handler->end_pos;
+        uint8_t *end = buf + pos;
+        SpiceDataHeader *header;
+        int n;
+        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
+        if (n <= 0) {
+            if (n == 0) {
+                return -1;
+            }
+            switch (errno) {
+            case EAGAIN:
+                return 0;
+            case EINTR:
+                break;
+            case EPIPE:
+                return -1;
+            default:
+                red_printf("%s", strerror(errno));
+                return -1;
+            }
+        } else {
+            pos += n;
+            end = buf + pos;
+            while (buf + sizeof(SpiceDataHeader) <= end &&
+                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
+                uint8_t *data = (uint8_t *)(header+1);
+                size_t parsed_size;
+                uint8_t *parsed;
+                message_destructor_t parsed_free;
+
+                buf += sizeof(SpiceDataHeader) + header->size;
+                parsed = handler->parser(data, data + header->size, header->type,
+                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
+                if (parsed == NULL) {
+                    red_printf("failed to parse message type %d", header->type);
+                    return -1;
+                }
+                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
+                parsed_free(parsed);
+                if (handler->shut) {
+                    return -1;
+                }
+            }
+            memmove(handler->buf, buf, (handler->end_pos = end - buf));
+        }
+    }
+}
+
+static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
+{
+    RedsOutItem *item;
+
+    item = spice_new(RedsOutItem, 1);
+    ring_item_init(&item->link);
+
+    item->m = spice_marshaller_new();
+    item->header = (SpiceDataHeader *)
+        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
+    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
+
+    item->header->serial = ++main_chan->serial;
+    item->header->type = type;
+    item->header->sub_list = 0;
+
+    return item;
+}
+
+static void main_channel_out_item_free(RedsOutItem *item)
+{
+    spice_marshaller_destroy(item->m);
+    free(item);
+}
+
+static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
+{
+    struct iovec *now = vec;
+
+    while (skip && skip >= now->iov_len) {
+        skip -= now->iov_len;
+        --*vec_size;
+        now++;
+    }
+    now->iov_base = (uint8_t *)now->iov_base + skip;
+    now->iov_len -= skip;
+    return now;
+}
+
+static int main_channel_send_data(MainChannel *main_chan)
+{
+    RedsOutgoingData *outgoing = &main_chan->outgoing;
+    int n;
+
+    if (!outgoing->item) {
+        return TRUE;
+    }
+
+    ASSERT(outgoing->vec_size);
+    for (;;) {
+        if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
+            switch (errno) {
+            case EAGAIN:
+                core->watch_update_mask(main_chan->peer->watch,
+                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
+                return FALSE;
+            case EINTR:
+                break;
+            case EPIPE:
+                reds_disconnect();
+                return FALSE;
+            default:
+                red_printf("%s", strerror(errno));
+                reds_disconnect();
+                return FALSE;
+            }
+        } else {
+            outgoing->vec = main_channel_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
+            if (!outgoing->vec_size) {
+                main_channel_out_item_free(outgoing->item);
+                outgoing->item = NULL;
+                outgoing->vec = outgoing->vec_buf;
+                return TRUE;
+            }
+        }
+    }
+}
+
+static void main_channel_push(MainChannel *main_chan)
+{
+    RedsOutgoingData *outgoing = &main_chan->outgoing;
+    RingItem *ring_item;
+    RedsOutItem *item;
+
+    for (;;) {
+        if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
+            return;
+        }
+        ring_remove(ring_item);
+        outgoing->item = item = (RedsOutItem *)ring_item;
+
+        spice_marshaller_flush(item->m);
+        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
+
+        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
+                                                         outgoing->vec_buf,
+                                                         REDS_MAX_SEND_IOVEC, 0);
+        main_channel_send_data(main_chan);
+    }
+}
+
+static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
+{
+    ring_add(&main_chan->outgoing.pipe, &item->link);
+    main_channel_push(main_chan);
+}
+
+static void main_channel_push_channels(MainChannel *main_chan)
+{
+    SpiceMsgChannels* channels_info;
+    RedsOutItem *item;
+
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
+    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
+    reds_fill_channels(channels_info);
+    spice_marshall_msg_main_channels_list(item->m, channels_info);
+    free(channels_info);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+int main_channel_push_ping(Channel *channel, int size)
+{
+    struct timespec time_space;
+    RedsOutItem *item;
+    SpiceMsgPing ping;
+    MainChannel *main_chan = channel->data;
+
+    if (!main_chan) {
+        return FALSE;
+    }
+    item = new_out_item(main_chan, SPICE_MSG_PING);
+    ping.id = ++main_chan->ping_id;
+    clock_gettime(CLOCK_MONOTONIC, &time_space);
+    ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
+    spice_marshall_msg_ping(item->m, &ping);
+
+    while (size > 0) {
+        int now = MIN(ZERO_BUF_SIZE, size);
+        size -= now;
+        spice_marshaller_add_ref(item->m, zero_page, now);
+    }
+
+    main_channel_push_pipe_item(main_chan, item);
+
+    return TRUE;
+}
+
+void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
+{
+    SpiceMsgMainMouseMode mouse_mode;
+    RedsOutItem *item;
+    MainChannel *main_chan;
+
+    if (!channel) {
+        return;
+    }
+    main_chan = channel->data;
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
+    mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
+    if (is_client_mouse_allowed) {
+        mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
+    }
+    mouse_mode.current_mode = current_mode;
+
+    spice_marshall_msg_main_mouse_mode(item->m, &mouse_mode);
+
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_agent_connected(Channel *channel)
+{
+    RedsOutItem *item;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_agent_disconnected(Channel *channel)
+{
+    SpiceMsgMainAgentDisconnect disconnect;
+    RedsOutItem *item;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    disconnect.error_code = SPICE_LINK_ERR_OK;
+    spice_marshall_msg_main_agent_disconnected(item->m, &disconnect);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
+{
+    SpiceMsgMainAgentTokens tokens;
+    RedsOutItem *item;
+    MainChannel *main_chan = channel->data;
+
+    if (!main_chan) {
+        return;
+    }
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
+    tokens.num_tokens = num_tokens;
+    spice_marshall_msg_main_agent_token(item->m, &tokens);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
+           spice_marshaller_item_free_func free_data, void *opaque)
+{
+    RedsOutItem *item;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
+    spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+static void main_channel_push_migrate_data_item(MainChannel *main_chan)
+{
+    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
+    SpiceMarshaller *m = item->m;
+    MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
+
+    reds_push_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
+    data->serial = main_chan->serial;
+    data->ping_id = main_chan->ping_id;
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
+{
+    main_chan->serial = data->serial;
+    main_chan->ping_id = data->ping_id;
+}
+
+void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
+    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+    int ram_hint)
+{
+    RedsOutItem *item;
+    SpiceMsgMainInit init;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
+    init.session_id = connection_id;
+    init.display_channels_hint = display_channels_hint;
+    init.current_mouse_mode = current_mouse_mode;
+    init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
+    if (is_client_mouse_allowed) {
+        init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
+    }
+    init.agent_connected = reds_has_vdagent();
+    init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
+    init.multi_media_time = multi_media_time;
+    init.ram_hint = ram_hint;
+    spice_marshall_msg_main_init(item->m, &init);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
+{
+    // TODO possible free-then-use bug - caller frees mess after this, but is that pointer being
+    // used by spice_marshaller?
+    RedsOutItem *item;
+    SpiceMsgNotify notify;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
+    notify.time_stamp = get_time_stamp();
+    notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
+    notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
+    notify.what = SPICE_WARN_GENERAL;
+    notify.message_len = mess_len;
+    spice_marshall_msg_notify(item->m, &notify);
+    spice_marshaller_add(item->m, mess, mess_len + 1);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
+    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
+{
+    MainChannel *main_chan = channel->data;
+    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    SpiceMsgMainMigrationBegin migrate;
+
+    migrate.port = port;
+    migrate.sport = sport;
+    migrate.host_size = strlen(host) + 1;
+    migrate.host_data = (uint8_t *)host;
+    migrate.pub_key_type = cert_pub_key_type;
+    migrate.pub_key_size = cert_pub_key_len;
+    migrate.pub_key_data = cert_pub_key;
+    spice_marshall_msg_main_migrate_begin(item->m, &migrate);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate(Channel *channel)
+{
+    RedsOutItem *item;
+    SpiceMsgMigrate migrate;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
+    migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
+    spice_marshall_msg_migrate(item->m, &migrate);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate_cancel(Channel *channel)
+{
+    MainChannel *main_chan = channel->data;
+    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
+
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_multi_media_time(Channel *channel, int time)
+{
+    SpiceMsgMainMultiMediaTime time_mes;
+    RedsOutItem *item;
+    MainChannel *main_chan = channel->data;
+
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
+    time_mes.time = time;
+    spice_marshall_msg_main_multi_media_time(item->m, &time_mes);
+    main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate_switch(Channel *channel)
+{
+    SpiceMsgMainMigrationSwitchHost migrate;
+    RedsOutItem *item;
+    MainChannel *main_chan;
+    
+    if (!channel) {
+        return;
+    }
+    main_chan = channel->data;
+    red_printf("");
+    item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+    reds_fill_mig_switch(&migrate);
+    spice_marshall_msg_main_migrate_switch_host(item->m, &migrate);
+    main_channel_push_pipe_item(main_chan, item);
+    reds_mig_release();
+}
+
+static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
+{
+    MainChannel *main_chan = opaque;
+
+    switch (type) {
+    case SPICE_MSGC_MAIN_AGENT_START:
+        red_printf("agent start");
+        if (!main_chan) {
+            return;
+        }
+        reds_on_main_agent_start(main_chan);
+        break;
+    case SPICE_MSGC_MAIN_AGENT_DATA: {
+        reds_on_main_agent_data(message, size);
+        break;
+    }
+    case SPICE_MSGC_MAIN_AGENT_TOKEN:
+        break;
+    case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
+        main_channel_push_channels(main_chan);
+        break;
+    case SPICE_MSGC_MAIN_MIGRATE_CONNECTED:
+        red_printf("connected");
+        reds_on_main_migrate_connected();
+        break;
+    case SPICE_MSGC_MAIN_MIGRATE_CONNECT_ERROR:
+        red_printf("mig connect error");
+        reds_on_main_migrate_connect_error();
+        break;
+    case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST:
+        reds_on_main_mouse_mode_request(message, size);
+        break;
+    case SPICE_MSGC_PONG: {
+        SpiceMsgPing *ping = (SpiceMsgPing *)message;
+        uint64_t roundtrip;
+        struct timespec ts;
+
+        clock_gettime(CLOCK_MONOTONIC, &ts);
+        roundtrip = ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL - ping->timestamp;
+
+        if (ping->id == main_chan->net_test_id) {
+            switch (main_chan->net_test_stage) {
+            case NET_TEST_STAGE_WARMUP:
+                main_chan->net_test_id++;
+                main_chan->net_test_stage = NET_TEST_STAGE_LATENCY;
+                break;
+            case NET_TEST_STAGE_LATENCY:
+                main_chan->net_test_id++;
+                main_chan->net_test_stage = NET_TEST_STAGE_RATE;
+                latency = roundtrip;
+                break;
+            case NET_TEST_STAGE_RATE:
+                main_chan->net_test_id = 0;
+                if (roundtrip <= latency) {
+                    // probably high load on client or server result with incorrect values
+                    latency = 0;
+                    red_printf("net test: invalid values, latency %lu roundtrip %lu. assuming high"
+                               "bandwidth", latency, roundtrip);
+                    break;
+                }
+                bitrate_per_sec = (uint64_t)(NET_TEST_BYTES * 8) * 1000000 / (roundtrip - latency);
+                red_printf("net test: latency %f ms, bitrate %lu bps (%f Mbps)%s",
+                           (double)latency / 1000,
+                           bitrate_per_sec,
+                           (double)bitrate_per_sec / 1024 / 1024,
+                           IS_LOW_BANDWIDTH() ? " LOW BANDWIDTH" : "");
+                main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
+                break;
+            default:
+                red_printf("invalid net test stage, ping id %d test id %d stage %d",
+                           ping->id,
+                           main_chan->net_test_id,
+                           main_chan->net_test_stage);
+            }
+            break;
+        }
+#ifdef RED_STATISTICS
+        reds_update_stat_value(roundtrip);
+#endif
+        break;
+    }
+    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
+        main_channel_push_migrate_data_item(main_chan);
+        break;
+    case SPICE_MSGC_MIGRATE_DATA: {
+            MainMigrateData *data = (MainMigrateData *)message;
+            uint8_t *end = ((uint8_t *)message) + size;
+            main_channel_receive_migrate_data(main_chan, data, end);
+            reds_on_main_receive_migrate_data(data, end);
+            break;
+        }
+    case SPICE_MSGC_DISCONNECTING:
+        break;
+    default:
+        red_printf("unexpected type %d", type);
+    }
+}
+
+static void main_channel_event(int fd, int event, void *data)
+{
+    MainChannel *main_chan = data;
+
+    if (event & SPICE_WATCH_EVENT_READ) {
+        if (handle_incoming(main_chan->peer, &main_chan->in_handler)) {
+            main_disconnect(main_chan);
+            reds_disconnect();
+        }
+    }
+    if (event & SPICE_WATCH_EVENT_WRITE) {
+        RedsOutgoingData *outgoing = &main_chan->outgoing;
+        if (main_channel_send_data(main_chan)) {
+            main_channel_push(main_chan);
+            if (!outgoing->item && main_chan->peer) {
+                core->watch_update_mask(main_chan->peer->watch,
+                                        SPICE_WATCH_EVENT_READ);
+            }
+        }
+    }
+}
+
+static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+                        int num_common_caps, uint32_t *common_caps, int num_caps,
+                        uint32_t *caps)
+{
+    MainChannel *main_chan;
+
+    main_chan = spice_malloc0(sizeof(MainChannel));
+    channel->data = main_chan;
+    main_chan->peer = peer;
+    main_chan->in_handler.shut = FALSE;
+    main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
+    main_chan->in_handler.opaque = main_chan;
+    main_chan->in_handler.handle_message = main_channel_handle_message;
+    ring_init(&main_chan->outgoing.pipe);
+    main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
+    peer->watch = core->watch_add(peer->socket,
+                                  SPICE_WATCH_EVENT_READ,
+                                  main_channel_event, main_chan);
+}
+
+int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
+{
+    MainChannel *main_chan = channel->data;
+
+    return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
+}
+
+int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
+{
+    MainChannel *main_chan = channel->data;
+
+    return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
+}
+
+void main_channel_close(Channel *channel)
+{
+    MainChannel *main_chan = channel->data;
+
+    if (main_chan && main_chan->peer) {
+        close(main_chan->peer->socket);
+    }
+}
+
+static void main_channel_shutdown(Channel *channel)
+{
+    MainChannel *main_chan = channel->data;
+
+    if (main_chan != NULL) {
+        main_disconnect(main_chan); // TODO - really here? reset peer etc.
+    }
+    free(main_chan);
+}
+
+static void main_channel_migrate()
+{
+}
+
+Channel* main_channel_init(void)
+{
+    Channel *channel;
+
+    channel = spice_new0(Channel, 1);
+    channel->type = SPICE_CHANNEL_MAIN;
+    channel->link = main_channel_link;
+    channel->shutdown = main_channel_shutdown;
+    channel->migrate = main_channel_migrate;
+    return channel;
+}
+
diff --git a/server/main_channel.h b/server/main_channel.h
new file mode 100644
index 0000000..db95dc2
--- /dev/null
+++ b/server/main_channel.h
@@ -0,0 +1,78 @@
+/*
+   Copyright (C) 2009 Red Hat, Inc.
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2.1 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __MAIN_CHANNEL_H__
+#define __MAIN_CHANNEL_H__
+
+#include <stdint.h>
+#include <spice/vd_agent.h>
+#include "common/marshaller.h"
+
+/* This is a temporary measure for reds/main split - should not be in a header,
+ * but private (although only reds.c includes main_channel.h) */
+struct MainMigrateData {
+    uint32_t version;
+    uint32_t serial;
+    uint32_t ping_id;
+
+    uint32_t agent_connected;
+    uint32_t client_agent_started;
+    uint32_t num_client_tokens;
+    uint32_t send_tokens;
+
+    uint32_t read_state;
+    VDIChunkHeader vdi_chunk_header;
+    uint32_t recive_len;
+    uint32_t message_recive_len;
+    uint32_t read_buf_len;
+
+    uint32_t write_queue_size;
+};
+
+Channel *main_channel_init();
+void main_channel_close(Channel *channel); // not destroy, just socket close
+int main_channel_push_ping(Channel *channel, int size);
+void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed);
+void main_channel_push_agent_connected(Channel *channel);
+void main_channel_push_agent_disconnected(Channel *channel);
+void main_channel_push_tokens(Channel *channel, uint32_t num_tokens);
+void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
+           spice_marshaller_item_free_func free_data, void *opaque);
+void main_channel_start_net_test(Channel *channel);
+// TODO: huge. Consider making a reds_* interface for these functions
+// and calling from main.
+void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
+    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+    int ram_hint);
+void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len);
+// TODO: consider exporting RedsMigSpice from reds.c
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
+    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key);
+void main_channel_push_migrate(Channel *channel);
+void main_channel_push_migrate_switch(Channel *channel);
+void main_channel_push_migrate_cancel(Channel *channel);
+void main_channel_push_multi_media_time(Channel *channel, int time);
+int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen);
+int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen);
+
+// TODO: These defines are used to calculate the receive buffer size, and also by reds.c.
+// Other options: make a reds_main_consts.h, or duplicate the defines.
+#define REDS_AGENT_WINDOW_SIZE 10
+#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
+
+#endif
+
diff --git a/server/reds.c b/server/reds.c
index ba6f552..d6397e5 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -46,6 +46,7 @@
 #include <spice/vd_agent.h>
 
 #include "inputs_channel.h"
+#include "main_channel.h"
 #include "red_common.h"
 #include "red_dispatcher.h"
 #include "snd_worker.h"
@@ -74,14 +75,8 @@ static SpiceCharDeviceInstance *vdagent = NULL;
 #define REDS_MIG_ABORT 2
 #define REDS_MIG_DIFF_VERSION 3
 
-#define REDS_AGENT_WINDOW_SIZE 10
 #define REDS_TOKENS_TO_SEND 5
-#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
 #define REDS_VDI_PORT_NUM_RECEIVE_BUFFS 5
-#define REDS_MAX_SEND_IOVEC 100
-
-#define NET_TEST_WARMUP_BYTES 0
-#define NET_TEST_BYTES (1024 * 250)
 
 static int spice_port = -1;
 static int spice_secure_port = -1;
@@ -109,24 +104,6 @@ static void openssl_init();
 #define MM_TIME_DELTA 400 /*ms*/
 #define VDI_PORT_WRITE_RETRY_TIMEOUT 100 /*ms*/
 
-// approximate max receive message size for main channel
-#define RECEIVE_BUF_SIZE \
-    (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
-
-#define SCROLL_LOCK_SCAN_CODE 0x46
-#define NUM_LOCK_SCAN_CODE 0x45
-#define CAPS_LOCK_SCAN_CODE 0x3a
-
-// TODO - remove and use red_channel.h
-typedef struct IncomingHandler {
-    spice_parse_channel_func_t parser;
-    void *opaque;
-    int shut;
-    uint8_t buf[RECEIVE_BUF_SIZE];
-    uint32_t end_pos;
-    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
-} IncomingHandler;
-
 
 typedef struct TicketAuthentication {
     char password[SPICE_MAX_PASSWORD_LENGTH];
@@ -147,13 +124,6 @@ typedef struct MonitorMode {
     uint32_t y_res;
 } MonitorMode;
 
-typedef struct RedsOutItem RedsOutItem;
-struct RedsOutItem {
-    RingItem link;
-    SpiceMarshaller *m;
-    SpiceDataHeader *header;
-};
-
 typedef struct VDIReadBuf {
     RingItem link;
     int len;
@@ -193,21 +163,6 @@ typedef struct VDIPortState {
     int client_agent_started;
 } VDIPortState;
 
-typedef struct RedsOutgoingData {
-    Ring pipe;
-    RedsOutItem *item;
-    int vec_size;
-    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
-    struct iovec *vec;
-} RedsOutgoingData;
-
-enum NetTestStage {
-    NET_TEST_STAGE_INVALID,
-    NET_TEST_STAGE_WARMUP,
-    NET_TEST_STAGE_LATENCY,
-    NET_TEST_STAGE_RATE,
-};
-
 #ifdef RED_STATISTICS
 
 #define REDS_MAX_STAT_NODES 100
@@ -230,12 +185,11 @@ typedef struct RedsState {
     int secure_listen_socket;
     SpiceWatch *listen_watch;
     SpiceWatch *secure_listen_watch;
-    RedsStreamContext *peer;
     int disconnecting;
-    uint32_t link_id;
-    uint64_t serial; //migrate me
     VDIPortState agent_state;
     int pending_mouse_event;
+    uint32_t link_id;
+    Channel *main_channel;
 
     int mig_wait_connect;
     int mig_wait_disconnect;
@@ -243,8 +197,6 @@ typedef struct RedsState {
     int mig_target;
     RedsMigSpice *mig_spice;
     int num_of_channels;
-    IncomingHandler in_handler;
-    RedsOutgoingData outgoing;
     Channel *channels;
     int mouse_mode;
     int is_client_mouse_allowed;
@@ -266,15 +218,9 @@ typedef struct RedsState {
     SpiceTimer *ping_timer;
     int ping_interval;
 #endif
-    uint32_t ping_id;
-    uint32_t net_test_id;
-    int net_test_stage;
     int peer_minor_version;
 } RedsState;
 
-uint64_t bitrate_per_sec = ~0;
-static uint64_t latency = 0;
-
 static RedsState *reds = NULL;
 
 typedef struct AsyncRead {
@@ -336,13 +282,6 @@ struct ChannelSecurityOptions {
     ChannelSecurityOptions *next;
 };
 
-#define ZERO_BUF_SIZE 4096
-
-static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
-
-static void reds_push();
-static void reds_out_item_free(RedsOutItem *item);
-
 static ChannelSecurityOptions *channels_security = NULL;
 static int default_channel_security =
     SPICE_CHANNEL_SECURITY_NONE | SPICE_CHANNEL_SECURITY_SSL;
@@ -481,20 +420,6 @@ static inline void reds_release_link(RedLinkInfo *link)
     peer->cb_free(peer);
 }
 
-static struct iovec *reds_iovec_skip(struct iovec vec[], int skip, int *vec_size)
-{
-    struct iovec *now = vec;
-
-    while (skip && skip >= now->iov_len) {
-        skip -= now->iov_len;
-        --*vec_size;
-        now++;
-    }
-    now->iov_base = (uint8_t *)now->iov_base + skip;
-    now->iov_len -= skip;
-    return now;
-}
-
 #ifdef RED_STATISTICS
 
 void insert_stat_node(StatNodeRef parent, StatNodeRef ref)
@@ -598,8 +523,10 @@ void stat_remove_counter(uint64_t *counter)
     stat_remove((SpiceStatNode *)(counter - offsetof(SpiceStatNode, value)));
 }
 
-static void reds_update_stat_value(RedsStatValue* stat_value, uint32_t value)
+void reds_update_stat_value(uint32_t value)
 {
+    RedsStatValue *stat_value = &reds->roundtrip_stat;
+
     stat_value->value = value;
     stat_value->min = (stat_value->count ? MIN(stat_value->min, value) : value);
     stat_value->max = MAX(stat_value->max, value);
@@ -685,33 +612,20 @@ static void reds_reset_vdp()
     state->client_agent_started = FALSE;
 }
 
-static void reds_reset_outgoing()
+int reds_main_channel_connected()
 {
-    RedsOutgoingData *outgoing = &reds->outgoing;
-    RingItem *ring_item;
-
-    if (outgoing->item) {
-        reds_out_item_free(outgoing->item);
-        outgoing->item = NULL;
-    }
-    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
-        RedsOutItem *out_item = (RedsOutItem *)ring_item;
-        ring_remove(ring_item);
-        reds_out_item_free(out_item);
-    }
-    outgoing->vec_size = 0;
-    outgoing->vec = outgoing->vec_buf;
+    return !!reds->main_channel;
 }
 
 void reds_disconnect()
 {
-    if (!reds->peer || reds->disconnecting) {
+    if (!reds_main_channel_connected() || reds->disconnecting) {
         return;
     }
 
     red_printf("");
     reds->disconnecting = TRUE;
-    reds_reset_outgoing();
+    reds->link_id = 0;
 
     if (reds->agent_state.connected) {
         SpiceCharDeviceInterface *sif;
@@ -724,178 +638,32 @@ void reds_disconnect()
     }
 
     reds_shatdown_channels();
-    core->watch_remove(reds->peer->watch);
-    reds->peer->watch = NULL;
-    reds->peer->cb_free(reds->peer);
-    reds->peer = NULL;
-    reds->in_handler.shut = TRUE;
-    reds->link_id = 0;
-    reds->serial = 0;
-    reds->ping_id = 0;
-    reds->net_test_id = 0;
-    reds->net_test_stage = NET_TEST_STAGE_INVALID;
-    reds->in_handler.end_pos = 0;
-
-    bitrate_per_sec = ~0;
-    latency = 0;
-
+    reds->main_channel->shutdown(reds->main_channel);
+    reds->main_channel = NULL;
     reds_mig_cleanup();
     reds->disconnecting = FALSE;
 }
 
 static void reds_mig_disconnect()
 {
-    if (reds->peer) {
+    if (reds_main_channel_connected()) {
         reds_disconnect();
     } else {
         reds_mig_cleanup();
     }
 }
 
-static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
-{
-    for (;;) {
-        uint8_t *buf = handler->buf;
-        uint32_t pos = handler->end_pos;
-        uint8_t *end = buf + pos;
-        SpiceDataHeader *header;
-        int n;
-        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
-        if (n <= 0) {
-            if (n == 0) {
-                return -1;
-            }
-            switch (errno) {
-            case EAGAIN:
-                return 0;
-            case EINTR:
-                break;
-            case EPIPE:
-                return -1;
-            default:
-                red_printf("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            pos += n;
-            end = buf + pos;
-            while (buf + sizeof(SpiceDataHeader) <= end &&
-                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
-                uint8_t *data = (uint8_t *)(header+1);
-                size_t parsed_size;
-                uint8_t *parsed;
-                message_destructor_t parsed_free;
-
-
-                buf += sizeof(SpiceDataHeader) + header->size;
-                parsed = handler->parser(data, data + header->size, header->type,
-                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-                if (parsed == NULL) {
-                    red_printf("failed to parse message type %d", header->type);
-                    return -1;
-                }
-                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
-                parsed_free(parsed);
-                if (handler->shut) {
-                    return -1;
-                }
-            }
-            memmove(handler->buf, buf, (handler->end_pos = end - buf));
-        }
-    }
-}
-
-static RedsOutItem *new_out_item(uint32_t type)
-{
-    RedsOutItem *item;
-
-    item = spice_new(RedsOutItem, 1);
-    ring_item_init(&item->link);
-
-    item->m = spice_marshaller_new();
-    item->header = (SpiceDataHeader *)
-        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
-    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
-
-    item->header->serial = ++reds->serial;
-    item->header->type = type;
-    item->header->sub_list = 0;
-
-    return item;
-}
-
-static void reds_out_item_free(RedsOutItem *item)
-{
-    spice_marshaller_destroy(item->m);
-    free(item);
-}
-
-static void reds_push_pipe_item(RedsOutItem *item)
-{
-    ring_add(&reds->outgoing.pipe, &item->link);
-    reds_push();
-}
-
-static void reds_send_channels()
-{
-    SpiceMsgChannels* channels_info;
-    RedsOutItem *item;
-    Channel *channel;
-    int i;
-
-    item = new_out_item(SPICE_MSG_MAIN_CHANNELS_LIST);
-    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds->num_of_channels * sizeof(SpiceChannelId));
-    channels_info->num_of_channels = reds->num_of_channels;
-    channel = reds->channels;
-
-    for (i = 0; i < reds->num_of_channels; i++) {
-        ASSERT(channel);
-        channels_info->channels[i].type = channel->type;
-        channels_info->channels[i].id = channel->id;
-        channel = channel->next;
-    }
-    spice_marshall_msg_main_channels_list(item->m, channels_info);
-    free(channels_info);
-    reds_push_pipe_item(item);
-}
-
-static int send_ping(int size)
-{
-    struct timespec time_space;
-    RedsOutItem *item;
-    SpiceMsgPing ping;
-
-    if (!reds->peer) {
-        return FALSE;
-    }
-    item = new_out_item(SPICE_MSG_PING);
-    ping.id = ++reds->ping_id;
-    clock_gettime(CLOCK_MONOTONIC, &time_space);
-    ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
-    spice_marshall_msg_ping(item->m, &ping);
-
-    while (size > 0) {
-        int now = MIN(ZERO_BUF_SIZE, size);
-        size -= now;
-        spice_marshaller_add_ref(item->m, zero_page, now);
-    }
-
-    reds_push_pipe_item(item);
-
-    return TRUE;
-}
-
 #ifdef RED_STATISTICS
 
 static void do_ping_client(const char *opt, int has_interval, int interval)
 {
-    if (!reds->peer) {
+    if (!reds_main_channel_connected()) {
         red_printf("not connected to peer");
         return;
     }
 
     if (!opt) {
-        send_ping(0);
+        main_channel_push_ping(reds->main_channel, 0);
     } else if (!strcmp(opt, "on")) {
         if (has_interval && interval > 0) {
             reds->ping_interval = interval * 1000;
@@ -910,7 +678,7 @@ static void do_ping_client(const char *opt, int has_interval, int interval)
 
 static void ping_timer_cb()
 {
-    if (!reds->peer) {
+    if (!reds_main_channel_connected()) {
         red_printf("not connected to peer, ping off");
         core->timer_cancel(reds->ping_timer);
         return;
@@ -921,27 +689,6 @@ static void ping_timer_cb()
 
 #endif
 
-static void reds_send_mouse_mode()
-{
-    SpiceMsgMainMouseMode mouse_mode;
-    RedsOutItem *item;
-
-    if (!reds->peer) {
-        return;
-    }
-
-    item = new_out_item(SPICE_MSG_MAIN_MOUSE_MODE);
-    mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
-    if (reds->is_client_mouse_allowed) {
-        mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
-    }
-    mouse_mode.current_mode = reds->mouse_mode;
-
-    spice_marshall_msg_main_mouse_mode(item->m, &mouse_mode);
-
-    reds_push_pipe_item(item);
-}
-
 int reds_get_mouse_mode(void)
 {
     return reds->mouse_mode;
@@ -954,7 +701,7 @@ static void reds_set_mouse_mode(uint32_t mode)
     }
     reds->mouse_mode = mode;
     red_dispatcher_set_mouse_mode(reds->mouse_mode);
-    reds_send_mouse_mode();
+    main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
 }
 
 int reds_get_agent_mouse(void)
@@ -978,26 +725,7 @@ static void reds_update_mouse_mode()
         reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
         return;
     }
-    reds_send_mouse_mode();
-}
-
-static void reds_send_agent_connected()
-{
-    RedsOutItem *item;
-
-    item = new_out_item(SPICE_MSG_MAIN_AGENT_CONNECTED);
-    reds_push_pipe_item(item);
-}
-
-static void reds_send_agent_disconnected()
-{
-    SpiceMsgMainAgentDisconnect disconnect;
-    RedsOutItem *item;
-
-    item = new_out_item(SPICE_MSG_MAIN_AGENT_DISCONNECTED);
-    disconnect.error_code = SPICE_LINK_ERR_OK;
-    spice_marshall_msg_main_agent_disconnected(item->m, &disconnect);
-    reds_push_pipe_item(item);
+    main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
 }
 
 static void reds_agent_remove()
@@ -1008,7 +736,7 @@ static void reds_agent_remove()
     vdagent = NULL;
     reds_update_mouse_mode();
 
-    if (!reds->peer || !sin) {
+    if (!reds_main_channel_connected() || !sin) {
         return;
     }
 
@@ -1024,27 +752,15 @@ static void reds_agent_remove()
     }
 
     reds_reset_vdp();
-    reds_send_agent_disconnected();
+    main_channel_push_agent_disconnected(reds->main_channel);
 }
 
-static void reds_send_tokens()
+static void reds_push_tokens()
 {
-    SpiceMsgMainAgentTokens tokens;
-    RedsOutItem *item;
-
-    if (!reds->peer) {
-        return;
-    }
-
-    item = new_out_item(SPICE_MSG_MAIN_AGENT_TOKEN);
-    tokens.num_tokens = reds->agent_state.num_tokens;
-    reds->agent_state.num_client_tokens += tokens.num_tokens;
+    reds->agent_state.num_client_tokens += reds->agent_state.num_tokens;
     ASSERT(reds->agent_state.num_client_tokens <= REDS_AGENT_WINDOW_SIZE);
+    main_channel_push_tokens(reds->main_channel, reds->agent_state.num_tokens);
     reds->agent_state.num_tokens = 0;
-
-    spice_marshall_msg_main_agent_token(item->m, &tokens);
-
-    reds_push_pipe_item(item);
 }
 
 static int write_to_vdi_port();
@@ -1122,16 +838,12 @@ void vdi_read_buf_release(uint8_t *data, void *opaque)
 static void dispatch_vdi_port_data(int port, VDIReadBuf *buf)
 {
     VDIPortState *state = &reds->agent_state;
-    RedsOutItem *item;
 
     switch (port) {
     case VDP_CLIENT_PORT: {
         if (reds->agent_state.connected) {
-            item = new_out_item(SPICE_MSG_MAIN_AGENT_DATA);
-
-            spice_marshaller_add_ref_full(item->m, buf->data, buf->len,
-                                          vdi_read_buf_release, buf);
-            reds_push_pipe_item(item);
+            main_channel_push_agent_data(reds->main_channel, buf->data, buf->len,
+                                         vdi_read_buf_release, buf);
         } else {
             red_printf("throwing away, no client: %d", buf->len);
             vdi_read_buf_release(buf->data, buf);
@@ -1281,28 +993,110 @@ static void add_token()
     VDIPortState *state = &reds->agent_state;
 
     if (++state->num_tokens == REDS_TOKENS_TO_SEND) {
-        reds_send_tokens();
+        reds_push_tokens();
     }
 }
 
-typedef struct MainMigrateData {
-    uint32_t version;
-    uint32_t serial;
-    uint32_t ping_id;
+int reds_num_of_channels()
+{
+    return reds ? reds->num_of_channels : 0;
+}
 
-    uint32_t agent_connected;
-    uint32_t client_agent_started;
-    uint32_t num_client_tokens;
-    uint32_t send_tokens;
+void reds_fill_channels(SpiceMsgChannels *channels_info)
+{
+    Channel *channel;
+    int i;
 
-    uint32_t read_state;
-    VDIChunkHeader vdi_chunk_header;
-    uint32_t recive_len;
-    uint32_t message_recive_len;
-    uint32_t read_buf_len;
+    channels_info->num_of_channels = reds->num_of_channels;
+    channel = reds->channels;
+    for (i = 0; i < reds->num_of_channels; i++) {
+        ASSERT(channel);
+        channels_info->channels[i].type = channel->type;
+        channels_info->channels[i].id = channel->id;
+        channel = channel->next;
+    }
+}
 
-    uint32_t write_queue_size;
-} MainMigrateData;
+void reds_on_main_agent_start()
+{
+    reds->agent_state.client_agent_started = TRUE;
+}
+
+void reds_on_main_agent_data(void *message, size_t size)
+{
+    RingItem *ring_item;
+    VDAgentExtBuf *buf;
+
+    if (!reds->agent_state.num_client_tokens) {
+        red_printf("token violation");
+        reds_disconnect();
+        return;
+    }
+    --reds->agent_state.num_client_tokens;
+
+    if (!vdagent) {
+        add_token();
+        return;
+    }
+
+    if (!reds->agent_state.client_agent_started) {
+        red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
+        add_token();
+        return;
+    }
+
+    if (size > SPICE_AGENT_MAX_DATA_SIZE) {
+        red_printf("invalid agent message");
+        reds_disconnect();
+        return;
+    }
+
+    if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
+        red_printf("no agent free bufs");
+        reds_disconnect();
+        return;
+    }
+    ring_remove(ring_item);
+    buf = (VDAgentExtBuf *)ring_item;
+    buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
+    buf->base.write_len = size + sizeof(VDIChunkHeader);
+    buf->base.chunk_header.size = size;
+    memcpy(buf->buf, message, size);
+    ring_add(&reds->agent_state.write_queue, ring_item);
+    write_to_vdi_port();
+}
+
+void reds_on_main_migrate_connected()
+{
+    if (reds->mig_wait_connect) {
+        reds_mig_cleanup();
+    }
+}
+
+void reds_on_main_migrate_connect_error()
+{
+    if (reds->mig_wait_connect) {
+        reds_mig_cleanup();
+    }
+}
+
+void reds_on_main_mouse_mode_request(void *message, size_t size)
+{
+    switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
+    case SPICE_MOUSE_MODE_CLIENT:
+        if (reds->is_client_mouse_allowed) {
+            reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
+        } else {
+            red_printf("client mouse is disabled");
+        }
+        break;
+    case SPICE_MOUSE_MODE_SERVER:
+        reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
+        break;
+    default:
+        red_printf("unsupported mouse mode");
+    }
+}
 
 #define MAIN_CHANNEL_MIG_DATA_VERSION 1
 
@@ -1311,20 +1105,13 @@ typedef struct WriteQueueInfo {
     uint32_t len;
 } WriteQueueInfo;
 
-static void main_channel_push_migrate_data_item()
+void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data)
 {
-    RedsOutItem *item;
-    MainMigrateData *data;
     VDIPortState *state = &reds->agent_state;
     int buf_index;
     RingItem *now;
 
-    item = new_out_item(SPICE_MSG_MIGRATE_DATA);
-
-    data = (MainMigrateData *)spice_marshaller_reserve_space(item->m, sizeof(MainMigrateData));
     data->version = MAIN_CHANNEL_MIG_DATA_VERSION;
-    data->serial = reds->serial;
-    data->ping_id = reds->ping_id;
 
     data->agent_connected = !!state->connected;
     data->client_agent_started = state->client_agent_started;
@@ -1340,7 +1127,7 @@ static void main_channel_push_migrate_data_item()
         data->read_buf_len = state->current_read_buf->len;
 
         if (data->read_buf_len - data->recive_len) {
-            spice_marshaller_add_ref(item->m,
+            spice_marshaller_add_ref(m,
                                      state->current_read_buf->data,
                                      data->read_buf_len - data->recive_len);
         }
@@ -1357,7 +1144,7 @@ static void main_channel_push_migrate_data_item()
         WriteQueueInfo *queue_info;
 
         queue_info = (WriteQueueInfo *)
-            spice_marshaller_reserve_space(item->m,
+            spice_marshaller_reserve_space(m,
                                            data->write_queue_size * sizeof(queue_info[0]));
 
         buf_index = 0;
@@ -1366,15 +1153,13 @@ static void main_channel_push_migrate_data_item()
             VDIPortBuf *buf = (VDIPortBuf *)now;
             queue_info[buf_index].port = buf->chunk_header.port;
             queue_info[buf_index++].len = buf->write_len;
-            spice_marshaller_add_ref(item->m, buf->now, buf->write_len);
+            spice_marshaller_add_ref(m, buf->now, buf->write_len);
         }
     }
-
-    reds_push_pipe_item((RedsOutItem *)item);
 }
 
 
-static int main_channel_restore_vdi_read_state(MainMigrateData *data, uint8_t **in_pos,
+static int reds_main_channel_restore_vdi_read_state(MainMigrateData *data, uint8_t **in_pos,
                                                uint8_t *end)
 {
     VDIPortState *state = &reds->agent_state;
@@ -1459,7 +1244,7 @@ static void free_tmp_internal_buf(VDIPortBuf *buf)
     free(buf);
 }
 
-static int main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos, uint8_t *end)
+static int reds_main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos, uint8_t *end)
 {
     VDIPortState *state = &reds->agent_state;
     WriteQueueInfo *inf;
@@ -1530,7 +1315,7 @@ static int main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos,
     return TRUE;
 }
 
-static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end)
+void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
 {
     VDIPortState *state = &reds->agent_state;
     uint8_t *pos;
@@ -1541,9 +1326,6 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
         return;
     }
 
-    reds->serial = data->serial;
-    reds->ping_id = data->ping_id;
-
     state->num_client_tokens = data->num_client_tokens;
     ASSERT(state->num_client_tokens + data->write_queue_size <= REDS_AGENT_WINDOW_SIZE +
                                                                 REDS_NUM_INTERNAL_AGENT_MESSAGES);
@@ -1551,19 +1333,19 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
 
     if (!data->agent_connected) {
         if (state->connected) {
-            reds_send_agent_connected();
+            main_channel_push_agent_connected(reds->main_channel);
         }
         return;
     }
 
     if (!state->connected) {
-        reds_send_agent_disconnected();
+        main_channel_push_agent_disconnected(reds->main_channel);
         return;
     }
 
     if (state->plug_generation > 1) {
-        reds_send_agent_disconnected();
-        reds_send_agent_connected();
+        main_channel_push_agent_disconnected(reds->main_channel);
+        main_channel_push_agent_connected(reds->main_channel);
         return;
     }
 
@@ -1571,245 +1353,15 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
 
     pos = (uint8_t *)(data + 1);
 
-    if (!main_channel_restore_vdi_read_state(data, &pos, end)) {
+    if (!reds_main_channel_restore_vdi_read_state(data, &pos, end)) {
         return;
     }
 
-    main_channel_restore_vdi_wqueue(data, pos, end);
+    reds_main_channel_restore_vdi_wqueue(data, pos, end);
     ASSERT(state->num_client_tokens + state->num_tokens == REDS_AGENT_WINDOW_SIZE);
-}
-
-static void reds_main_handle_message(void *opaque, size_t size, uint32_t type, void *message)
-{
-    switch (type) {
-    case SPICE_MSGC_MAIN_AGENT_START:
-        red_printf("agent start");
-        if (!reds->peer) {
-            return;
-        }
-        reds->agent_state.client_agent_started = TRUE;
-        break;
-    case SPICE_MSGC_MAIN_AGENT_DATA: {
-        RingItem *ring_item;
-        VDAgentExtBuf *buf;
-
-        if (!reds->agent_state.num_client_tokens) {
-            red_printf("token violation");
-            reds_disconnect();
-            break;
-        }
-        --reds->agent_state.num_client_tokens;
-
-        if (!vdagent) {
-            add_token();
-            break;
-        }
-
-        if (!reds->agent_state.client_agent_started) {
-            red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
-            add_token();
-            break;
-        }
-
-        if (size > SPICE_AGENT_MAX_DATA_SIZE) {
-            red_printf("invalid agent message");
-            reds_disconnect();
-            break;
-        }
-
-        if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
-            red_printf("no agent free bufs");
-            reds_disconnect();
-            break;
-        }
-        ring_remove(ring_item);
-        buf = (VDAgentExtBuf *)ring_item;
-        buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
-        buf->base.write_len = size + sizeof(VDIChunkHeader);
-        buf->base.chunk_header.size = size;
-        memcpy(buf->buf, message, size);
-        ring_add(&reds->agent_state.write_queue, ring_item);
-        write_to_vdi_port();
-        break;
-    }
-    case SPICE_MSGC_MAIN_AGENT_TOKEN:
-        break;
-    case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
-        reds_send_channels();
-        break;
-    case SPICE_MSGC_MAIN_MIGRATE_CONNECTED:
-        red_printf("connected");
-        if (reds->mig_wait_connect) {
-            reds_mig_cleanup();
-        }
-        break;
-    case SPICE_MSGC_MAIN_MIGRATE_CONNECT_ERROR:
-        red_printf("mig connect error");
-        if (reds->mig_wait_connect) {
-            reds_mig_cleanup();
-        }
-        break;
-    case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST: {
-        switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
-        case SPICE_MOUSE_MODE_CLIENT:
-            if (reds->is_client_mouse_allowed) {
-                reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
-            } else {
-                red_printf("client mouse is disabled");
-            }
-            break;
-        case SPICE_MOUSE_MODE_SERVER:
-            reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
-            break;
-        default:
-            red_printf("unsupported mouse mode");
-        }
-        break;
-    }
-    case SPICE_MSGC_PONG: {
-        SpiceMsgPing *ping = (SpiceMsgPing *)message;
-        uint64_t roundtrip;
-        struct timespec ts;
-
-        clock_gettime(CLOCK_MONOTONIC, &ts);
-        roundtrip = ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL - ping->timestamp;
-
-        if (ping->id == reds->net_test_id) {
-            switch (reds->net_test_stage) {
-            case NET_TEST_STAGE_WARMUP:
-                reds->net_test_id++;
-                reds->net_test_stage = NET_TEST_STAGE_LATENCY;
-                break;
-            case NET_TEST_STAGE_LATENCY:
-                reds->net_test_id++;
-                reds->net_test_stage = NET_TEST_STAGE_RATE;
-                latency = roundtrip;
-                break;
-            case NET_TEST_STAGE_RATE:
-                reds->net_test_id = 0;
-                if (roundtrip <= latency) {
-                    // probably high load on client or server result with incorrect values
-                    latency = 0;
-                    red_printf("net test: invalid values, latency %lu roundtrip %lu. assuming high"
-                               "bandwidth", latency, roundtrip);
-                    break;
-                }
-                bitrate_per_sec = (uint64_t)(NET_TEST_BYTES * 8) * 1000000 / (roundtrip - latency);
-                red_printf("net test: latency %f ms, bitrate %lu bps (%f Mbps)%s",
-                           (double)latency / 1000,
-                           bitrate_per_sec,
-                           (double)bitrate_per_sec / 1024 / 1024,
-                           IS_LOW_BANDWIDTH() ? " LOW BANDWIDTH" : "");
-                reds->net_test_stage = NET_TEST_STAGE_INVALID;
-                break;
-            default:
-                red_printf("invalid net test stage, ping id %d test id %d stage %d",
-                           ping->id,
-                           reds->net_test_id,
-                           reds->net_test_stage);
-            }
-            break;
-        }
-#ifdef RED_STATISTICS
-        reds_update_stat_value(&reds->roundtrip_stat, roundtrip);
-#endif
-        break;
-    }
-    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
-        main_channel_push_migrate_data_item();
-        break;
-    case SPICE_MSGC_MIGRATE_DATA:
-        main_channel_recive_migrate_data((MainMigrateData *)message,
-                                         ((uint8_t *)message) + size);
-        reds->mig_target = FALSE;
-        while (write_to_vdi_port() || read_from_vdi_port());
-        break;
-    case SPICE_MSGC_DISCONNECTING:
-        break;
-    default:
-        red_printf("unexpected type %d", type);
-    }
-}
-
-static int reds_send_data()
-{
-    RedsOutgoingData *outgoing = &reds->outgoing;
-    int n;
-
-    if (!outgoing->item) {
-        return TRUE;
-    }
 
-    ASSERT(outgoing->vec_size);
-    for (;;) {
-        if ((n = reds->peer->cb_writev(reds->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
-            switch (errno) {
-            case EAGAIN:
-                core->watch_update_mask(reds->peer->watch,
-                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
-                return FALSE;
-            case EINTR:
-                break;
-            case EPIPE:
-                reds_disconnect();
-                return FALSE;
-            default:
-                red_printf("%s", strerror(errno));
-                reds_disconnect();
-                return FALSE;
-            }
-        } else {
-            outgoing->vec = reds_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
-            if (!outgoing->vec_size) {
-                reds_out_item_free(outgoing->item);
-                outgoing->item = NULL;
-                outgoing->vec = outgoing->vec_buf;
-                return TRUE;
-            }
-        }
-    }
-}
-
-static void reds_push()
-{
-    RedsOutgoingData *outgoing = &reds->outgoing;
-    RingItem *ring_item;
-    RedsOutItem *item;
-
-    for (;;) {
-        if (!reds->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
-            return;
-        }
-        ring_remove(ring_item);
-        outgoing->item = item = (RedsOutItem *)ring_item;
-
-        spice_marshaller_flush(item->m);
-        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
-
-        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
-                                                         outgoing->vec_buf,
-                                                         REDS_MAX_SEND_IOVEC, 0);
-        reds_send_data();
-    }
-}
-
-static void reds_main_event(int fd, int event, void *data)
-{
-    if (event & SPICE_WATCH_EVENT_READ) {
-        if (handle_incoming(reds->peer, &reds->in_handler)) {
-            reds_disconnect();
-        }
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        RedsOutgoingData *outgoing = &reds->outgoing;
-        if (reds_send_data()) {
-            reds_push();
-            if (!outgoing->item && reds->peer) {
-                core->watch_update_mask(reds->peer->watch,
-                                        SPICE_WATCH_EVENT_READ);
-            }
-        }
-    }
+    reds->mig_target = FALSE;
+    while (write_to_vdi_port() || read_from_vdi_port());
 }
 
 static int sync_write(RedsStreamContext *peer, void *in_buf, size_t n)
@@ -1920,40 +1472,33 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
     sync_write(link->peer, &error, sizeof(error));
 }
 
-static void reds_start_net_test()
-{
-    if (!reds->peer || reds->net_test_id) {
-        return;
-    }
-
-    if (send_ping(NET_TEST_WARMUP_BYTES) && send_ping(0) && send_ping(NET_TEST_BYTES)) {
-        reds->net_test_id = reds->ping_id - 2;
-        reds->net_test_stage = NET_TEST_STAGE_WARMUP;
-    }
-}
-
+// TODO: now that main is a separate channel this should
+// actually be joined with reds_handle_other_links, become reds_handle_link
 static void reds_handle_main_link(RedLinkInfo *link)
 {
+    RedsStreamContext *peer;
+    SpiceLinkMess *link_mess;
+    uint32_t *caps;
     uint32_t connection_id;
 
     red_printf("");
-
+    link_mess = link->link_mess;
     reds_disconnect();
 
-    if (!link->link_mess->connection_id) {
+    if (!link_mess->connection_id) {
         reds_send_link_result(link, SPICE_LINK_ERR_OK);
         while((connection_id = rand()) == 0);
         reds->agent_state.num_tokens = 0;
         memcpy(&(reds->taTicket), &taTicket, sizeof(reds->taTicket));
         reds->mig_target = FALSE;
     } else {
-        if (link->link_mess->connection_id != reds->link_id) {
+        if (link_mess->connection_id != reds->link_id) {
             reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
             reds_release_link(link);
             return;
         }
         reds_send_link_result(link, SPICE_LINK_ERR_OK);
-        connection_id = link->link_mess->connection_id;
+        connection_id = link_mess->connection_id;
         reds->mig_target = TRUE;
     }
 
@@ -1961,11 +1506,18 @@ static void reds_handle_main_link(RedLinkInfo *link)
     reds->mig_inprogress = FALSE;
     reds->mig_wait_connect = FALSE;
     reds->mig_wait_disconnect = FALSE;
-    reds->peer = link->peer;
-    reds->in_handler.shut = FALSE;
 
     reds_show_new_channel(link, connection_id);
+    peer = link->peer;
+    link->link_mess = NULL;
     __reds_release_link(link);
+    caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
+    reds->main_channel = main_channel_init();
+    reds->main_channel->link(reds->main_channel, peer, reds->mig_target, link_mess->num_common_caps,
+                  link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
+                  link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
+    free(link_mess);
+
     if (vdagent) {
         SpiceCharDeviceInterface *sif;
         sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
@@ -1975,32 +1527,15 @@ static void reds_handle_main_link(RedLinkInfo *link)
         }
         reds->agent_state.plug_generation++;
     }
-    reds->peer->watch = core->watch_add(reds->peer->socket,
-                                        SPICE_WATCH_EVENT_READ,
-                                        reds_main_event, NULL);
 
     if (!reds->mig_target) {
-        RedsOutItem *item;
-        SpiceMsgMainInit init;
-
-        item = new_out_item(SPICE_MSG_MAIN_INIT);
-        init.session_id = connection_id;
-        init.display_channels_hint = red_dispatcher_count();
-        init.current_mouse_mode = reds->mouse_mode;
-        init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
-        if (reds->is_client_mouse_allowed) {
-            init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
-        }
-        init.agent_connected = !!vdagent;
-        init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
         reds->agent_state.num_client_tokens = REDS_AGENT_WINDOW_SIZE;
-        init.multi_media_time = reds_get_mm_time() - MM_TIME_DELTA;
-        init.ram_hint = red_dispatcher_qxl_ram_size();
-
-        spice_marshall_msg_main_init(item->m, &init);
+        main_channel_push_init(reds->main_channel, connection_id, red_dispatcher_count(),
+            reds->mouse_mode, reds->is_client_mouse_allowed,
+            reds_get_mm_time() - MM_TIME_DELTA,
+            red_dispatcher_qxl_ram_size());
 
-        reds_push_pipe_item(item);
-        reds_start_net_test();
+        main_channel_start_net_test(reds->main_channel);
         /* Now that we have a client, forward any pending agent data */
         while (read_from_vdi_port());
     }
@@ -2064,23 +1599,9 @@ static void reds_handle_other_links(RedLinkInfo *link)
     reds_send_link_result(link, SPICE_LINK_ERR_OK);
     reds_show_new_channel(link, reds->link_id);
     if (link_mess->channel_type == SPICE_CHANNEL_INPUTS && !link->peer->ssl) {
-        RedsOutItem *item;
-        SpiceMsgNotify notify;
         char *mess = "keyboard channel is insecure";
         const int mess_len = strlen(mess);
-
-        item = new_out_item(SPICE_MSG_NOTIFY);
-
-        notify.time_stamp = get_time_stamp();
-        notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
-        notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
-        notify.what = SPICE_WARN_GENERAL;
-        notify.message_len = mess_len;
-
-        spice_marshall_msg_notify(item->m, &notify);
-        spice_marshaller_add(item->m, (uint8_t *)mess, mess_len + 1);
-
-        reds_push_pipe_item(item);
+        main_channel_push_notify(reds->main_channel, (uint8_t*)mess, mess_len);
     }
     peer = link->peer;
     link->link_mess = NULL;
@@ -2681,9 +2202,7 @@ static void reds_init_ssl()
 
 static void reds_exit()
 {
-    if (reds->peer) {
-        close(reds->peer->socket);
-    }
+    main_channel_close(reds->main_channel);
 #ifdef RED_STATISTICS
     shm_unlink(reds->stat_shm_name);
     free(reds->stat_shm_name);
@@ -2725,7 +2244,7 @@ enum {
 
 static inline void on_activating_ticketing()
 {
-    if (!ticketing_enabled && reds->peer) {
+    if (!ticketing_enabled && reds_main_channel_connected()) {
         red_printf("disconnecting");
         reds_disconnect();
     }
@@ -2778,7 +2297,7 @@ typedef struct RedsMigCertPubKeyInfo {
     uint32_t len;
 } RedsMigCertPubKeyInfo;
 
-static void reds_mig_release(void)
+void reds_mig_release(void)
 {
     if (reds->mig_spice) {
         free(reds->mig_spice->cert_subject);
@@ -2791,22 +2310,10 @@ static void reds_mig_release(void)
 static void reds_mig_continue(void)
 {
     RedsMigSpice *s = reds->mig_spice;
-    SpiceMsgMainMigrationBegin migrate;
-    RedsOutItem *item;
 
     red_printf("");
-    item = new_out_item(SPICE_MSG_MAIN_MIGRATE_BEGIN);
-
-    migrate.port = s->port;
-    migrate.sport = s->sport;
-    migrate.host_size = strlen(s->host) + 1;
-    migrate.host_data = (uint8_t *)s->host;
-    migrate.pub_key_type = s->cert_pub_key_type;
-    migrate.pub_key_size = s->cert_pub_key_len;
-    migrate.pub_key_data = s->cert_pub_key;
-    spice_marshall_msg_main_migrate_begin(item->m, &migrate);
-
-    reds_push_pipe_item(item);
+    main_channel_push_migrate_begin(reds->main_channel, s->port, s->sport,
+        s->host, s->cert_pub_key_type, s->cert_pub_key_len, s->cert_pub_key);
 
     reds_mig_release();
 
@@ -2828,7 +2335,7 @@ static void reds_mig_started(void)
         core->watch_update_mask(reds->secure_listen_watch, 0);
     }
 
-    if (reds->peer == NULL) {
+    if (!reds_main_channel_connected()) {
         red_printf("not connected to peer");
         goto error;
     }
@@ -2849,8 +2356,6 @@ error:
 
 static void reds_mig_finished(int completed)
 {
-    RedsOutItem *item;
-
     red_printf("");
     if (reds->listen_watch != NULL) {
         core->watch_update_mask(reds->listen_watch, SPICE_WATCH_EVENT_READ);
@@ -2860,7 +2365,7 @@ static void reds_mig_finished(int completed)
         core->watch_update_mask(reds->secure_listen_watch, SPICE_WATCH_EVENT_READ);
     }
 
-    if (reds->peer == NULL) {
+    if (!reds_main_channel_connected()) {
         red_printf("no peer connected");
         return;
     }
@@ -2868,53 +2373,45 @@ static void reds_mig_finished(int completed)
 
     if (completed) {
         Channel *channel;
-        SpiceMsgMigrate migrate;
 
         reds->mig_wait_disconnect = TRUE;
         core->timer_start(reds->mig_timer, MIGRATE_TIMEOUT);
 
-        item = new_out_item(SPICE_MSG_MIGRATE);
-        migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
-        spice_marshall_msg_migrate(item->m, &migrate);
-
-        reds_push_pipe_item(item);
+        // TODO: so now that main channel is separate, how exactly does migration of it work?
+        //  - it can have an empty migrate - that seems ok
+        //  - I can try to fill its migrate, then move stuff from reds.c there, but a lot of data
+        //    is in reds state right now.
+        main_channel_push_migrate(reds->main_channel);
         channel = reds->channels;
         while (channel) {
             channel->migrate(channel);
             channel = channel->next;
         }
     } else {
-        item = new_out_item(SPICE_MSG_MAIN_MIGRATE_CANCEL);
-        reds_push_pipe_item(item);
+        main_channel_push_migrate_cancel(reds->main_channel);
         reds_mig_cleanup();
     }
 }
 
-static void reds_mig_switch(void)
+void reds_mig_switch(void)
 {
-    RedsMigSpice *s = reds->mig_spice;
-    SpiceMsgMainMigrationSwitchHost migrate;
-    RedsOutItem *item;
-
-    red_printf("");
-    item = new_out_item(SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+    main_channel_push_migrate_switch(reds->main_channel);
+}
 
-    migrate.port = s->port;
-    migrate.sport = s->sport;
-    migrate.host_size = strlen(s->host) + 1;
-    migrate.host_data = (uint8_t *)s->host;
+void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate)
+{
+    RedsMigSpice *s = reds->mig_spice;
+    migrate->port = s->port;
+    migrate->sport = s->sport;
+    migrate->host_size = strlen(s->host) + 1;
+    migrate->host_data = (uint8_t *)s->host;
     if (s->cert_subject) {
-        migrate.cert_subject_size = strlen(s->cert_subject) + 1;
-        migrate.cert_subject_data = (uint8_t *)s->cert_subject;
+        migrate->cert_subject_size = strlen(s->cert_subject) + 1;
+        migrate->cert_subject_data = (uint8_t *)s->cert_subject;
     } else {
-        migrate.cert_subject_size = 0;
-        migrate.cert_subject_data = NULL;
+        migrate->cert_subject_size = 0;
+        migrate->cert_subject_data = NULL;
     }
-    spice_marshall_msg_main_migrate_switch_host(item->m, &migrate);
-
-    reds_push_pipe_item(item);
-
-    reds_mig_release();
 }
 
 static void migrate_timout(void *opaque)
@@ -2938,18 +2435,11 @@ void reds_update_mm_timer(uint32_t mm_time)
 
 void reds_enable_mm_timer()
 {
-    SpiceMsgMainMultiMediaTime time_mes;
-    RedsOutItem *item;
-
     core->timer_start(reds->mm_timer, MM_TIMER_GRANULARITY_MS);
-    if (!reds->peer) {
+    if (!reds_main_channel_connected()) {
         return;
     }
-
-    item = new_out_item(SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
-    time_mes.time = reds_get_mm_time() - MM_TIME_DELTA;
-    spice_marshall_msg_main_multi_media_time(item->m, &time_mes);
-    reds_push_pipe_item(item);
+    main_channel_push_multi_media_time(reds->main_channel, reds_get_mm_time() - MM_TIME_DELTA);
 }
 
 void reds_desable_mm_timer()
@@ -2970,7 +2460,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
 
     vdagent = sin;
     reds_update_mouse_mode();
-    if (!reds->peer) {
+    if (!reds_main_channel_connected()) {
         return;
     }
     sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
@@ -2984,7 +2474,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
         return;
     }
 
-    reds_send_agent_connected();
+    main_channel_push_agent_connected(reds->main_channel);
 }
 
 __visible__ void spice_server_char_device_wakeup(SpiceCharDeviceInstance* sin)
@@ -3260,12 +2750,6 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
     core = core_interface;
     reds->listen_socket = -1;
     reds->secure_listen_socket = -1;
-    reds->peer = NULL;
-    reds->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
-    reds->in_handler.handle_message = reds_main_handle_message;
-    ring_init(&reds->outgoing.pipe);
-    reds->outgoing.vec = reds->outgoing.vec_buf;
-
     init_vd_agent_resources();
 
     if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
@@ -3317,6 +2801,7 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
     if (reds->secure_listen_socket != -1) {
         reds_init_ssl();
     }
+    reds->main_channel = NULL;
     inputs_init();
 
 #ifdef USE_SMARTCARD
@@ -3416,7 +2901,7 @@ __visible__ int spice_server_set_ticket(SpiceServer *s,
 {
     ASSERT(reds == s);
 
-    if (reds->peer) {
+    if (reds_main_channel_connected()) {
         if (fail_if_connected) {
             return -1;
         }
@@ -3549,10 +3034,7 @@ __visible__ int spice_server_set_channel_security(SpiceServer *s, const char *ch
 __visible__ int spice_server_get_sock_info(SpiceServer *s, struct sockaddr *sa, socklen_t *salen)
 {
     ASSERT(reds == s);
-    if (!reds->peer) {
-        return -1;
-    }
-    if (getsockname(reds->peer->socket, sa, salen) < 0) {
+    if (main_channel_getsockname(reds->main_channel, sa, salen) < 0) {
         return -1;
     }
     return 0;
@@ -3561,10 +3043,7 @@ __visible__ int spice_server_get_sock_info(SpiceServer *s, struct sockaddr *sa,
 __visible__ int spice_server_get_peer_info(SpiceServer *s, struct sockaddr *sa, socklen_t *salen)
 {
     ASSERT(reds == s);
-    if (!reds->peer) {
-        return -1;
-    }
-    if (getpeername(reds->peer->socket, sa, salen) < 0) {
+    if (main_channel_getpeername(reds->main_channel, sa, salen) < 0) {
         return -1;
     }
     return 0;
@@ -3658,7 +3137,7 @@ __visible__ int spice_server_migrate_client_state(SpiceServer *s)
 {
     ASSERT(reds == s);
 
-    if (!reds->peer) {
+    if (!reds_main_channel_connected()) {
         return SPICE_MIGRATE_CLIENT_NONE;
     } else if (reds->mig_wait_connect) {
         return SPICE_MIGRATE_CLIENT_WAITING;
diff --git a/server/reds.h b/server/reds.h
index e440804..e1a5ab7 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -22,6 +22,9 @@
 #include <openssl/ssl.h>
 #include <sys/uio.h>
 #include <spice/vd_agent.h>
+#include "common/marshaller.h"
+#include "common/messages.h"
+#include "spice.h"
 
 #define __visible__ __attribute__ ((visibility ("default")))
 
@@ -92,5 +95,24 @@ extern uint64_t bitrate_per_sec;
 // Temporary measures to make splitting reds.c to inputs_channel.c easier
 void reds_disconnect(void);
 
+// Temporary (?) for splitting main channel
+typedef struct MainMigrateData MainMigrateData;
+void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data);
+void reds_fill_channels(SpiceMsgChannels *channels_info);
+void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate);
+void reds_mig_release(void);
+int reds_num_of_channels(void);
+#ifdef RED_STATISTICS
+void reds_update_stat_value(uint32_t value);
+#endif
+
+// callbacks from main channel messages
+void reds_on_main_agent_start();
+void reds_on_main_agent_data(void *message, size_t size);
+void reds_on_main_migrate_connected();
+void reds_on_main_migrate_connect_error();
+void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end);
+void reds_on_main_mouse_mode_request(void *message, size_t size);
+
 #endif
 


More information about the Spice-commits mailing list