[Spice-devel] [PATCH 07/10] server/red_channel: go marshaller for outgoing (copied from red_worker)

Alon Levy alevy at redhat.com
Wed Jan 12 21:01:40 PST 2011


---
 server/red_channel.c |  105 +++++++++++++++++---------------------------------
 server/red_channel.h |   11 ++---
 2 files changed, 41 insertions(+), 75 deletions(-)

diff --git a/server/red_channel.c b/server/red_channel.c
index 3c1aede..b6c13d1 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -143,21 +143,6 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
     }
 }
 
-static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
-{
-    struct iovec *now = vec;
-
-    while ((skip) && (skip >= now->iov_len)) {
-        skip -= now->iov_len;
-        --*vec_size;
-        now++;
-    }
-
-    now->iov_base = (uint8_t *)now->iov_base + skip;
-    now->iov_len -= skip;
-    return now;
-}
-
 static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
 {
     int n;
@@ -167,9 +152,9 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
         if (!handler->size) {  // nothing to be sent
             return;
         }
-        handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
     }
     for (;;) {
+        handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
         if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
             switch (errno) {
             case EAGAIN:
@@ -187,27 +172,17 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
             }
         } else {
             handler->pos += n;
-            handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size);
-            if (!handler->vec_size) {
-                if (handler->pos == handler->size) { // finished writing data
-                    handler->on_msg_done(handler->opaque);
-                    handler->vec = handler->vec_buf;
-                    handler->pos = 0;
-                    handler->size = 0;
-                    return;
-                } else {
-                    // There wasn't enough place for all the outgoing data in one iovec array.
-                    // Filling the rest of the data.
-                    handler->vec = handler->vec_buf;
-                    handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
-                }
+            if (!handler->vec_size && handler->pos == handler->size) { // finished writing data
+                handler->on_msg_done(handler->opaque);
+                handler->vec = handler->vec_buf;
+                handler->pos = 0;
+                handler->size = 0;
+                return;
             }
         }
     }
 }
 
-static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
-
 static void red_channel_peer_on_error(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
@@ -232,13 +207,16 @@ static void red_channel_peer_on_outgoing_error(void *opaque)
 static int red_channel_peer_get_out_msg_size(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
+
     return channel->send_data.size;
 }
 
-static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
+static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
 {
     RedChannel *channel = (RedChannel *)opaque;
-    red_channel_fill_iovec(channel, vec, vec_size);
+
+    *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
+                                            vec, MAX_SEND_VEC, pos);
 }
 
 static void red_channel_peer_on_out_block(void *opaque)
@@ -254,8 +232,6 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
     channel->send_data.size = 0;
-    channel->send_data.n_bufs = 0;
-    channel->send_data.not_sent_buf_head = 0;
     if (channel->send_data.item) {
         channel->release_item(channel, channel->send_data.item, TRUE);
         channel->send_data.item = NULL;
@@ -298,6 +274,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
 
     channel->migrate = migrate;
     ring_init(&channel->pipe);
+    channel->send_data.marshaller = spice_marshaller_new();
 
     channel->incoming.opaque = channel;
     channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
@@ -328,6 +305,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
     return channel;
 
 error:
+    spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
     peer->cb_free(peer);
 
@@ -380,6 +358,7 @@ void red_channel_destroy(RedChannel *channel)
     red_channel_pipe_clear(channel);
     channel->core->watch_remove(channel->peer->watch);
     channel->peer->cb_free(channel->peer);
+    spice_marshaller_destroy(channel->send_data.marshaller);
     free(channel);
 }
 
@@ -436,50 +415,30 @@ static void red_channel_event(int fd, int event, void *data)
     }
 }
 
-static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
-{
-    int pos = channel->send_data.n_bufs++;
-    ASSERT(pos < MAX_SEND_BUFS);
-    channel->send_data.bufs[pos].size = size;
-    channel->send_data.bufs[pos].data = data;
-}
-
 void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
 {
-    __red_channel_add_buf(channel, data, size);
-    channel->send_data.header.size += size;
+    spice_marshaller_add_ref(channel->send_data.marshaller, data, size);
+    channel->send_data.header->size += size;
 }
 
 void red_channel_reset_send_data(RedChannel *channel)
 {
-    channel->send_data.n_bufs = 0;
-    channel->send_data.header.size = 0;
-    channel->send_data.header.sub_list = 0;
-    ++channel->send_data.header.serial;
-    __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(SpiceDataHeader));
+    spice_marshaller_reset(channel->send_data.marshaller);
+    channel->send_data.header = (SpiceDataHeader *)
+        spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
+    spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
+    channel->send_data.header->type = 0;
+    channel->send_data.header->size = 0;
+    channel->send_data.header->sub_list = 0;
+    channel->send_data.header->serial = ++channel->send_data.serial;
 }
 
 void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
 {
-    channel->send_data.header.type = msg_type;
+    channel->send_data.header->type = msg_type;
     channel->send_data.item = item;
 }
 
-static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
-{
-    BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
-    ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs);
-    *vec_size = 0;
-    do {
-        vec[*vec_size].iov_base = buf->data;
-        vec[*vec_size].iov_len = buf->size;
-        (*vec_size)++;
-        buf++;
-        channel->send_data.not_sent_buf_head++;
-    } while (((*vec_size) < MAX_SEND_VEC) &&
-             (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
-}
-
 static void red_channel_send(RedChannel *channel)
 {
     red_peer_handle_outgoing(channel->peer, &channel->outgoing);
@@ -487,8 +446,11 @@ static void red_channel_send(RedChannel *channel)
 
 void red_channel_begin_send_message(RedChannel *channel)
 {
-    channel->send_data.size = channel->send_data.header.size + sizeof(SpiceDataHeader);
+    spice_marshaller_flush(channel->send_data.marshaller);
+    channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
+    channel->send_data.header->size =  channel->send_data.size - sizeof(SpiceDataHeader);
     channel->ack_data.messages_window++;
+    channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
     red_channel_send(channel);
 }
 
@@ -514,7 +476,12 @@ static void red_channel_push(RedChannel *channel)
 
 uint64_t red_channel_get_message_serial(RedChannel *channel)
 {
-    return channel->send_data.header.serial;
+    return channel->send_data.serial;
+}
+
+void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
+{
+    channel->send_data.serial = serial;
 }
 
 void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
diff --git a/server/red_channel.h b/server/red_channel.h
index 30adfc6..893a7f8 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -61,7 +61,7 @@ typedef struct IncomingHandler {
 } IncomingHandler;
 
 typedef int (*get_outgoing_msg_size_proc)(void *opaque);
-typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
 typedef void (*on_outgoing_error_proc)(void *opaque);
 typedef void (*on_outgoing_block_proc)(void *opaque);
 typedef void (*on_outgoing_msg_done_proc)(void *opaque);
@@ -125,18 +125,16 @@ struct RedChannel {
     uint32_t pipe_size;
 
     struct {
-        SpiceDataHeader header;
+        SpiceMarshaller *marshaller;
+        SpiceDataHeader *header;
         union {
             SpiceMsgSetAck ack;
             SpiceMsgMigrate migrate;
         } u;
-        uint32_t n_bufs;
-        BufDescriptor bufs[MAX_SEND_BUFS];
         uint32_t size;
-        uint32_t not_sent_buf_head;
-
         PipeItem *item;
         int blocked;
+        uint64_t serial;
     } send_data;
 
     OutgoingHandler outgoing;
@@ -200,6 +198,7 @@ void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem
 void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
 
 uint64_t red_channel_get_message_serial(RedChannel *channel);
+void red_channel_set_message_serial(RedChannel *channel, uint64_t);
 
 /* when sending a msg. should first call red_channel_begin_send_message */
 void red_channel_begin_send_message(RedChannel *channel);
-- 
1.7.3.4



More information about the Spice-devel mailing list