[Spice-devel] [PATCH 02/13] server: use proper methods for Stream read/write()

Marc-André Lureau marcandre.lureau at redhat.com
Tue Feb 22 08:08:56 PST 2011


This allows easier modification of the underlying IO.

We also avoid using the "generic ctx pointer".

Also rename RedsStreamContext to RedsStream, a stylistic change (it's
obviously a context, no?).
---
 server/inputs_channel.c    |    2 +-
 server/main_channel.c      |    2 +-
 server/red_channel.c       |   25 +++---
 server/red_channel.h       |    6 +-
 server/red_dispatcher.c    |    8 +-
 server/red_tunnel_worker.c |    4 +-
 server/red_worker.c        |   32 ++++----
 server/reds.c              |  196 +++++++++++++++++++++++---------------------
 server/reds.h              |   22 +++--
 server/smartcard.c         |    2 +-
 server/snd_worker.c        |   22 +++--
 11 files changed, 171 insertions(+), 150 deletions(-)

diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index b7ae55a..0781e62 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -508,7 +508,7 @@ static int inputs_channel_config_socket(RedChannel *channel)
     return TRUE;
 }
 
-static void inputs_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void inputs_link(Channel *channel, RedsStream *peer, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
diff --git a/server/main_channel.c b/server/main_channel.c
index f1fb4c6..ec234dd 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -776,7 +776,7 @@ static int main_channel_config_socket(RedChannel *channel)
     return TRUE;
 }
 
-static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void main_channel_link(Channel *channel, RedsStream *peer, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
diff --git a/server/red_channel.c b/server/red_channel.c
index a13ef0e..36e9f68 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -34,7 +34,7 @@ static void red_channel_pipe_clear(RedChannel *channel);
 static void red_channel_event(int fd, int event, void *data);
 
 /* return the number of bytes read. -1 in case of error */
-static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size)
+static int red_peer_receive(RedsStream *peer, uint8_t *buf, uint32_t size)
 {
     uint8_t *pos = buf;
     while (size) {
@@ -42,7 +42,8 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
         if (peer->shutdown) {
             return -1;
         }
-        if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) {
+        now = reds_stream_read(peer, pos, size);
+        if (now <= 0) {
             if (now == 0) {
                 return -1;
             }
@@ -65,7 +66,7 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
     return pos - buf;
 }
 
-static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+static void red_peer_handle_incoming(RedsStream *peer, IncomingHandler *handler)
 {
     int bytes_read;
     uint8_t *parsed;
@@ -143,9 +144,10 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
     }
 }
 
-static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
+static void red_peer_handle_outgoing(RedsStream *peer, OutgoingHandler *handler)
 {
-    int n;
+    ssize_t n;
+
     if (handler->size == 0) {
         handler->vec = handler->vec_buf;
         handler->size = handler->get_msg_size(handler->opaque);
@@ -153,9 +155,11 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
             return;
         }
     }
+
     for (;;) {
         handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
-        if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
+        n = reds_stream_writev(peer, handler->vec, handler->vec_size);
+        if (n == -1) {
             switch (errno) {
             case EAGAIN:
                 handler->on_block(handler->opaque);
@@ -243,7 +247,7 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
     }
 }
 
-RedChannel *red_channel_create(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create(int size, RedsStream *peer,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
@@ -307,7 +311,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
 error:
     spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
-    peer->cb_free(peer);
+    reds_stream_free(peer);
 
     return NULL;
 }
@@ -321,7 +325,7 @@ int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header,
     return TRUE;
 }
 
-RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create_parser(int size, RedsStream *peer,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
@@ -356,8 +360,7 @@ void red_channel_destroy(RedChannel *channel)
         return;
     }
     red_channel_pipe_clear(channel);
-    channel->core->watch_remove(channel->peer->watch);
-    channel->peer->cb_free(channel->peer);
+    reds_stream_free(channel->peer);
     spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
 }
diff --git a/server/red_channel.h b/server/red_channel.h
index 893a7f8..ae58522 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -110,7 +110,7 @@ typedef void (*channel_on_incoming_error_proc)(RedChannel *channel);
 typedef void (*channel_on_outgoing_error_proc)(RedChannel *channel);
 
 struct RedChannel {
-    RedsStreamContext *peer;
+    RedsStream *peer;
     SpiceCoreInterface *core;
     int migrate;
     int handle_acks;
@@ -154,7 +154,7 @@ struct RedChannel {
 
 /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
    explicitly destroy the channel */
-RedChannel *red_channel_create(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create(int size, RedsStream *peer,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
@@ -167,7 +167,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
 
 /* alternative constructor, meant for marshaller based (inputs,main) channels,
  * will become default eventually */
-RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create_parser(int size, RedsStream *peer,
                                SpiceCoreInterface *core,
                                int migrate, int handle_acks,
                                channel_configure_socket_proc config_socket,
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 2a3c297..3816e14 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -71,7 +71,7 @@ extern spice_wan_compression_t zlib_glz_state;
 
 static RedDispatcher *dispatchers = NULL;
 
-static void red_dispatcher_set_peer(Channel *channel, RedsStreamContext *peer, int migration,
+static void red_dispatcher_set_peer(Channel *channel, RedsStream *peer, int migration,
                                     int num_common_caps, uint32_t *common_caps, int num_caps,
                                     uint32_t *caps)
 {
@@ -81,7 +81,7 @@ static void red_dispatcher_set_peer(Channel *channel, RedsStreamContext *peer, i
     dispatcher = (RedDispatcher *)channel->data;
     RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
     write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &peer, sizeof(RedsStreamContext *));
+    send_data(dispatcher->channel, &peer, sizeof(RedsStream *));
     send_data(dispatcher->channel, &migration, sizeof(int));
 }
 
@@ -101,7 +101,7 @@ static void red_dispatcher_migrate(Channel *channel)
     write_message(dispatcher->channel, &message);
 }
 
-static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStreamContext *peer,
+static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *peer,
                                            int migration, int num_common_caps,
                                            uint32_t *common_caps, int num_caps,
                                            uint32_t *caps)
@@ -110,7 +110,7 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStreamContext *
     red_printf("");
     RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
     write_message(dispatcher->channel, &message);
-    send_data(dispatcher->channel, &peer, sizeof(RedsStreamContext *));
+    send_data(dispatcher->channel, &peer, sizeof(RedsStream *));
     send_data(dispatcher->channel, &migration, sizeof(int));
 }
 
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
index 6092a76..267de4a 100644
--- a/server/red_tunnel_worker.c
+++ b/server/red_tunnel_worker.c
@@ -598,7 +598,7 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
 
 
 /* reds interface */
-static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void handle_tunnel_channel_link(Channel *channel, RedsStream *peer, int migration,
                                        int num_common_caps, uint32_t *common_caps, int num_caps,
                                        uint32_t *caps);
 static void handle_tunnel_channel_shutdown(struct Channel *channel);
@@ -3420,7 +3420,7 @@ static void on_new_tunnel_channel(TunnelChannel *channel)
     }
 }
 
-static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void handle_tunnel_channel_link(Channel *channel, RedsStream *peer, int migration,
                                        int num_common_caps, uint32_t *common_caps, int num_caps,
                                        uint32_t *caps)
 {
diff --git a/server/red_worker.c b/server/red_worker.c
index dc7bc9e..446fae4 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -356,7 +356,7 @@ struct RedChannel {
     uint32_t id;
     spice_parse_channel_func_t parser;
     struct RedWorker *worker;
-    RedsStreamContext *peer;
+    RedsStream *peer;
     int migrate;
 
     Ring pipe;
@@ -7324,9 +7324,9 @@ static void inline channel_release_res(RedChannel *channel)
 static void red_send_data(RedChannel *channel, void *item)
 {
     for (;;) {
-        uint32_t n = channel->send_data.size - channel->send_data.pos;
+        ssize_t n = channel->send_data.size - channel->send_data.pos;
         struct iovec vec[MAX_SEND_VEC];
-        int vec_size;
+        size_t vec_size;
 
         if (!n) {
             channel->send_data.blocked = FALSE;
@@ -7339,7 +7339,8 @@ static void red_send_data(RedChannel *channel, void *item)
         vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
                                                vec, MAX_SEND_VEC, channel->send_data.pos);
         ASSERT(channel->peer);
-        if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
+        n = reds_stream_writev(channel->peer, vec, vec_size);
+        if (n == -1) {
             switch (errno) {
             case EAGAIN:
                 channel->send_data.blocked = TRUE;
@@ -8524,9 +8525,7 @@ static void red_disconnect_channel(RedChannel *channel)
 {
     channel_release_res(channel);
     red_pipe_clear(channel);
-
-    channel->peer->cb_free(channel->peer);
-
+    reds_stream_free(channel->peer);
     channel->peer = NULL;
     channel->send_data.blocked = FALSE;
     channel->send_data.size = channel->send_data.pos = 0;
@@ -9253,7 +9252,8 @@ static void red_receive(RedChannel *channel)
         n = channel->recive_data.end - channel->recive_data.now;
         ASSERT(n);
         ASSERT(channel->peer);
-        if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
+        n = reds_stream_read(channel->peer, channel->recive_data.now, n);
+        if (n <= 0) {
             if (n == 0) {
                 channel->disconnect(channel);
                 return;
@@ -9319,7 +9319,7 @@ static void red_receive(RedChannel *channel)
 }
 
 static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
-                                 RedsStreamContext *peer, int migrate,
+                                 RedsStream *peer, int migrate,
                                  event_listener_action_proc handler,
                                  disconnect_channel_proc disconnect,
                                  hold_item_proc hold_item,
@@ -9383,7 +9383,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
 error2:
     free(channel);
 error1:
-    peer->cb_free(peer);
+    reds_stream_free(peer);
 
     return NULL;
 }
@@ -9445,7 +9445,7 @@ static void display_channel_release_item(RedChannel *channel, void *item)
     }
 }
 
-static void handle_new_display_channel(RedWorker *worker, RedsStreamContext *peer, int migrate)
+static void handle_new_display_channel(RedWorker *worker, RedsStream *peer, int migrate)
 {
     DisplayChannel *display_channel;
     size_t stream_buf_size;
@@ -9568,7 +9568,7 @@ static void cursor_channel_release_item(RedChannel *channel, void *item)
     red_release_cursor(channel->worker, item);
 }
 
-static void red_connect_cursor(RedWorker *worker, RedsStreamContext *peer, int migrate)
+static void red_connect_cursor(RedWorker *worker, RedsStream *peer, int migrate)
 {
     CursorChannel *channel;
 
@@ -10004,11 +10004,11 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
         handle_dev_destroy_primary_surface(worker);
         break;
     case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
-        RedsStreamContext *peer;
+        RedsStream *peer;
         int migrate;
         red_printf("connect");
 
-        receive_data(worker->channel, &peer, sizeof(RedsStreamContext *));
+        receive_data(worker->channel, &peer, sizeof(RedsStream *));
         receive_data(worker->channel, &migrate, sizeof(int));
         handle_new_display_channel(worker, peer, migrate);
         break;
@@ -10052,11 +10052,11 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
         red_migrate_display(worker);
         break;
     case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
-        RedsStreamContext *peer;
+        RedsStream *peer;
         int migrate;
 
         red_printf("cursor connect");
-        receive_data(worker->channel, &peer, sizeof(RedsStreamContext *));
+        receive_data(worker->channel, &peer, sizeof(RedsStream *));
         receive_data(worker->channel, &migrate, sizeof(int));
         red_connect_cursor(worker, peer, migrate);
         break;
diff --git a/server/reds.c b/server/reds.c
index d92f701..d597e93 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -224,7 +224,7 @@ typedef struct RedsState {
 static RedsState *reds = NULL;
 
 typedef struct AsyncRead {
-    RedsStreamContext *peer;
+    RedsStream *peer;
     void *opaque;
     uint8_t *now;
     uint8_t *end;
@@ -233,7 +233,7 @@ typedef struct AsyncRead {
 } AsyncRead;
 
 typedef struct RedLinkInfo {
-    RedsStreamContext *peer;
+    RedsStream *peer;
     AsyncRead asyc_read;
     SpiceLinkHeader link_header;
     SpiceLinkMess *link_mess;
@@ -297,85 +297,65 @@ static ChannelSecurityOptions *find_channel_security(int id)
     return now;
 }
 
-static void reds_channel_event(RedsStreamContext *peer, int event)
+static void reds_channel_event(RedsStream *peer, int event)
 {
     if (core->base.minor_version < 3 || core->channel_event == NULL)
         return;
     core->channel_event(event, &peer->info);
 }
 
-static int reds_write(void *ctx, void *buf, size_t size)
+static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
 {
-    int return_code;
-    int sock = (long)ctx;
-    size_t count = size;
-
-    return_code = write(sock, buf, count);
-
-    return (return_code);
+    return write(s->socket, buf, size);
 }
 
-static int reds_read(void *ctx, void *buf, size_t size)
+static ssize_t stream_writev_cb(RedsStream *s, const struct iovec *iov, int iovcnt)
 {
-    int return_code;
-    int sock = (long)ctx;
-    size_t count = size;
-
-    return_code = read(sock, buf, count);
-
-    return (return_code);
+    return writev(s->socket, iov, iovcnt);
 }
 
-static int reds_free(RedsStreamContext *peer)
+static ssize_t stream_read_cb(RedsStream *s, void *buf, size_t size)
 {
-    reds_channel_event(peer, SPICE_CHANNEL_EVENT_DISCONNECTED);
-    close(peer->socket);
-    free(peer);
-    return 0;
+    return read(s->socket, buf, size);
 }
 
-static int reds_ssl_write(void *ctx, void *buf, size_t size)
+static ssize_t stream_ssl_write_cb(RedsStream *s, const void *buf, size_t size)
 {
     int return_code;
     int ssl_error;
-    SSL *ssl = ctx;
 
-    return_code = SSL_write(ssl, buf, size);
+    return_code = SSL_write(s->ssl, buf, size);
 
-    if (return_code < 0) {
-        ssl_error = SSL_get_error(ssl, return_code);
-    }
+    if (return_code < 0)
+        ssl_error = SSL_get_error(s->ssl, return_code);
 
-    return (return_code);
+    return return_code;
 }
 
-static int reds_ssl_read(void *ctx, void *buf, size_t size)
+static ssize_t stream_ssl_read_cb(RedsStream *s, void *buf, size_t size)
 {
     int return_code;
     int ssl_error;
-    SSL *ssl = ctx;
 
-    return_code = SSL_read(ssl, buf, size);
+    return_code = SSL_read(s->ssl, buf, size);
 
-    if (return_code < 0) {
-        ssl_error = SSL_get_error(ssl, return_code);
-    }
+    if (return_code < 0)
+        ssl_error = SSL_get_error(s->ssl, return_code);
 
-    return (return_code);
+    return return_code;
 }
 
-static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
+static ssize_t stream_ssl_writev_cb(RedsStream *s, const struct iovec *vector, int count)
 {
     int i;
     int n;
-    int return_code = 0;
+    ssize_t return_code = 0;
     int ssl_error;
-    SSL *ssl = ctx;
 
     for (i = 0; i < count; ++i) {
-        n = SSL_write(ssl, vector[i].iov_base, vector[i].iov_len);
+        n = SSL_write(s->ssl, vector[i].iov_base, vector[i].iov_len);
         if (n <= 0) {
-            ssl_error = SSL_get_error(ssl, n);
+            ssl_error = SSL_get_error(s->ssl, n);
             if (return_code <= 0) {
                 return n;
             } else {
@@ -389,35 +369,31 @@ static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
     return return_code;
 }
 
-static int reds_ssl_free(RedsStreamContext *peer)
+static void reds_stream_remove_watch(RedsStream* s)
 {
-    reds_channel_event(peer, SPICE_CHANNEL_EVENT_DISCONNECTED);
-    SSL_free(peer->ssl);
-    close(peer->socket);
-    free(peer);
-    return 0;
+    if (s->watch) {
+        core->watch_remove(s->watch);
+        s->watch = NULL;
+    }
 }
 
-static void __reds_release_link(RedLinkInfo *link)
+static void reds_link_free(RedLinkInfo *link)
 {
-    ASSERT(link->peer);
-    if (link->peer->watch) {
-        core->watch_remove(link->peer->watch);
-        link->peer->watch = NULL;
-    }
+    reds_stream_free(link->peer);
+    link->peer = NULL;
+
     free(link->link_mess);
+    link->link_mess = NULL;
+
     BN_free(link->tiTicketing.bn);
+    link->tiTicketing.bn = NULL;
+
     if (link->tiTicketing.rsa) {
         RSA_free(link->tiTicketing.rsa);
+        link->tiTicketing.rsa = NULL;
     }
-    free(link);
-}
 
-static inline void reds_release_link(RedLinkInfo *link)
-{
-    RedsStreamContext *peer = link->peer;
-    __reds_release_link(link);
-    peer->cb_free(peer);
+    free(link);
 }
 
 #ifdef RED_STATISTICS
@@ -1367,11 +1343,11 @@ void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
     while (write_to_vdi_port() || read_from_vdi_port());
 }
 
-static int sync_write(RedsStreamContext *peer, void *in_buf, size_t n)
+static int sync_write(RedsStream *peer, void *in_buf, size_t n)
 {
     uint8_t *buf = (uint8_t *)in_buf;
     while (n) {
-        int now = peer->cb_write(peer->ctx, buf, n);
+        int now = reds_stream_write(peer, buf, n);
         if (now <= 0) {
             if (now == -1 && (errno == EINTR || errno == EAGAIN)) {
                 continue;
@@ -1479,7 +1455,7 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
 // actually be joined with reds_handle_other_links, ebcome reds_handle_link
 static void reds_handle_main_link(RedLinkInfo *link)
 {
-    RedsStreamContext *peer;
+    RedsStream *peer;
     SpiceLinkMess *link_mess;
     uint32_t *caps;
     uint32_t connection_id;
@@ -1497,7 +1473,7 @@ static void reds_handle_main_link(RedLinkInfo *link)
     } else {
         if (link_mess->connection_id != reds->link_id) {
             reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
-            reds_release_link(link);
+            reds_link_free(link);
             return;
         }
         reds_send_link_result(link, SPICE_LINK_ERR_OK);
@@ -1512,8 +1488,10 @@ static void reds_handle_main_link(RedLinkInfo *link)
 
     reds_show_new_channel(link, connection_id);
     peer = link->peer;
+    reds_stream_remove_watch(peer);
+    link->peer = NULL;
     link->link_mess = NULL;
-    __reds_release_link(link);
+    reds_link_free(link);
     caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
     reds->main_channel = main_channel_init();
     reds->main_channel->link(reds->main_channel, peer, reds->mig_target, link_mess->num_common_caps,
@@ -1580,7 +1558,7 @@ static void openssl_init(RedLinkInfo *link)
 static void reds_handle_other_links(RedLinkInfo *link)
 {
     Channel *channel;
-    RedsStreamContext *peer;
+    RedsStream *peer;
     SpiceLinkMess *link_mess;
     uint32_t *caps;
 
@@ -1588,14 +1566,14 @@ static void reds_handle_other_links(RedLinkInfo *link)
 
     if (!reds->link_id || reds->link_id != link_mess->connection_id) {
         reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
     if (!(channel = reds_find_channel(link_mess->channel_type,
                                       link_mess->channel_id))) {
         reds_send_link_result(link, SPICE_LINK_ERR_CHANNEL_NOT_AVAILABLE);
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
@@ -1607,8 +1585,10 @@ static void reds_handle_other_links(RedLinkInfo *link)
         main_channel_push_notify(reds->main_channel, (uint8_t*)mess, mess_len);
     }
     peer = link->peer;
+    reds_stream_remove_watch(peer);
+    link->peer = NULL;
     link->link_mess = NULL;
-    __reds_release_link(link);
+    reds_link_free(link);
     caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
     channel->link(channel, peer, reds->mig_target, link_mess->num_common_caps,
                   link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
@@ -1636,13 +1616,13 @@ static void reds_handle_ticket(void *opaque)
             reds_send_link_result(link, SPICE_LINK_ERR_PERMISSION_DENIED);
             red_printf("Ticketing is enabled, but no password is set. "
                        "please set a ticket first");
-            reds_release_link(link);
+            reds_link_free(link);
             return;
         }
 
         if (expired || strncmp(password, actual_sever_pass, SPICE_MAX_PASSWORD_LENGTH) != 0) {
             reds_send_link_result(link, SPICE_LINK_ERR_PERMISSION_DENIED);
-            reds_release_link(link);
+            reds_link_free(link);
             return;
         }
     }
@@ -1670,7 +1650,8 @@ static void async_read_handler(int fd, int event, void *data)
         int n = obj->end - obj->now;
 
         ASSERT(n > 0);
-        if ((n = obj->peer->cb_read(obj->peer->ctx, obj->now, n)) <= 0) {
+        n = reds_stream_read(obj->peer, obj->now, n);
+        if (n <= 0) {
             if (n < 0) {
                 switch (errno) {
                 case EAGAIN:
@@ -1722,7 +1703,7 @@ static void reds_handle_read_link_done(void *opaque)
                                                    link->link_header.size ||
                                                      link_mess->caps_offset < sizeof(*link_mess))) {
         reds_send_link_error(link, SPICE_LINK_ERR_INVALID_DATA);
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
@@ -1734,12 +1715,12 @@ static void reds_handle_read_link_done(void *opaque)
             red_printf("spice channels %d should be encrypted", link_mess->channel_type);
             reds_send_link_error(link, SPICE_LINK_ERR_NEED_SECURED);
         }
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
     if (!reds_send_link_ack(link)) {
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
@@ -1760,7 +1741,7 @@ static void reds_handle_link_error(void *opaque, int err)
         red_printf("%s", strerror(errno));
         break;
     }
-    reds_release_link(link);
+    reds_link_free(link);
 }
 
 static void reds_handle_read_header_done(void *opaque)
@@ -1771,7 +1752,7 @@ static void reds_handle_read_header_done(void *opaque)
 
     if (header->magic != SPICE_MAGIC) {
         reds_send_link_error(link, SPICE_LINK_ERR_INVALID_MAGIC);
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
@@ -1781,7 +1762,7 @@ static void reds_handle_read_header_done(void *opaque)
         }
 
         red_printf("version mismatch");
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
@@ -1790,7 +1771,7 @@ static void reds_handle_read_header_done(void *opaque)
     if (header->size < sizeof(SpiceLinkMess)) {
         reds_send_link_error(link, SPICE_LINK_ERR_INVALID_DATA);
         red_printf("bad size %u", header->size);
-        reds_release_link(link);
+        reds_link_free(link);
         return;
     }
 
@@ -1823,7 +1804,7 @@ static void reds_handle_ssl_accept(int fd, int event, void *data)
         int ssl_error = SSL_get_error(link->peer->ssl, return_code);
         if (ssl_error != SSL_ERROR_WANT_READ && ssl_error != SSL_ERROR_WANT_WRITE) {
             red_printf("SSL_accept failed, error=%d", ssl_error);
-            reds_release_link(link);
+            reds_link_free(link);
         } else {
             if (ssl_error == SSL_ERROR_WANT_READ) {
                 core->watch_update_mask(link->peer->watch, SPICE_WATCH_EVENT_READ);
@@ -1841,7 +1822,7 @@ static void reds_handle_ssl_accept(int fd, int event, void *data)
 static RedLinkInfo *__reds_accept_connection(int listen_socket)
 {
     RedLinkInfo *link;
-    RedsStreamContext *peer;
+    RedsStream *peer;
     int delay_val = 1;
     int flags;
     int socket;
@@ -1866,7 +1847,7 @@ static RedLinkInfo *__reds_accept_connection(int listen_socket)
     }
 
     link = spice_new0(RedLinkInfo, 1);
-    peer = spice_new0(RedsStreamContext, 1);
+    peer = spice_new0(RedsStream, 1);
     link->peer = peer;
     peer->socket = socket;
 
@@ -1890,17 +1871,16 @@ error:
 static RedLinkInfo *reds_accept_connection(int listen_socket)
 {
     RedLinkInfo *link;
-    RedsStreamContext *peer;
+    RedsStream *peer;
 
     if (!(link = __reds_accept_connection(listen_socket))) {
         return NULL;
     }
     peer = link->peer;
     peer->ctx = (void *)((unsigned long)link->peer->socket);
-    peer->cb_read = (int (*)(void *, void *, int))reds_read;
-    peer->cb_write = (int (*)(void *, void *, int))reds_write;
-    peer->cb_writev = (int (*)(void *, const struct iovec *vector, int count))writev;
-    peer->cb_free = (int (*)(RedsStreamContext *))reds_free;
+    peer->read = stream_read_cb;
+    peer->write = stream_write_cb;
+    peer->writev = stream_writev_cb;
 
     return link;
 }
@@ -1933,10 +1913,9 @@ static void reds_accept_ssl_connection(int fd, int event, void *data)
     SSL_set_bio(link->peer->ssl, sbio, sbio);
 
     link->peer->ctx = (void *)(link->peer->ssl);
-    link->peer->cb_write = (int (*)(void *, void *, int))reds_ssl_write;
-    link->peer->cb_read = (int (*)(void *, void *, int))reds_ssl_read;
-    link->peer->cb_writev = reds_ssl_writev;
-    link->peer->cb_free = (int (*)(RedsStreamContext *))reds_ssl_free;
+    link->peer->write = stream_ssl_write_cb;
+    link->peer->read = stream_ssl_read_cb;
+    link->peer->writev = stream_ssl_writev_cb;
 
     return_code = SSL_accept(link->peer->ssl);
     if (return_code == 1) {
@@ -3169,3 +3148,34 @@ __visible__ int spice_server_migrate_switch(SpiceServer *s)
     reds_mig_switch();
     return 0;
 }
+
+ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte)
+{
+    return s->read(s, buf, nbyte);
+}
+
+ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte)
+{
+    return s->write(s, buf, nbyte);
+}
+
+ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt)
+{
+    return s->writev(s, iov, iovcnt);
+}
+
+void reds_stream_free(RedsStream *s)
+{
+    if (!s)
+        return;
+
+    reds_channel_event(s, SPICE_CHANNEL_EVENT_DISCONNECTED);
+
+    if (s->ssl)
+        SSL_free(s->ssl);
+
+    reds_stream_remove_watch(s);
+    close(s->socket);
+
+    free(s);
+}
diff --git a/server/reds.h b/server/reds.h
index 547c33c..63b73c4 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -28,7 +28,9 @@
 
 #define __visible__ __attribute__ ((visibility ("default")))
 
-typedef struct RedsStreamContext {
+typedef struct RedsStream RedsStream;
+
+struct RedsStream {
     void *ctx;
 
     int socket;
@@ -41,12 +43,11 @@ typedef struct RedsStreamContext {
 
     SpiceChannelEventInfo info;
 
-    int (*cb_write)(void *, void *, int);
-    int (*cb_read)(void *, void *, int);
-
-    int (*cb_writev)(void *, const struct iovec *vector, int count);
-    int (*cb_free)(struct RedsStreamContext *);
-} RedsStreamContext;
+    /* private */
+    ssize_t (*read)(RedsStream *s, void *buf, size_t nbyte);
+    ssize_t (*write)(RedsStream *s, const void *buf, size_t nbyte);
+    ssize_t (*writev)(RedsStream *s, const struct iovec *iov, int iovcnt);
+};
 
 typedef struct Channel {
     struct Channel *next;
@@ -56,7 +57,7 @@ typedef struct Channel {
     uint32_t *common_caps;
     int num_caps;
     uint32_t *caps;
-    void (*link)(struct Channel *, RedsStreamContext *peer, int migration, int num_common_caps,
+    void (*link)(struct Channel *, RedsStream *peer, int migration, int num_common_caps,
                  uint32_t *common_caps, int num_caps, uint32_t *caps);
     void (*shutdown)(struct Channel *);
     void (*migrate)(struct Channel *);
@@ -73,6 +74,11 @@ struct SpiceNetWireState {
     struct TunnelWorker *worker;
 };
 
+ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte);
+ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte);
+ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt);
+void reds_stream_free(RedsStream *s);
+
 void reds_desable_mm_timer();
 void reds_enable_mm_timer();
 void reds_update_mm_timer(uint32_t mm_time);
diff --git a/server/smartcard.c b/server/smartcard.c
index 7c0a5aa..a7d26b6 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -485,7 +485,7 @@ static int smartcard_channel_handle_message(RedChannel *channel, SpiceDataHeader
     return TRUE;
 }
 
-static void smartcard_link(Channel *channel, RedsStreamContext *peer,
+static void smartcard_link(Channel *channel, RedsStream *peer,
                         int migration, int num_common_caps,
                         uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
diff --git a/server/snd_worker.c b/server/snd_worker.c
index 6c0f9d6..2382a29 100644
--- a/server/snd_worker.c
+++ b/server/snd_worker.c
@@ -73,7 +73,7 @@ typedef void (*cleanup_channel_proc)(SndChannel *channel);
 typedef struct SndWorker SndWorker;
 
 struct SndChannel {
-    RedsStreamContext *peer;
+    RedsStream *peer;
     SndWorker *worker;
     spice_parse_channel_func_t parser;
 
@@ -180,15 +180,15 @@ static void snd_disconnect_channel(SndChannel *channel)
 {
     SndWorker *worker;
 
-    if (!channel) {
+    if (!channel)
         return;
-    }
+
     channel->cleanup(channel);
     worker = channel->worker;
     worker->connection = NULL;
     core->watch_remove(channel->peer->watch);
     channel->peer->watch = NULL;
-    channel->peer->cb_free(channel->peer);
+    reds_stream_free(channel->peer);
     spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
 }
@@ -243,7 +243,8 @@ static int snd_send_data(SndChannel *channel)
 
         vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
                                                vec, MAX_SEND_VEC, channel->send_data.pos);
-        if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
+        n = reds_stream_writev(channel->peer, vec, vec_size);
+        if (n == -1) {
             switch (errno) {
             case EAGAIN:
                 channel->blocked = TRUE;
@@ -389,7 +390,8 @@ static void snd_receive(void* data)
         ssize_t n;
         n = channel->recive_data.end - channel->recive_data.now;
         ASSERT(n);
-        if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
+        n = reds_stream_read(channel->peer, channel->recive_data.now, n);
+        if (n <= 0) {
             if (n == 0) {
                 snd_disconnect_channel(channel);
                 return;
@@ -734,7 +736,7 @@ static void snd_record_send(void* data)
 }
 
 static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id,
-                                 RedsStreamContext *peer,
+                                 RedsStream *peer,
                                  int migrate, send_messages_proc send_messages,
                                  handle_message_proc handle_message,
                                  on_message_done_proc on_message_done,
@@ -800,7 +802,7 @@ error2:
     free(channel);
 
 error1:
-    peer->cb_free(peer);
+    reds_stream_free(peer);
     return NULL;
 }
 
@@ -931,7 +933,7 @@ static void snd_playback_cleanup(SndChannel *channel)
     celt051_mode_destroy(playback_channel->celt_mode);
 }
 
-static void snd_set_playback_peer(Channel *channel, RedsStreamContext *peer, int migration,
+static void snd_set_playback_peer(Channel *channel, RedsStream *peer, int migration,
                                   int num_common_caps, uint32_t *common_caps, int num_caps,
                                   uint32_t *caps)
 {
@@ -1097,7 +1099,7 @@ static void snd_record_cleanup(SndChannel *channel)
     celt051_mode_destroy(record_channel->celt_mode);
 }
 
-static void snd_set_record_peer(Channel *channel, RedsStreamContext *peer, int migration,
+static void snd_set_record_peer(Channel *channel, RedsStream *peer, int migration,
                                 int num_common_caps, uint32_t *common_caps, int num_caps,
                                 uint32_t *caps)
 {
-- 
1.7.4



More information about the Spice-devel mailing list