[Spice-devel] [PATCH 3/4] server/red_channel (all): introduce RedChannelClient
Marc-André Lureau
marcandre.lureau at gmail.com
Tue Mar 29 10:33:09 PDT 2011
Hi
Overall, the patch is quite straightforward. However, it is quite
large and thus touches parts that I am not familiar with. I have done
various tests and valgrind was happy.
A few minor nitpicks:
- the indentation is a bit weird, for example in
red_channel_client_release_item() and red_channel_add_client().
- red_channel_apply_clients vs red_channel_apply_clients_data:
Why two of them? Just one with a data argument should be enough.
Traditionally, the method that accepts a visitor is called accept(), not
apply(), but I don't think the analogy with a visitor is worthwhile here:
we don't have a deep traversal of various objects, rather just a
foreach().
- red_channel_receive() should probably guard against calling
red_channel_client_receive() with a NULL rcc (although that condition
does not seem possible).
- I would argue that red_channel_client_create() should be done
earlier, in one place, such as reds_handle_link(), perhaps even before
(the link callback would then take an RCC instead of a Stream).
- there are a number of TODOs added that I hope will get fixed in the
following patches; there are rather many of them and some look scary,
the pipe sharing and the migration handling for instance :)
regards
On Sun, Mar 27, 2011 at 9:03 PM, Alon Levy <alevy at redhat.com> wrote:
> This commit adds a RedChannelClient that now owns the stream connection,
> but still doesn't own the pipe. There is only a single RCC per RC
> right now (and RC still means RedChannel, RedClient will be introduced
> later). All internal api changes are in server/red_channel.h, hence
> the need to update all channels. red_worker.c is affected the most because
> it makes use of direct access to some of RedChannel still.
>
> API changes:
>
> 1. red_channel_client_create added.
> red_channel_create -> (red_channel_create, red_channel_client_create)
> 2. two way connection: rcc->channel, channel->rcc (later channel will
> hold a list, and there will be a RedClient to hold the list of channels
> per client)
> 3. separation of channel disconnect and channel_client_disconnect
> ---
> server/inputs_channel.c | 39 ++-
> server/main_channel.c | 82 +++---
> server/red_channel.c | 617 ++++++++++++++++++++++-------------
> server/red_channel.h | 106 ++++---
> server/red_client_shared_cache.h | 17 +-
> server/red_tunnel_worker.c | 214 +++++++-----
> server/red_worker.c | 678 +++++++++++++++++++-------------------
> server/smartcard.c | 150 +++++-----
> 8 files changed, 1076 insertions(+), 827 deletions(-)
>
> diff --git a/server/inputs_channel.c b/server/inputs_channel.c
> index 213f8a0..678eff3 100644
> --- a/server/inputs_channel.c
> +++ b/server/inputs_channel.c
> @@ -161,9 +161,9 @@ const VDAgentMouseState *inputs_get_mouse_state(void)
> return &g_inputs_channel->mouse_state;
> }
>
> -static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> +static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
> {
> - InputsChannel *inputs_channel = SPICE_CONTAINEROF(channel, InputsChannel, base);
> + InputsChannel *inputs_channel = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
>
> if (msg_header->size > RECEIVE_BUF_SIZE) {
> red_printf("error: too large incoming message");
> @@ -172,7 +172,7 @@ static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataH
> return inputs_channel->recv_buf;
> }
>
> -static void inputs_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> +static void inputs_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
> uint8_t *msg)
> {
> }
> @@ -246,17 +246,17 @@ static void inputs_pipe_add_type(InputsChannel *channel, int type)
> red_channel_pipe_add_push(&channel->base, &pipe_item->base);
> }
>
> -static void inputs_channel_release_pipe_item(RedChannel *channel,
> +static void inputs_channel_release_pipe_item(RedChannelClient *rcc,
> PipeItem *base, int item_pushed)
> {
> free(base);
> }
>
> -static void inputs_channel_send_item(RedChannel *channel, PipeItem *base)
> +static void inputs_channel_send_item(RedChannelClient *rcc, PipeItem *base)
> {
> - SpiceMarshaller *m = red_channel_get_marshaller(channel);
> + SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
>
> - red_channel_init_send_data(channel, base->type, base);
> + red_channel_client_init_send_data(rcc, base->type, base);
> switch (base->type) {
> case PIPE_ITEM_KEY_MODIFIERS:
> {
> @@ -285,12 +285,12 @@ static void inputs_channel_send_item(RedChannel *channel, PipeItem *base)
> default:
> break;
> }
> - red_channel_begin_send_message(channel);
> + red_channel_client_begin_send_message(rcc);
> }
>
> -static int inputs_channel_handle_parsed(RedChannel *channel, uint32_t size, uint16_t type, void *message)
> +static int inputs_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message)
> {
> - InputsChannel *inputs_channel = (InputsChannel *)channel;
> + InputsChannel *inputs_channel = (InputsChannel *)rcc->channel;
> uint8_t *buf = (uint8_t *)message;
>
> ASSERT(g_inputs_channel == inputs_channel);
> @@ -443,10 +443,10 @@ static void inputs_relase_keys(void)
> kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
> }
>
> -static void inputs_channel_on_error(RedChannel *channel)
> +static void inputs_channel_on_error(RedChannelClient *rcc)
> {
> inputs_relase_keys();
> - reds_disconnect();
> + red_channel_client_destroy(rcc);
> }
>
> static void inputs_shutdown(Channel *channel)
> @@ -482,11 +482,11 @@ static void inputs_pipe_add_init(InputsChannel *inputs_channel)
> red_channel_pipe_add_push(&inputs_channel->base, &item->base);
> }
>
> -static int inputs_channel_config_socket(RedChannel *channel)
> +static int inputs_channel_config_socket(RedChannelClient *rcc)
> {
> int flags;
> int delay_val = 1;
> - RedsStream *stream = red_channel_get_stream(channel);
> + RedsStream *stream = red_channel_client_get_stream(rcc);
>
> if (setsockopt(stream->socket, IPPROTO_TCP, TCP_NODELAY,
> &delay_val, sizeof(delay_val)) == -1) {
> @@ -502,7 +502,7 @@ static int inputs_channel_config_socket(RedChannel *channel)
> return TRUE;
> }
>
> -static void inputs_channel_hold_pipe_item(RedChannel *channel, PipeItem *item)
> +static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
> {
> }
>
> @@ -511,11 +511,13 @@ static void inputs_link(Channel *channel, RedsStream *stream, int migration,
> uint32_t *caps)
> {
> InputsChannel *inputs_channel;
> - red_printf("");
> + RedChannelClient *rcc;
> +
> ASSERT(channel->data == NULL);
>
> + red_printf("input channel create");
> g_inputs_channel = inputs_channel = (InputsChannel*)red_channel_create_parser(
> - sizeof(*inputs_channel), stream, core, migration, FALSE /* handle_acks */
> + sizeof(*inputs_channel), core, migration, FALSE /* handle_acks */
> ,inputs_channel_config_socket
> ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
> ,inputs_channel_handle_parsed
> @@ -530,6 +532,9 @@ static void inputs_link(Channel *channel, RedsStream *stream, int migration,
> ,NULL
> ,NULL);
> ASSERT(inputs_channel);
> + red_printf("input channel client create");
> + rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base, stream);
> + ASSERT(rcc);
> channel->data = inputs_channel;
> inputs_pipe_add_init(inputs_channel);
> }
> diff --git a/server/main_channel.c b/server/main_channel.c
> index 70255d7..c849ef0 100644
> --- a/server/main_channel.c
> +++ b/server/main_channel.c
> @@ -418,7 +418,8 @@ static void main_channel_marshall_migrate_data_item(SpiceMarshaller *m, int seri
> data->ping_id = ping_id;
> }
>
> -static uint64_t main_channel_handle_migrate_data_get_serial_proc(RedChannel *base,
> +static uint64_t main_channel_handle_migrate_data_get_serial_proc(
> + RedChannelClient *rcc,
> uint32_t size, void *message)
> {
> MainMigrateData *data = message;
> @@ -430,10 +431,10 @@ static uint64_t main_channel_handle_migrate_data_get_serial_proc(RedChannel *bas
> return data->serial;
> }
>
> -static uint64_t main_channel_handle_migrate_data(RedChannel *base,
> +static uint64_t main_channel_handle_migrate_data(RedChannelClient *rcc,
> uint32_t size, void *message)
> {
> - MainChannel *main_chan = SPICE_CONTAINEROF(base, MainChannel, base);
> + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
> MainMigrateData *data = message;
>
> if (size < sizeof(*data)) {
> @@ -597,12 +598,12 @@ static void main_channel_marshall_multi_media_time(SpiceMarshaller *m,
> spice_marshall_msg_main_multi_media_time(m, &time_mes);
> }
>
> -static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> +static void main_channel_send_item(RedChannelClient *rcc, PipeItem *base)
> {
> - SpiceMarshaller *m = red_channel_get_marshaller(channel);
> - MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
> + SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
>
> - red_channel_init_send_data(channel, base->type, base);
> + red_channel_client_init_send_data(rcc, base->type, base);
> switch (base->type) {
> case SPICE_MSG_MAIN_CHANNELS_LIST:
> main_channel_marshall_channels(m);
> @@ -632,7 +633,7 @@ static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> break;
> case SPICE_MSG_MIGRATE_DATA:
> main_channel_marshall_migrate_data_item(m,
> - red_channel_get_message_serial(&main_chan->base),
> + red_channel_client_get_message_serial(rcc),
> main_chan->ping_id);
> break;
> case SPICE_MSG_MAIN_INIT:
> @@ -658,18 +659,18 @@ static void main_channel_send_item(RedChannel *channel, PipeItem *base)
> main_channel_marshall_migrate_switch(m);
> break;
> };
> - red_channel_begin_send_message(channel);
> + red_channel_client_begin_send_message(rcc);
> }
>
> -static void main_channel_release_pipe_item(RedChannel *channel,
> +static void main_channel_release_pipe_item(RedChannelClient *rcc,
> PipeItem *base, int item_pushed)
> {
> free(base);
> }
>
> -static int main_channel_handle_parsed(RedChannel *channel, uint32_t size, uint16_t type, void *message)
> +static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message)
> {
> - MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
>
> switch (type) {
> case SPICE_MSGC_MAIN_AGENT_START:
> @@ -760,35 +761,36 @@ static int main_channel_handle_parsed(RedChannel *channel, uint32_t size, uint16
> return TRUE;
> }
>
> -static void main_channel_on_error(RedChannel *channel)
> +static void main_channel_on_error(RedChannelClient *rcc)
> {
> reds_disconnect();
> }
>
> -static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> +static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
> {
> - MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
> + MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
>
> return main_chan->recv_buf;
> }
>
> -static void main_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> +static void main_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
> uint8_t *msg)
> {
> }
>
> -static int main_channel_config_socket(RedChannel *channel)
> +static int main_channel_config_socket(RedChannelClient *rcc)
> {
> return TRUE;
> }
>
> -static void main_channel_hold_pipe_item(RedChannel *channel, PipeItem *item)
> +static void main_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
> {
> }
>
> -static int main_channel_handle_migrate_flush_mark_proc(RedChannel *base)
> +static int main_channel_handle_migrate_flush_mark_proc(RedChannelClient *rcc)
> {
> - main_channel_push_migrate_data_item(SPICE_CONTAINEROF(base, MainChannel, base));
> + main_channel_push_migrate_data_item(SPICE_CONTAINEROF(rcc->channel,
> + MainChannel, base));
> return TRUE;
> }
>
> @@ -797,26 +799,30 @@ static void main_channel_link(Channel *channel, RedsStream *stream, int migratio
> uint32_t *caps)
> {
> MainChannel *main_chan;
> - red_printf("");
> ASSERT(channel->data == NULL);
>
> - main_chan = (MainChannel*)red_channel_create_parser(
> - sizeof(*main_chan), stream, core, migration, FALSE /* handle_acks */
> - ,main_channel_config_socket
> - ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
> - ,main_channel_handle_parsed
> - ,main_channel_alloc_msg_rcv_buf
> - ,main_channel_release_msg_rcv_buf
> - ,main_channel_hold_pipe_item
> - ,main_channel_send_item
> - ,main_channel_release_pipe_item
> - ,main_channel_on_error
> - ,main_channel_on_error
> - ,main_channel_handle_migrate_flush_mark_proc
> - ,main_channel_handle_migrate_data
> - ,main_channel_handle_migrate_data_get_serial_proc);
> - ASSERT(main_chan);
> - channel->data = main_chan;
> + if (channel->data == NULL) {
> + red_printf("create main channel");
> + channel->data = red_channel_create_parser(
> + sizeof(*main_chan), core, migration, FALSE /* handle_acks */
> + ,main_channel_config_socket
> + ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
> + ,main_channel_handle_parsed
> + ,main_channel_alloc_msg_rcv_buf
> + ,main_channel_release_msg_rcv_buf
> + ,main_channel_hold_pipe_item
> + ,main_channel_send_item
> + ,main_channel_release_pipe_item
> + ,main_channel_on_error
> + ,main_channel_on_error
> + ,main_channel_handle_migrate_flush_mark_proc
> + ,main_channel_handle_migrate_data
> + ,main_channel_handle_migrate_data_get_serial_proc);
> + ASSERT(channel->data);
> + }
> + main_chan = (MainChannel*)channel->data;
> + red_printf("add main channel client");
> + red_channel_client_create(sizeof(RedChannelClient), channel->data, stream);
> }
>
> int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
> diff --git a/server/red_channel.c b/server/red_channel.c
> index b9e0324..7821fa9 100644
> --- a/server/red_channel.c
> +++ b/server/red_channel.c
> @@ -30,8 +30,7 @@
> #include "red_channel.h"
> #include "generated_marshallers.h"
>
> -static PipeItem *red_channel_pipe_get(RedChannel *channel);
> -static void red_channel_event(int fd, int event, void *data);
> +static void red_channel_client_event(int fd, int event, void *data);
>
> /* return the number of bytes read. -1 in case of error */
> static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
> @@ -149,9 +148,14 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
> }
> }
>
> +void red_channel_client_receive(RedChannelClient *rcc)
> +{
> + red_peer_handle_incoming(rcc->stream, &rcc->incoming);
> +}
> +
> void red_channel_receive(RedChannel *channel)
> {
> - red_peer_handle_incoming(channel->stream, &channel->incoming);
> + red_channel_client_receive(channel->rcc);
> }
>
> static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
> @@ -198,124 +202,194 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle
> }
> }
>
> -void red_channel_on_output(void *opaque, int n)
> +void red_channel_client_on_output(void *opaque, int n)
> {
> - RedChannel *channel = opaque;
> + RedChannelClient *rcc = opaque;
>
> - stat_inc_counter(channel->out_bytes_counter, n);
> + stat_inc_counter(rcc->channel->out_bytes_counter, n);
> }
>
> -void red_channel_default_peer_on_error(RedChannel *channel)
> +void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
> {
> - channel->disconnect(channel);
> + rcc->channel->disconnect(rcc);
> }
>
> -static void red_channel_peer_on_incoming_error(RedChannel *channel)
> +static void red_channel_peer_on_incoming_error(RedChannelClient *rcc)
> {
> - channel->on_incoming_error(channel);
> + rcc->channel->on_incoming_error(rcc);
> }
>
> -static void red_channel_peer_on_outgoing_error(RedChannel *channel)
> +static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc)
> {
> - channel->on_outgoing_error(channel);
> + rcc->channel->on_outgoing_error(rcc);
> }
>
> -static int red_channel_peer_get_out_msg_size(void *opaque)
> +static int red_channel_client_peer_get_out_msg_size(void *opaque)
> {
> - RedChannel *channel = (RedChannel *)opaque;
> + RedChannelClient *rcc = (RedChannelClient *)opaque;
>
> - return channel->send_data.size;
> + return rcc->send_data.size;
> }
>
> -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
> +static void red_channel_client_peer_prepare_out_msg(
> + void *opaque, struct iovec *vec, int *vec_size, int pos)
> {
> - RedChannel *channel = (RedChannel *)opaque;
> + RedChannelClient *rcc = (RedChannelClient *)opaque;
>
> - *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
> + *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
> vec, MAX_SEND_VEC, pos);
> }
>
> -static void red_channel_peer_on_out_block(void *opaque)
> +static void red_channel_client_peer_on_out_block(void *opaque)
> {
> - RedChannel *channel = (RedChannel *)opaque;
> + RedChannelClient *rcc = (RedChannelClient *)opaque;
>
> - channel->send_data.blocked = TRUE;
> - channel->core->watch_update_mask(channel->stream->watch,
> + rcc->send_data.blocked = TRUE;
> + rcc->channel->core->watch_update_mask(rcc->stream->watch,
> SPICE_WATCH_EVENT_READ |
> SPICE_WATCH_EVENT_WRITE);
> }
>
> -static void red_channel_reset_send_data(RedChannel *channel)
> +static void red_channel_client_reset_send_data(RedChannelClient *rcc)
> +{
> + spice_marshaller_reset(rcc->send_data.marshaller);
> + rcc->send_data.header = (SpiceDataHeader *)
> + spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
> + spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
> + rcc->send_data.header->type = 0;
> + rcc->send_data.header->size = 0;
> + rcc->send_data.header->sub_list = 0;
> + rcc->send_data.header->serial = ++rcc->send_data.serial;
> +}
> +
> +void red_channel_client_push_set_ack(RedChannelClient *rcc)
> {
> - spice_marshaller_reset(channel->send_data.marshaller);
> - channel->send_data.header = (SpiceDataHeader *)
> - spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> - spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> - channel->send_data.header->type = 0;
> - channel->send_data.header->size = 0;
> - channel->send_data.header->sub_list = 0;
> - channel->send_data.header->serial = ++channel->send_data.serial;
> + red_channel_pipe_add_type(rcc->channel, PIPE_ITEM_TYPE_SET_ACK);
> }
>
> void red_channel_push_set_ack(RedChannel *channel)
> {
> + // TODO - MC, should replace with add_type_all (or whatever I'll name it)
> red_channel_pipe_add_type(channel, PIPE_ITEM_TYPE_SET_ACK);
> }
>
> -static void red_channel_send_set_ack(RedChannel *channel)
> +static void red_channel_client_send_set_ack(RedChannelClient *rcc)
> {
> SpiceMsgSetAck ack;
>
> - ASSERT(channel);
> - red_channel_init_send_data(channel, SPICE_MSG_SET_ACK, NULL);
> - ack.generation = ++channel->ack_data.generation;
> - ack.window = channel->ack_data.client_window;
> - channel->ack_data.messages_window = 0;
> + ASSERT(rcc);
> + red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
> + ack.generation = ++rcc->ack_data.generation;
> + ack.window = rcc->ack_data.client_window;
> + rcc->ack_data.messages_window = 0;
>
> - spice_marshall_msg_set_ack(channel->send_data.marshaller, &ack);
> + spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
>
> - red_channel_begin_send_message(channel);
> + red_channel_client_begin_send_message(rcc);
> }
>
> -static void red_channel_send_item(RedChannel *channel, PipeItem *item)
> +static void red_channel_client_send_item(RedChannelClient *rcc, PipeItem *item)
> {
> - red_channel_reset_send_data(channel);
> + int handled = TRUE;
> +
> + ASSERT(rcc->send_data.item == NULL);
> + red_channel_client_reset_send_data(rcc);
> switch (item->type) {
> case PIPE_ITEM_TYPE_SET_ACK:
> - red_channel_send_set_ack(channel);
> - return;
> + red_channel_client_send_set_ack(rcc);
> + break;
> + default:
> + handled = FALSE;
> + }
> + if (!handled) {
> + rcc->channel->send_item(rcc, item);
> }
> - /* only reached if not handled here */
> - channel->send_item(channel, item);
> }
>
> -static void red_channel_release_item(RedChannel *channel, PipeItem *item, int item_pushed)
> +static void red_channel_client_release_item(RedChannelClient *rcc,
> + PipeItem *item, int item_pushed)
> {
> + int handled = TRUE;
> +
> switch (item->type) {
> case PIPE_ITEM_TYPE_SET_ACK:
> free(item);
> - return;
> + default:
> + handled = FALSE;
> }
> - /* only reached if not handled here */
> - channel->release_item(channel, item, item_pushed);
> + if (!handled) {
> + rcc->channel->release_item(rcc, item, item_pushed);
> + }
> }
>
> -static void red_channel_peer_on_out_msg_done(void *opaque)
> +static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
> {
> - RedChannel *channel = (RedChannel *)opaque;
> - channel->send_data.size = 0;
> - if (channel->send_data.item) {
> - red_channel_release_item(channel, channel->send_data.item, TRUE);
> - channel->send_data.item = NULL;
> + if (rcc->send_data.item) {
> + red_channel_client_release_item(rcc,
> + rcc->send_data.item, TRUE);
> + rcc->send_data.item = NULL;
> }
> - if (channel->send_data.blocked) {
> - channel->send_data.blocked = FALSE;
> - channel->core->watch_update_mask(channel->stream->watch,
> +}
> +
> +static void red_channel_peer_on_out_msg_done(void *opaque)
> +{
> + RedChannelClient *rcc = (RedChannelClient *)opaque;
> +
> + rcc->send_data.size = 0;
> + red_channel_client_release_sent_item(rcc);
> + if (rcc->send_data.blocked) {
> + rcc->send_data.blocked = FALSE;
> + rcc->channel->core->watch_update_mask(rcc->stream->watch,
> SPICE_WATCH_EVENT_READ);
> }
> }
>
> -RedChannel *red_channel_create(int size, RedsStream *stream,
> +static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
> +{
> + ASSERT(rcc);
> + channel->rcc = rcc;
> +}
> +
> +RedChannelClient *red_channel_client_create(
> + int size,
> + RedChannel *channel,
> + RedsStream *stream)
> +{
> + RedChannelClient *rcc = NULL;
> +
> + ASSERT(stream && channel && size >= sizeof(RedChannelClient));
> + rcc = spice_malloc0(size);
> + rcc->stream = stream;
> + rcc->channel = channel;
> + rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
> + // block flags)
> + rcc->ack_data.client_generation = ~0;
> + rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
> + rcc->send_data.marshaller = spice_marshaller_new();
> +
> + rcc->incoming.opaque = rcc;
> + rcc->incoming.cb = &channel->incoming_cb;
> +
> + rcc->outgoing.opaque = rcc;
> + rcc->outgoing.cb = &channel->outgoing_cb;
> + rcc->outgoing.pos = 0;
> + rcc->outgoing.size = 0;
> + if (!channel->config_socket(rcc)) {
> + goto error;
> + }
> +
> + stream->watch = channel->core->watch_add(stream->socket,
> + SPICE_WATCH_EVENT_READ,
> + red_channel_client_event, rcc);
> + red_channel_add_client(channel, rcc);
> + return rcc;
> +error:
> + free(rcc);
> + reds_stream_free(stream);
> + return NULL;
> +}
> +
> +RedChannel *red_channel_create(int size,
> SpiceCoreInterface *core,
> int migrate, int handle_acks,
> channel_configure_socket_proc config_socket,
> @@ -336,7 +410,6 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
> ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf &&
> release_item);
> channel = spice_malloc0(size);
> -
> channel->handle_acks = handle_acks;
> channel->disconnect = disconnect;
> channel->send_item = send_item;
> @@ -345,69 +418,40 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
> channel->handle_migrate_flush_mark = handle_migrate_flush_mark;
> channel->handle_migrate_data = handle_migrate_data;
> channel->handle_migrate_data_get_serial = handle_migrate_data_get_serial;
> + channel->config_socket = config_socket;
>
> - channel->stream = stream;
> channel->core = core;
> - channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
> - // block flags)
> - channel->ack_data.client_generation = ~0;
> - channel->ack_data.client_window = CLIENT_ACK_WINDOW;
> -
> channel->migrate = migrate;
> ring_init(&channel->pipe);
> - channel->send_data.marshaller = spice_marshaller_new();
>
> - channel->incoming.opaque = channel;
> channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
> channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
> channel->incoming_cb.handle_message = (handle_message_proc)handle_message;
> - channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
> -
> - channel->outgoing.opaque = channel;
> - channel->outgoing.pos = 0;
> - channel->outgoing.size = 0;
> -
> - channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size;
> - channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg;
> - channel->outgoing_cb.on_block = red_channel_peer_on_out_block;
> - channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
> + channel->incoming_cb.on_error =
> + (on_incoming_error_proc)red_channel_client_default_peer_on_error;
> + channel->outgoing_cb.get_msg_size = red_channel_client_peer_get_out_msg_size;
> + channel->outgoing_cb.prepare = red_channel_client_peer_prepare_out_msg;
> + channel->outgoing_cb.on_block = red_channel_client_peer_on_out_block;
> + channel->outgoing_cb.on_error =
> + (on_outgoing_error_proc)red_channel_client_default_peer_on_error;
> channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done;
> - channel->outgoing_cb.on_output = red_channel_on_output;
> -
> - channel->incoming.cb = &channel->incoming_cb;
> - channel->outgoing.cb = &channel->outgoing_cb;
> + channel->outgoing_cb.on_output = red_channel_client_on_output;
>
> channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
> channel->out_bytes_counter = 0;
> -
> - if (!config_socket(channel)) {
> - goto error;
> - }
> -
> - channel->stream->watch = channel->core->watch_add(channel->stream->socket,
> - SPICE_WATCH_EVENT_READ,
> - red_channel_event, channel);
> -
> return channel;
> -
> -error:
> - spice_marshaller_destroy(channel->send_data.marshaller);
> - free(channel);
> - reds_stream_free(stream);
> -
> - return NULL;
> }
>
> -void do_nothing_disconnect(RedChannel *red_channel)
> +void do_nothing_disconnect(RedChannelClient *rcc)
> {
> }
>
> -int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header, uint8_t *msg)
> +int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
> {
> return TRUE;
> }
>
> -RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> +RedChannel *red_channel_create_parser(int size,
> SpiceCoreInterface *core,
> int migrate, int handle_acks,
> channel_configure_socket_proc config_socket,
> @@ -424,7 +468,7 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> channel_handle_migrate_data_proc handle_migrate_data,
> channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
> {
> - RedChannel *channel = red_channel_create(size, stream,
> + RedChannel *channel = red_channel_create(size,
> core, migrate, handle_acks, config_socket, do_nothing_disconnect,
> do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item,
> send_item, release_item, handle_migrate_flush_mark, handle_migrate_data,
> @@ -435,62 +479,152 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> }
> channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
> channel->incoming_cb.parser = parser;
> - channel->on_incoming_error = incoming_error;
> - channel->on_outgoing_error = outgoing_error;
> channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
> channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
> + channel->on_incoming_error = incoming_error;
> + channel->on_outgoing_error = outgoing_error;
> return channel;
> }
>
> +void red_channel_client_destroy(RedChannelClient *rcc)
> +{
> + red_channel_client_disconnect(rcc);
> + spice_marshaller_destroy(rcc->send_data.marshaller);
> + free(rcc);
> +}
> +
> void red_channel_destroy(RedChannel *channel)
> {
> if (!channel) {
> return;
> }
> - red_channel_pipe_clear(channel);
> - reds_stream_free(channel->stream);
> - spice_marshaller_destroy(channel->send_data.marshaller);
> + if (channel->rcc) {
> + red_channel_client_destroy(channel->rcc);
> + }
> free(channel);
> }
>
> +static void red_channel_client_shutdown(RedChannelClient *rcc)
> +{
> + if (rcc->stream && !rcc->stream->shutdown) {
> + rcc->channel->core->watch_remove(rcc->stream->watch);
> + rcc->stream->watch = NULL;
> + shutdown(rcc->stream->socket, SHUT_RDWR);
> + rcc->stream->shutdown = TRUE;
> + rcc->incoming.shut = TRUE;
> + }
> + red_channel_client_release_sent_item(rcc);
> +}
> +
> void red_channel_shutdown(RedChannel *channel)
> {
> - red_printf("");
> - if (channel->stream && !channel->stream->shutdown) {
> - channel->core->watch_update_mask(channel->stream->watch,
> - SPICE_WATCH_EVENT_READ);
> - red_channel_pipe_clear(channel);
> - shutdown(channel->stream->socket, SHUT_RDWR);
> - channel->stream->shutdown = TRUE;
> - channel->incoming.shut = TRUE;
> + if (channel->rcc) {
> + red_channel_client_shutdown(channel->rcc);
> + }
> + red_channel_pipe_clear(channel);
> +}
> +
> +void red_channel_client_send(RedChannelClient *rcc)
> +{
> + red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
> +}
> +
> +void red_channel_send(RedChannel *channel)
> +{
> + if (channel->rcc) {
> + red_channel_client_send(channel->rcc);
> + }
> +}
> +
> +static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
> +{
> + return (rcc->channel->handle_acks &&
> + (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
> +}
> +
> +// TODO: add refs and target to PipeItem. Right now this only works for a
> +// single client (or actually, it's worse - first come first served)
> +static inline PipeItem *red_channel_client_pipe_get(RedChannelClient *rcc)
> +{
> + PipeItem *item;
> +
> + if (!rcc || rcc->send_data.blocked
> + || red_channel_client_waiting_for_ack(rcc)
> + || !(item = (PipeItem *)ring_get_tail(&rcc->channel->pipe))) {
> + return NULL;
> + }
> + --rcc->channel->pipe_size;
> + ring_remove(&item->link);
> + return item;
> +}
> +
> +static void red_channel_client_push(RedChannelClient *rcc)
> +{
> + PipeItem *pipe_item;
> +
> + if (!rcc->during_send) {
> + rcc->during_send = TRUE;
> + } else {
> + return;
> + }
> +
> + if (rcc->send_data.blocked) {
> + red_channel_client_send(rcc);
> + }
> +
> + while ((pipe_item = red_channel_client_pipe_get(rcc))) {
> + red_channel_client_send_item(rcc, pipe_item);
> + }
> + rcc->during_send = FALSE;
> +}
> +
> +void red_channel_push(RedChannel *channel)
> +{
> + if (!channel || !channel->rcc) {
> + return;
> }
> + red_channel_client_push(channel->rcc);
> +}
> +
> +static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
> +{
> + rcc->ack_data.messages_window = 0;
> + red_channel_client_push(rcc);
> }
>
> +// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
> +// specific
> void red_channel_init_outgoing_messages_window(RedChannel *channel)
> {
> - channel->ack_data.messages_window = 0;
> - red_channel_push(channel);
> + red_channel_client_init_outgoing_messages_window(channel->rcc);
> }
>
> -void red_channel_handle_migrate_flush_mark_proc(RedChannel *channel)
> +static void red_channel_handle_migrate_flush_mark_proc(RedChannel *channel)
> {
> if (channel->handle_migrate_flush_mark) {
> - channel->handle_migrate_flush_mark(channel);
> + channel->handle_migrate_flush_mark(channel->rcc);
> }
> }
>
> -void red_channel_handle_migrate_data(RedChannel *channel, uint32_t size, void *message)
> +// TODO: the whole migration is broken with multiple clients. What do we want to do?
> +// basically just
> +// 1) source sends mark to all
> +// 2) source gets the data at various times (waits for all)
> +// 3) source migrates to target
> +// 4) target sends data to all
> +// So need to make all the handlers work with per channel/client data (what data exactly?)
> +static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
> {
> - if (!channel->handle_migrate_data) {
> + if (!rcc->channel->handle_migrate_data) {
> return;
> }
> - ASSERT(red_channel_get_message_serial(channel) == 0);
> - red_channel_set_message_serial(channel,
> - channel->handle_migrate_data_get_serial(channel, size, message));
> - channel->handle_migrate_data(channel, size, message);
> + ASSERT(red_channel_client_get_message_serial(rcc) == 0);
> + red_channel_client_set_message_serial(rcc,
> + rcc->channel->handle_migrate_data_get_serial(rcc, size, message));
> + rcc->channel->handle_migrate_data(rcc, size, message);
> }
>
> -int red_channel_handle_message(RedChannel *channel, uint32_t size,
> +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
> uint16_t type, void *message)
> {
> switch (type) {
> @@ -499,21 +633,21 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size,
> red_printf("bad message size");
> return FALSE;
> }
> - channel->ack_data.client_generation = *(uint32_t *)(message);
> + rcc->ack_data.client_generation = *(uint32_t *)(message);
> break;
> case SPICE_MSGC_ACK:
> - if (channel->ack_data.client_generation == channel->ack_data.generation) {
> - channel->ack_data.messages_window -= channel->ack_data.client_window;
> - red_channel_push(channel);
> + if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
> + rcc->ack_data.messages_window -= rcc->ack_data.client_window;
> + red_channel_client_push(rcc);
> }
> break;
> case SPICE_MSGC_DISCONNECTING:
> break;
> case SPICE_MSGC_MIGRATE_FLUSH_MARK:
> - red_channel_handle_migrate_flush_mark_proc(channel);
> + red_channel_handle_migrate_flush_mark_proc(rcc->channel);
> break;
> case SPICE_MSGC_MIGRATE_DATA:
> - red_channel_handle_migrate_data(channel, size, message);
> + red_channel_handle_migrate_data(rcc, size, message);
> break;
> default:
> red_printf("invalid message type %u", type);
> @@ -522,75 +656,54 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size,
> return TRUE;
> }
>
> -static void red_channel_event(int fd, int event, void *data)
> +static void red_channel_client_event(int fd, int event, void *data)
> {
> - RedChannel *channel = (RedChannel *)data;
> + RedChannelClient *rcc = (RedChannelClient *)data;
>
> if (event & SPICE_WATCH_EVENT_READ) {
> - red_channel_receive(channel);
> + red_channel_client_receive(rcc);
> }
> if (event & SPICE_WATCH_EVENT_WRITE) {
> - red_channel_push(channel);
> + red_channel_client_push(rcc);
> }
> }
>
> -void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
> +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item)
> {
> - ASSERT(channel->send_data.item == NULL);
> - channel->send_data.header->type = msg_type;
> - channel->send_data.item = item;
> + ASSERT(rcc->send_data.item == NULL);
> + ASSERT(msg_type != 0);
> + rcc->send_data.header->type = msg_type;
> + rcc->send_data.item = item;
> if (item) {
> - channel->hold_item(channel, item);
> + rcc->channel->hold_item(rcc, item);
> }
> }
>
> -void red_channel_send(RedChannel *channel)
> -{
> - red_peer_handle_outgoing(channel->stream, &channel->outgoing);
> -}
> -
> -void red_channel_begin_send_message(RedChannel *channel)
> +void red_channel_client_begin_send_message(RedChannelClient *rcc)
> {
> - spice_marshaller_flush(channel->send_data.marshaller);
> - channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
> - channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader);
> - channel->ack_data.messages_window++;
> - channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
> - red_channel_send(channel);
> -}
> -
> -void red_channel_push(RedChannel *channel)
> -{
> - PipeItem *pipe_item;
> + SpiceMarshaller *m = rcc->send_data.marshaller;
>
> - if (!channel) {
> + // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
> + if (rcc->send_data.header->type == 0) {
> + red_printf("BUG: header->type == 0");
> return;
> }
> -
> - if (!channel->during_send) {
> - channel->during_send = TRUE;
> - } else {
> - return;
> - }
> -
> - if (channel->send_data.blocked) {
> - red_channel_send(channel);
> - }
> -
> - while ((pipe_item = red_channel_pipe_get(channel))) {
> - red_channel_send_item(channel, pipe_item);
> - }
> - channel->during_send = FALSE;
> + spice_marshaller_flush(m);
> + rcc->send_data.size = spice_marshaller_get_total_size(m);
> + rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader);
> + rcc->ack_data.messages_window++;
> + rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */
> + red_channel_client_send(rcc);
> }
>
> -uint64_t red_channel_get_message_serial(RedChannel *channel)
> +uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
> {
> - return channel->send_data.serial;
> + return rcc->send_data.serial;
> }
>
> -void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
> +void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
> {
> - channel->send_data.serial = serial;
> + rcc->send_data.serial = serial;
> }
>
> void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
> @@ -654,28 +767,17 @@ void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type)
> red_channel_push(channel);
> }
>
> -static inline int red_channel_waiting_for_ack(RedChannel *channel)
> +int red_channel_is_connected(RedChannel *channel)
> {
> - return (channel->handle_acks && (channel->ack_data.messages_window > channel->ack_data.client_window * 2));
> + return channel->rcc != NULL;
> }
>
> -static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
> +void red_channel_client_clear_sent_item(RedChannelClient *rcc)
> {
> - PipeItem *item;
> -
> - if (!channel || channel->send_data.blocked ||
> - red_channel_waiting_for_ack(channel) ||
> - !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
> - return NULL;
> + if (rcc->send_data.item) {
> + red_channel_client_release_item(rcc, rcc->send_data.item, TRUE);
> + rcc->send_data.item = NULL;
> }
> - --channel->pipe_size;
> - ring_remove(&item->link);
> - return item;
> -}
> -
> -int red_channel_is_connected(RedChannel *channel)
> -{
> - return !!channel->stream;
> }
>
> void red_channel_pipe_clear(RedChannel *channel)
> @@ -683,82 +785,151 @@ void red_channel_pipe_clear(RedChannel *channel)
> PipeItem *item;
>
> ASSERT(channel);
> - if (channel->send_data.item) {
> - red_channel_release_item(channel, channel->send_data.item, TRUE);
> - channel->send_data.item = NULL;
> + if (channel->rcc) {
> + red_channel_client_clear_sent_item(channel->rcc);
> }
> while ((item = (PipeItem *)ring_get_head(&channel->pipe))) {
> ring_remove(&item->link);
> - red_channel_release_item(channel, item, FALSE);
> + red_channel_client_release_item(channel->rcc, item, FALSE);
> }
> channel->pipe_size = 0;
> }
>
> +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
> +{
> + rcc->ack_data.messages_window = 0;
> +}
> +
> void red_channel_ack_zero_messages_window(RedChannel *channel)
> {
> - channel->ack_data.messages_window = 0;
> + red_channel_client_ack_zero_messages_window(channel->rcc);
> +}
> +
> +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
> +{
> + rcc->ack_data.client_window = client_window;
> +}
> +
> +void red_channel_ack_set_client_window(RedChannel* channel, int client_window)
> +{
> + if (channel->rcc) {
> + red_channel_client_ack_set_client_window(channel->rcc, client_window);
> + }
> +}
> +
> +void red_channel_client_disconnect(RedChannelClient *rcc)
> +{
> + red_printf("%p (channel %p)", rcc, rcc->channel);
> +
> + if (rcc->send_data.item) {
> + rcc->channel->release_item(rcc, rcc->send_data.item, FALSE);
> + }
> + // TODO: clear our references from the pipe
> + reds_stream_free(rcc->stream);
> + rcc->send_data.item = NULL;
> + rcc->send_data.blocked = FALSE;
> + rcc->send_data.size = 0;
> + rcc->channel->rcc = NULL;
> +}
> +
> +void red_channel_disconnect(RedChannel *channel)
> +{
> + red_channel_pipe_clear(channel);
> + if (channel->rcc) {
> + red_channel_client_disconnect(channel->rcc);
> + }
> +}
> +
> +int red_channel_all_clients_serials_are_zero(RedChannel *channel)
> +{
> + return (!channel->rcc || channel->rcc->send_data.serial == 0);
> +}
> +
> +void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v)
> +{
> + if (channel->rcc) {
> + v(channel->rcc);
> + }
> +}
> +
> +void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data)
> +{
> + if (channel->rcc) {
> + v(channel->rcc, data);
> + }
> }
>
> -void red_channel_ack_set_client_window(RedChannel *channel, int client_window)
> +void red_channel_set_shut(RedChannel *channel)
> {
> - channel->ack_data.client_window = client_window;
> + if (channel->rcc) {
> + channel->rcc->incoming.shut = TRUE;
> + }
> }
>
> int red_channel_all_blocked(RedChannel *channel)
> {
> - return channel->send_data.blocked;
> + return !channel->rcc || channel->rcc->send_data.blocked;
> }
>
> int red_channel_any_blocked(RedChannel *channel)
> {
> - return channel->send_data.blocked;
> + return !channel->rcc || channel->rcc->send_data.blocked;
> }
>
> -int red_channel_send_message_pending(RedChannel *channel)
> +int red_channel_client_send_message_pending(RedChannelClient *rcc)
> {
> - return channel->send_data.header->type != 0;
> + return rcc->send_data.header->type != 0;
> }
>
> -/* accessors for RedChannel */
> -SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel)
> +/* accessors for RedChannelClient */
> +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
> {
> - return channel->send_data.marshaller;
> + return rcc->send_data.marshaller;
> }
>
> -RedsStream *red_channel_get_stream(RedChannel *channel)
> +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
> {
> - return channel->stream;
> + return rcc->stream;
> }
>
> -SpiceDataHeader *red_channel_get_header(RedChannel *channel)
> +SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc)
> {
> - return channel->send_data.header;
> + return rcc->send_data.header;
> }
> /* end of accessors */
>
> int red_channel_get_first_socket(RedChannel *channel)
> {
> - if (!channel->stream) {
> + if (!channel->rcc || !channel->rcc->stream) {
> return -1;
> }
> - return channel->stream->socket;
> + return channel->rcc->stream->socket;
> +}
> +
> +int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item)
> +{
> + return rcc->send_data.item == item;
> }
>
> int red_channel_item_being_sent(RedChannel *channel, PipeItem *item)
> {
> - return channel->send_data.item == item;
> + return channel->rcc && red_channel_client_item_being_sent(channel->rcc, item);
> }
>
> int red_channel_no_item_being_sent(RedChannel *channel)
> {
> - return channel->send_data.item == NULL;
> + return !channel->rcc || channel->rcc->send_data.item == NULL;
> }
>
> -void red_channel_disconnect(RedChannel *channel)
> +static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item)
> {
> - red_channel_pipe_clear(channel);
> - reds_stream_free(channel->stream);
> - channel->stream = NULL;
> - channel->send_data.blocked = FALSE;
> - channel->send_data.size = 0;
> + rcc->channel->pipe_size--;
> + ring_remove(&item->link);
> +}
> +
> +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
> + PipeItem *item)
> +{
> + red_channel_client_pipe_remove(rcc, item);
> + red_channel_client_release_item(rcc, item, FALSE);
> }
> diff --git a/server/red_channel.h b/server/red_channel.h
> index d05722c..4d6a23c 100644
> --- a/server/red_channel.h
> +++ b/server/red_channel.h
> @@ -97,6 +97,9 @@ typedef struct BufDescriptor {
> uint8_t *data;
> } BufDescriptor;
>
> +typedef struct RedChannel RedChannel;
> +typedef struct RedChannelClient RedChannelClient;
> +
> /* Messages handled by red_channel
> * SET_ACK - sent to client on channel connection
> * Note that the numbers don't have to correspond to spice message types,
> @@ -112,37 +115,33 @@ typedef struct PipeItem {
> int type;
> } PipeItem;
>
> -typedef struct RedChannel RedChannel;
> -
> -typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel,
> +typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannelClient *channel,
> SpiceDataHeader *msg_header);
> -typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type,
> +typedef int (*channel_handle_parsed_proc)(RedChannelClient *rcc, uint32_t size, uint16_t type,
> void *message);
> -typedef int (*channel_handle_message_proc)(RedChannel *channel,
> +typedef int (*channel_handle_message_proc)(RedChannelClient *rcc,
> SpiceDataHeader *header, uint8_t *msg);
> -typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel,
> +typedef void (*channel_release_msg_recv_buf_proc)(RedChannelClient *channel,
> SpiceDataHeader *msg_header, uint8_t *msg);
> -typedef void (*channel_disconnect_proc)(RedChannel *channel);
> -typedef int (*channel_configure_socket_proc)(RedChannel *channel);
> -typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item);
> -typedef void (*channel_hold_pipe_item_proc)(RedChannel *channel, PipeItem *item);
> -typedef void (*channel_release_pipe_item_proc)(RedChannel *channel,
> +typedef void (*channel_disconnect_proc)(RedChannelClient *rcc);
> +typedef int (*channel_configure_socket_proc)(RedChannelClient *rcc);
> +typedef void (*channel_send_pipe_item_proc)(RedChannelClient *rcc, PipeItem *item);
> +typedef void (*channel_hold_pipe_item_proc)(RedChannelClient *rcc, PipeItem *item);
> +typedef void (*channel_release_pipe_item_proc)(RedChannelClient *rcc,
> PipeItem *item, int item_pushed);
> -typedef void (*channel_on_incoming_error_proc)(RedChannel *channel);
> -typedef void (*channel_on_outgoing_error_proc)(RedChannel *channel);
> +typedef void (*channel_on_incoming_error_proc)(RedChannelClient *rcc);
> +typedef void (*channel_on_outgoing_error_proc)(RedChannelClient *rcc);
>
> -typedef int (*channel_handle_migrate_flush_mark_proc)(RedChannel *channel);
> -typedef uint64_t (*channel_handle_migrate_data_proc)(RedChannel *channel,
> +typedef int (*channel_handle_migrate_flush_mark_proc)(RedChannelClient *base);
> +typedef uint64_t (*channel_handle_migrate_data_proc)(RedChannelClient *base,
> uint32_t size, void *message);
> -typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannel *channel,
> +typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannelClient *base,
> uint32_t size, void *message);
>
> -struct RedChannel {
> +struct RedChannelClient {
> + RingItem channel_link;
> + RedChannel *channel;
> RedsStream *stream;
> - SpiceCoreInterface *core;
> - int migrate;
> - int handle_acks;
> -
> struct {
> uint32_t generation;
> uint32_t client_generation;
> @@ -150,9 +149,6 @@ struct RedChannel {
> uint32_t client_window;
> } ack_data;
>
> - Ring pipe;
> - uint32_t pipe_size;
> -
> struct {
> SpiceMarshaller *marshaller;
> SpiceDataHeader *header;
> @@ -164,16 +160,28 @@ struct RedChannel {
>
> OutgoingHandler outgoing;
> IncomingHandler incoming;
> + int during_send;
> +};
> +
> +struct RedChannel {
> + SpiceCoreInterface *core;
> + int migrate;
> + int handle_acks;
> +
> + RedChannelClient *rcc;
> +
> + Ring pipe;
> + uint32_t pipe_size;
>
> OutgoingHandlerInterface outgoing_cb;
> IncomingHandlerInterface incoming_cb;
>
> + channel_configure_socket_proc config_socket;
> channel_disconnect_proc disconnect;
> channel_send_pipe_item_proc send_item;
> channel_hold_pipe_item_proc hold_item;
> channel_release_pipe_item_proc release_item;
>
> - int during_send;
> /* Stuff below added for Main and Inputs channels switch to RedChannel
> * (might be removed later) */
> channel_on_incoming_error_proc on_incoming_error; /* alternative to disconnect */
> @@ -190,7 +198,7 @@ struct RedChannel {
>
> /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
> explicitly destroy the channel */
> -RedChannel *red_channel_create(int size, RedsStream *stream,
> +RedChannel *red_channel_create(int size,
> SpiceCoreInterface *core,
> int migrate, int handle_acks,
> channel_configure_socket_proc config_socket,
> @@ -207,7 +215,7 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
>
> /* alternative constructor, meant for marshaller based (inputs,main) channels,
> * will become default eventually */
> -RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> +RedChannel *red_channel_create_parser(int size,
> SpiceCoreInterface *core,
> int migrate, int handle_acks,
> channel_configure_socket_proc config_socket,
> @@ -223,29 +231,31 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
> channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
> channel_handle_migrate_data_proc handle_migrate_data,
> channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial);
> -
> +RedChannelClient *red_channel_client_create(int size, RedChannel *channel,
> + RedsStream *stream);
> int red_channel_is_connected(RedChannel *channel);
>
> +void red_channel_client_destroy(RedChannelClient *rcc);
> void red_channel_destroy(RedChannel *channel);
>
> /* should be called when a new channel is ready to send messages */
> void red_channel_init_outgoing_messages_window(RedChannel *channel);
>
> /* handles general channel msgs from the client */
> -int red_channel_handle_message(RedChannel *channel, uint32_t size,
> +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
> uint16_t type, void *message);
>
> /* default error handler that disconnects channel */
> -void red_channel_default_peer_on_error(RedChannel *channel);
> +void red_channel_client_default_peer_on_error(RedChannelClient *rcc);
>
> /* when preparing send_data: should call init and then use marshaller */
> -void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item);
> +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item);
>
> -uint64_t red_channel_get_message_serial(RedChannel *channel);
> -void red_channel_set_message_serial(RedChannel *channel, uint64_t);
> +uint64_t red_channel_client_get_message_serial(RedChannelClient *channel);
> +void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
>
> -/* when sending a msg. should first call red_channel_begin_send_message */
> -void red_channel_begin_send_message(RedChannel *channel);
> +/* when sending a msg. should first call red_channel_client_begin_send_message */
> +void red_channel_client_begin_send_message(RedChannelClient *rcc);
>
> void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type);
> void red_channel_pipe_add_push(RedChannel *channel, PipeItem *item);
> @@ -253,14 +263,19 @@ void red_channel_pipe_add(RedChannel *channel, PipeItem *item);
> void red_channel_pipe_add_after(RedChannel *channel, PipeItem *item, PipeItem *pos);
> int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item);
> void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
> +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, PipeItem *item);
> void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item);
> /* for types that use this routine -> the pipe item should be freed */
> void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type);
>
> +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc);
> +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window);
> +void red_channel_client_push_set_ack(RedChannelClient *rcc);
> void red_channel_ack_zero_messages_window(RedChannel *channel);
> void red_channel_ack_set_client_window(RedChannel *channel, int client_window);
> void red_channel_push_set_ack(RedChannel *channel);
>
> +/* TODO: This sets all clients to shut state - probably we want to close per channel */
> void red_channel_shutdown(RedChannel *channel);
>
> int red_channel_get_first_socket(RedChannel *channel);
> @@ -272,7 +287,7 @@ int red_channel_all_blocked(RedChannel *channel);
> int red_channel_any_blocked(RedChannel *channel);
>
> /* helper for channels that have complex logic that can possibly ready a send */
> -int red_channel_send_message_pending(RedChannel *channel);
> +int red_channel_client_send_message_pending(RedChannelClient *rcc);
>
> /* returns TRUE if item is being sent by one of the channel clients. This will
> * be true if someone called init_send_data but send has not completed (or perhaps
> @@ -299,14 +314,18 @@ void red_channel_pipe_clear(RedChannel *channel);
> // red_wait_pipe_item_sent
> // handle_channel_events - this is the only one that was used before, and was in red_channel.c
> void red_channel_receive(RedChannel *channel);
> +void red_channel_client_receive(RedChannelClient *rcc);
> +// For red_worker
> void red_channel_send(RedChannel *channel);
> +void red_channel_client_send(RedChannelClient *rcc);
> // For red_worker
> void red_channel_disconnect(RedChannel *channel);
> +void red_channel_client_disconnect(RedChannelClient *rcc);
>
> -/* accessors for RedChannel */
> +/* accessors for RedChannelClient */
> /* Note: the valid times to call red_channel_client_get_marshaller are just during send_item callback. */
> -SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel);
> -RedsStream *red_channel_get_stream(RedChannel *channel);
> +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc);
> +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc);
>
> /* this is a convenience function for sending messages, sometimes (migration only?)
> * the serial from the header needs to be available for sending. Note that the header
> @@ -314,5 +333,12 @@ RedsStream *red_channel_get_stream(RedChannel *channel);
> * red_channel_begin_send_message. red_channel_init_send_data changes the header (sets
> * the type in it) as a convenience function. It is preferred to do that through it and
> * not via the below accessor and direct header manipulation. */
> -SpiceDataHeader *red_channel_get_header(RedChannel *channel);
> +SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc);
> +
> +/* apply given function to all connected clients */
> +typedef void (*channel_client_visitor)(RedChannelClient *rcc);
> +typedef void (*channel_client_visitor_data)(RedChannelClient *rcc, void *data);
> +void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v);
> +void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data);
> +
> #endif
> diff --git a/server/red_client_shared_cache.h b/server/red_client_shared_cache.h
> index 74553c0..dddccc6 100644
> --- a/server/red_client_shared_cache.h
> +++ b/server/red_client_shared_cache.h
> @@ -26,6 +26,7 @@
> #define FUNC_NAME(name) pixmap_cache_##name
> #define PRIVATE_FUNC_NAME(name) __pixmap_cache_##name
> #define CHANNEL DisplayChannel
> +#define CHANNEL_FROM_RCC(rcc) SPICE_CONTAINEROF(rcc->channel, CHANNEL, common.base);
> #define CACH_GENERATION pixmap_cache_generation
> #define INVAL_ALL_VERB SPICE_MSG_DISPLAY_INVAL_ALL_PIXMAPS
> #else
> @@ -35,12 +36,13 @@
> #endif
>
>
> -static int FUNC_NAME(hit)(CACHE *cache, uint64_t id, int *lossy, CHANNEL *channel)
> +static int FUNC_NAME(hit)(CACHE *cache, uint64_t id, int *lossy, RedChannelClient *rcc)
> {
> + CHANNEL *channel = CHANNEL_FROM_RCC(rcc);
> NewCacheItem *item;
> uint64_t serial;
>
> - serial = red_channel_get_message_serial((RedChannel *)channel);
> + serial = red_channel_client_get_message_serial(rcc);
> pthread_mutex_lock(&cache->lock);
> item = cache->hash_table[CACHE_HASH_KEY(id)];
>
> @@ -79,8 +81,9 @@ static int FUNC_NAME(set_lossy)(CACHE *cache, uint64_t id, int lossy)
> return !!item;
> }
>
> -static int FUNC_NAME(add)(CACHE *cache, uint64_t id, uint32_t size, int lossy, CHANNEL *channel)
> +static int FUNC_NAME(add)(CACHE *cache, uint64_t id, uint32_t size, int lossy, RedChannelClient *rcc)
> {
> + CHANNEL *channel = CHANNEL_FROM_RCC(rcc);
> NewCacheItem *item;
> uint64_t serial;
> int key;
> @@ -88,7 +91,7 @@ static int FUNC_NAME(add)(CACHE *cache, uint64_t id, uint32_t size, int lossy, C
> ASSERT(size > 0);
>
> item = spice_new(NewCacheItem, 1);
> - serial = red_channel_get_message_serial((RedChannel *)channel);
> + serial = red_channel_client_get_message_serial(rcc);
>
> pthread_mutex_lock(&cache->lock);
>
> @@ -166,13 +169,14 @@ static void PRIVATE_FUNC_NAME(clear)(CACHE *cache)
> cache->items = 0;
> }
>
> -static void FUNC_NAME(reset)(CACHE *cache, CHANNEL *channel, SpiceMsgWaitForChannels* sync_data)
> +static void FUNC_NAME(reset)(CACHE *cache, RedChannelClient *rcc, SpiceMsgWaitForChannels* sync_data)
> {
> + CHANNEL *channel = CHANNEL_FROM_RCC(rcc);
> uint8_t wait_count;
> uint64_t serial;
> uint32_t i;
>
> - serial = red_channel_get_message_serial((RedChannel *)channel);
> + serial = red_channel_client_get_message_serial(rcc);
> pthread_mutex_lock(&cache->lock);
> PRIVATE_FUNC_NAME(clear)(cache);
>
> @@ -230,4 +234,5 @@ static void FUNC_NAME(destroy)(CACHE *cache)
> #undef FUNC_NAME
> #undef VAR_NAME
> #undef CHANNEL
> +#undef CHANNEL_FROM_RCC
>
> diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
> index 5df801c..8183ed0 100644
> --- a/server/red_tunnel_worker.c
> +++ b/server/red_tunnel_worker.c
> @@ -1642,9 +1642,11 @@ static int tunnel_channel_handle_socket_token(TunnelChannel *channel, RedSocket
> return TRUE;
> }
>
> -static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> +static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
> + SpiceDataHeader *msg_header)
> {
> - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> + TunnelChannel *tunnel_channel = (TunnelChannel *)rcc->channel;
> +
> if (msg_header->type == SPICE_MSGC_TUNNEL_SOCKET_DATA) {
> return (__tunnel_worker_alloc_socket_rcv_buf(tunnel_channel->worker)->buf);
> } else if ((msg_header->type == SPICE_MSGC_MIGRATE_DATA) ||
> @@ -1656,10 +1658,11 @@ static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataH
> }
>
> // called by the receive routine of the channel, before the buffer was assigned to a socket
> -static void tunnel_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
> +static void tunnel_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
> uint8_t *msg)
> {
> - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> + TunnelChannel *tunnel_channel = (TunnelChannel *)rcc->channel;
> +
> if (msg_header->type == SPICE_MSGC_TUNNEL_SOCKET_DATA) {
> ASSERT(!(SPICE_CONTAINEROF(msg, RedSocketRawRcvBuf, buf)->base.usr_opaque));
> __tunnel_worker_free_socket_rcv_buf(tunnel_channel->worker,
> @@ -1741,9 +1744,9 @@ static void __tunnel_channel_fill_socket_migrate_item(TunnelChannel *channel, Re
> }
>
> static void release_migrate_item(TunnelMigrateItem *item);
> -static int tunnel_channel_handle_migrate_mark(RedChannel *base)
> +static int tunnel_channel_handle_migrate_mark(RedChannelClient *rcc)
> {
> - TunnelChannel *channel = SPICE_CONTAINEROF(base, TunnelChannel, base);
> + TunnelChannel *channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> TunnelMigrateItem *migrate_item = NULL;
> TunnelService *service;
> TunnelMigrateServiceItem *mig_service;
> @@ -2156,7 +2159,7 @@ static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *chann
> }
> }
>
> -static uint64_t tunnel_channel_handle_migrate_data_get_serial_proc(RedChannel *base,
> +static uint64_t tunnel_channel_handle_migrate_data_get_serial_proc(RedChannelClient *rcc,
> uint32_t size, void *msg)
> {
> TunnelMigrateData *migrate_data = msg;
> @@ -2169,10 +2172,10 @@ static uint64_t tunnel_channel_handle_migrate_data_get_serial_proc(RedChannel *b
> return migrate_data->message_serial;
> }
>
> -static uint64_t tunnel_channel_handle_migrate_data(RedChannel *base,
> +static uint64_t tunnel_channel_handle_migrate_data(RedChannelClient *rcc,
> uint32_t size, void *msg)
> {
> - TunnelChannel *channel = SPICE_CONTAINEROF(base, TunnelChannel, base);
> + TunnelChannel *channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> TunnelMigrateSocketList *sockets_list;
> TunnelMigrateServicesList *services_list;
> TunnelMigrateData *migrate_data = msg;
> @@ -2239,9 +2242,9 @@ error:
> }
>
> // msg was allocated by tunnel_channel_alloc_msg_rcv_buf
> -static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *header, uint8_t *msg)
> +static int tunnel_channel_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
> {
> - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> + TunnelChannel *tunnel_channel = (TunnelChannel *)rcc->channel;
> RedSocket *sckt = NULL;
> // retrieve the sckt
> switch (header->type) {
> @@ -2265,7 +2268,7 @@ static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *h
> }
> break;
> default:
> - return red_channel_handle_message(channel, header->size, header->type, msg);
> + return red_channel_client_handle_message(rcc, header->size, header->type, msg);
> }
>
> switch (header->type) {
> @@ -2334,7 +2337,7 @@ static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *h
> return tunnel_channel_handle_socket_token(tunnel_channel, sckt,
> (SpiceMsgcTunnelSocketTokens *)msg);
> default:
> - return red_channel_handle_message(channel, header->size, header->type, msg);
> + return red_channel_client_handle_message(rcc, header->size, header->type, msg);
> }
> return TRUE;
> }
> @@ -2343,13 +2346,16 @@ static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *h
> /* outgoing msgs
> ********************************/
>
> -static void tunnel_channel_marshall_migrate(TunnelChannel *tunnel_channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_migrate(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> - ASSERT(tunnel_channel);
> + TunnelChannel *tunnel_channel;
> +
> + ASSERT(rcc);
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> tunnel_channel->send_data.u.migrate.flags =
> SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
> tunnel_channel->expect_migrate_mark = TRUE;
> - red_channel_init_send_data(&tunnel_channel->base, SPICE_MSG_MIGRATE, item);
> + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, item);
> spice_marshaller_add_ref(m,
> (uint8_t*)&tunnel_channel->send_data.u.migrate,
> sizeof(SpiceMsgMigrate));
> @@ -2489,20 +2495,23 @@ static int __tunnel_channel_marshall_socket_migrate_data(TunnelChannel *channel,
> return (cur_offset - offset);
> }
>
> -static void tunnel_channel_marshall_migrate_data(TunnelChannel *channel,
> +static void tunnel_channel_marshall_migrate_data(RedChannelClient *rcc,
> SpiceMarshaller *m, PipeItem *item)
> {
> - TunnelMigrateData *migrate_data = &channel->send_data.u.migrate_data;
> + TunnelChannel *tunnel_channel;
> + TunnelMigrateData *migrate_data;
> TunnelMigrateItem *migrate_item = (TunnelMigrateItem *)item;
> int i;
>
> uint32_t data_buf_offset = 0; // current location in data[0] field
> - ASSERT(channel);
> + ASSERT(rcc);
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> + migrate_data = &tunnel_channel->send_data.u.migrate_data;
>
> migrate_data->magic = TUNNEL_MIGRATE_DATA_MAGIC;
> migrate_data->version = TUNNEL_MIGRATE_DATA_VERSION;
> - migrate_data->message_serial = red_channel_get_message_serial(&channel->base);
> - red_channel_init_send_data(&channel->base, SPICE_MSG_MIGRATE_DATA, item);
> + migrate_data->message_serial = red_channel_client_get_message_serial(rcc);
> + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item);
> spice_marshaller_add_ref(m, (uint8_t*)migrate_data, sizeof(*migrate_data));
>
> migrate_data->slirp_state = data_buf_offset;
> @@ -2516,7 +2525,7 @@ static void tunnel_channel_marshall_migrate_data(TunnelChannel *channel,
>
> for (i = 0; i < migrate_item->services_list->num_services; i++) {
> migrate_item->services_list->services[i] = data_buf_offset;
> - data_buf_offset += __tunnel_channel_marshall_service_migrate_data(channel, m,
> + data_buf_offset += __tunnel_channel_marshall_service_migrate_data(tunnel_channel, m,
> migrate_item->services + i,
> data_buf_offset);
> }
> @@ -2529,83 +2538,93 @@ static void tunnel_channel_marshall_migrate_data(TunnelChannel *channel,
>
> for (i = 0; i < migrate_item->sockets_list->num_sockets; i++) {
> migrate_item->sockets_list->sockets[i] = data_buf_offset;
> - data_buf_offset += __tunnel_channel_marshall_socket_migrate_data(channel, m,
> + data_buf_offset += __tunnel_channel_marshall_socket_migrate_data(tunnel_channel, m,
> migrate_item->sockets_data + i,
> data_buf_offset);
> }
> }
>
> -static void tunnel_channel_marshall_init(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_init(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> - ASSERT(channel);
> + TunnelChannel *channel;
>
> + ASSERT(rcc);
> + channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> channel->send_data.u.init.max_socket_data_size = MAX_SOCKET_DATA_SIZE;
> channel->send_data.u.init.max_num_of_sockets = MAX_SOCKETS_NUM;
>
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_INIT, item);
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_INIT, item);
> spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.init, sizeof(SpiceMsgTunnelInit));
> }
>
> -static void tunnel_channel_marshall_service_ip_map(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_service_ip_map(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> TunnelService *service = SPICE_CONTAINEROF(item, TunnelService, pipe_item);
>
> - channel->send_data.u.service_ip.service_id = service->id;
> - channel->send_data.u.service_ip.virtual_ip.type = SPICE_TUNNEL_IP_TYPE_IPv4;
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> + tunnel_channel->send_data.u.service_ip.service_id = service->id;
> + tunnel_channel->send_data.u.service_ip.virtual_ip.type = SPICE_TUNNEL_IP_TYPE_IPv4;
>
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SERVICE_IP_MAP, item);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.service_ip,
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SERVICE_IP_MAP, item);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.service_ip,
> sizeof(SpiceMsgTunnelServiceIpMap));
> spice_marshaller_add_ref(m, (uint8_t*)&service->virt_ip.s_addr, sizeof(SpiceTunnelIPv4));
> }
>
> -static void tunnel_channel_marshall_socket_open(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_socket_open(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
>
> - channel->send_data.u.socket_open.connection_id = sckt->connection_id;
> - channel->send_data.u.socket_open.service_id = sckt->far_service->id;
> - channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE;
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> + tunnel_channel->send_data.u.socket_open.connection_id = sckt->connection_id;
> + tunnel_channel->send_data.u.socket_open.service_id = sckt->far_service->id;
> + tunnel_channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE;
>
> sckt->in_data.client_total_num_tokens = SOCKET_WINDOW_SIZE;
> sckt->in_data.num_tokens = 0;
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_OPEN, item);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_open,
> - sizeof(channel->send_data.u.socket_open));
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_OPEN, item);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_open,
> + sizeof(tunnel_channel->send_data.u.socket_open));
> #ifdef DEBUG_NETWORK
> PRINT_SCKT(sckt);
> #endif
> }
>
> -static void tunnel_channel_marshall_socket_fin(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_socket_fin(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
>
> ASSERT(!sckt->out_data.ready_chunks_queue.head);
>
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> if (sckt->out_data.process_queue->head) {
> red_printf("socket sent FIN but there are still buffers in outgoing process queue"
> "(local_port=%d, service_id=%d)",
> ntohs(sckt->local_port), sckt->far_service->id);
> }
>
> - channel->send_data.u.socket_fin.connection_id = sckt->connection_id;
> + tunnel_channel->send_data.u.socket_fin.connection_id = sckt->connection_id;
>
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_FIN, item);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_fin,
> - sizeof(channel->send_data.u.socket_fin));
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_FIN, item);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_fin,
> + sizeof(tunnel_channel->send_data.u.socket_fin));
> #ifdef DEBUG_NETWORK
> PRINT_SCKT(sckt);
> #endif
> }
>
> -static void tunnel_channel_marshall_socket_close(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_socket_close(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
>
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> // can happen when it is a forced close
> if (sckt->out_data.ready_chunks_queue.head) {
> red_printf("socket closed but there are still buffers in outgoing ready queue"
> @@ -2620,65 +2639,71 @@ static void tunnel_channel_marshall_socket_close(TunnelChannel *channel, SpiceMa
> ntohs(sckt->local_port), sckt->far_service->id);
> }
>
> - channel->send_data.u.socket_close.connection_id = sckt->connection_id;
> + tunnel_channel->send_data.u.socket_close.connection_id = sckt->connection_id;
>
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_CLOSE, item);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_close,
> - sizeof(channel->send_data.u.socket_close));
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_CLOSE, item);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_close,
> + sizeof(tunnel_channel->send_data.u.socket_close));
> #ifdef DEBUG_NETWORK
> PRINT_SCKT(sckt);
> #endif
> }
>
> -static void tunnel_channel_marshall_socket_closed_ack(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_socket_closed_ack(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
> RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
>
> - channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id;
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> + tunnel_channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id;
>
> // pipe item is null because we free the sckt.
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_CLOSED_ACK, NULL);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_close_ack,
> - sizeof(channel->send_data.u.socket_close_ack));
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_CLOSED_ACK, NULL);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_close_ack,
> + sizeof(tunnel_channel->send_data.u.socket_close_ack));
> #ifdef DEBUG_NETWORK
> PRINT_SCKT(sckt);
> #endif
>
> ASSERT(sckt->client_waits_close_ack && (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED));
> - tunnel_worker_free_socket(channel->worker, sckt);
> - if (CHECK_TUNNEL_ERROR(channel)) {
> - tunnel_shutdown(channel->worker);
> + tunnel_worker_free_socket(tunnel_channel->worker, sckt);
> + if (CHECK_TUNNEL_ERROR(tunnel_channel)) {
> + tunnel_shutdown(tunnel_channel->worker);
> }
> }
>
> -static void tunnel_channel_marshall_socket_token(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_socket_token(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, token_pipe_item);
> RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
>
> /* notice that the num of tokens sent can be > SOCKET_TOKENS_TO_SEND, since
> the sending is performed after the pipe item was pushed */
>
> - channel->send_data.u.socket_token.connection_id = sckt->connection_id;
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> + tunnel_channel->send_data.u.socket_token.connection_id = sckt->connection_id;
>
> if (sckt->in_data.num_tokens > 0) {
> - channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens;
> + tunnel_channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens;
> } else {
> ASSERT(!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head);
> - channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS;
> + tunnel_channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS;
> }
> - sckt->in_data.num_tokens -= channel->send_data.u.socket_token.num_tokens;
> - sckt->in_data.client_total_num_tokens += channel->send_data.u.socket_token.num_tokens;
> + sckt->in_data.num_tokens -= tunnel_channel->send_data.u.socket_token.num_tokens;
> + sckt->in_data.client_total_num_tokens += tunnel_channel->send_data.u.socket_token.num_tokens;
> ASSERT(sckt->in_data.client_total_num_tokens <= SOCKET_WINDOW_SIZE);
>
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_TOKEN, item);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_token,
> - sizeof(channel->send_data.u.socket_token));
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_TOKEN, item);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_token,
> + sizeof(tunnel_channel->send_data.u.socket_token));
> }
>
> -static void tunnel_channel_marshall_socket_out_data(TunnelChannel *channel, SpiceMarshaller *m, PipeItem *item)
> +static void tunnel_channel_marshall_socket_out_data(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
> {
> + TunnelChannel *tunnel_channel;
> + tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannel, base);
> RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, data_pipe_item);
> RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
> ReadyTunneledChunk *chunk;
> @@ -2698,11 +2723,11 @@ static void tunnel_channel_marshall_socket_out_data(TunnelChannel *channel, Spic
> ASSERT(!sckt->out_data.push_tail);
> ASSERT(sckt->out_data.ready_chunks_queue.head->size <= MAX_SOCKET_DATA_SIZE);
>
> - channel->send_data.u.socket_data.connection_id = sckt->connection_id;
> + tunnel_channel->send_data.u.socket_data.connection_id = sckt->connection_id;
>
> - red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_DATA, item);
> - spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.socket_data,
> - sizeof(channel->send_data.u.socket_data));
> + red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_DATA, item);
> + spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_data,
> + sizeof(tunnel_channel->send_data.u.socket_data));
> pushed_bufs_num++;
>
> // the first chunk is in a valid size
> @@ -2787,52 +2812,51 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem
> }
> }
>
> -static void tunnel_channel_send_item(RedChannel *channel, PipeItem *item)
> +static void tunnel_channel_send_item(RedChannelClient *rcc, PipeItem *item)
> {
> - TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
> - SpiceMarshaller *m = red_channel_get_marshaller(channel);
> + SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
>
> switch (item->type) {
> case PIPE_ITEM_TYPE_TUNNEL_INIT:
> - tunnel_channel_marshall_init(tunnel_channel, m, item);
> + tunnel_channel_marshall_init(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SERVICE_IP_MAP:
> - tunnel_channel_marshall_service_ip_map(tunnel_channel, m, item);
> + tunnel_channel_marshall_service_ip_map(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SOCKET_OPEN:
> - tunnel_channel_marshall_socket_open(tunnel_channel, m, item);
> + tunnel_channel_marshall_socket_open(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SOCKET_DATA:
> - tunnel_channel_marshall_socket_out_data(tunnel_channel, m, item);
> + tunnel_channel_marshall_socket_out_data(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SOCKET_FIN:
> - tunnel_channel_marshall_socket_fin(tunnel_channel, m, item);
> + tunnel_channel_marshall_socket_fin(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SOCKET_CLOSE:
> - tunnel_channel_marshall_socket_close(tunnel_channel, m, item);
> + tunnel_channel_marshall_socket_close(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK:
> - tunnel_channel_marshall_socket_closed_ack(tunnel_channel, m, item);
> + tunnel_channel_marshall_socket_closed_ack(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_SOCKET_TOKEN:
> - tunnel_channel_marshall_socket_token(tunnel_channel, m, item);
> + tunnel_channel_marshall_socket_token(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_MIGRATE:
> - tunnel_channel_marshall_migrate(tunnel_channel, m, item);
> + tunnel_channel_marshall_migrate(rcc, m, item);
> break;
> case PIPE_ITEM_TYPE_MIGRATE_DATA:
> - tunnel_channel_marshall_migrate_data(tunnel_channel, m, item);
> + tunnel_channel_marshall_migrate_data(rcc, m, item);
> break;
> default:
> red_error("invalid pipe item type");
> }
> - red_channel_begin_send_message(channel);
> + red_channel_client_begin_send_message(rcc);
> }
>
> /* param item_pushed: distinguishes between a pipe item that was pushed for sending, and
> a pipe item that is still in the pipe and is released due to disconnection.
> see red_pipe_item_clear */
> -static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item, int item_pushed)
> +static void tunnel_channel_release_pipe_item(RedChannelClient *rcc, PipeItem *item, int item_pushed)
> {
> if (!item) { // e.g. when acking closed socket
> return;
> @@ -2849,7 +2873,7 @@ static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item
> break;
> case PIPE_ITEM_TYPE_SOCKET_DATA:
> if (item_pushed) {
> - tunnel_worker_release_socket_out_data(((TunnelChannel *)channel)->worker, item);
> + tunnel_worker_release_socket_out_data(((TunnelChannel *)rcc->channel)->worker, item);
> }
> break;
> case PIPE_ITEM_TYPE_MIGRATE:
> @@ -3318,11 +3342,11 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
> * channel interface and other related procedures
> ************************************************/
>
> -static int tunnel_channel_config_socket(RedChannel *channel)
> +static int tunnel_channel_config_socket(RedChannelClient *rcc)
> {
> int flags;
> int delay_val;
> - RedsStream *stream = red_channel_get_stream(channel);
> + RedsStream *stream = red_channel_client_get_stream(rcc);
>
> if ((flags = fcntl(stream->socket, F_GETFL)) == -1) {
> red_printf("accept failed, %s", strerror(errno)); // can't we just use red_error?
> @@ -3383,6 +3407,12 @@ static void tunnel_channel_disconnect(RedChannel *channel)
> worker->channel = NULL;
> }
>
> +// TODO - not MC friendly, remove
> +static void tunnel_channel_disconnect_client(RedChannelClient *rcc)
> +{
> + tunnel_channel_disconnect(rcc->channel);
> +}
> +
> /* interface for reds */
>
> static void on_new_tunnel_channel(TunnelChannel *channel)
> @@ -3397,7 +3427,7 @@ static void on_new_tunnel_channel(TunnelChannel *channel)
> }
> }
>
> -static void tunnel_channel_hold_pipe_item(RedChannel *channel, PipeItem *item)
> +static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
> {
> }
>
> @@ -3412,10 +3442,10 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int
> }
>
> tunnel_channel =
> - (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), stream, worker->core_interface,
> + (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), worker->core_interface,
> migration, TRUE,
> tunnel_channel_config_socket,
> - tunnel_channel_disconnect,
> + tunnel_channel_disconnect_client,
> tunnel_channel_handle_message,
> tunnel_channel_alloc_msg_rcv_buf,
> tunnel_channel_release_msg_rcv_buf,
> @@ -3429,7 +3459,7 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int
> if (!tunnel_channel) {
> return;
> }
> -
> + red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, stream);
>
> tunnel_channel->worker = worker;
> tunnel_channel->worker->channel = tunnel_channel;
> diff --git a/server/red_worker.c b/server/red_worker.c
> index 8e3b106..834ba3c 100644
> --- a/server/red_worker.c
> +++ b/server/red_worker.c
> @@ -924,7 +924,7 @@ static void red_display_release_stream(DisplayChannel *display, StreamAgent *age
> static inline void red_detach_stream(RedWorker *worker, Stream *stream);
> static void red_stop_stream(RedWorker *worker, Stream *stream);
> static inline void red_stream_maintenance(RedWorker *worker, Drawable *candidate, Drawable *sect);
> -static inline void display_begin_send_message(DisplayChannel *channel, SpiceMarshaller *base_marshaller);
> +static inline void display_begin_send_message(RedChannelClient *rcc, SpiceMarshaller *base_marshaller);
> static void red_release_pixmap_cache(DisplayChannel *channel);
> static void red_release_glz(DisplayChannel *channel);
> static void red_freeze_glz(DisplayChannel *channel);
> @@ -1261,16 +1261,16 @@ static void release_upgrade_item(RedWorker* worker, UpgradeItem *item)
> }
> }
>
> -static uint8_t *common_alloc_recv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
> +static uint8_t *common_alloc_recv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
> {
> - CommonChannel *common = SPICE_CONTAINEROF(channel, CommonChannel, base);
> + CommonChannel *common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
>
> return common->recv_buf;
> }
>
> -static void common_release_recv_buf(RedChannel *channel, SpiceDataHeader *msg_header, uint8_t* msg)
> +static void common_release_recv_buf(RedChannelClient *rcc,
> + SpiceDataHeader *msg_header, uint8_t* msg)
> {
> - return;
> }
>
> #define CLIENT_PIXMAPS_CACHE
> @@ -1686,7 +1686,7 @@ static void red_clear_surface_drawables_from_pipe(RedWorker *worker, int surface
> item = (PipeItem *)ring_prev(ring, (RingItem *)item);
> ring_remove(&tmp_item->link);
> worker->display_channel->common.base.release_item(
> - &worker->display_channel->common.base, tmp_item, FALSE);
> + worker->display_channel->common.base.rcc, tmp_item, FALSE);
> worker->display_channel->common.base.pipe_size--;
>
> if (!item) {
> @@ -5687,16 +5687,18 @@ static inline int red_compress_image(DisplayChannel *display_channel,
> }
> }
>
> -static inline void red_display_add_image_to_pixmap_cache(DisplayChannel *display_channel,
> +static inline void red_display_add_image_to_pixmap_cache(RedChannelClient *rcc,
> SpiceImage *image, SpiceImage *io_image,
> int is_lossy)
> {
> + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> +
> if ((image->descriptor.flags & SPICE_IMAGE_FLAGS_CACHE_ME)) {
> ASSERT(image->descriptor.width * image->descriptor.height > 0);
> if (!(io_image->descriptor.flags & SPICE_IMAGE_FLAGS_CACHE_REPLACE_ME)) {
> if (pixmap_cache_add(display_channel->pixmap_cache, image->descriptor.id,
> image->descriptor.width * image->descriptor.height, is_lossy,
> - display_channel)) {
> + rcc)) {
> io_image->descriptor.flags |= SPICE_IMAGE_FLAGS_CACHE_ME;
> stat_inc_counter(display_channel->add_to_cache_counter, 1);
> }
> @@ -5719,9 +5721,10 @@ typedef enum {
>
> /* if the number of times fill_bits can be called per one qxl_drawable increases -
> MAX_LZ_DRAWABLE_INSTANCES must be increased as well */
> -static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *m,
> +static FillBitsType fill_bits(RedChannelClient *rcc, SpiceMarshaller *m,
> SpiceImage *simage, Drawable *drawable, int can_lossy)
> {
> + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> RedWorker *worker = display_channel->common.worker;
> SpiceImage image;
> compress_send_data_t comp_send_data = {0};
> @@ -5737,7 +5740,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> if ((simage->descriptor.flags & SPICE_IMAGE_FLAGS_CACHE_ME)) {
> int lossy_cache_item;
> if (pixmap_cache_hit(display_channel->pixmap_cache, image.descriptor.id,
> - &lossy_cache_item, display_channel)) {
> + &lossy_cache_item, rcc)) {
> if (can_lossy || !lossy_cache_item) {
> if (!display_channel->enable_jpeg || lossy_cache_item) {
> image.descriptor.type = SPICE_IMAGE_TYPE_FROM_CACHE;
> @@ -5794,7 +5797,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> drawable, can_lossy, &comp_send_data)) {
> SpicePalette *palette;
>
> - red_display_add_image_to_pixmap_cache(display_channel, simage, &image, FALSE);
> + red_display_add_image_to_pixmap_cache(rcc, simage, &image, FALSE);
>
> *bitmap = simage->u.bitmap;
> bitmap->flags = bitmap->flags & SPICE_BITMAP_FLAGS_TOP_DOWN;
> @@ -5812,7 +5815,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> spice_marshaller_add_ref_chunks(m, bitmap->data);
> return FILL_BITS_TYPE_BITMAP;
> } else {
> - red_display_add_image_to_pixmap_cache(display_channel, simage, &image,
> + red_display_add_image_to_pixmap_cache(rcc, simage, &image,
> comp_send_data.is_lossy);
>
> spice_marshall_Image(m, &image,
> @@ -5833,7 +5836,7 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> break;
> }
> case SPICE_IMAGE_TYPE_QUIC:
> - red_display_add_image_to_pixmap_cache(display_channel, simage, &image, FALSE);
> + red_display_add_image_to_pixmap_cache(rcc, simage, &image, FALSE);
> image.u.quic = simage->u.quic;
> spice_marshall_Image(m, &image,
> &bitmap_palette_out, &lzplt_palette_out);
> @@ -5846,23 +5849,25 @@ static FillBitsType fill_bits(DisplayChannel *display_channel, SpiceMarshaller *
> }
> }
>
> -static void fill_mask(DisplayChannel *display_channel, SpiceMarshaller *m,
> +static void fill_mask(RedChannelClient *rcc, SpiceMarshaller *m,
> SpiceImage *mask_bitmap, Drawable *drawable)
> {
> + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> +
> if (mask_bitmap && m) {
> if (display_channel->common.worker->image_compression != SPICE_IMAGE_COMPRESS_OFF) {
> spice_image_compression_t save_img_comp =
> display_channel->common.worker->image_compression;
> display_channel->common.worker->image_compression = SPICE_IMAGE_COMPRESS_OFF;
> - fill_bits(display_channel, m, mask_bitmap, drawable, FALSE);
> + fill_bits(rcc, m, mask_bitmap, drawable, FALSE);
> display_channel->common.worker->image_compression = save_img_comp;
> } else {
> - fill_bits(display_channel, m, mask_bitmap, drawable, FALSE);
> + fill_bits(rcc, m, mask_bitmap, drawable, FALSE);
> }
> }
> }
>
> -static void fill_attr(DisplayChannel *display_channel, SpiceMarshaller *m, SpiceLineAttr *attr, uint32_t group_id)
> +static void fill_attr(SpiceMarshaller *m, SpiceLineAttr *attr, uint32_t group_id)
> {
> int i;
>
> @@ -5963,9 +5968,11 @@ static int is_surface_area_lossy(DisplayChannel *display_channel, uint32_t surfa
> to the client, returns false. "area" is for surfaces. If area = NULL,
> all the surface is considered. out_lossy_data will hold info about the bitmap, and its lossy
> area in case it is lossy and part of a surface. */
> -static int is_bitmap_lossy(DisplayChannel *display_channel, SpiceImage *image, SpiceRect *area,
> +static int is_bitmap_lossy(RedChannelClient *rcc, SpiceImage *image, SpiceRect *area,
> Drawable *drawable, BitmapData *out_data)
> {
> + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> +
> if (image == NULL) {
> // self bitmap
> out_data->type = BITMAP_DATA_TYPE_BITMAP;
> @@ -5977,7 +5984,7 @@ static int is_bitmap_lossy(DisplayChannel *display_channel, SpiceImage *image, S
>
> out_data->id = image->descriptor.id;
> if (pixmap_cache_hit(display_channel->pixmap_cache, image->descriptor.id,
> - &is_hit_lossy, display_channel)) {
> + &is_hit_lossy, rcc)) {
> out_data->type = BITMAP_DATA_TYPE_CACHE;
> if (is_hit_lossy) {
> return TRUE;
> @@ -6007,11 +6014,11 @@ static int is_bitmap_lossy(DisplayChannel *display_channel, SpiceImage *image, S
> }
> }
>
> -static int is_brush_lossy(DisplayChannel *display_channel, SpiceBrush *brush,
> +static int is_brush_lossy(RedChannelClient *rcc, SpiceBrush *brush,
> Drawable *drawable, BitmapData *out_data)
> {
> if (brush->type == SPICE_BRUSH_TYPE_PATTERN) {
> - return is_bitmap_lossy(display_channel, brush->u.pattern.pat, NULL,
> + return is_bitmap_lossy(rcc, brush->u.pattern.pat, NULL,
> drawable, out_data);
> } else {
> out_data->type = BITMAP_DATA_TYPE_INVALID;
> @@ -6286,17 +6293,16 @@ static void red_add_lossless_drawable_dependencies(RedWorker *worker,
> }
>
> static void red_marshall_qxl_draw_fill(RedWorker *worker,
> - DisplayChannel *display_channel,
> + RedChannelClient *rcc,
> SpiceMarshaller *base_marshaller,
> Drawable *item)
> {
> - RedChannel *channel = &display_channel->common.base;
> RedDrawable *drawable = item->red_drawable;
> SpiceMarshaller *brush_pat_out;
> SpiceMarshaller *mask_bitmap_out;
> SpiceFill fill;
>
> - red_channel_init_send_data(channel, SPICE_MSG_DISPLAY_DRAW_FILL, &item->pipe_item);
> + red_channel_client_init_send_data(rcc, SPICE_MSG_DISPLAY_DRAW_FILL, &item->pipe_item);
> fill_base(base_marshaller, item);
> fill = drawable->u.fill;
> spice_marshall_Fill(base_marshaller,
> @@ -6305,18 +6311,19 @@ static void red_marshall_qxl_draw_fill(RedWorker *worker,
> &mask_bitmap_out);
>
> if (brush_pat_out) {
> - fill_bits(display_channel, brush_pat_out, fill.brush.u.pattern.pat, item, FALSE);
> + fill_bits(rcc, brush_pat_out, fill.brush.u.pattern.pat, item, FALSE);
> }
>
> - fill_mask(display_channel, mask_bitmap_out, fill.mask.bitmap, item);
> + fill_mask(rcc, mask_bitmap_out, fill.mask.bitmap, item);
> }
>
>
> -static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> - DisplayChannel *display_channel,
> +static void red_lossy_marshall_qxl_draw_fill(RedWorker *worker,
> + RedChannelClient *rcc,
> SpiceMarshaller *m,
> Drawable *item)
> {
> + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> RedDrawable *drawable = item->red_drawable;
>
> int dest_allowed_lossy = FALSE;
> @@ -6332,7 +6339,7 @@ static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> (rop & SPICE_ROPD_OP_AND) ||
> (rop & SPICE_ROPD_OP_XOR));
>
> - brush_is_lossy = is_brush_lossy(display_channel, &drawable->u.fill.brush, item,
> + brush_is_lossy = is_brush_lossy(rcc, &drawable->u.fill.brush, item,
> &brush_bitmap_data);
> if (!dest_allowed_lossy) {
> dest_is_lossy = is_surface_area_lossy(display_channel, item->surface_id, &drawable->bbox,
> @@ -6343,8 +6350,7 @@ static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> !(brush_is_lossy && (brush_bitmap_data.type == BITMAP_DATA_TYPE_SURFACE))) {
> int has_mask = !!drawable->u.fill.mask.bitmap;
>
> - red_marshall_qxl_draw_fill(worker, display_channel, m, item);
> -
> + red_marshall_qxl_draw_fill(worker, rcc, m, item);
> // either the brush operation is opaque, or the dest is not lossy
> surface_lossy_region_update(worker, display_channel, item, has_mask, FALSE);
> } else {
> @@ -6370,11 +6376,10 @@ static void red_lossy_send_qxl_draw_fill(RedWorker *worker,
> }
>
> static FillBitsType red_marshall_qxl_draw_opaque(RedWorker *worker,
> - DisplayChannel *display_channel,
> + RedChannelClient *rcc,
> SpiceMarshaller *base_marshaller,
> Drawable *item, int src_allowed_lossy)
> {
> - RedChannel *channel = &display_channel->common.base;
> RedDrawable *drawable = item->red_drawable;
> SpiceMarshaller *brush_pat_out;
> SpiceMarshaller *src_bitmap_out;
> @@ -6382,7 +6387,7 @@ static FillBitsType red_marshall_qxl_draw_opaque(RedWorker *worker,
> SpiceOpaque opaque;
> FillBitsType src_send_type;
>
> - red_channel_init_send_data(channel, SPICE_MSG_DISPLAY_DRAW_OPAQUE, &item->pipe_item);
> + red_channel_client_init_send_data(rcc, SPICE_MSG_DISPLAY_DRAW_OPAQUE, &item->pipe_item);
> fill_base(base_marshaller, item);
> opaque = drawable->u.opaque;
> spice_marshall_Opaque(base_marshaller,
> @@ -6391,22 +6396,23 @@ static FillBitsType red_marshall_qxl_draw_opaque(RedWorker *worker,
> &brush_pat_out,
> &mask_bitmap_out);
>
> - src_send_type = fill_bits(display_channel, src_bitmap_out, opaque.src_bitmap, item,
> + src_send_type = fill_bits(rcc, src_bitmap_out, opaque.src_bitmap, item,
> src_allowed_lossy);
>
> if (brush_pat_out) {
> - fill_bits(display_channel, brush_pat_out, opaque.brush.u.pattern.pat, item, FALSE);
> + fill_bits(rcc, brush_pat_out, opaque.brush.u.pattern.pat, item, FALSE);
> }
> - fill_mask(display_channel, mask_bitmap_out, opaque.mask.bitmap, item);
> + fill_mask(rcc, mask_bitmap_out, opaque.mask.bitmap, item);
>
> return src_send_type;
> }
>
> -static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> - DisplayChannel *display_channel,
> +static void red_lossy_marshall_qxl_draw_opaque(RedWorker *worker,
> + RedChannelClient *rcc,
> SpiceMarshaller *m,
> Drawable *item)
> {
> + DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
> RedDrawable *drawable = item->red_drawable;
>
> int src_allowed_lossy;
> @@ -6421,11 +6427,11 @@ static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> (rop & SPICE_ROPD_OP_AND) ||
> (rop & SPICE_ROPD_OP_XOR));
>
> - brush_is_lossy = is_brush_lossy(display_channel, &drawable->u.opaque.brush, item,
> + brush_is_lossy = is_brush_lossy(rcc, &drawable->u.opaque.brush, item,
> &brush_bitmap_data);
>
> if (!src_allowed_lossy) {
> - src_is_lossy = is_bitmap_lossy(display_channel, drawable->u.opaque.src_bitmap,
> + src_is_lossy = is_bitmap_lossy(rcc, drawable->u.opaque.src_bitmap,
> &drawable->u.opaque.src_area,
> item,
> &src_bitmap_data);
> @@ -6436,8 +6442,7 @@ static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> FillBitsType src_send_type;
> int has_mask = !!drawable->u.opaque.mask.bitmap;
>
> - src_send_type = red_marshall_qxl_draw_opaque(worker, display_channel, m, item, src_allowed_lossy);
> -
> + src_send_type = red_marshall_qxl_draw_opaque(worker, rcc, m, item, src_allowed_lossy);
> if (src_send_type == FILL_BITS_TYPE_COMPRESS_LOSSY) {
> src_is_lossy = TRUE;
> } else if (src_send_type == FILL_BITS_TYPE_COMPRESS_LOSSLESS) {
> @@ -6468,18 +6473,17 @@ static void red_lossy_send_qxl_draw_opaque(RedWorker *worker,
> }
>
> static FillBitsType red_marshall_qxl_draw_copy(RedWorker *worker,
> - DisplayChannel *display_channel,
> + RedChannelClient *rcc,
> SpiceMarshaller *base_marshaller,
> Drawable *item, int src_allowed_...
>
> [Message clipped]
--
Marc-André Lureau
More information about the Spice-devel
mailing list