[Spice-devel] [PATCH 13/23] server/red_worker: introduce red_peer_handle_outgoing and OutgoingHandler

Marc-André Lureau marcandre.lureau at gmail.com
Sat Feb 12 11:56:55 PST 2011


ack

On Fri, Feb 11, 2011 at 6:23 PM, Alon Levy <alevy at redhat.com> wrote:
> From red_channel.
> ---
>  server/red_worker.c |  151 +++++++++++++++++++++++++++++++++++++++------------
>  1 files changed, 115 insertions(+), 36 deletions(-)
>
> diff --git a/server/red_worker.c b/server/red_worker.c
> index 06b53de..cd3d1a8 100644
> --- a/server/red_worker.c
> +++ b/server/red_worker.c
> @@ -354,6 +354,31 @@ typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item)
>  typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, PipeItem *item, int item_pushed);
>  typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type, void *message);
>
> +#define MAX_SEND_VEC 100
> +
> +typedef int (*get_outgoing_msg_size_proc)(void *opaque);
> +typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
> +typedef void (*on_outgoing_error_proc)(void *opaque);
> +typedef void (*on_outgoing_block_proc)(void *opaque);
> +typedef void (*on_outgoing_msg_done_proc)(void *opaque);
> +
> +typedef struct OutgoingHandler {
> +    void *opaque;
> +    struct iovec vec_buf[MAX_SEND_VEC];
> +    int vec_size;
> +    struct iovec *vec;
> +    int pos;
> +    int size;
> +    get_outgoing_msg_size_proc get_msg_size;
> +    prepare_outgoing_proc prepare;
> +    on_outgoing_error_proc on_error;
> +    on_outgoing_block_proc on_block;
> +    on_outgoing_msg_done_proc on_msg_done;
> +#ifdef RED_STATISTICS
> +    uint64_t *out_bytes_counter;
> +#endif
> +} OutgoingHandler;
> +
>  struct RedChannel {
>     spice_parse_channel_func_t parser;
>     RedsStreamContext *peer;
> @@ -386,6 +411,8 @@ struct RedChannel {
>         uint8_t *end;
>     } incoming;
>
> +    OutgoingHandler outgoing;
> +
>     channel_disconnect_proc disconnect;
>     channel_hold_pipe_item_proc hold_item;
>     channel_release_pipe_item_proc release_item;
> @@ -393,12 +420,6 @@ struct RedChannel {
>     channel_send_pipe_item_proc send_item;
>
>     int during_send;
> -
> -#ifdef RED_STATISTICS
> -    struct {
> -        uint64_t *out_bytes_counter;
> -    } outgoing;
> -#endif
>  };
>
>  typedef struct ImageItem {
> @@ -7277,8 +7298,6 @@ static inline void red_send_qxl_drawable(RedWorker *worker, DisplayChannel *disp
>     display_begin_send_message(display_channel);
>  }
>
> -#define MAX_SEND_VEC 100
> -
>  static void inline channel_release_res(RedChannel *channel)
>  {
>     if (!channel->send_data.item) {
> @@ -7288,46 +7307,82 @@ static void inline channel_release_res(RedChannel *channel)
>     channel->send_data.item = NULL;
>  }
>
> -static void red_channel_send(RedChannel *channel)
> +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec,
> +                                             int *vec_size, int pos)
>  {
> -    for (;;) {
> -        uint32_t n = channel->send_data.size - channel->send_data.pos;
> -        struct iovec vec[MAX_SEND_VEC];
> -        int vec_size;
> -
> -        if (!n) {
> -            channel->send_data.blocked = FALSE;
> -            if (channel->send_data.item) {
> -                channel->release_item(channel, channel->send_data.item, FALSE);
> -                channel->send_data.item = NULL;
> -            }
> -            break;
> +    RedChannel *channel = (RedChannel *)opaque;
> +
> +    *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
> +                                            vec, MAX_SEND_VEC, pos);
> +}
> +
> +static void red_channel_peer_on_out_block(void *opaque)
> +{
> +    RedChannel *channel = (RedChannel *)opaque;
> +
> +    channel->send_data.blocked = TRUE;
> +}
> +
> +static void red_channel_peer_on_out_msg_done(void *opaque)
> +{
> +    RedChannel *channel = (RedChannel *)opaque;
> +
> +    channel->send_data.size = 0;
> +    if (channel->send_data.item) {
> +        channel->release_item(channel, channel->send_data.item, TRUE);
> +        channel->send_data.item = NULL;
> +    }
> +    channel->send_data.blocked = FALSE;
> +}
> +
> +static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
> +{
> +    int n;
> +
> +    ASSERT(peer);
> +    if (handler->size == 0) {
> +        handler->vec = handler->vec_buf;
> +        handler->size = handler->get_msg_size(handler->opaque);
> +        if (!handler->size) {  // nothing to be sent
> +            return;
>         }
> -        vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
> -                                               vec, MAX_SEND_VEC, channel->send_data.pos);
> -        ASSERT(channel->peer);
> -        if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
> +    }
> +    for (;;) {
> +        handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
> +        if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
>             switch (errno) {
>             case EAGAIN:
> -                channel->send_data.blocked = TRUE;
> +                handler->on_block(handler->opaque);
>                 return;
>             case EINTR:
>                 break;
>             case EPIPE:
> -                channel->disconnect(channel);
> +                handler->on_error(handler->opaque);
>                 return;
>             default:
>                 red_printf("%s", strerror(errno));
> -                channel->disconnect(channel);
> +                handler->on_error(handler->opaque);
>                 return;
>             }
>         } else {
> -            channel->send_data.pos += n;
> -            stat_inc_counter(channel->outgoing.out_bytes_counter, n);
> +            handler->pos += n;
> +            stat_inc_counter(handler->out_bytes_counter, n);
> +            if (handler->pos == handler->size) { // finished writing data
> +                handler->on_msg_done(handler->opaque);
> +                handler->vec = handler->vec_buf;
> +                handler->pos = 0;
> +                handler->size = 0;
> +                return;
> +            }
>         }
>     }
>  }
>
> +static void red_channel_send(RedChannel *channel)
> +{
> +    red_peer_handle_outgoing(channel->peer, &channel->outgoing);
> +}
> +
>  static void display_channel_push_release(DisplayChannel *channel, uint8_t type, uint64_t id,
>                                          uint64_t* sync_data)
>  {
> @@ -8243,16 +8298,17 @@ static void red_send_surface_destroy(DisplayChannel *display, uint32_t surface_i
>     red_channel_begin_send_message(channel);
>  }
>
> +static inline int red_channel_waiting_for_ack(RedChannel *channel)
> +{
> +    return (channel->ack_data.messages_window > channel->ack_data.client_window * 2);
> +}
> +
>  static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
>  {
>     PipeItem *item;
>     if (!channel || channel->send_data.blocked ||
> -                                            !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
> -        return NULL;
> -    }
> -
> -    if (channel->ack_data.messages_window > channel->ack_data.client_window * 2) {
> -        channel->send_data.blocked = TRUE;
> +        red_channel_waiting_for_ack(channel) ||
> +        !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
>         return NULL;
>     }
>
> @@ -9336,6 +9392,18 @@ static void free_common_channel_from_listener(EventListener *ctx)
>     free(common);
>  }
>
> +static void red_channel_default_peer_on_error(RedChannel *channel)
> +{
> +    channel->disconnect(channel);
> +}
> +
> +static int red_channel_peer_get_out_msg_size(void *opaque)
> +{
> +    RedChannel *channel = (RedChannel *)opaque;
> +
> +    return channel->send_data.size;
> +}
> +
>  static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
>                                  RedsStreamContext *peer, int migrate,
>                                  event_listener_action_proc handler,
> @@ -9380,6 +9448,17 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
>     ring_init(&channel->pipe);
>     channel->send_data.marshaller = spice_marshaller_new();
>
> +    channel->outgoing.opaque = channel;
> +    channel->outgoing.pos = 0;
> +    channel->outgoing.size = 0;
> +    channel->outgoing.out_bytes_counter = 0;
> +
> +    channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
> +    channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
> +    channel->outgoing.on_block = red_channel_peer_on_out_block;
> +    channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
> +    channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
> +
>     event.events = EPOLLIN | EPOLLOUT | EPOLLET;
>     event.data.ptr = &common->listener;
>     if (epoll_ctl(worker->epoll, EPOLL_CTL_ADD, peer->socket, &event) == -1) {
> --
> 1.7.4
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
>



-- 
Marc-André Lureau


More information about the Spice-devel mailing list