[Spice-devel] [PATCH 3/4] server/red_channel (all): introduce RedChannelClient
Alon Levy
alevy at redhat.com
Tue Apr 26 03:31:56 PDT 2011
On Tue, Mar 29, 2011 at 07:33:09PM +0200, Marc-André Lureau wrote:
> Hi
>
> Overall, the patch is quite straightforward. However, it is quite
> large and thus touches parts that I am not familiar with. I have done
> various tests and valgrind was happy.
>
> A few minor nitpicks:
>
> - the indentation is a bit weird, for ex in
> red_channel_client_release_item(), red_channel_add_client(),
>
Fixed (the issue was tabs introduced by an editor I hadn't configured
to use spaces).
> - red_channel_apply_clients vs red_channel_apply_clients_data:
> Why two of them? Just one with a data argument should be enough.
> Traditionally, accepting a visitor method is called accept(), not
> apply(), but I don't think the analogy with a visitor is worth here,
> we don't have a deep traversal of various objects, rather just a
> foreach()
>
Well, apply is borrowed from Python/functional-language terminology, and
the two-function split was convenient for the callers, so I think it is
worth adding another function to the interface: it avoids passing an
unused parameter when none is required.
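To illustrate the split, here is a minimal sketch. The types and names
(Client, Channel, channel_apply_clients, channel_apply_clients_data) are
hypothetical stand-ins, not the declarations from red_channel.h; the only
point is that the callback without a data pointer keeps simple call sites
free of a dummy argument.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; the real RedChannel/RedChannelClient are larger. */
typedef struct Client {
    int id;
    struct Client *next;
} Client;

typedef struct Channel {
    Client *clients; /* singly linked list of attached clients */
} Channel;

typedef void (*client_cb)(Client *client);
typedef void (*client_data_cb)(Client *client, void *data);

/* Call cb once per attached client; no extra argument is needed. */
static void channel_apply_clients(Channel *channel, client_cb cb)
{
    for (Client *c = channel->clients; c != NULL; c = c->next) {
        cb(c);
    }
}

/* Same traversal, but forwards a caller-supplied pointer to each call. */
static void channel_apply_clients_data(Channel *channel, client_data_cb cb,
                                       void *data)
{
    for (Client *c = channel->clients; c != NULL; c = c->next) {
        cb(c, data);
    }
}

/* Example callbacks: one needs no context, the other accumulates into data. */
static void negate_id(Client *client)
{
    client->id = -client->id;
}

static void sum_ids(Client *client, void *data)
{
    *(int *)data += client->id;
}
```

A caller that only needs the client uses channel_apply_clients(&ch,
negate_id); one that needs context passes it explicitly, as in
channel_apply_clients_data(&ch, sum_ids, &total).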
> - red_channel_receive() should probably guard against calling
> red_channel_client_receive() with a NULL rcc, (but that condition does
> not seem possible)
Once the channel holds a ring of rccs instead of a single rcc, this
becomes correct.
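A rough sketch of why a ring removes the need for the guard, assuming an
intrusive ring with a sentinel head. Link, Client, Channel and the
channel_* helpers here are hypothetical and are not the ring API used in
the server; with no clients attached the loop body simply never runs.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical intrusive ring node; the sentinel links to itself when empty. */
typedef struct Link {
    struct Link *prev;
    struct Link *next;
} Link;

typedef struct Client {
    Link link;     /* must stay the first member for the cast below */
    int received;  /* counts receive calls, for illustration only */
} Client;

typedef struct Channel {
    Link clients;  /* sentinel head of the client ring */
} Channel;

static void channel_init(Channel *channel)
{
    channel->clients.prev = &channel->clients;
    channel->clients.next = &channel->clients;
}

/* Insert the client right after the sentinel. */
static void channel_add_client(Channel *channel, Client *client)
{
    Link *head = &channel->clients;

    client->link.next = head->next;
    client->link.prev = head;
    head->next->prev = &client->link;
    head->next = &client->link;
}

static void client_receive(Client *client)
{
    client->received++;
}

/* Visit every attached client; returns how many were visited.
 * An empty ring yields zero iterations, so no NULL check is required. */
static int channel_receive(Channel *channel)
{
    int visited = 0;

    for (Link *l = channel->clients.next; l != &channel->clients; l = l->next) {
        client_receive((Client *)l);
        visited++;
    }
    return visited;
}
```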
>
> - I would argue that red_channel_client_create() should be done
> earlier, in one place, such as reds_handle_link(), perhaps even before
> (link would take a RCC instead of a Stream)
OK, but I think this belongs in a separate, later patch.
>
> - there is a number of TODO added that I hope will get fixed in the
> following patches, some look scary too many, the pipe sharing and the
> migration handling for instance :)
As you will see shortly, it is not quite there yet: I did manage to
remove a few, but there are still plenty.
The pipe sharing is not relevant: there are multiple independent pipes
(in a previous attempt I wanted a single pipe with multiple consumers,
but I was convinced that it is not the correct approach).
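As a sketch of the independent-pipes idea, assuming a small fixed-size
FIFO per client: Pipe, Client, Channel, pipe_push, pipe_pop and
channel_pipe_add are hypothetical names, and plain ints stand in for
PipeItem. Adding an item queues it on every client's pipe, so one client
draining its pipe does not change what another still has pending.

```c
#include <assert.h>
#include <stddef.h>

#define PIPE_CAPACITY 8
#define MAX_CLIENTS 4

/* Hypothetical fixed-size FIFO standing in for a per-client pipe. */
typedef struct Pipe {
    int items[PIPE_CAPACITY];
    size_t head;   /* index of the next item to pop */
    size_t count;  /* number of queued items */
} Pipe;

typedef struct Client {
    Pipe pipe;     /* each client owns its own pipe */
} Client;

typedef struct Channel {
    Client *clients[MAX_CLIENTS];
    size_t n_clients;
} Channel;

/* Append an item; returns 1 on success, 0 if the pipe is full. */
static int pipe_push(Pipe *pipe, int item)
{
    if (pipe->count == PIPE_CAPACITY) {
        return 0;
    }
    pipe->items[(pipe->head + pipe->count) % PIPE_CAPACITY] = item;
    pipe->count++;
    return 1;
}

/* Remove the oldest item; returns 1 on success, 0 if the pipe is empty. */
static int pipe_pop(Pipe *pipe, int *item)
{
    if (pipe->count == 0) {
        return 0;
    }
    *item = pipe->items[pipe->head];
    pipe->head = (pipe->head + 1) % PIPE_CAPACITY;
    pipe->count--;
    return 1;
}

/* Queue the item on every client's pipe; returns how many pipes took it. */
static size_t channel_pipe_add(Channel *channel, int item)
{
    size_t queued = 0;

    for (size_t i = 0; i < channel->n_clients; i++) {
        queued += (size_t)pipe_push(&channel->clients[i]->pipe, item);
    }
    return queued;
}
```

With a single shared pipe, by contrast, whichever consumer popped first
would take the item away from the others unless items carried reference
counts and per-consumer positions.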
>
> regards
>
> On Sun, Mar 27, 2011 at 9:03 PM, Alon Levy <alevy at redhat.com> wrote:
> > This commit adds a RedChannelClient that now owns the stream connection,
> > but still doesn't own the pipe. There is only a single RCC per RC
> > right now (and RC still means RedChannel, RedClient will be introduced
> > later). All internal api changes are in server/red_channel.h, hence
> > the need to update all channels. red_worker.c is affected the most because
> > it makes use of direct access to some of RedChannel still.
> >
> > API changes:
> >
> > 1. red_channel_client_create added.
> > red_channel_create -> (red_channel_create, red_channel_client_create)
> > 2. two way connection: rcc->channel, channel->rcc (later channel will
> > hold a list, and there will be a RedClient to hold the list of channels
> > per client)
> > 3. separation of channel disconnect and channel_client_disconnect
> > ---
> > server/inputs_channel.c | 39 ++-
> > server/main_channel.c | 82 +++---
> > server/red_channel.c | 617 ++++++++++++++++++++++-------------
> > server/red_channel.h | 106 ++++---
> > server/red_client_shared_cache.h | 17 +-
> > server/red_tunnel_worker.c | 214 +++++++-----
> > server/red_worker.c | 678 +++++++++++++++++++-------------------
> > server/smartcard.c | 150 +++++-----
> > 8 files changed, 1076 insertions(+), 827 deletions(-)
> >
> > diff --git a/server/inputs_channel.c b/server/inputs_channel.c
> > index 213f8a0..678eff3 100644
> > --- a/server/inputs_channel.c
> > +++ b/server/inputs_channel.c
> > @@ -161,9 +161,9 @@ const VDAgentMouseState *inputs_get_mouse_state(void)
> > return &g_inputs_channel->mouse_state;
> > }
> >
> > -static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> > +static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
> > {
> > - InputsChannel *inputs_channel = SPICE_CONTAINEROF(channel, InputsChannel, base);
> > + InputsChannel *inputs_channel = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
> >
> > if (msg_header->size > RECEIVE_BUF_SIZE) {
> > red_printf("error: too large incoming message");
> > @@ -172,7 +172,7 @@ static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataH
> > return inputs_channel->recv_buf;
> > }
> >
> > -static void inputs_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> > +static void inputs_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
> > uint8_t *msg)
> > {
> > }
> > @@ -246,17 +246,17 @@ static void inputs_pipe_add_type(InputsChannel *channel, int type)
> > red_channel_pipe_add_push(&channel->base, &pipe_item->base);
> > }
> >
> > -static void inputs_channel_release_pipe_item(RedChannel *channel,
> > +static void inputs_channel_release_pipe_item(RedChannelClient *rcc,
> > PipeItem *base, int item_pushed)
> > {
> > free(base);
> > }
> >
> > -static void inputs_channel_send_item(RedChannel *channel, PipeItem *base)
> > +static void inputs_channel_send_item(RedChannelClient *rcc, PipeItem *base)
> > {
> > - SpiceMarshaller *m = red_channel_get_marshaller(channel);
> > + SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
> >
> > - red_channel_init_send_data(channel, base->type, base);
> > + red_channel_client_init_send_data(rcc, base->type, base);
> > switch (base->type) {
> > case PIPE_ITEM_KEY_MODIFIERS:
> > {
> > @@ -285,12 +285,12 @@ static void inputs_channel_send_item(RedChannel *channel, PipeItem *base)
> > default:
> > break;
> > }
> > - red_channel_begin_send_message(channel);
> > + red_channel_client_begin_send_message(rcc);
> > }
> >
> > -static int inputs_channel_handle_parsed(RedChannel *channel, uint32_t size, uint16_t type, void *message)
> > +static int inputs_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message)
> > {
> > - InputsChannel *inputs_channel = (InputsChannel *)channel;
> > + InputsChannel *inputs_channel = (InputsChannel *)rcc->channel;
> > uint8_t *buf = (uint8_t *)message;
> >
> > ASSERT(g_inputs_channel == inputs_channel);
> > @@ -443,10 +443,10 @@ static void inputs_relase_keys(void)
> > kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
> > }
> >
> > -static void inputs_channel_on_error(RedChannel *channel)
> > +static void inputs_channel_on_error(RedChannelClient *rcc)
> > {
> > inputs_relase_keys();
> > - reds_disconnect();
> > + red_channel_client_destroy(rcc);
> > }
> >
> > static void inputs_shutdown(Channel *channel)
> > @@ -482,11 +482,11 @@ static void inputs_pipe_add_init(InputsChannel *inputs_channel)
> > red_channel_pipe_add_push(&inputs_channel->base, &item->base);
> > }
> >
> > -static int inputs_channel_config_socket(RedChannel *channel)
> > +static int inputs_channel_config_socket(RedChannelClient *rcc)
> > {
> > int flags;
> > int delay_val = 1;
> > - RedsStream *stream = red_channel_get_stream(channel);
> > + RedsStream *stream = red_channel_client_get_stream(rcc);
> >
> > if (setsockopt(stream->socket, IPPROTO_TCP, TCP_NODELAY,
> > &delay_val, sizeof(delay_val)) == -1) {
> > @@ -502,7 +502,7 @@ static int inputs_channel_config_socket(RedChannel *channel)
> > return TRUE;
> > }
> >
> > -static void inputs_channel_hold_pipe_item(RedChannel *channel, PipeItem *item)
> > +static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
> > {
> > }
> >
> > @@ -511,11 +511,13 @@ static void inputs_link(Channel *channel, RedsStream *stream, int migration,
> > uint32_t *caps)
> > {
> > InputsChannel *inputs_channel;
> > - red_printf("");
> > + RedChannelClient *rcc;
> > +
> > ASSERT(channel->data == NULL);
> >
> > + red_printf("input channel create");
> > g_inputs_channel = inputs_channel = (InputsChannel*)red_channel_create_parser(
> > - sizeof(*inputs_channel), stream, core, migration, FALSE /* handle_acks */
> > + sizeof(*inputs_channel), core, migration, FALSE /* handle_acks */
> > ,inputs_channel_config_socket
> > ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
> > ,inputs_channel_handle_parsed
> > @@ -530,6 +532,9 @@ static void inputs_link(Channel *channel, RedsStream *stream, int migration,
> > ,NULL
> > ,NULL);
> > ASSERT(inputs_channel);
> > + red_printf("input channel client create");
> > + rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base, stream);
> > + ASSERT(rcc);
> > channel->data = inputs_channel;
> > inputs_pipe_add_init(inputs_channel);
> > }
> > diff --git a/server/main_channel.c b/server/main_channel.c
> > index 70255d7..c849ef0 100644
> > --- a/server/main_channel.c
> > +++ b/server/main_channel.c
> > @@ -418,7 +418,8 @@ static void main_channel_marshall_migrate_data_item(SpiceMarshaller *m, int seri
> > data->ping_id = ping_id;
> > }
> >
> > -static uint64_t main_channel_handle_migrate_data_get_serial_proc(RedChannel *base,
> > +static uint64_t main_channel_handle_migrate_data_get_serial_proc(
> > + RedChannelClient *rcc,
> > uint32_t size, void *message)
> > {
> > MainMigrateData *data = message;
> > @@ -430,10 +431,10 @@ static uint64_t main_channel_handle_migrate_data_get_serial_proc(RedChannel *bas
> > return data->serial;
> > }
> >
> > -static uint64_t main_channel_handle_migrate_data(RedChannel *base,
> > +static uint64_t main_channel_handle_migrate_data(RedChannelClient *rcc,
> > uint32_t size, void *message)
> > {
> > - MainChannel *main_chan = SPICE_CONTAINEROF(base, MainChannel, base);
> > + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
> > MainMigrateData *data = message;
> >
> > if (size < sizeof(*data)) {
> > @@ -597,12 +598,12 @@ static void main_channel_marshall_multi_media_time(SpiceMarshaller *m,
> > spice_marshall_msg_main_multi_media_time(m, &time_mes);
> > }
> >
> > -static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> > +static void main_channel_send_item(RedChannelClient *rcc, PipeItem *base)
> > {
> > - SpiceMarshaller *m = red_channel_get_marshaller(channel);
> > - MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> > + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
> > + SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
> >
> > - red_channel_init_send_data(channel, base->type, base);
> > + red_channel_client_init_send_data(rcc, base->type, base);
> > switch (base->type) {
> > case SPICE_MSG_MAIN_CHANNELS_LIST:
> > main_channel_marshall_channels(m);
> > @@ -632,7 +633,7 @@ static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> > break;
> > case SPICE_MSG_MIGRATE_DATA:
> > main_channel_marshall_migrate_data_item(m,
> > - red_channel_get_message_serial(&main_chan->base),
> > + red_channel_client_get_message_serial(rcc),
> > main_chan->ping_id);
> > break;
> > case SPICE_MSG_MAIN_INIT:
> > @@ -658,18 +659,18 @@ static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> > main_channel_marshall_migrate_switch(m);
> > break;
> > };
> > - red_channel_begin_send_message(channel);
> > + red_channel_client_begin_send_message(rcc);
> > }
> >
> > -static void main_channel_release_pipe_item(RedChannel *channel,
> > +static void main_channel_release_pipe_item(RedChannelClient *rcc,
> > PipeItem *base, int item_pushed)
> > {
> > free(base);
> > }
> >
> > -static int main_channel_handle_parsed(RedChannel *channel, uint32_t size, uint16_t type, void *message)
> > +static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message)
> > {
> > - MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> > + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
> >
> > switch (type) {
> > case SPICE_MSGC_MAIN_AGENT_START:
> > @@ -760,35 +761,36 @@ static int main_channel_handle_parsed(RedChannel *channel, uint32_t size, uint16
> > return TRUE;
> > }
> >
> > -static void main_channel_on_error(RedChannel *channel)
> > +static void main_channel_on_error(RedChannelClient *rcc)
> > {
> > reds_disconnect();
> > }
> >
> > -static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> > +static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
> > {
> > - MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> > + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
> >
> > return main_chan->recv_buf;
> > }
> >
> > -static void main_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> > +static void main_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
> > uint8_t *msg)
> > {
> > }
> >
> > -static int main_channel_config_socket(RedChannel *channel)
> > +static int main_channel_config_socket(RedChannelClient *rcc)
> > {
> > return TRUE;
> > }
> >
> > -static void main_channel_hold_pipe_item(RedChannel *channel, PipeItem *item)
> > +static void main_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
> > {
> > }
> >
> > -static int main_channel_handle_migrate_flush_mark_proc(RedChannel *base)
> > +static int main_channel_handle_migrate_flush_mark_proc(RedChannelClient *rcc)
> > {
> > - main_channel_push_migrate_data_item(SPICE_CONTAINEROF(base, MainChannel, base));
> > + main_channel_push_migrate_data_item(SPICE_CONTAINEROF(rcc->channel,
> > + MainChannel, base));
> > return TRUE;
> > }
> >
> > @@ -797,26 +799,30 @@ static void main_channel_link(Channel *channel, RedsStream *stream, int migratio
> > uint32_t *caps)
> > {
> > MainChannel *main_chan;
> > - red_printf("");
> > ASSERT(channel->data == NULL);
> >
> > - main_chan = (MainChannel*)red_channel_create_parser(
> > - sizeof(*main_chan), stream, core, migration, FALSE /* handle_acks */
> > - ,main_channel_config_socket
> > - ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
> > - ,main_channel_handle_parsed
> > - ,main_channel_alloc_msg_rcv_buf
> > - ,main_channel_release_msg_rcv_buf
> > - ,main_channel_hold_pipe_item
> > - ,main_channel_send_item
> > - ,main_channel_release_pipe_item
> > - ,main_channel_on_error
> > - ,main_channel_on_error
> > - ,main_channel_handle_migrate_flush_mark_proc
> > - ,main_channel_handle_migrate_data
> > - ,main_channel_handle_migrate_data_get_serial_proc);
> > - ASSERT(main_chan);
> > - channel->data = main_chan;
> > + if (channel->data == NULL) {
> > + red_printf("create main channel");
> > + channel->data = red_channel_create_parser(
> > + sizeof(*main_chan), core, migration, FALSE /* handle_acks */
> > + ,main_channel_config_socket
> > + ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
> > + ,main_channel_handle_parsed
> > + ,main_channel_alloc_msg_rcv_buf
> > + ,main_channel_release_msg_rcv_buf
> > + ,main_channel_hold_pipe_item
> > + ,main_channel_send_item
> > + ,main_channel_release_pipe_item
> > + ,main_channel_on_error
> > + ,main_channel_on_error
> > + ,main_channel_handle_migrate_flush_mark_proc
> > + ,main_channel_handle_migrate_data
> > + ,main_channel_handle_migrate_data_get_serial_proc);
> > + ASSERT(channel->data);
> > + }
> > + main_chan = (MainChannel*)channel->data;
> > + red_printf("add main channel client");
> > + red_channel_client_create(sizeof(RedChannelClient), channel->data, stream);
> > }
> >
> > int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
> > diff --git a/server/red_channel.c b/server/red_channel.c
> > index b9e0324..7821fa9 100644
> > --- a/server/red_channel.c
> > +++ b/server/red_channel.c
> > @@ -30,8 +30,7 @@
> > #include "red_channel.h"
> > #include "generated_marshallers.h"
> >
> > -static PipeItem *red_channel_pipe_get(RedChannel *channel);
> > -static void red_channel_event(int fd, int event, void *data);
> > +static void red_channel_client_event(int fd, int event, void *data);
> >
> > /* return the number of bytes read. -1 in case of error */
> > static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
> > @@ -149,9 +148,14 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
> > }
> > }
> >
> > +void red_channel_client_receive(RedChannelClient *rcc)
> > +{
> > + red_peer_handle_incoming(rcc->stream, &rcc->incoming);
> > +}
> > +
> > void red_channel_receive(RedChannel *channel)
> > {
> > - red_peer_handle_incoming(channel->stream, &channel->incoming);
> > + red_channel_client_receive(channel->rcc);
> > }
> >
> > static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
> > @@ -198,124 +202,194 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle
> > }
> > }
> >
> > -void red_channel_on_output(void *opaque, int n)
> > +void red_channel_client_on_output(void *opaque, int n)
> > {
> > - RedChannel *channel = opaque;
> > + RedChannelClient *rcc = opaque;
> >
> > - stat_inc_counter(channel->out_bytes_counter, n);
> > + stat_inc_counter(rcc->channel->out_bytes_counter, n);
> > }
> >
> > -void red_channel_default_peer_on_error(RedChannel *channel)
> > +void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
> > {
> > - channel->disconnect(channel);
> > + rcc->channel->disconnect(rcc);
> > }
> >
> > -static void red_channel_peer_on_incoming_error(RedChannel *channel)
> > +static void red_channel_peer_on_incoming_error(RedChannelClient *rcc)
> > {
> > - channel->on_incoming_error(channel);
> > + rcc->channel->on_incoming_error(rcc);
> > }
> >
> > -static void red_channel_peer_on_outgoing_error(RedChannel *channel)
> > +static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc)
> > {
> > - channel->on_outgoing_error(channel);
> > + rcc->channel->on_outgoing_error(rcc);
> > }
> >
> > -static int red_channel_peer_get_out_msg_size(void *opaque)
> > +static int red_channel_client_peer_get_out_msg_size(void *opaque)
> > {
> > - RedChannel *channel = (RedChannel *)opaque;
> > + RedChannelClient *rcc = (RedChannelClient *)opaque;
> >
> > - return channel->send_data.size;
> > + return rcc->send_data.size;
> > }
> >
> > -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
> > +static void red_channel_client_peer_prepare_out_msg(
> > + void *opaque, struct iovec *vec, int *vec_size, int pos)
> > {
> > - RedChannel *channel = (RedChannel *)opaque;
> > + RedChannelClient *rcc = (RedChannelClient *)opaque;
> >
> > - *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
> > + *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
> > vec, MAX_SEND_VEC, pos);
> > }
> >
> > -static void red_channel_peer_on_out_block(void *opaque)
> > +static void red_channel_client_peer_on_out_block(void *opaque)
> > {
> > - RedChannel *channel = (RedChannel *)opaque;
> > + RedChannelClient *rcc = (RedChannelClient *)opaque;
> >
> > - channel->send_data.blocked = TRUE;
> > - channel->core->watch_update_mask(channel->stream->watch,
> > + rcc->send_data.blocked = TRUE;
> > + rcc->channel->core->watch_update_mask(rcc->stream->watch,
> > SPICE_WATCH_EVENT_READ |
> > SPICE_WATCH_EVENT_WRITE);
> > }
> >
> > -static void red_channel_reset_send_data(RedChannel *channel)
> > +static void red_channel_client_reset_send_data(RedChannelClient *rcc)
> > +{
> > + spice_marshaller_reset(rcc->send_data.marshaller);
> > + rcc->send_data.header = (SpiceDataHeader *)
> > + spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
> > + spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
> > + rcc->send_data.header->type = 0;
> > + rcc->send_data.header->size = 0;
> > + rcc->send_data.header->sub_list = 0;
> > + rcc->send_data.header->serial = ++rcc->send_data.serial;
> > +}
> > +
> > +void red_channel_client_push_set_ack(RedChannelClient *rcc)
> > {
> > - spice_marshaller_reset(channel->send_data.marshaller);
> > - channel->send_data.header = (SpiceDataHeader *)
> > - spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> > - spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> > - channel->send_data.header->type = 0;
> > - channel->send_data.header->size = 0;
> > - channel->send_data.header->sub_list = 0;
> > - channel->send_data.header->serial = ++channel->send_data.serial;
> > + red_channel_pipe_add_type(rcc->channel, PIPE_ITEM_TYPE_SET_ACK);
> > }
> >
> > void red_channel_push_set_ack(RedChannel *channel)
> > {
> > + // TODO - MC, should replace with add_type_all (or whatever I'll name it)
> > red_channel_pipe_add_type(channel, PIPE_ITEM_TYPE_SET_ACK);
> > }
> >
> > -static void red_channel_send_set_ack(RedChannel *channel)
> > +static void red_channel_client_send_set_ack(RedChannelClient *rcc)
> > {
> > SpiceMsgSetAck ack;
> >
> > - ASSERT(channel);
> > - red_channel_init_send_data(channel, SPICE_MSG_SET_ACK, NULL);
> > - ack.generation = ++channel->ack_data.generation;
> > - ack.window = channel->ack_data.client_window;
> > - channel->ack_data.messages_window = 0;
> > + ASSERT(rcc);
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
> > + ack.generation = ++rcc->ack_data.generation;
> > + ack.window = rcc->ack_data.client_window;
> > + rcc->ack_data.messages_window = 0;
> >
> > - spice_marshall_msg_set_ack(channel->send_data.marshaller, &ack);
> > + spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
> >
> > - red_channel_begin_send_message(channel);
> > + red_channel_client_begin_send_message(rcc);
> > }
> >
> > -static void red_channel_send_item(RedChannel *channel, PipeItem *item)
> > +static void red_channel_client_send_item(RedChannelClient *rcc, PipeItem *item)
> > {
> > - red_channel_reset_send_data(channel);
> > + int handled = TRUE;
> > +
> > + ASSERT(rcc->send_data.item == NULL);
> > + red_channel_client_reset_send_data(rcc);
> > switch (item->type) {
> > case PIPE_ITEM_TYPE_SET_ACK:
> > - red_channel_send_set_ack(channel);
> > - return;
> > + red_channel_client_send_set_ack(rcc);
> > + break;
> > + default:
> > + handled = FALSE;
> > + }
> > + if (!handled) {
> > + rcc->channel->send_item(rcc, item);
> > }
> > - /* only reached if not handled here */
> > - channel->send_item(channel, item);
> > }
> >
> > -static void red_channel_release_item(RedChannel *channel, PipeItem *item, int item_pushed)
> > +static void red_channel_client_release_item(RedChannelClient *rcc,
> > + PipeItem *item, int item_pushed)
> > {
> > + int handled = TRUE;
> > +
> > switch (item->type) {
> > case PIPE_ITEM_TYPE_SET_ACK:
> > free(item);
> > + break;
> > - return;
> > + default:
> > + handled = FALSE;
> > }
> > - /* only reached if not handled here */
> > - channel->release_item(channel, item, item_pushed);
> > + if (!handled) {
> > + rcc->channel->release_item(rcc, item, item_pushed);
> > + }
> > }
> >
> > -static void red_channel_peer_on_out_msg_done(void *opaque)
> > +static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
> > {
> > - RedChannel *channel = (RedChannel *)opaque;
> > - channel->send_data.size = 0;
> > - if (channel->send_data.item) {
> > - red_channel_release_item(channel, channel->send_data.item, TRUE);
> > - channel->send_data.item = NULL;
> > + if (rcc->send_data.item) {
> > + red_channel_client_release_item(rcc,
> > + rcc->send_data.item, TRUE);
> > + rcc->send_data.item = NULL;
> > }
> > - if (channel->send_data.blocked) {
> > - channel->send_data.blocked = FALSE;
> > - channel->core->watch_update_mask(channel->stream->watch,
> > +}
> > +
> > +static void red_channel_peer_on_out_msg_done(void *opaque)
> > +{
> > + RedChannelClient *rcc = (RedChannelClient *)opaque;
> > +
> > + rcc->send_data.size = 0;
> > + red_channel_client_release_sent_item(rcc);
> > + if (rcc->send_data.blocked) {
> > + rcc->send_data.blocked = FALSE;
> > + rcc->channel->core->watch_update_mask(rcc->stream->watch,
> > SPICE_WATCH_EVENT_READ);
> > }
> > }
> >
> > -RedChannel *red_channel_create(int size, RedsStream *stream,
> > +static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
> > +{
> > + ASSERT(rcc);
> > + channel->rcc = rcc;
> > +}
> > +
> > +RedChannelClient *red_channel_client_create(
> > + int size,
> > + RedChannel *channel,
> > + RedsStream *stream)
> > +{
> > + RedChannelClient *rcc = NULL;
> > +
> > + ASSERT(stream && channel && size >= sizeof(RedChannelClient));
> > + rcc = spice_malloc0(size);
> > + rcc->stream = stream;
> > + rcc->channel = channel;
> > + rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
> > + // block flags)
> > + rcc->ack_data.client_generation = ~0;
> > + rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
> > + rcc->send_data.marshaller = spice_marshaller_new();
> > +
> > + rcc->incoming.opaque = rcc;
> > + rcc->incoming.cb = &channel->incoming_cb;
> > +
> > + rcc->outgoing.opaque = rcc;
> > + rcc->outgoing.cb = &channel->outgoing_cb;
> > + rcc->outgoing.pos = 0;
> > + rcc->outgoing.size = 0;
> > + if (!channel->config_socket(rcc)) {
> > + goto error;
> > + }
> > +
> > + stream->watch = channel->core->watch_add(stream->socket,
> > + SPICE_WATCH_EVENT_READ,
> > + red_channel_client_event, rcc);
> > + red_channel_add_client(channel, rcc);
> > + return rcc;
> > +error:
> > + free(rcc);
> > + reds_stream_free(stream);
> > + return NULL;
> > +}
> > +
> > +RedChannel *red_channel_create(int size,
> > SpiceCoreInterface *core,
> > int migrate, int handle_acks,
> > channel_configure_socket_proc config_socket,
> > @@ -336,7 +410,6 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
> > ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf &&
> > release_item);
> > channel = spice_malloc0(size);
> > -
> > channel->handle_acks = handle_acks;
> > channel->disconnect = disconnect;
> > channel->send_item = send_item;
> > @@ -345,69 +418,40 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
> > channel->handle_migrate_flush_mark = handle_migrate_flush_mark;
> > channel->handle_migrate_data = handle_migrate_data;
> > channel->handle_migrate_data_get_serial = handle_migrate_data_get_serial;
> > + channel->config_socket = config_socket;
> >
> > - channel->stream = stream;
> > channel->core = core;
> > - channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
> > - // block flags)
> > - channel->ack_data.client_generation = ~0;
> > - channel->ack_data.client_window = CLIENT_ACK_WINDOW;
> > -
> > channel->migrate = migrate;
> > ring_init(&channel->pipe);
> > - channel->send_data.marshaller = spice_marshaller_new();
> >
> > - channel->incoming.opaque = channel;
> > channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
> > channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
> > channel->incoming_cb.handle_message = (handle_message_proc)handle_message;
> > - channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
> > -
> > - channel->outgoing.opaque = channel;
> > - channel->outgoing.pos = 0;
> > - channel->outgoing.size = 0;
> > -
> > - channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size;
> > - channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg;
> > - channel->outgoing_cb.on_block = red_channel_peer_on_out_block;
> > - channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
> > + channel->incoming_cb.on_error =
> > + (on_incoming_error_proc)red_channel_client_default_peer_on_error;
> > + channel->outgoing_cb.get_msg_size = red_channel_client_peer_get_out_msg_size;
> > + channel->outgoing_cb.prepare = red_channel_client_peer_prepare_out_msg;
> > + channel->outgoing_cb.on_block = red_channel_client_peer_on_out_block;
> > + channel->outgoing_cb.on_error =
> > + (on_outgoing_error_proc)red_channel_client_default_peer_on_error;
> > channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done;
> > - channel->outgoing_cb.on_output = red_channel_on_output;
> > -
> > - channel->incoming.cb = &channel->incoming_cb;
> > - channel->outgoing.cb = &channel->outgoing_cb;
> > + channel->outgoing_cb.on_output = red_channel_client_on_output;
> >
> > channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
> > channel->out_bytes_counter = 0;
> > -
> > - if (!config_socket(channel)) {
> > - goto error;
> > - }
> > -
> > - channel->stream->watch = channel->core->watch_add(channel->stream->socket,
> > - SPICE_WATCH_EVENT_READ,
> > - red_channel_event, channel);
> > -
> > return channel;
> > -
> > -error:
> > - spice_marshaller_destroy(channel->send_data.marshaller);
> > - free(channel);
> > - reds_stream_free(stream);
> > -
> > - return NULL;
> > }
> >
> > -void do_nothing_disconnect(RedChannel *red_channel)
> > +void do_nothing_disconnect(RedChannelClient *rcc)
> > {
> > }
> >
> > -int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header, uint8_t *msg)
> > +int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
> > {
> > return TRUE;
> > }
> >
> > -RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> > +RedChannel *red_channel_create_parser(int size,
> > SpiceCoreInterface *core,
> > int migrate, int handle_acks,
> > channel_configure_socket_proc config_socket,
> > @@ -424,7 +468,7 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> > channel_handle_migrate_data_proc handle_migrate_data,
> > channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
> > {
> > - RedChannel *channel = red_channel_create(size, stream,
> > + RedChannel *channel = red_channel_create(size,
> > core, migrate, handle_acks, config_socket, do_nothing_disconnect,
> > do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item,
> > send_item, release_item, handle_migrate_flush_mark, handle_migrate_data,
> > @@ -435,62 +479,152 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> > }
> > channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
> > channel->incoming_cb.parser = parser;
> > - channel->on_incoming_error = incoming_error;
> > - channel->on_outgoing_error = outgoing_error;
> > channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
> > channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
> > + channel->on_incoming_error = incoming_error;
> > + channel->on_outgoing_error = outgoing_error;
> > return channel;
> > }
> >
> > +void red_channel_client_destroy(RedChannelClient *rcc)
> > +{
> > + red_channel_client_disconnect(rcc);
> > + spice_marshaller_destroy(rcc->send_data.marshaller);
> > + free(rcc);
> > +}
> > +
> > void red_channel_destroy(RedChannel *channel)
> > {
> > if (!channel) {
> > return;
> > }
> > - red_channel_pipe_clear(channel);
> > - reds_stream_free(channel->stream);
> > - spice_marshaller_destroy(channel->send_data.marshaller);
> > + if (channel->rcc) {
> > + red_channel_client_destroy(channel->rcc);
> > + }
> > free(channel);
> > }
> >
> > +static void red_channel_client_shutdown(RedChannelClient *rcc)
> > +{
> > + if (rcc->stream && !rcc->stream->shutdown) {
> > + rcc->channel->core->watch_remove(rcc->stream->watch);
> > + rcc->stream->watch = NULL;
> > + shutdown(rcc->stream->socket, SHUT_RDWR);
> > + rcc->stream->shutdown = TRUE;
> > + rcc->incoming.shut = TRUE;
> > + }
> > + red_channel_client_release_sent_item(rcc);
> > +}
> > +
> > void red_channel_shutdown(RedChannel *channel)
> > {
> > - red_printf("");
> > - if (channel->stream && !channel->stream->shutdown) {
> > - channel->core->watch_update_mask(channel->stream->watch,
> > - SPICE_WATCH_EVENT_READ);
> > - red_channel_pipe_clear(channel);
> > - shutdown(channel->stream->socket, SHUT_RDWR);
> > - channel->stream->shutdown = TRUE;
> > - channel->incoming.shut = TRUE;
> > + if (channel->rcc) {
> > + red_channel_client_shutdown(channel->rcc);
> > + }
> > + red_channel_pipe_clear(channel);
> > +}
> > +
> > +void red_channel_client_send(RedChannelClient *rcc)
> > +{
> > + red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
> > +}
> > +
> > +void red_channel_send(RedChannel *channel)
> > +{
> > + if (channel->rcc) {
> > + red_channel_client_send(channel->rcc);
> > + }
> > +}
> > +
> > +static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
> > +{
> > + return (rcc->channel->handle_acks &&
> > + (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
> > +}
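
For anyone following the ack gating here, a minimal standalone sketch may help. The names below (AckState, ack_waiting, ack_message_sent, ack_received) are made up for illustration and are not the ones in red_channel.c. Only the "messages_window > client_window * 2" comparison, the increment per sent message, and the generation-checked subtraction on ACK are taken from the patch.

```c
#include <stdint.h>

/* Hypothetical stand-in for rcc->ack_data plus channel->handle_acks;
 * not the real SPICE structures. */
typedef struct AckState {
    int handle_acks;
    uint32_t generation;
    uint32_t client_generation;
    uint32_t messages_window;
    uint32_t client_window;
} AckState;

/* Sending stalls once more than two client windows are unacknowledged. */
static int ack_waiting(const AckState *s)
{
    return s->handle_acks && s->messages_window > s->client_window * 2;
}

/* Every message handed to the socket grows the outstanding window. */
static void ack_message_sent(AckState *s)
{
    s->messages_window++;
}

/* An ACK from the client releases one client window, but only if it
 * belongs to the current generation. */
static void ack_received(AckState *s)
{
    if (s->client_generation == s->generation) {
        s->messages_window -= s->client_window;
    }
}
```

With client_window set to 2, the fifth unacknowledged message is the first one that makes ack_waiting() return true, and a single ACK brings the window back down to 3.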
> > +
> > +// TODO: add refs and target to PipeItem. Right now this only works for a
> > +// single client (or actually, it's worse - first come first served)
> > +static inline PipeItem *red_channel_client_pipe_get(RedChannelClient *rcc)
> > +{
> > + PipeItem *item;
> > +
> > + if (!rcc || rcc->send_data.blocked
> > + || red_channel_client_waiting_for_ack(rcc)
> > + || !(item = (PipeItem *)ring_get_tail(&rcc->channel->pipe))) {
> > + return NULL;
> > + }
> > + --rcc->channel->pipe_size;
> > + ring_remove(&item->link);
> > + return item;
> > +}
> > +
> > +static void red_channel_client_push(RedChannelClient *rcc)
> > +{
> > + PipeItem *pipe_item;
> > +
> > + if (!rcc->during_send) {
> > + rcc->during_send = TRUE;
> > + } else {
> > + return;
> > + }
> > +
> > + if (rcc->send_data.blocked) {
> > + red_channel_client_send(rcc);
> > + }
> > +
> > + while ((pipe_item = red_channel_client_pipe_get(rcc))) {
> > + red_channel_client_send_item(rcc, pipe_item);
> > + }
> > + rcc->during_send = FALSE;
> > +}
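
The during_send flag above is a plain re-entrancy guard: if sending an item ends up calling push again, the nested call returns immediately and the outer loop keeps draining. A rough sketch under simplified assumptions follows. Pusher, pusher_send_one and the counters are hypothetical and only model the control flow, not the real pipe or marshaller.

```c
/* Hypothetical model of a client with a queue of pending items. */
typedef struct Pusher {
    int during_send;            /* set while the drain loop is running */
    int pending;                /* items still waiting in the pipe */
    int sent;                   /* items handed off so far */
    int nested_calls_rejected;  /* how often the guard fired */
} Pusher;

static void pusher_push(Pusher *p);

/* Stand-in for a send callback that triggers another push. */
static void pusher_send_one(Pusher *p)
{
    p->pending--;
    p->sent++;
    pusher_push(p); /* re-entrant call, rejected by the guard */
}

static void pusher_push(Pusher *p)
{
    if (p->during_send) {
        p->nested_calls_rejected++;
        return;
    }
    p->during_send = 1;
    while (p->pending > 0) {
        pusher_send_one(p);
    }
    p->during_send = 0;
}
```

Without the guard, each nested call would start its own drain loop and the recursion depth would grow with the pipe size.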
> > +
> > +void red_channel_push(RedChannel *channel)
> > +{
> > + if (!channel || !channel->rcc) {
> > + return;
> > }
> > + red_channel_client_push(channel->rcc);
> > +}
> > +
> > +static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
> > +{
> > + rcc->ack_data.messages_window = 0;
> > + red_channel_client_push(rcc);
> > }
> >
> > +// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
> > +// specific
> > void red_channel_init_outgoing_messages_window(RedChannel *channel)
> > {
> > - channel->ack_data.messages_window = 0;
> > - red_channel_push(channel);
> > + red_channel_client_init_outgoing_messages_window(channel->rcc);
> > }
> >
> > -void red_channel_handle_migrate_flush_mark_proc(RedChannel *channel)
> > +static void red_channel_handle_migrate_flush_mark_proc(RedChannel *channel)
> > {
> > if (channel->handle_migrate_flush_mark) {
> > - channel->handle_migrate_flush_mark(channel);
> > + channel->handle_migrate_flush_mark(channel->rcc);
> > }
> > }
> >
> > -void red_channel_handle_migrate_data(RedChannel *channel, uint32_t size, void *message)
> > +// TODO: the whole migration is broken with multiple clients. What do we want to do?
> > +// basically just
> > +// 1) source send mark to all
> > +// 2) source gets at various times the data (waits for all)
> > +// 3) source migrates to target
> > +// 4) target sends data to all
> > +// So need to make all the handlers work with per channel/client data (what data exactly?)
> > +static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
> > {
> > - if (!channel->handle_migrate_data) {
> > + if (!rcc->channel->handle_migrate_data) {
> > return;
> > }
> > - ASSERT(red_channel_get_message_serial(channel) == 0);
> > - red_channel_set_message_serial(channel,
> > - channel->handle_migrate_data_get_serial(channel, size, message));
> > - channel->handle_migrate_data(channel, size, message);
> > + ASSERT(red_channel_client_get_message_serial(rcc) == 0);
> > + red_channel_client_set_message_serial(rcc,
> > + rcc->channel->handle_migrate_data_get_serial(rcc, size, message));
> > + rcc->channel->handle_migrate_data(rcc, size, message);
> > }
> >
> > -int red_channel_handle_message(RedChannel *channel, uint32_t size,
> > +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
> > uint16_t type, void *message)
> > {
> > switch (type) {
> > @@ -499,21 +633,21 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size,
> > red_printf("bad message size");
> > return FALSE;
> > }
> > - channel->ack_data.client_generation = *(uint32_t *)(message);
> > + rcc->ack_data.client_generation = *(uint32_t *)(message);
> > break;
> > case SPICE_MSGC_ACK:
> > - if (channel->ack_data.client_generation == channel->ack_data.generation) {
> > - channel->ack_data.messages_window -= channel->ack_data.client_window;
> > - red_channel_push(channel);
> > + if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
> > + rcc->ack_data.messages_window -= rcc->ack_data.client_window;
> > + red_channel_client_push(rcc);
> > }
> > break;
> > case SPICE_MSGC_DISCONNECTING:
> > break;
> > case SPICE_MSGC_MIGRATE_FLUSH_MARK:
> > - red_channel_handle_migrate_flush_mark_proc(channel);
> > + red_channel_handle_migrate_flush_mark_proc(rcc->channel);
> > break;
> > case SPICE_MSGC_MIGRATE_DATA:
> > - red_channel_handle_migrate_data(channel, size, message);
> > + red_channel_handle_migrate_data(rcc, size, message);
> > break;
> > default:
> > red_printf("invalid message type %u", type);
> > @@ -522,75 +656,54 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size,
> > return TRUE;
> > }
> >
> > -static void red_channel_event(int fd, int event, void *data)
> > +static void red_channel_client_event(int fd, int event, void *data)
> > {
> > - RedChannel *channel = (RedChannel *)data;
> > + RedChannelClient *rcc = (RedChannelClient *)data;
> >
> > if (event & SPICE_WATCH_EVENT_READ) {
> > - red_channel_receive(channel);
> > + red_channel_client_receive(rcc);
> > }
> > if (event & SPICE_WATCH_EVENT_WRITE) {
> > - red_channel_push(channel);
> > + red_channel_client_push(rcc);
> > }
> > }
> >
> > -void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
> > +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item)
> > {
> > - ASSERT(channel->send_data.item == NULL);
> > - channel->send_data.header->type = msg_type;
> > - channel->send_data.item = item;
> > + ASSERT(rcc->send_data.item == NULL);
> > + ASSERT(msg_type != 0);
> > + rcc->send_data.header->type = msg_type;
> > + rcc->send_data.item = item;
> > if (item) {
> > - channel->hold_item(channel, item);
> > + rcc->channel->hold_item(rcc, item);
> > }
> > }
> >
> > -void red_channel_send(RedChannel *channel)
> > -{
> > - red_peer_handle_outgoing(channel->stream, &channel->outgoing);
> > -}
> > -
> > -void red_channel_begin_send_message(RedChannel *channel)
> > +void red_channel_client_begin_send_message(RedChannelClient *rcc)
> > {
> > - spice_marshaller_flush(channel->send_data.marshaller);
> > - channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
> > - channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader);
> > - channel->ack_data.messages_window++;
> > - channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
> > - red_channel_send(channel);
> > -}
> > -
> > -void red_channel_push(RedChannel *channel)
> > -{
> > - PipeItem *pipe_item;
> > + SpiceMarshaller *m = rcc->send_data.marshaller;
> >
> > - if (!channel) {
> > + // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
> > + if (rcc->send_data.header->type == 0) {
> > + red_printf("BUG: header->type == 0");
> > return;
> > }
> > -
> > - if (!channel->during_send) {
> > - channel->during_send = TRUE;
> > - } else {
> > - return;
> > - }
> > -
> > - if (channel->send_data.blocked) {
> > - red_channel_send(channel);
> > - }
> > -
> > - while ((pipe_item = red_channel_pipe_get(channel))) {
> > - red_channel_send_item(channel, pipe_item);
> > - }
> > - channel->during_send = FALSE;
> > + spice_marshaller_flush(m);
> > + rcc->send_data.size = spice_marshaller_get_total_size(m);
> > + rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader);
> > + rcc->ack_data.messages_window++;
> > + rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */
> > + red_channel_client_send(rcc);
> > }
> >
> > -uint64_t red_channel_get_message_serial(RedChannel *channel)
> > +uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
> > {
> > - return channel->send_data.serial;
> > + return rcc->send_data.serial;
> > }
> >
> > -void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
> > +void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
> > {
> > - channel->send_data.serial = serial;
> > + rcc->send_data.serial = serial;
> > }
> >
> > void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
> > @@ -654,28 +767,17 @@ void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type)
> > red_channel_push(channel);
> > }
> >
> > -static inline int red_channel_waiting_for_ack(RedChannel *channel)
> > +int red_channel_is_connected(RedChannel *channel)
> > {
> > - return (channel->handle_acks && (channel->ack_data.messages_window > channel->ack_data.client_window * 2));
> > + return channel->rcc != NULL;
> > }
> >
> > -static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
> > +void red_channel_client_clear_sent_item(RedChannelClient *rcc)
> > {
> > - PipeItem *item;
> > -
> > - if (!channel || channel->send_data.blocked ||
> > - red_channel_waiting_for_ack(channel) ||
> > - !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
> > - return NULL;
> > + if (rcc->send_data.item) {
> > + red_channel_client_release_item(rcc, rcc->send_data.item, TRUE);
> > + rcc->send_data.item = NULL;
> > }
> > - --channel->pipe_size;
> > - ring_remove(&item->link);
> > - return item;
> > -}
> > -
> > -int red_channel_is_connected(RedChannel *channel)
> > -{
> > - return !!channel->stream;
> > }
> >
> > void red_channel_pipe_clear(RedChannel *channel)
> > @@ -683,82 +785,151 @@ void red_channel_pipe_clear(RedChannel *channel)
> > PipeItem *item;
> >
> > ASSERT(channel);
> > - if (channel->send_data.item) {
> > - red_channel_release_item(channel, channel->send_data.item, TRUE);
> > - channel->send_data.item = NULL;
> > + if (channel->rcc) {
> > + red_channel_client_clear_sent_item(channel->rcc);
> > }
> > while ((item = (PipeItem *)ring_get_head(&channel->pipe))) {
> > ring_remove(&item->link);
> > - red_channel_release_item(channel, item, FALSE);
> > + red_channel_client_release_item(channel->rcc, item, FALSE);
> > }
> > channel->pipe_size = 0;
> > }
> >
> > +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
> > +{
> > + rcc->ack_data.messages_window = 0;
> > +}
> > +
> > void red_channel_ack_zero_messages_window(RedChannel *channel)
> > {
> > - channel->ack_data.messages_window = 0;
> > + red_channel_client_ack_zero_messages_window(channel->rcc);
> > +}
> > +
> > +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
> > +{
> > + rcc->ack_data.client_window = client_window;
> > +}
> > +
> > +void red_channel_ack_set_client_window(RedChannel* channel, int client_window)
> > +{
> > + if (channel->rcc) {
> > + red_channel_client_ack_set_client_window(channel->rcc, client_window);
> > + }
> > +}
> > +
> > +void red_channel_client_disconnect(RedChannelClient *rcc)
> > +{
> > + red_printf("%p (channel %p)", rcc, rcc->channel);
> > +
> > + if (rcc->send_data.item) {
> > + rcc->channel->release_item(rcc, rcc->send_data.item, FALSE);
> > + }
> > + // TODO: clear our references from the pipe
> > + reds_stream_free(rcc->stream);
> > + rcc->send_data.item = NULL;
> > + rcc->send_data.blocked = FALSE;
> > + rcc->send_data.size = 0;
> > + rcc->channel->rcc = NULL;
> > +}
> > +
> > +void red_channel_disconnect(RedChannel *channel)
> > +{
> > + red_channel_pipe_clear(channel);
> > + if (channel->rcc) {
> > + red_channel_client_disconnect(channel->rcc);
> > + }
> > +}
> > +
> > +int red_channel_all_clients_serials_are_zero(RedChannel *channel)
> > +{
> > + return (!channel->rcc || channel->rcc->send_data.serial == 0);
> > +}
> > +
> > +void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v)
> > +{
> > + if (channel->rcc) {
> > + v(channel->rcc);
> > + }
> > +}
> > +
> > +void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data)
> > +{
> > + if (channel->rcc) {
> > + v(channel->rcc, data);
> > + }
> > }
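
To illustrate how the two apply variants are meant to be used once there is more than one client, here is a small sketch. It assumes a singly linked list of clients, which is not what this patch has (there is still a single rcc pointer), and all names (Client, Channel, channel_apply_clients, unblock, count_blocked) are invented for the example.

```c
#include <stddef.h>

/* Hypothetical client list; the real code would use a Ring. */
typedef struct Client {
    struct Client *next;
    int blocked;
} Client;

typedef struct Channel {
    Client *clients;
} Channel;

typedef void (*client_visitor)(Client *c);
typedef void (*client_visitor_data)(Client *c, void *data);

/* Variant without user data: no unused parameter at the call site. */
static void channel_apply_clients(Channel *ch, client_visitor v)
{
    Client *c, *next;

    /* Fetch next first so a visitor may unlink the current client. */
    for (c = ch->clients; c; c = next) {
        next = c->next;
        v(c);
    }
}

/* Variant that threads an opaque pointer through to the visitor. */
static void channel_apply_clients_data(Channel *ch, client_visitor_data v, void *data)
{
    Client *c, *next;

    for (c = ch->clients; c; c = next) {
        next = c->next;
        v(c, data);
    }
}

static void unblock(Client *c)
{
    c->blocked = 0;
}

static void count_blocked(Client *c, void *data)
{
    if (c->blocked) {
        ++*(int *)data;
    }
}
```

A caller would count blocked clients with channel_apply_clients_data(&ch, count_blocked, &n) and clear them with channel_apply_clients(&ch, unblock).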
> >
> > -void red_channel_ack_set_client_window(RedChannel *channel, int client_window)
> > +void red_channel_set_shut(RedChannel *channel)
> > {
> > - channel->ack_data.client_window = client_window;
> > + if (channel->rcc) {
> > + channel->rcc->incoming.shut = TRUE;
> > + }
> > }
> >
> > int red_channel_all_blocked(RedChannel *channel)
> > {
> > - return channel->send_data.blocked;
> > + return !channel->rcc || channel->rcc->send_data.blocked;
> > }
> >
> > int red_channel_any_blocked(RedChannel *channel)
> > {
> > - return channel->send_data.blocked;
> > + return !channel->rcc || channel->rcc->send_data.blocked;
> > }
> >
> > -int red_channel_send_message_pending(RedChannel *channel)
> > +int red_channel_client_send_message_pending(RedChannelClient *rcc)
> > {
> > - return channel->send_data.header->type != 0;
> > + return rcc->send_data.header->type != 0;
> > }
> >
> > -/* accessors for RedChannel */
> > -SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel)
> > +/* accessors for RedChannelClient */
> > +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
> > {
> > - return channel->send_data.marshaller;
> > + return rcc->send_data.marshaller;
> > }
> >
> > -RedsStream *red_channel_get_stream(RedChannel *channel)
> > +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
> > {
> > - return channel->stream;
> > + return rcc->stream;
> > }
> >
> > -SpiceDataHeader *red_channel_get_header(RedChannel *channel)
> > +SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc)
> > {
> > - return channel->send_data.header;
> > + return rcc->send_data.header;
> > }
> > /* end of accessors */
> >
> > int red_channel_get_first_socket(RedChannel *channel)
> > {
> > - if (!channel->stream) {
> > + if (!channel->rcc || !channel->rcc->stream) {
> > return -1;
> > }
> > - return channel->stream->socket;
> > + return channel->rcc->stream->socket;
> > +}
> > +
> > +int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item)
> > +{
> > + return rcc->send_data.item == item;
> > }
> >
> > int red_channel_item_being_sent(RedChannel *channel, PipeItem *item)
> > {
> > - return channel->send_data.item == item;
> > + return channel->rcc && red_channel_client_item_being_sent(channel->rcc, item);
> > }
> >
> > int red_channel_no_item_being_sent(RedChannel *channel)
> > {
> > - return channel->send_data.item == NULL;
> > + return !channel->rcc || channel->rcc->send_data.item == NULL;
> > }
> >
> > -void red_channel_disconnect(RedChannel *channel)
> > +static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item)
> > {
> > - red_channel_pipe_clear(channel);
> > - reds_stream_free(channel->stream);
> > - channel->stream = NULL;
> > - channel->send_data.blocked = FALSE;
> > - channel->send_data.size = 0;
> > + rcc->channel->pipe_size--;
> > + ring_remove(&item->link);
> > +}
> > +
> > +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
> > + PipeItem *item)
> > +{
> > + red_channel_client_pipe_remove(rcc, item);
> > + red_channel_client_release_item(rcc, item, FALSE);
> > }
> > diff --git a/server/red_channel.h b/server/red_channel.h
> > index d05722c..4d6a23c 100644
> > --- a/server/red_channel.h
> > +++ b/server/red_channel.h
> > @@ -97,6 +97,9 @@ typedef struct BufDescriptor {
> > uint8_t *data;
> > } BufDescriptor;
> >
> > +typedef struct RedChannel RedChannel;
> > +typedef struct RedChannelClient RedChannelClient;
> > +
> > /* Messages handled by red_channel
> > * SET_ACK - sent to client on channel connection
> > * Note that the numbers don't have to correspond to spice message types,
> > @@ -112,37 +115,33 @@ typedef struct PipeItem {
> > int type;
> > } PipeItem;
> >
> > -typedef struct RedChannel RedChannel;
> > -
> > -typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel,
> > +typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannelClient *channel,
> > SpiceDataHeader *msg_header);
> > -typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type,
> > +typedef int (*channel_handle_parsed_proc)(RedChannelClient *rcc, uint32_t size, uint16_t type,
> > void *message);
> > -typedef int (*channel_handle_message_proc)(RedChannel *channel,
> > +typedef int (*channel_handle_message_proc)(RedChannelClient *rcc,
> > SpiceDataHeader *header, uint8_t *msg);
> > -typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel,
> > +typedef void (*channel_release_msg_recv_buf_proc)(RedChannelClient *channel,
> > SpiceDataHeader *msg_header, uint8_t *msg);
> > -typedef void (*channel_disconnect_proc)(RedChannel *channel);
> > -typedef int (*channel_configure_socket_proc)(RedChannel *channel);
> > -typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item);
> > -typedef void (*channel_hold_pipe_item_proc)(RedChannel *channel, PipeItem *item);
> > -typedef void (*channel_release_pipe_item_proc)(RedChannel *channel,
> > +typedef void (*channel_disconnect_proc)(RedChannelClient *rcc);
> > +typedef int (*channel_configure_socket_proc)(RedChannelClient *rcc);
> > +typedef void (*channel_send_pipe_item_proc)(RedChannelClient *rcc, PipeItem *item);
> > +typedef void (*channel_hold_pipe_item_proc)(RedChannelClient *rcc, PipeItem *item);
> > +typedef void (*channel_release_pipe_item_proc)(RedChannelClient *rcc,
> > PipeItem *item, int item_pushed);
> > -typedef void (*channel_on_incoming_error_proc)(RedChannel *channel);
> > -typedef void (*channel_on_outgoing_error_proc)(RedChannel *channel);
> > +typedef void (*channel_on_incoming_error_proc)(RedChannelClient *rcc);
> > +typedef void (*channel_on_outgoing_error_proc)(RedChannelClient *rcc);
> >
> > -typedef int (*channel_handle_migrate_flush_mark_proc)(RedChannel *channel);
> > -typedef uint64_t (*channel_handle_migrate_data_proc)(RedChannel *channel,
> > +typedef int (*channel_handle_migrate_flush_mark_proc)(RedChannelClient *base);
> > +typedef uint64_t (*channel_handle_migrate_data_proc)(RedChannelClient *base,
> > uint32_t size, void *message);
> > -typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannel *channel,
> > +typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannelClient *base,
> > uint32_t size, void *message);
> >
> > -struct RedChannel {
> > +struct RedChannelClient {
> > + RingItem channel_link;
> > + RedChannel *channel;
> > RedsStream *stream;
> > - SpiceCoreInterface *core;
> > - int migrate;
> > - int handle_acks;
> > -
> > struct {
> > uint32_t generation;
> > uint32_t client_generation;
> > @@ -150,9 +149,6 @@ struct RedChannel {
> > uint32_t client_window;
> > } ack_data;
> >
> > - Ring pipe;
> > - uint32_t pipe_size;
> > -
> > struct {
> > SpiceMarshaller *marshaller;
> > SpiceDataHeader *header;
> > @@ -164,16 +160,28 @@ struct RedChannel {
> >
> > OutgoingHandler outgoing;
> > IncomingHandler incoming;
> > + int during_send;
> > +};
> > +
> > +struct RedChannel {
> > + SpiceCoreInterface *core;
> > + int migrate;
> > + int handle_acks;
> > +
> > + RedChannelClient *rcc;
> > +
> > + Ring pipe;
> > + uint32_t pipe_size;
> >
> > OutgoingHandlerInterface outgoing_cb;
> > IncomingHandlerInterface incoming_cb;
> >
> > + channel_configure_socket_proc config_socket;
> > channel_disconnect_proc disconnect;
> > channel_send_pipe_item_proc send_item;
> > channel_hold_pipe_item_proc hold_item;
> > channel_release_pipe_item_proc release_item;
> >
> > - int during_send;
> > /* Stuff below added for Main and Inputs channels switch to RedChannel
> > * (might be removed later) */
> > channel_on_incoming_error_proc on_incoming_error; /* alternative to disconnect */
> > @@ -190,7 +198,7 @@ struct RedChannel {
> >
> > /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
> > explicitly destroy the channel */
> > -RedChannel *red_channel_create(int size, RedsStream *stream,
> > +RedChannel *red_channel_create(int size,
> > SpiceCoreInterface *core,
> > int migrate, int handle_acks,
> > channel_configure_socket_proc config_socket,
> > @@ -207,7 +215,7 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
> >
> > /* alternative constructor, meant for marshaller based (inputs,main) channels,
> > * will become default eventually */
> > -RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> > +RedChannel *red_channel_create_parser(int size,
> > SpiceCoreInterface *core,
> > int migrate, int handle_acks,
> > channel_configure_socket_proc config_socket,
> > @@ -223,29 +231,31 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> > channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
> > channel_handle_migrate_data_proc handle_migrate_data,
> > channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial);
> > -
> > +RedChannelClient *red_channel_client_create(int size, RedChannel *channel,
> > + RedsStream *stream);
> > int red_channel_is_connected(RedChannel *channel);
> >
> > +void red_channel_client_destroy(RedChannelClient *rcc);
> > void red_channel_destroy(RedChannel *channel);
> >
> > /* should be called when a new channel is ready to send messages */
> > void red_channel_init_outgoing_messages_window(RedChannel *channel);
> >
> > /* handles general channel msgs from the client */
> > -int red_channel_handle_message(RedChannel *channel, uint32_t size,
> > +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
> > uint16_t type, void *message);
> >
> > /* default error handler that disconnects channel */
> > -void red_channel_default_peer_on_error(RedChannel *channel);
> > +void red_channel_client_default_peer_on_error(RedChannelClient *rcc);
> >
> > /* when preparing send_data: should call init and then use marshaller */
> > -void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item);
> > +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item);
> >
> > -uint64_t red_channel_get_message_serial(RedChannel *channel);
> > -void red_channel_set_message_serial(RedChannel *channel, uint64_t);
> > +uint64_t red_channel_client_get_message_serial(RedChannelClient *channel);
> > +void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
> >
> > -/* when sending a msg. should first call red_channel_begin_send_message */
> > -void red_channel_begin_send_message(RedChannel *channel);
> > +/* when sending a msg. should first call red_channel_client_begin_send_message */
> > +void red_channel_client_begin_send_message(RedChannelClient *rcc);
> >
> > void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type);
> > void red_channel_pipe_add_push(RedChannel *channel, PipeItem *item);
> > @@ -253,14 +263,19 @@ void red_channel_pipe_add(RedChannel *channel, PipeItem *item);
> > void red_channel_pipe_add_after(RedChannel *channel, PipeItem *item, PipeItem *pos);
> > int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item);
> > void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
> > +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, PipeItem *item);
> > void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item);
> > /* for types that use this routine -> the pipe item should be freed */
> > void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type);
> >
> > +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc);
> > +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window);
> > +void red_channel_client_push_set_ack(RedChannelClient *rcc);
> > void red_channel_ack_zero_messages_window(RedChannel *channel);
> > void red_channel_ack_set_client_window(RedChannel *channel, int client_window);
> > void red_channel_push_set_ack(RedChannel *channel);
> >
> > +/* TODO: This sets all clients to shut state - probably we want to close per channel */
> > void red_channel_shutdown(RedChannel *channel);
> >
> > int red_channel_get_first_socket(RedChannel *channel);
> > @@ -272,7 +287,7 @@ int red_channel_all_blocked(RedChannel *channel);
> > int red_channel_any_blocked(RedChannel *channel);
> >
> > /* helper for channels that have complex logic that can possibly ready a send */
> > -int red_channel_send_message_pending(RedChannel *channel);
> > +int red_channel_client_send_message_pending(RedChannelClient *rcc);
> >
> > /* returns TRUE if item is being sent by one of the channel clients. This will
> > * be true if someone called init_send_data but send has not completed (or perhaps
> > @@ -299,14 +314,18 @@ void red_channel_pipe_clear(RedChannel *channel);
> > // red_wait_pipe_item_sent
> > // handle_channel_events - this is the only one that was used before, and was in red_channel.c
> > void red_channel_receive(RedChannel *channel);
> > +void red_channel_client_receive(RedChannelClient *rcc);
> > +// For red_worker
> > void red_channel_send(RedChannel *channel);
> > +void red_channel_client_send(RedChannelClient *rcc);
> > // For red_worker
> > void red_channel_disconnect(RedChannel *channel);
> > +void red_channel_client_disconnect(RedChannelClient *rcc);
> >
> > -/* accessors for RedChannel */
> > +/* accessors for RedChannelClient */
> > /* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */
> > -SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel);
> > -RedsStream *red_channel_get_stream(RedChannel *channel);
> > +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc);
> > +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc);
> >
> > /* this is a convenience function for sending messages, sometimes (migration only?)
> > * the serial from the header needs to be available for sending. Note that the header
> > @@ -314,5 +333,12 @@ RedsStream *red_channel_get_stream(RedChannel *channel);
> > * red_channel_begin_send_message. red_channel_init_send_data changes the header (sets
> > * the type in it) as a convenience function. It is preferred to do that through it and
> > * not via the below accessor and direct header manipulation. */
> > -SpiceDataHeader *red_channel_get_header(RedChannel *channel);
> > +SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc);
> > +
> > +/* apply given function to all connected clients */
> > +typedef void (*channel_client_visitor)(RedChannelClient *rcc);
> > +typedef void (*channel_client_visitor_data)(RedChannelClient *rcc, void *data);
> > +void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v);
> > +void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data);
> > +
> > #endif
> > diff --git a/server/red_client_shared_cache.h b/server/red_client_shared_cache.h
> > index 74553c0..dddccc6 100644
> > --- a/server/red_client_shared_cache.h
> > +++ b/server/red_client_shared_cache.h
> > @@ -26,6 +26,7 @@
> > #define FUNC_NAME(name) pixmap_cache_##name
> > #define PRIVATE_FUNC_NAME(name) __pixmap_cache_##name
> > #define CHANNEL DisplayChannel
> > +#define CHANNEL_FROM_RCC(rcc) SPICE_CONTAINEROF(rcc->channel, CHANNEL, common.base);
> > #define CACH_GENERATION pixmap_cache_generation
> > #define INVAL_ALL_VERB SPICE_MSG_DISPLAY_INVAL_ALL_PIXMAPS
> > #else
> > @@ -35,12 +36,13 @@
> > #endif
> >
> >
> > -static int FUNC_NAME(hit)(CACHE *cache, uint64_t id, int *lossy, CHANNEL *channel)
> > +static int FUNC_NAME(hit)(CACHE *cache, uint64_t id, int *lossy, RedChannelClient *rcc)
> > {
> > + CHANNEL *channel = CHANNEL_FROM_RCC(rcc);
> > NewCacheItem *item;
> > uint64_t serial;
> >
> > - serial = red_channel_get_message_serial((RedChannel *)channel);
> > + serial = red_channel_client_get_message_serial(rcc);
> > pthread_mutex_lock(&cache->lock);
> > item = cache->hash_table[CACHE_HASH_KEY(id)];
> >
> > @@ -79,8 +81,9 @@ static int FUNC_NAME(set_lossy)(CACHE *cache, uint64_t id, int lossy)
> > return !!item;
> > }
> >
> > -static int FUNC_NAME(add)(CACHE *cache, uint64_t id, uint32_t size, int lossy, CHANNEL *channel)
> > +static int FUNC_NAME(add)(CACHE *cache, uint64_t id, uint32_t size, int lossy, RedChannelClient *rcc)
> > {
> > + CHANNEL *channel = CHANNEL_FROM_RCC(rcc);
> > NewCacheItem *item;
> > uint64_t serial;
> > int key;
> > @@ -88,7 +91,7 @@ static int FUNC_NAME(add)(CACHE *cache, uint64_t id, uint32_t size, int lossy, C
> > ASSERT(size > 0);
> >
> > item = spice_new(NewCacheItem, 1);
> > - serial = red_channel_get_message_serial((RedChannel *)channel);
> > + serial = red_channel_client_get_message_serial(rcc);
> >
> > pthread_mutex_lock(&cache->lock);
> >
> > @@ -166,13 +169,14 @@ static void PRIVATE_FUNC_NAME(clear)(CACHE *cache)
> > cache->items = 0;
> > }
> >
> > -static void FUNC_NAME(reset)(CACHE *cache, CHANNEL *channel, SpiceMsgWaitForChannels* sync_data)
> > +static void FUNC_NAME(reset)(CACHE *cache, RedChannelClient *rcc, SpiceMsgWaitForChannels* sync_data)
> > {
> > + CHANNEL *channel = CHANNEL_FROM_RCC(rcc);
> > uint8_t wait_count;
> > uint64_t serial;
> > uint32_t i;
> >
> > - serial = red_channel_get_message_serial((RedChannel *)channel);
> > + serial = red_channel_client_get_message_serial(rcc);
> > pthread_mutex_lock(&cache->lock);
> > PRIVATE_FUNC_NAME(clear)(cache);
> >
> > @@ -230,4 +234,5 @@ static void FUNC_NAME(destroy)(CACHE *cache)
> > #undef FUNC_NAME
> > #undef VAR_NAME
> > #undef CHANNEL
> > +#undef CHANNEL_FROM_RCC
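
Since CHANNEL_FROM_RCC is the pattern used throughout to get from a client back to the specific channel type, here is a self-contained sketch of the idea. CONTAINER_OF, Base, Client, Display and DISPLAY_FROM_CLIENT are stand-ins written for this example. They are not the SPICE definitions, and the nested common.base designator in offsetof relies on compiler support that gcc and clang provide. The sketch leaves the trailing semicolon off the macro so it can also be used inside expressions.

```c
#include <stddef.h>

/* Hypothetical container-of helper: recover the enclosing struct
 * from a pointer to one of its members. */
#define CONTAINER_OF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

typedef struct Base {
    int id;
} Base;

/* The client only knows the generic base channel. */
typedef struct Client {
    Base *channel;
} Client;

/* The specific channel embeds the base one level down. */
typedef struct Display {
    int cache_generation;
    struct {
        Base base;
    } common;
} Display;

#define DISPLAY_FROM_CLIENT(c) \
    CONTAINER_OF((c)->channel, Display, common.base)
```

Given a Client whose channel points at d.common.base, DISPLAY_FROM_CLIENT(&client) yields &d.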
> >
> > diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
> > index 5df801c..8183ed0 100644
> > --- a/server/red_tunnel_worker.c
> > +++ b/server/red_tunnel_worker.c
> > @@ -1642,9 +1642,11 @@ static int tunnel_channel_handle_socket_token(TunnelChannel *channel, RedSocket
> > return TRUE;
> > }
> >
> > -static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> > +static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
> > + SpiceDataHeader *msg_header)
> > {
> > - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> > + TunnelChannel *tunnel_channel = (TunnelChannel *)rcc->channel;
> > +
> > if (msg_header->type == SPICE_MSGC_TUNNEL_SOCKET_DATA) {
> > return (__tunnel_worker_alloc_socket_rcv_buf(tunnel_channel->worker)->buf);
> > } else if ((msg_header->type == SPICE_MSGC_MIGRATE_DATA) ||
> > @@ -1656,10 +1658,11 @@ static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataH
> > }
> >
> > // called by the receive routine of the channel, before the buffer was assigned to a socket
> > -static void tunnel_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> > +static void tunnel_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
> > uint8_t *msg)
> > {
> > - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> > + TunnelChannel *tunnel_channel = (TunnelChannel *)rcc->channel;
> > +
> > if (msg_header->type == SPICE_MSGC_TUNNEL_SOCKET_DATA) {
> > ASSERT(!(SPICE_CONTAINEROF(msg, RedSocketRawRcvBuf, buf)->base.usr_opaque));
> > __tunnel_worker_free_socket_rcv_buf(tunnel_channel->worker,
> > @@ -1741,9 +1744,9 @@ static void __tunnel_channel_fill_socket_migrate_item(TunnelChannel *channel, Re
> > }
> >
> > static void release_migrate_item(TunnelMigrateItem *item);
> > -static int tunnel_channel_handle_migrate_mark(RedChannel *base)
> > +static int tunnel_channel_handle_migrate_mark(RedChannelClient *rcc)
> > {
> > - TunnelChannel *channel = SPICE_CONTAINEROF(base, TunnelChannel, base);
> > + TunnelChannel *channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > TunnelMigrateItem *migrate_item = NULL;
> > TunnelService *service;
> > TunnelMigrateServiceItem *mig_service;
> > @@ -2156,7 +2159,7 @@ static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *chann
> > }
> > }
> >
> > -static uint64_t tunnel_channel_handle_migrate_data_get_serial_proc(RedChannel *base,
> > +static uint64_t tunnel_channel_handle_migrate_data_get_serial_proc(RedChannelClient *rcc,
> > uint32_t size, void *msg)
> > {
> > TunnelMigrateData *migrate_data = msg;
> > @@ -2169,10 +2172,10 @@ static uint64_t tunnel_channel_handle_migrate_data_get_serial_proc(RedChannel *b
> > return migrate_data->message_serial;
> > }
> >
> > -static uint64_t tunnel_channel_handle_migrate_data(RedChannel *base,
> > +static uint64_t tunnel_channel_handle_migrate_data(RedChannelClient *rcc,
> > uint32_t size, void *msg)
> > {
> > - TunnelChannel *channel = SPICE_CONTAINEROF(base, TunnelChannel, base);
> > + TunnelChannel *channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > TunnelMigrateSocketList *sockets_list;
> > TunnelMigrateServicesList *services_list;
> > TunnelMigrateData *migrate_data = msg;
> > @@ -2239,9 +2242,9 @@ error:
> > }
> >
> > // msg was allocated by tunnel_channel_alloc_msg_rcv_buf
> > -static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *header, uint8_t *msg)
> > +static int tunnel_channel_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
> > {
> > - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> > + TunnelChannel *tunnel_channel = (TunnelChannel *)rcc->channel;
> > RedSocket *sckt = NULL;
> > // retrieve the sckt
> > switch (header->type) {
> > @@ -2265,7 +2268,7 @@ static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *h
> > }
> > break;
> > default:
> > - return red_channel_handle_message(channel, header->size, header->type, msg);
> > + return red_channel_client_handle_message(rcc, header->size, header->type, msg);
> > }
> >
> > switch (header->type) {
> > @@ -2334,7 +2337,7 @@ static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *h
> > return tunnel_channel_handle_socket_token(tunnel_channel, sckt,
> > (SpiceMsgcTunnelSocketTokens *)msg);
> > default:
> > - return red_channel_handle_message(channel, header->size, header->type, msg);
> > + return red_channel_client_handle_message(rcc, header->size, header->type, msg);
> > }
> > return TRUE;
> > }
> > @@ -2343,13 +2346,16 @@ static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *h
> > /* outgoing msgs
> > ********************************/
> >
> > -static void tunnel_channel_marshall_migrate(TunnelChannel *tunnel_channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_migrate(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > - ASSERT(tunnel_channel);
> > + TunnelChannel *tunnel_channel;
> > +
> > + ASSERT(rcc);
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > tunnel_channel->send_data.u.migrate.flags =
> > SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
> > tunnel_channel->expect_migrate_mark = TRUE;
> > - red_channel_init_send_data(&tunnel_channel->base, SPICE_MSG_MIGRATE, item);
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, item);
> > spice_marshaller_add_ref(m,
> > (uint8_t*)&tunnel_channel->send_data.u.migrate,
> > sizeof(SpiceMsgMigrate));
> > @@ -2489,20 +2495,23 @@ static int __tunnel_channel_marshall_socket_migrate_data(TunnelChannel *channel,
> > return (cur_offset - offset);
> > }
> >
> > -static void tunnel_channel_marshall_migrate_data(TunnelChannel *channel,
> > +static void tunnel_channel_marshall_migrate_data(RedChannelClient *rcc,
> > SpiceMarshaller *m, PipeItem *item)
> > {
> > - TunnelMigrateData *migrate_data = &channel->send_data.u.migrate_data;
> > + TunnelChannel *tunnel_channel;
> > + TunnelMigrateData *migrate_data;
> > TunnelMigrateItem *migrate_item = (TunnelMigrateItem *)item;
> > int i;
> >
> > uint32_t data_buf_offset = 0; // current location in data[0] field
> > - ASSERT(channel);
> > + ASSERT(rcc);
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > + migrate_data = &tunnel_channel->send_data.u.migrate_data;
> >
> > migrate_data->magic = TUNNEL_MIGRATE_DATA_MAGIC;
> > migrate_data->version = TUNNEL_MIGRATE_DATA_VERSION;
> > - migrate_data->message_serial = red_channel_get_message_serial(&channel->base);
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_MIGRATE_DATA, item);
> > + migrate_data->message_serial = red_channel_client_get_message_serial(rcc);
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item);
> > spice_marshaller_add_ref(m, (uint8_t*)migrate_data, sizeof(*migrate_data));
> >
> > migrate_data->slirp_state = data_buf_offset;
> > @@ -2516,7 +2525,7 @@ static void tunnel_channel_marshall_migrate_data(TunnelChannel *channel,
> >
> > for (i = 0; i < migrate_item->services_list->num_services; i++) {
> > migrate_item->services_list->services[i] = data_buf_offset;
> > - data_buf_offset += __tunnel_channel_marshall_service_migrate_data(channel, m,
> > + data_buf_offset += __tunnel_channel_marshall_service_migrate_data(tunnel_channel, m,
> > migrate_item->services + i,
> > data_buf_offset);
> > }
> > @@ -2529,83 +2538,93 @@ static void tunnel_channel_marshall_migrate_data(TunnelChannel *channel,
> >
> > for (i = 0; i < migrate_item->sockets_list->num_sockets; i++) {
> > migrate_item->sockets_list->sockets[i] = data_buf_offset;
> > - data_buf_offset += __tunnel_channel_marshall_socket_migrate_data(channel, m,
> > + data_buf_offset += __tunnel_channel_marshall_socket_migrate_data(tunnel_channel, m,
> > migrate_item->sockets_data + i,
> > data_buf_offset);
> > }
> > }
> >
> > -static void tunnel_channel_marshall_init(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_init(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > - ASSERT(channel);
> > + TunnelChannel *channel;
> >
> > + ASSERT(rcc);
> > + channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > channel->send_data.u.init.max_socket_data_size = MAX_SOCKET_DATA_SIZE;
> > channel->send_data.u.init.max_num_of_sockets = MAX_SOCKETS_NUM;
> >
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_INIT, item);
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_INIT, item);
> > spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.init, sizeof(SpiceMsgTunnelInit));
> > }
> >
> > -static void tunnel_channel_marshall_service_ip_map(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_service_ip_map(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > TunnelService *service = SPICE_CONTAINEROF(item, TunnelService, pipe_item);
> >
> > - channel->send_data.u.service_ip.service_id = service->id;
> > - channel->send_data.u.service_ip.virtual_ip.type = SPICE_TUNNEL_IP_TYPE_IPv4;
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > + tunnel_channel->send_data.u.service_ip.service_id = service->id;
> > + tunnel_channel->send_data.u.service_ip.virtual_ip.type = SPICE_TUNNEL_IP_TYPE_IPv4;
> >
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SERVICE_IP_MAP, item);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.service_ip,
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SERVICE_IP_MAP, item);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.service_ip,
> > sizeof(SpiceMsgTunnelServiceIpMap));
> > spice_marshaller_add_ref(m, (uint8_t*)&service->virt_ip.s_addr, sizeof(SpiceTunnelIPv4));
> > }
> >
> > -static void tunnel_channel_marshall_socket_open(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_socket_open(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> > RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> >
> > - channel->send_data.u.socket_open.connection_id = sckt->connection_id;
> > - channel->send_data.u.socket_open.service_id = sckt->far_service->id;
> > - channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE;
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > + tunnel_channel->send_data.u.socket_open.connection_id = sckt->connection_id;
> > + tunnel_channel->send_data.u.socket_open.service_id = sckt->far_service->id;
> > + tunnel_channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE;
> >
> > sckt->in_data.client_total_num_tokens = SOCKET_WINDOW_SIZE;
> > sckt->in_data.num_tokens = 0;
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_OPEN, item);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_open,
> > - sizeof(channel->send_data.u.socket_open));
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_OPEN, item);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_open,
> > + sizeof(tunnel_channel->send_data.u.socket_open));
> > #ifdef DEBUG_NETWORK
> > PRINT_SCKT(sckt);
> > #endif
> > }
> >
> > -static void tunnel_channel_marshall_socket_fin(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_socket_fin(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> > RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> >
> > ASSERT(!sckt->out_data.ready_chunks_queue.head);
> >
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > if (sckt->out_data.process_queue->head) {
> > red_printf("socket sent FIN but there are still buffers in outgoing process queue"
> > "(local_port=%d, service_id=%d)",
> > ntohs(sckt->local_port), sckt->far_service->id);
> > }
> >
> > - channel->send_data.u.socket_fin.connection_id = sckt->connection_id;
> > + tunnel_channel->send_data.u.socket_fin.connection_id = sckt->connection_id;
> >
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_FIN, item);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_fin,
> > - sizeof(channel->send_data.u.socket_fin));
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_FIN, item);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_fin,
> > + sizeof(tunnel_channel->send_data.u.socket_fin));
> > #ifdef DEBUG_NETWORK
> > PRINT_SCKT(sckt);
> > #endif
> > }
> >
> > -static void tunnel_channel_marshall_socket_close(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_socket_close(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> > RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> >
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > // can happen when it is a forced close
> > if (sckt->out_data.ready_chunks_queue.head) {
> > red_printf("socket closed but there are still buffers in outgoing ready queue"
> > @@ -2620,65 +2639,71 @@ static void tunnel_channel_marshall_socket_close(TunnelChannel *channel, SpiceMa
> > ntohs(sckt->local_port), sckt->far_service->id);
> > }
> >
> > - channel->send_data.u.socket_close.connection_id = sckt->connection_id;
> > + tunnel_channel->send_data.u.socket_close.connection_id = sckt->connection_id;
> >
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_CLOSE, item);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_close,
> > - sizeof(channel->send_data.u.socket_close));
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_CLOSE, item);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_close,
> > + sizeof(tunnel_channel->send_data.u.socket_close));
> > #ifdef DEBUG_NETWORK
> > PRINT_SCKT(sckt);
> > #endif
> > }
> >
> > -static void tunnel_channel_marshall_socket_closed_ack(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_socket_closed_ack(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> > RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> >
> > - channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id;
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > + tunnel_channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id;
> >
> > // pipe item is null because we free the sckt.
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_CLOSED_ACK, NULL);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_close_ack,
> > - sizeof(channel->send_data.u.socket_close_ack));
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_CLOSED_ACK, NULL);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_close_ack,
> > + sizeof(tunnel_channel->send_data.u.socket_close_ack));
> > #ifdef DEBUG_NETWORK
> > PRINT_SCKT(sckt);
> > #endif
> >
> > ASSERT(sckt->client_waits_close_ack && (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED));
> > - tunnel_worker_free_socket(channel->worker, sckt);
> > - if (CHECK_TUNNEL_ERROR(channel)) {
> > - tunnel_shutdown(channel->worker);
> > + tunnel_worker_free_socket(tunnel_channel->worker, sckt);
> > + if (CHECK_TUNNEL_ERROR(tunnel_channel)) {
> > + tunnel_shutdown(tunnel_channel->worker);
> > }
> > }
> >
> > -static void tunnel_channel_marshall_socket_token(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_socket_token(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, token_pipe_item);
> > RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> >
> > /* notice that the num of tokens sent can be > SOCKET_TOKENS_TO_SEND, since
> > the sending is performed after the pipe item was pushed */
> >
> > - channel->send_data.u.socket_token.connection_id = sckt->connection_id;
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > + tunnel_channel->send_data.u.socket_token.connection_id = sckt->connection_id;
> >
> > if (sckt->in_data.num_tokens > 0) {
> > - channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens;
> > + tunnel_channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens;
> > } else {
> > ASSERT(!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head);
> > - channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS;
> > + tunnel_channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS;
> > }
> > - sckt->in_data.num_tokens -= channel->send_data.u.socket_token.num_tokens;
> > - sckt->in_data.client_total_num_tokens += channel->send_data.u.socket_token.num_tokens;
> > + sckt->in_data.num_tokens -= tunnel_channel->send_data.u.socket_token.num_tokens;
> > + sckt->in_data.client_total_num_tokens += tunnel_channel->send_data.u.socket_token.num_tokens;
> > ASSERT(sckt->in_data.client_total_num_tokens <= SOCKET_WINDOW_SIZE);
> >
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_TOKEN, item);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_token,
> > - sizeof(channel->send_data.u.socket_token));
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_TOKEN, item);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_token,
> > + sizeof(tunnel_channel->send_data.u.socket_token));
> > }
> >
> > -static void tunnel_channel_marshall_socket_out_data(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> > +static void tunnel_channel_marshall_socket_out_data(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> > {
> > + TunnelChannel *tunnel_channel;
> > + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> > RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, data_pipe_item);
> > RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> > ReadyTunneledChunk *chunk;
> > @@ -2698,11 +2723,11 @@ static void tunnel_channel_marshall_socket_out_data(TunnelChannel *channel, Spic
> > ASSERT(!sckt->out_data.push_tail);
> > ASSERT(sckt->out_data.ready_chunks_queue.head->size <= MAX_SOCKET_DATA_SIZE);
> >
> > - channel->send_data.u.socket_data.connection_id = sckt->connection_id;
> > + tunnel_channel->send_data.u.socket_data.connection_id = sckt->connection_id;
> >
> > - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_DATA, item);
> > - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_data,
> > - sizeof(channel->send_data.u.socket_data));
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_DATA, item);
> > + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_data,
> > + sizeof(tunnel_channel->send_data.u.socket_data));
> > pushed_bufs_num++;
> >
> > // the first chunk is in a valid size
> > @@ -2787,52 +2812,51 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem
> > }
> > }
> >
> > -static void tunnel_channel_send_item(RedChannel *channel, PipeItem *item)
> > +static void tunnel_channel_send_item(RedChannelClient *rcc, PipeItem *item)
> > {
> > - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> > - SpiceMarshaller *m = red_channel_get_marshaller(channel);
> > + SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
> >
> > switch (item->type) {
> > case PIPE_ITEM_TYPE_TUNNEL_INIT:
> > - tunnel_channel_marshall_init(tunnel_channel, m, item);
> > + tunnel_channel_marshall_init(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SERVICE_IP_MAP:
> > - tunnel_channel_marshall_service_ip_map(tunnel_channel, m, item);
> > + tunnel_channel_marshall_service_ip_map(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_OPEN:
> > - tunnel_channel_marshall_socket_open(tunnel_channel, m, item);
> > + tunnel_channel_marshall_socket_open(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_DATA:
> > - tunnel_channel_marshall_socket_out_data(tunnel_channel, m, item);
> > + tunnel_channel_marshall_socket_out_data(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_FIN:
> > - tunnel_channel_marshall_socket_fin(tunnel_channel, m, item);
> > + tunnel_channel_marshall_socket_fin(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_CLOSE:
> > - tunnel_channel_marshall_socket_close(tunnel_channel, m, item);
> > + tunnel_channel_marshall_socket_close(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK:
> > - tunnel_channel_marshall_socket_closed_ack(tunnel_channel, m, item);
> > + tunnel_channel_marshall_socket_closed_ack(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_TOKEN:
> > - tunnel_channel_marshall_socket_token(tunnel_channel, m, item);
> > + tunnel_channel_marshall_socket_token(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_MIGRATE:
> > - tunnel_channel_marshall_migrate(tunnel_channel, m, item);
> > + tunnel_channel_marshall_migrate(rcc, m, item);
> > break;
> > case PIPE_ITEM_TYPE_MIGRATE_DATA:
> > - tunnel_channel_marshall_migrate_data(tunnel_channel, m, item);
> > + tunnel_channel_marshall_migrate_data(rcc, m, item);
> > break;
> > default:
> > red_error("invalid pipe item type");
> > }
> > - red_channel_begin_send_message(channel);
> > + red_channel_client_begin_send_message(rcc);
> > }
> >
> > /* param item_pushed: distinguishes between a pipe item that was pushed for sending, and
> > a pipe item that is still in the pipe and is released due to disconnection.
> > see red_pipe_item_clear */
> > -static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item, int item_pushed)
> > +static void tunnel_channel_release_pipe_item(RedChannelClient *rcc, PipeItem *item, int item_pushed)
> > {
> > if (!item) { // e.g. when acking closed socket
> > return;
> > @@ -2849,7 +2873,7 @@ static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item
> > break;
> > case PIPE_ITEM_TYPE_SOCKET_DATA:
> > if (item_pushed) {
> > - tunnel_worker_release_socket_out_data(((TunnelChannel *)channel)->worker, item);
> > + tunnel_worker_release_socket_out_data(((TunnelChannel *)rcc->channel)->worker, item);
> > }
> > break;
> > case PIPE_ITEM_TYPE_MIGRATE:
> > @@ -3318,11 +3342,11 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
> > * channel interface and other related procedures
> > ************************************************/
> >
> > -static int tunnel_channel_config_socket(RedChannel *channel)
> > +static int tunnel_channel_config_socket(RedChannelClient *rcc)
> > {
> > int flags;
> > int delay_val;
> > - RedsStream *stream = red_channel_get_stream(channel);
> > + RedsStream *stream = red_channel_client_get_stream(rcc);
> >
> > if ((flags = fcntl(stream->socket, F_GETFL)) == -1) {
> > red_printf("accept failed, %s", strerror(errno)); // can't we just use red_error?
> > @@ -3383,6 +3407,12 @@ static void tunnel_channel_disconnect(RedChannel *channel)
> > worker->channel = NULL;
> > }
> >
> > +// TODO - not MC friendly, remove
> > +static void tunnel_channel_disconnect_client(RedChannelClient *rcc)
> > +{
> > + tunnel_channel_disconnect(rcc->channel);
> > +}
> > +
> > /* interface for reds */
> >
> > static void on_new_tunnel_channel(TunnelChannel *channel)
> > @@ -3397,7 +3427,7 @@ static void on_new_tunnel_channel(TunnelChannel *channel)
> > }
> > }
> >
> > -static void tunnel_channel_hold_pipe_item(RedChannel *channel, PipeItem *item)
> > +static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
> > {
> > }
> >
> > @@ -3412,10 +3442,10 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int
> > }
> >
> > tunnel_channel =
> > - (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), stream, worker->core_interface,
> > + (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), worker->core_interface,
> > migration, TRUE,
> > tunnel_channel_config_socket,
> > - tunnel_channel_disconnect,
> > + tunnel_channel_disconnect_client,
> > tunnel_channel_handle_message,
> > tunnel_channel_alloc_msg_rcv_buf,
> > tunnel_channel_release_msg_rcv_buf,
> > @@ -3429,7 +3459,7 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int
> > if (!tunnel_channel) {
> > return;
> > }
> > -
> > + red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, stream);
> >
> > tunnel_channel->worker = worker;
> > tunnel_channel->worker->channel = tunnel_channel;
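The same conversion repeats in every callback in this file: the function now takes the client (rcc), goes through rcc->channel to reach the embedded base, and uses SPICE_CONTAINEROF to get back to the TunnelChannel. For anyone skimming the hunks above, here is a minimal sketch of that shape. Everything in it is a trimmed stand-in and an assumption, not the real SPICE definitions: CONTAINER_OF, Channel, ChannelClient, Tunnel, client_init_send_data and tunnel_marshall_init are hypothetical names. Only the rcc->channel back-pointer, the channel's rcc member and the embedded base member mirror what the patch shows.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for SPICE_CONTAINEROF (assumed equivalent): recover the
 * enclosing struct from a pointer to one of its members. */
#define CONTAINER_OF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

typedef struct Channel Channel;

/* Hypothetical per-client state; the real RedChannelClient is larger. */
typedef struct ChannelClient {
    Channel *channel;   /* back-pointer to the owning channel */
    int last_msg_type;  /* send state that now lives on the client */
} ChannelClient;

struct Channel {
    ChannelClient *rcc; /* a single client for now, as in this patch */
};

/* Hypothetical channel subtype. The base must be embedded by value,
 * otherwise CONTAINER_OF cannot compute the enclosing address. */
typedef struct Tunnel {
    Channel base;
    int max_sockets;
} Tunnel;

/* Per-client operation, analogous in spirit to the *_client_* calls. */
static void client_init_send_data(ChannelClient *rcc, int msg_type)
{
    rcc->last_msg_type = msg_type;
}

/* Converted callback: receives the client, derives the subtype from
 * rcc->channel, and sends through the client rather than the channel. */
static int tunnel_marshall_init(ChannelClient *rcc)
{
    Tunnel *tunnel;

    assert(rcc);
    tunnel = CONTAINER_OF(rcc->channel, Tunnel, base);
    client_init_send_data(rcc, 1);
    return tunnel->max_sockets;
}
```

A caller would embed the base in the subtype, point the client's channel field at &tunnel.base, and then pass only the client to the callback. A plain cast such as (TunnelChannel *)rcc->channel, which several hunks above use, gives the same result only while base is the first member of the struct; the container-of form does not depend on member order.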
> > diff --git a/server/red_worker.c b/server/red_worker.c
> > index 8e3b106..834ba3c 100644
> > --- a/server/red_worker.c
> > +++ b/server/red_worker.c
> > @@ -924,7 +924,7 @@ static void red_display_release_stream(DisplayChannel *display, StreamAgent *age
> > static inline void red_detach_stream(RedWorker *worker, Stream *stream);
> > static void red_stop_stream(RedWorker *worker, Stream *stream);
> > static inline void red_stream_maintenance(RedWorker *worker, Drawable *candidate, Drawable *sect);
> > -static inline void display_begin_send_message(DisplayChannel *channel, SpiceMarshaller *base_marshaller);
> > +static inline void display_begin_send_message(RedChannelClient *rcc, SpiceMarshaller *base_marshaller);
> > static void red_release_pixmap_cache(DisplayChannel *channel);
> > static void red_release_glz(DisplayChannel *channel);
> > static void red_freeze_glz(DisplayChannel *channel);
> > @@ -1261,16 +1261,16 @@ static void release_upgrade_item(RedWorker* worker, UpgradeItem *item)
> > }
> > }
> >
> > -static uint8_t *common_alloc_recv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> > +static uint8_t *common_alloc_recv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
> > {
> > - CommonChannel *common = SPICE_CONTAINEROF(channel, CommonChannel, base);
> > + CommonChannel *common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
> >
> > return common->recv_buf;
> > }
> >
> > -static void common_release_recv_buf(RedChannel *channel, SpiceDataHeader *msg_header, uint8_t* msg)
> > +static void common_release_recv_buf(RedChannelClient *rcc,
> > + SpiceDataHeader *msg_header, uint8_t* msg)
> > {
> > - return;
> > }
> >
> > #define CLIENT_PIXMAPS_CACHE
> > @@ -1686,7 +1686,7 @@ static void red_clear_surface_drawables_from_pipe(RedWorker *worker, int surface
> > item = (PipeItem *)ring_prev(ring, (RingItem *)item);
> > ring_remove(&tmp_item->link);
> > worker->display_channel->common.base.release_item(
> > - &worker->display_channel->common.base, tmp_item, FALSE);
> > + worker->display_channel->common.base.rcc, tmp_item, FALSE);
> > worker->display_channel->common.base.pipe_size--;
> >
> > if (!item) {
> > @@ -5687,16 +5687,18 @@ static inline int red_compress_image(DisplayChannel *display_channel,
> > }
> > }
> >
> > -static inline void red_display_add_image_to_pixmap_cache(DisplayChannel *display_channel,
> > +static inline void red_display_add_image_to_pixmap_cache(RedChannelClient *rcc,
> > SpiceImage *image, SpiceImage *io_image,
> > int is_lossy)
> > {
> > + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> > +
> > if ((image->descriptor.flags & SPICE_IMAGE_FLAGS_CACHE_ME)) {
> > ASSERT(image->descriptor.width * image->descriptor.height > 0);
> > if (!(io_image->descriptor.flags & SPICE_IMAGE_FLAGS_CACHE_REPLACE_ME)) {
> > if (pixmap_cache_add(display_channel->pixmap_cache, image->descriptor.id,
> > image->descriptor.width * image->descriptor.height, is_lossy,
> > - display_channel)) {
> > + rcc)) {
> > io_image->descriptor.flags |= SPICE_IMAGE_FLAGS_CACHE_ME;
> > stat_inc_counter(display_channel->add_to_cache_counter, 1);
> > }
> > @@ -5719,9 +5721,10 @@ typedef enum {
> >
> > /* if the number of times fill_bits can be called per one qxl_drawable increases -
> > MAX_LZ_DRAWABLE_INSTANCES must be increased as well */
> > -static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *m,
> > +static FillBitsType fill_bits(RedChannelClient *rcc, SpiceMarshaller *m,
> > SpiceImage *simage, Drawable *drawable, int can_lossy)
> > {
> > + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> > RedWorker *worker = display_channel->common.worker;
> > SpiceImage image;
> > compress_send_data_t comp_send_data = {0};
> > @@ -5737,7 +5740,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> > if ((simage->descriptor.flags & SPICE_IMAGE_FLAGS_CACHE_ME)) {
> > int lossy_cache_item;
> > if (pixmap_cache_hit(display_channel->pixmap_cache, image.descriptor.id,
> > - &lossy_cache_item, display_channel)) {
> > + &lossy_cache_item, rcc)) {
> > if (can_lossy || !lossy_cache_item) {
> > if (!display_channel->enable_jpeg || lossy_cache_item) {
> > image.descriptor.type = SPICE_IMAGE_TYPE_FROM_CACHE;
> > @@ -5794,7 +5797,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> > drawable, can_lossy, &comp_send_data)) {
> > SpicePalette *palette;
> >
> > - red_display_add_image_to_pixmap_cache(display_channel, simage, &image, FALSE);
> > + red_display_add_image_to_pixmap_cache(rcc, simage, &image, FALSE);
> >
> > *bitmap = simage->u.bitmap;
> > bitmap->flags = bitmap->flags & SPICE_BITMAP_FLAGS_TOP_DOWN;
> > @@ -5812,7 +5815,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> > spice_marshaller_add_ref_chunks(m, bitmap->data);
> > return FILL_BITS_TYPE_BITMAP;
> > } else {
> > - red_display_add_image_to_pixmap_cache(display_channel, simage, &image,
> > + red_display_add_image_to_pixmap_cache(rcc, simage, &image,
> > comp_send_data.is_lossy);
> >
> > spice_marshall_Image(m, &image,
> > @@ -5833,7 +5836,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> > break;
> > }
> > case SPICE_IMAGE_TYPE_QUIC:
> > - red_display_add_image_to_pixmap_cache(display_channel, simage, &image, FALSE);
> > + red_display_add_image_to_pixmap_cache(rcc, simage, &image, FALSE);
> > image.u.quic = simage->u.quic;
> > spice_marshall_Image(m, &image,
> > &bitmap_palette_out, &lzplt_palette_out);
> > @@ -5846,23 +5849,25 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> > }
> > }
> >
> > -static void fill_mask(DisplayChannel *display_channel, SpiceMarshaller *m,
> > +static void fill_mask(RedChannelClient *rcc, SpiceMarshaller *m,
> > SpiceImage *mask_bitmap, Drawable *drawable)
> > {
> > + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> > +
> > if (mask_bitmap && m) {
> > if (display_channel->common.worker->image_compression != SPICE_IMAGE_COMPRESS_OFF) {
> > spice_image_compression_t save_img_comp =
> > display_channel->common.worker->image_compression;
> > display_channel->common.worker->image_compression = SPICE_IMAGE_COMPRESS_OFF;
> > - fill_bits(display_channel, m, mask_bitmap, drawable, FALSE);
> > + fill_bits(rcc, m, mask_bitmap, drawable, FALSE);
> > display_channel->common.worker->image_compression = save_img_comp;
> > } else {
> > - fill_bits(display_channel, m, mask_bitmap, drawable, FALSE);
> > + fill_bits(rcc, m, mask_bitmap, drawable, FALSE);
> > }
> > }
> > }
> >
> > -static void fill_attr(DisplayChannel *display_channel, SpiceMarshaller *m, SpiceLineAttr *attr, uint32_t group_id)
> > +static void fill_attr(SpiceMarshaller *m, SpiceLineAttr *attr, uint32_t group_id)
> > {
> > int i;
> >
> > @@ -5963,9 +5968,11 @@ static int is_surface_area_lossy(DisplayChannel *display_channel, uint32_t surfa
> > to the client, returns false. "area" is for surfaces. If area = NULL,
> > all the surface is considered. out_lossy_data will hold info about the bitmap, and its lossy
> > area in case it is lossy and part of a surface. */
> > -static int is_bitmap_lossy(DisplayChannel *display_channel, SpiceImage *image, SpiceRect *area,
> > +static int is_bitmap_lossy(RedChannelClient *rcc, SpiceImage *image, SpiceRect *area,
> > Drawable *drawable, BitmapData *out_data)
> > {
> > + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> > +
> > if (image == NULL) {
> > // self bitmap
> > out_data->type = BITMAP_DATA_TYPE_BITMAP;
> > @@ -5977,7 +5984,7 @@ static int is_bitmap_lossy(DisplayChannel *display_channel, SpiceImage *image, S
> >
> > out_data->id = image->descriptor.id;
> > if (pixmap_cache_hit(display_channel->pixmap_cache, image->descriptor.id,
> > - &is_hit_lossy, display_channel)) {
> > + &is_hit_lossy, rcc)) {
> > out_data->type = BITMAP_DATA_TYPE_CACHE;
> > if (is_hit_lossy) {
> > return TRUE;
> > @@ -6007,11 +6014,11 @@ static int is_bitmap_lossy(DisplayChannel *display_channel, SpiceImage *image, S
> > }
> > }
> >
> > -static int is_brush_lossy(DisplayChannel *display_channel, SpiceBrush *brush,
> > +static int is_brush_lossy(RedChannelClient *rcc, SpiceBrush *brush,
> > Drawable *drawable, BitmapData *out_data)
> > {
> > if (brush->type == SPICE_BRUSH_TYPE_PATTERN) {
> > - return is_bitmap_lossy(display_channel, brush->u.pattern.pat, NULL,
> > + return is_bitmap_lossy(rcc, brush->u.pattern.pat, NULL,
> > drawable, out_data);
> > } else {
> > out_data->type = BITMAP_DATA_TYPE_INVALID;
> > @@ -6286,17 +6293,16 @@ static void red_add_lossless_drawable_dependencies(RedWorker *worker,
> > }
> >
> > static void red_marshall_qxl_draw_fill(RedWorker *worker,
> > - DisplayChannel *display_channel,
> > + RedChannelClient *rcc,
> > SpiceMarshaller *base_marshaller,
> > Drawable *item)
> > {
> > - RedChannel *channel = &display_channel->common.base;
> > RedDrawable *drawable = item->red_drawable;
> > SpiceMarshaller *brush_pat_out;
> > SpiceMarshaller *mask_bitmap_out;
> > SpiceFill fill;
> >
> > - red_channel_init_send_data(channel, SPICE_MSG_DISPLAY_DRAW_FILL, &item->pipe_item);
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_DISPLAY_DRAW_FILL, &item->pipe_item);
> > fill_base(base_marshaller, item);
> > fill = drawable->u.fill;
> > spice_marshall_Fill(base_marshaller,
> > @@ -6305,18 +6311,19 @@ static void red_marshall_qxl_draw_fill(RedWorker *worker,
> > &mask_bitmap_out);
> >
> > if (brush_pat_out) {
> > - fill_bits(display_channel, brush_pat_out, fill.brush.u.pattern.pat, item, FALSE);
> > + fill_bits(rcc, brush_pat_out, fill.brush.u.pattern.pat, item, FALSE);
> > }
> >
> > - fill_mask(display_channel, mask_bitmap_out, fill.mask.bitmap, item);
> > + fill_mask(rcc, mask_bitmap_out, fill.mask.bitmap, item);
> > }
> >
> >
> > -static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> > - DisplayChannel *display_channel,
> > +static void red_lossy_marshall_qxl_draw_fill(RedWorker *worker,
> > + RedChannelClient *rcc,
> > SpiceMarshaller *m,
> > Drawable *item)
> > {
> > + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> > RedDrawable *drawable = item->red_drawable;
> >
> > int dest_allowed_lossy = FALSE;
> > @@ -6332,7 +6339,7 @@ static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> > (rop & SPICE_ROPD_OP_AND) ||
> > (rop & SPICE_ROPD_OP_XOR));
> >
> > - brush_is_lossy = is_brush_lossy(display_channel, &drawable->u.fill.brush, item,
> > + brush_is_lossy = is_brush_lossy(rcc, &drawable->u.fill.brush, item,
> > &brush_bitmap_data);
> > if (!dest_allowed_lossy) {
> > dest_is_lossy = is_surface_area_lossy(display_channel, item->surface_id, &drawable->bbox,
> > @@ -6343,8 +6350,7 @@ static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> > !(brush_is_lossy && (brush_bitmap_data.type == BITMAP_DATA_TYPE_SURFACE))) {
> > int has_mask = !!drawable->u.fill.mask.bitmap;
> >
> > - red_marshall_qxl_draw_fill(worker, display_channel, m, item);
> > -
> > + red_marshall_qxl_draw_fill(worker, rcc, m, item);
> > // either the brush operation is opaque, or the dest is not lossy
> > surface_lossy_region_update(worker, display_channel, item, has_mask, FALSE);
> > } else {
> > @@ -6370,11 +6376,10 @@ static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> > }
> >
> > static FillBitsType red_marshall_qxl_draw_opaque(RedWorker *worker,
> > - DisplayChannel *display_channel,
> > + RedChannelClient *rcc,
> > SpiceMarshaller *base_marshaller,
> > Drawable *item, int src_allowed_lossy)
> > {
> > - RedChannel *channel = &display_channel->common.base;
> > RedDrawable *drawable = item->red_drawable;
> > SpiceMarshaller *brush_pat_out;
> > SpiceMarshaller *src_bitmap_out;
> > @@ -6382,7 +6387,7 @@ static FillBitsType red_marshall_qxl_draw_opaque(RedWorker *worker,
> > SpiceOpaque opaque;
> > FillBitsType src_send_type;
> >
> > - red_channel_init_send_data(channel, SPICE_MSG_DISPLAY_DRAW_OPAQUE, &item->pipe_item);
> > + red_channel_client_init_send_data(rcc, SPICE_MSG_DISPLAY_DRAW_OPAQUE, &item->pipe_item);
> > fill_base(base_marshaller, item);
> > opaque = drawable->u.opaque;
> > spice_marshall_Opaque(base_marshaller,
> > @@ -6391,22 +6396,23 @@ static FillBitsType red_marshall_qxl_draw_opaque(RedWorker *worker,
> > &brush_pat_out,
> > &mask_bitmap_out);
> >
> > - src_send_type = fill_bits(display_channel, src_bitmap_out, opaque.src_bitmap, item,
> > + src_send_type = fill_bits(rcc, src_bitmap_out, opaque.src_bitmap, item,
> > src_allowed_lossy);
> >
> > if (brush_pat_out) {
> > - fill_bits(display_channel, brush_pat_out, opaque.brush.u.pattern.pat, item, FALSE);
> > + fill_bits(rcc, brush_pat_out, opaque.brush.u.pattern.pat, item, FALSE);
> > }
> > - fill_mask(display_channel, mask_bitmap_out, opaque.mask.bitmap, item);
> > + fill_mask(rcc, mask_bitmap_out, opaque.mask.bitmap, item);
> >
> > return src_send_type;
> > }
> >
> > -static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> > - DisplayChannel *display_channel,
> > +static void red_lossy_marshall_qxl_draw_opaque(RedWorker *worker,
> > + RedChannelClient *rcc,
> > SpiceMarshaller *m,
> > Drawable *item)
> > {
> > + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> > RedDrawable *drawable = item->red_drawable;
> >
> > int src_allowed_lossy;
> > @@ -6421,11 +6427,11 @@ static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> > (rop & SPICE_ROPD_OP_AND) ||
> > (rop & SPICE_ROPD_OP_XOR));
> >
> > - brush_is_lossy = is_brush_lossy(display_channel, &drawable->u.opaque.brush, item,
> > + brush_is_lossy = is_brush_lossy(rcc, &drawable->u.opaque.brush, item,
> > &brush_bitmap_data);
> >
> > if (!src_allowed_lossy) {
> > - src_is_lossy = is_bitmap_lossy(display_channel, drawable->u.opaque.src_bitmap,
> > + src_is_lossy = is_bitmap_lossy(rcc, drawable->u.opaque.src_bitmap,
> > &drawable->u.opaque.src_area,
> > item,
> > &src_bitmap_data);
> > @@ -6436,8 +6442,7 @@ static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> > FillBitsType src_send_type;
> > int has_mask = !!drawable->u.opaque.mask.bitmap;
> >
> > - src_send_type = red_marshall_qxl_draw_opaque(worker, display_channel, m, item, src_allowed_lossy);
> > -
> > + src_send_type = red_marshall_qxl_draw_opaque(worker, rcc, m, item, src_allowed_lossy);
> > if (src_send_type == FILL_BITS_TYPE_COMPRESS_LOSSY) {
> > src_is_lossy = TRUE;
> > } else if (src_send_type == FILL_BITS_TYPE_COMPRESS_LOSSLESS) {
> > @@ -6468,18 +6473,17 @@ static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> > }
> >
> > static FillBitsType red_marshall_qxl_draw_copy(RedWorker *worker,
> > - DisplayChannel *display_channel,
> > + RedChannelClient *rcc,
> > SpiceMarshaller *base_marshaller,
> > Drawable *item, int src_allowed_...
> >
> > [Message clipped]
>
>
>
> --
> Marc-André Lureau
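
For readers following the quoted hunks: several of the converted functions now take a RedChannelClient and recover the DisplayChannel with SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base). Below is a minimal, self-contained sketch of that idiom. The struct layouts, the CONTAINEROF macro, and the helper names are simplified stand-ins assumed for illustration, not the actual spice-server definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for SPICE_CONTAINEROF: step back from a pointer to a
 * member to the struct that embeds it. */
#define CONTAINEROF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Simplified stand-in types; the real ones carry many more fields. */
typedef struct RedChannel { int type; } RedChannel;
typedef struct CommonChannel { RedChannel base; } CommonChannel;
typedef struct DisplayChannel {
    CommonChannel common;   /* embeds the generic channel */
    int pixmap_cache_id;    /* illustrative display-specific field */
} DisplayChannel;
typedef struct RedChannelClient { RedChannel *channel; } RedChannelClient;

/* Recover the display channel from a client, as the converted functions do
 * at the top of their bodies. */
static DisplayChannel *display_channel_from_rcc(RedChannelClient *rcc)
{
    return CONTAINEROF(rcc->channel, DisplayChannel, common.base);
}

/* Returns 1 when the round trip yields the original DisplayChannel. */
static int demo_roundtrip(void)
{
    DisplayChannel dc = { .common = { .base = { .type = 2 } },
                          .pixmap_cache_id = 42 };
    RedChannelClient rcc = { .channel = &dc.common.base };
    DisplayChannel *recovered = display_channel_from_rcc(&rcc);

    return recovered == &dc && recovered->pixmap_cache_id == 42;
}
```

The cast is only valid when rcc->channel really points at the base member of a DisplayChannel, which is why the quoted functions do it only in display-specific code paths.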