[Spice-devel] [PATCH 07/10] server/red_channel: go marshaller for outgoing (copied from red_worker)

Hans de Goede hdegoede at redhat.com
Wed Jan 12 23:47:12 PST 2011


Ack.

On 01/13/2011 06:01 AM, Alon Levy wrote:
> ---
>   server/red_channel.c |  105 +++++++++++++++++---------------------------------
>   server/red_channel.h |   11 ++---
>   2 files changed, 41 insertions(+), 75 deletions(-)
>
> diff --git a/server/red_channel.c b/server/red_channel.c
> index 3c1aede..b6c13d1 100644
> --- a/server/red_channel.c
> +++ b/server/red_channel.c
> @@ -143,21 +143,6 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
>       }
>   }
>
> -static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
> -{
> -    struct iovec *now = vec;
> -
> -    while ((skip)&&  (skip>= now->iov_len)) {
> -        skip -= now->iov_len;
> -        --*vec_size;
> -        now++;
> -    }
> -
> -    now->iov_base = (uint8_t *)now->iov_base + skip;
> -    now->iov_len -= skip;
> -    return now;
> -}
> -
>   static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
>   {
>       int n;
> @@ -167,9 +152,9 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
>           if (!handler->size) {  // nothing to be sent
>               return;
>           }
> -        handler->prepare(handler->opaque, handler->vec,&handler->vec_size);
>       }
>       for (;;) {
> +        handler->prepare(handler->opaque, handler->vec,&handler->vec_size, handler->pos);
>           if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
>               switch (errno) {
>               case EAGAIN:
> @@ -187,27 +172,17 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
>               }
>           } else {
>               handler->pos += n;
> -            handler->vec = __iovec_skip(handler->vec, n,&handler->vec_size);
> -            if (!handler->vec_size) {
> -                if (handler->pos == handler->size) { // finished writing data
> -                    handler->on_msg_done(handler->opaque);
> -                    handler->vec = handler->vec_buf;
> -                    handler->pos = 0;
> -                    handler->size = 0;
> -                    return;
> -                } else {
> -                    // There wasn't enough place for all the outgoing data in one iovec array.
> -                    // Filling the rest of the data.
> -                    handler->vec = handler->vec_buf;
> -                    handler->prepare(handler->opaque, handler->vec,&handler->vec_size);
> -                }
> +            if (!handler->vec_size&&  handler->pos == handler->size) { // finished writing data
> +                handler->on_msg_done(handler->opaque);
> +                handler->vec = handler->vec_buf;
> +                handler->pos = 0;
> +                handler->size = 0;
> +                return;
>               }
>           }
>       }
>   }
>
> -static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
> -
>   static void red_channel_peer_on_error(void *opaque)
>   {
>       RedChannel *channel = (RedChannel *)opaque;
> @@ -232,13 +207,16 @@ static void red_channel_peer_on_outgoing_error(void *opaque)
>   static int red_channel_peer_get_out_msg_size(void *opaque)
>   {
>       RedChannel *channel = (RedChannel *)opaque;
> +
>       return channel->send_data.size;
>   }
>
> -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
> +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
>   {
>       RedChannel *channel = (RedChannel *)opaque;
> -    red_channel_fill_iovec(channel, vec, vec_size);
> +
> +    *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
> +                                            vec, MAX_SEND_VEC, pos);
>   }
>
>   static void red_channel_peer_on_out_block(void *opaque)
> @@ -254,8 +232,6 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
>   {
>       RedChannel *channel = (RedChannel *)opaque;
>       channel->send_data.size = 0;
> -    channel->send_data.n_bufs = 0;
> -    channel->send_data.not_sent_buf_head = 0;
>       if (channel->send_data.item) {
>           channel->release_item(channel, channel->send_data.item, TRUE);
>           channel->send_data.item = NULL;
> @@ -298,6 +274,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
>
>       channel->migrate = migrate;
>       ring_init(&channel->pipe);
> +    channel->send_data.marshaller = spice_marshaller_new();
>
>       channel->incoming.opaque = channel;
>       channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
> @@ -328,6 +305,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
>       return channel;
>
>   error:
> +    spice_marshaller_destroy(channel->send_data.marshaller);
>       free(channel);
>       peer->cb_free(peer);
>
> @@ -380,6 +358,7 @@ void red_channel_destroy(RedChannel *channel)
>       red_channel_pipe_clear(channel);
>       channel->core->watch_remove(channel->peer->watch);
>       channel->peer->cb_free(channel->peer);
> +    spice_marshaller_destroy(channel->send_data.marshaller);
>       free(channel);
>   }
>
> @@ -436,50 +415,30 @@ static void red_channel_event(int fd, int event, void *data)
>       }
>   }
>
> -static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
> -{
> -    int pos = channel->send_data.n_bufs++;
> -    ASSERT(pos<  MAX_SEND_BUFS);
> -    channel->send_data.bufs[pos].size = size;
> -    channel->send_data.bufs[pos].data = data;
> -}
> -
>   void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
>   {
> -    __red_channel_add_buf(channel, data, size);
> -    channel->send_data.header.size += size;
> +    spice_marshaller_add_ref(channel->send_data.marshaller, data, size);
> +    channel->send_data.header->size += size;
>   }
>
>   void red_channel_reset_send_data(RedChannel *channel)
>   {
> -    channel->send_data.n_bufs = 0;
> -    channel->send_data.header.size = 0;
> -    channel->send_data.header.sub_list = 0;
> -    ++channel->send_data.header.serial;
> -    __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(SpiceDataHeader));
> +    spice_marshaller_reset(channel->send_data.marshaller);
> +    channel->send_data.header = (SpiceDataHeader *)
> +        spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> +    spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> +    channel->send_data.header->type = 0;
> +    channel->send_data.header->size = 0;
> +    channel->send_data.header->sub_list = 0;
> +    channel->send_data.header->serial = ++channel->send_data.serial;
>   }
>
>   void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
>   {
> -    channel->send_data.header.type = msg_type;
> +    channel->send_data.header->type = msg_type;
>       channel->send_data.item = item;
>   }
>
> -static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
> -{
> -    BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
> -    ASSERT(channel->send_data.not_sent_buf_head<  channel->send_data.n_bufs);
> -    *vec_size = 0;
> -    do {
> -        vec[*vec_size].iov_base = buf->data;
> -        vec[*vec_size].iov_len = buf->size;
> -        (*vec_size)++;
> -        buf++;
> -        channel->send_data.not_sent_buf_head++;
> -    } while (((*vec_size)<  MAX_SEND_VEC)&&
> -             (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
> -}
> -
>   static void red_channel_send(RedChannel *channel)
>   {
>       red_peer_handle_outgoing(channel->peer,&channel->outgoing);
> @@ -487,8 +446,11 @@ static void red_channel_send(RedChannel *channel)
>
>   void red_channel_begin_send_message(RedChannel *channel)
>   {
> -    channel->send_data.size = channel->send_data.header.size + sizeof(SpiceDataHeader);
> +    spice_marshaller_flush(channel->send_data.marshaller);
> +    channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
> +    channel->send_data.header->size =  channel->send_data.size - sizeof(SpiceDataHeader);
>       channel->ack_data.messages_window++;
> +    channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
>       red_channel_send(channel);
>   }
>
> @@ -514,7 +476,12 @@ static void red_channel_push(RedChannel *channel)
>
>   uint64_t red_channel_get_message_serial(RedChannel *channel)
>   {
> -    return channel->send_data.header.serial;
> +    return channel->send_data.serial;
> +}
> +
> +void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
> +{
> +    channel->send_data.serial = serial;
>   }
>
>   void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
> diff --git a/server/red_channel.h b/server/red_channel.h
> index 30adfc6..893a7f8 100644
> --- a/server/red_channel.h
> +++ b/server/red_channel.h
> @@ -61,7 +61,7 @@ typedef struct IncomingHandler {
>   } IncomingHandler;
>
>   typedef int (*get_outgoing_msg_size_proc)(void *opaque);
> -typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
> +typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
>   typedef void (*on_outgoing_error_proc)(void *opaque);
>   typedef void (*on_outgoing_block_proc)(void *opaque);
>   typedef void (*on_outgoing_msg_done_proc)(void *opaque);
> @@ -125,18 +125,16 @@ struct RedChannel {
>       uint32_t pipe_size;
>
>       struct {
> -        SpiceDataHeader header;
> +        SpiceMarshaller *marshaller;
> +        SpiceDataHeader *header;
>           union {
>               SpiceMsgSetAck ack;
>               SpiceMsgMigrate migrate;
>           } u;
> -        uint32_t n_bufs;
> -        BufDescriptor bufs[MAX_SEND_BUFS];
>           uint32_t size;
> -        uint32_t not_sent_buf_head;
> -
>           PipeItem *item;
>           int blocked;
> +        uint64_t serial;
>       } send_data;
>
>       OutgoingHandler outgoing;
> @@ -200,6 +198,7 @@ void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem
>   void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
>
>   uint64_t red_channel_get_message_serial(RedChannel *channel);
> +void red_channel_set_message_serial(RedChannel *channel, uint64_t);
>
>   /* when sending a msg. should first call red_channel_begin_send_message */
>   void red_channel_begin_send_message(RedChannel *channel);


More information about the Spice-devel mailing list