[Spice-devel] [RFC v4 13/62] server: Add RedClient

Alon Levy alevy at redhat.com
Tue Apr 26 03:54:38 PDT 2011


That means RedClient tracks a ring of channels. Right now there will be only
a single client because of the disconnection mechanism - whenever a new
client comes we disconnect all existing clients. But this patch adds already
a ring of clients to reds.c (stored in RedServer).

There is a known problem handling many connections and disconnections at the
same time, trigerrable easily by the following script:

export NEW_DISPLAY=:3.0

Xephyr $NEW_DISPLAY -noreset &
for ((i = 0 ; i < 5; ++i)); do
    for ((j = 0 ; j < 10; ++j)); do
        DISPLAY=$NEW_DISPLAY c_win7x86_qxl_tests &
    done
    sleep 2;
done

I fixed a few of the problems resulting from this in the same patch. This
required already introducing a few other changes:
 * make sure all removal of channels happens in the main thread, for that
 two additional dispatcher calls are added to remove a specific channel
 client (RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT and
 RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT).
 * change some asserts in input channel.
 * make main channel disconnect not recursive
 * introduce disconnect call back to red_channel_create_parser

The remaining abort is from a double free in the main channel, still can't
find it (doesn't happen when running under valgrind - probably due to the
slowness resulting from that), but is easy to see when running under gdb.
---
 server/inputs_channel.c    |   59 ++++++++++--------
 server/main_channel.c      |   14 +++-
 server/main_channel.h      |    2 +-
 server/red_channel.c       |   89 +++++++++++++++++++++++++--
 server/red_channel.h       |   26 ++++++++-
 server/red_dispatcher.c    |   36 ++++++++++-
 server/red_dispatcher.h    |    7 ++-
 server/red_tunnel_worker.c |   14 ++--
 server/red_worker.c        |  147 +++++++++++++++++++++++++++++++++-----------
 server/red_worker.h        |    2 +
 server/reds.c              |   60 ++++++++++++------
 server/reds.h              |    6 +-
 server/smartcard.c         |    4 +-
 server/snd_worker.c        |   12 ++--
 14 files changed, 366 insertions(+), 112 deletions(-)

diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index c94fffc..9fc7bca 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -444,10 +444,16 @@ static void inputs_relase_keys(void)
     kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
 }
 
-static void inputs_channel_on_error(RedChannelClient *rcc)
+static void inputs_channel_disconnect(RedChannelClient *rcc)
 {
     inputs_relase_keys();
-    red_channel_client_destroy(rcc);
+    red_channel_client_disconnect(rcc);
+}
+
+static void inputs_channel_on_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    inputs_channel_disconnect(rcc);
 }
 
 static void inputs_shutdown(Channel *channel)
@@ -508,36 +514,39 @@ static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
 {
 }
 
-static void inputs_link(Channel *channel, RedsStream *stream, int migration,
+static void inputs_link(Channel *channel, RedClient *client, RedsStream *stream, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
-    InputsChannel *inputs_channel;
     RedChannelClient *rcc;
 
-    ASSERT(channel->data == NULL);
-
-    red_printf("input channel create");
-    g_inputs_channel = inputs_channel = (InputsChannel*)red_channel_create_parser(
-        sizeof(*inputs_channel), core, migration, FALSE /* handle_acks */
-        ,inputs_channel_config_socket
-        ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
-        ,inputs_channel_handle_parsed
-        ,inputs_channel_alloc_msg_rcv_buf
-        ,inputs_channel_release_msg_rcv_buf
-        ,inputs_channel_hold_pipe_item
-        ,inputs_channel_send_item
-        ,inputs_channel_release_pipe_item
-        ,inputs_channel_on_error
-        ,inputs_channel_on_error
-        ,NULL
-        ,NULL
-        ,NULL);
-    ASSERT(inputs_channel);
+    ASSERT(channel->data == g_inputs_channel);
+
+    if (channel->data == NULL) {
+        red_printf("input channel create");
+        g_inputs_channel = (InputsChannel*)red_channel_create_parser(
+            sizeof(InputsChannel), core, migration, FALSE /* handle_acks */
+            ,inputs_channel_config_socket
+            ,inputs_channel_disconnect
+            ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
+            ,inputs_channel_handle_parsed
+            ,inputs_channel_alloc_msg_rcv_buf
+            ,inputs_channel_release_msg_rcv_buf
+            ,inputs_channel_hold_pipe_item
+            ,inputs_channel_send_item
+            ,inputs_channel_release_pipe_item
+            ,inputs_channel_on_error
+            ,inputs_channel_on_error
+            ,NULL
+            ,NULL
+            ,NULL);
+    }
+    channel->data = g_inputs_channel;
+    ASSERT(g_inputs_channel);
     red_printf("input channel client create");
-    rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base, stream);
+    rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base,
+                                    client, stream);
     ASSERT(rcc);
-    channel->data = inputs_channel;
     inputs_pipe_add_init(rcc);
 }
 
diff --git a/server/main_channel.c b/server/main_channel.c
index 863fc85..c2e2465 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -136,6 +136,11 @@ enum NetTestStage {
 static uint64_t latency = 0;
 uint64_t bitrate_per_sec = ~0;
 
+static void main_channel_client_disconnect(RedChannelClient *rcc)
+{
+    red_channel_client_disconnect(rcc);
+}
+
 static void main_disconnect(MainChannel *main_chan)
 {
     red_channel_destroy(&main_chan->base);
@@ -793,7 +798,7 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
 
 static void main_channel_on_error(RedChannelClient *rcc)
 {
-    reds_disconnect();
+    reds_client_disconnect(rcc->client);
 }
 
 static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
@@ -824,19 +829,20 @@ static int main_channel_handle_migrate_flush_mark_proc(RedChannelClient *rcc)
     return TRUE;
 }
 
-MainChannelClient *main_channel_link(Channel *channel, RedsStream *stream, int migration,
+MainChannelClient *main_channel_link(Channel *channel, RedClient *client,
+                        RedsStream *stream, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
     MainChannel *main_chan;
     MainChannelClient *mcc;
 
-    ASSERT(channel->data == NULL);
     if (channel->data == NULL) {
         red_printf("create main channel");
         channel->data = red_channel_create_parser(
             sizeof(*main_chan), core, migration, FALSE /* handle_acks */
             ,main_channel_config_socket
+            ,main_channel_client_disconnect
             ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
             ,main_channel_handle_parsed
             ,main_channel_alloc_msg_rcv_buf
@@ -854,7 +860,7 @@ MainChannelClient *main_channel_link(Channel *channel, RedsStream *stream, int m
     main_chan = (MainChannel*)channel->data;
     red_printf("add main channel client");
     mcc = (MainChannelClient*)
-            red_channel_client_create(sizeof(MainChannelClient), &main_chan->base, stream);
+            red_channel_client_create(sizeof(MainChannelClient), &main_chan->base, client, stream);
     return mcc;
 }
 
diff --git a/server/main_channel.h b/server/main_channel.h
index ce012d2..9d3aaab 100644
--- a/server/main_channel.h
+++ b/server/main_channel.h
@@ -49,7 +49,7 @@ typedef struct MainChannelClient MainChannelClient;
 
 Channel *main_channel_init();
 /* This is a 'clone' from the reds.h Channel.link callback */
-MainChannelClient *main_channel_link(struct Channel *,
+MainChannelClient *main_channel_link(struct Channel *, RedClient *client,
                  RedsStream *stream, int migration, int num_common_caps,
                  uint32_t *common_caps, int num_caps, uint32_t *caps);
 void main_channel_close(MainChannel *main_chan); // not destroy, just socket close
diff --git a/server/red_channel.c b/server/red_channel.c
index 5aad98b..4c66f9a 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -31,6 +31,7 @@
 #include "generated_marshallers.h"
 
 static void red_channel_client_event(int fd, int event, void *data);
+static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
 
 /* return the number of bytes read. -1 in case of error */
 static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -365,6 +366,7 @@ static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
 RedChannelClient *red_channel_client_create(
     int size,
     RedChannel *channel,
+    RedClient  *client,
     RedsStream *stream)
 {
     RedChannelClient *rcc;
@@ -373,6 +375,7 @@ RedChannelClient *red_channel_client_create(
     rcc = spice_malloc0(size);
     rcc->stream = stream;
     rcc->channel = channel;
+    rcc->client = client;
     rcc->ack_data.messages_window = ~0;  // blocks send message (maybe use send_data.blocked +
                                              // block flags)
     rcc->ack_data.client_generation = ~0;
@@ -398,6 +401,7 @@ RedChannelClient *red_channel_client_create(
                                            red_channel_client_event, rcc);
     rcc->id = 0;
     red_channel_add_client(channel, rcc);
+    red_client_add_channel(client, rcc);
     return rcc;
 error:
     free(rcc);
@@ -457,10 +461,6 @@ RedChannel *red_channel_create(int size,
     return channel;
 }
 
-void do_nothing_disconnect(RedChannelClient *rcc)
-{
-}
-
 int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
 {
     return TRUE;
@@ -470,6 +470,7 @@ RedChannel *red_channel_create_parser(int size,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
+                               channel_disconnect_proc disconnect,
                                spice_parse_channel_func_t parser,
                                channel_handle_parsed_proc handle_parsed,
                                channel_alloc_msg_recv_buf_proc alloc_recv_buf,
@@ -484,7 +485,7 @@ RedChannel *red_channel_create_parser(int size,
                                channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
 {
     RedChannel *channel = red_channel_create(size,
-        core, migrate, handle_acks, config_socket, do_nothing_disconnect,
+        core, migrate, handle_acks, config_socket, disconnect,
         do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item,
         send_item, release_item, handle_migrate_flush_mark, handle_migrate_data,
         handle_migrate_data_get_serial);
@@ -520,7 +521,7 @@ void red_channel_destroy(RedChannel *channel)
     free(channel);
 }
 
-static void red_channel_client_shutdown(RedChannelClient *rcc)
+void red_channel_client_shutdown(RedChannelClient *rcc)
 {
     if (rcc->stream && !rcc->stream->shutdown) {
         rcc->channel->core->watch_remove(rcc->stream->watch);
@@ -967,3 +968,79 @@ void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
     red_channel_client_pipe_remove(rcc, item);
     red_channel_client_release_item(rcc, item, FALSE);
 }
+
+/*
+ * RedClient implementation - kept in red_channel.c because they are
+ * pretty tied together.
+ */
+
+RedClient *red_client_new()
+{
+    RedClient *client;
+
+    client = spice_malloc0(sizeof(RedClient));
+    ring_init(&client->channels);
+    return client;
+}
+
+void red_client_shutdown(RedClient *client)
+{
+    RingItem *link, *next;
+
+    red_printf("#channels %d", client->channels_num);
+    RING_FOREACH_SAFE(link, next, &client->channels) {
+        red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link));
+    }
+}
+
+void red_client_destroy(RedClient *client)
+{
+    RingItem *link, *next;
+    RedChannelClient *rcc;
+
+    red_printf("destroy client with #channels %d", client->channels_num);
+    RING_FOREACH_SAFE(link, next, &client->channels) {
+        // some channels may be in other threads, so disconnection
+        // is not synchronous.
+        rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+        rcc->channel->disconnect(rcc); // this may call another thread. it also frees. (eventually - doesn't have to be in sync)
+    }
+    free(client);
+}
+
+void red_client_disconnect(RedClient *client)
+{
+    RingItem *link, *next;
+    RedChannelClient *rcc;
+
+    red_printf("#channels %d", client->channels_num);
+    RING_FOREACH_SAFE(link, next, &client->channels) {
+        // some channels may be in other threads, so disconnection
+        // is not synchronous.
+        rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+        rcc->channel->disconnect(rcc);
+    }
+}
+
+static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
+{
+    ASSERT(rcc && client);
+    ring_add(&client->channels, &rcc->client_link);
+    client->channels_num++;
+}
+
+void red_client_remove_channel(RedClient *client, RedChannelClient *rcc)
+{
+    ring_remove(&rcc->client_link);
+    client->channels_num--;
+}
+
+MainChannelClient *red_client_get_main(RedClient *client) {
+    return client->mcc;
+}
+
+void red_client_set_main(RedClient *client, MainChannelClient *mcc) {
+    client->mcc = mcc;
+}
+
+
diff --git a/server/red_channel.h b/server/red_channel.h
index c137eb4..14f097a 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -32,6 +32,8 @@
 #define MAX_SEND_VEC 100
 #define CLIENT_ACK_WINDOW 20
 
+typedef struct MainChannelClient MainChannelClient;
+
 /* Basic interface for channels, without using the RedChannel interface.
    The intention is to move towards one channel interface gradually.
    At the final stage, this interface shouldn't be exposed. Only RedChannel will use it. */
@@ -140,7 +142,9 @@ typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannelClient
 
 struct RedChannelClient {
     RingItem channel_link;
+    RingItem client_link;
     RedChannel *channel;
+    RedClient  *client;
     RedsStream *stream;
     struct {
         uint32_t generation;
@@ -219,6 +223,7 @@ RedChannel *red_channel_create_parser(int size,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
+                               channel_disconnect_proc disconnect,
                                spice_parse_channel_func_t parser,
                                channel_handle_parsed_proc handle_parsed,
                                channel_alloc_msg_recv_buf_proc alloc_recv_buf,
@@ -231,13 +236,19 @@ RedChannel *red_channel_create_parser(int size,
                                channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
                                channel_handle_migrate_data_proc handle_migrate_data,
                                channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial);
-RedChannelClient *red_channel_client_create(int size, RedChannel *channel,
+RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
                                             RedsStream *stream);
 int red_channel_is_connected(RedChannel *channel);
 
 void red_channel_client_destroy(RedChannelClient *rcc);
 void red_channel_destroy(RedChannel *channel);
 
+/* shutdown is the only safe thing to do out of the client/channel
+ * thread. It will not touch the rings, just shutdown the socket.
+ * It should be followed by some way to gurantee a disconnection. */
+void red_channel_client_shutdown(RedChannelClient *rcc);
+void red_channel_shutdown(RedChannel *channel);
+
 /* should be called when a new channel is ready to send messages */
 void red_channel_init_outgoing_messages_window(RedChannel *channel);
 
@@ -347,4 +358,17 @@ typedef void (*channel_client_visitor_data)(RedChannelClient *rcc, void *data);
 void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v);
 void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data);
 
+struct RedClient {
+    RingItem link;
+    Ring channels;
+    int channels_num;
+    int disconnecting;
+    MainChannelClient *mcc;
+};
+
+RedClient *red_client_new();
+void red_client_destroy(RedClient *client);
+void red_client_set_main(RedClient *client, MainChannelClient *mcc);
+MainChannelClient *red_client_get_main(RedClient *client);
+
 #endif
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 75e0670..6229e9b 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -71,7 +71,8 @@ extern spice_wan_compression_t zlib_glz_state;
 
 static RedDispatcher *dispatchers = NULL;
 
-static void red_dispatcher_set_peer(Channel *channel, RedsStream *stream, int migration,
+static void red_dispatcher_set_peer(Channel *channel, RedClient *client,
+                                    RedsStream *stream, int migration,
                                     int num_common_caps, uint32_t *common_caps, int num_caps,
                                     uint32_t *caps)
 {
@@ -81,6 +82,7 @@ static void red_dispatcher_set_peer(Channel *channel, RedsStream *stream, int mi
     dispatcher = (RedDispatcher *)channel->data;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
     write_message(dispatcher->channel, &message);
+    send_data(dispatcher->channel, &client, sizeof(RedClient *));
     send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
     send_data(dispatcher->channel, &migration, sizeof(int));
 }
@@ -101,7 +103,7 @@ static void red_dispatcher_migrate(Channel *channel)
     write_message(dispatcher->channel, &message);
 }
 
-static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *stream,
+static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, RedsStream *stream,
                                            int migration, int num_common_caps,
                                            uint32_t *common_caps, int num_caps,
                                            uint32_t *caps)
@@ -110,6 +112,7 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *stream,
     red_printf("");
     RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
     write_message(dispatcher->channel, &message);
+    send_data(dispatcher->channel, &client, sizeof(RedClient *));
     send_data(dispatcher->channel, &stream, sizeof(RedsStream *));
     send_data(dispatcher->channel, &migration, sizeof(int));
 }
@@ -385,6 +388,35 @@ static void qxl_worker_stop(QXLWorker *qxl_worker)
     ASSERT(message == RED_WORKER_MESSAGE_READY);
 }
 
+static void red_dispatcher_send_disconnect(RedDispatcher *dispatcher,
+                    struct RedChannelClient *rcc, RedWorkerMessage message)
+{
+    write_message(dispatcher->channel, &message);
+    send_data(dispatcher->channel, &rcc, sizeof(struct RedChannelClient *));
+}
+
+void red_dispatcher_disconnect_display_client(RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc)
+{
+    RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+
+    red_dispatcher_send_disconnect(dispatcher, rcc,
+            RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT);
+    read_message(dispatcher->channel, &message);
+    ASSERT(message == RED_WORKER_MESSAGE_READY);
+}
+
+void red_dispatcher_disconnect_cursor_client(RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc)
+{
+    RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
+
+    red_dispatcher_send_disconnect(dispatcher, rcc,
+            RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT);
+    read_message(dispatcher->channel, &message);
+    ASSERT(message == RED_WORKER_MESSAGE_READY);
+}
+
 void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
                                 struct QXLCommandExt *ext, uint32_t count)
 {
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index 7f8973a..fd30c79 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -18,6 +18,7 @@
 #ifndef _H_RED_DISPATCHER
 #define _H_RED_DISPATCHER
 
+struct RedChannelClient;
 
 struct RedDispatcher *red_dispatcher_init(QXLInstance *qxl);
 
@@ -29,5 +30,9 @@ int red_dispatcher_count();
 int red_dispatcher_add_renderer(const char *name);
 uint32_t red_dispatcher_qxl_ram_size();
 int red_dispatcher_qxl_count();
-#endif
+void red_dispatcher_disconnect_display_client(struct RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc);
+void red_dispatcher_disconnect_cursor_client(struct RedDispatcher *dispatcher,
+                                      struct RedChannelClient *rcc);
 
+#endif
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
index dd8a27d..0392c9b 100644
--- a/server/red_tunnel_worker.c
+++ b/server/red_tunnel_worker.c
@@ -599,9 +599,9 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
 
 
 /* reds interface */
-static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration,
-                                       int num_common_caps, uint32_t *common_caps, int num_caps,
-                                       uint32_t *caps);
+static void handle_tunnel_channel_link(Channel *channel, RedClient *client, RedsStream *stream,
+                                       int migration, int num_common_caps, uint32_t *common_caps,
+                                       int num_caps, uint32_t *caps);
 static void handle_tunnel_channel_shutdown(struct Channel *channel);
 static void handle_tunnel_channel_migrate(struct Channel *channel);
 
@@ -3431,9 +3431,9 @@ static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
 {
 }
 
-static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration,
-                                       int num_common_caps, uint32_t *common_caps, int num_caps,
-                                       uint32_t *caps)
+static void handle_tunnel_channel_link(Channel *channel, RedClient *client, RedsStream *stream,
+                                       int migration, int num_common_caps, uint32_t *common_caps,
+                                       int num_caps, uint32_t *caps)
 {
     TunnelChannel *tunnel_channel;
     TunnelWorker *worker = (TunnelWorker *)channel->data;
@@ -3459,7 +3459,7 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int
     if (!tunnel_channel) {
         return;
     }
-    red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, stream);
+    red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, client, stream);
 
     tunnel_channel->worker = worker;
     tunnel_channel->worker->channel = tunnel_channel;
diff --git a/server/red_worker.c b/server/red_worker.c
index 389af58..950715e 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -58,6 +58,7 @@
 #include "generated_marshallers.h"
 #include "zlib_encoder.h"
 #include "red_channel.h"
+#include "red_dispatcher.h"
 
 //#define COMPRESS_STAT
 //#define DUMP_BITMAP
@@ -9016,6 +9017,7 @@ static void free_common_channel_from_listener(EventListener *ctx)
 
     free(common);
 }
+
 void worker_watch_update_mask(SpiceWatch *watch, int event_mask)
 {
 }
@@ -9036,13 +9038,15 @@ SpiceCoreInterface worker_core = {
 };
 
 static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
-                                 RedsStream *stream, int migrate,
+                                 RedClient *client, RedsStream *stream, int migrate,
                                  event_listener_action_proc handler,
                                  channel_disconnect_proc disconnect,
                                  channel_send_pipe_item_proc send_item,
                                  channel_hold_pipe_item_proc hold_item,
                                  channel_release_pipe_item_proc release_item,
                                  channel_handle_parsed_proc handle_parsed,
+                                 channel_on_incoming_error_proc on_incoming_error,
+                                 channel_on_outgoing_error_proc on_outgoing_error,
                                  channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
                                  channel_handle_migrate_data_proc handle_migrate_data,
                                  channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
@@ -9054,6 +9058,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
     channel = red_channel_create_parser(size, &worker_core, migrate,
                                         TRUE /* handle_acks */,
                                         common_channel_config_socket,
+                                        disconnect,
                                         spice_get_client_channel_parser(channel_id, NULL),
                                         handle_parsed,
                                         common_alloc_recv_buf,
@@ -9061,8 +9066,8 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
                                         hold_item,
                                         send_item,
                                         release_item,
-                                        red_channel_client_default_peer_on_error,
-                                        red_channel_client_default_peer_on_error,
+                                        on_incoming_error,
+                                        on_outgoing_error,
                                         handle_migrate_flush_mark,
                                         handle_migrate_data,
                                         handle_migrate_data_get_serial);
@@ -9070,7 +9075,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
     if (!channel) {
         goto error;
     }
-    red_channel_client_create(sizeof(RedChannelClient), channel, stream);
+    red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
     common->id = worker->id;
     common->listener.refs = 1;
     common->listener.action = handler;
@@ -9161,25 +9166,72 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item,
     }
 }
 
-static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, int migrate)
+static void display_channel_on_incoming_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+static void display_channel_on_outgoing_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+static void cursor_channel_on_incoming_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+static void cursor_channel_on_outgoing_error(RedChannelClient *rcc)
+{
+    red_printf("");
+    red_channel_client_shutdown(rcc);
+}
+
+// call this from dispatcher thread context
+static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc)
+{
+    RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker;
+    struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
+
+    red_printf("");
+    red_dispatcher_disconnect_display_client(dispatcher, rcc);
+}
+
+// call this from dispatcher thread context
+static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc)
+{
+    RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker;
+    struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
+
+    red_printf("");
+    red_dispatcher_disconnect_cursor_client(dispatcher, rcc);
+}
+
+static void handle_new_display_channel(RedWorker *worker, RedClient *client, RedsStream *stream,
+                                       int migrate)
 {
     DisplayChannel *display_channel;
     size_t stream_buf_size;
 
     red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel);
 
-    if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel),
-                                                            SPICE_CHANNEL_DISPLAY, stream,
-                                                            migrate, handle_channel_events,
-                                                            red_disconnect_display,
-                                                            display_channel_send_item,
-                                                            display_channel_hold_pipe_item,
-                                                            display_channel_release_item,
-                                                            display_channel_handle_message,
-                                                            display_channel_handle_migrate_mark,
-                                                            display_channel_handle_migrate_data,
-                                                            display_channel_handle_migrate_data_get_serial_proc
-                                                            ))) {
+    if (!(display_channel = (DisplayChannel *)__new_channel(
+            worker, sizeof(*display_channel),
+            SPICE_CHANNEL_DISPLAY, client, stream,
+            migrate, handle_channel_events,
+            dispatch_display_channel_client_disconnect,
+            display_channel_send_item,
+            display_channel_hold_pipe_item,
+            display_channel_release_item,
+            display_channel_handle_message,
+            display_channel_on_incoming_error,
+            display_channel_on_outgoing_error,
+            display_channel_handle_migrate_mark,
+            display_channel_handle_migrate_data,
+            display_channel_handle_migrate_data_get_serial_proc))) {
         return;
     }
 #ifdef RED_STATISTICS
@@ -9244,11 +9296,6 @@ static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, in
     stat_compress_init(&display_channel->jpeg_alpha_stat, jpeg_alpha_stat_name);
 }
 
-static void red_disconnect_cursor_client(RedChannelClient *rcc)
-{
-    red_disconnect_cursor(rcc->channel);
-}
-
 static void red_disconnect_cursor(RedChannel *channel)
 {
     CommonChannel *common;
@@ -9303,23 +9350,27 @@ static void cursor_channel_release_item(RedChannelClient *rcc, PipeItem *item, i
     }
 }
 
-static void red_connect_cursor(RedWorker *worker, RedsStream *stream, int migrate)
+static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream *stream,
+                               int migrate)
 {
     CursorChannel *channel;
 
     red_disconnect_cursor((RedChannel *)worker->cursor_channel);
 
-    if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel),
-                                                   SPICE_CHANNEL_CURSOR, stream, migrate,
-                                                   handle_channel_events,
-                                                   red_disconnect_cursor_client,
-                                                   cursor_channel_send_item,
-                                                   cursor_channel_hold_pipe_item,
-                                                   cursor_channel_release_item,
-                                                   red_channel_client_handle_message,
-                                                   NULL,
-                                                   NULL,
-                                                   NULL))) {
+    if (!(channel = (CursorChannel *)__new_channel(
+            worker, sizeof(*channel),
+            SPICE_CHANNEL_CURSOR, client, stream, migrate,
+            handle_channel_events,
+            dispatch_cursor_channel_client_disconnect,
+            cursor_channel_send_item,
+            cursor_channel_hold_pipe_item,
+            cursor_channel_release_item,
+            red_channel_client_handle_message,
+            cursor_channel_on_incoming_error,
+            cursor_channel_on_outgoing_error,
+            NULL,
+            NULL,
+            NULL))) {
         return;
     }
 #ifdef RED_STATISTICS
@@ -9755,12 +9806,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
         break;
     case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
         RedsStream *stream;
+        RedClient *client;
         int migrate;
         red_printf("connect");
 
+        receive_data(worker->channel, &client, sizeof(RedClient *));
         receive_data(worker->channel, &stream, sizeof(RedsStream *));
         receive_data(worker->channel, &migrate, sizeof(int));
-        handle_new_display_channel(worker, stream, migrate);
+        handle_new_display_channel(worker, client, stream, migrate);
+        break;
+    }
+    case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: {
+        RedChannelClient *rcc;
+
+        red_printf("disconnect display client");
+        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+        red_disconnect_display(rcc);
+        message = RED_WORKER_MESSAGE_READY;
+        write_message(worker->channel, &message);
         break;
     }
     case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
@@ -9803,12 +9866,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
         break;
     case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
         RedsStream *stream;
+        RedClient *client;
         int migrate;
 
         red_printf("cursor connect");
+        receive_data(worker->channel, &client, sizeof(RedClient *));
         receive_data(worker->channel, &stream, sizeof(RedsStream *));
         receive_data(worker->channel, &migrate, sizeof(int));
-        red_connect_cursor(worker, stream, migrate);
+        red_connect_cursor(worker, client, stream, migrate);
+        break;
+    }
+    case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: {
+        RedChannelClient *rcc;
+
+        red_printf("disconnect cursor client");
+        receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+        red_disconnect_cursor(rcc->channel); /* TODO - assumes a single client */
+        message = RED_WORKER_MESSAGE_READY;
+        write_message(worker->channel, &message);
         break;
     }
     case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
diff --git a/server/red_worker.h b/server/red_worker.h
index ae2eaee..c6111cb 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -49,11 +49,13 @@ enum {
     RED_WORKER_MESSAGE_READY,
     RED_WORKER_MESSAGE_DISPLAY_CONNECT,
     RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
+    RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT,
     RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
     RED_WORKER_MESSAGE_START,
     RED_WORKER_MESSAGE_STOP,
     RED_WORKER_MESSAGE_CURSOR_CONNECT,
     RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
+    RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT,
     RED_WORKER_MESSAGE_CURSOR_MIGRATE,
     RED_WORKER_MESSAGE_SET_COMPRESSION,
     RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
diff --git a/server/reds.c b/server/reds.c
index 5aeb2b8..5017de1 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -198,6 +198,8 @@ typedef struct RedsState {
     int disconnecting;
     VDIPortState agent_state;
     int pending_mouse_event;
+    Ring clients;
+    int num_clients;
     uint32_t link_id;
     Channel *main_channel_factory;
     MainChannel *main_channel;
@@ -535,15 +537,6 @@ static Channel *reds_find_channel(uint32_t type, uint32_t id)
     return channel;
 }
 
-static void reds_shatdown_channels()
-{
-    Channel *channel = reds->channels;
-    while (channel) {
-        channel->shutdown(channel);
-        channel = channel->next;
-    }
-}
-
 static void reds_mig_cleanup()
 {
     if (reds->mig_inprogress) {
@@ -588,14 +581,14 @@ int reds_main_channel_connected()
     return !!reds->main_channel;
 }
 
-void reds_disconnect()
+void reds_client_disconnect(RedClient *client)
 {
-    if (!reds_main_channel_connected() || reds->disconnecting) {
+    if (!reds_main_channel_connected() || client->disconnecting) {
         return;
     }
 
     red_printf("");
-    reds->disconnecting = TRUE;
+    client->disconnecting = TRUE;
     reds->link_id = 0;
 
     /* Reset write filter to start with clean state on client reconnect */
@@ -615,14 +608,26 @@ void reds_disconnect()
         }
     }
 
-    reds_shatdown_channels();
-    reds->main_channel_factory->shutdown(reds->main_channel_factory);
-    reds->main_channel_factory->data = NULL;
-    reds->main_channel = NULL;
+    ring_remove(&client->link);
+    reds->num_clients--;
+    red_client_destroy(client);
+
     reds_mig_cleanup();
     reds->disconnecting = FALSE;
 }
 
+// TODO: go over all usage of reds_disconnect, most/some of it should be converted to
+// reds_client_disconnect
+static void reds_disconnect(void)
+{
+    RingItem *link, *next;
+
+    red_printf("");
+    RING_FOREACH_SAFE(link, next, &reds->clients) {
+        reds_client_disconnect(SPICE_CONTAINEROF(link, RedClient, link));
+    }
+}
+
 static void reds_mig_disconnect()
 {
     if (reds_main_channel_connected()) {
@@ -1360,6 +1365,7 @@ void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
 static int sync_write(RedsStream *stream, const void *in_buf, size_t n)
 {
     const uint8_t *buf = (uint8_t *)in_buf;
+
     while (n) {
         int now = reds_stream_write(stream, buf, n);
         if (now <= 0) {
@@ -1459,7 +1465,6 @@ static int reds_send_link_ack(RedLinkInfo *link)
     BIO_get_mem_ptr(bio, &bmBuf);
     memcpy(ack.pub_key, bmBuf->data, sizeof(ack.pub_key));
 
-
     if (!sync_write(link->stream, &header, sizeof(header)))
         goto end;
     if (!sync_write(link->stream, &ack, sizeof(ack)))
@@ -1517,6 +1522,7 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
 // actually be joined with reds_handle_other_links, become reds_handle_link
 static void reds_handle_main_link(RedLinkInfo *link)
 {
+    RedClient *client;
     RedsStream *stream;
     SpiceLinkMess *link_mess;
     uint32_t *caps;
@@ -1559,13 +1565,17 @@ static void reds_handle_main_link(RedLinkInfo *link)
     if (!reds->main_channel_factory) {
         reds->main_channel_factory = main_channel_init();
     }
-    mcc = main_channel_link(reds->main_channel_factory,
+    client = red_client_new();
+    ring_add(&reds->clients, &client->link);
+    reds->num_clients++;
+    mcc = main_channel_link(reds->main_channel_factory, client,
                   stream, reds->mig_target, link_mess->num_common_caps,
                   link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
                   link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
     reds->main_channel = (MainChannel*)reds->main_channel_factory->data;
     ASSERT(reds->main_channel);
     free(link_mess);
+    red_client_set_main(client, mcc);
 
     if (vdagent) {
         SpiceCharDeviceInterface *sif;
@@ -1627,11 +1637,21 @@ static void openssl_init(RedLinkInfo *link)
 static void reds_handle_other_links(RedLinkInfo *link)
 {
     Channel *channel;
+    RedClient *client = NULL;
     RedsStream *stream;
     SpiceLinkMess *link_mess;
     uint32_t *caps;
 
     link_mess = link->link_mess;
+    if (reds->num_clients == 1) {
+        client = SPICE_CONTAINEROF(ring_get_head(&reds->clients), RedClient, link);
+    }
+
+    if (!client) {
+        reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
+        reds_link_free(link);
+        return;
+    }
 
     if (!reds->link_id || reds->link_id != link_mess->connection_id) {
         reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
@@ -1659,7 +1679,7 @@ static void reds_handle_other_links(RedLinkInfo *link)
     link->link_mess = NULL;
     reds_link_free(link);
     caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
-    channel->link(channel, stream, reds->mig_target, link_mess->num_common_caps,
+    channel->link(channel, client, stream, reds->mig_target, link_mess->num_common_caps,
                   link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
                   link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
     free(link_mess);
@@ -3466,6 +3486,8 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
     reds->listen_socket = -1;
     reds->secure_listen_socket = -1;
     init_vd_agent_resources();
+    ring_init(&reds->clients);
+    reds->num_clients = 0;
 
     if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
         red_error("migration timer create failed");
diff --git a/server/reds.h b/server/reds.h
index 485d9eb..b95decd 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -35,6 +35,7 @@
 #define __visible__ __attribute__ ((visibility ("default")))
 
 typedef struct RedsStream RedsStream;
+typedef struct RedClient RedClient;
 
 #if HAVE_SASL
 typedef struct RedsSASL {
@@ -94,7 +95,8 @@ typedef struct Channel {
     uint32_t *common_caps;
     int num_caps;
     uint32_t *caps;
-    void (*link)(struct Channel *, RedsStream *stream, int migration, int num_common_caps,
+    void (*link)(struct Channel *, RedClient *client, RedsStream *stream,
+                 int migration, int num_common_caps,
                  uint32_t *common_caps, int num_caps, uint32_t *caps);
     void (*shutdown)(struct Channel *);
     void (*migrate)(struct Channel *);
@@ -137,7 +139,7 @@ extern uint64_t bitrate_per_sec;
 #define IS_LOW_BANDWIDTH() (bitrate_per_sec < 10 * 1024 * 1024)
 
 // Temporary measures to make splitting reds.c to inputs_channel.c easier
-void reds_disconnect(void);
+void reds_client_disconnect(RedClient *client);
 
 // Temporary (?) for splitting main channel
 typedef struct MainMigrateData MainMigrateData;
diff --git a/server/smartcard.c b/server/smartcard.c
index d6f53a3..15f62fc 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -466,7 +466,7 @@ static void smartcard_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *it
 {
 }
 
-static void smartcard_link(Channel *channel, RedsStream *stream,
+static void smartcard_link(Channel *channel, RedClient *client, RedsStream *stream,
                         int migration, int num_common_caps,
                         uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
@@ -495,7 +495,7 @@ static void smartcard_link(Channel *channel, RedsStream *stream,
         red_printf("ERROR: smartcard channel creation failed");
         return;
     }
-    red_channel_client_create(sizeof(RedChannelClient), channel->data, stream);
+    red_channel_client_create(sizeof(RedChannelClient), channel->data, client, stream);
     red_channel_init_outgoing_messages_window((RedChannel*)channel->data);
 }
 
diff --git a/server/snd_worker.c b/server/snd_worker.c
index 1a4840c..6bf1edd 100644
--- a/server/snd_worker.c
+++ b/server/snd_worker.c
@@ -933,9 +933,9 @@ static void snd_playback_cleanup(SndChannel *channel)
     celt051_mode_destroy(playback_channel->celt_mode);
 }
 
-static void snd_set_playback_peer(Channel *channel, RedsStream *stream, int migration,
-                                  int num_common_caps, uint32_t *common_caps, int num_caps,
-                                  uint32_t *caps)
+static void snd_set_playback_peer(Channel *channel, RedClient *client, RedsStream *stream,
+                                  int migration, int num_common_caps, uint32_t *common_caps,
+                                  int num_caps, uint32_t *caps)
 {
     SndWorker *worker = (SndWorker *)channel;
     SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker);
@@ -1099,9 +1099,9 @@ static void snd_record_cleanup(SndChannel *channel)
     celt051_mode_destroy(record_channel->celt_mode);
 }
 
-static void snd_set_record_peer(Channel *channel, RedsStream *stream, int migration,
-                                int num_common_caps, uint32_t *common_caps, int num_caps,
-                                uint32_t *caps)
+static void snd_set_record_peer(Channel *channel, RedClient *client, RedsStream *stream,
+                                int migration, int num_common_caps, uint32_t *common_caps,
+                                int num_caps, uint32_t *caps)
 {
     SndWorker *worker = (SndWorker *)channel;
     SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker);
-- 
1.7.4.4



More information about the Spice-devel mailing list