[Spice-devel] [PATCH 02/13] server: use proper methods for Stream read/write()

Alon Levy alevy at redhat.com
Thu Feb 24 08:30:22 PST 2011


On Tue, Feb 22, 2011 at 05:08:56PM +0100, Marc-André Lureau wrote:
> This allows easier modification of the underlying IO.
> 
> We also avoid using the "generic ctx pointer"
> 
> Also rename StreamContext to Stream, a stylistic change (it's obviously
> a context, no?).

I'm really having a problem reviewing this one - it's got a number of changes
going on at once. It seems to be good, but could you split it? Put the renames
in one patch and the actual content changes in another (e.g. moving the event
notification - I can't be sure it still happens at the correct points). I'm also
not sure all the renames are good (removing the cb_ prefix? I don't care one way
or the other, but it seems like just churn).

> ---
>  server/inputs_channel.c    |    2 +-
>  server/main_channel.c      |    2 +-
>  server/red_channel.c       |   25 +++---
>  server/red_channel.h       |    6 +-
>  server/red_dispatcher.c    |    8 +-
>  server/red_tunnel_worker.c |    4 +-
>  server/red_worker.c        |   32 ++++----
>  server/reds.c              |  196 +++++++++++++++++++++++---------------------
>  server/reds.h              |   22 +++--
>  server/smartcard.c         |    2 +-
>  server/snd_worker.c        |   22 +++--
>  11 files changed, 171 insertions(+), 150 deletions(-)
> 
> diff --git a/server/inputs_channel.c b/server/inputs_channel.c
> index b7ae55a..0781e62 100644
> --- a/server/inputs_channel.c
> +++ b/server/inputs_channel.c
> @@ -508,7 +508,7 @@ static int inputs_channel_config_socket(RedChannel *channel)
>      return TRUE;
>  }
>  
> -static void inputs_link(Channel *channel, RedsStreamContext *peer, int migration,
> +static void inputs_link(Channel *channel, RedsStream *peer, int migration,
>                          int num_common_caps, uint32_t *common_caps, int num_caps,
>                          uint32_t *caps)
>  {
> diff --git a/server/main_channel.c b/server/main_channel.c
> index f1fb4c6..ec234dd 100644
> --- a/server/main_channel.c
> +++ b/server/main_channel.c
> @@ -776,7 +776,7 @@ static int main_channel_config_socket(RedChannel *channel)
>      return TRUE;
>  }
>  
> -static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
> +static void main_channel_link(Channel *channel, RedsStream *peer, int migration,
>                          int num_common_caps, uint32_t *common_caps, int num_caps,
>                          uint32_t *caps)
>  {
> diff --git a/server/red_channel.c b/server/red_channel.c
> index a13ef0e..36e9f68 100644
> --- a/server/red_channel.c
> +++ b/server/red_channel.c
> @@ -34,7 +34,7 @@ static void red_channel_pipe_clear(RedChannel *channel);
>  static void red_channel_event(int fd, int event, void *data);
>  
>  /* return the number of bytes read. -1 in case of error */
> -static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size)
> +static int red_peer_receive(RedsStream *peer, uint8_t *buf, uint32_t size)
>  {
>      uint8_t *pos = buf;
>      while (size) {
> @@ -42,7 +42,8 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
>          if (peer->shutdown) {
>              return -1;
>          }
> -        if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) {
> +        now = reds_stream_read(peer, pos, size);
> +        if (now <= 0) {
>              if (now == 0) {
>                  return -1;
>              }
> @@ -65,7 +66,7 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
>      return pos - buf;
>  }
>  
> -static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
> +static void red_peer_handle_incoming(RedsStream *peer, IncomingHandler *handler)
>  {
>      int bytes_read;
>      uint8_t *parsed;
> @@ -143,9 +144,10 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
>      }
>  }
>  
> -static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
> +static void red_peer_handle_outgoing(RedsStream *peer, OutgoingHandler *handler)
>  {
> -    int n;
> +    ssize_t n;
> +
>      if (handler->size == 0) {
>          handler->vec = handler->vec_buf;
>          handler->size = handler->get_msg_size(handler->opaque);
> @@ -153,9 +155,11 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
>              return;
>          }
>      }
> +
>      for (;;) {
>          handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
> -        if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
> +        n = reds_stream_writev(peer, handler->vec, handler->vec_size);
> +        if (n == -1) {
>              switch (errno) {
>              case EAGAIN:
>                  handler->on_block(handler->opaque);
> @@ -243,7 +247,7 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
>      }
>  }
>  
> -RedChannel *red_channel_create(int size, RedsStreamContext *peer,
> +RedChannel *red_channel_create(int size, RedsStream *peer,
>                                 SpiceCoreInterface *core,
>                                 int migrate, int handle_acks,
>                                 channel_configure_socket_proc config_socket,
> @@ -307,7 +311,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
>  error:
>      spice_marshaller_destroy(channel->send_data.marshaller);
>      free(channel);
> -    peer->cb_free(peer);
> +    reds_stream_free(peer);
>  
>      return NULL;
>  }
> @@ -321,7 +325,7 @@ int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header,
>      return TRUE;
>  }
>  
> -RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
> +RedChannel *red_channel_create_parser(int size, RedsStream *peer,
>                                 SpiceCoreInterface *core,
>                                 int migrate, int handle_acks,
>                                 channel_configure_socket_proc config_socket,
> @@ -356,8 +360,7 @@ void red_channel_destroy(RedChannel *channel)
>          return;
>      }
>      red_channel_pipe_clear(channel);
> -    channel->core->watch_remove(channel->peer->watch);
> -    channel->peer->cb_free(channel->peer);
> +    reds_stream_free(channel->peer);
>      spice_marshaller_destroy(channel->send_data.marshaller);
>      free(channel);
>  }
> diff --git a/server/red_channel.h b/server/red_channel.h
> index 893a7f8..ae58522 100644
> --- a/server/red_channel.h
> +++ b/server/red_channel.h
> @@ -110,7 +110,7 @@ typedef void (*channel_on_incoming_error_proc)(RedChannel *channel);
>  typedef void (*channel_on_outgoing_error_proc)(RedChannel *channel);
>  
>  struct RedChannel {
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      SpiceCoreInterface *core;
>      int migrate;
>      int handle_acks;
> @@ -154,7 +154,7 @@ struct RedChannel {
>  
>  /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
>     explicitly destroy the channel */
> -RedChannel *red_channel_create(int size, RedsStreamContext *peer,
> +RedChannel *red_channel_create(int size, RedsStream *peer,
>                                 SpiceCoreInterface *core,
>                                 int migrate, int handle_acks,
>                                 channel_configure_socket_proc config_socket,
> @@ -167,7 +167,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
>  
>  /* alternative constructor, meant for marshaller based (inputs,main) channels,
>   * will become default eventually */
> -RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
> +RedChannel *red_channel_create_parser(int size, RedsStream *peer,
>                                 SpiceCoreInterface *core,
>                                 int migrate, int handle_acks,
>                                 channel_configure_socket_proc config_socket,
> diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
> index 2a3c297..3816e14 100644
> --- a/server/red_dispatcher.c
> +++ b/server/red_dispatcher.c
> @@ -71,7 +71,7 @@ extern spice_wan_compression_t zlib_glz_state;
>  
>  static RedDispatcher *dispatchers = NULL;
>  
> -static void red_dispatcher_set_peer(Channel *channel, RedsStreamContext *peer, int migration,
> +static void red_dispatcher_set_peer(Channel *channel, RedsStream *peer, int migration,
>                                      int num_common_caps, uint32_t *common_caps, int num_caps,
>                                      uint32_t *caps)
>  {
> @@ -81,7 +81,7 @@ static void red_dispatcher_set_peer(Channel *channel, RedsStreamContext *peer, i
>      dispatcher = (RedDispatcher *)channel->data;
>      RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
>      write_message(dispatcher->channel, &message);
> -    send_data(dispatcher->channel, &peer, sizeof(RedsStreamContext *));
> +    send_data(dispatcher->channel, &peer, sizeof(RedsStream *));
>      send_data(dispatcher->channel, &migration, sizeof(int));
>  }
>  
> @@ -101,7 +101,7 @@ static void red_dispatcher_migrate(Channel *channel)
>      write_message(dispatcher->channel, &message);
>  }
>  
> -static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStreamContext *peer,
> +static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *peer,
>                                             int migration, int num_common_caps,
>                                             uint32_t *common_caps, int num_caps,
>                                             uint32_t *caps)
> @@ -110,7 +110,7 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStreamContext *
>      red_printf("");
>      RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
>      write_message(dispatcher->channel, &message);
> -    send_data(dispatcher->channel, &peer, sizeof(RedsStreamContext *));
> +    send_data(dispatcher->channel, &peer, sizeof(RedsStream *));
>      send_data(dispatcher->channel, &migration, sizeof(int));
>  }
>  
> diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
> index 6092a76..267de4a 100644
> --- a/server/red_tunnel_worker.c
> +++ b/server/red_tunnel_worker.c
> @@ -598,7 +598,7 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
>  
>  
>  /* reds interface */
> -static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
> +static void handle_tunnel_channel_link(Channel *channel, RedsStream *peer, int migration,
>                                         int num_common_caps, uint32_t *common_caps, int num_caps,
>                                         uint32_t *caps);
>  static void handle_tunnel_channel_shutdown(struct Channel *channel);
> @@ -3420,7 +3420,7 @@ static void on_new_tunnel_channel(TunnelChannel *channel)
>      }
>  }
>  
> -static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
> +static void handle_tunnel_channel_link(Channel *channel, RedsStream *peer, int migration,
>                                         int num_common_caps, uint32_t *common_caps, int num_caps,
>                                         uint32_t *caps)
>  {
> diff --git a/server/red_worker.c b/server/red_worker.c
> index dc7bc9e..446fae4 100644
> --- a/server/red_worker.c
> +++ b/server/red_worker.c
> @@ -356,7 +356,7 @@ struct RedChannel {
>      uint32_t id;
>      spice_parse_channel_func_t parser;
>      struct RedWorker *worker;
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      int migrate;
>  
>      Ring pipe;
> @@ -7324,9 +7324,9 @@ static void inline channel_release_res(RedChannel *channel)
>  static void red_send_data(RedChannel *channel, void *item)
>  {
>      for (;;) {
> -        uint32_t n = channel->send_data.size - channel->send_data.pos;
> +        ssize_t n = channel->send_data.size - channel->send_data.pos;
>          struct iovec vec[MAX_SEND_VEC];
> -        int vec_size;
> +        size_t vec_size;
>  
>          if (!n) {
>              channel->send_data.blocked = FALSE;
> @@ -7339,7 +7339,8 @@ static void red_send_data(RedChannel *channel, void *item)
>          vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
>                                                 vec, MAX_SEND_VEC, channel->send_data.pos);
>          ASSERT(channel->peer);
> -        if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
> +        n = reds_stream_writev(channel->peer, vec, vec_size);
> +        if (n == -1) {
>              switch (errno) {
>              case EAGAIN:
>                  channel->send_data.blocked = TRUE;
> @@ -8524,9 +8525,7 @@ static void red_disconnect_channel(RedChannel *channel)
>  {
>      channel_release_res(channel);
>      red_pipe_clear(channel);
> -
> -    channel->peer->cb_free(channel->peer);
> -
> +    reds_stream_free(channel->peer);
>      channel->peer = NULL;
>      channel->send_data.blocked = FALSE;
>      channel->send_data.size = channel->send_data.pos = 0;
> @@ -9253,7 +9252,8 @@ static void red_receive(RedChannel *channel)
>          n = channel->recive_data.end - channel->recive_data.now;
>          ASSERT(n);
>          ASSERT(channel->peer);
> -        if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
> +        n = reds_stream_read(channel->peer, channel->recive_data.now, n);
> +        if (n <= 0) {
>              if (n == 0) {
>                  channel->disconnect(channel);
>                  return;
> @@ -9319,7 +9319,7 @@ static void red_receive(RedChannel *channel)
>  }
>  
>  static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
> -                                 RedsStreamContext *peer, int migrate,
> +                                 RedsStream *peer, int migrate,
>                                   event_listener_action_proc handler,
>                                   disconnect_channel_proc disconnect,
>                                   hold_item_proc hold_item,
> @@ -9383,7 +9383,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
>  error2:
>      free(channel);
>  error1:
> -    peer->cb_free(peer);
> +    reds_stream_free(peer);
>  
>      return NULL;
>  }
> @@ -9445,7 +9445,7 @@ static void display_channel_release_item(RedChannel *channel, void *item)
>      }
>  }
>  
> -static void handle_new_display_channel(RedWorker *worker, RedsStreamContext *peer, int migrate)
> +static void handle_new_display_channel(RedWorker *worker, RedsStream *peer, int migrate)
>  {
>      DisplayChannel *display_channel;
>      size_t stream_buf_size;
> @@ -9568,7 +9568,7 @@ static void cursor_channel_release_item(RedChannel *channel, void *item)
>      red_release_cursor(channel->worker, item);
>  }
>  
> -static void red_connect_cursor(RedWorker *worker, RedsStreamContext *peer, int migrate)
> +static void red_connect_cursor(RedWorker *worker, RedsStream *peer, int migrate)
>  {
>      CursorChannel *channel;
>  
> @@ -10004,11 +10004,11 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
>          handle_dev_destroy_primary_surface(worker);
>          break;
>      case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
> -        RedsStreamContext *peer;
> +        RedsStream *peer;
>          int migrate;
>          red_printf("connect");
>  
> -        receive_data(worker->channel, &peer, sizeof(RedsStreamContext *));
> +        receive_data(worker->channel, &peer, sizeof(RedsStream *));
>          receive_data(worker->channel, &migrate, sizeof(int));
>          handle_new_display_channel(worker, peer, migrate);
>          break;
> @@ -10052,11 +10052,11 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
>          red_migrate_display(worker);
>          break;
>      case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
> -        RedsStreamContext *peer;
> +        RedsStream *peer;
>          int migrate;
>  
>          red_printf("cursor connect");
> -        receive_data(worker->channel, &peer, sizeof(RedsStreamContext *));
> +        receive_data(worker->channel, &peer, sizeof(RedsStream *));
>          receive_data(worker->channel, &migrate, sizeof(int));
>          red_connect_cursor(worker, peer, migrate);
>          break;
> diff --git a/server/reds.c b/server/reds.c
> index d92f701..d597e93 100644
> --- a/server/reds.c
> +++ b/server/reds.c
> @@ -224,7 +224,7 @@ typedef struct RedsState {
>  static RedsState *reds = NULL;
>  
>  typedef struct AsyncRead {
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      void *opaque;
>      uint8_t *now;
>      uint8_t *end;
> @@ -233,7 +233,7 @@ typedef struct AsyncRead {
>  } AsyncRead;
>  
>  typedef struct RedLinkInfo {
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      AsyncRead asyc_read;
>      SpiceLinkHeader link_header;
>      SpiceLinkMess *link_mess;
> @@ -297,85 +297,65 @@ static ChannelSecurityOptions *find_channel_security(int id)
>      return now;
>  }
>  
> -static void reds_channel_event(RedsStreamContext *peer, int event)
> +static void reds_channel_event(RedsStream *peer, int event)
>  {
>      if (core->base.minor_version < 3 || core->channel_event == NULL)
>          return;
>      core->channel_event(event, &peer->info);
>  }
>  
> -static int reds_write(void *ctx, void *buf, size_t size)
> +static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
>  {
> -    int return_code;
> -    int sock = (long)ctx;
> -    size_t count = size;
> -
> -    return_code = write(sock, buf, count);
> -
> -    return (return_code);
> +    return write(s->socket, buf, size);
>  }
>  
> -static int reds_read(void *ctx, void *buf, size_t size)
> +static ssize_t stream_writev_cb(RedsStream *s, const struct iovec *iov, int iovcnt)
>  {
> -    int return_code;
> -    int sock = (long)ctx;
> -    size_t count = size;
> -
> -    return_code = read(sock, buf, count);
> -
> -    return (return_code);
> +    return writev(s->socket, iov, iovcnt);
>  }
>  
> -static int reds_free(RedsStreamContext *peer)
> +static ssize_t stream_read_cb(RedsStream *s, void *buf, size_t size)
>  {
> -    reds_channel_event(peer, SPICE_CHANNEL_EVENT_DISCONNECTED);
> -    close(peer->socket);
> -    free(peer);
> -    return 0;
> +    return read(s->socket, buf, size);
>  }
>  
> -static int reds_ssl_write(void *ctx, void *buf, size_t size)
> +static ssize_t stream_ssl_write_cb(RedsStream *s, const void *buf, size_t size)
>  {
>      int return_code;
>      int ssl_error;
> -    SSL *ssl = ctx;
>  
> -    return_code = SSL_write(ssl, buf, size);
> +    return_code = SSL_write(s->ssl, buf, size);
>  
> -    if (return_code < 0) {
> -        ssl_error = SSL_get_error(ssl, return_code);
> -    }
> +    if (return_code < 0)
> +        ssl_error = SSL_get_error(s->ssl, return_code);
>  
> -    return (return_code);
> +    return return_code;
>  }
>  
> -static int reds_ssl_read(void *ctx, void *buf, size_t size)
> +static ssize_t stream_ssl_read_cb(RedsStream *s, void *buf, size_t size)
>  {
>      int return_code;
>      int ssl_error;
> -    SSL *ssl = ctx;
>  
> -    return_code = SSL_read(ssl, buf, size);
> +    return_code = SSL_read(s->ssl, buf, size);
>  
> -    if (return_code < 0) {
> -        ssl_error = SSL_get_error(ssl, return_code);
> -    }
> +    if (return_code < 0)
> +        ssl_error = SSL_get_error(s->ssl, return_code);
>  
> -    return (return_code);
> +    return return_code;
>  }
>  
> -static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
> +static ssize_t stream_ssl_writev_cb(RedsStream *s, const struct iovec *vector, int count)
>  {
>      int i;
>      int n;
> -    int return_code = 0;
> +    ssize_t return_code = 0;
>      int ssl_error;
> -    SSL *ssl = ctx;
>  
>      for (i = 0; i < count; ++i) {
> -        n = SSL_write(ssl, vector[i].iov_base, vector[i].iov_len);
> +        n = SSL_write(s->ssl, vector[i].iov_base, vector[i].iov_len);
>          if (n <= 0) {
> -            ssl_error = SSL_get_error(ssl, n);
> +            ssl_error = SSL_get_error(s->ssl, n);
>              if (return_code <= 0) {
>                  return n;
>              } else {
> @@ -389,35 +369,31 @@ static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
>      return return_code;
>  }
>  
> -static int reds_ssl_free(RedsStreamContext *peer)
> +static void reds_stream_remove_watch(RedsStream* s)
>  {
> -    reds_channel_event(peer, SPICE_CHANNEL_EVENT_DISCONNECTED);
> -    SSL_free(peer->ssl);
> -    close(peer->socket);
> -    free(peer);
> -    return 0;
> +    if (s->watch) {
> +        core->watch_remove(s->watch);
> +        s->watch = NULL;
> +    }
>  }
>  
> -static void __reds_release_link(RedLinkInfo *link)
> +static void reds_link_free(RedLinkInfo *link)
>  {
> -    ASSERT(link->peer);
> -    if (link->peer->watch) {
> -        core->watch_remove(link->peer->watch);
> -        link->peer->watch = NULL;
> -    }
> +    reds_stream_free(link->peer);
> +    link->peer = NULL;
> +
>      free(link->link_mess);
> +    link->link_mess = NULL;
> +
>      BN_free(link->tiTicketing.bn);
> +    link->tiTicketing.bn = NULL;
> +
>      if (link->tiTicketing.rsa) {
>          RSA_free(link->tiTicketing.rsa);
> +        link->tiTicketing.rsa = NULL;
>      }
> -    free(link);
> -}
>  
> -static inline void reds_release_link(RedLinkInfo *link)
> -{
> -    RedsStreamContext *peer = link->peer;
> -    __reds_release_link(link);
> -    peer->cb_free(peer);
> +    free(link);
>  }
>  
>  #ifdef RED_STATISTICS
> @@ -1367,11 +1343,11 @@ void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
>      while (write_to_vdi_port() || read_from_vdi_port());
>  }
>  
> -static int sync_write(RedsStreamContext *peer, void *in_buf, size_t n)
> +static int sync_write(RedsStream *peer, void *in_buf, size_t n)
>  {
>      uint8_t *buf = (uint8_t *)in_buf;
>      while (n) {
> -        int now = peer->cb_write(peer->ctx, buf, n);
> +        int now = reds_stream_write(peer, buf, n);
>          if (now <= 0) {
>              if (now == -1 && (errno == EINTR || errno == EAGAIN)) {
>                  continue;
> @@ -1479,7 +1455,7 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
>  // actually be joined with reds_handle_other_links, ebcome reds_handle_link
>  static void reds_handle_main_link(RedLinkInfo *link)
>  {
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      SpiceLinkMess *link_mess;
>      uint32_t *caps;
>      uint32_t connection_id;
> @@ -1497,7 +1473,7 @@ static void reds_handle_main_link(RedLinkInfo *link)
>      } else {
>          if (link_mess->connection_id != reds->link_id) {
>              reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
> -            reds_release_link(link);
> +            reds_link_free(link);
>              return;
>          }
>          reds_send_link_result(link, SPICE_LINK_ERR_OK);
> @@ -1512,8 +1488,10 @@ static void reds_handle_main_link(RedLinkInfo *link)
>  
>      reds_show_new_channel(link, connection_id);
>      peer = link->peer;
> +    reds_stream_remove_watch(peer);
> +    link->peer = NULL;
>      link->link_mess = NULL;
> -    __reds_release_link(link);
> +    reds_link_free(link);
>      caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
>      reds->main_channel = main_channel_init();
>      reds->main_channel->link(reds->main_channel, peer, reds->mig_target, link_mess->num_common_caps,
> @@ -1580,7 +1558,7 @@ static void openssl_init(RedLinkInfo *link)
>  static void reds_handle_other_links(RedLinkInfo *link)
>  {
>      Channel *channel;
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      SpiceLinkMess *link_mess;
>      uint32_t *caps;
>  
> @@ -1588,14 +1566,14 @@ static void reds_handle_other_links(RedLinkInfo *link)
>  
>      if (!reds->link_id || reds->link_id != link_mess->connection_id) {
>          reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
>      if (!(channel = reds_find_channel(link_mess->channel_type,
>                                        link_mess->channel_id))) {
>          reds_send_link_result(link, SPICE_LINK_ERR_CHANNEL_NOT_AVAILABLE);
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
> @@ -1607,8 +1585,10 @@ static void reds_handle_other_links(RedLinkInfo *link)
>          main_channel_push_notify(reds->main_channel, (uint8_t*)mess, mess_len);
>      }
>      peer = link->peer;
> +    reds_stream_remove_watch(peer);
> +    link->peer = NULL;
>      link->link_mess = NULL;
> -    __reds_release_link(link);
> +    reds_link_free(link);
>      caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
>      channel->link(channel, peer, reds->mig_target, link_mess->num_common_caps,
>                    link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
> @@ -1636,13 +1616,13 @@ static void reds_handle_ticket(void *opaque)
>              reds_send_link_result(link, SPICE_LINK_ERR_PERMISSION_DENIED);
>              red_printf("Ticketing is enabled, but no password is set. "
>                         "please set a ticket first");
> -            reds_release_link(link);
> +            reds_link_free(link);
>              return;
>          }
>  
>          if (expired || strncmp(password, actual_sever_pass, SPICE_MAX_PASSWORD_LENGTH) != 0) {
>              reds_send_link_result(link, SPICE_LINK_ERR_PERMISSION_DENIED);
> -            reds_release_link(link);
> +            reds_link_free(link);
>              return;
>          }
>      }
> @@ -1670,7 +1650,8 @@ static void async_read_handler(int fd, int event, void *data)
>          int n = obj->end - obj->now;
>  
>          ASSERT(n > 0);
> -        if ((n = obj->peer->cb_read(obj->peer->ctx, obj->now, n)) <= 0) {
> +        n = reds_stream_read(obj->peer, obj->now, n);
> +        if (n <= 0) {
>              if (n < 0) {
>                  switch (errno) {
>                  case EAGAIN:
> @@ -1722,7 +1703,7 @@ static void reds_handle_read_link_done(void *opaque)
>                                                     link->link_header.size ||
>                                                       link_mess->caps_offset < sizeof(*link_mess))) {
>          reds_send_link_error(link, SPICE_LINK_ERR_INVALID_DATA);
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
> @@ -1734,12 +1715,12 @@ static void reds_handle_read_link_done(void *opaque)
>              red_printf("spice channels %d should be encrypted", link_mess->channel_type);
>              reds_send_link_error(link, SPICE_LINK_ERR_NEED_SECURED);
>          }
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
>      if (!reds_send_link_ack(link)) {
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
> @@ -1760,7 +1741,7 @@ static void reds_handle_link_error(void *opaque, int err)
>          red_printf("%s", strerror(errno));
>          break;
>      }
> -    reds_release_link(link);
> +    reds_link_free(link);
>  }
>  
>  static void reds_handle_read_header_done(void *opaque)
> @@ -1771,7 +1752,7 @@ static void reds_handle_read_header_done(void *opaque)
>  
>      if (header->magic != SPICE_MAGIC) {
>          reds_send_link_error(link, SPICE_LINK_ERR_INVALID_MAGIC);
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
> @@ -1781,7 +1762,7 @@ static void reds_handle_read_header_done(void *opaque)
>          }
>  
>          red_printf("version mismatch");
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
> @@ -1790,7 +1771,7 @@ static void reds_handle_read_header_done(void *opaque)
>      if (header->size < sizeof(SpiceLinkMess)) {
>          reds_send_link_error(link, SPICE_LINK_ERR_INVALID_DATA);
>          red_printf("bad size %u", header->size);
> -        reds_release_link(link);
> +        reds_link_free(link);
>          return;
>      }
>  
> @@ -1823,7 +1804,7 @@ static void reds_handle_ssl_accept(int fd, int event, void *data)
>          int ssl_error = SSL_get_error(link->peer->ssl, return_code);
>          if (ssl_error != SSL_ERROR_WANT_READ && ssl_error != SSL_ERROR_WANT_WRITE) {
>              red_printf("SSL_accept failed, error=%d", ssl_error);
> -            reds_release_link(link);
> +            reds_link_free(link);
>          } else {
>              if (ssl_error == SSL_ERROR_WANT_READ) {
>                  core->watch_update_mask(link->peer->watch, SPICE_WATCH_EVENT_READ);
> @@ -1841,7 +1822,7 @@ static void reds_handle_ssl_accept(int fd, int event, void *data)
>  static RedLinkInfo *__reds_accept_connection(int listen_socket)
>  {
>      RedLinkInfo *link;
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      int delay_val = 1;
>      int flags;
>      int socket;
> @@ -1866,7 +1847,7 @@ static RedLinkInfo *__reds_accept_connection(int listen_socket)
>      }
>  
>      link = spice_new0(RedLinkInfo, 1);
> -    peer = spice_new0(RedsStreamContext, 1);
> +    peer = spice_new0(RedsStream, 1);
>      link->peer = peer;
>      peer->socket = socket;
>  
> @@ -1890,17 +1871,16 @@ error:
>  static RedLinkInfo *reds_accept_connection(int listen_socket)
>  {
>      RedLinkInfo *link;
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>  
>      if (!(link = __reds_accept_connection(listen_socket))) {
>          return NULL;
>      }
>      peer = link->peer;
>      peer->ctx = (void *)((unsigned long)link->peer->socket);
> -    peer->cb_read = (int (*)(void *, void *, int))reds_read;
> -    peer->cb_write = (int (*)(void *, void *, int))reds_write;
> -    peer->cb_writev = (int (*)(void *, const struct iovec *vector, int count))writev;
> -    peer->cb_free = (int (*)(RedsStreamContext *))reds_free;
> +    peer->read = stream_read_cb;
> +    peer->write = stream_write_cb;
> +    peer->writev = stream_writev_cb;
>  
>      return link;
>  }
> @@ -1933,10 +1913,9 @@ static void reds_accept_ssl_connection(int fd, int event, void *data)
>      SSL_set_bio(link->peer->ssl, sbio, sbio);
>  
>      link->peer->ctx = (void *)(link->peer->ssl);
> -    link->peer->cb_write = (int (*)(void *, void *, int))reds_ssl_write;
> -    link->peer->cb_read = (int (*)(void *, void *, int))reds_ssl_read;
> -    link->peer->cb_writev = reds_ssl_writev;
> -    link->peer->cb_free = (int (*)(RedsStreamContext *))reds_ssl_free;
> +    link->peer->write = stream_ssl_write_cb;
> +    link->peer->read = stream_ssl_read_cb;
> +    link->peer->writev = stream_ssl_writev_cb;
>  
>      return_code = SSL_accept(link->peer->ssl);
>      if (return_code == 1) {
> @@ -3169,3 +3148,34 @@ __visible__ int spice_server_migrate_switch(SpiceServer *s)
>      reds_mig_switch();
>      return 0;
>  }
> +
> +ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte)
> +{
> +    return s->read(s, buf, nbyte);
> +}
> +
> +ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte)
> +{
> +    return s->write(s, buf, nbyte);
> +}
> +
> +ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt)
> +{
> +    return s->writev(s, iov, iovcnt);
> +}
> +
> +void reds_stream_free(RedsStream *s)
> +{
> +    if (!s)
> +        return;
> +
> +    reds_channel_event(s, SPICE_CHANNEL_EVENT_DISCONNECTED);
> +
> +    if (s->ssl)
> +        SSL_free(s->ssl);
> +
> +    reds_stream_remove_watch(s);
> +    close(s->socket);
> +
> +    free(s);
> +}
> diff --git a/server/reds.h b/server/reds.h
> index 547c33c..63b73c4 100644
> --- a/server/reds.h
> +++ b/server/reds.h
> @@ -28,7 +28,9 @@
>  
>  #define __visible__ __attribute__ ((visibility ("default")))
>  
> -typedef struct RedsStreamContext {
> +typedef struct RedsStream RedsStream;
> +
> +struct RedsStream {
>      void *ctx;
>  
>      int socket;
> @@ -41,12 +43,11 @@ typedef struct RedsStreamContext {
>  
>      SpiceChannelEventInfo info;
>  
> -    int (*cb_write)(void *, void *, int);
> -    int (*cb_read)(void *, void *, int);
> -
> -    int (*cb_writev)(void *, const struct iovec *vector, int count);
> -    int (*cb_free)(struct RedsStreamContext *);
> -} RedsStreamContext;
> +    /* private */
> +    ssize_t (*read)(RedsStream *s, void *buf, size_t nbyte);
> +    ssize_t (*write)(RedsStream *s, const void *buf, size_t nbyte);
> +    ssize_t (*writev)(RedsStream *s, const struct iovec *iov, int iovcnt);
> +};
>  
>  typedef struct Channel {
>      struct Channel *next;
> @@ -56,7 +57,7 @@ typedef struct Channel {
>      uint32_t *common_caps;
>      int num_caps;
>      uint32_t *caps;
> -    void (*link)(struct Channel *, RedsStreamContext *peer, int migration, int num_common_caps,
> +    void (*link)(struct Channel *, RedsStream *peer, int migration, int num_common_caps,
>                   uint32_t *common_caps, int num_caps, uint32_t *caps);
>      void (*shutdown)(struct Channel *);
>      void (*migrate)(struct Channel *);
> @@ -73,6 +74,11 @@ struct SpiceNetWireState {
>      struct TunnelWorker *worker;
>  };
>  
> +ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte);
> +ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte);
> +ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt);
> +void reds_stream_free(RedsStream *s);
> +
>  void reds_desable_mm_timer();
>  void reds_enable_mm_timer();
>  void reds_update_mm_timer(uint32_t mm_time);
> diff --git a/server/smartcard.c b/server/smartcard.c
> index 7c0a5aa..a7d26b6 100644
> --- a/server/smartcard.c
> +++ b/server/smartcard.c
> @@ -485,7 +485,7 @@ static int smartcard_channel_handle_message(RedChannel *channel, SpiceDataHeader
>      return TRUE;
>  }
>  
> -static void smartcard_link(Channel *channel, RedsStreamContext *peer,
> +static void smartcard_link(Channel *channel, RedsStream *peer,
>                          int migration, int num_common_caps,
>                          uint32_t *common_caps, int num_caps,
>                          uint32_t *caps)
> diff --git a/server/snd_worker.c b/server/snd_worker.c
> index 6c0f9d6..2382a29 100644
> --- a/server/snd_worker.c
> +++ b/server/snd_worker.c
> @@ -73,7 +73,7 @@ typedef void (*cleanup_channel_proc)(SndChannel *channel);
>  typedef struct SndWorker SndWorker;
>  
>  struct SndChannel {
> -    RedsStreamContext *peer;
> +    RedsStream *peer;
>      SndWorker *worker;
>      spice_parse_channel_func_t parser;
>  
> @@ -180,15 +180,15 @@ static void snd_disconnect_channel(SndChannel *channel)
>  {
>      SndWorker *worker;
>  
> -    if (!channel) {
> +    if (!channel)
>          return;
> -    }
> +
>      channel->cleanup(channel);
>      worker = channel->worker;
>      worker->connection = NULL;
>      core->watch_remove(channel->peer->watch);
>      channel->peer->watch = NULL;
> -    channel->peer->cb_free(channel->peer);
> +    reds_stream_free(channel->peer);
>      spice_marshaller_destroy(channel->send_data.marshaller);
>      free(channel);
>  }
> @@ -243,7 +243,8 @@ static int snd_send_data(SndChannel *channel)
>  
>          vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
>                                                 vec, MAX_SEND_VEC, channel->send_data.pos);
> -        if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
> +        n = reds_stream_writev(channel->peer, vec, vec_size);
> +        if (n == -1) {
>              switch (errno) {
>              case EAGAIN:
>                  channel->blocked = TRUE;
> @@ -389,7 +390,8 @@ static void snd_receive(void* data)
>          ssize_t n;
>          n = channel->recive_data.end - channel->recive_data.now;
>          ASSERT(n);
> -        if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
> +        n = reds_stream_read(channel->peer, channel->recive_data.now, n);
> +        if (n <= 0) {
>              if (n == 0) {
>                  snd_disconnect_channel(channel);
>                  return;
> @@ -734,7 +736,7 @@ static void snd_record_send(void* data)
>  }
>  
>  static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id,
> -                                 RedsStreamContext *peer,
> +                                 RedsStream *peer,
>                                   int migrate, send_messages_proc send_messages,
>                                   handle_message_proc handle_message,
>                                   on_message_done_proc on_message_done,
> @@ -800,7 +802,7 @@ error2:
>      free(channel);
>  
>  error1:
> -    peer->cb_free(peer);
> +    reds_stream_free(peer);
>      return NULL;
>  }
>  
> @@ -931,7 +933,7 @@ static void snd_playback_cleanup(SndChannel *channel)
>      celt051_mode_destroy(playback_channel->celt_mode);
>  }
>  
> -static void snd_set_playback_peer(Channel *channel, RedsStreamContext *peer, int migration,
> +static void snd_set_playback_peer(Channel *channel, RedsStream *peer, int migration,
>                                    int num_common_caps, uint32_t *common_caps, int num_caps,
>                                    uint32_t *caps)
>  {
> @@ -1097,7 +1099,7 @@ static void snd_record_cleanup(SndChannel *channel)
>      celt051_mode_destroy(record_channel->celt_mode);
>  }
>  
> -static void snd_set_record_peer(Channel *channel, RedsStreamContext *peer, int migration,
> +static void snd_set_record_peer(Channel *channel, RedsStream *peer, int migration,
>                                  int num_common_caps, uint32_t *common_caps, int num_caps,
>                                  uint32_t *caps)
>  {
> -- 
> 1.7.4
> 
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel


More information about the Spice-devel mailing list