[Spice-devel] [PATCH 07/10] server/red_channel: go marshaller for outgoing (copied from red_worker)
Alon Levy
alevy at redhat.com
Wed Jan 12 21:01:40 PST 2011
---
server/red_channel.c | 105 +++++++++++++++++---------------------------------
server/red_channel.h | 11 ++---
2 files changed, 41 insertions(+), 75 deletions(-)
diff --git a/server/red_channel.c b/server/red_channel.c
index 3c1aede..b6c13d1 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -143,21 +143,6 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
}
}
-static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
-{
- struct iovec *now = vec;
-
- while ((skip) && (skip >= now->iov_len)) {
- skip -= now->iov_len;
- --*vec_size;
- now++;
- }
-
- now->iov_base = (uint8_t *)now->iov_base + skip;
- now->iov_len -= skip;
- return now;
-}
-
static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
{
int n;
@@ -167,9 +152,9 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
if (!handler->size) { // nothing to be sent
return;
}
- handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
}
for (;;) {
+ handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
switch (errno) {
case EAGAIN:
@@ -187,27 +172,17 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
}
} else {
handler->pos += n;
- handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size);
- if (!handler->vec_size) {
- if (handler->pos == handler->size) { // finished writing data
- handler->on_msg_done(handler->opaque);
- handler->vec = handler->vec_buf;
- handler->pos = 0;
- handler->size = 0;
- return;
- } else {
- // There wasn't enough place for all the outgoing data in one iovec array.
- // Filling the rest of the data.
- handler->vec = handler->vec_buf;
- handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
- }
+ if (!handler->vec_size && handler->pos == handler->size) { // finished writing data
+ handler->on_msg_done(handler->opaque);
+ handler->vec = handler->vec_buf;
+ handler->pos = 0;
+ handler->size = 0;
+ return;
}
}
}
}
-static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
-
static void red_channel_peer_on_error(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
@@ -232,13 +207,16 @@ static void red_channel_peer_on_outgoing_error(void *opaque)
static int red_channel_peer_get_out_msg_size(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
+
return channel->send_data.size;
}
-static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
+static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
{
RedChannel *channel = (RedChannel *)opaque;
- red_channel_fill_iovec(channel, vec, vec_size);
+
+ *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
+ vec, MAX_SEND_VEC, pos);
}
static void red_channel_peer_on_out_block(void *opaque)
@@ -254,8 +232,6 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
channel->send_data.size = 0;
- channel->send_data.n_bufs = 0;
- channel->send_data.not_sent_buf_head = 0;
if (channel->send_data.item) {
channel->release_item(channel, channel->send_data.item, TRUE);
channel->send_data.item = NULL;
@@ -298,6 +274,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
channel->migrate = migrate;
ring_init(&channel->pipe);
+ channel->send_data.marshaller = spice_marshaller_new();
channel->incoming.opaque = channel;
channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
@@ -328,6 +305,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
return channel;
error:
+ spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
peer->cb_free(peer);
@@ -380,6 +358,7 @@ void red_channel_destroy(RedChannel *channel)
red_channel_pipe_clear(channel);
channel->core->watch_remove(channel->peer->watch);
channel->peer->cb_free(channel->peer);
+ spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
}
@@ -436,50 +415,30 @@ static void red_channel_event(int fd, int event, void *data)
}
}
-static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
-{
- int pos = channel->send_data.n_bufs++;
- ASSERT(pos < MAX_SEND_BUFS);
- channel->send_data.bufs[pos].size = size;
- channel->send_data.bufs[pos].data = data;
-}
-
void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
{
- __red_channel_add_buf(channel, data, size);
- channel->send_data.header.size += size;
+ spice_marshaller_add_ref(channel->send_data.marshaller, data, size);
+ channel->send_data.header->size += size;
}
void red_channel_reset_send_data(RedChannel *channel)
{
- channel->send_data.n_bufs = 0;
- channel->send_data.header.size = 0;
- channel->send_data.header.sub_list = 0;
- ++channel->send_data.header.serial;
- __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(SpiceDataHeader));
+ spice_marshaller_reset(channel->send_data.marshaller);
+ channel->send_data.header = (SpiceDataHeader *)
+ spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
+ spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
+ channel->send_data.header->type = 0;
+ channel->send_data.header->size = 0;
+ channel->send_data.header->sub_list = 0;
+ channel->send_data.header->serial = ++channel->send_data.serial;
}
void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
{
- channel->send_data.header.type = msg_type;
+ channel->send_data.header->type = msg_type;
channel->send_data.item = item;
}
-static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
-{
- BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
- ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs);
- *vec_size = 0;
- do {
- vec[*vec_size].iov_base = buf->data;
- vec[*vec_size].iov_len = buf->size;
- (*vec_size)++;
- buf++;
- channel->send_data.not_sent_buf_head++;
- } while (((*vec_size) < MAX_SEND_VEC) &&
- (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
-}
-
static void red_channel_send(RedChannel *channel)
{
red_peer_handle_outgoing(channel->peer, &channel->outgoing);
@@ -487,8 +446,11 @@ static void red_channel_send(RedChannel *channel)
void red_channel_begin_send_message(RedChannel *channel)
{
- channel->send_data.size = channel->send_data.header.size + sizeof(SpiceDataHeader);
+ spice_marshaller_flush(channel->send_data.marshaller);
+ channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
+ channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader);
channel->ack_data.messages_window++;
+ channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
red_channel_send(channel);
}
@@ -514,7 +476,12 @@ static void red_channel_push(RedChannel *channel)
uint64_t red_channel_get_message_serial(RedChannel *channel)
{
- return channel->send_data.header.serial;
+ return channel->send_data.serial;
+}
+
+void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
+{
+ channel->send_data.serial = serial;
}
void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
diff --git a/server/red_channel.h b/server/red_channel.h
index 30adfc6..893a7f8 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -61,7 +61,7 @@ typedef struct IncomingHandler {
} IncomingHandler;
typedef int (*get_outgoing_msg_size_proc)(void *opaque);
-typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
typedef void (*on_outgoing_error_proc)(void *opaque);
typedef void (*on_outgoing_block_proc)(void *opaque);
typedef void (*on_outgoing_msg_done_proc)(void *opaque);
@@ -125,18 +125,16 @@ struct RedChannel {
uint32_t pipe_size;
struct {
- SpiceDataHeader header;
+ SpiceMarshaller *marshaller;
+ SpiceDataHeader *header;
union {
SpiceMsgSetAck ack;
SpiceMsgMigrate migrate;
} u;
- uint32_t n_bufs;
- BufDescriptor bufs[MAX_SEND_BUFS];
uint32_t size;
- uint32_t not_sent_buf_head;
-
PipeItem *item;
int blocked;
+ uint64_t serial;
} send_data;
OutgoingHandler outgoing;
@@ -200,6 +198,7 @@ void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem
void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
uint64_t red_channel_get_message_serial(RedChannel *channel);
+void red_channel_set_message_serial(RedChannel *channel, uint64_t);
/* when sending a msg. should first call red_channel_begin_send_message */
void red_channel_begin_send_message(RedChannel *channel);
--
1.7.3.4
More information about the Spice-devel mailing list