No subject


Wed Mar 2 00:13:32 PST 2011


---
 server/red_worker.c |  151 +++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 115 insertions(+), 36 deletions(-)

diff --git a/server/red_worker.c b/server/red_worker.c
index 683f8ad..3614eb8 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -354,6 +354,31 @@ typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item)
 typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, PipeItem *item, int item_pushed);
 typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type, void *message);
 
+#define MAX_SEND_VEC 100 // length of OutgoingHandler.vec_buf and the iovec limit given to spice_marshaller_fill_iovec
+
+typedef int (*get_outgoing_msg_size_proc)(void *opaque); // total size of the pending message; 0 means nothing to send
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos); // fill vec, store the entry count in *vec_size
+typedef void (*on_outgoing_error_proc)(void *opaque); // invoked when the write fails with EPIPE or any unhandled errno
+typedef void (*on_outgoing_block_proc)(void *opaque); // invoked when the write fails with EAGAIN
+typedef void (*on_outgoing_msg_done_proc)(void *opaque); // invoked once pos has reached size
+
+typedef struct OutgoingHandler { // per-connection state and callbacks consumed by red_peer_handle_outgoing
+    void *opaque; // handed back unchanged to every callback below
+    struct iovec vec_buf[MAX_SEND_VEC]; // backing storage that vec points at
+    int vec_size; // entry count written by prepare() before each write
+    struct iovec *vec; // set to vec_buf when a message starts and again when it completes
+    int pos; // bytes of the current message accepted by the stream so far
+    int size; // total bytes of the current message; 0 while idle
+    get_outgoing_msg_size_proc get_msg_size;
+    prepare_outgoing_proc prepare;
+    on_outgoing_error_proc on_error;
+    on_outgoing_block_proc on_block;
+    on_outgoing_msg_done_proc on_msg_done;
+#ifdef RED_STATISTICS
+    uint64_t *out_bytes_counter; // passed to stat_inc_counter with each write's byte count
+#endif
+} OutgoingHandler;
+
 struct RedChannel {
     spice_parse_channel_func_t parser;
     RedsStream *stream;
@@ -386,6 +411,8 @@ struct RedChannel {
         uint8_t *end;
     } incoming;
 
+    OutgoingHandler outgoing;
+
     channel_disconnect_proc disconnect;
     channel_hold_pipe_item_proc hold_item;
     channel_release_pipe_item_proc release_item;
@@ -393,12 +420,6 @@ struct RedChannel {
     channel_send_pipe_item_proc send_item;
 
     int during_send;
-
-#ifdef RED_STATISTICS
-    struct {
-        uint64_t *out_bytes_counter;
-    } outgoing;
-#endif
 };
 
 typedef struct ImageItem {
@@ -7277,8 +7298,6 @@ static inline void red_send_qxl_drawable(RedWorker *worker, DisplayChannel *disp
     display_begin_send_message(display_channel);
 }
 
-#define MAX_SEND_VEC 100
-
 static void inline channel_release_res(RedChannel *channel)
 {
     if (!channel->send_data.item) {
@@ -7288,47 +7307,83 @@ static void inline channel_release_res(RedChannel *channel)
     channel->send_data.item = NULL;
 }
 
-static void red_channel_send(RedChannel *channel)
+static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec,
+                                             int *vec_size, int pos) // prepare_outgoing_proc for a RedChannel opaque
 {
-    for (;;) {
-        ssize_t n = channel->send_data.size - channel->send_data.pos;
-        struct iovec vec[MAX_SEND_VEC];
-        size_t vec_size;
-
-        if (!n) {
-            channel->send_data.blocked = FALSE;
-            if (channel->send_data.item) {
-                channel->release_item(channel, channel->send_data.item, FALSE);
-                channel->send_data.item = NULL;
-            }
-            break;
+    RedChannel *channel = (RedChannel *)opaque;
+
+    *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
+                                            vec, MAX_SEND_VEC, pos); // caller's vec must hold MAX_SEND_VEC entries; pos is forwarded as-is
+}
+
+static void red_channel_peer_on_out_block(void *opaque) // on_outgoing_block_proc for a RedChannel opaque
+{
+    RedChannel *channel = (RedChannel *)opaque;
+
+    channel->send_data.blocked = TRUE; // red_channel_pipe_get returns NULL while this flag is set
+}
+
+static void red_channel_peer_on_out_msg_done(void *opaque) // on_outgoing_msg_done_proc for a RedChannel opaque
+{
+    RedChannel *channel = (RedChannel *)opaque;
+
+    channel->send_data.size = 0; // red_channel_peer_get_out_msg_size returns this field, so 0 reports "nothing to send"
+    if (channel->send_data.item) {
+        channel->release_item(channel, channel->send_data.item, TRUE); // third argument is item_pushed
+        channel->send_data.item = NULL;
+    }
+    channel->send_data.blocked = FALSE;
+}
+
+static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) // write handler's pending message to stream until done, blocked or failed
+{
+    int n;
+
+    ASSERT(stream);
+    if (handler->size == 0) { // no message in flight: ask the owner for a new one
+        handler->vec = handler->vec_buf;
+        handler->size = handler->get_msg_size(handler->opaque);
+        if (!handler->size) {  // nothing to be sent
+            return;
         }
-        vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
-                                               vec, MAX_SEND_VEC, channel->send_data.pos);
-        ASSERT(channel->stream);
-        n = reds_stream_writev(channel->stream, vec, vec_size);
+    }
     for (;;) {
+        handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); // rebuild the iovec for the current pos before every write attempt
+        n = reds_stream_writev(stream, handler->vec, handler->vec_size);
         if (n == -1) {
             switch (errno) {
             case EAGAIN:
-                channel->send_data.blocked = TRUE;
+                handler->on_block(handler->opaque); // pos and size are kept, so a later call resumes this message
                 return;
             case EINTR:
                 break;
             case EPIPE:
-                channel->disconnect(channel);
+                handler->on_error(handler->opaque);
                 return;
             default:
                 red_printf("%s", strerror(errno));
-                channel->disconnect(channel);
+                handler->on_error(handler->opaque);
                 return;
             }
         } else {
-            channel->send_data.pos += n;
-            stat_inc_counter(channel->outgoing.out_bytes_counter, n);
+            handler->pos += n; // advance by the bytes this write accepted
+            stat_inc_counter(handler->out_bytes_counter, n);
+            if (handler->pos == handler->size) { // finished writing data
+                handler->on_msg_done(handler->opaque);
+                handler->vec = handler->vec_buf; // reset state so the next call starts a fresh message
+                handler->pos = 0;
+                handler->size = 0;
+                return;
+            }
         }
     }
 }
 
+static void red_channel_send(RedChannel *channel) // keeps the old entry point; the work is done by the generic handler
+{
+    red_peer_handle_outgoing(channel->stream, &channel->outgoing);
+}
+
 static void display_channel_push_release(DisplayChannel *channel, uint8_t type, uint64_t id,
                                          uint64_t* sync_data)
 {
@@ -8244,16 +8299,17 @@ static void red_send_surface_destroy(DisplayChannel *display, uint32_t surface_i
     red_channel_begin_send_message(channel);
 }
 
+static inline int red_channel_waiting_for_ack(RedChannel *channel) // non-zero when messages_window exceeds twice client_window
+{
+    return (channel->ack_data.messages_window > channel->ack_data.client_window * 2);
+}
+
 static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
 {
     PipeItem *item;
     if (!channel || channel->send_data.blocked ||
-                                            !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
-        return NULL;
-    }
-
-    if (channel->ack_data.messages_window > channel->ack_data.client_window * 2) {
-        channel->send_data.blocked = TRUE;
+        red_channel_waiting_for_ack(channel) ||
+        !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
         return NULL;
     }
 
@@ -9336,6 +9392,18 @@ static void free_common_channel_from_listener(EventListener *ctx)
     free(common);
 }
 
+static void red_channel_default_peer_on_error(void *opaque) // exact on_outgoing_error_proc signature; opaque is the RedChannel
+{
+    ((RedChannel *)opaque)->disconnect((RedChannel *)opaque); // default error policy: disconnect the channel
+}
+
+static int red_channel_peer_get_out_msg_size(void *opaque) // get_outgoing_msg_size_proc for a RedChannel opaque
+{
+    RedChannel *channel = (RedChannel *)opaque;
+
+    return channel->send_data.size; // red_channel_peer_on_out_msg_done zeroes this after a message completes
+}
+
 static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
                                  RedsStream *stream, int migrate,
                                  event_listener_action_proc handler,
@@ -9380,6 +9448,17 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
     ring_init(&channel->pipe);
     channel->send_data.marshaller = spice_marshaller_new();
 
+    channel->outgoing.opaque = channel;
+    channel->outgoing.pos = channel->outgoing.size = 0; // no message in flight yet
+#ifdef RED_STATISTICS // OutgoingHandler only declares out_bytes_counter in statistics builds
+    channel->outgoing.out_bytes_counter = NULL;
+#endif
+    channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
+    channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
+    channel->outgoing.on_block = red_channel_peer_on_out_block;
+    channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
+    channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
+
     event.events = EPOLLIN | EPOLLOUT | EPOLLET;
     event.data.ptr = &common->listener;
     if (epoll_ctl(worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) {
-- 
1.7.4.1



More information about the Spice-devel mailing list