[Spice-devel] [RFCv5 12/47] server: Add RedClient

Alon Levy alevy at redhat.com
Sun May 8 06:11:08 PDT 2011


That means RedClient tracks a ring of channels. Right now there will be only
a single client because of the disconnection mechanism - whenever a new
client comes we disconnect all existing clients. But this patch adds already
a ring of clients to reds.c (stored in RedServer).

There is a known problem handling many connections and disconnections at the
same time, trigerrable easily by the following script:

export NEW_DISPLAY=:3.0

Xephyr $NEW_DISPLAY -noreset &
for ((i = 0 ; i < 5; ++i)); do
    for ((j = 0 ; j < 10; ++j)); do
        DISPLAY=$NEW_DISPLAY c_win7x86_qxl_tests &
    done
    sleep 2;
done

I fixed a few of the problems resulting from this in the same patch. This
required already introducing a few other changes:
 * make sure all removal of channels happens in the main thread, for that
 two additional dispatcher calls are added to remove a specific channel
 client (RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT and
 RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT).
 * change some asserts in input channel.
 * make main channel disconnect not recursive
 * introduce disconnect call back to red_channel_create_parser

The remaining abort is from a double free in the main channel, still can't
find it (doesn't happen when running under valgrind - probably due to the
slowness resulting from that), but is easy to see when running under gdb.
---
 server/inputs_channel.c    |   59 ++++++++++--------
 server/main_channel.c      |   14 +++-
 server/main_channel.h      |    3 +-
 server/red_channel.c       |   95 ++++++++++++++++++++++++++---
 server/red_channel.h       |   25 +++++++-
 server/red_dispatcher.c    |   39 +++++++++++-
 server/red_dispatcher.h    |    7 ++-
 server/red_tunnel_worker.c |   14 ++--
 server/red_worker.c        |  147 +++++++++++++++++++++++++++++++++-----------
 server/red_worker.h        |    2 +
 server/reds.c              |   60 ++++++++++++------
 server/reds.h              |    6 +-
 server/smartcard.c         |    4 +-
 server/snd_worker.c        |   12 ++--
 14 files changed, 370 insertions(+), 117 deletions(-)

diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index e350689..0fd4bd6 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -429,10 +429,16 @@ static void inputs_relase_keys(void)
     kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
 }
 
-static void inputs_channel_on_error(RedChannelClient *rcc)
+static void inputs_channel_disconnect(RedChannelClient *rcc)
 {
     inputs_relase_keys();
-    red_channel_client_destroy(rcc);
+    red_channel_client_disconnect(rcc);
+}
+
+static void inputs_channel_on_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    inputs_channel_disconnect(rcc);
 }
 
 static void inputs_shutdown(Channel *channel)
@@ -491,36 +497,39 @@ static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
 {
 }
 
-static void inputs_link(Channel *channel, RedsStream *stream, int migration,
+static void inputs_link(Channel *channel, RedClient *client, RedsStream *stream, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
-    InputsChannel *inputs_channel;
     RedChannelClient *rcc;
 
-    ASSERT(channel->data == NULL);
-
-    red_printf("input channel create");
-    g_inputs_channel = inputs_channel = (InputsChannel*)red_channel_create_parser(
-        sizeof(*inputs_channel), core, migration, FALSE /* handle_acks */
-        ,inputs_channel_config_socket
-        ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
-        ,inputs_channel_handle_parsed
-        ,inputs_channel_alloc_msg_rcv_buf
-        ,inputs_channel_release_msg_rcv_buf
-        ,inputs_channel_hold_pipe_item
-        ,inputs_channel_send_item
-        ,inputs_channel_release_pipe_item
-        ,inputs_channel_on_error
-        ,inputs_channel_on_error
-        ,NULL
-        ,NULL
-        ,NULL);
-    ASSERT(inputs_channel);
+    ASSERT(channel->data == g_inputs_channel);
+
+    if (channel->data == NULL) {
+        red_printf("input channel create");
+        g_inputs_channel = (InputsChannel*)red_channel_create_parser(
+            sizeof(InputsChannel), core, migration, FALSE /* handle_acks */
+            ,inputs_channel_config_socket
+            ,inputs_channel_disconnect
+            ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
+            ,inputs_channel_handle_parsed
+            ,inputs_channel_alloc_msg_rcv_buf
+            ,inputs_channel_release_msg_rcv_buf
+            ,inputs_channel_hold_pipe_item
+            ,inputs_channel_send_item
+            ,inputs_channel_release_pipe_item
+            ,inputs_channel_on_error
+            ,inputs_channel_on_error
+            ,NULL
+            ,NULL
+            ,NULL);
+    }
+    channel->data = g_inputs_channel;
+    ASSERT(g_inputs_channel);
     red_printf("input channel client create");
-    rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base, stream);
+    rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base,
+                                    client, stream);
     ASSERT(rcc);
-    channel->data = inputs_channel;
     inputs_pipe_add_init(rcc);
 }
 
diff --git a/server/main_channel.c b/server/main_channel.c
index a7fca14..9c07abd 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -139,6 +139,11 @@ enum NetTestStage {
 static uint64_t latency = 0;
 uint64_t bitrate_per_sec = ~0;
 
+static void main_channel_client_disconnect(RedChannelClient *rcc)
+{
+    red_channel_client_disconnect(rcc);
+}
+
 static void main_disconnect(MainChannel *main_chan)
 {
     red_channel_destroy(&main_chan->base);
@@ -788,7 +793,7 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
 
 static void main_channel_on_error(RedChannelClient *rcc)
 {
-    reds_disconnect();
+    reds_client_disconnect(rcc->client);
 }
 
 static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
@@ -819,19 +824,20 @@ static int main_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
     return TRUE;
 }
 
-MainChannelClient *main_channel_link(Channel *channel, RedsStream *stream, int migration,
+MainChannelClient *main_channel_link(Channel *channel, RedClient *client,
+                        RedsStream *stream, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
     MainChannel *main_chan;
     MainChannelClient *mcc;
 
-    ASSERT(channel->data == NULL);
     if (channel->data == NULL) {
         red_printf("create main channel");
         channel->data = red_channel_create_parser(
             sizeof(*main_chan), core, migration, FALSE /* handle_acks */
             ,main_channel_config_socket
+            ,main_channel_client_disconnect
             ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
             ,main_channel_handle_parsed
             ,main_channel_alloc_msg_rcv_buf
@@ -849,7 +855,7 @@ MainChannelClient *main_channel_link(Channel *channel, RedsStream *stream, int m
     main_chan = (MainChannel*)channel->data;
     red_printf("add main channel client");
     mcc = (MainChannelClient*)
-            red_channel_client_create(sizeof(MainChannelClient), &main_chan->base, stream);
+            red_channel_client_create(sizeof(MainChannelClient), &main_chan->base, client, stream);
     return mcc;
 }
 
diff --git a/server/main_channel.h b/server/main_channel.h
index f715b43..ed17d53 100644
--- a/server/main_channel.h
+++ b/server/main_channel.h
@@ -45,11 +45,10 @@ struct MainMigrateData {
 };
 
 typedef struct MainChannel MainChannel;
-typedef struct MainChannelClient MainChannelClient;
 
 Channel *main_channel_init(void);
 /* This is a 'clone' from the reds.h Channel.link callback */
-MainChannelClient *main_channel_link(struct Channel *,
+MainChannelClient *main_channel_link(struct Channel *, RedClient *client,
                  RedsStream *stream, int migration, int num_common_caps,
                  uint32_t *common_caps, int num_caps, uint32_t *caps);
 void main_channel_close(MainChannel *main_chan); // not destroy, just socket close
diff --git a/server/red_channel.c b/server/red_channel.c
index 60b6d89..d950e59 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -34,6 +34,7 @@
 #include "generated_marshallers.h"
 
 static void red_channel_client_event(int fd, int event, void *data);
+static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
 
 /* return the number of bytes read. -1 in case of error */
 static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -356,11 +357,13 @@ static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
 {
     ASSERT(rcc && !channel->rcc);
 	channel->rcc = rcc;
+    channel->clients_num++;
 }
 
 RedChannelClient *red_channel_client_create(
     int size,
     RedChannel *channel,
+    RedClient  *client,
     RedsStream *stream)
 {
     RedChannelClient *rcc;
@@ -369,6 +372,7 @@ RedChannelClient *red_channel_client_create(
     rcc = spice_malloc0(size);
     rcc->stream = stream;
     rcc->channel = channel;
+    rcc->client = client;
     rcc->ack_data.messages_window = ~0;  // blocks send message (maybe use send_data.blocked +
                                              // block flags)
     rcc->ack_data.client_generation = ~0;
@@ -392,8 +396,9 @@ RedChannelClient *red_channel_client_create(
     stream->watch = channel->core->watch_add(stream->socket,
                                            SPICE_WATCH_EVENT_READ,
                                            red_channel_client_event, rcc);
-    rcc->id = 0;
+    rcc->id = channel->clients_num;
     red_channel_add_client(channel, rcc);
+    red_client_add_channel(client, rcc);
     return rcc;
 error:
     free(rcc);
@@ -453,10 +458,6 @@ RedChannel *red_channel_create(int size,
     return channel;
 }
 
-static void do_nothing_disconnect(RedChannelClient *rcc)
-{
-}
-
 static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
 {
     return TRUE;
@@ -466,6 +467,7 @@ RedChannel *red_channel_create_parser(int size,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
+                               channel_disconnect_proc disconnect,
                                spice_parse_channel_func_t parser,
                                channel_handle_parsed_proc handle_parsed,
                                channel_alloc_msg_recv_buf_proc alloc_recv_buf,
@@ -480,7 +482,7 @@ RedChannel *red_channel_create_parser(int size,
                                channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
 {
     RedChannel *channel = red_channel_create(size,
-        core, migrate, handle_acks, config_socket, do_nothing_disconnect,
+        core, migrate, handle_acks, config_socket, disconnect,
         do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item,
         send_item, release_item, handle_migrate_flush_mark, handle_migrate_data,
         handle_migrate_data_get_serial);
@@ -516,7 +518,7 @@ void red_channel_destroy(RedChannel *channel)
     free(channel);
 }
 
-static void red_channel_client_shutdown(RedChannelClient *rcc)
+void red_channel_client_shutdown(RedChannelClient *rcc)
 {
     if (rcc->stream && !rcc->stream->shutdown) {
         rcc->channel->core->watch_remove(rcc->stream->watch);
@@ -862,6 +864,15 @@ void red_channel_ack_set_client_window(RedChannel* channel, int client_window)
     }
 }
 
+static void red_channel_client_remove(RedChannelClient *rcc)
+{
+    ring_remove(&rcc->client_link);
+    rcc->client->channels_num--;
+    ASSERT(rcc->channel->rcc == rcc);
+    rcc->channel->rcc = NULL;
+    rcc->channel->clients_num--;
+}
+
 void red_channel_client_disconnect(RedChannelClient *rcc)
 {
     red_printf("%p (channel %p)", rcc, rcc->channel);
@@ -874,7 +885,7 @@ void red_channel_client_disconnect(RedChannelClient *rcc)
     rcc->send_data.item = NULL;
     rcc->send_data.blocked = FALSE;
     rcc->send_data.size = 0;
-    rcc->channel->rcc = NULL;
+    red_channel_client_remove(rcc);
 }
 
 void red_channel_disconnect(RedChannel *channel)
@@ -982,3 +993,71 @@ void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
     red_channel_client_pipe_remove(rcc, item);
     red_channel_client_release_item(rcc, item, FALSE);
 }
+
+/*
+ * RedClient implementation - kept in red_channel.c because they are
+ * pretty tied together.
+ */
+
+RedClient *red_client_new()
+{
+    RedClient *client;
+
+    client = spice_malloc0(sizeof(RedClient));
+    ring_init(&client->channels);
+    return client;
+}
+
+void red_client_shutdown(RedClient *client)
+{
+    RingItem *link, *next;
+
+    red_printf("#channels %d", client->channels_num);
+    RING_FOREACH_SAFE(link, next, &client->channels) {
+        red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link));
+    }
+}
+
+void red_client_destroy(RedClient *client)
+{
+    RingItem *link, *next;
+    RedChannelClient *rcc;
+
+    red_printf("destroy client with #channels %d", client->channels_num);
+    RING_FOREACH_SAFE(link, next, &client->channels) {
+        // some channels may be in other threads, so disconnection
+        // is not synchronous.
+        rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+        rcc->channel->disconnect(rcc); // this may call another thread. it also frees. (eventually - doesn't have to be in sync)
+    }
+    free(client);
+}
+
+void red_client_disconnect(RedClient *client)
+{
+    RingItem *link, *next;
+    RedChannelClient *rcc;
+
+    red_printf("#channels %d", client->channels_num);
+    RING_FOREACH_SAFE(link, next, &client->channels) {
+        // some channels may be in other threads, so disconnection
+        // is not synchronous.
+        rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+        rcc->channel->disconnect(rcc);
+    }
+}
+
+static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
+{
+    ASSERT(rcc && client);
+    ring_add(&client->channels, &rcc->client_link);
+    client->channels_num++;
+}
+
+MainChannelClient *red_client_get_main(RedClient *client) {
+    return client->mcc;
+}
+
+void red_client_set_main(RedClient *client, MainChannelClient *mcc) {
+    client->mcc = mcc;
+}
diff --git a/server/red_channel.h b/server/red_channel.h
index 2bd3054..874fdf0 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -140,7 +140,9 @@ typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannelClient
 
 struct RedChannelClient {
     RingItem channel_link;
+    RingItem client_link;
     RedChannel *channel;
+    RedClient  *client;
     RedsStream *stream;
     struct {
         uint32_t generation;
@@ -172,6 +174,7 @@ struct RedChannel {
     int handle_acks;
 
     RedChannelClient *rcc;
+    uint32_t clients_num;
 
     OutgoingHandlerInterface outgoing_cb;
     IncomingHandlerInterface incoming_cb;
@@ -219,6 +222,7 @@ RedChannel *red_channel_create_parser(int size,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
+                               channel_disconnect_proc disconnect,
                                spice_parse_channel_func_t parser,
                                channel_handle_parsed_proc handle_parsed,
                                channel_alloc_msg_recv_buf_proc alloc_recv_buf,
@@ -231,13 +235,19 @@ RedChannel *red_channel_create_parser(int size,
                                channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
                                channel_handle_migrate_data_proc handle_migrate_data,
                                channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial);
-RedChannelClient *red_channel_client_create(int size, RedChannel *channel,
+RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
                                             RedsStream *stream);
 int red_channel_is_connected(RedChannel *channel);
 
 void red_channel_client_destroy(RedChannelClient *rcc);
 void red_channel_destroy(RedChannel *channel);
 
+/* shutdown is the only safe thing to do out of the client/channel
+ * thread. It will not touch the rings, just shutdown the socket.
+ * It should be followed by some way to gurantee a disconnection. */
+void red_channel_client_shutdown(RedChannelClient *rcc);
+void red_channel_shutdown(RedChannel *channel);
+
 /* should be called when a new channel is ready to send messages */
 void red_channel_init_outgoing_messages_window(RedChannel *channel);
 
@@ -350,4 +360,17 @@ typedef void (*channel_client_visitor_data)(RedChannelClient *rcc, void *data);
 void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v);
 void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data);
 
+struct RedClient {
+    RingItem link;
+    Ring channels;
+    int channels_num;
+    int disconnecting;
+    MainChannelClient *mcc;
+};
+
+RedClient *red_client_new();
+void red_client_destroy(RedClient *client);
+void red_client_set_main(RedClient *client, MainChannelClient *mcc);
+MainChannelClient *red_client_get_main(RedClient *client);
+
 #endif
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 56446ab..5463053 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -74,7 +74,8 @@ extern spice_wan_compression_t zlib_glz_state;
 
 static RedDispatcher *dispatchers = NULL;
 
-static void red_dispatcher_set_peer(Channel *channel, RedsStream *stream, int migration,
+static void red_dispatcher_set_peer(Channel *channel, RedClient *client,
+                                    RedsStream *stream, int migration,
                                     int num_common_caps, uint32_t *common_caps, int num_caps,
                                     uint32_t *caps)
 {
@@ -84,6 +85,7 @@ static void red_dispatcher_set_peer(Channel *channel, RedsStream *stream, int mi
     dispatcher = (RedDispatcher *)channel->data;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
     write_message(dispatcher->channel, &message);
+    send_data(dispatcher->channel, &client, sizeof(RedClient *));
     send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
     send_data(dispatcher->channel, &migration, sizeof(int));
 }
@@ -104,7 +106,7 @@ static void red_dispatcher_migrate(Channel *channel)
     write_message(dispatcher->channel, &message);
 }
 
-static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *stream,
+static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, RedsStream *stream,
                                            int migration, int num_common_caps,
                                            uint32_t *common_caps, int num_caps,
                                            uint32_t *caps)
@@ -113,6 +115,7 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *stream,
     red_printf("");
     RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
     write_message(dispatcher->channel, &message);
+    send_data(dispatcher->channel, &client, sizeof(RedClient *));
     send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
     send_data(dispatcher->channel, &migration, sizeof(int));
 }
@@ -388,9 +391,37 @@ static void qxl_worker_stop(QXLWorker *qxl_worker)
     ASSERT(message == RED_WORKER_MESSAGE_READY);
 }
 
+static void red_dispatcher_send_disconnect(RedDispatcher *dispatcher,
+                    struct RedChannelClient *rcc, RedWorkerMessage message)
+{
+    write_message(dispatcher->channel, &message);
+    send_data(dispatcher->channel, &rcc, sizeof(struct RedChannelClient *));
+}
+
+void red_dispatcher_disconnect_display_client(RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc)
+{
+    RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+
+    red_dispatcher_send_disconnect(dispatcher, rcc,
+            RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT);
+    read_message(dispatcher->channel, &message);
+    ASSERT(message == RED_WORKER_MESSAGE_READY);
+}
+
+void red_dispatcher_disconnect_cursor_client(RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc)
+{
+    RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+
+    red_dispatcher_send_disconnect(dispatcher, rcc,
+            RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT);
+    read_message(dispatcher->channel, &message);
+    ASSERT(message == RED_WORKER_MESSAGE_READY);
+}
+
 static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
-                                       struct QXLCommandExt *ext,
-                                       uint32_t count)
+                                struct QXLCommandExt *ext, uint32_t count)
 {
     RedDispatcher *dispatcher = (RedDispatcher *)qxl_worker;
     RedWorkerMessage message = RED_WORKER_MESSAGE_LOADVM_COMMANDS;
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index 3f3c1ae..dc711db 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -18,6 +18,7 @@
 #ifndef _H_RED_DISPATCHER
 #define _H_RED_DISPATCHER
 
+struct RedChannelClient;
 
 struct RedDispatcher *red_dispatcher_init(QXLInstance *qxl);
 
@@ -29,5 +30,9 @@ int red_dispatcher_count(void);
 int red_dispatcher_add_renderer(const char *name);
 uint32_t red_dispatcher_qxl_ram_size(void);
 int red_dispatcher_qxl_count(void);
-#endif
+void red_dispatcher_disconnect_display_client(struct RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc);
+void red_dispatcher_disconnect_cursor_client(struct RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc);
 
+#endif
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
index c18c773..a07cf49 100644
--- a/server/red_tunnel_worker.c
+++ b/server/red_tunnel_worker.c
@@ -602,9 +602,9 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
 
 
 /* reds interface */
-static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration,
-                                       int num_common_caps, uint32_t *common_caps, int num_caps,
-                                       uint32_t *caps);
+static void handle_tunnel_channel_link(Channel *channel, RedClient *client, RedsStream *stream,
+                                       int migration, int num_common_caps, uint32_t *common_caps,
+                                       int num_caps, uint32_t *caps);
 static void handle_tunnel_channel_shutdown(struct Channel *channel);
 static void handle_tunnel_channel_migrate(struct Channel *channel);
 
@@ -3434,9 +3434,9 @@ static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
 {
 }
 
-static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration,
-                                       int num_common_caps, uint32_t *common_caps, int num_caps,
-                                       uint32_t *caps)
+static void handle_tunnel_channel_link(Channel *channel, RedClient *client, RedsStream *stream,
+                                       int migration, int num_common_caps, uint32_t *common_caps,
+                                       int num_caps, uint32_t *caps)
 {
     TunnelChannel *tunnel_channel;
     TunnelWorker *worker = (TunnelWorker *)channel->data;
@@ -3462,7 +3462,7 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int
     if (!tunnel_channel) {
         return;
     }
-    red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, stream);
+    red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, client, stream);
 
     tunnel_channel->worker = worker;
     tunnel_channel->worker->channel = tunnel_channel;
diff --git a/server/red_worker.c b/server/red_worker.c
index 1379b51..5a86f0c 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -61,6 +61,7 @@
 #include "generated_marshallers.h"
 #include "zlib_encoder.h"
 #include "red_channel.h"
+#include "red_dispatcher.h"
 
 //#define COMPRESS_STAT
 //#define DUMP_BITMAP
@@ -9026,6 +9027,7 @@ static void free_common_channel_from_listener(EventListener *ctx)
 
     free(common);
 }
+
 static void worker_watch_update_mask(SpiceWatch *watch, int event_mask)
 {
 }
@@ -9046,13 +9048,15 @@ SpiceCoreInterface worker_core = {
 };
 
 static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
-                                 RedsStream *stream, int migrate,
+                                 RedClient *client, RedsStream *stream, int migrate,
                                  event_listener_action_proc handler,
                                  channel_disconnect_proc disconnect,
                                  channel_send_pipe_item_proc send_item,
                                  channel_hold_pipe_item_proc hold_item,
                                  channel_release_pipe_item_proc release_item,
                                  channel_handle_parsed_proc handle_parsed,
+                                 channel_on_incoming_error_proc on_incoming_error,
+                                 channel_on_outgoing_error_proc on_outgoing_error,
                                  channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
                                  channel_handle_migrate_data_proc handle_migrate_data,
                                  channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
@@ -9064,6 +9068,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
     channel = red_channel_create_parser(size, &worker_core, migrate,
                                         TRUE /* handle_acks */,
                                         common_channel_config_socket,
+                                        disconnect,
                                         spice_get_client_channel_parser(channel_id, NULL),
                                         handle_parsed,
                                         common_alloc_recv_buf,
@@ -9071,8 +9076,8 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
                                         hold_item,
                                         send_item,
                                         release_item,
-                                        red_channel_client_default_peer_on_error,
-                                        red_channel_client_default_peer_on_error,
+                                        on_incoming_error,
+                                        on_outgoing_error,
                                         handle_migrate_flush_mark,
                                         handle_migrate_data,
                                         handle_migrate_data_get_serial);
@@ -9080,7 +9085,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
     if (!channel) {
         goto error;
     }
-    red_channel_client_create(sizeof(RedChannelClient), channel, stream);
+    red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
     common->id = worker->id;
     common->listener.refs = 1;
     common->listener.action = handler;
@@ -9171,25 +9176,72 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item,
     }
 }
 
-static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, int migrate)
+static void display_channel_on_incoming_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+static void display_channel_on_outgoing_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+static void cursor_channel_on_incoming_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+static void cursor_channel_on_outgoing_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+// call this from dispatcher thread context
+static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc)
+{
+    RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker;
+    struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
+
+    red_printf("");
+    red_dispatcher_disconnect_display_client(dispatcher, rcc);
+}
+
+// call this from dispatcher thread context
+static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc)
+{
+    RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker;
+    struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
+
+    red_printf("");
+    red_dispatcher_disconnect_cursor_client(dispatcher, rcc);
+}
+
+static void handle_new_display_channel(RedWorker *worker, RedClient *client, RedsStream *stream,
+                                       int migrate)
 {
     DisplayChannel *display_channel;
     size_t stream_buf_size;
 
     red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel);
 
-    if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel),
-                                                            SPICE_CHANNEL_DISPLAY, stream,
-                                                            migrate, handle_channel_events,
-                                                            red_disconnect_display,
-                                                            display_channel_send_item,
-                                                            display_channel_hold_pipe_item,
-                                                            display_channel_release_item,
-                                                            display_channel_handle_message,
-                                                            display_channel_handle_migrate_mark,
-                                                            display_channel_handle_migrate_data,
-                                                            display_channel_handle_migrate_data_get_serial
-                                                            ))) {
+    if (!(display_channel = (DisplayChannel *)__new_channel(
+            worker, sizeof(*display_channel),
+            SPICE_CHANNEL_DISPLAY, client, stream,
+            migrate, handle_channel_events,
+            dispatch_display_channel_client_disconnect,
+            display_channel_send_item,
+            display_channel_hold_pipe_item,
+            display_channel_release_item,
+            display_channel_handle_message,
+            display_channel_on_incoming_error,
+            display_channel_on_outgoing_error,
+            display_channel_handle_migrate_mark,
+            display_channel_handle_migrate_data,
+            display_channel_handle_migrate_data_get_serial))) {
         return;
     }
 #ifdef RED_STATISTICS
@@ -9254,11 +9306,6 @@ static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, in
     stat_compress_init(&display_channel->jpeg_alpha_stat, jpeg_alpha_stat_name);
 }
 
-static void red_disconnect_cursor_client(RedChannelClient *rcc)
-{
-    red_disconnect_cursor(rcc->channel);
-}
-
 static void red_disconnect_cursor(RedChannel *channel)
 {
     CommonChannel *common;
@@ -9315,23 +9362,27 @@ static void cursor_channel_release_item(RedChannelClient *rcc, PipeItem *item, i
     }
 }
 
-static void red_connect_cursor(RedWorker *worker, RedsStream *stream, int migrate)
+static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream *stream,
+                               int migrate)
 {
     CursorChannel *channel;
 
     red_disconnect_cursor((RedChannel *)worker->cursor_channel);
 
-    if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel),
-                                                   SPICE_CHANNEL_CURSOR, stream, migrate,
-                                                   handle_channel_events,
-                                                   red_disconnect_cursor_client,
-                                                   cursor_channel_send_item,
-                                                   cursor_channel_hold_pipe_item,
-                                                   cursor_channel_release_item,
-                                                   red_channel_client_handle_message,
-                                                   NULL,
-                                                   NULL,
-                                                   NULL))) {
+    if (!(channel = (CursorChannel *)__new_channel(
+            worker, sizeof(*channel),
+            SPICE_CHANNEL_CURSOR, client, stream, migrate,
+            handle_channel_events,
+            dispatch_cursor_channel_client_disconnect,
+            cursor_channel_send_item,
+            cursor_channel_hold_pipe_item,
+            cursor_channel_release_item,
+            red_channel_client_handle_message,
+            cursor_channel_on_incoming_error,
+            cursor_channel_on_outgoing_error,
+            NULL,
+            NULL,
+            NULL))) {
         return;
     }
 #ifdef RED_STATISTICS
@@ -9769,12 +9820,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
         break;
     case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
         RedsStream *stream;
+        RedClient *client;
         int migrate;
         red_printf("connect");
 
+        receive_data(worker->channel, &client, sizeof(RedClient *));
         receive_data(worker->channel, &stream, sizeof(RedsStream *));
         receive_data(worker->channel, &migrate, sizeof(int));
-        handle_new_display_channel(worker, stream, migrate);
+        handle_new_display_channel(worker, client, stream, migrate);
+        break;
+    }
+    case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: {
+        RedChannelClient *rcc;
+
+        red_printf("disconnect display client");
+        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+        red_disconnect_display(rcc);
+        message = RED_WORKER_MESSAGE_READY;
+        write_message(worker->channel, &message);
         break;
     }
     case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
@@ -9817,12 +9880,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
         break;
     case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
         RedsStream *stream;
+        RedClient *client;
         int migrate;
 
         red_printf("cursor connect");
+        receive_data(worker->channel, &client, sizeof(RedClient *));
         receive_data(worker->channel, &stream, sizeof(RedsStream *));
         receive_data(worker->channel, &migrate, sizeof(int));
-        red_connect_cursor(worker, stream, migrate);
+        red_connect_cursor(worker, client, stream, migrate);
+        break;
+    }
+    case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: {
+        RedChannelClient *rcc;
+
+        red_printf("disconnect cursor client");
+        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+        red_disconnect_cursor(rcc->channel); /* TODO - assumes a single client */
+        message = RED_WORKER_MESSAGE_READY;
+        write_message(worker->channel, &message);
         break;
     }
     case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
diff --git a/server/red_worker.h b/server/red_worker.h
index b4e2ed2..f79412f 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -51,11 +51,13 @@ enum {
     RED_WORKER_MESSAGE_READY,
     RED_WORKER_MESSAGE_DISPLAY_CONNECT,
     RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+    RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT,
     RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
     RED_WORKER_MESSAGE_START,
     RED_WORKER_MESSAGE_STOP,
     RED_WORKER_MESSAGE_CURSOR_CONNECT,
     RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+    RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT,
     RED_WORKER_MESSAGE_CURSOR_MIGRATE,
     RED_WORKER_MESSAGE_SET_COMPRESSION,
     RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
diff --git a/server/reds.c b/server/reds.c
index 9e3133e..347fa4b 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -199,6 +199,8 @@ typedef struct RedsState {
     int disconnecting;
     VDIPortState agent_state;
     int pending_mouse_event;
+    Ring clients;
+    int num_clients;
     uint32_t link_id;
     Channel *main_channel_factory;
     MainChannel *main_channel;
@@ -536,15 +538,6 @@ static Channel *reds_find_channel(uint32_t type, uint32_t id)
     return channel;
 }
 
-static void reds_shatdown_channels()
-{
-    Channel *channel = reds->channels;
-    while (channel) {
-        channel->shutdown(channel);
-        channel = channel->next;
-    }
-}
-
 static void reds_mig_cleanup()
 {
     if (reds->mig_inprogress) {
@@ -589,14 +582,14 @@ static int reds_main_channel_connected(void)
     return !!reds->main_channel;
 }
 
-void reds_disconnect()
+void reds_client_disconnect(RedClient *client)
 {
-    if (!reds_main_channel_connected() || reds->disconnecting) {
+    if (!reds_main_channel_connected() || client->disconnecting) {
         return;
     }
 
     red_printf("");
-    reds->disconnecting = TRUE;
+    client->disconnecting = TRUE;
     reds->link_id = 0;
 
     /* Reset write filter to start with clean state on client reconnect */
@@ -616,14 +609,26 @@ void reds_disconnect()
         }
     }
 
-    reds_shatdown_channels();
-    reds->main_channel_factory->shutdown(reds->main_channel_factory);
-    reds->main_channel_factory->data = NULL;
-    reds->main_channel = NULL;
+    ring_remove(&client->link);
+    reds->num_clients--;
+    red_client_destroy(client);
+
     reds_mig_cleanup();
     reds->disconnecting = FALSE;
 }
 
+// TODO: go over all usage of reds_disconnect, most/some of it should be converted to
+// reds_client_disconnect
+static void reds_disconnect(void)
+{
+    RingItem *link, *next;
+
+    red_printf("");
+    RING_FOREACH_SAFE(link, next, &reds->clients) {
+        reds_client_disconnect(SPICE_CONTAINEROF(link, RedClient, link));
+    }
+}
+
 static void reds_mig_disconnect()
 {
     if (reds_main_channel_connected()) {
@@ -1361,6 +1366,7 @@ void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
 static int sync_write(RedsStream *stream, const void *in_buf, size_t n)
 {
     const uint8_t *buf = (uint8_t *)in_buf;
+
     while (n) {
         int now = reds_stream_write(stream, buf, n);
         if (now <= 0) {
@@ -1460,7 +1466,6 @@ static int reds_send_link_ack(RedLinkInfo *link)
     BIO_get_mem_ptr(bio, &bmBuf);
     memcpy(ack.pub_key, bmBuf->data, sizeof(ack.pub_key));
 
-
     if (!sync_write(link->stream, &header, sizeof(header)))
         goto end;
     if (!sync_write(link->stream, &ack, sizeof(ack)))
@@ -1518,6 +1523,7 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
 // actually be joined with reds_handle_other_links, become reds_handle_link
 static void reds_handle_main_link(RedLinkInfo *link)
 {
+    RedClient *client;
     RedsStream *stream;
     SpiceLinkMess *link_mess;
     uint32_t *caps;
@@ -1560,13 +1566,17 @@ static void reds_handle_main_link(RedLinkInfo *link)
     if (!reds->main_channel_factory) {
         reds->main_channel_factory = main_channel_init();
     }
-    mcc = main_channel_link(reds->main_channel_factory,
+    client = red_client_new();
+    ring_add(&reds->clients, &client->link);
+    reds->num_clients++;
+    mcc = main_channel_link(reds->main_channel_factory, client,
                   stream, reds->mig_target, link_mess->num_common_caps,
                   link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
                   link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
     reds->main_channel = (MainChannel*)reds->main_channel_factory->data;
     ASSERT(reds->main_channel);
     free(link_mess);
+    red_client_set_main(client, mcc);
 
     if (vdagent) {
         SpiceCharDeviceInterface *sif;
@@ -1628,11 +1638,21 @@ static void openssl_init(RedLinkInfo *link)
 static void reds_handle_other_links(RedLinkInfo *link)
 {
     Channel *channel;
+    RedClient *client = NULL;
     RedsStream *stream;
     SpiceLinkMess *link_mess;
     uint32_t *caps;
 
     link_mess = link->link_mess;
+    if (reds->num_clients == 1) {
+        client = SPICE_CONTAINEROF(ring_get_head(&reds->clients), RedClient, link);
+    }
+
+    if (!client) {
+        reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
+        reds_link_free(link);
+        return;
+    }
 
     if (!reds->link_id || reds->link_id != link_mess->connection_id) {
         reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
@@ -1660,7 +1680,7 @@ static void reds_handle_other_links(RedLinkInfo *link)
     link->link_mess = NULL;
     reds_link_free(link);
     caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
-    channel->link(channel, stream, reds->mig_target, link_mess->num_common_caps,
+    channel->link(channel, client, stream, reds->mig_target, link_mess->num_common_caps,
                   link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
                   link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
     free(link_mess);
@@ -3467,6 +3487,8 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
     reds->listen_socket = -1;
     reds->secure_listen_socket = -1;
     init_vd_agent_resources();
+    ring_init(&reds->clients);
+    reds->num_clients = 0;
 
     if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
         red_error("migration timer create failed");
diff --git a/server/reds.h b/server/reds.h
index 464803d..29ce15f 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -33,6 +33,7 @@
 #define __visible__ __attribute__ ((visibility ("default")))
 
 typedef struct RedsStream RedsStream;
+typedef struct RedClient RedClient;
 typedef struct MainChannelClient MainChannelClient;
 
 #if HAVE_SASL
@@ -93,7 +94,8 @@ typedef struct Channel {
     uint32_t *common_caps;
     int num_caps;
     uint32_t *caps;
-    void (*link)(struct Channel *, RedsStream *stream, int migration, int num_common_caps,
+    void (*link)(struct Channel *, RedClient *client, RedsStream *stream,
+                 int migration, int num_common_caps,
                  uint32_t *common_caps, int num_caps, uint32_t *caps);
     void (*shutdown)(struct Channel *);
     void (*migrate)(struct Channel *);
@@ -136,7 +138,7 @@ extern uint64_t bitrate_per_sec;
 #define IS_LOW_BANDWIDTH() (bitrate_per_sec < 10 * 1024 * 1024)
 
 // Temporary measures to make splitting reds.c to inputs_channel.c easier
-void reds_disconnect(void);
+void reds_client_disconnect(RedClient *client);
 
 // Temporary (?) for splitting main channel
 typedef struct MainMigrateData MainMigrateData;
diff --git a/server/smartcard.c b/server/smartcard.c
index 36855a1..48072a3 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -487,7 +487,7 @@ static void smartcard_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *it
 {
 }
 
-static void smartcard_link(Channel *channel, RedsStream *stream,
+static void smartcard_link(Channel *channel, RedClient *client, RedsStream *stream,
                         int migration, int num_common_caps,
                         uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
@@ -516,7 +516,7 @@ static void smartcard_link(Channel *channel, RedsStream *stream,
         red_printf("ERROR: smartcard channel creation failed");
         return;
     }
-    red_channel_client_create(sizeof(RedChannelClient), channel->data, stream);
+    red_channel_client_create(sizeof(RedChannelClient), channel->data, client, stream);
     red_channel_init_outgoing_messages_window((RedChannel*)channel->data);
 }
 
diff --git a/server/snd_worker.c b/server/snd_worker.c
index 8da11e1..71b7f19 100644
--- a/server/snd_worker.c
+++ b/server/snd_worker.c
@@ -936,9 +936,9 @@ static void snd_playback_cleanup(SndChannel *channel)
     celt051_mode_destroy(playback_channel->celt_mode);
 }
 
-static void snd_set_playback_peer(Channel *channel, RedsStream *stream, int migration,
-                                  int num_common_caps, uint32_t *common_caps, int num_caps,
-                                  uint32_t *caps)
+static void snd_set_playback_peer(Channel *channel, RedClient *client, RedsStream *stream,
+                                  int migration, int num_common_caps, uint32_t *common_caps,
+                                  int num_caps, uint32_t *caps)
 {
     SndWorker *worker = (SndWorker *)channel;
     SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker);
@@ -1102,9 +1102,9 @@ static void snd_record_cleanup(SndChannel *channel)
     celt051_mode_destroy(record_channel->celt_mode);
 }
 
-static void snd_set_record_peer(Channel *channel, RedsStream *stream, int migration,
-                                int num_common_caps, uint32_t *common_caps, int num_caps,
-                                uint32_t *caps)
+static void snd_set_record_peer(Channel *channel, RedClient *client, RedsStream *stream,
+                                int migration, int num_common_caps, uint32_t *common_caps,
+                                int num_caps, uint32_t *caps)
 {
     SndWorker *worker = (SndWorker *)channel;
     SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker);
-- 
1.7.5.1



More information about the Spice-devel mailing list