[Spice-devel] [PATCH 10/14] Move RedChannelClient to separate file
Jonathon Jongsma
jjongsma at redhat.com
Tue May 3 20:00:26 UTC 2016
Reduce direct access to RedChannelClient internals and prepare for
converting it to a GObject.
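
In short, call sites stop dereferencing RedChannelClient fields and go
through the new accessors instead. A minimal before/after sketch (based
on the hunks below, using the inputs channel as an example):

    /* before: the RedChannelClient layout is visible to every channel */
    InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
    RedsState *reds = red_channel_get_server(rcc->channel);

    /* after: the struct moves to red-channel-client-private.h and callers
     * use accessors such as red_channel_client_get_channel() */
    RedChannel *channel = red_channel_client_get_channel(rcc);
    RedClient *client = red_channel_client_get_client(rcc);
    InputsChannel *inputs = (InputsChannel *)channel;
    RedsState *reds = red_channel_get_server(channel);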
---
server/Makefile.am | 3 +
server/common-graphics-channel-client-private.h | 2 +-
server/common-graphics-channel-client.c | 1 +
server/dcc.h | 4 +-
server/display-channel.c | 3 +-
server/inputs-channel-client.c | 1 +
server/inputs-channel.c | 13 +-
server/main-channel-client.c | 2 +-
server/main-channel.c | 60 +-
server/red-channel-client-private.h | 78 +
server/red-channel-client.c | 1622 ++++++++++++++++++++
server/red-channel-client.h | 176 +++
server/red-channel.c | 1845 ++---------------------
server/red-channel.h | 186 +--
server/red-qxl.c | 24 +-
server/red-worker.c | 3 +-
server/reds.c | 22 +-
server/smartcard.c | 2 +-
server/sound.c | 8 +-
server/spicevmc.c | 40 +-
server/stream.c | 11 +-
21 files changed, 2155 insertions(+), 1951 deletions(-)
create mode 100644 server/red-channel-client-private.h
create mode 100644 server/red-channel-client.c
create mode 100644 server/red-channel-client.h
diff --git a/server/Makefile.am b/server/Makefile.am
index ed6c875..1643b93 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -97,6 +97,9 @@ libserver_la_SOURCES = \
mjpeg-encoder.h \
red-channel.c \
red-channel.h \
+ red-channel-client.c \
+ red-channel-client.h \
+ red-channel-client-private.h \
red-common.h \
dispatcher.c \
dispatcher.h \
diff --git a/server/common-graphics-channel-client-private.h b/server/common-graphics-channel-client-private.h
index 8a9ef76..92442ff 100644
--- a/server/common-graphics-channel-client-private.h
+++ b/server/common-graphics-channel-client-private.h
@@ -18,7 +18,7 @@
#define COMMON_GRAPHICS_CHANNEL_CLIENT_PRIVATE_H
#include "common-graphics-channel-client.h"
-#include "red-channel.h"
+#include "red-channel-client-private.h"
struct CommonGraphicsChannelClient {
RedChannelClient base;
diff --git a/server/common-graphics-channel-client.c b/server/common-graphics-channel-client.c
index 1de4fe2..f25d737 100644
--- a/server/common-graphics-channel-client.c
+++ b/server/common-graphics-channel-client.c
@@ -20,6 +20,7 @@
#include "common-graphics-channel-client-private.h"
#include "dcc.h"
+#include "red-channel-client.h"
void common_graphics_channel_client_set_low_bandwidth(CommonGraphicsChannelClient *self,
gboolean low_bandwidth)
diff --git a/server/dcc.h b/server/dcc.h
index a56c4e2..76ac401 100644
--- a/server/dcc.h
+++ b/server/dcc.h
@@ -60,8 +60,8 @@ typedef struct FreeList {
typedef struct DisplayChannelClient DisplayChannelClient;
-#define DCC_TO_WORKER(dcc) ((RedWorker*)((CommonGraphicsChannel*)((RedChannelClient*)dcc)->channel)->worker)
-#define DCC_TO_DC(dcc) ((DisplayChannel*)((RedChannelClient*)dcc)->channel)
+#define DCC_TO_WORKER(dcc) ((RedWorker*)((CommonGraphicsChannel*)(red_channel_client_get_channel((RedChannelClient*)dcc)))->worker)
+#define DCC_TO_DC(dcc) ((DisplayChannel*)red_channel_client_get_channel((RedChannelClient*)dcc))
#define RCC_TO_DCC(rcc) ((DisplayChannelClient*)rcc)
typedef struct RedSurfaceCreateItem {
diff --git a/server/display-channel.c b/server/display-channel.c
index cca99eb..e6112b4 100644
--- a/server/display-channel.c
+++ b/server/display-channel.c
@@ -1987,8 +1987,7 @@ static void release_item(RedChannelClient *rcc, RedPipeItem *item, int item_push
static int handle_migrate_flush_mark(RedChannelClient *rcc)
{
- DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
- RedChannel *channel = RED_CHANNEL(display_channel);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
red_channel_pipes_add_type(channel, RED_PIPE_ITEM_TYPE_MIGRATE_DATA);
return TRUE;
diff --git a/server/inputs-channel-client.c b/server/inputs-channel-client.c
index adcc5c6..e02fbc9 100644
--- a/server/inputs-channel-client.c
+++ b/server/inputs-channel-client.c
@@ -21,6 +21,7 @@
#include "inputs-channel-client.h"
#include "inputs-channel.h"
#include "migration-protocol.h"
+#include "red-channel-client-private.h"
struct InputsChannelClient {
RedChannelClient base;
diff --git a/server/inputs-channel.c b/server/inputs-channel.c
index 8f548c6..237905e 100644
--- a/server/inputs-channel.c
+++ b/server/inputs-channel.c
@@ -39,6 +39,7 @@
#include "reds.h"
#include "reds-stream.h"
#include "red-channel.h"
+#include "red-channel-client.h"
#include "inputs-channel-client.h"
#include "main-channel-client.h"
#include "inputs-channel.h"
@@ -151,7 +152,7 @@ static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
uint16_t type,
uint32_t size)
{
- InputsChannel *inputs_channel = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+ InputsChannel *inputs_channel = (InputsChannel*)red_channel_client_get_channel(rcc);
if (size > RECEIVE_BUF_SIZE) {
spice_printerr("error: too large incoming message");
@@ -275,7 +276,7 @@ static void inputs_channel_send_item(RedChannelClient *rcc, RedPipeItem *base)
static int inputs_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type,
void *message)
{
- InputsChannel *inputs_channel = (InputsChannel *)rcc->channel;
+ InputsChannel *inputs_channel = (InputsChannel *)red_channel_client_get_channel(rcc);
InputsChannelClient *icc = (InputsChannelClient *)rcc;
uint32_t i;
RedsState *reds = red_channel_get_server(&inputs_channel->base);
@@ -461,13 +462,13 @@ static void inputs_channel_on_disconnect(RedChannelClient *rcc)
if (!rcc) {
return;
}
- inputs_release_keys(SPICE_CONTAINEROF(rcc->channel, InputsChannel, base));
+ inputs_release_keys((InputsChannel*)red_channel_client_get_channel(rcc));
}
static void inputs_pipe_add_init(RedChannelClient *rcc)
{
RedInputsInitPipeItem *item = spice_malloc(sizeof(RedInputsInitPipeItem));
- InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+ InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc);
red_pipe_item_init(&item->base, RED_PIPE_ITEM_INPUTS_INIT);
item->modifiers = kbd_get_leds(inputs_channel_get_keyboard(inputs));
@@ -518,7 +519,7 @@ static void inputs_connect(RedChannel *channel, RedClient *client,
static void inputs_migrate(RedChannelClient *rcc)
{
- InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+ InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc);
inputs->src_during_migrate = TRUE;
red_channel_client_default_migrate(rcc);
}
@@ -555,7 +556,7 @@ static int inputs_channel_handle_migrate_data(RedChannelClient *rcc,
void *message)
{
InputsChannelClient *icc = (InputsChannelClient*)rcc;
- InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+ InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc);
SpiceMigrateDataHeader *header;
SpiceMigrateDataInputs *mig_data;
diff --git a/server/main-channel-client.c b/server/main-channel-client.c
index bed0f55..077bcef 100644
--- a/server/main-channel-client.c
+++ b/server/main-channel-client.c
@@ -20,8 +20,8 @@
#include <inttypes.h>
#include "main-channel-client.h"
-#include "main-channel-client.h"
#include "main-channel.h"
+#include "red-channel-client-private.h"
#include "reds.h"
#define NET_TEST_WARMUP_BYTES 0
diff --git a/server/main-channel.c b/server/main-channel.c
index 824c65c..fbbb032 100644
--- a/server/main-channel.c
+++ b/server/main-channel.c
@@ -66,9 +66,10 @@ int main_channel_is_connected(MainChannel *main_chan)
*/
static void main_channel_client_on_disconnect(RedChannelClient *rcc)
{
- RedsState *reds = red_channel_get_server(rcc->channel);
+ RedsState *reds = red_channel_get_server(red_channel_client_get_channel(rcc));
spice_printerr("rcc=%p", rcc);
- main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds), rcc->client);
+ main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds),
+ red_channel_client_get_client(rcc));
}
RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id)
@@ -81,7 +82,7 @@ RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t c
rcc = link->data;
mcc = (MainChannelClient*) rcc;
if (main_channel_client_get_connection_id(mcc) == connection_id) {
- return rcc->client;
+ return red_channel_client_get_client(rcc);
}
}
return NULL;
@@ -116,12 +117,13 @@ static RedPipeItem *main_multi_media_time_item_new(RedChannelClient *rcc,
static void main_channel_push_channels(MainChannelClient *mcc)
{
- if (red_client_during_migrate_at_target((main_channel_client_get_base(mcc))->client)) {
+ RedChannelClient *rcc = main_channel_client_get_base(mcc);
+ if (red_client_during_migrate_at_target(red_channel_client_get_client(rcc))) {
spice_printerr("warning: ignoring unexpected SPICE_MSGC_MAIN_ATTACH_CHANNELS"
"during migration");
return;
}
- red_channel_client_pipe_add_type(main_channel_client_get_base(mcc), RED_PIPE_ITEM_TYPE_MAIN_CHANNELS_LIST);
+ red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MAIN_CHANNELS_LIST);
}
static void main_channel_marshall_channels(RedChannelClient *rcc,
@@ -129,9 +131,10 @@ static void main_channel_marshall_channels(RedChannelClient *rcc,
RedPipeItem *item)
{
SpiceMsgChannels* channels_info;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_CHANNELS_LIST, item);
- channels_info = reds_msg_channels_new(rcc->channel->reds);
+ channels_info = reds_msg_channels_new(channel->reds);
spice_marshall_msg_main_channels_list(m, channels_info);
free(channels_info);
}
@@ -246,18 +249,20 @@ static void main_channel_marshall_migrate_data_item(RedChannelClient *rcc,
SpiceMarshaller *m,
RedPipeItem *item)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item);
- reds_marshall_migrate_data(rcc->channel->reds, m); // TODO: from reds split. ugly separation.
+ reds_marshall_migrate_data(channel->reds, m); // TODO: from reds split. ugly separation.
}
static int main_channel_handle_migrate_data(RedChannelClient *rcc,
uint32_t size, void *message)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
MainChannelClient *mcc = (MainChannelClient*)rcc;
SpiceMigrateDataHeader *header = (SpiceMigrateDataHeader *)message;
/* not supported with multi-clients */
- spice_assert(rcc->channel->clients_num == 1);
+ spice_assert(channel->clients_num == 1);
if (size < sizeof(SpiceMigrateDataHeader) + sizeof(SpiceMigrateDataMain)) {
spice_printerr("bad message size %u", size);
@@ -269,7 +274,7 @@ static int main_channel_handle_migrate_data(RedChannelClient *rcc,
spice_error("bad header");
return FALSE;
}
- return reds_handle_migrate_data(rcc->channel->reds, mcc, (SpiceMigrateDataMain *)(header + 1), size);
+ return reds_handle_migrate_data(channel->reds, mcc, (SpiceMigrateDataMain *)(header + 1), size);
}
static void main_channel_marshall_init(RedChannelClient *rcc,
@@ -277,7 +282,7 @@ static void main_channel_marshall_init(RedChannelClient *rcc,
RedInitPipeItem *item)
{
SpiceMsgMainInit init; // TODO - remove this copy, make RedInitPipeItem reuse SpiceMsgMainInit
-
+ RedChannel *channel = red_channel_client_get_channel(rcc);
red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_INIT, &item->base);
init.session_id = item->connection_id;
@@ -287,7 +292,7 @@ static void main_channel_marshall_init(RedChannelClient *rcc,
if (item->is_client_mouse_allowed) {
init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
}
- init.agent_connected = reds_has_vdagent(rcc->channel->reds);
+ init.agent_connected = reds_has_vdagent(channel->reds);
init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
init.multi_media_time = item->multi_media_time;
init.ram_hint = item->ram_hint;
@@ -329,11 +334,12 @@ static void main_channel_fill_migrate_dst_info(MainChannel *main_channel,
static void main_channel_marshall_migrate_begin(SpiceMarshaller *m, RedChannelClient *rcc,
RedPipeItem *item)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
SpiceMsgMainMigrationBegin migrate;
MainChannel *main_ch;
red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_BEGIN, item);
- main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+ main_ch = SPICE_CONTAINEROF(channel, MainChannel, base);
main_channel_fill_migrate_dst_info(main_ch, &migrate.dst_info);
spice_marshall_msg_main_migrate_begin(m, &migrate);
}
@@ -342,11 +348,12 @@ static void main_channel_marshall_migrate_begin_seamless(SpiceMarshaller *m,
RedChannelClient *rcc,
RedPipeItem *item)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
SpiceMsgMainMigrateBeginSeamless migrate_seamless;
MainChannel *main_ch;
red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_BEGIN_SEAMLESS, item);
- main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+ main_ch = SPICE_CONTAINEROF(channel, MainChannel, base);
main_channel_fill_migrate_dst_info(main_ch, &migrate_seamless.dst_info);
migrate_seamless.src_mig_version = SPICE_MIGRATION_PROTOCOL_VERSION;
spice_marshall_msg_main_migrate_begin_seamless(m, &migrate_seamless);
@@ -386,12 +393,13 @@ void main_channel_migrate_switch(MainChannel *main_chan, RedsMigSpice *mig_targe
static void main_channel_marshall_migrate_switch(SpiceMarshaller *m, RedChannelClient *rcc,
RedPipeItem *item)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
SpiceMsgMainMigrationSwitchHost migrate;
MainChannel *main_ch;
spice_printerr("");
red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST, item);
- main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+ main_ch = SPICE_CONTAINEROF(channel, MainChannel, base);
migrate.port = main_ch->mig_target.port;
migrate.sport = main_ch->mig_target.sport;
migrate.host_size = strlen(main_ch->mig_target.host) + 1;
@@ -431,7 +439,7 @@ static void main_channel_send_item(RedChannelClient *rcc, RedPipeItem *base)
base->type != RED_PIPE_ITEM_TYPE_MAIN_INIT) {
spice_printerr("Init msg for client %p was not sent yet "
"(client is probably during semi-seamless migration). Ignoring msg type %d",
- rcc->client, base->type);
+ red_channel_client_get_client(rcc), base->type);
main_channel_release_pipe_item(rcc, base, FALSE);
return;
}
@@ -527,7 +535,8 @@ static void main_channel_release_pipe_item(RedChannelClient *rcc,
static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type,
void *message)
{
- MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
MainChannelClient *mcc = (MainChannelClient*)rcc;
switch (type) {
@@ -539,18 +548,18 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
return FALSE;
}
tokens = (SpiceMsgcMainAgentStart *)message;
- reds_on_main_agent_start(rcc->channel->reds, mcc, tokens->num_tokens);
+ reds_on_main_agent_start(channel->reds, mcc, tokens->num_tokens);
break;
}
case SPICE_MSGC_MAIN_AGENT_DATA: {
- reds_on_main_agent_data(rcc->channel->reds, mcc, message, size);
+ reds_on_main_agent_data(channel->reds, mcc, message, size);
break;
}
case SPICE_MSGC_MAIN_AGENT_TOKEN: {
SpiceMsgcMainAgentTokens *tokens;
tokens = (SpiceMsgcMainAgentTokens *)message;
- reds_on_main_agent_tokens(rcc->channel->reds, mcc, tokens->num_tokens);
+ reds_on_main_agent_tokens(channel->reds, mcc, tokens->num_tokens);
break;
}
case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
@@ -574,7 +583,7 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
((SpiceMsgcMainMigrateDstDoSeamless *)message)->src_version);
break;
case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST:
- reds_on_main_mouse_mode_request(rcc->channel->reds, message, size);
+ reds_on_main_mouse_mode_request(channel->reds, message, size);
break;
case SPICE_MSGC_PONG: {
main_channel_client_handle_pong(mcc, (SpiceMsgPing *)message, size);
@@ -595,11 +604,12 @@ static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
uint16_t type,
uint32_t size)
{
- MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
MainChannelClient *mcc = (MainChannelClient*)rcc;
if (type == SPICE_MSGC_MAIN_AGENT_DATA) {
- return reds_get_agent_data_buffer(rcc->channel->reds, mcc, size);
+ return reds_get_agent_data_buffer(channel->reds, mcc, size);
} else {
return main_chan->recv_buf;
}
@@ -610,8 +620,9 @@ static void main_channel_release_msg_rcv_buf(RedChannelClient *rcc,
uint32_t size,
uint8_t *msg)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
if (type == SPICE_MSGC_MAIN_AGENT_DATA) {
- reds_release_agent_data_buffer(rcc->channel->reds, msg);
+ reds_release_agent_data_buffer(channel->reds, msg);
}
}
@@ -626,8 +637,9 @@ static void main_channel_hold_pipe_item(RedChannelClient *rcc, RedPipeItem *item
static int main_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
spice_debug(NULL);
- main_channel_push_migrate_data_item(SPICE_CONTAINEROF(rcc->channel,
+ main_channel_push_migrate_data_item(SPICE_CONTAINEROF(channel,
MainChannel, base));
return TRUE;
}
diff --git a/server/red-channel-client-private.h b/server/red-channel-client-private.h
new file mode 100644
index 0000000..4ad4d90
--- /dev/null
+++ b/server/red-channel-client-private.h
@@ -0,0 +1,78 @@
+/*
+ Copyright (C) 2009-2015 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _H_RED_CHANNEL_CLIENT_PRIVATE
+#define _H_RED_CHANNEL_CLIENT_PRIVATE
+
+#include "red-channel-client.h"
+#include "red-channel.h"
+
+struct RedChannelClient {
+ RedChannel *channel;
+ RedClient *client;
+ RedsStream *stream;
+ int dummy;
+ int dummy_connected;
+
+ uint32_t refs;
+
+ struct {
+ uint32_t generation;
+ uint32_t client_generation;
+ uint32_t messages_window;
+ uint32_t client_window;
+ } ack_data;
+
+ struct {
+ SpiceMarshaller *marshaller;
+ SpiceDataHeaderOpaque header;
+ uint32_t size;
+ RedPipeItem *item;
+ int blocked;
+ uint64_t serial;
+ uint64_t last_sent_serial;
+
+ struct {
+ SpiceMarshaller *marshaller;
+ uint8_t *header_data;
+ RedPipeItem *item;
+ } main;
+
+ struct {
+ SpiceMarshaller *marshaller;
+ } urgent;
+ } send_data;
+
+ OutgoingHandler outgoing;
+ IncomingHandler incoming;
+ int during_send;
+ int id; // debugging purposes
+ Ring pipe;
+ uint32_t pipe_size;
+
+ RedChannelCapabilities remote_caps;
+ int is_mini_header;
+ gboolean destroying;
+
+ int wait_migrate_data;
+ int wait_migrate_flush_mark;
+
+ RedChannelClientLatencyMonitor latency_monitor;
+ RedChannelClientConnectivityMonitor connectivity_monitor;
+};
+
+#endif /* _H_RED_CHANNEL_CLIENT_PRIVATE */
diff --git a/server/red-channel-client.c b/server/red-channel-client.c
new file mode 100644
index 0000000..02450df
--- /dev/null
+++ b/server/red-channel-client.c
@@ -0,0 +1,1622 @@
+/*
+ Copyright (C) 2009-2015 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/ioctl.h>
+#ifdef HAVE_LINUX_SOCKIOS_H
+#include <linux/sockios.h> /* SIOCOUTQ */
+#endif
+
+#include "common/generated_server_marshallers.h"
+#include "red-channel-client-private.h"
+#include "red-channel.h"
+
+#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15)
+#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10)
+
+enum QosPingState {
+ PING_STATE_NONE,
+ PING_STATE_TIMER,
+ PING_STATE_WARMUP,
+ PING_STATE_LATENCY,
+};
+
+enum ConnectivityState {
+ CONNECTIVITY_STATE_CONNECTED,
+ CONNECTIVITY_STATE_BLOCKED,
+ CONNECTIVITY_STATE_WAIT_PONG,
+ CONNECTIVITY_STATE_DISCONNECTED,
+};
+
+typedef struct RedEmptyMsgPipeItem {
+ RedPipeItem base;
+ int msg;
+} RedEmptyMsgPipeItem;
+
+static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout)
+{
+ if (!rcc->latency_monitor.timer) {
+ return;
+ }
+ if (rcc->latency_monitor.state != PING_STATE_NONE) {
+ return;
+ }
+ rcc->latency_monitor.state = PING_STATE_TIMER;
+ rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout);
+}
+
+static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc)
+{
+ if (!rcc->latency_monitor.timer) {
+ return;
+ }
+ if (rcc->latency_monitor.state != PING_STATE_TIMER) {
+ return;
+ }
+
+ rcc->channel->core->timer_cancel(rcc->latency_monitor.timer);
+ rcc->latency_monitor.state = PING_STATE_NONE;
+}
+
+static void red_channel_client_restart_ping_timer(RedChannelClient *rcc)
+{
+ uint64_t passed, timeout;
+
+ passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC;
+ timeout = PING_TEST_IDLE_NET_TIMEOUT_MS;
+ if (passed < PING_TEST_TIMEOUT_MS) {
+ timeout += PING_TEST_TIMEOUT_MS - passed;
+ }
+
+ red_channel_client_start_ping_timer(rcc, timeout);
+}
+
+RedChannel* red_channel_client_get_channel(RedChannelClient* rcc)
+{
+ return rcc->channel;
+}
+
+IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc)
+{
+ return &rcc->incoming;
+}
+
+void red_channel_client_on_output(void *opaque, int n)
+{
+ RedChannelClient *rcc = opaque;
+
+ if (rcc->connectivity_monitor.timer) {
+ rcc->connectivity_monitor.out_bytes += n;
+ }
+ stat_inc_counter(reds, rcc->channel->out_bytes_counter, n);
+}
+
+void red_channel_client_on_input(void *opaque, int n)
+{
+ RedChannelClient *rcc = opaque;
+
+ if (rcc->connectivity_monitor.timer) {
+ rcc->connectivity_monitor.in_bytes += n;
+ }
+}
+
+int red_channel_client_get_out_msg_size(void *opaque)
+{
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+ return rcc->send_data.size;
+}
+
+void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec,
+ int *vec_size, int pos)
+{
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+ *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
+ vec, IOV_MAX, pos);
+}
+
+void red_channel_client_on_out_block(void *opaque)
+{
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+ rcc->send_data.blocked = TRUE;
+ rcc->channel->core->watch_update_mask(rcc->stream->watch,
+ SPICE_WATCH_EVENT_READ |
+ SPICE_WATCH_EVENT_WRITE);
+}
+
+static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc)
+{
+ return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller);
+}
+
+static void red_channel_client_reset_send_data(RedChannelClient *rcc)
+{
+ spice_marshaller_reset(rcc->send_data.marshaller);
+ rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller,
+ rcc->send_data.header.header_size);
+ spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size);
+ rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0);
+ rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0);
+
+ /* Keeping the serial consecutive: resetting it if reset_send_data
+ * has been called before, but no message has been sent since then.
+ */
+ if (rcc->send_data.last_sent_serial != rcc->send_data.serial) {
+ spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1);
+ /* When the urgent marshaller is active, the serial was incremented by
+ * the call to reset_send_data that was made for the main marshaller.
+ * The urgent msg receives this serial, and the main msg serial is
+ * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial)
+ * should be 1 in this case */
+ if (!red_channel_client_urgent_marshaller_is_active(rcc)) {
+ rcc->send_data.serial = rcc->send_data.last_sent_serial;
+ }
+ }
+ rcc->send_data.serial++;
+
+ if (!rcc->is_mini_header) {
+ spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller);
+ rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0);
+ rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
+ }
+}
+
+static void red_channel_client_send_set_ack(RedChannelClient *rcc)
+{
+ SpiceMsgSetAck ack;
+
+ spice_assert(rcc);
+ red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
+ ack.generation = ++rcc->ack_data.generation;
+ ack.window = rcc->ack_data.client_window;
+ rcc->ack_data.messages_window = 0;
+
+ spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
+
+ red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_migrate(RedChannelClient *rcc)
+{
+ SpiceMsgMigrate migrate;
+
+ red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL);
+ migrate.flags = rcc->channel->migration_flags;
+ spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate);
+ if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) {
+ rcc->wait_migrate_flush_mark = TRUE;
+ }
+
+ red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_ping(RedChannelClient *rcc)
+{
+ SpiceMsgPing ping;
+
+ if (!rcc->latency_monitor.warmup_was_sent) { // latency test start
+ int delay_val;
+ socklen_t opt_size = sizeof(delay_val);
+
+ rcc->latency_monitor.warmup_was_sent = TRUE;
+ /*
+ * When testing latency, TCP_NODELAY must be switched on, otherwise,
+ * sending the ping message is delayed by Nagle algorithm, and the
+ * roundtrip measurement is less accurate (bigger).
+ */
+ rcc->latency_monitor.tcp_nodelay = 1;
+ if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+ &opt_size) == -1) {
+ spice_warning("getsockopt failed, %s", strerror(errno));
+ } else {
+ rcc->latency_monitor.tcp_nodelay = delay_val;
+ if (!delay_val) {
+ delay_val = 1;
+ if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+ sizeof(delay_val)) == -1) {
+ if (errno != ENOTSUP) {
+ spice_warning("setsockopt failed, %s", strerror(errno));
+ }
+ }
+ }
+ }
+ }
+
+ red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL);
+ ping.id = rcc->latency_monitor.id;
+ ping.timestamp = spice_get_monotonic_time_ns();
+ spice_marshall_msg_ping(rcc->send_data.marshaller, &ping);
+ red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base)
+{
+ RedEmptyMsgPipeItem *msg_pipe_item = SPICE_CONTAINEROF(base, RedEmptyMsgPipeItem, base);
+
+ red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL);
+ red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item)
+{
+ spice_assert(red_channel_client_no_item_being_sent(rcc));
+ red_channel_client_reset_send_data(rcc);
+ switch (item->type) {
+ case RED_PIPE_ITEM_TYPE_SET_ACK:
+ red_channel_client_send_set_ack(rcc);
+ break;
+ case RED_PIPE_ITEM_TYPE_MIGRATE:
+ red_channel_client_send_migrate(rcc);
+ break;
+ case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
+ red_channel_client_send_empty_msg(rcc, item);
+ break;
+ case RED_PIPE_ITEM_TYPE_PING:
+ red_channel_client_send_ping(rcc);
+ break;
+ default:
+ rcc->channel->channel_cbs.send_item(rcc, item);
+ return;
+ }
+ free(item);
+}
+
+static void red_channel_client_release_item(RedChannelClient *rcc,
+ RedPipeItem *item,
+ int item_pushed)
+{
+ switch (item->type) {
+ case RED_PIPE_ITEM_TYPE_SET_ACK:
+ case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
+ case RED_PIPE_ITEM_TYPE_MIGRATE:
+ case RED_PIPE_ITEM_TYPE_PING:
+ free(item);
+ break;
+ default:
+ rcc->channel->channel_cbs.release_item(rcc, item, item_pushed);
+ }
+}
+
+static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
+{
+ if (rcc->send_data.item) {
+ red_channel_client_release_item(rcc,
+ rcc->send_data.item, TRUE);
+ rcc->send_data.item = NULL;
+ }
+}
+
+static void red_channel_client_restore_main_sender(RedChannelClient *rcc)
+{
+ spice_marshaller_reset(rcc->send_data.urgent.marshaller);
+ rcc->send_data.marshaller = rcc->send_data.main.marshaller;
+ rcc->send_data.header.data = rcc->send_data.main.header_data;
+ if (!rcc->is_mini_header) {
+ rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
+ }
+ rcc->send_data.item = rcc->send_data.main.item;
+}
+
+void red_channel_client_on_out_msg_done(void *opaque)
+{
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
+ int fd;
+
+ rcc->send_data.size = 0;
+
+ if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) {
+ if (reds_stream_send_msgfd(rcc->stream, fd) < 0) {
+ perror("sendfd");
+ red_channel_client_disconnect(rcc);
+ if (fd != -1)
+ close(fd);
+ return;
+ }
+ if (fd != -1)
+ close(fd);
+ }
+
+ red_channel_client_release_sent_item(rcc);
+ if (rcc->send_data.blocked) {
+ rcc->send_data.blocked = FALSE;
+ rcc->channel->core->watch_update_mask(rcc->stream->watch,
+ SPICE_WATCH_EVENT_READ);
+ }
+
+ if (red_channel_client_urgent_marshaller_is_active(rcc)) {
+ red_channel_client_restore_main_sender(rcc);
+ spice_assert(rcc->send_data.header.data != NULL);
+ red_channel_client_begin_send_message(rcc);
+ } else {
+ if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) {
+ /* It is possible that the socket will become idle, so we may be able to test latency */
+ red_channel_client_restart_ping_timer(rcc);
+ }
+ }
+
+}
+
+static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item)
+{
+ rcc->pipe_size--;
+ ring_remove(&item->link);
+}
+
+static void red_channel_client_set_remote_caps(RedChannelClient* rcc,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps)
+{
+ rcc->remote_caps.num_common_caps = num_common_caps;
+ rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t));
+
+ rcc->remote_caps.num_caps = num_caps;
+ rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t));
+}
+
+static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc)
+{
+ rcc->remote_caps.num_common_caps = 0;
+ free(rcc->remote_caps.common_caps);
+ rcc->remote_caps.num_caps = 0;
+ free(rcc->remote_caps.caps);
+}
+
+int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap)
+{
+ return test_capability(rcc->remote_caps.common_caps,
+ rcc->remote_caps.num_common_caps,
+ cap);
+}
+
+int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap)
+{
+ return test_capability(rcc->remote_caps.caps,
+ rcc->remote_caps.num_caps,
+ cap);
+}
+
+static void red_channel_client_push_ping(RedChannelClient *rcc)
+{
+ spice_assert(rcc->latency_monitor.state == PING_STATE_NONE);
+ rcc->latency_monitor.state = PING_STATE_WARMUP;
+ rcc->latency_monitor.warmup_was_sent = FALSE;
+ rcc->latency_monitor.id = rand();
+ red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
+ red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
+}
+
+static void red_channel_client_ping_timer(void *opaque)
+{
+ RedChannelClient *rcc = opaque;
+
+ spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER);
+ red_channel_client_cancel_ping_timer(rcc);
+
+#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */
+ {
+ int so_unsent_size = 0;
+
+ /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */
+ if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) {
+ spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno));
+ }
+ if (so_unsent_size > 0) {
+ /* tcp snd buffer is still occupied. rescheduling ping */
+ red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+ } else {
+ red_channel_client_push_ping(rcc);
+ }
+ }
+#else /* ifdef HAVE_LINUX_SOCKIOS_H */
+ /* More portable alternative code path (less accurate but avoids bogus ioctls) */
+ red_channel_client_push_ping(rcc);
+#endif /* ifdef HAVE_LINUX_SOCKIOS_H */
+}
+
+static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
+{
+ return (rcc->channel->handle_acks &&
+ (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
+}
+
+/*
+ * When a connection is not alive (and we can't detect it via a socket error), we
+ * reach one of these 2 states:
+ * (1) Sending msgs is blocked: either writes return EAGAIN
+ * or we are missing MSGC_ACK from the client.
+ * (2) MSG_PING was sent without receiving a MSGC_PONG in reply.
+ *
+ * The connectivity_timer callback tests if the channel's state matches one of the above.
+ * In case it does, on the next time the timer is called, it checks if the connection has
+ * been idle during the time that passed since the previous timer call. If the connection
+ * has been idle, we consider the client as disconnected.
+ */
+static void red_channel_client_connectivity_timer(void *opaque)
+{
+ RedChannelClient *rcc = opaque;
+ RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor;
+ int is_alive = TRUE;
+
+ if (monitor->state == CONNECTIVITY_STATE_BLOCKED) {
+ if (monitor->in_bytes == 0 && monitor->out_bytes == 0) {
+ if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) {
+ spice_error("mismatch between rcc-state and connectivity-state");
+ }
+ spice_debug("rcc is blocked; connection is idle");
+ is_alive = FALSE;
+ }
+ } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) {
+ if (monitor->in_bytes == 0) {
+ if (rcc->latency_monitor.state != PING_STATE_WARMUP &&
+ rcc->latency_monitor.state != PING_STATE_LATENCY) {
+ spice_error("mismatch between rcc-state and connectivity-state");
+ }
+ spice_debug("rcc waits for pong; connection is idle");
+ is_alive = FALSE;
+ }
+ }
+
+ if (is_alive) {
+ monitor->in_bytes = 0;
+ monitor->out_bytes = 0;
+ if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) {
+ monitor->state = CONNECTIVITY_STATE_BLOCKED;
+ } else if (rcc->latency_monitor.state == PING_STATE_WARMUP ||
+ rcc->latency_monitor.state == PING_STATE_LATENCY) {
+ monitor->state = CONNECTIVITY_STATE_WAIT_PONG;
+ } else {
+ monitor->state = CONNECTIVITY_STATE_CONNECTED;
+ }
+ rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
+ rcc->connectivity_monitor.timeout);
+ } else {
+ monitor->state = CONNECTIVITY_STATE_DISCONNECTED;
+ spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting",
+ rcc, rcc->channel->type, rcc->channel->id, monitor->timeout);
+ red_channel_client_disconnect(rcc);
+ }
+}
+
+void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms)
+{
+ if (!red_channel_client_is_connected(rcc)) {
+ return;
+ }
+ spice_debug(NULL);
+ spice_assert(timeout_ms > 0);
+ /*
+ * If latency_monitor is not active, we activate it in order to enable
+ * periodic ping messages so that we will be able to identify a disconnected
+ * channel-client even if there are no ongoing channel specific messages
+ * on this channel.
+ */
+ if (rcc->latency_monitor.timer == NULL) {
+ rcc->latency_monitor.timer = rcc->channel->core->timer_add(
+ rcc->channel->core, red_channel_client_ping_timer, rcc);
+ if (!red_client_during_migrate_at_target(rcc->client)) {
+ red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+ }
+ rcc->latency_monitor.roundtrip = -1;
+ }
+ if (rcc->connectivity_monitor.timer == NULL) {
+ rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED;
+ rcc->connectivity_monitor.timer = rcc->channel->core->timer_add(
+ rcc->channel->core, red_channel_client_connectivity_timer, rcc);
+ rcc->connectivity_monitor.timeout = timeout_ms;
+ if (!red_client_during_migrate_at_target(rcc->client)) {
+ rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
+ rcc->connectivity_monitor.timeout);
+ }
+ }
+}
+
+static void red_channel_client_event(int fd, int event, void *data)
+{
+ RedChannelClient *rcc = (RedChannelClient *)data;
+
+ red_channel_client_ref(rcc);
+ if (event & SPICE_WATCH_EVENT_READ) {
+ red_channel_client_receive(rcc);
+ }
+ if (event & SPICE_WATCH_EVENT_WRITE) {
+ red_channel_client_push(rcc);
+ }
+ red_channel_client_unref(rcc);
+}
+
+static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header)
+{
+ return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size);
+}
+
+static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header)
+{
+ return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size);
+}
+
+static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header)
+{
+ return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type);
+}
+
+static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header)
+{
+ return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type);
+}
+
+static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
+{
+ ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type);
+}
+
+static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
+{
+ ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type);
+}
+
+static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
+{
+ ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size);
+}
+
+static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
+{
+ ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size);
+}
+
+static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
+{
+ ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial);
+}
+
+static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
+{
+ spice_error("attempt to set header serial on mini header");
+}
+
+static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
+{
+ ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list);
+}
+
+static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
+{
+ spice_error("attempt to set header sub list on mini header");
+}
+
+static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader),
+ full_header_set_msg_type,
+ full_header_set_msg_size,
+ full_header_set_msg_serial,
+ full_header_set_msg_sub_list,
+ full_header_get_msg_type,
+ full_header_get_msg_size};
+
+static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader),
+ mini_header_set_msg_type,
+ mini_header_set_msg_size,
+ mini_header_set_msg_serial,
+ mini_header_set_msg_sub_list,
+ mini_header_get_msg_type,
+ mini_header_get_msg_size};
+
+static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client)
+{
+ if (red_client_get_channel(client, channel->type, channel->id)) {
+ spice_printerr("Error client %p: duplicate channel type %d id %d",
+ client, channel->type, channel->id);
+ return FALSE;
+ }
+ return TRUE;
+}
+
+RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
+ RedsStream *stream,
+ int monitor_latency,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps)
+{
+ RedChannelClient *rcc = NULL;
+
+ pthread_mutex_lock(&client->lock);
+ if (!red_channel_client_pre_create_validate(channel, client)) {
+ goto error;
+ }
+ spice_assert(stream && channel && size >= sizeof(RedChannelClient));
+ rcc = spice_malloc0(size);
+ rcc->stream = stream;
+ rcc->channel = channel;
+ rcc->client = client;
+ rcc->refs = 1;
+ rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
+ // block flags)
+ rcc->ack_data.client_generation = ~0;
+ rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
+ rcc->send_data.main.marshaller = spice_marshaller_new();
+ rcc->send_data.urgent.marshaller = spice_marshaller_new();
+
+ rcc->send_data.marshaller = rcc->send_data.main.marshaller;
+
+ rcc->incoming.opaque = rcc;
+ rcc->incoming.cb = &channel->incoming_cb;
+
+ rcc->outgoing.opaque = rcc;
+ rcc->outgoing.cb = &channel->outgoing_cb;
+ rcc->outgoing.pos = 0;
+ rcc->outgoing.size = 0;
+
+ red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
+ if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
+ rcc->incoming.header = mini_header_wrapper;
+ rcc->send_data.header = mini_header_wrapper;
+ rcc->is_mini_header = TRUE;
+ } else {
+ rcc->incoming.header = full_header_wrapper;
+ rcc->send_data.header = full_header_wrapper;
+ rcc->is_mini_header = FALSE;
+ }
+
+ rcc->incoming.header.data = rcc->incoming.header_buf;
+ rcc->incoming.serial = 1;
+
+ if (!channel->channel_cbs.config_socket(rcc)) {
+ goto error;
+ }
+
+ ring_init(&rcc->pipe);
+ rcc->pipe_size = 0;
+
+ stream->watch = channel->core->watch_add(channel->core,
+ stream->socket,
+ SPICE_WATCH_EVENT_READ,
+ red_channel_client_event, rcc);
+ rcc->id = g_list_length(channel->clients);
+ red_channel_add_client(channel, rcc);
+ red_client_add_channel(client, rcc);
+ red_channel_ref(channel);
+ pthread_mutex_unlock(&client->lock);
+
+ if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) {
+ rcc->latency_monitor.timer = channel->core->timer_add(
+ channel->core, red_channel_client_ping_timer, rcc);
+ if (!client->during_target_migrate) {
+ red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+ }
+ rcc->latency_monitor.roundtrip = -1;
+ }
+
+ return rcc;
+error:
+ free(rcc);
+ reds_stream_free(stream);
+ pthread_mutex_unlock(&client->lock);
+ return NULL;
+}
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps)
+{
+ RedChannelClient *rcc = NULL;
+
+ spice_assert(size >= sizeof(RedChannelClient));
+
+ pthread_mutex_lock(&client->lock);
+ if (!red_channel_client_pre_create_validate(channel, client)) {
+ goto error;
+ }
+ rcc = spice_malloc0(size);
+ rcc->refs = 1;
+ rcc->client = client;
+ rcc->channel = channel;
+ red_channel_ref(channel);
+ red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
+ if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
+ rcc->incoming.header = mini_header_wrapper;
+ rcc->send_data.header = mini_header_wrapper;
+ rcc->is_mini_header = TRUE;
+ } else {
+ rcc->incoming.header = full_header_wrapper;
+ rcc->send_data.header = full_header_wrapper;
+ rcc->is_mini_header = FALSE;
+ }
+
+ rcc->incoming.header.data = rcc->incoming.header_buf;
+ rcc->incoming.serial = 1;
+ ring_init(&rcc->pipe);
+
+ rcc->dummy = TRUE;
+ rcc->dummy_connected = TRUE;
+ red_channel_add_client(channel, rcc);
+ red_client_add_channel(client, rcc);
+ pthread_mutex_unlock(&client->lock);
+ return rcc;
+error:
+ pthread_mutex_unlock(&client->lock);
+ return NULL;
+}
+
+void red_channel_client_seamless_migration_done(RedChannelClient *rcc)
+{
+ rcc->wait_migrate_data = FALSE;
+
+ if (red_client_seamless_migration_done_for_channel(rcc->client, rcc)) {
+ if (rcc->latency_monitor.timer) {
+ red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+ }
+ if (rcc->connectivity_monitor.timer) {
+ rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
+ rcc->connectivity_monitor.timeout);
+ }
+ }
+}
+
+void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc)
+{
+ if (rcc->latency_monitor.timer) {
+ red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+ }
+}
+
+int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc)
+{
+ return rcc->wait_migrate_data;
+}
+
+void red_channel_client_default_migrate(RedChannelClient *rcc)
+{
+ if (rcc->latency_monitor.timer) {
+ red_channel_client_cancel_ping_timer(rcc);
+ rcc->channel->core->timer_remove(rcc->latency_monitor.timer);
+ rcc->latency_monitor.timer = NULL;
+ }
+ if (rcc->connectivity_monitor.timer) {
+ rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer);
+ rcc->connectivity_monitor.timer = NULL;
+ }
+ red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE);
+}
+
+void red_channel_client_ref(RedChannelClient *rcc)
+{
+ rcc->refs++;
+}
+
+void red_channel_client_unref(RedChannelClient *rcc)
+{
+ if (--rcc->refs != 0) {
+ return;
+ }
+
+ spice_debug("destroy rcc=%p", rcc);
+
+ reds_stream_free(rcc->stream);
+ rcc->stream = NULL;
+
+ if (rcc->send_data.main.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.main.marshaller);
+ }
+
+ if (rcc->send_data.urgent.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.urgent.marshaller);
+ }
+
+ red_channel_client_destroy_remote_caps(rcc);
+ if (rcc->channel) {
+ red_channel_unref(rcc->channel);
+ }
+ free(rcc);
+}
+
+void red_channel_client_destroy(RedChannelClient *rcc)
+{
+ rcc->destroying = TRUE;
+ red_channel_client_disconnect(rcc);
+ red_client_remove_channel(rcc);
+ red_channel_client_unref(rcc);
+}
+
+void red_channel_client_shutdown(RedChannelClient *rcc)
+{
+ if (rcc->stream && !rcc->stream->shutdown) {
+ rcc->channel->core->watch_remove(rcc->stream->watch);
+ rcc->stream->watch = NULL;
+ shutdown(rcc->stream->socket, SHUT_RDWR);
+ rcc->stream->shutdown = TRUE;
+ }
+}
+
+static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
+{
+ ssize_t n;
+
+ if (!stream) {
+ return;
+ }
+
+ if (handler->size == 0) {
+ handler->vec = handler->vec_buf;
+ handler->size = handler->cb->get_msg_size(handler->opaque);
+ if (!handler->size) { // nothing to be sent
+ return;
+ }
+ }
+
+ for (;;) {
+ handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
+ n = reds_stream_writev(stream, handler->vec, handler->vec_size);
+ if (n == -1) {
+ switch (errno) {
+ case EAGAIN:
+ handler->cb->on_block(handler->opaque);
+ return;
+ case EINTR:
+ continue;
+ case EPIPE:
+ handler->cb->on_error(handler->opaque);
+ return;
+ default:
+ spice_printerr("%s", strerror(errno));
+ handler->cb->on_error(handler->opaque);
+ return;
+ }
+ } else {
+ handler->pos += n;
+ handler->cb->on_output(handler->opaque, n);
+ if (handler->pos == handler->size) { // finished writing data
+ /* reset handler before calling on_msg_done, since it
+ * can trigger another call to red_peer_handle_outgoing (when
+ * switching from the urgent marshaller to the main one) */
+ handler->vec = handler->vec_buf;
+ handler->pos = 0;
+ handler->size = 0;
+ handler->cb->on_msg_done(handler->opaque);
+ return;
+ }
+ }
+ }
+}
+
+/* return the number of bytes read. -1 in case of error */
+static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
+{
+ uint8_t *pos = buf;
+ while (size) {
+ int now;
+ if (stream->shutdown) {
+ return -1;
+ }
+ now = reds_stream_read(stream, pos, size);
+ if (now <= 0) {
+ if (now == 0) {
+ return -1;
+ }
+ spice_assert(now == -1);
+ if (errno == EAGAIN) {
+ break;
+ } else if (errno == EINTR) {
+ continue;
+ } else if (errno == EPIPE) {
+ return -1;
+ } else {
+ spice_printerr("%s", strerror(errno));
+ return -1;
+ }
+ } else {
+ size -= now;
+ pos += now;
+ }
+ }
+ return pos - buf;
+}
+
+// TODO: this implementation, as opposed to the old implementation in red_worker,
+// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
+// arithmetic for the case where a single cb_read could return multiple messages. But
+// this is suboptimal potentially. Profile and consider fixing.
+static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler)
+{
+ int bytes_read;
+ uint8_t *parsed;
+ size_t parsed_size;
+ message_destructor_t parsed_free;
+ uint16_t msg_type;
+ uint32_t msg_size;
+
+ /* XXX: This needs further investigation as to the underlying cause; it happened
+ * after spicec disconnect (but not with spice-gtk) repeatedly. */
+ if (!stream) {
+ return;
+ }
+
+ for (;;) {
+ int ret_handle;
+ if (handler->header_pos < handler->header.header_size) {
+ bytes_read = red_peer_receive(stream,
+ handler->header.data + handler->header_pos,
+ handler->header.header_size - handler->header_pos);
+ if (bytes_read == -1) {
+ handler->cb->on_error(handler->opaque);
+ return;
+ }
+ handler->cb->on_input(handler->opaque, bytes_read);
+ handler->header_pos += bytes_read;
+
+ if (handler->header_pos != handler->header.header_size) {
+ return;
+ }
+ }
+
+ msg_size = handler->header.get_msg_size(&handler->header);
+ msg_type = handler->header.get_msg_type(&handler->header);
+ if (handler->msg_pos < msg_size) {
+ if (!handler->msg) {
+ handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size);
+ if (handler->msg == NULL) {
+ spice_printerr("ERROR: channel refused to allocate buffer.");
+ handler->cb->on_error(handler->opaque);
+ return;
+ }
+ }
+
+ bytes_read = red_peer_receive(stream,
+ handler->msg + handler->msg_pos,
+ msg_size - handler->msg_pos);
+ if (bytes_read == -1) {
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+ handler->cb->on_error(handler->opaque);
+ return;
+ }
+ handler->cb->on_input(handler->opaque, bytes_read);
+ handler->msg_pos += bytes_read;
+ if (handler->msg_pos != msg_size) {
+ return;
+ }
+ }
+
+ if (handler->cb->parser) {
+ parsed = handler->cb->parser(handler->msg,
+ handler->msg + msg_size, msg_type,
+ SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
+ if (parsed == NULL) {
+ spice_printerr("failed to parse message type %d", msg_type);
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+ handler->cb->on_error(handler->opaque);
+ return;
+ }
+ ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
+ msg_type, parsed);
+ parsed_free(parsed);
+ } else {
+ ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size,
+ handler->msg);
+ }
+ handler->msg_pos = 0;
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+ handler->msg = NULL;
+ handler->header_pos = 0;
+
+ if (!ret_handle) {
+ handler->cb->on_error(handler->opaque);
+ return;
+ }
+ }
+}
+
+void red_channel_client_receive(RedChannelClient *rcc)
+{
+ red_channel_client_ref(rcc);
+ red_peer_handle_incoming(rcc->stream, &rcc->incoming);
+ red_channel_client_unref(rcc);
+}
+
+void red_channel_client_send(RedChannelClient *rcc)
+{
+ red_channel_client_ref(rcc);
+ red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
+ red_channel_client_unref(rcc);
+}
+
+static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc)
+{
+ RedPipeItem *item;
+
+ if (!rcc || rcc->send_data.blocked
+ || red_channel_client_waiting_for_ack(rcc)
+ || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) {
+ return NULL;
+ }
+ red_channel_client_pipe_remove(rcc, item);
+ return item;
+}
+
+void red_channel_client_push(RedChannelClient *rcc)
+{
+ RedPipeItem *pipe_item;
+
+ if (!rcc->during_send) {
+ rcc->during_send = TRUE;
+ } else {
+ return;
+ }
+ red_channel_client_ref(rcc);
+ if (rcc->send_data.blocked) {
+ red_channel_client_send(rcc);
+ }
+
+ if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) {
+ rcc->send_data.blocked = TRUE;
+ spice_printerr("ERROR: an item waiting to be sent and not blocked");
+ }
+
+ while ((pipe_item = red_channel_client_pipe_item_get(rcc))) {
+ red_channel_client_send_item(rcc, pipe_item);
+ }
+ if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe)
+ && rcc->stream->watch) {
+ rcc->channel->core->watch_update_mask(rcc->stream->watch,
+ SPICE_WATCH_EVENT_READ);
+ }
+ rcc->during_send = FALSE;
+ red_channel_client_unref(rcc);
+}
+
+int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc)
+{
+ if (rcc->latency_monitor.roundtrip < 0) {
+ return rcc->latency_monitor.roundtrip;
+ }
+ return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC;
+}
+
+void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
+{
+ rcc->ack_data.messages_window = 0;
+ red_channel_client_push(rcc);
+}
+
+static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping)
+{
+ uint64_t now;
+
+ /* ignoring unexpected pongs, or post-migration pongs for pings that
+ * started just before migration */
+ if (ping->id != rcc->latency_monitor.id) {
+ spice_warning("ping-id (%u)!= pong-id %u",
+ rcc->latency_monitor.id, ping->id);
+ return;
+ }
+
+ now = spice_get_monotonic_time_ns();
+
+ if (rcc->latency_monitor.state == PING_STATE_WARMUP) {
+ rcc->latency_monitor.state = PING_STATE_LATENCY;
+ return;
+ } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) {
+ spice_warning("unexpected");
+ return;
+ }
+
+ /* set TCP_NODELAY=0, in case we reverted it for the test*/
+ if (!rcc->latency_monitor.tcp_nodelay) {
+ int delay_val = 0;
+
+ if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+ sizeof(delay_val)) == -1) {
+ if (errno != ENOTSUP) {
+ spice_warning("setsockopt failed, %s", strerror(errno));
+ }
+ }
+ }
+
+ /*
+ * The real network latency shouldn't change during the connection. However,
+ * the measurements can be bigger than the real roundtrip due to other
+ * threads or processes that are utilizing the network. We update the roundtrip
+ * measurement with the minimal value we encountered till now.
+ */
+ if (rcc->latency_monitor.roundtrip < 0 ||
+ now - ping->timestamp < rcc->latency_monitor.roundtrip) {
+ rcc->latency_monitor.roundtrip = now - ping->timestamp;
+ spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC);
+ }
+
+ rcc->latency_monitor.last_pong_time = now;
+ rcc->latency_monitor.state = PING_STATE_NONE;
+ red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS);
+}
+
+static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
+{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ if (channel->channel_cbs.handle_migrate_flush_mark) {
+ channel->channel_cbs.handle_migrate_flush_mark(rcc);
+ }
+}
+
+// TODO: the whole migration is broken with multiple clients. What do we want to do?
+// basically just
+// 1) source send mark to all
+// 2) source gets at various times the data (waits for all)
+// 3) source migrates to target
+// 4) target sends data to all
+// So need to make all the handlers work with per channel/client data (what data exactly?)
+static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
+{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ spice_debug("channel type %d id %d rcc %p size %u",
+ channel->type, channel->id, rcc, size);
+ if (!channel->channel_cbs.handle_migrate_data) {
+ return;
+ }
+ if (!red_channel_client_is_waiting_for_migrate_data(rcc)) {
+ spice_channel_client_error(rcc, "unexpected");
+ return;
+ }
+ if (channel->channel_cbs.handle_migrate_data_get_serial) {
+ red_channel_client_set_message_serial(rcc,
+ channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message));
+ }
+ if (!channel->channel_cbs.handle_migrate_data(rcc, size, message)) {
+ spice_channel_client_error(rcc, "handle_migrate_data failed");
+ return;
+ }
+ red_channel_client_seamless_migration_done(rcc);
+}
+
+
+int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
+ uint16_t type, void *message)
+{
+ switch (type) {
+ case SPICE_MSGC_ACK_SYNC:
+ if (size != sizeof(uint32_t)) {
+ spice_printerr("bad message size");
+ return FALSE;
+ }
+ rcc->ack_data.client_generation = *(uint32_t *)(message);
+ break;
+ case SPICE_MSGC_ACK:
+ if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
+ rcc->ack_data.messages_window -= rcc->ack_data.client_window;
+ red_channel_client_push(rcc);
+ }
+ break;
+ case SPICE_MSGC_DISCONNECTING:
+ break;
+ case SPICE_MSGC_MIGRATE_FLUSH_MARK:
+ if (!rcc->wait_migrate_flush_mark) {
+ spice_error("unexpected flush mark");
+ return FALSE;
+ }
+ red_channel_handle_migrate_flush_mark(rcc);
+ rcc->wait_migrate_flush_mark = FALSE;
+ break;
+ case SPICE_MSGC_MIGRATE_DATA:
+ red_channel_handle_migrate_data(rcc, size, message);
+ break;
+ case SPICE_MSGC_PONG:
+ red_channel_client_handle_pong(rcc, message);
+ break;
+ default:
+ spice_printerr("invalid message type %u", type);
+ return FALSE;
+ }
+ return TRUE;
+}
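+
+/*
+ * Channel implementations usually handle their own message types and fall back
+ * to red_channel_client_handle_message() for the common ones. A hypothetical
+ * handler could look like the sketch below (FooChannelClient, RCC_TO_FOO and
+ * SPICE_MSGC_FOO_BAR are illustrative names only). Note that the fallback takes
+ * (size, type) while the channel callback receives (type, size):
+ *
+ *   static int foo_channel_handle_message(RedChannelClient *rcc, uint16_t type,
+ *                                         uint32_t size, uint8_t *msg)
+ *   {
+ *       switch (type) {
+ *       case SPICE_MSGC_FOO_BAR:
+ *           return foo_handle_bar(RCC_TO_FOO(rcc), size, msg);
+ *       default:
+ *           return red_channel_client_handle_message(rcc, size, type, msg);
+ *       }
+ *   }
+ */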
+
+void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item)
+{
+ spice_assert(red_channel_client_no_item_being_sent(rcc));
+ spice_assert(msg_type != 0);
+ rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type);
+ rcc->send_data.item = item;
+ if (item) {
+ rcc->channel->channel_cbs.hold_item(rcc, item);
+ }
+}
+
+void red_channel_client_begin_send_message(RedChannelClient *rcc)
+{
+ SpiceMarshaller *m = rcc->send_data.marshaller;
+
+ // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
+ if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) {
+ spice_printerr("BUG: header->type == 0");
+ return;
+ }
+
+    /* cancel the latency test timer until the network is idle again */
+ red_channel_client_cancel_ping_timer(rcc);
+
+ spice_marshaller_flush(m);
+ rcc->send_data.size = spice_marshaller_get_total_size(m);
+ rcc->send_data.header.set_msg_size(&rcc->send_data.header,
+ rcc->send_data.size - rcc->send_data.header.header_size);
+ rcc->ack_data.messages_window++;
+ rcc->send_data.last_sent_serial = rcc->send_data.serial;
+ rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
+ red_channel_client_send(rcc);
+}
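+
+/*
+ * The typical sending sequence (the pattern followed by the built-in SET_ACK,
+ * PING and MIGRATE senders in this file): initialize the header, marshal the
+ * payload, then start the send. For example, for a set-ack message:
+ *
+ *   red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
+ *   spice_marshall_msg_set_ack(red_channel_client_get_marshaller(rcc), &ack);
+ *   red_channel_client_begin_send_message(rcc);
+ */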
+
+SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc)
+{
+ spice_assert(red_channel_client_no_item_being_sent(rcc));
+ spice_assert(rcc->send_data.header.data != NULL);
+ rcc->send_data.main.header_data = rcc->send_data.header.data;
+ rcc->send_data.main.item = rcc->send_data.item;
+
+ rcc->send_data.marshaller = rcc->send_data.urgent.marshaller;
+ rcc->send_data.item = NULL;
+ red_channel_client_reset_send_data(rcc);
+ return rcc->send_data.marshaller;
+}
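+
+/*
+ * Sketch of the intended use of the urgent sender: while preparing message A it
+ * turns out that message B has to reach the client first, so the caller switches
+ * marshallers and sends B; once B is fully written, the stored main send data is
+ * restored and sent automatically (see red_channel_client_on_out_msg_done).
+ * SPICE_MSG_SOME_TYPE is only a placeholder:
+ *
+ *   red_channel_client_switch_to_urgent_sender(rcc);
+ *   red_channel_client_init_send_data(rcc, SPICE_MSG_SOME_TYPE, NULL);
+ *   ... marshal B into red_channel_client_get_marshaller(rcc) ...
+ *   red_channel_client_begin_send_message(rcc);
+ */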
+
+uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
+{
+ return rcc->send_data.serial;
+}
+
+void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
+{
+ rcc->send_data.last_sent_serial = serial;
+ rcc->send_data.serial = serial;
+}
+
+static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos)
+{
+ spice_assert(rcc && item);
+ if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
+ spice_debug("rcc is disconnected %p", rcc);
+ red_channel_client_release_item(rcc, item, FALSE);
+ return FALSE;
+ }
+ if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
+ rcc->channel->core->watch_update_mask(rcc->stream->watch,
+ SPICE_WATCH_EVENT_READ |
+ SPICE_WATCH_EVENT_WRITE);
+ }
+ rcc->pipe_size++;
+ ring_add(pos, &item->link);
+ return TRUE;
+}
+
+void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
+{
+ client_pipe_add(rcc, item, &rcc->pipe);
+}
+
+void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item)
+{
+ red_channel_client_pipe_add(rcc, item);
+ red_channel_client_push(rcc);
+}
+
+void red_channel_client_pipe_add_after(RedChannelClient *rcc,
+ RedPipeItem *item,
+ RedPipeItem *pos)
+{
+ spice_assert(pos);
+ client_pipe_add(rcc, item, &pos->link);
+}
+
+int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
+ RedPipeItem *item)
+{
+ return ring_item_is_linked(&item->link);
+}
+
+void red_channel_client_pipe_add_tail(RedChannelClient *rcc,
+ RedPipeItem *item)
+{
+ client_pipe_add(rcc, item, rcc->pipe.prev);
+}
+
+void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item)
+{
+ if (client_pipe_add(rcc, item, rcc->pipe.prev)) {
+ red_channel_client_push(rcc);
+ }
+}
+
+void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
+{
+ RedPipeItem *item = spice_new(RedPipeItem, 1);
+
+ red_pipe_item_init(item, pipe_item_type);
+ red_channel_client_pipe_add(rcc, item);
+ red_channel_client_push(rcc);
+}
+
+void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type)
+{
+ RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1);
+
+ red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
+ item->msg = msg_type;
+ red_channel_client_pipe_add(rcc, &item->base);
+ red_channel_client_push(rcc);
+}
+
+gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc)
+{
+ g_return_val_if_fail(rcc != NULL, TRUE);
+ return (rcc->pipe_size == 0) && (ring_is_empty(&rcc->pipe));
+}
+
+uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc)
+{
+ return rcc->pipe_size;
+}
+
+int red_channel_client_is_connected(RedChannelClient *rcc)
+{
+ if (!rcc->dummy) {
+ return rcc->channel
+ && (g_list_find(rcc->channel->clients, rcc) != NULL);
+ } else {
+ return rcc->dummy_connected;
+ }
+}
+
+static void red_channel_client_clear_sent_item(RedChannelClient *rcc)
+{
+ if (rcc->send_data.item) {
+ red_channel_client_release_item(rcc, rcc->send_data.item, TRUE);
+ rcc->send_data.item = NULL;
+ }
+ rcc->send_data.blocked = FALSE;
+ rcc->send_data.size = 0;
+}
+
+void red_channel_client_pipe_clear(RedChannelClient *rcc)
+{
+    RedPipeItem *item;
+
+    if (!rcc) {
+        return;
+    }
+    red_channel_client_clear_sent_item(rcc);
+    while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) {
+        ring_remove(&item->link);
+        red_channel_client_release_item(rcc, item, FALSE);
+    }
+    rcc->pipe_size = 0;
+}
+
+void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
+{
+ rcc->ack_data.messages_window = 0;
+}
+
+void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
+{
+ rcc->ack_data.client_window = client_window;
+}
+
+void red_channel_client_push_set_ack(RedChannelClient *rcc)
+{
+ red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK);
+}
+
+static void red_channel_client_disconnect_dummy(RedChannelClient *rcc)
+{
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ GList *link;
+ spice_assert(rcc->dummy);
+ if (channel && (link = g_list_find(channel->clients, rcc))) {
+ spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel,
+ channel->type, channel->id);
+ red_channel_remove_client(channel, link->data);
+ }
+ rcc->dummy_connected = FALSE;
+}
+
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+ RedChannel *channel = rcc->channel;
+
+ if (rcc->dummy) {
+ red_channel_client_disconnect_dummy(rcc);
+ return;
+ }
+ if (!red_channel_client_is_connected(rcc)) {
+ return;
+ }
+ spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel,
+ channel->type, channel->id);
+ red_channel_client_pipe_clear(rcc);
+ if (rcc->stream->watch) {
+ channel->core->watch_remove(rcc->stream->watch);
+ rcc->stream->watch = NULL;
+ }
+ if (rcc->latency_monitor.timer) {
+ channel->core->timer_remove(rcc->latency_monitor.timer);
+ rcc->latency_monitor.timer = NULL;
+ }
+ if (rcc->connectivity_monitor.timer) {
+ channel->core->timer_remove(rcc->connectivity_monitor.timer);
+ rcc->connectivity_monitor.timer = NULL;
+ }
+ red_channel_remove_client(channel, rcc);
+ channel->channel_cbs.on_disconnect(rcc);
+}
+
+int red_channel_client_is_blocked(RedChannelClient *rcc)
+{
+ return rcc && rcc->send_data.blocked;
+}
+
+int red_channel_client_send_message_pending(RedChannelClient *rcc)
+{
+ return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0;
+}
+
+SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
+{
+ return rcc->send_data.marshaller;
+}
+
+RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
+{
+ return rcc->stream;
+}
+
+RedClient *red_channel_client_get_client(RedChannelClient *rcc)
+{
+ return rcc->client;
+}
+
+void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list)
+{
+ rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list);
+}
+
+/* TODO: more evil sync stuff. anything with the word wait in its name. */
+int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
+ RedPipeItem *item,
+ int64_t timeout)
+{
+ uint64_t end_time;
+ int item_in_pipe;
+
+ spice_info(NULL);
+
+ if (timeout != -1) {
+ end_time = spice_get_monotonic_time_ns() + timeout;
+ } else {
+ end_time = UINT64_MAX;
+ }
+
+ rcc->channel->channel_cbs.hold_item(rcc, item);
+
+ if (red_channel_client_is_blocked(rcc)) {
+ red_channel_client_receive(rcc);
+ red_channel_client_send(rcc);
+ }
+ red_channel_client_push(rcc);
+
+    while ((item_in_pipe = ring_item_is_linked(&item->link)) &&
+ (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) {
+ usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
+ red_channel_client_receive(rcc);
+ red_channel_client_send(rcc);
+ red_channel_client_push(rcc);
+ }
+
+ red_channel_client_release_item(rcc, item, TRUE);
+ if (item_in_pipe) {
+ spice_warning("timeout");
+ return FALSE;
+ } else {
+ return red_channel_client_wait_outgoing_item(rcc,
+ timeout == -1 ? -1 : end_time - spice_get_monotonic_time_ns());
+ }
+}
+
+int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
+ int64_t timeout)
+{
+ uint64_t end_time;
+ int blocked;
+
+ if (!red_channel_client_is_blocked(rcc)) {
+ return TRUE;
+ }
+ if (timeout != -1) {
+ end_time = spice_get_monotonic_time_ns() + timeout;
+ } else {
+ end_time = UINT64_MAX;
+ }
+ spice_info("blocked");
+
+ do {
+ usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
+ red_channel_client_receive(rcc);
+ red_channel_client_send(rcc);
+ } while ((blocked = red_channel_client_is_blocked(rcc)) &&
+ (timeout == -1 || spice_get_monotonic_time_ns() < end_time));
+
+ if (blocked) {
+ spice_warning("timeout");
+ return FALSE;
+ } else {
+ spice_assert(red_channel_client_no_item_being_sent(rcc));
+ return TRUE;
+ }
+}
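+
+/*
+ * A hypothetical caller that must make sure the message currently being written
+ * has left the server before continuing (the timeout is in nanoseconds, here
+ * roughly one second):
+ *
+ *   if (!red_channel_client_wait_outgoing_item(rcc,
+ *                                              MSEC_PER_SEC * NSEC_PER_MILLISEC)) {
+ *       red_channel_client_disconnect(rcc);
+ *   }
+ */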
+
+void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc)
+{
+ if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) {
+ red_channel_client_disconnect(rcc);
+ } else {
+ spice_assert(red_channel_client_no_item_being_sent(rcc));
+ }
+}
+
+gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc)
+{
+ return !rcc || (rcc->send_data.size == 0);
+}
+
+void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
+ RedPipeItem *item)
+{
+ red_channel_client_pipe_remove(rcc, item);
+ red_channel_client_release_item(rcc, item, FALSE);
+}
+
+/* client mutex should be locked before this call */
+gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc)
+{
+ gboolean ret = FALSE;
+
+ if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) {
+ rcc->wait_migrate_data = TRUE;
+ ret = TRUE;
+ }
+ spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc,
+ rcc->wait_migrate_data);
+
+ return ret;
+}
+
+void red_channel_client_set_destroying(RedChannelClient *rcc)
+{
+ rcc->destroying = TRUE;
+}
+
+gboolean red_channel_client_is_destroying(RedChannelClient *rcc)
+{
+ return rcc->destroying;
+}
diff --git a/server/red-channel-client.h b/server/red-channel-client.h
new file mode 100644
index 0000000..cf70abd
--- /dev/null
+++ b/server/red-channel-client.h
@@ -0,0 +1,176 @@
+/*
+ Copyright (C) 2009-2015 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _H_RED_CHANNEL_CLIENT
+#define _H_RED_CHANNEL_CLIENT
+
+#include "common/marshaller.h"
+#include "red-pipe-item.h"
+#include "reds-stream.h"
+
+typedef struct RedChannel RedChannel;
+typedef struct RedClient RedClient;
+typedef struct IncomingHandler IncomingHandler;
+
+typedef struct RedChannelClient RedChannelClient;
+
+/*
+ * When an error occurs over a channel, we treat it as a warning
+ * for spice-server and shut down the channel.
+ */
+#define spice_channel_client_error(rcc, format, ...) \
+ do { \
+ RedChannel *_ch = red_channel_client_get_channel(rcc); \
+ spice_warning("rcc %p type %u id %u: " format, rcc, \
+ _ch->type, _ch->id, ## __VA_ARGS__); \
+ red_channel_client_shutdown(rcc); \
+ } while (0)
+
+RedChannelClient *red_channel_client_create(int size, RedChannel *channel,
+ RedClient *client, RedsStream *stream,
+ int monitor_latency,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps);
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps);
+
+void red_channel_client_ref(RedChannelClient *rcc);
+void red_channel_client_unref(RedChannelClient *rcc);
+
+int red_channel_client_is_connected(RedChannelClient *rcc);
+void red_channel_client_default_migrate(RedChannelClient *rcc);
+int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc);
+void red_channel_client_destroy(RedChannelClient *rcc);
+int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap);
+int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap);
+/* shutdown is the only safe thing to do from outside the client/channel
+ * thread. It does not touch the rings, it only shuts down the socket.
+ * It should be followed by some way to guarantee a disconnection. */
+void red_channel_client_shutdown(RedChannelClient *rcc);
+/* handles general channel msgs from the client */
+int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
+ uint16_t type, void *message);
+/* when preparing send_data: should call init and then use marshaller */
+void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item);
+
+uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc);
+void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial);
+
+/* When a message has been prepared (red_channel_client_init_send_data plus the
+ * marshaller), call red_channel_client_begin_send_message to send it. It first
+ * sends any pending urgent data and then the rest of the data.
+ */
+void red_channel_client_begin_send_message(RedChannelClient *rcc);
+
+/*
+ * Stores the current send data and switches to the urgent send data.
+ * When the actual send begins, the urgent data is sent first and the
+ * rest of the data afterwards.
+ * Should be called only if, during the marshalling of one message,
+ * the need arises to send another message before it.
+ * Important: the urgent message takes the current serial; the serial of the
+ * deferred (non-urgent) message will follow it.
+ * Returns the urgent send data marshaller.
+ */
+SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc);
+
+/* returns -1 if we don't have an estimate yet */
+int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc);
+
+/* Checks periodically if the connection is still alive */
+void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms);
+
+void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos);
+int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add_tail(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item);
+/* for types that use this routine -> the pipe item should be freed */
+void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type);
+void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type);
+gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc);
+uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc);
+
+void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc);
+void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window);
+void red_channel_client_push_set_ack(RedChannelClient *rcc);
+
+int red_channel_client_is_blocked(RedChannelClient *rcc);
+
+/* helper for channels with complex logic that may have a send pending */
+int red_channel_client_send_message_pending(RedChannelClient *rcc);
+
+gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc);
+void red_channel_client_push(RedChannelClient *rcc);
+// TODO: again - what is the context exactly? this happens in channel disconnect. but our
+// current red_channel_shutdown also closes the socket - is there a socket to close?
+// are we reading from an fd here? arghh
+void red_channel_client_receive(RedChannelClient *rcc);
+void red_channel_client_send(RedChannelClient *rcc);
+void red_channel_client_disconnect(RedChannelClient *rcc);
+
+/* Note: red_channel_client_get_marshaller may only be called during the send_item callback. */
+SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc);
+RedsStream *red_channel_client_get_stream(RedChannelClient *rcc);
+RedClient *red_channel_client_get_client(RedChannelClient *rcc);
+
+/* Note that the header is valid only between red_channel_client_reset_send_data
+ * and red_channel_client_begin_send_message. */
+void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list);
+
+/*
+ * Blocking functions.
+ *
+ * timeout is in nanoseconds; -1 means no timeout.
+ *
+ * Return: TRUE if waiting succeeded, FALSE if the timeout expired.
+ */
+
+int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
+ RedPipeItem *item,
+ int64_t timeout);
+int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
+ int64_t timeout);
+void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc);
+
+RedChannel *red_channel_client_get_channel(RedChannelClient *rcc);
+IncomingHandler *red_channel_client_get_incoming_handler(RedChannelClient *rcc);
+
+void red_channel_client_on_output(void *opaque, int n);
+void red_channel_client_on_input(void *opaque, int n);
+int red_channel_client_get_out_msg_size(void *opaque);
+void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec,
+ int *vec_size, int pos);
+void red_channel_client_on_out_block(void *opaque);
+void red_channel_client_on_out_msg_done(void *opaque);
+
+void red_channel_client_seamless_migration_done(RedChannelClient *rcc);
+void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc);
+void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc);
+
+gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc);
+void red_channel_client_set_destroying(RedChannelClient *rcc);
+gboolean red_channel_client_is_destroying(RedChannelClient *rcc);
+
+#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client))
+
+#endif /* _H_RED_CHANNEL_CLIENT */
diff --git a/server/red-channel.c b/server/red-channel.c
index c1b1e91..c714d90 100644
--- a/server/red-channel.c
+++ b/server/red-channel.c
@@ -22,20 +22,6 @@
#include <config.h>
#endif
-#include <glib.h>
-#include <stdio.h>
-#include <stdint.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <errno.h>
-#include <sys/ioctl.h>
-#ifdef HAVE_LINUX_SOCKIOS_H
-#include <linux/sockios.h> /* SIOCOUTQ */
-#endif
-
-#include "common/generated_server_marshallers.h"
#include "common/ring.h"
#include "red-channel.h"
@@ -44,41 +30,6 @@
#include "main-dispatcher.h"
#include "utils.h"
-typedef struct RedEmptyMsgPipeItem {
- RedPipeItem base;
- int msg;
-} RedEmptyMsgPipeItem;
-
-#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15)
-#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10)
-
-#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro
-
-enum QosPingState {
- PING_STATE_NONE,
- PING_STATE_TIMER,
- PING_STATE_WARMUP,
- PING_STATE_LATENCY,
-};
-
-enum ConnectivityState {
- CONNECTIVITY_STATE_CONNECTED,
- CONNECTIVITY_STATE_BLOCKED,
- CONNECTIVITY_STATE_WAIT_PONG,
- CONNECTIVITY_STATE_DISCONNECTED,
-};
-
-static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout);
-static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc);
-static void red_channel_client_restart_ping_timer(RedChannelClient *rcc);
-
-static void red_channel_client_event(int fd, int event, void *data);
-static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
-static void red_client_remove_channel(RedChannelClient *rcc);
-static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id);
-static void red_channel_client_restore_main_sender(RedChannelClient *rcc);
-static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc);
-
/*
* Lifetime of RedChannel, RedChannelClient and RedClient:
* RedChannel is created and destroyed by the calls to
@@ -113,574 +64,23 @@ static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc);
* If a call to red_channel_client_destroy is made from another location, it must be called
* from the channel's thread.
*/
-static void red_channel_ref(RedChannel *channel);
-static void red_channel_unref(RedChannel *channel);
-
-static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header)
-{
- return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size);
-}
-
-static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header)
-{
- return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size);
-}
-
-static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header)
-{
- return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type);
-}
-
-static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header)
-{
- return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type);
-}
-
-static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
-{
- ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type);
-}
-
-static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
-{
- ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type);
-}
-
-static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
-{
- ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size);
-}
-
-static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
-{
- ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size);
-}
-
-static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
-{
- ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial);
-}
-
-static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
-{
- spice_error("attempt to set header serial on mini header");
-}
-
-static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
-{
- ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list);
-}
-
-static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
-{
- spice_error("attempt to set header sub list on mini header");
-}
-
-static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader),
- full_header_set_msg_type,
- full_header_set_msg_size,
- full_header_set_msg_serial,
- full_header_set_msg_sub_list,
- full_header_get_msg_type,
- full_header_get_msg_size};
-
-static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader),
- mini_header_set_msg_type,
- mini_header_set_msg_size,
- mini_header_set_msg_serial,
- mini_header_set_msg_sub_list,
- mini_header_get_msg_type,
- mini_header_get_msg_size};
-
-/* return the number of bytes read. -1 in case of error */
-static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
-{
- uint8_t *pos = buf;
- while (size) {
- int now;
- if (stream->shutdown) {
- return -1;
- }
- now = reds_stream_read(stream, pos, size);
- if (now <= 0) {
- if (now == 0) {
- return -1;
- }
- spice_assert(now == -1);
- if (errno == EAGAIN) {
- break;
- } else if (errno == EINTR) {
- continue;
- } else if (errno == EPIPE) {
- return -1;
- } else {
- spice_printerr("%s", strerror(errno));
- return -1;
- }
- } else {
- size -= now;
- pos += now;
- }
- }
- return pos - buf;
-}
-
-// TODO: this implementation, as opposed to the old implementation in red_worker,
-// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
-// arithmetic for the case where a single cb_read could return multiple messages. But
-// this is suboptimal potentially. Profile and consider fixing.
-static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler)
-{
- int bytes_read;
- uint8_t *parsed;
- size_t parsed_size;
- message_destructor_t parsed_free;
- uint16_t msg_type;
- uint32_t msg_size;
-
- /* XXX: This needs further investigation as to the underlying cause, it happened
- * after spicec disconnect (but not with spice-gtk) repeatedly. */
- if (!stream) {
- return;
- }
-
- for (;;) {
- int ret_handle;
- if (handler->header_pos < handler->header.header_size) {
- bytes_read = red_peer_receive(stream,
- handler->header.data + handler->header_pos,
- handler->header.header_size - handler->header_pos);
- if (bytes_read == -1) {
- handler->cb->on_error(handler->opaque);
- return;
- }
- handler->cb->on_input(handler->opaque, bytes_read);
- handler->header_pos += bytes_read;
-
- if (handler->header_pos != handler->header.header_size) {
- return;
- }
- }
-
- msg_size = handler->header.get_msg_size(&handler->header);
- msg_type = handler->header.get_msg_type(&handler->header);
- if (handler->msg_pos < msg_size) {
- if (!handler->msg) {
- handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size);
- if (handler->msg == NULL) {
- spice_printerr("ERROR: channel refused to allocate buffer.");
- handler->cb->on_error(handler->opaque);
- return;
- }
- }
-
- bytes_read = red_peer_receive(stream,
- handler->msg + handler->msg_pos,
- msg_size - handler->msg_pos);
- if (bytes_read == -1) {
- handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
- handler->cb->on_error(handler->opaque);
- return;
- }
- handler->cb->on_input(handler->opaque, bytes_read);
- handler->msg_pos += bytes_read;
- if (handler->msg_pos != msg_size) {
- return;
- }
- }
-
- if (handler->cb->parser) {
- parsed = handler->cb->parser(handler->msg,
- handler->msg + msg_size, msg_type,
- SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
- if (parsed == NULL) {
- spice_printerr("failed to parse message type %d", msg_type);
- handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
- handler->cb->on_error(handler->opaque);
- return;
- }
- ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
- msg_type, parsed);
- parsed_free(parsed);
- } else {
- ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size,
- handler->msg);
- }
- handler->msg_pos = 0;
- handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
- handler->msg = NULL;
- handler->header_pos = 0;
-
- if (!ret_handle) {
- handler->cb->on_error(handler->opaque);
- return;
- }
- }
-}
-
-void red_channel_client_receive(RedChannelClient *rcc)
-{
- red_channel_client_ref(rcc);
- red_peer_handle_incoming(rcc->stream, &rcc->incoming);
- red_channel_client_unref(rcc);
-}
void red_channel_receive(RedChannel *channel)
{
g_list_foreach(channel->clients, (GFunc)red_channel_client_receive, NULL);
}
-static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
-{
- ssize_t n;
-
- if (!stream) {
- return;
- }
-
- if (handler->size == 0) {
- handler->vec = handler->vec_buf;
- handler->size = handler->cb->get_msg_size(handler->opaque);
- if (!handler->size) { // nothing to be sent
- return;
- }
- }
-
- for (;;) {
- handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
- n = reds_stream_writev(stream, handler->vec, handler->vec_size);
- if (n == -1) {
- switch (errno) {
- case EAGAIN:
- handler->cb->on_block(handler->opaque);
- return;
- case EINTR:
- continue;
- case EPIPE:
- handler->cb->on_error(handler->opaque);
- return;
- default:
- spice_printerr("%s", strerror(errno));
- handler->cb->on_error(handler->opaque);
- return;
- }
- } else {
- handler->pos += n;
- handler->cb->on_output(handler->opaque, n);
- if (handler->pos == handler->size) { // finished writing data
- /* reset handler before calling on_msg_done, since it
- * can trigger another call to red_peer_handle_outgoing (when
- * switching from the urgent marshaller to the main one */
- handler->vec = handler->vec_buf;
- handler->pos = 0;
- handler->size = 0;
- handler->cb->on_msg_done(handler->opaque);
- return;
- }
- }
- }
-}
-
-static void red_channel_client_on_output(void *opaque, int n)
-{
- RedChannelClient *rcc = opaque;
-
- if (rcc->connectivity_monitor.timer) {
- rcc->connectivity_monitor.out_bytes += n;
- }
- stat_inc_counter(reds, rcc->channel->out_bytes_counter, n);
-}
-
-static void red_channel_client_on_input(void *opaque, int n)
-{
- RedChannelClient *rcc = opaque;
-
- if (rcc->connectivity_monitor.timer) {
- rcc->connectivity_monitor.in_bytes += n;
- }
-}
-
static void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
{
red_channel_client_disconnect(rcc);
}
-static int red_channel_client_peer_get_out_msg_size(void *opaque)
-{
- RedChannelClient *rcc = (RedChannelClient *)opaque;
-
- return rcc->send_data.size;
-}
-
-static void red_channel_client_peer_prepare_out_msg(
- void *opaque, struct iovec *vec, int *vec_size, int pos)
-{
- RedChannelClient *rcc = (RedChannelClient *)opaque;
-
- *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
- vec, IOV_MAX, pos);
-}
-
-static void red_channel_client_peer_on_out_block(void *opaque)
-{
- RedChannelClient *rcc = (RedChannelClient *)opaque;
-
- rcc->send_data.blocked = TRUE;
- rcc->channel->core->watch_update_mask(rcc->stream->watch,
- SPICE_WATCH_EVENT_READ |
- SPICE_WATCH_EVENT_WRITE);
-}
-
-static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc)
-{
- return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller);
-}
-
-static void red_channel_client_reset_send_data(RedChannelClient *rcc)
-{
- spice_marshaller_reset(rcc->send_data.marshaller);
- rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller,
- rcc->send_data.header.header_size);
- spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size);
- rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0);
- rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0);
-
- /* Keeping the serial consecutive: resetting it if reset_send_data
- * has been called before, but no message has been sent since then.
- */
- if (rcc->send_data.last_sent_serial != rcc->send_data.serial) {
- spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1);
- /* When the urgent marshaller is active, the serial was incremented by
- * the call to reset_send_data that was made for the main marshaller.
- * The urgent msg receives this serial, and the main msg serial is
- * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial)
- * should be 1 in this case*/
- if (!red_channel_client_urgent_marshaller_is_active(rcc)) {
- rcc->send_data.serial = rcc->send_data.last_sent_serial;
- }
- }
- rcc->send_data.serial++;
-
- if (!rcc->is_mini_header) {
- spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller);
- rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0);
- rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
- }
-}
-
-void red_channel_client_push_set_ack(RedChannelClient *rcc)
-{
- red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK);
-}
-
-static void red_channel_client_send_set_ack(RedChannelClient *rcc)
-{
- SpiceMsgSetAck ack;
-
- spice_assert(rcc);
- red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
- ack.generation = ++rcc->ack_data.generation;
- ack.window = rcc->ack_data.client_window;
- rcc->ack_data.messages_window = 0;
-
- spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
-
- red_channel_client_begin_send_message(rcc);
-}
-
-static void red_channel_client_send_migrate(RedChannelClient *rcc)
-{
- SpiceMsgMigrate migrate;
-
- red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL);
- migrate.flags = rcc->channel->migration_flags;
- spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate);
- if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) {
- rcc->wait_migrate_flush_mark = TRUE;
- }
-
- red_channel_client_begin_send_message(rcc);
-}
-
-
-static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base)
-{
- RedEmptyMsgPipeItem *msg_pipe_item = SPICE_CONTAINEROF(base, RedEmptyMsgPipeItem, base);
-
- red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL);
- red_channel_client_begin_send_message(rcc);
-}
-
-static void red_channel_client_send_ping(RedChannelClient *rcc)
-{
- SpiceMsgPing ping;
-
- if (!rcc->latency_monitor.warmup_was_sent) { // latency test start
- int delay_val;
- socklen_t opt_size = sizeof(delay_val);
-
- rcc->latency_monitor.warmup_was_sent = TRUE;
- /*
- * When testing latency, TCP_NODELAY must be switched on, otherwise,
- * sending the ping message is delayed by Nagle algorithm, and the
- * roundtrip measurement is less accurate (bigger).
- */
- rcc->latency_monitor.tcp_nodelay = 1;
- if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
- &opt_size) == -1) {
- spice_warning("getsockopt failed, %s", strerror(errno));
- } else {
- rcc->latency_monitor.tcp_nodelay = delay_val;
- if (!delay_val) {
- delay_val = 1;
- if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
- sizeof(delay_val)) == -1) {
- if (errno != ENOTSUP) {
- spice_warning("setsockopt failed, %s", strerror(errno));
- }
- }
- }
- }
- }
-
- red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL);
- ping.id = rcc->latency_monitor.id;
- ping.timestamp = spice_get_monotonic_time_ns();
- spice_marshall_msg_ping(rcc->send_data.marshaller, &ping);
- red_channel_client_begin_send_message(rcc);
-}
-
-static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item)
-{
- spice_assert(red_channel_client_no_item_being_sent(rcc));
- red_channel_client_reset_send_data(rcc);
- switch (item->type) {
- case RED_PIPE_ITEM_TYPE_SET_ACK:
- red_channel_client_send_set_ack(rcc);
- break;
- case RED_PIPE_ITEM_TYPE_MIGRATE:
- red_channel_client_send_migrate(rcc);
- break;
- case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
- red_channel_client_send_empty_msg(rcc, item);
- break;
- case RED_PIPE_ITEM_TYPE_PING:
- red_channel_client_send_ping(rcc);
- break;
- default:
- rcc->channel->channel_cbs.send_item(rcc, item);
- return;
- }
- free(item);
-}
-
-static void red_channel_client_release_item(RedChannelClient *rcc, RedPipeItem *item, int item_pushed)
-{
- switch (item->type) {
- case RED_PIPE_ITEM_TYPE_SET_ACK:
- case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
- case RED_PIPE_ITEM_TYPE_MIGRATE:
- case RED_PIPE_ITEM_TYPE_PING:
- free(item);
- break;
- default:
- rcc->channel->channel_cbs.release_item(rcc, item, item_pushed);
- }
-}
-
-static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
-{
- if (rcc->send_data.item) {
- red_channel_client_release_item(rcc,
- rcc->send_data.item, TRUE);
- rcc->send_data.item = NULL;
- }
-}
-
-static void red_channel_client_on_out_msg_done(void *opaque)
-{
- RedChannelClient *rcc = (RedChannelClient *)opaque;
- int fd;
-
- rcc->send_data.size = 0;
-
- if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) {
- if (reds_stream_send_msgfd(rcc->stream, fd) < 0) {
- perror("sendfd");
- red_channel_client_disconnect(rcc);
- if (fd != -1)
- close(fd);
- return;
- }
- if (fd != -1)
- close(fd);
- }
-
- red_channel_client_release_sent_item(rcc);
- if (rcc->send_data.blocked) {
- rcc->send_data.blocked = FALSE;
- rcc->channel->core->watch_update_mask(rcc->stream->watch,
- SPICE_WATCH_EVENT_READ);
- }
-
- if (red_channel_client_urgent_marshaller_is_active(rcc)) {
- red_channel_client_restore_main_sender(rcc);
- spice_assert(rcc->send_data.header.data != NULL);
- red_channel_client_begin_send_message(rcc);
- } else {
- if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) {
- /* It is possible that the socket will become idle, so we may be able to test latency */
- red_channel_client_restart_ping_timer(rcc);
- }
- }
-
-}
-
-static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item)
-{
- rcc->pipe_size--;
- ring_remove(&item->link);
-}
-
-static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
+void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
{
spice_assert(rcc);
channel->clients = g_list_append(channel->clients, rcc);
}
-static void red_channel_client_set_remote_caps(RedChannelClient* rcc,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps)
-{
- rcc->remote_caps.num_common_caps = num_common_caps;
- rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t));
-
- rcc->remote_caps.num_caps = num_caps;
- rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t));
-}
-
-static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc)
-{
- rcc->remote_caps.num_common_caps = 0;
- free(rcc->remote_caps.common_caps);
- rcc->remote_caps.num_caps = 0;
- free(rcc->remote_caps.caps);
-}
-
-int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap)
-{
- return test_capability(rcc->remote_caps.common_caps,
- rcc->remote_caps.num_common_caps,
- cap);
-}
-
-int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap)
-{
- return test_capability(rcc->remote_caps.caps,
- rcc->remote_caps.num_caps,
- cap);
-}
-
int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap)
{
GList *link;
@@ -709,231 +109,9 @@ int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap)
return TRUE;
}
-static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client)
-{
- if (red_client_get_channel(client, channel->type, channel->id)) {
- spice_printerr("Error client %p: duplicate channel type %d id %d",
- client, channel->type, channel->id);
- return FALSE;
- }
- return TRUE;
-}
-
-static void red_channel_client_push_ping(RedChannelClient *rcc)
-{
- spice_assert(rcc->latency_monitor.state == PING_STATE_NONE);
- rcc->latency_monitor.state = PING_STATE_WARMUP;
- rcc->latency_monitor.warmup_was_sent = FALSE;
- rcc->latency_monitor.id = rand();
- red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
- red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
-}
-
-static void red_channel_client_ping_timer(void *opaque)
-{
- RedChannelClient *rcc = opaque;
-
- spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER);
- red_channel_client_cancel_ping_timer(rcc);
-
-#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */
- {
- int so_unsent_size = 0;
-
- /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */
- if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) {
- spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno));
- }
- if (so_unsent_size > 0) {
- /* tcp snd buffer is still occupied. rescheduling ping */
- red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
- } else {
- red_channel_client_push_ping(rcc);
- }
- }
-#else /* ifdef HAVE_LINUX_SOCKIOS_H */
- /* More portable alternative code path (less accurate but avoids bogus ioctls)*/
- red_channel_client_push_ping(rcc);
-#endif /* ifdef HAVE_LINUX_SOCKIOS_H */
-}
-
-/*
- * When a connection is not alive (and we can't detect it via a socket error), we
- * reach one of these 2 states:
- * (1) Sending msgs is blocked: either writes return EAGAIN
- * or we are missing MSGC_ACK from the client.
- * (2) MSG_PING was sent without receiving a MSGC_PONG in reply.
- *
- * The connectivity_timer callback tests if the channel's state matches one of the above.
- * In case it does, on the next time the timer is called, it checks if the connection has
- * been idle during the time that passed since the previous timer call. If the connection
- * has been idle, we consider the client as disconnected.
- */
-static void red_channel_client_connectivity_timer(void *opaque)
-{
- RedChannelClient *rcc = opaque;
- RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor;
- int is_alive = TRUE;
-
- if (monitor->state == CONNECTIVITY_STATE_BLOCKED) {
- if (monitor->in_bytes == 0 && monitor->out_bytes == 0) {
- if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) {
- spice_error("mismatch between rcc-state and connectivity-state");
- }
- spice_debug("rcc is blocked; connection is idle");
- is_alive = FALSE;
- }
- } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) {
- if (monitor->in_bytes == 0) {
- if (rcc->latency_monitor.state != PING_STATE_WARMUP &&
- rcc->latency_monitor.state != PING_STATE_LATENCY) {
- spice_error("mismatch between rcc-state and connectivity-state");
- }
- spice_debug("rcc waits for pong; connection is idle");
- is_alive = FALSE;
- }
- }
-
- if (is_alive) {
- monitor->in_bytes = 0;
- monitor->out_bytes = 0;
- if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) {
- monitor->state = CONNECTIVITY_STATE_BLOCKED;
- } else if (rcc->latency_monitor.state == PING_STATE_WARMUP ||
- rcc->latency_monitor.state == PING_STATE_LATENCY) {
- monitor->state = CONNECTIVITY_STATE_WAIT_PONG;
- } else {
- monitor->state = CONNECTIVITY_STATE_CONNECTED;
- }
- rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
- rcc->connectivity_monitor.timeout);
- } else {
- monitor->state = CONNECTIVITY_STATE_DISCONNECTED;
- spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting",
- rcc, rcc->channel->type, rcc->channel->id, monitor->timeout);
- red_channel_client_disconnect(rcc);
- }
-}
-
-void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms)
-{
- if (!red_channel_client_is_connected(rcc)) {
- return;
- }
- spice_debug(NULL);
- spice_assert(timeout_ms > 0);
- /*
- * If latency_monitor is not active, we activate it in order to enable
- * periodic ping messages so that we will be be able to identify a disconnected
- * channel-client even if there are no ongoing channel specific messages
- * on this channel.
- */
- if (rcc->latency_monitor.timer == NULL) {
- rcc->latency_monitor.timer = rcc->channel->core->timer_add(
- rcc->channel->core, red_channel_client_ping_timer, rcc);
- if (!red_client_during_migrate_at_target(rcc->client)) {
- red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
- }
- rcc->latency_monitor.roundtrip = -1;
- }
- if (rcc->connectivity_monitor.timer == NULL) {
- rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED;
- rcc->connectivity_monitor.timer = rcc->channel->core->timer_add(
- rcc->channel->core, red_channel_client_connectivity_timer, rcc);
- rcc->connectivity_monitor.timeout = timeout_ms;
- if (!red_client_during_migrate_at_target(rcc->client)) {
- rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
- rcc->connectivity_monitor.timeout);
- }
- }
-}
-
-RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
- RedsStream *stream,
- int monitor_latency,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps)
-{
- RedChannelClient *rcc = NULL;
-
- pthread_mutex_lock(&client->lock);
- if (!red_channel_client_pre_create_validate(channel, client)) {
- goto error;
- }
- spice_assert(stream && channel && size >= sizeof(RedChannelClient));
- rcc = spice_malloc0(size);
- rcc->stream = stream;
- rcc->channel = channel;
- rcc->client = client;
- rcc->refs = 1;
- rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
- // block flags)
- rcc->ack_data.client_generation = ~0;
- rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
- rcc->send_data.main.marshaller = spice_marshaller_new();
- rcc->send_data.urgent.marshaller = spice_marshaller_new();
-
- rcc->send_data.marshaller = rcc->send_data.main.marshaller;
-
- rcc->incoming.opaque = rcc;
- rcc->incoming.cb = &channel->incoming_cb;
-
- rcc->outgoing.opaque = rcc;
- rcc->outgoing.cb = &channel->outgoing_cb;
- rcc->outgoing.pos = 0;
- rcc->outgoing.size = 0;
-
- red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
- if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
- rcc->incoming.header = mini_header_wrapper;
- rcc->send_data.header = mini_header_wrapper;
- rcc->is_mini_header = TRUE;
- } else {
- rcc->incoming.header = full_header_wrapper;
- rcc->send_data.header = full_header_wrapper;
- rcc->is_mini_header = FALSE;
- }
-
- rcc->incoming.header.data = rcc->incoming.header_buf;
- rcc->incoming.serial = 1;
-
- if (!channel->channel_cbs.config_socket(rcc)) {
- goto error;
- }
-
- ring_init(&rcc->pipe);
- rcc->pipe_size = 0;
-
- stream->watch = channel->core->watch_add(channel->core,
- stream->socket,
- SPICE_WATCH_EVENT_READ,
- red_channel_client_event, rcc);
- rcc->id = g_list_length(channel->clients);
- red_channel_add_client(channel, rcc);
- red_client_add_channel(client, rcc);
- red_channel_ref(channel);
- pthread_mutex_unlock(&client->lock);
-
- if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) {
- rcc->latency_monitor.timer = channel->core->timer_add(
- channel->core, red_channel_client_ping_timer, rcc);
- if (!client->during_target_migrate) {
- red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
- }
- rcc->latency_monitor.roundtrip = -1;
- }
-
- return rcc;
-error:
- free(rcc);
- reds_stream_free(stream);
- pthread_mutex_unlock(&client->lock);
- return NULL;
-}
-
/* returns TRUE If all channels are finished migrating, FALSE otherwise */
-static gboolean red_client_seamless_migration_done_for_channel(RedClient *client,
- RedChannelClient *rcc)
+gboolean red_client_seamless_migration_done_for_channel(RedClient *client,
+ RedChannelClient *rcc)
{
gboolean ret = FALSE;
@@ -955,26 +133,6 @@ static gboolean red_client_seamless_migration_done_for_channel(RedClient *client
return ret;
}
-static void red_channel_client_seamless_migration_done(RedChannelClient *rcc)
-{
- rcc->wait_migrate_data = FALSE;
-
- if (red_client_seamless_migration_done_for_channel(rcc->client, rcc)) {
- if (rcc->latency_monitor.timer) {
- red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
- }
- if (rcc->connectivity_monitor.timer) {
- rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
- rcc->connectivity_monitor.timeout);
- }
- }
-}
-
-int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc)
-{
- return rcc->wait_migrate_data;
-}
-
int red_channel_is_waiting_for_migrate_data(RedChannel *channel)
{
RedChannelClient *rcc;
@@ -1006,20 +164,6 @@ static void red_channel_client_default_disconnect(RedChannelClient *base)
red_channel_client_disconnect(base);
}
-void red_channel_client_default_migrate(RedChannelClient *rcc)
-{
- if (rcc->latency_monitor.timer) {
- red_channel_client_cancel_ping_timer(rcc);
- rcc->channel->core->timer_remove(rcc->latency_monitor.timer);
- rcc->latency_monitor.timer = NULL;
- }
- if (rcc->connectivity_monitor.timer) {
- rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer);
- rcc->connectivity_monitor.timer = NULL;
- }
- red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE);
-}
-
RedChannel *red_channel_create(int size,
RedsState *reds,
const SpiceCoreInterfaceInternal *core,
@@ -1055,9 +199,9 @@ RedChannel *red_channel_create(int size,
channel->incoming_cb.on_error =
(on_incoming_error_proc)red_channel_client_default_peer_on_error;
channel->incoming_cb.on_input = red_channel_client_on_input;
- channel->outgoing_cb.get_msg_size = red_channel_client_peer_get_out_msg_size;
- channel->outgoing_cb.prepare = red_channel_client_peer_prepare_out_msg;
- channel->outgoing_cb.on_block = red_channel_client_peer_on_out_block;
+ channel->outgoing_cb.get_msg_size = red_channel_client_get_out_msg_size;
+ channel->outgoing_cb.prepare = red_channel_client_prepare_out_msg;
+ channel->outgoing_cb.on_block = red_channel_client_on_out_block;
channel->outgoing_cb.on_error =
(on_outgoing_error_proc)red_channel_client_default_peer_on_error;
channel->outgoing_cb.on_msg_done = red_channel_client_on_out_msg_done;
@@ -1120,587 +264,155 @@ RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, u
red_channel_register_client_cbs(channel, &client_cbs, NULL);
red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER);
- channel->thread_id = pthread_self();
- spice_debug("channel type %d id %d thread_id 0x%lx",
- channel->type, channel->id, channel->thread_id);
-
- channel->out_bytes_counter = 0;
-
- return channel;
-}
-
-static int do_nothing_handle_message(RedChannelClient *rcc,
- uint16_t type,
- uint32_t size,
- uint8_t *msg)
-{
- return TRUE;
-}
-
-RedChannel *red_channel_create_parser(int size,
- RedsState *reds,
- const SpiceCoreInterfaceInternal *core,
- uint32_t type, uint32_t id,
- int handle_acks,
- spice_parse_channel_func_t parser,
- channel_handle_parsed_proc handle_parsed,
- const ChannelCbs *channel_cbs,
- uint32_t migration_flags)
-{
- RedChannel *channel = red_channel_create(size, reds, core, type, id,
- handle_acks,
- do_nothing_handle_message,
- channel_cbs,
- migration_flags);
-
- if (channel == NULL) {
- return NULL;
- }
- channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
- channel->incoming_cb.parser = parser;
-
- return channel;
-}
-
-void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat)
-{
- spice_return_if_fail(channel != NULL);
- spice_return_if_fail(channel->stat == 0);
-
-#ifdef RED_STATISTICS
- channel->stat = stat;
- channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE);
-#endif
-}
-
-void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data)
-{
- spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN);
- channel->client_cbs.connect = client_cbs->connect;
-
- if (client_cbs->disconnect) {
- channel->client_cbs.disconnect = client_cbs->disconnect;
- }
-
- if (client_cbs->migrate) {
- channel->client_cbs.migrate = client_cbs->migrate;
- }
- channel->data = cbs_data;
-}
-
-int test_capability(const uint32_t *caps, int num_caps, uint32_t cap)
-{
- uint32_t index = cap / 32;
- if (num_caps < index + 1) {
- return FALSE;
- }
-
- return (caps[index] & (1 << (cap % 32))) != 0;
-}
-
-static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap)
-{
- int nbefore, n;
-
- nbefore = *num_caps;
- n = cap / 32;
- *num_caps = MAX(*num_caps, n + 1);
- *caps = spice_renew(uint32_t, *caps, *num_caps);
- memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t));
- (*caps)[n] |= (1 << (cap % 32));
-}
-
-void red_channel_set_common_cap(RedChannel *channel, uint32_t cap)
-{
- add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap);
-}
-
-void red_channel_set_cap(RedChannel *channel, uint32_t cap)
-{
- add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap);
-}
-
-static void red_channel_ref(RedChannel *channel)
-{
- channel->refs++;
-}
-
-static void red_channel_unref(RedChannel *channel)
-{
- if (!--channel->refs) {
- if (channel->local_caps.num_common_caps) {
- free(channel->local_caps.common_caps);
- }
-
- if (channel->local_caps.num_caps) {
- free(channel->local_caps.caps);
- }
-
- free(channel);
- }
-}
-
-void red_channel_client_ref(RedChannelClient *rcc)
-{
- rcc->refs++;
-}
-
-void red_channel_client_unref(RedChannelClient *rcc)
-{
- if (!--rcc->refs) {
- spice_debug("destroy rcc=%p", rcc);
-
- reds_stream_free(rcc->stream);
- rcc->stream = NULL;
-
- if (rcc->send_data.main.marshaller) {
- spice_marshaller_destroy(rcc->send_data.main.marshaller);
- }
-
- if (rcc->send_data.urgent.marshaller) {
- spice_marshaller_destroy(rcc->send_data.urgent.marshaller);
- }
-
- red_channel_client_destroy_remote_caps(rcc);
- if (rcc->channel) {
- red_channel_unref(rcc->channel);
- }
- free(rcc);
- }
-}
-
-void red_channel_client_destroy(RedChannelClient *rcc)
-{
- rcc->destroying = 1;
- red_channel_client_disconnect(rcc);
- red_client_remove_channel(rcc);
- red_channel_client_unref(rcc);
-}
-
-void red_channel_destroy(RedChannel *channel)
-{
- if (!channel) {
- return;
- }
-
- g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL);
- red_channel_unref(channel);
-}
-
-void red_channel_client_shutdown(RedChannelClient *rcc)
-{
- if (rcc->stream && !rcc->stream->shutdown) {
- rcc->channel->core->watch_remove(rcc->stream->watch);
- rcc->stream->watch = NULL;
- shutdown(rcc->stream->socket, SHUT_RDWR);
- rcc->stream->shutdown = TRUE;
- }
-}
-
-void red_channel_client_send(RedChannelClient *rcc)
-{
- red_channel_client_ref(rcc);
- red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
- red_channel_client_unref(rcc);
-}
-
-void red_channel_send(RedChannel *channel)
-{
- g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL);
-}
-
-static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
-{
- return (rcc->channel->handle_acks &&
- (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
-}
-
-static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc)
-{
- RedPipeItem *item;
-
- if (!rcc || rcc->send_data.blocked
- || red_channel_client_waiting_for_ack(rcc)
- || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) {
- return NULL;
- }
- red_channel_client_pipe_remove(rcc, item);
- return item;
-}
-
-void red_channel_client_push(RedChannelClient *rcc)
-{
- RedPipeItem *pipe_item;
-
- if (!rcc->during_send) {
- rcc->during_send = TRUE;
- } else {
- return;
- }
- red_channel_client_ref(rcc);
- if (rcc->send_data.blocked) {
- red_channel_client_send(rcc);
- }
-
- if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) {
- rcc->send_data.blocked = TRUE;
- spice_printerr("ERROR: an item waiting to be sent and not blocked");
- }
-
- while ((pipe_item = red_channel_client_pipe_item_get(rcc))) {
- red_channel_client_send_item(rcc, pipe_item);
- }
- if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe)
- && rcc->stream->watch) {
- rcc->channel->core->watch_update_mask(rcc->stream->watch,
- SPICE_WATCH_EVENT_READ);
- }
- rcc->during_send = FALSE;
- red_channel_client_unref(rcc);
-}
-
-void red_channel_push(RedChannel *channel)
-{
- if (!channel) {
- return;
- }
-
- g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL);
-}
-
-int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc)
-{
- if (rcc->latency_monitor.roundtrip < 0) {
- return rcc->latency_monitor.roundtrip;
- }
- return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC;
-}
-
-static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
-{
- rcc->ack_data.messages_window = 0;
- red_channel_client_push(rcc);
-}
-
-// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
-// specific
-void red_channel_init_outgoing_messages_window(RedChannel *channel)
-{
- g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL);
-}
-
-static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
-{
- if (rcc->channel->channel_cbs.handle_migrate_flush_mark) {
- rcc->channel->channel_cbs.handle_migrate_flush_mark(rcc);
- }
-}
-
-// TODO: the whole migration is broken with multiple clients. What do we want to do?
-// basically just
-// 1) source send mark to all
-// 2) source gets at various times the data (waits for all)
-// 3) source migrates to target
-// 4) target sends data to all
-// So need to make all the handlers work with per channel/client data (what data exactly?)
-static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
-{
- spice_debug("channel type %d id %d rcc %p size %u",
- rcc->channel->type, rcc->channel->id, rcc, size);
- if (!rcc->channel->channel_cbs.handle_migrate_data) {
- return;
- }
- if (!red_channel_client_is_waiting_for_migrate_data(rcc)) {
- spice_channel_client_error(rcc, "unexpected");
- return;
- }
- if (rcc->channel->channel_cbs.handle_migrate_data_get_serial) {
- red_channel_client_set_message_serial(rcc,
- rcc->channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message));
- }
- if (!rcc->channel->channel_cbs.handle_migrate_data(rcc, size, message)) {
- spice_channel_client_error(rcc, "handle_migrate_data failed");
- return;
- }
- red_channel_client_seamless_migration_done(rcc);
-}
-
-static void red_channel_client_restart_ping_timer(RedChannelClient *rcc)
-{
- uint64_t passed, timeout;
-
- passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC;
- timeout = PING_TEST_IDLE_NET_TIMEOUT_MS;
- if (passed < PING_TEST_TIMEOUT_MS) {
- timeout += PING_TEST_TIMEOUT_MS - passed;
- }
-
- red_channel_client_start_ping_timer(rcc, timeout);
-}
-
-static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout)
-{
- if (!rcc->latency_monitor.timer) {
- return;
- }
- if (rcc->latency_monitor.state != PING_STATE_NONE) {
- return;
- }
- rcc->latency_monitor.state = PING_STATE_TIMER;
- rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout);
-}
-
-static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc)
-{
- if (!rcc->latency_monitor.timer) {
- return;
- }
- if (rcc->latency_monitor.state != PING_STATE_TIMER) {
- return;
- }
-
- rcc->channel->core->timer_cancel(rcc->latency_monitor.timer);
- rcc->latency_monitor.state = PING_STATE_NONE;
-}
-
-static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping)
-{
- uint64_t now;
-
- /* ignoring unexpected pongs, or post-migration pongs for pings that
- * started just before migration */
- if (ping->id != rcc->latency_monitor.id) {
- spice_warning("ping-id (%u)!= pong-id %u",
- rcc->latency_monitor.id, ping->id);
- return;
- }
-
- now = spice_get_monotonic_time_ns();
-
- if (rcc->latency_monitor.state == PING_STATE_WARMUP) {
- rcc->latency_monitor.state = PING_STATE_LATENCY;
- return;
- } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) {
- spice_warning("unexpected");
- return;
- }
-
- /* set TCP_NODELAY=0, in case we reverted it for the test*/
- if (!rcc->latency_monitor.tcp_nodelay) {
- int delay_val = 0;
-
- if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
- sizeof(delay_val)) == -1) {
- if (errno != ENOTSUP) {
- spice_warning("setsockopt failed, %s", strerror(errno));
- }
- }
- }
-
- /*
- * The real network latency shouldn't change during the connection. However,
- * the measurements can be bigger than the real roundtrip due to other
- * threads or processes that are utilizing the network. We update the roundtrip
- * measurement with the minimal value we encountered till now.
- */
- if (rcc->latency_monitor.roundtrip < 0 ||
- now - ping->timestamp < rcc->latency_monitor.roundtrip) {
- rcc->latency_monitor.roundtrip = now - ping->timestamp;
- spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC);
- }
+ channel->thread_id = pthread_self();
+ spice_debug("channel type %d id %d thread_id 0x%lx",
+ channel->type, channel->id, channel->thread_id);
- rcc->latency_monitor.last_pong_time = now;
- rcc->latency_monitor.state = PING_STATE_NONE;
- red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS);
+ channel->out_bytes_counter = 0;
+
+ return channel;
}
-int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
- uint16_t type, void *message)
+static int do_nothing_handle_message(RedChannelClient *rcc,
+ uint16_t type,
+ uint32_t size,
+ uint8_t *msg)
{
- switch (type) {
- case SPICE_MSGC_ACK_SYNC:
- if (size != sizeof(uint32_t)) {
- spice_printerr("bad message size");
- return FALSE;
- }
- rcc->ack_data.client_generation = *(uint32_t *)(message);
- break;
- case SPICE_MSGC_ACK:
- if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
- rcc->ack_data.messages_window -= rcc->ack_data.client_window;
- red_channel_client_push(rcc);
- }
- break;
- case SPICE_MSGC_DISCONNECTING:
- break;
- case SPICE_MSGC_MIGRATE_FLUSH_MARK:
- if (!rcc->wait_migrate_flush_mark) {
- spice_error("unexpected flush mark");
- return FALSE;
- }
- red_channel_handle_migrate_flush_mark(rcc);
- rcc->wait_migrate_flush_mark = FALSE;
- break;
- case SPICE_MSGC_MIGRATE_DATA:
- red_channel_handle_migrate_data(rcc, size, message);
- break;
- case SPICE_MSGC_PONG:
- red_channel_client_handle_pong(rcc, message);
- break;
- default:
- spice_printerr("invalid message type %u", type);
- return FALSE;
- }
return TRUE;
}
-static void red_channel_client_event(int fd, int event, void *data)
+RedChannel *red_channel_create_parser(int size,
+ RedsState *reds,
+ const SpiceCoreInterfaceInternal *core,
+ uint32_t type, uint32_t id,
+ int handle_acks,
+ spice_parse_channel_func_t parser,
+ channel_handle_parsed_proc handle_parsed,
+ const ChannelCbs *channel_cbs,
+ uint32_t migration_flags)
{
- RedChannelClient *rcc = (RedChannelClient *)data;
+ RedChannel *channel = red_channel_create(size, reds, core, type, id,
+ handle_acks,
+ do_nothing_handle_message,
+ channel_cbs,
+ migration_flags);
- red_channel_client_ref(rcc);
- if (event & SPICE_WATCH_EVENT_READ) {
- red_channel_client_receive(rcc);
- }
- if (event & SPICE_WATCH_EVENT_WRITE) {
- red_channel_client_push(rcc);
+ if (channel == NULL) {
+ return NULL;
}
- red_channel_client_unref(rcc);
+ channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
+ channel->incoming_cb.parser = parser;
+
+ return channel;
}
-void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item)
+void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat)
{
- spice_assert(red_channel_client_no_item_being_sent(rcc));
- spice_assert(msg_type != 0);
- rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type);
- rcc->send_data.item = item;
- if (item) {
- rcc->channel->channel_cbs.hold_item(rcc, item);
- }
+ spice_return_if_fail(channel != NULL);
+ spice_return_if_fail(channel->stat == 0);
+
+#ifdef RED_STATISTICS
+ channel->stat = stat;
+ channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE);
+#endif
}
-void red_channel_client_begin_send_message(RedChannelClient *rcc)
+void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data)
{
- SpiceMarshaller *m = rcc->send_data.marshaller;
+ spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN);
+ channel->client_cbs.connect = client_cbs->connect;
- // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
- if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) {
- spice_printerr("BUG: header->type == 0");
- return;
+ if (client_cbs->disconnect) {
+ channel->client_cbs.disconnect = client_cbs->disconnect;
}
- /* canceling the latency test timer till the network is idle */
- red_channel_client_cancel_ping_timer(rcc);
-
- spice_marshaller_flush(m);
- rcc->send_data.size = spice_marshaller_get_total_size(m);
- rcc->send_data.header.set_msg_size(&rcc->send_data.header,
- rcc->send_data.size - rcc->send_data.header.header_size);
- rcc->ack_data.messages_window++;
- rcc->send_data.last_sent_serial = rcc->send_data.serial;
- rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
- red_channel_client_send(rcc);
+ if (client_cbs->migrate) {
+ channel->client_cbs.migrate = client_cbs->migrate;
+ }
+ channel->data = cbs_data;
}
-SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc)
+int test_capability(const uint32_t *caps, int num_caps, uint32_t cap)
{
- spice_assert(red_channel_client_no_item_being_sent(rcc));
- spice_assert(rcc->send_data.header.data != NULL);
- rcc->send_data.main.header_data = rcc->send_data.header.data;
- rcc->send_data.main.item = rcc->send_data.item;
-
- rcc->send_data.marshaller = rcc->send_data.urgent.marshaller;
- rcc->send_data.item = NULL;
- red_channel_client_reset_send_data(rcc);
- return rcc->send_data.marshaller;
+ uint32_t index = cap / 32;
+ if (num_caps < index + 1) {
+ return FALSE;
+ }
+
+ return (caps[index] & (1 << (cap % 32))) != 0;
}
-static void red_channel_client_restore_main_sender(RedChannelClient *rcc)
+static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap)
{
- spice_marshaller_reset(rcc->send_data.urgent.marshaller);
- rcc->send_data.marshaller = rcc->send_data.main.marshaller;
- rcc->send_data.header.data = rcc->send_data.main.header_data;
- if (!rcc->is_mini_header) {
- rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
- }
- rcc->send_data.item = rcc->send_data.main.item;
+ int nbefore, n;
+
+ nbefore = *num_caps;
+ n = cap / 32;
+ *num_caps = MAX(*num_caps, n + 1);
+ *caps = spice_renew(uint32_t, *caps, *num_caps);
+ memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t));
+ (*caps)[n] |= (1 << (cap % 32));
}
-uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
+void red_channel_set_common_cap(RedChannel *channel, uint32_t cap)
{
- return rcc->send_data.serial;
+ add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap);
}
-void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
+void red_channel_set_cap(RedChannel *channel, uint32_t cap)
{
- rcc->send_data.last_sent_serial = serial;
- rcc->send_data.serial = serial;
+ add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap);
}
-static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos)
+void red_channel_ref(RedChannel *channel)
{
- spice_assert(rcc && item);
- if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
- spice_debug("rcc is disconnected %p", rcc);
- red_channel_client_release_item(rcc, item, FALSE);
- return FALSE;
- }
- if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
- rcc->channel->core->watch_update_mask(rcc->stream->watch,
- SPICE_WATCH_EVENT_READ |
- SPICE_WATCH_EVENT_WRITE);
- }
- rcc->pipe_size++;
- ring_add(pos, &item->link);
- return TRUE;
+ channel->refs++;
}
-void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
+void red_channel_unref(RedChannel *channel)
{
+ if (--channel->refs == 0) {
+ if (channel->local_caps.num_common_caps) {
+ free(channel->local_caps.common_caps);
+ }
- client_pipe_add(rcc, item, &rcc->pipe);
-}
+ if (channel->local_caps.num_caps) {
+ free(channel->local_caps.caps);
+ }
-void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item)
-{
- red_channel_client_pipe_add(rcc, item);
- red_channel_client_push(rcc);
+ free(channel);
+ }
}
-void red_channel_client_pipe_add_after(RedChannelClient *rcc,
- RedPipeItem *item,
- RedPipeItem *pos)
+void red_channel_destroy(RedChannel *channel)
{
- spice_assert(pos);
- client_pipe_add(rcc, item, &pos->link);
-}
+ if (!channel) {
+ return;
+ }
-int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
- RedPipeItem *item)
-{
- return ring_item_is_linked(&item->link);
+ g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL);
+ red_channel_unref(channel);
}
-void red_channel_client_pipe_add_tail(RedChannelClient *rcc,
- RedPipeItem *item)
+void red_channel_send(RedChannel *channel)
{
- client_pipe_add(rcc, item, rcc->pipe.prev);
+ g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL);
}
-void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item)
+void red_channel_push(RedChannel *channel)
{
- if (client_pipe_add(rcc, item, rcc->pipe.prev)) {
- red_channel_client_push(rcc);
+ if (!channel) {
+ return;
}
+
+ g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL);
}
-void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
+// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
+// specific
+void red_channel_init_outgoing_messages_window(RedChannel *channel)
{
- RedPipeItem *item = spice_new(RedPipeItem, 1);
-
- red_pipe_item_init(item, pipe_item_type);
- red_channel_client_pipe_add(rcc, item);
- red_channel_client_push(rcc);
+ g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL);
}
static void red_channel_client_pipe_add_type_proxy(gpointer data, gpointer user_data)
@@ -1715,16 +427,6 @@ void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type)
GINT_TO_POINTER(pipe_item_type));
}
-void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type)
-{
- RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1);
-
- red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
- item->msg = msg_type;
- red_channel_client_pipe_add(rcc, &item->base);
- red_channel_client_push(rcc);
-}
-
static void red_channel_client_pipe_add_empty_msg_proxy(gpointer data, gpointer user_data)
{
int type = GPOINTER_TO_INT(user_data);
@@ -1736,120 +438,38 @@ void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type)
g_list_foreach(channel->clients, red_channel_client_pipe_add_empty_msg_proxy, GINT_TO_POINTER(msg_type));
}
-int red_channel_client_is_connected(RedChannelClient *rcc)
-{
- if (!rcc->dummy) {
- return rcc->channel
- && (g_list_find(rcc->channel->clients, rcc) != NULL);
- } else {
- return rcc->dummy_connected;
- }
-}
-
int red_channel_is_connected(RedChannel *channel)
{
return channel && channel->clients;
}
-void red_channel_client_clear_sent_item(RedChannelClient *rcc)
-{
- if (rcc->send_data.item) {
- red_channel_client_release_item(rcc, rcc->send_data.item, TRUE);
- rcc->send_data.item = NULL;
- }
- rcc->send_data.blocked = FALSE;
- rcc->send_data.size = 0;
-}
-
-void red_channel_client_pipe_clear(RedChannelClient *rcc)
-{
- RedPipeItem *item;
-
- if (rcc) {
- red_channel_client_clear_sent_item(rcc);
- }
- while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) {
- ring_remove(&item->link);
- red_channel_client_release_item(rcc, item, FALSE);
- }
- rcc->pipe_size = 0;
-}
-
-void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
-{
- rcc->ack_data.messages_window = 0;
-}
-
-void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
-{
- rcc->ack_data.client_window = client_window;
-}
-
-static void red_channel_remove_client(RedChannelClient *rcc)
+void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc)
{
GList *link;
+ g_return_if_fail(channel == red_channel_client_get_channel(rcc));
- if (!pthread_equal(pthread_self(), rcc->channel->thread_id)) {
+ if (!pthread_equal(pthread_self(), channel->thread_id)) {
spice_warning("channel type %d id %d - "
"channel->thread_id (0x%lx) != pthread_self (0x%lx)."
"If one of the threads is != io-thread && != vcpu-thread, "
"this might be a BUG",
- rcc->channel->type, rcc->channel->id,
- rcc->channel->thread_id, pthread_self());
+ channel->type, channel->id,
+ channel->thread_id, pthread_self());
}
- spice_return_if_fail(rcc->channel);
- link = g_list_find(rcc->channel->clients, rcc);
+ spice_return_if_fail(channel);
+ link = g_list_find(channel->clients, rcc);
spice_return_if_fail(link != NULL);
- rcc->channel->clients = g_list_remove_link(rcc->channel->clients, link);
+ channel->clients = g_list_remove_link(channel->clients, link);
// TODO: should we set rcc->channel to NULL???
}
-static void red_client_remove_channel(RedChannelClient *rcc)
-{
- pthread_mutex_lock(&rcc->client->lock);
- rcc->client->channels = g_list_remove(rcc->client->channels, rcc);
- pthread_mutex_unlock(&rcc->client->lock);
-}
-
-static void red_channel_client_disconnect_dummy(RedChannelClient *rcc)
-{
- GList *link;
- spice_assert(rcc->dummy);
- if (rcc->channel && (link = g_list_find(rcc->channel->clients, rcc))) {
- spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel,
- rcc->channel->type, rcc->channel->id);
- red_channel_remove_client(link->data);
- }
- rcc->dummy_connected = FALSE;
-}
-
-void red_channel_client_disconnect(RedChannelClient *rcc)
+void red_client_remove_channel(RedChannelClient *rcc)
{
- if (rcc->dummy) {
- red_channel_client_disconnect_dummy(rcc);
- return;
- }
- if (!red_channel_client_is_connected(rcc)) {
- return;
- }
- spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel,
- rcc->channel->type, rcc->channel->id);
- red_channel_client_pipe_clear(rcc);
- if (rcc->stream->watch) {
- rcc->channel->core->watch_remove(rcc->stream->watch);
- rcc->stream->watch = NULL;
- }
- if (rcc->latency_monitor.timer) {
- rcc->channel->core->timer_remove(rcc->latency_monitor.timer);
- rcc->latency_monitor.timer = NULL;
- }
- if (rcc->connectivity_monitor.timer) {
- rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer);
- rcc->connectivity_monitor.timer = NULL;
- }
- red_channel_remove_client(rcc);
- rcc->channel->channel_cbs.on_disconnect(rcc);
+ RedClient *client = red_channel_client_get_client(rcc);
+ pthread_mutex_lock(&client->lock);
+ client->channels = g_list_remove(client->channels, rcc);
+ pthread_mutex_unlock(&client->lock);
}
void red_channel_disconnect(RedChannel *channel)
@@ -1857,51 +477,6 @@ void red_channel_disconnect(RedChannel *channel)
g_list_foreach(channel->clients, (GFunc)red_channel_client_disconnect, NULL);
}
-RedChannelClient *red_channel_client_create_dummy(int size,
- RedChannel *channel,
- RedClient *client,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps)
-{
- RedChannelClient *rcc = NULL;
-
- spice_assert(size >= sizeof(RedChannelClient));
-
- pthread_mutex_lock(&client->lock);
- if (!red_channel_client_pre_create_validate(channel, client)) {
- goto error;
- }
- rcc = spice_malloc0(size);
- rcc->refs = 1;
- rcc->client = client;
- rcc->channel = channel;
- red_channel_ref(channel);
- red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
- if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
- rcc->incoming.header = mini_header_wrapper;
- rcc->send_data.header = mini_header_wrapper;
- rcc->is_mini_header = TRUE;
- } else {
- rcc->incoming.header = full_header_wrapper;
- rcc->send_data.header = full_header_wrapper;
- rcc->is_mini_header = FALSE;
- }
-
- rcc->incoming.header.data = rcc->incoming.header_buf;
- rcc->incoming.serial = 1;
- ring_init(&rcc->pipe);
-
- rcc->dummy = TRUE;
- rcc->dummy_connected = TRUE;
- red_channel_add_client(channel, rcc);
- red_client_add_channel(client, rcc);
- pthread_mutex_unlock(&client->lock);
- return rcc;
-error:
- pthread_mutex_unlock(&client->lock);
- return NULL;
-}
-
void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb)
{
g_list_foreach(channel->clients, (GFunc)cb, NULL);
@@ -1922,7 +497,7 @@ int red_channel_all_blocked(RedChannel *channel)
}
for (link = channel->clients; link != NULL; link = link->next) {
rcc = link->data;
- if (!rcc->send_data.blocked) {
+ if (!red_channel_client_is_blocked(rcc)) {
return FALSE;
}
}
@@ -1936,56 +511,25 @@ int red_channel_any_blocked(RedChannel *channel)
for (link = channel->clients; link != NULL; link = link->next) {
rcc = link->data;
- if (rcc->send_data.blocked) {
+ if (red_channel_client_is_blocked(rcc)) {
return TRUE;
}
}
return FALSE;
}
-int red_channel_client_blocked(RedChannelClient *rcc)
-{
- return rcc && rcc->send_data.blocked;
-}
-
-int red_channel_client_send_message_pending(RedChannelClient *rcc)
-{
- return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0;
-}
-
-/* accessors for RedChannelClient */
-SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
-{
- return rcc->send_data.marshaller;
-}
-
-RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
-{
- return rcc->stream;
-}
-
-RedClient *red_channel_client_get_client(RedChannelClient *rcc)
-{
- return rcc->client;
-}
-
-void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list)
-{
- rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list);
-}
-
-/* end of accessors */
-
int red_channel_get_first_socket(RedChannel *channel)
{
RedChannelClient *rcc;
+ RedsStream *stream;
if (!channel || !channel->clients) {
return -1;
}
rcc = channel->clients->data;
+ stream = red_channel_client_get_stream(rcc);
- return rcc->stream->socket;
+ return stream->socket;
}
int red_channel_no_item_being_sent(RedChannel *channel)
@@ -2002,18 +546,6 @@ int red_channel_no_item_being_sent(RedChannel *channel)
return TRUE;
}
-int red_channel_client_no_item_being_sent(RedChannelClient *rcc)
-{
- return !rcc || (rcc->send_data.size == 0);
-}
-
-void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
- RedPipeItem *item)
-{
- red_channel_client_pipe_remove(rcc, item);
- red_channel_client_release_item(rcc, item, FALSE);
-}
-
/*
* RedClient implementation - kept in red-channel.c because they are
* pretty tied together.
@@ -2051,21 +583,6 @@ RedClient *red_client_unref(RedClient *client)
return client;
}
-/* client mutex should be locked before this call */
-static gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc)
-{
- gboolean ret = FALSE;
-
- if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) {
- rcc->wait_migrate_data = TRUE;
- ret = TRUE;
- }
- spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc,
- rcc->wait_migrate_data);
-
- return ret;
-}
-
void red_client_set_migration_seamless(RedClient *client) // dest
{
GList *link;
@@ -2085,6 +602,7 @@ void red_client_migrate(RedClient *client)
{
GList *link, *next;
RedChannelClient *rcc;
+ RedChannel *channel;
spice_printerr("migrate client with #channels %d", g_list_length(client->channels));
if (!pthread_equal(pthread_self(), client->thread_id)) {
@@ -2097,8 +615,9 @@ void red_client_migrate(RedClient *client)
while (link) {
next = link->next;
rcc = link->data;
+ channel = red_channel_client_get_channel(rcc);
if (red_channel_client_is_connected(rcc)) {
- rcc->channel->client_cbs.migrate(rcc);
+ channel->client_cbs.migrate(rcc);
}
link = next;
}
@@ -2119,20 +638,21 @@ void red_client_destroy(RedClient *client)
}
link = client->channels;
while (link) {
+ RedChannel *channel;
next = link->next;
// some channels may be in other threads, so disconnection
// is not synchronous.
rcc = link->data;
- rcc->destroying = 1;
+ channel = red_channel_client_get_channel(rcc);
+ red_channel_client_set_destroying(rcc);
// some channels may be in other threads. However we currently
// assume disconnect is synchronous (we changed the dispatcher
// to wait for disconnection)
// TODO: should we go back to async. For this we need to use
// ref count for channel clients.
- rcc->channel->client_cbs.disconnect(rcc);
- spice_assert(ring_is_empty(&rcc->pipe));
- spice_assert(rcc->pipe_size == 0);
- spice_assert(rcc->send_data.size == 0);
+ channel->client_cbs.disconnect(rcc);
+ spice_assert(red_channel_client_pipe_is_empty(rcc));
+ spice_assert(red_channel_client_no_item_being_sent(rcc));
red_channel_client_destroy(rcc);
link = next;
}
@@ -2140,15 +660,17 @@ void red_client_destroy(RedClient *client)
}
/* client->lock should be locked */
-static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id)
+RedChannelClient *red_client_get_channel(RedClient *client, int type, int id)
{
GList *link;
RedChannelClient *rcc;
RedChannelClient *ret = NULL;
for (link = client->channels; link != NULL; link = link->next) {
+ RedChannel *channel;
rcc = link->data;
- if (rcc->channel->type == type && rcc->channel->id == id) {
+ channel = red_channel_client_get_channel(rcc);
+ if (channel->type == type && channel->id == id) {
ret = rcc;
break;
}
@@ -2157,7 +679,7 @@ static RedChannelClient *red_client_get_channel(RedClient *client, int type, int
}
/* client->lock should be locked */
-static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
+void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
{
spice_assert(rcc && client);
client->channels = g_list_append(client->channels, rcc);
@@ -2189,11 +711,7 @@ void red_client_semi_seamless_migrate_complete(RedClient *client)
link = client->channels;
while (link) {
next = link->next;
- RedChannelClient *rcc = link->data;
-
- if (rcc->latency_monitor.timer) {
- red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
- }
+ red_channel_client_semi_seamless_migration_complete(link->data);
link = next;
}
pthread_mutex_unlock(&client->lock);
@@ -2289,8 +807,10 @@ uint32_t red_channel_max_pipe_size(RedChannel *channel)
uint32_t pipe_size = 0;
for (link = channel->clients; link != NULL; link = link->next) {
+ uint32_t new_size;
rcc = link->data;
- pipe_size = MAX(pipe_size, rcc->pipe_size);
+ new_size = red_channel_client_get_pipe_size(rcc);
+ pipe_size = MAX(pipe_size, new_size);
}
return pipe_size;
}
@@ -2302,8 +822,10 @@ uint32_t red_channel_min_pipe_size(RedChannel *channel)
uint32_t pipe_size = ~0;
for (link = channel->clients; link != NULL; link = link->next) {
+ uint32_t new_size;
rcc = link->data;
- pipe_size = MIN(pipe_size, rcc->pipe_size);
+ new_size = red_channel_client_get_pipe_size(rcc);
+ pipe_size = MIN(pipe_size, new_size);
}
return pipe_size == ~0 ? 0 : pipe_size;
}
@@ -2316,85 +838,11 @@ uint32_t red_channel_sum_pipes_size(RedChannel *channel)
for (link = channel->clients; link != NULL; link = link->next) {
rcc = link->data;
- sum += rcc->pipe_size;
+ sum += red_channel_client_get_pipe_size(rcc);
}
return sum;
}
-int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
- int64_t timeout)
-{
- uint64_t end_time;
- int blocked;
-
- if (!red_channel_client_blocked(rcc)) {
- return TRUE;
- }
- if (timeout != -1) {
- end_time = spice_get_monotonic_time_ns() + timeout;
- } else {
- end_time = UINT64_MAX;
- }
- spice_info("blocked");
-
- do {
- usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
- red_channel_client_receive(rcc);
- red_channel_client_send(rcc);
- } while ((blocked = red_channel_client_blocked(rcc)) &&
- (timeout == -1 || spice_get_monotonic_time_ns() < end_time));
-
- if (blocked) {
- spice_warning("timeout");
- return FALSE;
- } else {
- spice_assert(red_channel_client_no_item_being_sent(rcc));
- return TRUE;
- }
-}
-
-/* TODO: more evil sync stuff. anything with the word wait in its name. */
-int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
- RedPipeItem *item,
- int64_t timeout)
-{
- uint64_t end_time;
- int item_in_pipe;
-
- spice_info(NULL);
-
- if (timeout != -1) {
- end_time = spice_get_monotonic_time_ns() + timeout;
- } else {
- end_time = UINT64_MAX;
- }
-
- rcc->channel->channel_cbs.hold_item(rcc, item);
-
- if (red_channel_client_blocked(rcc)) {
- red_channel_client_receive(rcc);
- red_channel_client_send(rcc);
- }
- red_channel_client_push(rcc);
-
- while((item_in_pipe = ring_item_is_linked(&item->link)) &&
- (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) {
- usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
- red_channel_client_receive(rcc);
- red_channel_client_send(rcc);
- red_channel_client_push(rcc);
- }
-
- red_channel_client_release_item(rcc, item, TRUE);
- if (item_in_pipe) {
- spice_warning("timeout");
- return FALSE;
- } else {
- return red_channel_client_wait_outgoing_item(rcc,
- timeout == -1 ? -1 : end_time - spice_get_monotonic_time_ns());
- }
-}
-
int red_channel_wait_all_sent(RedChannel *channel,
int64_t timeout)
{
@@ -2429,15 +877,6 @@ int red_channel_wait_all_sent(RedChannel *channel,
}
}
-void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc)
-{
- if (red_channel_client_blocked(rcc) || rcc->pipe_size > 0) {
- red_channel_client_disconnect(rcc);
- } else {
- spice_assert(red_channel_client_no_item_being_sent(rcc));
- }
-}
-
RedsState* red_channel_get_server(RedChannel *channel)
{
return channel->reds;
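
The capability helpers above (test_capability/add_capability) store capability N in word N/32, bit N%32 of the caps array. A minimal standalone sketch of that arithmetic, separate from the patch and using a made-up capability number:

#include <stdint.h>
#include <stdio.h>

int main(void)
{
    uint32_t caps[2] = { 0, 0 };   /* room for capabilities 0..63 */
    unsigned cap = 34;             /* example capability number, not from the patch */

    /* add_capability() core: set word cap/32, bit cap%32 */
    caps[cap / 32] |= 1u << (cap % 32);

    /* test_capability() core: check the same word/bit */
    int present = (caps[cap / 32] & (1u << (cap % 32))) != 0;

    /* prints: cap 34 -> word 1, bit 2, present=1 */
    printf("cap %u -> word %u, bit %u, present=%d\n",
           cap, cap / 32, cap % 32, present);
    return 0;
}
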
diff --git a/server/red-channel.h b/server/red-channel.h
index 7b4ec75..af6be18 100644
--- a/server/red-channel.h
+++ b/server/red-channel.h
@@ -34,6 +34,7 @@
#include "reds-stream.h"
#include "stat.h"
#include "red-pipe-item.h"
+#include "red-channel-client.h"
#define MAX_SEND_BUFS 1000
#define CLIENT_ACK_WINDOW 20
@@ -130,7 +131,6 @@ typedef struct OutgoingHandler {
/* Red Channel interface */
typedef struct RedChannel RedChannel;
-typedef struct RedChannelClient RedChannelClient;
typedef struct RedClient RedClient;
typedef struct MainChannelClient MainChannelClient;
@@ -236,62 +236,6 @@ typedef struct RedChannelClientConnectivityMonitor {
SpiceTimer *timer;
} RedChannelClientConnectivityMonitor;
-struct RedChannelClient {
- RedChannel *channel;
- RedClient *client;
- RedsStream *stream;
- int dummy;
- int dummy_connected;
-
- uint32_t refs;
-
- struct {
- uint32_t generation;
- uint32_t client_generation;
- uint32_t messages_window;
- uint32_t client_window;
- } ack_data;
-
- struct {
- SpiceMarshaller *marshaller;
- SpiceDataHeaderOpaque header;
- uint32_t size;
- RedPipeItem *item;
- int blocked;
- uint64_t serial;
- uint64_t last_sent_serial;
-
- struct {
- SpiceMarshaller *marshaller;
- uint8_t *header_data;
- RedPipeItem *item;
- } main;
-
- struct {
- SpiceMarshaller *marshaller;
- } urgent;
- } send_data;
-
- OutgoingHandler outgoing;
- IncomingHandler incoming;
- int during_send;
- int id; // debugging purposes
- Ring pipe;
- uint32_t pipe_size;
-
- RedChannelCapabilities remote_caps;
- int is_mini_header;
- int destroying;
-
- int wait_migrate_data;
- int wait_migrate_flush_mark;
-
- RedChannelClientLatencyMonitor latency_monitor;
- RedChannelClientConnectivityMonitor connectivity_monitor;
-};
-
-#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client))
-
struct RedChannel {
uint32_t type;
uint32_t id;
@@ -335,17 +279,6 @@ struct RedChannel {
#define RED_CHANNEL(Channel) ((RedChannel *)(Channel))
-/*
- * When an error occurs over a channel, we treat it as a warning
- * for spice-server and shutdown the channel.
- */
-#define spice_channel_client_error(rcc, format, ...) \
- do { \
- spice_warning("rcc %p type %u id %u: " format, rcc, \
- rcc->channel->type, rcc->channel->id, ## __VA_ARGS__); \
- red_channel_client_shutdown(rcc); \
- } while (0)
-
/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
* explicitly destroy the channel */
RedChannel *red_channel_create(int size,
@@ -368,6 +301,11 @@ RedChannel *red_channel_create_parser(int size,
channel_handle_parsed_proc handle_parsed,
const ChannelCbs *channel_cbs,
uint32_t migration_flags);
+void red_channel_ref(RedChannel *channel);
+void red_channel_unref(RedChannel *channel);
+void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc);
+void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc);
+
void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat);
void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data);
@@ -375,26 +313,13 @@ void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *clien
void red_channel_set_common_cap(RedChannel *channel, uint32_t cap);
void red_channel_set_cap(RedChannel *channel, uint32_t cap);
-RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
- RedsStream *stream,
- int monitor_latency,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps);
// TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but
// do use the client callbacks. So the channel clients are not connected (the channel doesn't
// have list of them, but they do have a link to the channel, and the client has a list of them)
RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, uint32_t id);
-RedChannelClient *red_channel_client_create_dummy(int size,
- RedChannel *channel,
- RedClient *client,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps);
int red_channel_is_connected(RedChannel *channel);
-int red_channel_client_is_connected(RedChannelClient *rcc);
-void red_channel_client_default_migrate(RedChannelClient *rcc);
-int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc);
/* seamless migration is supported for only one client. This routine
* checks if the only channel client associated with channel is
* waiting for migration data */
@@ -407,61 +332,15 @@ int red_channel_is_waiting_for_migrate_data(RedChannel *channel);
* from red_client_destroy.
*/
-void red_channel_client_destroy(RedChannelClient *rcc);
void red_channel_destroy(RedChannel *channel);
-void red_channel_client_ref(RedChannelClient *rcc);
-void red_channel_client_unref(RedChannelClient *rcc);
-
-int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap);
-int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap);
/* return true if all the channel clients support the cap */
int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap);
int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap);
-/* shutdown is the only safe thing to do out of the client/channel
- * thread. It will not touch the rings, just shutdown the socket.
- * It should be followed by some way to guarantee a disconnection. */
-void red_channel_client_shutdown(RedChannelClient *rcc);
-
/* should be called when a new channel is ready to send messages */
void red_channel_init_outgoing_messages_window(RedChannel *channel);
-/* handles general channel msgs from the client */
-int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
- uint16_t type, void *message);
-
-/* when preparing send_data: should call init and then use marshaller */
-void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item);
-
-uint64_t red_channel_client_get_message_serial(RedChannelClient *channel);
-void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
-
-/* When sending a msg. Should first call red_channel_client_begin_send_message.
- * It will first send the pending urgent data, if there is any, and then
- * the rest of the data.
- */
-void red_channel_client_begin_send_message(RedChannelClient *rcc);
-
-/*
- * Stores the current send data, and switches to urgent send data.
- * When it begins the actual send, it will send first the urgent data
- * and afterward the rest of the data.
- * Should be called only if, during the marshalling of one message,
- * the need to send another message first arises.
- * Important: the serial of the non-urgent data will follow that of the urgent message.
- * return: the urgent send data marshaller
- */
-SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc);
-
-/* returns -1 if we don't have an estimation */
-int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc);
-
-/*
- * Checks periodically if the connection is still alive
- */
-void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms);
-
// TODO: add back the channel_pipe_add functionality - by adding reference counting
// to the RedPipeItem.
@@ -471,23 +350,10 @@ int red_channel_pipes_new_add_push(RedChannel *channel, new_pipe_item_t creator,
void red_channel_pipes_new_add(RedChannel *channel, new_pipe_item_t creator, void *data);
void red_channel_pipes_new_add_tail(RedChannel *channel, new_pipe_item_t creator, void *data);
-void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos);
-int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item);
-/* for types that use this routine -> the pipe item should be freed */
-void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type);
void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type);
-void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type);
void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type);
-void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc);
-void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window);
-void red_channel_client_push_set_ack(RedChannelClient *rcc);
-
int red_channel_get_first_socket(RedChannel *channel);
/* return TRUE if all of the connected clients to this channel are blocked */
@@ -496,24 +362,13 @@ int red_channel_all_blocked(RedChannel *channel);
/* return TRUE if any of the connected clients to this channel are blocked */
int red_channel_any_blocked(RedChannel *channel);
-int red_channel_client_blocked(RedChannelClient *rcc);
-
-/* helper for channels that have complex logic that can possibly ready a send */
-int red_channel_client_send_message_pending(RedChannelClient *rcc);
-
int red_channel_no_item_being_sent(RedChannel *channel);
-int red_channel_client_no_item_being_sent(RedChannelClient *rcc);
// TODO: unstaticed for display/cursor channels. they do some specific pushes not through
// adding elements or on events. but not sure if this is actually required (only result
// should be that they ""try"" a little harder, but if the event system is correct it
// should not make any difference.
void red_channel_push(RedChannel *channel);
-void red_channel_client_push(RedChannelClient *rcc);
-// TODO: again - what is the context exactly? this happens in channel disconnect. but our
-// current red_channel_shutdown also closes the socket - is there a socket to close?
-// are we reading from an fd here? arghh
-void red_channel_client_pipe_clear(RedChannelClient *rcc);
// Again, used in various places outside of event handler context (or in other event handler
// contexts):
// flush_display_commands/flush_cursor_commands
@@ -522,23 +377,10 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc);
// red_wait_pipe_item_sent
// handle_channel_events - this is the only one that was used before, and was in red-channel.c
void red_channel_receive(RedChannel *channel);
-void red_channel_client_receive(RedChannelClient *rcc);
// For red_worker
void red_channel_send(RedChannel *channel);
-void red_channel_client_send(RedChannelClient *rcc);
// For red_worker
void red_channel_disconnect(RedChannel *channel);
-void red_channel_client_disconnect(RedChannelClient *rcc);
-
-/* accessors for RedChannelClient */
-/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */
-SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc);
-RedsStream *red_channel_client_get_stream(RedChannelClient *rcc);
-RedClient *red_channel_client_get_client(RedChannelClient *rcc);
-
-/* Note that the header is valid only between red_channel_reset_send_data and
- * red_channel_begin_send_message.*/
-void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list);
/* return the sum of all the rcc pipe size */
uint32_t red_channel_max_pipe_size(RedChannel *channel);
@@ -592,6 +434,11 @@ RedClient *red_client_ref(RedClient *client);
*/
RedClient *red_client_unref(RedClient *client);
+/* client->lock should be locked */
+void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
+void red_client_remove_channel(RedChannelClient *rcc);
+RedChannelClient *red_client_get_channel(RedClient *client, int type, int id);
+
MainChannelClient *red_client_get_main(RedClient *client);
// main should be set once before all the other channels are created
void red_client_set_main(RedClient *client, MainChannelClient *mcc);
@@ -607,7 +454,8 @@ void red_client_semi_seamless_migrate_complete(RedClient *client); /* dst side *
int red_client_during_migrate_at_target(RedClient *client);
void red_client_migrate(RedClient *client);
-
+gboolean red_client_seamless_migration_done_for_channel(RedClient *client,
+ RedChannelClient *rcc);
/*
* blocking functions.
*
@@ -616,13 +464,9 @@ void red_client_migrate(RedClient *client);
* Return: TRUE if waiting succeeded. FALSE if timeout expired.
*/
-int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
- RedPipeItem *item,
- int64_t timeout);
-int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
- int64_t timeout);
int red_channel_wait_all_sent(RedChannel *channel,
int64_t timeout);
-void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc);
+
+#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro
#endif
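
The remaining hunks (red-qxl.c, red-worker.c, reds.c, sound.c, spicevmc.c, stream.c) all apply the same mechanical conversion: fetch the RedChannel / RedClient through the new accessors instead of dereferencing RedChannelClient fields directly. A before/after sketch of that pattern, assuming the accessor API added by this series; the callback name and body are hypothetical, not taken from the patch:

/* before this series: direct field access into RedChannelClient */
static void example_migrate_cb_old(RedChannelClient *rcc)
{
    spice_printerr("channel type %u id %u",
                   rcc->channel->type, rcc->channel->id);
}

/* after: only the accessors declared in red-channel-client.h are needed */
static void example_migrate_cb_new(RedChannelClient *rcc)
{
    RedChannel *channel = red_channel_client_get_channel(rcc);

    spice_printerr("channel type %u id %u client %p",
                   channel->type, channel->id,
                   red_channel_client_get_client(rcc));
}

spicevmc.c still pulls in red-channel-client-private.h only for sizeof(RedChannelClient), as the FIXME in that hunk notes.
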
diff --git a/server/red-qxl.c b/server/red-qxl.c
index abde0ba..ef39f0e 100644
--- a/server/red-qxl.c
+++ b/server/red-qxl.c
@@ -102,12 +102,13 @@ static void red_qxl_disconnect_display_peer(RedChannelClient *rcc)
{
RedWorkerMessageDisplayDisconnect payload;
Dispatcher *dispatcher;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
- if (!rcc->channel) {
+ if (!channel) {
return;
}
- dispatcher = (Dispatcher *)rcc->channel->data;
+ dispatcher = (Dispatcher *)channel->data;
spice_printerr("");
payload.rcc = rcc;
@@ -123,11 +124,12 @@ static void red_qxl_display_migrate(RedChannelClient *rcc)
{
RedWorkerMessageDisplayMigrate payload;
Dispatcher *dispatcher;
- if (!rcc->channel) {
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ if (!channel) {
return;
}
- dispatcher = (Dispatcher *)rcc->channel->data;
- spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id);
+ dispatcher = (Dispatcher *)channel->data;
+ spice_printerr("channel type %u id %u", channel->type, channel->id);
payload.rcc = rcc;
dispatcher_send_message(dispatcher,
RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
@@ -162,12 +164,13 @@ static void red_qxl_disconnect_cursor_peer(RedChannelClient *rcc)
{
RedWorkerMessageCursorDisconnect payload;
Dispatcher *dispatcher;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
- if (!rcc->channel) {
+ if (!channel) {
return;
}
- dispatcher = (Dispatcher *)rcc->channel->data;
+ dispatcher = (Dispatcher *)channel->data;
spice_printerr("");
payload.rcc = rcc;
@@ -180,12 +183,13 @@ static void red_qxl_cursor_migrate(RedChannelClient *rcc)
{
RedWorkerMessageCursorMigrate payload;
Dispatcher *dispatcher;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
- if (!rcc->channel) {
+ if (!channel) {
return;
}
- dispatcher = (Dispatcher *)rcc->channel->data;
- spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id);
+ dispatcher = (Dispatcher *)channel->data;
+ spice_printerr("channel type %u id %u", channel->type, channel->id);
payload.rcc = rcc;
dispatcher_send_message(dispatcher,
RED_WORKER_MESSAGE_CURSOR_MIGRATE,
diff --git a/server/red-worker.c b/server/red-worker.c
index 46fbc4d..8e3a90a 100644
--- a/server/red-worker.c
+++ b/server/red-worker.c
@@ -102,7 +102,8 @@ static int display_is_connected(RedWorker *worker)
static uint8_t *common_alloc_recv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size)
{
- CommonGraphicsChannel *common = SPICE_CONTAINEROF(rcc->channel, CommonGraphicsChannel, base);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ CommonGraphicsChannel *common = SPICE_CONTAINEROF(channel, CommonGraphicsChannel, base);
/* SPICE_MSGC_MIGRATE_DATA is the only client message whose size is dynamic */
if (type == SPICE_MSGC_MIGRATE_DATA) {
diff --git a/server/reds.c b/server/reds.c
index 5551db7..b89d6ee 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -1046,12 +1046,14 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
{
RedCharDevice *dev_state = RED_CHAR_DEVICE(reds->agent_dev);
RedChannelClient *rcc;
+ RedClient *client;
if (!reds->vdagent) {
return;
}
spice_assert(reds->vdagent->st && reds->vdagent->st == dev_state);
rcc = main_channel_client_get_base(mcc);
+ client = red_channel_client_get_client(rcc);
reds->agent_dev->priv->client_agent_started = TRUE;
/*
* Note that in older releases, send_tokens were set to ~0 on both client
@@ -1060,11 +1062,11 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
* and vice versa, the sending from the server to the client won't have
* flow control, but will have no other problem.
*/
- if (!red_char_device_client_exists(dev_state, rcc->client)) {
+ if (!red_char_device_client_exists(dev_state, client)) {
int client_added;
client_added = red_char_device_client_add(dev_state,
- rcc->client,
+ client,
TRUE, /* flow control */
REDS_VDI_PORT_NUM_RECEIVE_BUFFS,
REDS_AGENT_WINDOW_SIZE,
@@ -1078,7 +1080,7 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
}
} else {
red_char_device_send_to_client_tokens_set(dev_state,
- rcc->client,
+ client,
num_tokens);
}
@@ -1090,12 +1092,13 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
void reds_on_main_agent_tokens(RedsState *reds, MainChannelClient *mcc, uint32_t num_tokens)
{
+ RedClient *client = red_channel_client_get_client(main_channel_client_get_base(mcc));
if (!reds->vdagent) {
return;
}
spice_assert(reds->vdagent->st);
red_char_device_send_to_client_tokens_add(reds->vdagent->st,
- main_channel_client_get_base(mcc)->client,
+ client,
num_tokens);
}
@@ -1116,7 +1119,7 @@ uint8_t *reds_get_agent_data_buffer(RedsState *reds, MainChannelClient *mcc, siz
}
spice_assert(dev->priv->recv_from_client_buf == NULL);
- client = main_channel_client_get_base(mcc)->client;
+ client = red_channel_client_get_client(main_channel_client_get_base(mcc));
dev->priv->recv_from_client_buf = red_char_device_write_buffer_get(RED_CHAR_DEVICE(dev),
client,
size + sizeof(VDIChunkHeader));
@@ -1481,9 +1484,9 @@ int reds_handle_migrate_data(RedsState *reds, MainChannelClient *mcc,
} else {
spice_debug("agent was not attached on the source host");
if (reds->vdagent) {
+ RedClient *client = red_channel_client_get_client(main_channel_client_get_base(mcc));
/* red_char_device_client_remove disables waiting for migration data */
- red_char_device_client_remove(RED_CHAR_DEVICE(agent_dev),
- main_channel_client_get_base(mcc)->client);
+ red_char_device_client_remove(RED_CHAR_DEVICE(agent_dev), client);
main_channel_push_agent_connected(reds->main_channel);
}
}
@@ -1946,10 +1949,11 @@ int reds_on_migrate_dst_set_seamless(RedsState *reds, MainChannelClient *mcc, ui
reds->dst_do_seamless_migrate = FALSE;
} else {
RedChannelClient *rcc = main_channel_client_get_base(mcc);
+ RedClient *client = red_channel_client_get_client(rcc);
- red_client_set_migration_seamless(rcc->client);
+ red_client_set_migration_seamless(client);
/* linking all the channels that have been connected before migration handshake */
- reds->dst_do_seamless_migrate = reds_link_mig_target_channels(reds, rcc->client);
+ reds->dst_do_seamless_migrate = reds_link_mig_target_channels(reds, client);
}
return reds->dst_do_seamless_migrate;
}
diff --git a/server/smartcard.c b/server/smartcard.c
index a42bcd8..e544f19 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -30,7 +30,7 @@
#include "reds.h"
#include "char-device.h"
-#include "red-channel.h"
+#include "red-channel-client-private.h"
#include "smartcard.h"
#include "migration-protocol.h"
diff --git a/server/sound.c b/server/sound.c
index d790c7a..05ce9b5 100644
--- a/server/sound.c
+++ b/server/sound.c
@@ -35,6 +35,7 @@
#include "main-channel-client.h"
#include "reds.h"
#include "red-qxl.h"
+#include "red-channel-client-private.h"
#include "sound.h"
#include "common/snd_codec.h"
#include "demarshallers.h"
@@ -213,14 +214,16 @@ static void snd_disconnect_channel(SndChannel *channel)
{
SndWorker *worker;
RedsState *reds;
+ RedChannel *red_channel;
if (!channel || !channel->stream) {
spice_debug("not connected");
return;
}
+ red_channel = red_channel_client_get_channel(channel->channel_client);
reds = snd_channel_get_server(channel);
spice_debug("SndChannel=%p rcc=%p type=%d",
- channel, channel->channel_client, channel->channel_client->channel->type);
+ channel, channel->channel_client, red_channel->type);
worker = channel->worker;
channel->cleanup(channel);
red_channel_client_disconnect(worker->connection->channel_client);
@@ -423,12 +426,13 @@ static int snd_record_handle_message(SndChannel *channel, size_t size, uint32_t
static void snd_receive(SndChannel *channel)
{
SpiceDataHeaderOpaque *header;
+ IncomingHandler *incoming = red_channel_client_get_incoming_handler(channel->channel_client);
if (!channel) {
return;
}
- header = &channel->channel_client->incoming.header;
+ header = &incoming->header;
for (;;) {
ssize_t n;
diff --git a/server/spicevmc.c b/server/spicevmc.c
index dffdd9a..bfd93c1 100644
--- a/server/spicevmc.c
+++ b/server/spicevmc.c
@@ -34,6 +34,8 @@
#include "red-channel.h"
#include "reds.h"
#include "migration-protocol.h"
+/* FIXME: only included for sizeof RedChannelClient */
+#include "red-channel-client-private.h"
/* todo: add flow control. i.e.,
* (a) limit the tokens available for the client
@@ -147,14 +149,15 @@ static void spicevmc_chardev_send_msg_to_client(RedPipeItem *msg,
SpiceVmcState *state = opaque;
RedVmcPipeItem *vmc_msg = (RedVmcPipeItem *)msg;
- spice_assert(state->rcc->client == client);
+ spice_assert(red_channel_client_get_client(state->rcc) == client);
red_pipe_item_ref(vmc_msg);
red_channel_client_pipe_add_push(state->rcc, (RedPipeItem *)vmc_msg);
}
static void spicevmc_port_send_init(RedChannelClient *rcc)
{
- SpiceVmcState *state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ SpiceVmcState *state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
SpiceCharDeviceInstance *sin = state->chardev_sin;
RedPortInitPipeItem *item = spice_malloc(sizeof(RedPortInitPipeItem));
@@ -185,7 +188,8 @@ static void spicevmc_char_dev_remove_client(RedClient *client, void *opaque)
SpiceVmcState *state = opaque;
spice_printerr("vmc state %p, client %p", state, client);
- spice_assert(state->rcc && state->rcc->client == client);
+ spice_assert(state->rcc &&
+ red_channel_client_get_client(state->rcc) == client);
red_channel_client_shutdown(state->rcc);
}
@@ -194,8 +198,9 @@ static int spicevmc_red_channel_client_config_socket(RedChannelClient *rcc)
{
int delay_val = 1;
RedsStream *stream = red_channel_client_get_stream(rcc);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
- if (rcc->channel->type == SPICE_CHANNEL_USBREDIR) {
+ if (channel->type == SPICE_CHANNEL_USBREDIR) {
if (setsockopt(stream->socket, IPPROTO_TCP, TCP_NODELAY,
&delay_val, sizeof(delay_val)) != 0) {
if (errno != ENOTSUP && errno != ENOPROTOOPT) {
@@ -212,12 +217,14 @@ static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc)
{
SpiceVmcState *state;
SpiceCharDeviceInterface *sif;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ RedClient *client = red_channel_client_get_client(rcc);
if (!rcc) {
return;
}
- state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+ state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
if (state->recv_from_client_buf) { /* partial message which wasn't pushed to device */
red_char_device_write_buffer_release(state->chardev, state->recv_from_client_buf);
@@ -225,17 +232,17 @@ static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc)
}
if (state->chardev) {
- if (red_char_device_client_exists(state->chardev, rcc->client)) {
- red_char_device_client_remove(state->chardev, rcc->client);
+ if (red_char_device_client_exists(state->chardev, client)) {
+ red_char_device_client_remove(state->chardev, client);
} else {
spice_printerr("client %p have already been removed from char dev %p",
- rcc->client, state->chardev);
+ client, state->chardev);
}
}
/* Don't destroy the rcc if it is already being destroyed, as then
red_client_destroy/red_channel_client_destroy will already do this! */
- if (!rcc->destroying)
+ if (!red_channel_client_is_destroying(rcc))
red_channel_client_destroy(rcc);
state->rcc = NULL;
@@ -247,7 +254,8 @@ static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc)
static SpiceVmcState *spicevmc_red_channel_client_get_state(RedChannelClient *rcc)
{
- return SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ return SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
}
static int spicevmc_channel_client_handle_migrate_flush_mark(RedChannelClient *rcc)
@@ -316,15 +324,17 @@ static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
uint32_t size)
{
SpiceVmcState *state;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
+ RedClient *client = red_channel_client_get_client(rcc);
- state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+ state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
switch (type) {
case SPICE_MSGC_SPICEVMC_DATA:
assert(!state->recv_from_client_buf);
state->recv_from_client_buf = red_char_device_write_buffer_get(state->chardev,
- rcc->client,
+ client,
size);
if (!state->recv_from_client_buf) {
spice_error("failed to allocate write buffer");
@@ -344,8 +354,9 @@ static void spicevmc_red_channel_release_msg_rcv_buf(RedChannelClient *rcc,
uint8_t *msg)
{
SpiceVmcState *state;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
- state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+ state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
switch (type) {
case SPICE_MSGC_SPICEVMC_DATA:
@@ -380,8 +391,9 @@ static void spicevmc_red_channel_send_migrate_data(RedChannelClient *rcc,
RedPipeItem *item)
{
SpiceVmcState *state;
+ RedChannel *channel = red_channel_client_get_channel(rcc);
- state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+ state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item);
spice_marshaller_add_uint32(m, SPICE_MIGRATE_DATA_SPICEVMC_MAGIC);
spice_marshaller_add_uint32(m, SPICE_MIGRATE_DATA_SPICEVMC_VERSION);
diff --git a/server/stream.c b/server/stream.c
index ee3c0b0..f98177e 100644
--- a/server/stream.c
+++ b/server/stream.c
@@ -628,7 +628,7 @@ static uint64_t get_initial_bit_rate(DisplayChannelClient *dcc, Stream *stream)
MainChannelClient *mcc;
uint64_t net_test_bit_rate;
- mcc = red_client_get_main(RED_CHANNEL_CLIENT(dcc)->client);
+ mcc = red_client_get_main(red_channel_client_get_client(RED_CHANNEL_CLIENT(dcc)));
net_test_bit_rate = main_channel_client_is_network_info_initialized(mcc) ?
main_channel_client_get_bitrate_per_sec(mcc) :
0;
@@ -660,7 +660,8 @@ static uint32_t get_roundtrip_ms(void *opaque)
roundtrip = red_channel_client_get_roundtrip_ms(RED_CHANNEL_CLIENT(agent->dcc));
if (roundtrip < 0) {
- MainChannelClient *mcc = red_client_get_main(RED_CHANNEL_CLIENT(agent->dcc)->client);
+ MainChannelClient *mcc =
+ red_client_get_main(red_channel_client_get_client(RED_CHANNEL_CLIENT(agent->dcc)));
/*
* the main channel client roundtrip might not have been
@@ -684,7 +685,9 @@ static void update_client_playback_delay(void *opaque, uint32_t delay_ms)
{
StreamAgent *agent = opaque;
DisplayChannelClient *dcc = agent->dcc;
- RedsState *reds = red_channel_get_server(((RedChannelClient*)dcc)->channel);
+ RedChannel *channel = red_channel_client_get_channel(RED_CHANNEL_CLIENT(dcc));
+ RedClient *client = red_channel_client_get_client(RED_CHANNEL_CLIENT(dcc));
+ RedsState *reds = red_channel_get_server(channel);
dcc_update_streams_max_latency(dcc, agent);
@@ -694,7 +697,7 @@ static void update_client_playback_delay(void *opaque, uint32_t delay_ms)
}
spice_debug("resetting client latency: %u", dcc_get_max_stream_latency(agent->dcc));
main_dispatcher_set_mm_time_latency(reds_get_main_dispatcher(reds),
- RED_CHANNEL_CLIENT(agent->dcc)->client,
+ client,
dcc_get_max_stream_latency(agent->dcc));
}
--
2.4.11