[Spice-devel] [PATCH 01/10] server: split main_channel from reds
Alon Levy
alevy at redhat.com
Wed Jan 12 21:01:34 PST 2011
---
server/Makefile.am | 1 +
server/main_channel.c | 745 +++++++++++++++++++++++++++++++++++++++
server/main_channel.h | 78 +++++
server/reds.c | 917 +++++++++++--------------------------------------
server/reds.h | 22 ++
5 files changed, 1044 insertions(+), 719 deletions(-)
create mode 100644 server/main_channel.c
create mode 100644 server/main_channel.h
diff --git a/server/Makefile.am b/server/Makefile.am
index ab66ba7..d265bfb 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -114,6 +114,7 @@ libspice_server_la_SOURCES = \
red_parse_qxl.c \
red_parse_qxl.h \
reds.c \
+ main_channel.c \
inputs_channel.c \
reds.h \
stat.h \
diff --git a/server/main_channel.c b/server/main_channel.c
new file mode 100644
index 0000000..7dff2b4
--- /dev/null
+++ b/server/main_channel.c
@@ -0,0 +1,745 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <limits.h>
+#include <time.h>
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <ctype.h>
+
+#include "server/red_common.h"
+#include "server/demarshallers.h"
+#include "common/ring.h"
+#include "common/messages.h"
+#include "reds.h"
+#include "main_channel.h"
+#include "generated_marshallers.h"
+
+#define ZERO_BUF_SIZE 4096
+
+// approximate max receive message size for main channel
+#define RECEIVE_BUF_SIZE \
+ (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
+
+#define REDS_MAX_SEND_IOVEC 100
+
+#define NET_TEST_WARMUP_BYTES 0
+#define NET_TEST_BYTES (1024 * 250)
+
+static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
+
+typedef struct RedsOutItem RedsOutItem;
+struct RedsOutItem {
+ RingItem link;
+ SpiceMarshaller *m;
+ SpiceDataHeader *header;
+};
+
+typedef struct RedsOutgoingData {
+ Ring pipe;
+ RedsOutItem *item;
+ int vec_size;
+ struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
+ struct iovec *vec;
+} RedsOutgoingData;
+
+// TODO - remove and use red_channel.h
+typedef struct IncomingHandler {
+ spice_parse_channel_func_t parser;
+ void *opaque;
+ int shut;
+ uint8_t buf[RECEIVE_BUF_SIZE];
+ uint32_t end_pos;
+ void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
+} IncomingHandler;
+
+typedef struct MainChannel {
+ RedsStreamContext *peer;
+ IncomingHandler in_handler;
+ RedsOutgoingData outgoing;
+ uint64_t serial; //migrate me
+ uint32_t ping_id;
+ uint32_t net_test_id;
+ int net_test_stage;
+} MainChannel;
+
+enum NetTestStage {
+ NET_TEST_STAGE_INVALID,
+ NET_TEST_STAGE_WARMUP,
+ NET_TEST_STAGE_LATENCY,
+ NET_TEST_STAGE_RATE,
+};
+
+static uint64_t latency = 0;
+uint64_t bitrate_per_sec = ~0;
+
+static void main_channel_out_item_free(RedsOutItem *item);
+
+static void main_reset_outgoing(MainChannel *main_chan)
+{
+ RedsOutgoingData *outgoing = &main_chan->outgoing;
+ RingItem *ring_item;
+
+ if (outgoing->item) {
+ main_channel_out_item_free(outgoing->item);
+ outgoing->item = NULL;
+ }
+ while ((ring_item = ring_get_tail(&outgoing->pipe))) {
+ RedsOutItem *out_item = (RedsOutItem *)ring_item;
+ ring_remove(ring_item);
+ main_channel_out_item_free(out_item);
+ }
+ outgoing->vec_size = 0;
+ outgoing->vec = outgoing->vec_buf;
+}
+
+// ALON from reds_disconnect
+static void main_disconnect(MainChannel *main_chan)
+{
+ if (!main_chan || !main_chan->peer) {
+ return;
+ }
+ main_reset_outgoing(main_chan);
+ core->watch_remove(main_chan->peer->watch);
+ main_chan->peer->watch = NULL;
+ main_chan->peer->cb_free(main_chan->peer);
+ main_chan->peer = NULL;
+ main_chan->in_handler.shut = TRUE;
+ main_chan->serial = 0;
+ main_chan->ping_id = 0;
+ main_chan->net_test_id = 0;
+ main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
+ main_chan->in_handler.end_pos = 0;
+
+ // TODO: Should probably reset these on the ping start, not here
+ latency = 0;
+ bitrate_per_sec = ~0;
+}
+
+void main_channel_start_net_test(Channel *channel)
+{
+ MainChannel *main_chan = channel->data;
+
+ if (!main_chan || main_chan->net_test_id) {
+ return;
+ }
+
+ if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES) &&
+ main_channel_push_ping(channel, 0) &&
+ main_channel_push_ping(channel, NET_TEST_BYTES)) {
+ main_chan->net_test_id = main_chan->ping_id - 2;
+ main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
+ }
+}
+
+static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+{
+ for (;;) {
+ uint8_t *buf = handler->buf;
+ uint32_t pos = handler->end_pos;
+ uint8_t *end = buf + pos;
+ SpiceDataHeader *header;
+ int n;
+ n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
+ if (n <= 0) {
+ if (n == 0) {
+ return -1;
+ }
+ switch (errno) {
+ case EAGAIN:
+ return 0;
+ case EINTR:
+ break;
+ case EPIPE:
+ return -1;
+ default:
+ red_printf("%s", strerror(errno));
+ return -1;
+ }
+ } else {
+ pos += n;
+ end = buf + pos;
+ while (buf + sizeof(SpiceDataHeader) <= end &&
+ buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
+ uint8_t *data = (uint8_t *)(header+1);
+ size_t parsed_size;
+ uint8_t *parsed;
+ message_destructor_t parsed_free;
+
+ buf += sizeof(SpiceDataHeader) + header->size;
+ parsed = handler->parser(data, data + header->size, header->type,
+ SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
+ if (parsed == NULL) {
+ red_printf("failed to parse message type %d", header->type);
+ return -1;
+ }
+ handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
+ parsed_free(parsed);
+ if (handler->shut) {
+ return -1;
+ }
+ }
+ memmove(handler->buf, buf, (handler->end_pos = end - buf));
+ }
+ }
+}
+
+static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
+{
+ RedsOutItem *item;
+
+ item = spice_new(RedsOutItem, 1);
+ ring_item_init(&item->link);
+
+ item->m = spice_marshaller_new();
+ item->header = (SpiceDataHeader *)
+ spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
+ spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
+
+ item->header->serial = ++main_chan->serial;
+ item->header->type = type;
+ item->header->sub_list = 0;
+
+ return item;
+}
+
+static void main_channel_out_item_free(RedsOutItem *item)
+{
+ spice_marshaller_destroy(item->m);
+ free(item);
+}
+
+static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
+{
+ struct iovec *now = vec;
+
+ while (skip && skip >= now->iov_len) {
+ skip -= now->iov_len;
+ --*vec_size;
+ now++;
+ }
+ now->iov_base = (uint8_t *)now->iov_base + skip;
+ now->iov_len -= skip;
+ return now;
+}
+
+static int main_channel_send_data(MainChannel *main_chan)
+{
+ RedsOutgoingData *outgoing = &main_chan->outgoing;
+ int n;
+
+ if (!outgoing->item) {
+ return TRUE;
+ }
+
+ ASSERT(outgoing->vec_size);
+ for (;;) {
+ if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
+ switch (errno) {
+ case EAGAIN:
+ core->watch_update_mask(main_chan->peer->watch,
+ SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
+ return FALSE;
+ case EINTR:
+ break;
+ case EPIPE:
+ reds_disconnect();
+ return FALSE;
+ default:
+ red_printf("%s", strerror(errno));
+ reds_disconnect();
+ return FALSE;
+ }
+ } else {
+ outgoing->vec = main_channel_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
+ if (!outgoing->vec_size) {
+ main_channel_out_item_free(outgoing->item);
+ outgoing->item = NULL;
+ outgoing->vec = outgoing->vec_buf;
+ return TRUE;
+ }
+ }
+ }
+}
+
+static void main_channel_push(MainChannel *main_chan)
+{
+ RedsOutgoingData *outgoing = &main_chan->outgoing;
+ RingItem *ring_item;
+ RedsOutItem *item;
+
+ for (;;) {
+ if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
+ return;
+ }
+ ring_remove(ring_item);
+ outgoing->item = item = (RedsOutItem *)ring_item;
+
+ spice_marshaller_flush(item->m);
+ item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
+
+ outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
+ outgoing->vec_buf,
+ REDS_MAX_SEND_IOVEC, 0);
+ main_channel_send_data(main_chan);
+ }
+}
+
+static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
+{
+ ring_add(&main_chan->outgoing.pipe, &item->link);
+ main_channel_push(main_chan);
+}
+
+static void main_channel_push_channels(MainChannel *main_chan)
+{
+ SpiceMsgChannels* channels_info;
+ RedsOutItem *item;
+
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
+ channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
+ reds_fill_channels(channels_info);
+ spice_marshall_msg_main_channels_list(item->m, channels_info);
+ free(channels_info);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+int main_channel_push_ping(Channel *channel, int size)
+{
+ struct timespec time_space;
+ RedsOutItem *item;
+ SpiceMsgPing ping;
+ MainChannel *main_chan = channel->data;
+
+ if (!main_chan) {
+ return FALSE;
+ }
+ item = new_out_item(main_chan, SPICE_MSG_PING);
+ ping.id = ++main_chan->ping_id;
+ clock_gettime(CLOCK_MONOTONIC, &time_space);
+ ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
+ spice_marshall_msg_ping(item->m, &ping);
+
+ while (size > 0) {
+ int now = MIN(ZERO_BUF_SIZE, size);
+ size -= now;
+ spice_marshaller_add_ref(item->m, zero_page, now);
+ }
+
+ main_channel_push_pipe_item(main_chan, item);
+
+ return TRUE;
+}
+
+void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
+{
+ SpiceMsgMainMouseMode mouse_mode;
+ RedsOutItem *item;
+ MainChannel *main_chan;
+
+ if (!channel) {
+ return;
+ }
+ main_chan = channel->data;
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
+ mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
+ if (is_client_mouse_allowed) {
+ mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
+ }
+ mouse_mode.current_mode = current_mode;
+
+ spice_marshall_msg_main_mouse_mode(item->m, &mouse_mode);
+
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_agent_connected(Channel *channel)
+{
+ RedsOutItem *item;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_agent_disconnected(Channel *channel)
+{
+ SpiceMsgMainAgentDisconnect disconnect;
+ RedsOutItem *item;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+ disconnect.error_code = SPICE_LINK_ERR_OK;
+ spice_marshall_msg_main_agent_disconnected(item->m, &disconnect);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
+{
+ SpiceMsgMainAgentTokens tokens;
+ RedsOutItem *item;
+ MainChannel *main_chan = channel->data;
+
+ if (!main_chan) {
+ return;
+ }
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
+ tokens.num_tokens = num_tokens;
+ spice_marshall_msg_main_agent_token(item->m, &tokens);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
+ spice_marshaller_item_free_func free_data, void *opaque)
+{
+ RedsOutItem *item;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
+ spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+static void main_channel_push_migrate_data_item(MainChannel *main_chan)
+{
+ RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
+ SpiceMarshaller *m = item->m;
+ MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));
+
+ reds_push_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
+ data->serial = main_chan->serial;
+ data->ping_id = main_chan->ping_id;
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
+{
+ main_chan->serial = data->serial;
+ main_chan->ping_id = data->ping_id;
+}
+
+void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
+ int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+ int ram_hint)
+{
+ RedsOutItem *item;
+ SpiceMsgMainInit init;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
+ init.session_id = connection_id;
+ init.display_channels_hint = display_channels_hint;
+ init.current_mouse_mode = current_mouse_mode;
+ init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
+ if (is_client_mouse_allowed) {
+ init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
+ }
+ init.agent_connected = reds_has_vdagent();
+ init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
+ init.multi_media_time = multi_media_time;
+ init.ram_hint = ram_hint;
+ spice_marshall_msg_main_init(item->m, &init);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
+{
+ // TODO possible free-then-use bug - caller frees mess after this, but is that pointer being
+ // used by spice_marshaller?
+ RedsOutItem *item;
+ SpiceMsgNotify notify;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
+ notify.time_stamp = get_time_stamp();
+ notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
+ notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
+ notify.what = SPICE_WARN_GENERAL;
+ notify.message_len = mess_len;
+ spice_marshall_msg_notify(item->m, ¬ify);
+ spice_marshaller_add(item->m, mess, mess_len + 1);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
+ uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
+{
+ MainChannel *main_chan = channel->data;
+ RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
+ SpiceMsgMainMigrationBegin migrate;
+
+ migrate.port = port;
+ migrate.sport = sport;
+ migrate.host_size = strlen(host) + 1;
+ migrate.host_data = (uint8_t *)host;
+ migrate.pub_key_type = cert_pub_key_type;
+ migrate.pub_key_size = cert_pub_key_len;
+ migrate.pub_key_data = cert_pub_key;
+ spice_marshall_msg_main_migrate_begin(item->m, &migrate);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate(Channel *channel)
+{
+ RedsOutItem *item;
+ SpiceMsgMigrate migrate;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
+ migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
+ spice_marshall_msg_migrate(item->m, &migrate);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate_cancel(Channel *channel)
+{
+ MainChannel *main_chan = channel->data;
+ RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
+
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_multi_media_time(Channel *channel, int time)
+{
+ SpiceMsgMainMultiMediaTime time_mes;
+ RedsOutItem *item;
+ MainChannel *main_chan = channel->data;
+
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
+ time_mes.time = time;
+ spice_marshall_msg_main_multi_media_time(item->m, &time_mes);
+ main_channel_push_pipe_item(main_chan, item);
+}
+
+void main_channel_push_migrate_switch(Channel *channel)
+{
+ SpiceMsgMainMigrationSwitchHost migrate;
+ RedsOutItem *item;
+ MainChannel *main_chan;
+
+ if (!channel) {
+ return;
+ }
+ main_chan = channel->data;
+ red_printf("");
+ item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+ reds_fill_mig_switch(&migrate);
+ spice_marshall_msg_main_migrate_switch_host(item->m, &migrate);
+ main_channel_push_pipe_item(main_chan, item);
+ reds_mig_release();
+}
+
+static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
+{
+ MainChannel *main_chan = opaque;
+
+ switch (type) {
+ case SPICE_MSGC_MAIN_AGENT_START:
+ red_printf("agent start");
+ if (!main_chan) {
+ return;
+ }
+ reds_on_main_agent_start(main_chan);
+ break;
+ case SPICE_MSGC_MAIN_AGENT_DATA: {
+ reds_on_main_agent_data(message, size);
+ break;
+ }
+ case SPICE_MSGC_MAIN_AGENT_TOKEN:
+ break;
+ case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
+ main_channel_push_channels(main_chan);
+ break;
+ case SPICE_MSGC_MAIN_MIGRATE_CONNECTED:
+ red_printf("connected");
+ reds_on_main_migrate_connected();
+ break;
+ case SPICE_MSGC_MAIN_MIGRATE_CONNECT_ERROR:
+ red_printf("mig connect error");
+ reds_on_main_migrate_connect_error();
+ break;
+ case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST:
+ reds_on_main_mouse_mode_request(message, size);
+ break;
+ case SPICE_MSGC_PONG: {
+ SpiceMsgPing *ping = (SpiceMsgPing *)message;
+ uint64_t roundtrip;
+ struct timespec ts;
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ roundtrip = ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL - ping->timestamp;
+
+ if (ping->id == main_chan->net_test_id) {
+ switch (main_chan->net_test_stage) {
+ case NET_TEST_STAGE_WARMUP:
+ main_chan->net_test_id++;
+ main_chan->net_test_stage = NET_TEST_STAGE_LATENCY;
+ break;
+ case NET_TEST_STAGE_LATENCY:
+ main_chan->net_test_id++;
+ main_chan->net_test_stage = NET_TEST_STAGE_RATE;
+ latency = roundtrip;
+ break;
+ case NET_TEST_STAGE_RATE:
+ main_chan->net_test_id = 0;
+ if (roundtrip <= latency) {
+ // probably high load on client or server result with incorrect values
+ latency = 0;
+ red_printf("net test: invalid values, latency %lu roundtrip %lu. assuming high"
+ "bandwidth", latency, roundtrip);
+ break;
+ }
+ bitrate_per_sec = (uint64_t)(NET_TEST_BYTES * 8) * 1000000 / (roundtrip - latency);
+ red_printf("net test: latency %f ms, bitrate %lu bps (%f Mbps)%s",
+ (double)latency / 1000,
+ bitrate_per_sec,
+ (double)bitrate_per_sec / 1024 / 1024,
+ IS_LOW_BANDWIDTH() ? " LOW BANDWIDTH" : "");
+ main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
+ break;
+ default:
+ red_printf("invalid net test stage, ping id %d test id %d stage %d",
+ ping->id,
+ main_chan->net_test_id,
+ main_chan->net_test_stage);
+ }
+ break;
+ }
+#ifdef RED_STATISTICS
+ reds_update_stat_value(roundtrip);
+#endif
+ break;
+ }
+ case SPICE_MSGC_MIGRATE_FLUSH_MARK:
+ main_channel_push_migrate_data_item(main_chan);
+ break;
+ case SPICE_MSGC_MIGRATE_DATA: {
+ MainMigrateData *data = (MainMigrateData *)message;
+ uint8_t *end = ((uint8_t *)message) + size;
+ main_channel_receive_migrate_data(main_chan, data, end);
+ reds_on_main_receive_migrate_data(data, end);
+ break;
+ }
+ case SPICE_MSGC_DISCONNECTING:
+ break;
+ default:
+ red_printf("unexpected type %d", type);
+ }
+}
+
+static void main_channel_event(int fd, int event, void *data)
+{
+ MainChannel *main_chan = data;
+
+ if (event & SPICE_WATCH_EVENT_READ) {
+ if (handle_incoming(main_chan->peer, &main_chan->in_handler)) {
+ main_disconnect(main_chan);
+ reds_disconnect();
+ }
+ }
+ if (event & SPICE_WATCH_EVENT_WRITE) {
+ RedsOutgoingData *outgoing = &main_chan->outgoing;
+ if (main_channel_send_data(main_chan)) {
+ main_channel_push(main_chan);
+ if (!outgoing->item && main_chan->peer) {
+ core->watch_update_mask(main_chan->peer->watch,
+ SPICE_WATCH_EVENT_READ);
+ }
+ }
+ }
+}
+
+static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
+{
+ MainChannel *main_chan;
+
+ main_chan = spice_malloc0(sizeof(MainChannel));
+ channel->data = main_chan;
+ main_chan->peer = peer;
+ main_chan->in_handler.shut = FALSE;
+ main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
+ main_chan->in_handler.opaque = main_chan;
+ main_chan->in_handler.handle_message = main_channel_handle_message;
+ ring_init(&main_chan->outgoing.pipe);
+ main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
+ peer->watch = core->watch_add(peer->socket,
+ SPICE_WATCH_EVENT_READ,
+ main_channel_event, main_chan);
+}
+
+int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
+{
+ MainChannel *main_chan = channel->data;
+
+ return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
+}
+
+int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
+{
+ MainChannel *main_chan = channel->data;
+
+ return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
+}
+
+void main_channel_close(Channel *channel)
+{
+ MainChannel *main_chan = channel->data;
+
+ if (main_chan && main_chan->peer) {
+ close(main_chan->peer->socket);
+ }
+}
+
+static void main_channel_shutdown(Channel *channel)
+{
+ MainChannel *main_chan = channel->data;
+
+ if (main_chan != NULL) {
+ main_disconnect(main_chan); // TODO - really here? reset peer etc.
+ }
+ free(main_chan);
+}
+
+static void main_channel_migrate()
+{
+}
+
+Channel* main_channel_init(void)
+{
+ Channel *channel;
+
+ channel = spice_new0(Channel, 1);
+ channel->type = SPICE_CHANNEL_MAIN;
+ channel->link = main_channel_link;
+ channel->shutdown = main_channel_shutdown;
+ channel->migrate = main_channel_migrate;
+ return channel;
+}
+
diff --git a/server/main_channel.h b/server/main_channel.h
new file mode 100644
index 0000000..db95dc2
--- /dev/null
+++ b/server/main_channel.h
@@ -0,0 +1,78 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __MAIN_CHANNEL_H__
+#define __MAIN_CHANNEL_H__
+
+#include <stdint.h>
+#include <spice/vd_agent.h>
+#include "common/marshaller.h"
+
+/* This is a temporary measure for reds/main split - should not be in a header,
+ * but private (although only reds.c includes main_channel.h) */
+struct MainMigrateData {
+ uint32_t version;
+ uint32_t serial;
+ uint32_t ping_id;
+
+ uint32_t agent_connected;
+ uint32_t client_agent_started;
+ uint32_t num_client_tokens;
+ uint32_t send_tokens;
+
+ uint32_t read_state;
+ VDIChunkHeader vdi_chunk_header;
+ uint32_t recive_len;
+ uint32_t message_recive_len;
+ uint32_t read_buf_len;
+
+ uint32_t write_queue_size;
+};
+
+Channel *main_channel_init();
+void main_channel_close(Channel *channel); // not destroy, just socket close
+int main_channel_push_ping(Channel *channel, int size);
+void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed);
+void main_channel_push_agent_connected(Channel *channel);
+void main_channel_push_agent_disconnected(Channel *channel);
+void main_channel_push_tokens(Channel *channel, uint32_t num_tokens);
+void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
+ spice_marshaller_item_free_func free_data, void *opaque);
+void main_channel_start_net_test(Channel *channel);
+// TODO: huge. Consider making a reds_* interface for these functions
+// and calling from main.
+void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
+ int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+ int ram_hint);
+void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len);
+// TODO: consider exporting RedsMigSpice from reds.c
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
+ uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key);
+void main_channel_push_migrate(Channel *channel);
+void main_channel_push_migrate_switch(Channel *channel);
+void main_channel_push_migrate_cancel(Channel *channel);
+void main_channel_push_multi_media_time(Channel *channel, int time);
+int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen);
+int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen);
+
+// TODO: Defines used to calculate receive buffer size, and also by reds.c
+// other options: is to make a reds_main_consts.h, to duplicate defines.
+#define REDS_AGENT_WINDOW_SIZE 10
+#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
+
+#endif
+
diff --git a/server/reds.c b/server/reds.c
index ba6f552..d6397e5 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -46,6 +46,7 @@
#include <spice/vd_agent.h>
#include "inputs_channel.h"
+#include "main_channel.h"
#include "red_common.h"
#include "red_dispatcher.h"
#include "snd_worker.h"
@@ -74,14 +75,8 @@ static SpiceCharDeviceInstance *vdagent = NULL;
#define REDS_MIG_ABORT 2
#define REDS_MIG_DIFF_VERSION 3
-#define REDS_AGENT_WINDOW_SIZE 10
#define REDS_TOKENS_TO_SEND 5
-#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
#define REDS_VDI_PORT_NUM_RECEIVE_BUFFS 5
-#define REDS_MAX_SEND_IOVEC 100
-
-#define NET_TEST_WARMUP_BYTES 0
-#define NET_TEST_BYTES (1024 * 250)
static int spice_port = -1;
static int spice_secure_port = -1;
@@ -109,24 +104,6 @@ static void openssl_init();
#define MM_TIME_DELTA 400 /*ms*/
#define VDI_PORT_WRITE_RETRY_TIMEOUT 100 /*ms*/
-// approximate max receive message size for main channel
-#define RECEIVE_BUF_SIZE \
- (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
-
-#define SCROLL_LOCK_SCAN_CODE 0x46
-#define NUM_LOCK_SCAN_CODE 0x45
-#define CAPS_LOCK_SCAN_CODE 0x3a
-
-// TODO - remove and use red_channel.h
-typedef struct IncomingHandler {
- spice_parse_channel_func_t parser;
- void *opaque;
- int shut;
- uint8_t buf[RECEIVE_BUF_SIZE];
- uint32_t end_pos;
- void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
-} IncomingHandler;
-
typedef struct TicketAuthentication {
char password[SPICE_MAX_PASSWORD_LENGTH];
@@ -147,13 +124,6 @@ typedef struct MonitorMode {
uint32_t y_res;
} MonitorMode;
-typedef struct RedsOutItem RedsOutItem;
-struct RedsOutItem {
- RingItem link;
- SpiceMarshaller *m;
- SpiceDataHeader *header;
-};
-
typedef struct VDIReadBuf {
RingItem link;
int len;
@@ -193,21 +163,6 @@ typedef struct VDIPortState {
int client_agent_started;
} VDIPortState;
-typedef struct RedsOutgoingData {
- Ring pipe;
- RedsOutItem *item;
- int vec_size;
- struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
- struct iovec *vec;
-} RedsOutgoingData;
-
-enum NetTestStage {
- NET_TEST_STAGE_INVALID,
- NET_TEST_STAGE_WARMUP,
- NET_TEST_STAGE_LATENCY,
- NET_TEST_STAGE_RATE,
-};
-
#ifdef RED_STATISTICS
#define REDS_MAX_STAT_NODES 100
@@ -230,12 +185,11 @@ typedef struct RedsState {
int secure_listen_socket;
SpiceWatch *listen_watch;
SpiceWatch *secure_listen_watch;
- RedsStreamContext *peer;
int disconnecting;
- uint32_t link_id;
- uint64_t serial; //migrate me
VDIPortState agent_state;
int pending_mouse_event;
+ uint32_t link_id;
+ Channel *main_channel;
int mig_wait_connect;
int mig_wait_disconnect;
@@ -243,8 +197,6 @@ typedef struct RedsState {
int mig_target;
RedsMigSpice *mig_spice;
int num_of_channels;
- IncomingHandler in_handler;
- RedsOutgoingData outgoing;
Channel *channels;
int mouse_mode;
int is_client_mouse_allowed;
@@ -266,15 +218,9 @@ typedef struct RedsState {
SpiceTimer *ping_timer;
int ping_interval;
#endif
- uint32_t ping_id;
- uint32_t net_test_id;
- int net_test_stage;
int peer_minor_version;
} RedsState;
-uint64_t bitrate_per_sec = ~0;
-static uint64_t latency = 0;
-
static RedsState *reds = NULL;
typedef struct AsyncRead {
@@ -336,13 +282,6 @@ struct ChannelSecurityOptions {
ChannelSecurityOptions *next;
};
-#define ZERO_BUF_SIZE 4096
-
-static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
-
-static void reds_push();
-static void reds_out_item_free(RedsOutItem *item);
-
static ChannelSecurityOptions *channels_security = NULL;
static int default_channel_security =
SPICE_CHANNEL_SECURITY_NONE | SPICE_CHANNEL_SECURITY_SSL;
@@ -481,20 +420,6 @@ static inline void reds_release_link(RedLinkInfo *link)
peer->cb_free(peer);
}
-static struct iovec *reds_iovec_skip(struct iovec vec[], int skip, int *vec_size)
-{
- struct iovec *now = vec;
-
- while (skip && skip >= now->iov_len) {
- skip -= now->iov_len;
- --*vec_size;
- now++;
- }
- now->iov_base = (uint8_t *)now->iov_base + skip;
- now->iov_len -= skip;
- return now;
-}
-
#ifdef RED_STATISTICS
void insert_stat_node(StatNodeRef parent, StatNodeRef ref)
@@ -598,8 +523,10 @@ void stat_remove_counter(uint64_t *counter)
stat_remove((SpiceStatNode *)(counter - offsetof(SpiceStatNode, value)));
}
-static void reds_update_stat_value(RedsStatValue* stat_value, uint32_t value)
+void reds_update_stat_value(uint32_t value)
{
+ RedsStatValue *stat_value = &reds->roundtrip_stat;
+
stat_value->value = value;
stat_value->min = (stat_value->count ? MIN(stat_value->min, value) : value);
stat_value->max = MAX(stat_value->max, value);
@@ -685,33 +612,20 @@ static void reds_reset_vdp()
state->client_agent_started = FALSE;
}
-static void reds_reset_outgoing()
+int reds_main_channel_connected()
{
- RedsOutgoingData *outgoing = &reds->outgoing;
- RingItem *ring_item;
-
- if (outgoing->item) {
- reds_out_item_free(outgoing->item);
- outgoing->item = NULL;
- }
- while ((ring_item = ring_get_tail(&outgoing->pipe))) {
- RedsOutItem *out_item = (RedsOutItem *)ring_item;
- ring_remove(ring_item);
- reds_out_item_free(out_item);
- }
- outgoing->vec_size = 0;
- outgoing->vec = outgoing->vec_buf;
+ return !!reds->main_channel;
}
void reds_disconnect()
{
- if (!reds->peer || reds->disconnecting) {
+ if (!reds_main_channel_connected() || reds->disconnecting) {
return;
}
red_printf("");
reds->disconnecting = TRUE;
- reds_reset_outgoing();
+ reds->link_id = 0;
if (reds->agent_state.connected) {
SpiceCharDeviceInterface *sif;
@@ -724,178 +638,32 @@ void reds_disconnect()
}
reds_shatdown_channels();
- core->watch_remove(reds->peer->watch);
- reds->peer->watch = NULL;
- reds->peer->cb_free(reds->peer);
- reds->peer = NULL;
- reds->in_handler.shut = TRUE;
- reds->link_id = 0;
- reds->serial = 0;
- reds->ping_id = 0;
- reds->net_test_id = 0;
- reds->net_test_stage = NET_TEST_STAGE_INVALID;
- reds->in_handler.end_pos = 0;
-
- bitrate_per_sec = ~0;
- latency = 0;
-
+ reds->main_channel->shutdown(reds->main_channel);
+ reds->main_channel = NULL;
reds_mig_cleanup();
reds->disconnecting = FALSE;
}
static void reds_mig_disconnect()
{
- if (reds->peer) {
+ if (reds_main_channel_connected()) {
reds_disconnect();
} else {
reds_mig_cleanup();
}
}
-static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
-{
- for (;;) {
- uint8_t *buf = handler->buf;
- uint32_t pos = handler->end_pos;
- uint8_t *end = buf + pos;
- SpiceDataHeader *header;
- int n;
- n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
- if (n <= 0) {
- if (n == 0) {
- return -1;
- }
- switch (errno) {
- case EAGAIN:
- return 0;
- case EINTR:
- break;
- case EPIPE:
- return -1;
- default:
- red_printf("%s", strerror(errno));
- return -1;
- }
- } else {
- pos += n;
- end = buf + pos;
- while (buf + sizeof(SpiceDataHeader) <= end &&
- buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
- uint8_t *data = (uint8_t *)(header+1);
- size_t parsed_size;
- uint8_t *parsed;
- message_destructor_t parsed_free;
-
-
- buf += sizeof(SpiceDataHeader) + header->size;
- parsed = handler->parser(data, data + header->size, header->type,
- SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
- if (parsed == NULL) {
- red_printf("failed to parse message type %d", header->type);
- return -1;
- }
- handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
- parsed_free(parsed);
- if (handler->shut) {
- return -1;
- }
- }
- memmove(handler->buf, buf, (handler->end_pos = end - buf));
- }
- }
-}
-
-static RedsOutItem *new_out_item(uint32_t type)
-{
- RedsOutItem *item;
-
- item = spice_new(RedsOutItem, 1);
- ring_item_init(&item->link);
-
- item->m = spice_marshaller_new();
- item->header = (SpiceDataHeader *)
- spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
- spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
-
- item->header->serial = ++reds->serial;
- item->header->type = type;
- item->header->sub_list = 0;
-
- return item;
-}
-
-static void reds_out_item_free(RedsOutItem *item)
-{
- spice_marshaller_destroy(item->m);
- free(item);
-}
-
-static void reds_push_pipe_item(RedsOutItem *item)
-{
- ring_add(&reds->outgoing.pipe, &item->link);
- reds_push();
-}
-
-static void reds_send_channels()
-{
- SpiceMsgChannels* channels_info;
- RedsOutItem *item;
- Channel *channel;
- int i;
-
- item = new_out_item(SPICE_MSG_MAIN_CHANNELS_LIST);
- channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds->num_of_channels * sizeof(SpiceChannelId));
- channels_info->num_of_channels = reds->num_of_channels;
- channel = reds->channels;
-
- for (i = 0; i < reds->num_of_channels; i++) {
- ASSERT(channel);
- channels_info->channels[i].type = channel->type;
- channels_info->channels[i].id = channel->id;
- channel = channel->next;
- }
- spice_marshall_msg_main_channels_list(item->m, channels_info);
- free(channels_info);
- reds_push_pipe_item(item);
-}
-
-static int send_ping(int size)
-{
- struct timespec time_space;
- RedsOutItem *item;
- SpiceMsgPing ping;
-
- if (!reds->peer) {
- return FALSE;
- }
- item = new_out_item(SPICE_MSG_PING);
- ping.id = ++reds->ping_id;
- clock_gettime(CLOCK_MONOTONIC, &time_space);
- ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
- spice_marshall_msg_ping(item->m, &ping);
-
- while (size > 0) {
- int now = MIN(ZERO_BUF_SIZE, size);
- size -= now;
- spice_marshaller_add_ref(item->m, zero_page, now);
- }
-
- reds_push_pipe_item(item);
-
- return TRUE;
-}
-
#ifdef RED_STATISTICS
static void do_ping_client(const char *opt, int has_interval, int interval)
{
- if (!reds->peer) {
+ if (!reds_main_channel_connected()) {
red_printf("not connected to peer");
return;
}
if (!opt) {
- send_ping(0);
+ main_channel_push_ping(reds->main_channel, 0);
} else if (!strcmp(opt, "on")) {
if (has_interval && interval > 0) {
reds->ping_interval = interval * 1000;
@@ -910,7 +678,7 @@ static void do_ping_client(const char *opt, int has_interval, int interval)
static void ping_timer_cb()
{
- if (!reds->peer) {
+ if (!reds_main_channel_connected()) {
red_printf("not connected to peer, ping off");
core->timer_cancel(reds->ping_timer);
return;
@@ -921,27 +689,6 @@ static void ping_timer_cb()
#endif
-static void reds_send_mouse_mode()
-{
- SpiceMsgMainMouseMode mouse_mode;
- RedsOutItem *item;
-
- if (!reds->peer) {
- return;
- }
-
- item = new_out_item(SPICE_MSG_MAIN_MOUSE_MODE);
- mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
- if (reds->is_client_mouse_allowed) {
- mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
- }
- mouse_mode.current_mode = reds->mouse_mode;
-
- spice_marshall_msg_main_mouse_mode(item->m, &mouse_mode);
-
- reds_push_pipe_item(item);
-}
-
int reds_get_mouse_mode(void)
{
return reds->mouse_mode;
@@ -954,7 +701,7 @@ static void reds_set_mouse_mode(uint32_t mode)
}
reds->mouse_mode = mode;
red_dispatcher_set_mouse_mode(reds->mouse_mode);
- reds_send_mouse_mode();
+ main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
}
int reds_get_agent_mouse(void)
@@ -978,26 +725,7 @@ static void reds_update_mouse_mode()
reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
return;
}
- reds_send_mouse_mode();
-}
-
-static void reds_send_agent_connected()
-{
- RedsOutItem *item;
-
- item = new_out_item(SPICE_MSG_MAIN_AGENT_CONNECTED);
- reds_push_pipe_item(item);
-}
-
-static void reds_send_agent_disconnected()
-{
- SpiceMsgMainAgentDisconnect disconnect;
- RedsOutItem *item;
-
- item = new_out_item(SPICE_MSG_MAIN_AGENT_DISCONNECTED);
- disconnect.error_code = SPICE_LINK_ERR_OK;
- spice_marshall_msg_main_agent_disconnected(item->m, &disconnect);
- reds_push_pipe_item(item);
+ main_channel_push_mouse_mode(reds->main_channel, reds->mouse_mode, reds->is_client_mouse_allowed);
}
static void reds_agent_remove()
@@ -1008,7 +736,7 @@ static void reds_agent_remove()
vdagent = NULL;
reds_update_mouse_mode();
- if (!reds->peer || !sin) {
+ if (!reds_main_channel_connected() || !sin) {
return;
}
@@ -1024,27 +752,15 @@ static void reds_agent_remove()
}
reds_reset_vdp();
- reds_send_agent_disconnected();
+ main_channel_push_agent_disconnected(reds->main_channel);
}
-static void reds_send_tokens()
+static void reds_push_tokens()
{
- SpiceMsgMainAgentTokens tokens;
- RedsOutItem *item;
-
- if (!reds->peer) {
- return;
- }
-
- item = new_out_item(SPICE_MSG_MAIN_AGENT_TOKEN);
- tokens.num_tokens = reds->agent_state.num_tokens;
- reds->agent_state.num_client_tokens += tokens.num_tokens;
+ reds->agent_state.num_client_tokens += reds->agent_state.num_tokens;
ASSERT(reds->agent_state.num_client_tokens <= REDS_AGENT_WINDOW_SIZE);
+ main_channel_push_tokens(reds->main_channel, reds->agent_state.num_tokens);
reds->agent_state.num_tokens = 0;
-
- spice_marshall_msg_main_agent_token(item->m, &tokens);
-
- reds_push_pipe_item(item);
}
static int write_to_vdi_port();
@@ -1122,16 +838,12 @@ void vdi_read_buf_release(uint8_t *data, void *opaque)
static void dispatch_vdi_port_data(int port, VDIReadBuf *buf)
{
VDIPortState *state = &reds->agent_state;
- RedsOutItem *item;
switch (port) {
case VDP_CLIENT_PORT: {
if (reds->agent_state.connected) {
- item = new_out_item(SPICE_MSG_MAIN_AGENT_DATA);
-
- spice_marshaller_add_ref_full(item->m, buf->data, buf->len,
- vdi_read_buf_release, buf);
- reds_push_pipe_item(item);
+ main_channel_push_agent_data(reds->main_channel, buf->data, buf->len,
+ vdi_read_buf_release, buf);
} else {
red_printf("throwing away, no client: %d", buf->len);
vdi_read_buf_release(buf->data, buf);
@@ -1281,28 +993,110 @@ static void add_token()
VDIPortState *state = &reds->agent_state;
if (++state->num_tokens == REDS_TOKENS_TO_SEND) {
- reds_send_tokens();
+ reds_push_tokens();
}
}
-typedef struct MainMigrateData {
- uint32_t version;
- uint32_t serial;
- uint32_t ping_id;
+int reds_num_of_channels()
+{
+ return reds ? reds->num_of_channels : 0;
+}
- uint32_t agent_connected;
- uint32_t client_agent_started;
- uint32_t num_client_tokens;
- uint32_t send_tokens;
+void reds_fill_channels(SpiceMsgChannels *channels_info)
+{
+ Channel *channel;
+ int i;
- uint32_t read_state;
- VDIChunkHeader vdi_chunk_header;
- uint32_t recive_len;
- uint32_t message_recive_len;
- uint32_t read_buf_len;
+ channels_info->num_of_channels = reds->num_of_channels;
+ channel = reds->channels;
+ for (i = 0; i < reds->num_of_channels; i++) {
+ ASSERT(channel);
+ channels_info->channels[i].type = channel->type;
+ channels_info->channels[i].id = channel->id;
+ channel = channel->next;
+ }
+}
- uint32_t write_queue_size;
-} MainMigrateData;
+void reds_on_main_agent_start()
+{
+ reds->agent_state.client_agent_started = TRUE;
+}
+
+void reds_on_main_agent_data(void *message, size_t size)
+{
+ RingItem *ring_item;
+ VDAgentExtBuf *buf;
+
+ if (!reds->agent_state.num_client_tokens) {
+ red_printf("token violation");
+ reds_disconnect();
+ return;
+ }
+ --reds->agent_state.num_client_tokens;
+
+ if (!vdagent) {
+ add_token();
+ return;
+ }
+
+ if (!reds->agent_state.client_agent_started) {
+ red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
+ add_token();
+ return;
+ }
+
+ if (size > SPICE_AGENT_MAX_DATA_SIZE) {
+ red_printf("invalid agent message");
+ reds_disconnect();
+ return;
+ }
+
+ if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
+ red_printf("no agent free bufs");
+ reds_disconnect();
+ return;
+ }
+ ring_remove(ring_item);
+ buf = (VDAgentExtBuf *)ring_item;
+ buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
+ buf->base.write_len = size + sizeof(VDIChunkHeader);
+ buf->base.chunk_header.size = size;
+ memcpy(buf->buf, message, size);
+ ring_add(&reds->agent_state.write_queue, ring_item);
+ write_to_vdi_port();
+}
+
+void reds_on_main_migrate_connected()
+{
+ if (reds->mig_wait_connect) {
+ reds_mig_cleanup();
+ }
+}
+
+void reds_on_main_migrate_connect_error()
+{
+ if (reds->mig_wait_connect) {
+ reds_mig_cleanup();
+ }
+}
+
+void reds_on_main_mouse_mode_request(void *message, size_t size)
+{
+ switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
+ case SPICE_MOUSE_MODE_CLIENT:
+ if (reds->is_client_mouse_allowed) {
+ reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
+ } else {
+ red_printf("client mouse is disabled");
+ }
+ break;
+ case SPICE_MOUSE_MODE_SERVER:
+ reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
+ break;
+ default:
+ red_printf("unsupported mouse mode");
+ }
+}
#define MAIN_CHANNEL_MIG_DATA_VERSION 1
@@ -1311,20 +1105,13 @@ typedef struct WriteQueueInfo {
uint32_t len;
} WriteQueueInfo;
-static void main_channel_push_migrate_data_item()
+void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data)
{
- RedsOutItem *item;
- MainMigrateData *data;
VDIPortState *state = &reds->agent_state;
int buf_index;
RingItem *now;
- item = new_out_item(SPICE_MSG_MIGRATE_DATA);
-
- data = (MainMigrateData *)spice_marshaller_reserve_space(item->m, sizeof(MainMigrateData));
data->version = MAIN_CHANNEL_MIG_DATA_VERSION;
- data->serial = reds->serial;
- data->ping_id = reds->ping_id;
data->agent_connected = !!state->connected;
data->client_agent_started = state->client_agent_started;
@@ -1340,7 +1127,7 @@ static void main_channel_push_migrate_data_item()
data->read_buf_len = state->current_read_buf->len;
if (data->read_buf_len - data->recive_len) {
- spice_marshaller_add_ref(item->m,
+ spice_marshaller_add_ref(m,
state->current_read_buf->data,
data->read_buf_len - data->recive_len);
}
@@ -1357,7 +1144,7 @@ static void main_channel_push_migrate_data_item()
WriteQueueInfo *queue_info;
queue_info = (WriteQueueInfo *)
- spice_marshaller_reserve_space(item->m,
+ spice_marshaller_reserve_space(m,
data->write_queue_size * sizeof(queue_info[0]));
buf_index = 0;
@@ -1366,15 +1153,13 @@ static void main_channel_push_migrate_data_item()
VDIPortBuf *buf = (VDIPortBuf *)now;
queue_info[buf_index].port = buf->chunk_header.port;
queue_info[buf_index++].len = buf->write_len;
- spice_marshaller_add_ref(item->m, buf->now, buf->write_len);
+ spice_marshaller_add_ref(m, buf->now, buf->write_len);
}
}
-
- reds_push_pipe_item((RedsOutItem *)item);
}
-static int main_channel_restore_vdi_read_state(MainMigrateData *data, uint8_t **in_pos,
+static int reds_main_channel_restore_vdi_read_state(MainMigrateData *data, uint8_t **in_pos,
uint8_t *end)
{
VDIPortState *state = &reds->agent_state;
@@ -1459,7 +1244,7 @@ static void free_tmp_internal_buf(VDIPortBuf *buf)
free(buf);
}
-static int main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos, uint8_t *end)
+static int reds_main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos, uint8_t *end)
{
VDIPortState *state = &reds->agent_state;
WriteQueueInfo *inf;
@@ -1530,7 +1315,7 @@ static int main_channel_restore_vdi_wqueue(MainMigrateData *data, uint8_t *pos,
return TRUE;
}
-static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end)
+void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
{
VDIPortState *state = &reds->agent_state;
uint8_t *pos;
@@ -1541,9 +1326,6 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
return;
}
- reds->serial = data->serial;
- reds->ping_id = data->ping_id;
-
state->num_client_tokens = data->num_client_tokens;
ASSERT(state->num_client_tokens + data->write_queue_size <= REDS_AGENT_WINDOW_SIZE +
REDS_NUM_INTERNAL_AGENT_MESSAGES);
@@ -1551,19 +1333,19 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
if (!data->agent_connected) {
if (state->connected) {
- reds_send_agent_connected();
+ main_channel_push_agent_connected(reds->main_channel);
}
return;
}
if (!state->connected) {
- reds_send_agent_disconnected();
+ main_channel_push_agent_disconnected(reds->main_channel);
return;
}
if (state->plug_generation > 1) {
- reds_send_agent_disconnected();
- reds_send_agent_connected();
+ main_channel_push_agent_disconnected(reds->main_channel);
+ main_channel_push_agent_connected(reds->main_channel);
return;
}
@@ -1571,245 +1353,15 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
pos = (uint8_t *)(data + 1);
- if (!main_channel_restore_vdi_read_state(data, &pos, end)) {
+ if (!reds_main_channel_restore_vdi_read_state(data, &pos, end)) {
return;
}
- main_channel_restore_vdi_wqueue(data, pos, end);
+ reds_main_channel_restore_vdi_wqueue(data, pos, end);
ASSERT(state->num_client_tokens + state->num_tokens == REDS_AGENT_WINDOW_SIZE);
-}
-
-static void reds_main_handle_message(void *opaque, size_t size, uint32_t type, void *message)
-{
- switch (type) {
- case SPICE_MSGC_MAIN_AGENT_START:
- red_printf("agent start");
- if (!reds->peer) {
- return;
- }
- reds->agent_state.client_agent_started = TRUE;
- break;
- case SPICE_MSGC_MAIN_AGENT_DATA: {
- RingItem *ring_item;
- VDAgentExtBuf *buf;
-
- if (!reds->agent_state.num_client_tokens) {
- red_printf("token violation");
- reds_disconnect();
- break;
- }
- --reds->agent_state.num_client_tokens;
-
- if (!vdagent) {
- add_token();
- break;
- }
-
- if (!reds->agent_state.client_agent_started) {
- red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
- add_token();
- break;
- }
-
- if (size > SPICE_AGENT_MAX_DATA_SIZE) {
- red_printf("invalid agent message");
- reds_disconnect();
- break;
- }
-
- if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
- red_printf("no agent free bufs");
- reds_disconnect();
- break;
- }
- ring_remove(ring_item);
- buf = (VDAgentExtBuf *)ring_item;
- buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
- buf->base.write_len = size + sizeof(VDIChunkHeader);
- buf->base.chunk_header.size = size;
- memcpy(buf->buf, message, size);
- ring_add(&reds->agent_state.write_queue, ring_item);
- write_to_vdi_port();
- break;
- }
- case SPICE_MSGC_MAIN_AGENT_TOKEN:
- break;
- case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
- reds_send_channels();
- break;
- case SPICE_MSGC_MAIN_MIGRATE_CONNECTED:
- red_printf("connected");
- if (reds->mig_wait_connect) {
- reds_mig_cleanup();
- }
- break;
- case SPICE_MSGC_MAIN_MIGRATE_CONNECT_ERROR:
- red_printf("mig connect error");
- if (reds->mig_wait_connect) {
- reds_mig_cleanup();
- }
- break;
- case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST: {
- switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
- case SPICE_MOUSE_MODE_CLIENT:
- if (reds->is_client_mouse_allowed) {
- reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
- } else {
- red_printf("client mouse is disabled");
- }
- break;
- case SPICE_MOUSE_MODE_SERVER:
- reds_set_mouse_mode(SPICE_MOUSE_MODE_SERVER);
- break;
- default:
- red_printf("unsupported mouse mode");
- }
- break;
- }
- case SPICE_MSGC_PONG: {
- SpiceMsgPing *ping = (SpiceMsgPing *)message;
- uint64_t roundtrip;
- struct timespec ts;
-
- clock_gettime(CLOCK_MONOTONIC, &ts);
- roundtrip = ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL - ping->timestamp;
-
- if (ping->id == reds->net_test_id) {
- switch (reds->net_test_stage) {
- case NET_TEST_STAGE_WARMUP:
- reds->net_test_id++;
- reds->net_test_stage = NET_TEST_STAGE_LATENCY;
- break;
- case NET_TEST_STAGE_LATENCY:
- reds->net_test_id++;
- reds->net_test_stage = NET_TEST_STAGE_RATE;
- latency = roundtrip;
- break;
- case NET_TEST_STAGE_RATE:
- reds->net_test_id = 0;
- if (roundtrip <= latency) {
- // probably high load on client or server result with incorrect values
- latency = 0;
- red_printf("net test: invalid values, latency %lu roundtrip %lu. assuming high"
- "bandwidth", latency, roundtrip);
- break;
- }
- bitrate_per_sec = (uint64_t)(NET_TEST_BYTES * 8) * 1000000 / (roundtrip - latency);
- red_printf("net test: latency %f ms, bitrate %lu bps (%f Mbps)%s",
- (double)latency / 1000,
- bitrate_per_sec,
- (double)bitrate_per_sec / 1024 / 1024,
- IS_LOW_BANDWIDTH() ? " LOW BANDWIDTH" : "");
- reds->net_test_stage = NET_TEST_STAGE_INVALID;
- break;
- default:
- red_printf("invalid net test stage, ping id %d test id %d stage %d",
- ping->id,
- reds->net_test_id,
- reds->net_test_stage);
- }
- break;
- }
-#ifdef RED_STATISTICS
- reds_update_stat_value(&reds->roundtrip_stat, roundtrip);
-#endif
- break;
- }
- case SPICE_MSGC_MIGRATE_FLUSH_MARK:
- main_channel_push_migrate_data_item();
- break;
- case SPICE_MSGC_MIGRATE_DATA:
- main_channel_recive_migrate_data((MainMigrateData *)message,
- ((uint8_t *)message) + size);
- reds->mig_target = FALSE;
- while (write_to_vdi_port() || read_from_vdi_port());
- break;
- case SPICE_MSGC_DISCONNECTING:
- break;
- default:
- red_printf("unexpected type %d", type);
- }
-}
-
-static int reds_send_data()
-{
- RedsOutgoingData *outgoing = &reds->outgoing;
- int n;
-
- if (!outgoing->item) {
- return TRUE;
- }
- ASSERT(outgoing->vec_size);
- for (;;) {
- if ((n = reds->peer->cb_writev(reds->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
- switch (errno) {
- case EAGAIN:
- core->watch_update_mask(reds->peer->watch,
- SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
- return FALSE;
- case EINTR:
- break;
- case EPIPE:
- reds_disconnect();
- return FALSE;
- default:
- red_printf("%s", strerror(errno));
- reds_disconnect();
- return FALSE;
- }
- } else {
- outgoing->vec = reds_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
- if (!outgoing->vec_size) {
- reds_out_item_free(outgoing->item);
- outgoing->item = NULL;
- outgoing->vec = outgoing->vec_buf;
- return TRUE;
- }
- }
- }
-}
-
-static void reds_push()
-{
- RedsOutgoingData *outgoing = &reds->outgoing;
- RingItem *ring_item;
- RedsOutItem *item;
-
- for (;;) {
- if (!reds->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
- return;
- }
- ring_remove(ring_item);
- outgoing->item = item = (RedsOutItem *)ring_item;
-
- spice_marshaller_flush(item->m);
- item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
-
- outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
- outgoing->vec_buf,
- REDS_MAX_SEND_IOVEC, 0);
- reds_send_data();
- }
-}
-
-static void reds_main_event(int fd, int event, void *data)
-{
- if (event & SPICE_WATCH_EVENT_READ) {
- if (handle_incoming(reds->peer, &reds->in_handler)) {
- reds_disconnect();
- }
- }
- if (event & SPICE_WATCH_EVENT_WRITE) {
- RedsOutgoingData *outgoing = &reds->outgoing;
- if (reds_send_data()) {
- reds_push();
- if (!outgoing->item && reds->peer) {
- core->watch_update_mask(reds->peer->watch,
- SPICE_WATCH_EVENT_READ);
- }
- }
- }
+ reds->mig_target = FALSE;
+ while (write_to_vdi_port() || read_from_vdi_port());
}
static int sync_write(RedsStreamContext *peer, void *in_buf, size_t n)
@@ -1920,40 +1472,33 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
sync_write(link->peer, &error, sizeof(error));
}
-static void reds_start_net_test()
-{
- if (!reds->peer || reds->net_test_id) {
- return;
- }
-
- if (send_ping(NET_TEST_WARMUP_BYTES) && send_ping(0) && send_ping(NET_TEST_BYTES)) {
- reds->net_test_id = reds->ping_id - 2;
- reds->net_test_stage = NET_TEST_STAGE_WARMUP;
- }
-}
-
+// TODO: now that main is a separate channel this should
+// actually be joined with reds_handle_other_links, ebcome reds_handle_link
static void reds_handle_main_link(RedLinkInfo *link)
{
+ RedsStreamContext *peer;
+ SpiceLinkMess *link_mess;
+ uint32_t *caps;
uint32_t connection_id;
red_printf("");
-
+ link_mess = link->link_mess;
reds_disconnect();
- if (!link->link_mess->connection_id) {
+ if (!link_mess->connection_id) {
reds_send_link_result(link, SPICE_LINK_ERR_OK);
while((connection_id = rand()) == 0);
reds->agent_state.num_tokens = 0;
memcpy(&(reds->taTicket), &taTicket, sizeof(reds->taTicket));
reds->mig_target = FALSE;
} else {
- if (link->link_mess->connection_id != reds->link_id) {
+ if (link_mess->connection_id != reds->link_id) {
reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
reds_release_link(link);
return;
}
reds_send_link_result(link, SPICE_LINK_ERR_OK);
- connection_id = link->link_mess->connection_id;
+ connection_id = link_mess->connection_id;
reds->mig_target = TRUE;
}
@@ -1961,11 +1506,18 @@ static void reds_handle_main_link(RedLinkInfo *link)
reds->mig_inprogress = FALSE;
reds->mig_wait_connect = FALSE;
reds->mig_wait_disconnect = FALSE;
- reds->peer = link->peer;
- reds->in_handler.shut = FALSE;
reds_show_new_channel(link, connection_id);
+ peer = link->peer;
+ link->link_mess = NULL;
__reds_release_link(link);
+ caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
+ reds->main_channel = main_channel_init();
+ reds->main_channel->link(reds->main_channel, peer, reds->mig_target, link_mess->num_common_caps,
+ link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
+ link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
+ free(link_mess);
+
if (vdagent) {
SpiceCharDeviceInterface *sif;
sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
@@ -1975,32 +1527,15 @@ static void reds_handle_main_link(RedLinkInfo *link)
}
reds->agent_state.plug_generation++;
}
- reds->peer->watch = core->watch_add(reds->peer->socket,
- SPICE_WATCH_EVENT_READ,
- reds_main_event, NULL);
if (!reds->mig_target) {
- RedsOutItem *item;
- SpiceMsgMainInit init;
-
- item = new_out_item(SPICE_MSG_MAIN_INIT);
- init.session_id = connection_id;
- init.display_channels_hint = red_dispatcher_count();
- init.current_mouse_mode = reds->mouse_mode;
- init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
- if (reds->is_client_mouse_allowed) {
- init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
- }
- init.agent_connected = !!vdagent;
- init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
reds->agent_state.num_client_tokens = REDS_AGENT_WINDOW_SIZE;
- init.multi_media_time = reds_get_mm_time() - MM_TIME_DELTA;
- init.ram_hint = red_dispatcher_qxl_ram_size();
-
- spice_marshall_msg_main_init(item->m, &init);
+ main_channel_push_init(reds->main_channel, connection_id, red_dispatcher_count(),
+ reds->mouse_mode, reds->is_client_mouse_allowed,
+ reds_get_mm_time() - MM_TIME_DELTA,
+ red_dispatcher_qxl_ram_size());
- reds_push_pipe_item(item);
- reds_start_net_test();
+ main_channel_start_net_test(reds->main_channel);
/* Now that we have a client, forward any pending agent data */
while (read_from_vdi_port());
}
@@ -2064,23 +1599,9 @@ static void reds_handle_other_links(RedLinkInfo *link)
reds_send_link_result(link, SPICE_LINK_ERR_OK);
reds_show_new_channel(link, reds->link_id);
if (link_mess->channel_type == SPICE_CHANNEL_INPUTS && !link->peer->ssl) {
- RedsOutItem *item;
- SpiceMsgNotify notify;
char *mess = "keyboard channel is insecure";
const int mess_len = strlen(mess);
-
- item = new_out_item(SPICE_MSG_NOTIFY);
-
- notify.time_stamp = get_time_stamp();
- notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
- notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
- notify.what = SPICE_WARN_GENERAL;
- notify.message_len = mess_len;
-
- spice_marshall_msg_notify(item->m, ¬ify);
- spice_marshaller_add(item->m, (uint8_t *)mess, mess_len + 1);
-
- reds_push_pipe_item(item);
+ main_channel_push_notify(reds->main_channel, (uint8_t*)mess, mess_len);
}
peer = link->peer;
link->link_mess = NULL;
@@ -2681,9 +2202,7 @@ static void reds_init_ssl()
static void reds_exit()
{
- if (reds->peer) {
- close(reds->peer->socket);
- }
+ main_channel_close(reds->main_channel);
#ifdef RED_STATISTICS
shm_unlink(reds->stat_shm_name);
free(reds->stat_shm_name);
@@ -2725,7 +2244,7 @@ enum {
static inline void on_activating_ticketing()
{
- if (!ticketing_enabled && reds->peer) {
+ if (!ticketing_enabled && reds_main_channel_connected()) {
red_printf("disconnecting");
reds_disconnect();
}
@@ -2778,7 +2297,7 @@ typedef struct RedsMigCertPubKeyInfo {
uint32_t len;
} RedsMigCertPubKeyInfo;
-static void reds_mig_release(void)
+void reds_mig_release(void)
{
if (reds->mig_spice) {
free(reds->mig_spice->cert_subject);
@@ -2791,22 +2310,10 @@ static void reds_mig_release(void)
static void reds_mig_continue(void)
{
RedsMigSpice *s = reds->mig_spice;
- SpiceMsgMainMigrationBegin migrate;
- RedsOutItem *item;
red_printf("");
- item = new_out_item(SPICE_MSG_MAIN_MIGRATE_BEGIN);
-
- migrate.port = s->port;
- migrate.sport = s->sport;
- migrate.host_size = strlen(s->host) + 1;
- migrate.host_data = (uint8_t *)s->host;
- migrate.pub_key_type = s->cert_pub_key_type;
- migrate.pub_key_size = s->cert_pub_key_len;
- migrate.pub_key_data = s->cert_pub_key;
- spice_marshall_msg_main_migrate_begin(item->m, &migrate);
-
- reds_push_pipe_item(item);
+ main_channel_push_migrate_begin(reds->main_channel, s->port, s->sport,
+ s->host, s->cert_pub_key_type, s->cert_pub_key_len, s->cert_pub_key);
reds_mig_release();
@@ -2828,7 +2335,7 @@ static void reds_mig_started(void)
core->watch_update_mask(reds->secure_listen_watch, 0);
}
- if (reds->peer == NULL) {
+ if (!reds_main_channel_connected()) {
red_printf("not connected to peer");
goto error;
}
@@ -2849,8 +2356,6 @@ error:
static void reds_mig_finished(int completed)
{
- RedsOutItem *item;
-
red_printf("");
if (reds->listen_watch != NULL) {
core->watch_update_mask(reds->listen_watch, SPICE_WATCH_EVENT_READ);
@@ -2860,7 +2365,7 @@ static void reds_mig_finished(int completed)
core->watch_update_mask(reds->secure_listen_watch, SPICE_WATCH_EVENT_READ);
}
- if (reds->peer == NULL) {
+ if (!reds_main_channel_connected()) {
red_printf("no peer connected");
return;
}
@@ -2868,53 +2373,45 @@ static void reds_mig_finished(int completed)
if (completed) {
Channel *channel;
- SpiceMsgMigrate migrate;
reds->mig_wait_disconnect = TRUE;
core->timer_start(reds->mig_timer, MIGRATE_TIMEOUT);
- item = new_out_item(SPICE_MSG_MIGRATE);
- migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
- spice_marshall_msg_migrate(item->m, &migrate);
-
- reds_push_pipe_item(item);
+ // TODO: so now that main channel is separate, how exactly does migration of it work?
+ // - it can have an empty migrate - that seems ok
+ // - I can try to fill it's migrate, then move stuff from reds.c there, but a lot of data
+ // is in reds state right now.
+ main_channel_push_migrate(reds->main_channel);
channel = reds->channels;
while (channel) {
channel->migrate(channel);
channel = channel->next;
}
} else {
- item = new_out_item(SPICE_MSG_MAIN_MIGRATE_CANCEL);
- reds_push_pipe_item(item);
+ main_channel_push_migrate_cancel(reds->main_channel);
reds_mig_cleanup();
}
}
-static void reds_mig_switch(void)
+void reds_mig_switch(void)
{
- RedsMigSpice *s = reds->mig_spice;
- SpiceMsgMainMigrationSwitchHost migrate;
- RedsOutItem *item;
-
- red_printf("");
- item = new_out_item(SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+ main_channel_push_migrate_switch(reds->main_channel);
+}
- migrate.port = s->port;
- migrate.sport = s->sport;
- migrate.host_size = strlen(s->host) + 1;
- migrate.host_data = (uint8_t *)s->host;
+void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate)
+{
+ RedsMigSpice *s = reds->mig_spice;
+ migrate->port = s->port;
+ migrate->sport = s->sport;
+ migrate->host_size = strlen(s->host) + 1;
+ migrate->host_data = (uint8_t *)s->host;
if (s->cert_subject) {
- migrate.cert_subject_size = strlen(s->cert_subject) + 1;
- migrate.cert_subject_data = (uint8_t *)s->cert_subject;
+ migrate->cert_subject_size = strlen(s->cert_subject) + 1;
+ migrate->cert_subject_data = (uint8_t *)s->cert_subject;
} else {
- migrate.cert_subject_size = 0;
- migrate.cert_subject_data = NULL;
+ migrate->cert_subject_size = 0;
+ migrate->cert_subject_data = NULL;
}
- spice_marshall_msg_main_migrate_switch_host(item->m, &migrate);
-
- reds_push_pipe_item(item);
-
- reds_mig_release();
}
static void migrate_timout(void *opaque)
@@ -2938,18 +2435,11 @@ void reds_update_mm_timer(uint32_t mm_time)
void reds_enable_mm_timer()
{
- SpiceMsgMainMultiMediaTime time_mes;
- RedsOutItem *item;
-
core->timer_start(reds->mm_timer, MM_TIMER_GRANULARITY_MS);
- if (!reds->peer) {
+ if (!reds_main_channel_connected()) {
return;
}
-
- item = new_out_item(SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
- time_mes.time = reds_get_mm_time() - MM_TIME_DELTA;
- spice_marshall_msg_main_multi_media_time(item->m, &time_mes);
- reds_push_pipe_item(item);
+ main_channel_push_multi_media_time(reds->main_channel, reds_get_mm_time() - MM_TIME_DELTA);
}
void reds_desable_mm_timer()
@@ -2970,7 +2460,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
vdagent = sin;
reds_update_mouse_mode();
- if (!reds->peer) {
+ if (!reds_main_channel_connected()) {
return;
}
sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
@@ -2984,7 +2474,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
return;
}
- reds_send_agent_connected();
+ main_channel_push_agent_connected(reds->main_channel);
}
__visible__ void spice_server_char_device_wakeup(SpiceCharDeviceInstance* sin)
@@ -3260,12 +2750,6 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
core = core_interface;
reds->listen_socket = -1;
reds->secure_listen_socket = -1;
- reds->peer = NULL;
- reds->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
- reds->in_handler.handle_message = reds_main_handle_message;
- ring_init(&reds->outgoing.pipe);
- reds->outgoing.vec = reds->outgoing.vec_buf;
-
init_vd_agent_resources();
if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
@@ -3317,6 +2801,7 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
if (reds->secure_listen_socket != -1) {
reds_init_ssl();
}
+ reds->main_channel = NULL;
inputs_init();
#ifdef USE_SMARTCARD
@@ -3416,7 +2901,7 @@ __visible__ int spice_server_set_ticket(SpiceServer *s,
{
ASSERT(reds == s);
- if (reds->peer) {
+ if (reds_main_channel_connected()) {
if (fail_if_connected) {
return -1;
}
@@ -3549,10 +3034,7 @@ __visible__ int spice_server_set_channel_security(SpiceServer *s, const char *ch
__visible__ int spice_server_get_sock_info(SpiceServer *s, struct sockaddr *sa, socklen_t *salen)
{
ASSERT(reds == s);
- if (!reds->peer) {
- return -1;
- }
- if (getsockname(reds->peer->socket, sa, salen) < 0) {
+ if (main_channel_getsockname(reds->main_channel, sa, salen) < 0) {
return -1;
}
return 0;
@@ -3561,10 +3043,7 @@ __visible__ int spice_server_get_sock_info(SpiceServer *s, struct sockaddr *sa,
__visible__ int spice_server_get_peer_info(SpiceServer *s, struct sockaddr *sa, socklen_t *salen)
{
ASSERT(reds == s);
- if (!reds->peer) {
- return -1;
- }
- if (getpeername(reds->peer->socket, sa, salen) < 0) {
+ if (main_channel_getpeername(reds->main_channel, sa, salen) < 0) {
return -1;
}
return 0;
@@ -3658,7 +3137,7 @@ __visible__ int spice_server_migrate_client_state(SpiceServer *s)
{
ASSERT(reds == s);
- if (!reds->peer) {
+ if (!reds_main_channel_connected()) {
return SPICE_MIGRATE_CLIENT_NONE;
} else if (reds->mig_wait_connect) {
return SPICE_MIGRATE_CLIENT_WAITING;
diff --git a/server/reds.h b/server/reds.h
index e440804..e1a5ab7 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -22,6 +22,9 @@
#include <openssl/ssl.h>
#include <sys/uio.h>
#include <spice/vd_agent.h>
+#include "common/marshaller.h"
+#include "common/messages.h"
+#include "spice.h"
#define __visible__ __attribute__ ((visibility ("default")))
@@ -92,5 +95,24 @@ extern uint64_t bitrate_per_sec;
// Temporary measures to make splitting reds.c to inputs_channel.c easier
void reds_disconnect(void);
+// Temporary (?) for splitting main channel
+typedef struct MainMigrateData MainMigrateData;
+void reds_push_migrate_data_item(SpiceMarshaller *m, MainMigrateData *data);
+void reds_fill_channels(SpiceMsgChannels *channels_info);
+void reds_fill_mig_switch(SpiceMsgMainMigrationSwitchHost *migrate);
+void reds_mig_release(void);
+int reds_num_of_channels(void);
+#ifdef RED_STATISTICS
+void reds_update_stat_value(uint32_t value);
+#endif
+
+// callbacks from main channel messages
+void reds_on_main_agent_start();
+void reds_on_main_agent_data(void *message, size_t size);
+void reds_on_main_migrate_connected();
+void reds_on_main_migrate_connect_error();
+void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end);
+void reds_on_main_mouse_mode_request(void *message, size_t size);
+
#endif
--
1.7.3.4
More information about the Spice-devel
mailing list