[Spice-devel] [PATCH 05/13] server: inputs_channel: use red_channel

Alon Levy alevy at redhat.com
Tue Dec 7 04:44:27 PST 2010


---
 server/inputs_channel.c |  407 +++++++++++++++--------------------------------
 1 files changed, 125 insertions(+), 282 deletions(-)

diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index f3e71f3..bcd1a86 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -28,9 +28,10 @@
 #include "server/demarshallers.h"
 #include "server/generated_marshallers.h"
 #include "spice.h"
-#include "inputs_channel.h"
 #include "red_common.h"
 #include "reds.h"
+#include "red_channel.h"
+#include "inputs_channel.h"
 
 // TODO: RECEIVE_BUF_SIZE used to be the same for inputs_channel and main_channel
 // since it was defined once in reds.c which contained both.
@@ -43,39 +44,26 @@
 #define RECEIVE_BUF_SIZE \
     (4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * SPICE_AGENT_MAX_DATA_SIZE)
 
-#define SEND_BUF_SIZE 4096
-
-typedef struct IncomingHandler {
-    spice_parse_channel_func_t parser;
-    void *opaque;
-    int shut;
-    uint8_t buf[RECEIVE_BUF_SIZE];
-    uint32_t end_pos;
-    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
-} IncomingHandler;
-
-typedef struct OutgoingHandler {
-    void *opaque;
-    uint8_t buf[SEND_BUF_SIZE];
-    uint8_t *now;
-    uint32_t length;
-    void (*select)(void *opaque, int select);
-    void (*may_write)(void *opaque);
-} OutgoingHandler;
-
-
-// Temporarily here to make splitting reds.c to inputs_channel.c easier,
-// TODO - remove from here, leave private to inputs_channel.c
 typedef struct InputsChannel {
-    Channel *channel;
-    RedsStreamContext *peer;
-    IncomingHandler in_handler;
-    OutgoingHandler out_handler;
+    RedChannel base;
+    uint8_t recv_buf[RECEIVE_BUF_SIZE];
     VDAgentMouseState mouse_state;
     uint32_t motion_count;
-    uint64_t serial; //migrate me
 } InputsChannel;
 
+enum {
+    PIPE_ITEM_INIT = SPICE_MSG_INPUTS_INIT,
+    PIPE_ITEM_MOUSE_MOTION_ACK = SPICE_MSG_INPUTS_MOUSE_MOTION_ACK,
+    PIPE_ITEM_KEY_MODIFIERS = SPICE_MSG_INPUTS_KEY_MODIFIERS,
+    PIPE_ITEM_MIGRATE = SPICE_MSG_MIGRATE,
+};
+
+typedef struct InputsPipeItem {
+    PipeItem base;
+    SpiceMarshaller *m;
+    uint8_t *data;      /* If the marshaller malloced, pointer is here */
+} InputsPipeItem;
+
 static SpiceKbdInstance *keyboard = NULL;
 static SpiceMouseInstance *mouse = NULL;
 static SpiceTabletInstance *tablet = NULL;
@@ -153,132 +141,22 @@ const VDAgentMouseState *inputs_get_mouse_state(void)
     return &inputs_channel->mouse_state;
 }
 
-static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
 {
-    for (;;) {
-        uint8_t *buf = handler->buf;
-        uint32_t pos = handler->end_pos;
-        uint8_t *end = buf + pos;
-        SpiceDataHeader *header;
-        int n;
-        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
-        if (n <= 0) {
-            if (n == 0) {
-                return -1;
-            }
-            switch (errno) {
-            case EAGAIN:
-                return 0;
-            case EINTR:
-                break;
-            case EPIPE:
-                return -1;
-            default:
-                red_printf("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            pos += n;
-            end = buf + pos;
-            while (buf + sizeof(SpiceDataHeader) <= end &&
-                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
-                uint8_t *data = (uint8_t *)(header+1);
-                size_t parsed_size;
-                uint8_t *parsed;
-                message_destructor_t parsed_free;
-
-
-                buf += sizeof(SpiceDataHeader) + header->size;
-                parsed = handler->parser(data, data + header->size, header->type,
-                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-                if (parsed == NULL) {
-                    red_printf("failed to parse message type %d", header->type);
-                    return -1;
-                }
-                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
-                parsed_free(parsed);
-                if (handler->shut) {
-                    return -1;
-                }
-            }
-            memmove(handler->buf, buf, (handler->end_pos = end - buf));
-        }
-    }
+    InputsChannel *inputs_channel = SPICE_CONTAINEROF(channel, InputsChannel, base);
+
+    return inputs_channel->recv_buf;
 }
 
-static int handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
+static void inputs_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
+                                               uint8_t *msg)
 {
-    if (!handler->length) {
-        return 0;
-    }
-
-    while (handler->length) {
-        int n;
-
-        n = peer->cb_write(peer->ctx, handler->now, handler->length);
-        if (n <= 0) {
-            if (n == 0) {
-                return -1;
-            }
-            switch (errno) {
-            case EAGAIN:
-                return 0;
-            case EINTR:
-                break;
-            case EPIPE:
-                return -1;
-            default:
-                red_printf("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            handler->now += n;
-            handler->length -= n;
-        }
-    }
-    handler->select(handler->opaque, FALSE);
-    handler->may_write(handler->opaque);
-    return 0;
 }
 
 #define OUTGOING_OK 0
 #define OUTGOING_FAILED -1
 #define OUTGOING_BLOCKED 1
 
-static int outgoing_write(RedsStreamContext *peer, OutgoingHandler *handler, void *in_data,
-                          int length)
-{
-    uint8_t *data = in_data;
-    ASSERT(length <= SEND_BUF_SIZE);
-    if (handler->length) {
-        return OUTGOING_BLOCKED;
-    }
-
-    while (length) {
-        int n = peer->cb_write(peer->ctx, data, length);
-        if (n < 0) {
-            switch (errno) {
-            case EAGAIN:
-                handler->length = length;
-                memcpy(handler->buf, data, length);
-                handler->select(handler->opaque, TRUE);
-                return OUTGOING_OK;
-            case EINTR:
-                break;
-            case EPIPE:
-                return OUTGOING_FAILED;
-            default:
-                red_printf("%s", strerror(errno));
-                return OUTGOING_FAILED;
-            }
-        } else {
-            data += n;
-            length -= n;
-        }
-    }
-    return OUTGOING_OK;
-}
-
 #define RED_MOUSE_STATE_TO_LOCAL(state)     \
     ((state & SPICE_MOUSE_BUTTON_MASK_LEFT) |          \
      ((state & SPICE_MOUSE_BUTTON_MASK_MIDDLE) << 1) |   \
@@ -316,52 +194,64 @@ static uint8_t kbd_get_leds(SpiceKbdInstance *sin)
     return sif->get_leds(sin);
 }
 
-static SpiceMarshaller *marshaller_new_for_outgoing(InputsChannel *state, int type)
+static InputsPipeItem *inputs_pipe_item_new(InputsChannel *channel, int type)
 {
-    SpiceMarshaller *m;
-    SpiceDataHeader *header;
+    InputsPipeItem *item = spice_malloc(sizeof(InputsPipeItem));
 
-    m = spice_marshaller_new();
-    header = (SpiceDataHeader *)
-        spice_marshaller_reserve_space(m, sizeof(SpiceDataHeader));
-    header->serial = ++state->serial;
-    header->type = type;
-    header->sub_list = 0;
+    red_channel_pipe_item_init(&channel->base, &item->base, type);
+    item->m = spice_marshaller_new();
+    item->data = NULL;
+    return item;
+}
 
-    return m;
+// Right now every PipeItem we add is an InputsPipeItem, later maybe make it simpler
+// for type only PipeItems
+static void inputs_pipe_add_type(InputsChannel *channel, int type)
+{
+    InputsPipeItem* pipe_item = inputs_pipe_item_new(channel, type);
+
+    red_channel_pipe_add(&channel->base, &pipe_item->base);
+}
+
+static void inputs_channel_release_pipe_item(RedChannel *channel,
+    PipeItem *base, int item_pushed)
+{
+    // All PipeItems we push are InputsPipeItem
+    InputsPipeItem *item = (InputsPipeItem*)base;
+
+    if (item->data) {
+        free(item->data);
+    }
+    spice_marshaller_destroy(item->m);
+    free(item);
 }
 
-static int marshaller_outgoing_write(SpiceMarshaller *m,
-                                     InputsChannel *state)
+static void inputs_channel_send_item(RedChannel *channel, PipeItem *base)
 {
-    SpiceDataHeader *header = (SpiceDataHeader *)spice_marshaller_get_ptr(m);
+    InputsPipeItem *item = SPICE_CONTAINEROF(base, InputsPipeItem, base);
+    SpiceMarshaller *m = item->m;
     uint8_t *data;
     size_t len;
     int free_data;
 
+    red_channel_reset_send_data(channel);
+    red_channel_init_send_data(channel, base->type, base);
     spice_marshaller_flush(m);
-    header->size = spice_marshaller_get_total_size(m) - sizeof(SpiceDataHeader);
-
+    // TODO: use spice_marshaller_fill_iovec. Right now we are doing something stupid,
+    // namely copying twice. See reds.c.
     data = spice_marshaller_linearize(m, 0, &len, &free_data);
-
-    if (outgoing_write(state->peer, &state->out_handler, data, len) != OUTGOING_OK) {
-        return FALSE;
+    item->data = (free_data && len > 0) ? data : NULL;
+    if (len > 0) {
+        red_channel_add_buf(channel, data, len);
     }
-
-    if (free_data) {
-        free(data);
-    }
-
-    return TRUE;
+    red_channel_begin_send_message(channel);
 }
 
-
-static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *message)
+static int inputs_channel_handle_parsed(RedChannel *channel, size_t size, uint32_t type, void *message)
 {
-    InputsChannel *state = (InputsChannel *)opaque;
     uint8_t *buf = (uint8_t *)message;
-    SpiceMarshaller *m;
 
+    ASSERT(inputs_channel == (InputsChannel*)channel);
     switch (type) {
     case SPICE_MSGC_INPUTS_KEY_DOWN: {
         SpiceMsgcKeyDown *key_up = (SpiceMsgcKeyDown *)buf;
@@ -382,13 +272,8 @@ static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *
     case SPICE_MSGC_INPUTS_MOUSE_MOTION: {
         SpiceMsgcMouseMotion *mouse_motion = (SpiceMsgcMouseMotion *)buf;
 
-        if (++state->motion_count % SPICE_INPUT_MOTION_ACK_BUNCH == 0) {
-            m = marshaller_new_for_outgoing(state, SPICE_MSG_INPUTS_MOUSE_MOTION_ACK);
-            if (!marshaller_outgoing_write(m, state)) {
-                red_printf("motion ack failed");
-                reds_disconnect();
-            }
-            spice_marshaller_destroy(m);
+        if (++inputs_channel->motion_count % SPICE_INPUT_MOTION_ACK_BUNCH == 0) {
+            inputs_pipe_add_type(inputs_channel, PIPE_ITEM_MOUSE_MOTION_ACK);
         }
         if (mouse && reds_get_mouse_mode() == SPICE_MOUSE_MODE_SERVER) {
             SpiceMouseInterface *sif;
@@ -402,13 +287,8 @@ static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *
     case SPICE_MSGC_INPUTS_MOUSE_POSITION: {
         SpiceMsgcMousePosition *pos = (SpiceMsgcMousePosition *)buf;
 
-        if (++state->motion_count % SPICE_INPUT_MOTION_ACK_BUNCH == 0) {
-            m = marshaller_new_for_outgoing(state, SPICE_MSG_INPUTS_MOUSE_MOTION_ACK);
-            if (!marshaller_outgoing_write(m, state)) {
-                red_printf("position ack failed");
-                reds_disconnect();
-            }
-            spice_marshaller_destroy(m);
+        if (++inputs_channel->motion_count % SPICE_INPUT_MOTION_ACK_BUNCH == 0) {
+            inputs_pipe_add_type(inputs_channel, PIPE_ITEM_MOUSE_MOTION_ACK);
         }
         if (reds_get_mouse_mode() != SPICE_MOUSE_MODE_CLIENT) {
             break;
@@ -420,7 +300,7 @@ static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *
             sif->position(tablet, pos->x, pos->y, RED_MOUSE_STATE_TO_LOCAL(pos->buttons_state));
             break;
         }
-        VDAgentMouseState *mouse_state = &state->mouse_state;
+        VDAgentMouseState *mouse_state = &inputs_channel->mouse_state;
         mouse_state->x = pos->x;
         mouse_state->y = pos->y;
         mouse_state->buttons = RED_MOUSE_BUTTON_STATE_TO_AGENT(pos->buttons_state);
@@ -506,7 +386,9 @@ static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *
         break;
     default:
         red_printf("unexpected type %d", type);
+        return FALSE;
     }
+    return TRUE;
 }
 
 static void inputs_relase_keys(void)
@@ -519,146 +401,107 @@ static void inputs_relase_keys(void)
     kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
 }
 
-static void inputs_event(int fd, int event, void *data)
+static void inputs_channel_on_incoming_error(RedChannel *channel)
 {
-    if (data != inputs_channel) {
-        return; // shutdown already happened
-    }
-    if (event & SPICE_WATCH_EVENT_READ) {
-        if (handle_incoming(inputs_channel->peer, &inputs_channel->in_handler)) {
-            inputs_relase_keys();
-            core->watch_remove(inputs_channel->peer->watch);
-            inputs_channel->peer->watch = NULL;
-            if (inputs_channel->channel) {
-                inputs_channel->channel->data = NULL;
-            }
-            inputs_channel->peer->cb_free(inputs_channel->peer);
-            free(inputs_channel);
-            inputs_channel = NULL;
-        }
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        if (handle_outgoing(inputs_channel->peer, &inputs_channel->out_handler)) {
-            reds_disconnect();
-        }
-    }
+    inputs_relase_keys();
+    red_channel_destroy(channel);
 }
 
+static void inputs_channel_on_outgoing_error(RedChannel *channel)
+{
+    reds_disconnect();
+}
 
 static void inputs_shutdown(Channel *channel)
 {
-    InputsChannel *state = (InputsChannel *)channel->data;
-    if (state) {
-        state->in_handler.shut = TRUE;
-        shutdown(state->peer->socket, SHUT_RDWR);
+    ASSERT(inputs_channel == (InputsChannel *)channel->data);
+    if (inputs_channel) {
+        red_channel_shutdown(&inputs_channel->base);
+        inputs_channel->base.incoming.shut = TRUE;
         channel->data = NULL;
-        state->channel = NULL;
         inputs_channel = NULL;
     }
 }
 
 static void inputs_migrate(Channel *channel)
 {
-    InputsChannel *state = (InputsChannel *)channel->data;
-    SpiceMarshaller *m;
+    InputsPipeItem *pipe_item = inputs_pipe_item_new(inputs_channel, PIPE_ITEM_MIGRATE);
+    SpiceMarshaller *m = pipe_item->m;
     SpiceMsgMigrate migrate;
 
-    m = marshaller_new_for_outgoing(state, SPICE_MSG_MIGRATE);
-
+    ASSERT(inputs_channel == (InputsChannel *)channel->data);
     migrate.flags = 0;
     spice_marshall_msg_migrate(m, &migrate);
-
-    if (!marshaller_outgoing_write(m, state)) {
-        red_printf("write failed");
-    }
-    spice_marshaller_destroy(m);
+    red_channel_pipe_add(&inputs_channel->base, &pipe_item->base);
 }
 
-static void inputs_select(void *opaque, int select)
+static void inputs_pipe_add_init(InputsChannel *channel)
 {
-    int eventmask = SPICE_WATCH_EVENT_READ;
-    red_printf("");
+    SpiceMsgInputsInit inputs_init;
+    InputsPipeItem *pipe_item = inputs_pipe_item_new(channel, PIPE_ITEM_INIT);
+    SpiceMarshaller *m = pipe_item->m;
 
-    ASSERT(opaque == inputs_channel);
-    if (select) {
-        eventmask |= SPICE_WATCH_EVENT_WRITE;
-    }
-    core->watch_update_mask(inputs_channel->peer->watch, eventmask);
+    inputs_init.keyboard_modifiers = kbd_get_leds(keyboard);
+    spice_marshall_msg_inputs_init(m, &inputs_init);
+    red_channel_pipe_add(&channel->base, &pipe_item->base);
 }
 
-static void inputs_may_write(void *opaque)
+static int inputs_channel_config_socket(RedChannel *channel)
 {
-    red_printf("");
+    int flags;
+    int delay_val = 1;
+
+    if (setsockopt(channel->peer->socket, IPPROTO_TCP, TCP_NODELAY,
+            &delay_val, sizeof(delay_val)) == -1) {
+        red_printf("setsockopt failed, %s", strerror(errno));
+        return FALSE;
+    }
+
+    if ((flags = fcntl(channel->peer->socket, F_GETFL)) == -1 ||
+                 fcntl(channel->peer->socket, F_SETFL, flags | O_ASYNC) == -1) {
+        red_printf("fcntl failed, %s", strerror(errno));
+        return FALSE;
+    }
+    return TRUE;
 }
 
 static void inputs_link(Channel *channel, RedsStreamContext *peer, int migration,
                         int num_common_caps, uint32_t *common_caps, int num_caps,
                         uint32_t *caps)
 {
-    int delay_val;
-    int flags;
-
     red_printf("");
     ASSERT(channel->data == NULL);
 
-    inputs_channel = spice_new0(InputsChannel, 1);
-
-    delay_val = 1;
-    if (setsockopt(peer->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, sizeof(delay_val)) == -1) {
-        red_printf("setsockopt failed, %s", strerror(errno));
-    }
-
-    if ((flags = fcntl(peer->socket, F_GETFL)) == -1 ||
-                                            fcntl(peer->socket, F_SETFL, flags | O_ASYNC) == -1) {
-        red_printf("fcntl failed, %s", strerror(errno));
-    }
-
-    inputs_channel->peer = peer;
-    inputs_channel->channel = channel;
-    inputs_channel->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL);
-    inputs_channel->in_handler.opaque = inputs_channel;
-    inputs_channel->in_handler.handle_message = inputs_handle_input;
-    inputs_channel->out_handler.length = 0;
-    inputs_channel->out_handler.opaque = inputs_channel;
-    inputs_channel->out_handler.select = inputs_select;
-    inputs_channel->out_handler.may_write = inputs_may_write;
+    inputs_channel = (InputsChannel*)red_channel_create_parser(
+        sizeof(*inputs_channel), peer, core, migration, FALSE /* handle_acks */
+        ,inputs_channel_config_socket
+        ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
+        ,inputs_channel_handle_parsed
+        ,inputs_channel_alloc_msg_rcv_buf
+        ,inputs_channel_release_msg_rcv_buf
+        ,inputs_channel_send_item
+        ,inputs_channel_release_pipe_item
+        ,inputs_channel_on_incoming_error
+        ,inputs_channel_on_outgoing_error);
+    ASSERT(inputs_channel);
     channel->data = inputs_channel;
-    peer->watch = core->watch_add(peer->socket, SPICE_WATCH_EVENT_READ,
-                                  inputs_event, inputs_channel);
-
-    SpiceMarshaller *m;
-    SpiceMsgInputsInit inputs_init;
-    m = marshaller_new_for_outgoing(inputs_channel, SPICE_MSG_INPUTS_INIT);
-    inputs_init.keyboard_modifiers = kbd_get_leds(keyboard);
-    spice_marshall_msg_inputs_init(m, &inputs_init);
-    if (!marshaller_outgoing_write(m, inputs_channel)) {
-        red_printf("failed to send modifiers state");
-        reds_disconnect();
-    }
-    spice_marshaller_destroy(m);
+    inputs_pipe_add_init(inputs_channel);
 }
 
 void inputs_send_keyboard_modifiers(uint8_t modifiers)
 {
     SpiceMsgInputsKeyModifiers key_modifiers;
+    InputsPipeItem *pipe_item;
     SpiceMarshaller *m;
 
     if (!inputs_channel) {
         return;
     }
-    ASSERT(inputs_channel->peer);
-
-    m = marshaller_new_for_outgoing(inputs_channel,
-                    SPICE_MSG_INPUTS_KEY_MODIFIERS);
-
+    pipe_item = inputs_pipe_item_new(inputs_channel, PIPE_ITEM_KEY_MODIFIERS);
+    m = pipe_item->m;
     key_modifiers.modifiers = modifiers;
     spice_marshall_msg_inputs_key_modifiers(m, &key_modifiers);
-
-    if (!marshaller_outgoing_write(m, inputs_channel)) {
-        red_printf("failed to send modifiers state");
-        reds_disconnect();
-    }
-    spice_marshaller_destroy(m);
+    red_channel_pipe_add(&inputs_channel->base, &pipe_item->base);
 }
 
 void inputs_on_keyboard_leds_change(void *opaque, uint8_t leds)
-- 
1.7.3.2



More information about the Spice-devel mailing list