[Spice-devel] [PATCH 07/10] server/red_channel: go marshaller for outgoing (copied from red_worker)
Hans de Goede
hdegoede at redhat.com
Wed Jan 12 23:47:12 PST 2011
Ack.
On 01/13/2011 06:01 AM, Alon Levy wrote:
> ---
> server/red_channel.c | 105 +++++++++++++++++---------------------------------
> server/red_channel.h | 11 ++---
> 2 files changed, 41 insertions(+), 75 deletions(-)
>
> diff --git a/server/red_channel.c b/server/red_channel.c
> index 3c1aede..b6c13d1 100644
> --- a/server/red_channel.c
> +++ b/server/red_channel.c
> @@ -143,21 +143,6 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
> }
> }
>
> -static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
> -{
> - struct iovec *now = vec;
> -
> - while ((skip)&& (skip>= now->iov_len)) {
> - skip -= now->iov_len;
> - --*vec_size;
> - now++;
> - }
> -
> - now->iov_base = (uint8_t *)now->iov_base + skip;
> - now->iov_len -= skip;
> - return now;
> -}
> -
> static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
> {
> int n;
> @@ -167,9 +152,9 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
> if (!handler->size) { // nothing to be sent
> return;
> }
> - handler->prepare(handler->opaque, handler->vec,&handler->vec_size);
> }
> for (;;) {
> + handler->prepare(handler->opaque, handler->vec,&handler->vec_size, handler->pos);
> if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
> switch (errno) {
> case EAGAIN:
> @@ -187,27 +172,17 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
> }
> } else {
> handler->pos += n;
> - handler->vec = __iovec_skip(handler->vec, n,&handler->vec_size);
> - if (!handler->vec_size) {
> - if (handler->pos == handler->size) { // finished writing data
> - handler->on_msg_done(handler->opaque);
> - handler->vec = handler->vec_buf;
> - handler->pos = 0;
> - handler->size = 0;
> - return;
> - } else {
> - // There wasn't enough place for all the outgoing data in one iovec array.
> - // Filling the rest of the data.
> - handler->vec = handler->vec_buf;
> - handler->prepare(handler->opaque, handler->vec,&handler->vec_size);
> - }
> + if (!handler->vec_size&& handler->pos == handler->size) { // finished writing data
> + handler->on_msg_done(handler->opaque);
> + handler->vec = handler->vec_buf;
> + handler->pos = 0;
> + handler->size = 0;
> + return;
> }
> }
> }
> }
>
> -static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
> -
> static void red_channel_peer_on_error(void *opaque)
> {
> RedChannel *channel = (RedChannel *)opaque;
> @@ -232,13 +207,16 @@ static void red_channel_peer_on_outgoing_error(void *opaque)
> static int red_channel_peer_get_out_msg_size(void *opaque)
> {
> RedChannel *channel = (RedChannel *)opaque;
> +
> return channel->send_data.size;
> }
>
> -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
> +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
> {
> RedChannel *channel = (RedChannel *)opaque;
> - red_channel_fill_iovec(channel, vec, vec_size);
> +
> + *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
> + vec, MAX_SEND_VEC, pos);
> }
>
> static void red_channel_peer_on_out_block(void *opaque)
> @@ -254,8 +232,6 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
> {
> RedChannel *channel = (RedChannel *)opaque;
> channel->send_data.size = 0;
> - channel->send_data.n_bufs = 0;
> - channel->send_data.not_sent_buf_head = 0;
> if (channel->send_data.item) {
> channel->release_item(channel, channel->send_data.item, TRUE);
> channel->send_data.item = NULL;
> @@ -298,6 +274,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
>
> channel->migrate = migrate;
> ring_init(&channel->pipe);
> + channel->send_data.marshaller = spice_marshaller_new();
>
> channel->incoming.opaque = channel;
> channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
> @@ -328,6 +305,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
> return channel;
>
> error:
> + spice_marshaller_destroy(channel->send_data.marshaller);
> free(channel);
> peer->cb_free(peer);
>
> @@ -380,6 +358,7 @@ void red_channel_destroy(RedChannel *channel)
> red_channel_pipe_clear(channel);
> channel->core->watch_remove(channel->peer->watch);
> channel->peer->cb_free(channel->peer);
> + spice_marshaller_destroy(channel->send_data.marshaller);
> free(channel);
> }
>
> @@ -436,50 +415,30 @@ static void red_channel_event(int fd, int event, void *data)
> }
> }
>
> -static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
> -{
> - int pos = channel->send_data.n_bufs++;
> - ASSERT(pos< MAX_SEND_BUFS);
> - channel->send_data.bufs[pos].size = size;
> - channel->send_data.bufs[pos].data = data;
> -}
> -
> void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
> {
> - __red_channel_add_buf(channel, data, size);
> - channel->send_data.header.size += size;
> + spice_marshaller_add_ref(channel->send_data.marshaller, data, size);
> + channel->send_data.header->size += size;
> }
>
> void red_channel_reset_send_data(RedChannel *channel)
> {
> - channel->send_data.n_bufs = 0;
> - channel->send_data.header.size = 0;
> - channel->send_data.header.sub_list = 0;
> - ++channel->send_data.header.serial;
> - __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(SpiceDataHeader));
> + spice_marshaller_reset(channel->send_data.marshaller);
> + channel->send_data.header = (SpiceDataHeader *)
> + spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> + spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
> + channel->send_data.header->type = 0;
> + channel->send_data.header->size = 0;
> + channel->send_data.header->sub_list = 0;
> + channel->send_data.header->serial = ++channel->send_data.serial;
> }
>
> void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
> {
> - channel->send_data.header.type = msg_type;
> + channel->send_data.header->type = msg_type;
> channel->send_data.item = item;
> }
>
> -static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
> -{
> - BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
> - ASSERT(channel->send_data.not_sent_buf_head< channel->send_data.n_bufs);
> - *vec_size = 0;
> - do {
> - vec[*vec_size].iov_base = buf->data;
> - vec[*vec_size].iov_len = buf->size;
> - (*vec_size)++;
> - buf++;
> - channel->send_data.not_sent_buf_head++;
> - } while (((*vec_size)< MAX_SEND_VEC)&&
> - (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
> -}
> -
> static void red_channel_send(RedChannel *channel)
> {
> red_peer_handle_outgoing(channel->peer,&channel->outgoing);
> @@ -487,8 +446,11 @@ static void red_channel_send(RedChannel *channel)
>
> void red_channel_begin_send_message(RedChannel *channel)
> {
> - channel->send_data.size = channel->send_data.header.size + sizeof(SpiceDataHeader);
> + spice_marshaller_flush(channel->send_data.marshaller);
> + channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
> + channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader);
> channel->ack_data.messages_window++;
> + channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
> red_channel_send(channel);
> }
>
> @@ -514,7 +476,12 @@ static void red_channel_push(RedChannel *channel)
>
> uint64_t red_channel_get_message_serial(RedChannel *channel)
> {
> - return channel->send_data.header.serial;
> + return channel->send_data.serial;
> +}
> +
> +void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
> +{
> + channel->send_data.serial = serial;
> }
>
> void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
> diff --git a/server/red_channel.h b/server/red_channel.h
> index 30adfc6..893a7f8 100644
> --- a/server/red_channel.h
> +++ b/server/red_channel.h
> @@ -61,7 +61,7 @@ typedef struct IncomingHandler {
> } IncomingHandler;
>
> typedef int (*get_outgoing_msg_size_proc)(void *opaque);
> -typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
> +typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
> typedef void (*on_outgoing_error_proc)(void *opaque);
> typedef void (*on_outgoing_block_proc)(void *opaque);
> typedef void (*on_outgoing_msg_done_proc)(void *opaque);
> @@ -125,18 +125,16 @@ struct RedChannel {
> uint32_t pipe_size;
>
> struct {
> - SpiceDataHeader header;
> + SpiceMarshaller *marshaller;
> + SpiceDataHeader *header;
> union {
> SpiceMsgSetAck ack;
> SpiceMsgMigrate migrate;
> } u;
> - uint32_t n_bufs;
> - BufDescriptor bufs[MAX_SEND_BUFS];
> uint32_t size;
> - uint32_t not_sent_buf_head;
> -
> PipeItem *item;
> int blocked;
> + uint64_t serial;
> } send_data;
>
> OutgoingHandler outgoing;
> @@ -200,6 +198,7 @@ void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem
> void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
>
> uint64_t red_channel_get_message_serial(RedChannel *channel);
> +void red_channel_set_message_serial(RedChannel *channel, uint64_t);
>
> /* when sending a msg. should first call red_channel_begin_send_message */
> void red_channel_begin_send_message(RedChannel *channel);
More information about the Spice-devel mailing list