[Spice-devel] [PATCH 04/12] server: red_channel: add optional parser and separate incoming/outgoing error handlers for later inputs/main channel usage

Alon Levy alevy at redhat.com
Mon Dec 6 02:49:11 PST 2010


---
 server/red_channel.c |   80 ++++++++++++++++++++++++++++++++++++++++++++++++--
 server/red_channel.h |   30 ++++++++++++++++++
 2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/server/red_channel.c b/server/red_channel.c
index f547f5a..be861fd 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -68,6 +68,9 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
 static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
 {
     int bytes_read;
+    uint8_t *parsed;
+    size_t parsed_size;
+    message_destructor_t parsed_free;
 
     for (;;) {
         int ret_handle;
@@ -105,8 +108,25 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
             }
         }
 
-        ret_handle = handler->handle_message(handler->opaque, &handler->header,
-                                             handler->msg);
+        if (handler->parser) {
+            parsed = handler->parser(handler->msg, handler->msg + handler->header.size, handler->header.type,
+                                     SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
+            if (parsed == NULL) {
+                red_printf("failed to parse message type %d", handler->header.type);
+                handler->on_error(handler->opaque);
+                return;
+            }
+            ret_handle = handler->handle_parsed(handler->opaque, parsed_size,
+                                    handler->header.type, parsed);
+            parsed_free(parsed);
+        } else {
+            ret_handle = handler->handle_message(handler->opaque, &handler->header,
+                                                 handler->msg);
+        }
+        if (handler->shut) {
+            handler->on_error(handler->opaque);
+            return;
+        }
         handler->msg_pos = 0;
         handler->msg = NULL;
         handler->header_pos = 0;
@@ -183,13 +203,27 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
 
 static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
 
-
 static void red_channel_peer_on_error(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
+
     channel->disconnect(channel);
 }
 
+static void red_channel_peer_on_incoming_error(void *opaque)
+{
+    RedChannel *channel = (RedChannel *)opaque;
+
+    channel->on_incoming_error(channel);
+}
+
+static void red_channel_peer_on_outgoing_error(void *opaque)
+{
+    RedChannel *channel = (RedChannel *)opaque;
+
+    channel->on_outgoing_error(channel);
+}
+
 static int red_channel_peer_get_out_msg_size(void *opaque)
 {
     RedChannel *channel = (RedChannel *)opaque;
@@ -276,6 +310,8 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
     channel->outgoing.on_error = red_channel_peer_on_error;
     channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
 
+    channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
+
     if (!config_socket(channel)) {
         goto error;
     }
@@ -293,6 +329,44 @@ error:
     return NULL;
 }
 
+void do_nothing_disconnect(RedChannel *red_channel)
+{
+}
+
+int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header, uint8_t *msg)
+{
+    return TRUE;
+}
+
+RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
+                               SpiceCoreInterface *core,
+                               int migrate, int handle_acks,
+                               channel_configure_socket_proc config_socket,
+                               spice_parse_channel_func_t parser,
+                               channel_handle_parsed_proc handle_parsed,
+                               channel_alloc_msg_recv_buf_proc alloc_recv_buf,
+                               channel_release_msg_recv_buf_proc release_recv_buf,
+                               channel_send_pipe_item_proc send_item,
+                               channel_release_pipe_item_proc release_item,
+                               channel_on_incoming_error_proc incoming_error,
+                               channel_on_outgoing_error_proc outgoing_error)
+{
+    RedChannel *channel = red_channel_create(size, peer,
+        core, migrate, handle_acks, config_socket, do_nothing_disconnect, do_nothing_handle_message,
+        alloc_recv_buf, release_recv_buf, send_item, release_item);
+
+    if (channel == NULL) {
+        return NULL;
+    }
+    channel->incoming.handle_parsed = (handle_parsed_proc)handle_parsed;
+    channel->incoming.parser = parser;
+    channel->on_incoming_error = incoming_error;
+    channel->on_outgoing_error = outgoing_error;
+    channel->incoming.on_error = red_channel_peer_on_incoming_error;
+    channel->outgoing.on_error = red_channel_peer_on_outgoing_error;
+    return channel;
+}
+
 void red_channel_destroy(RedChannel *channel)
 {
     if (!channel) {
diff --git a/server/red_channel.h b/server/red_channel.h
index 509da77..24f969b 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -26,6 +26,7 @@
 #include "reds.h"
 #include "spice.h"
 #include "ring.h"
+#include "server/demarshallers.h"
 
 #define MAX_SEND_BUFS 1000
 #define MAX_SEND_VEC 50
@@ -37,6 +38,7 @@
 
 typedef int (*handle_message_proc)(void *opaque,
                                    SpiceDataHeader *header, uint8_t *msg);
+typedef int (*handle_parsed_proc)(void *opaque, size_t size, uint32_t type, void *message);
 typedef uint8_t *(*alloc_msg_recv_buf_proc)(void *opaque, SpiceDataHeader *msg_header);
 typedef void (*release_msg_recv_buf_proc)(void *opaque,
                                           SpiceDataHeader *msg_header, uint8_t *msg);
@@ -52,6 +54,10 @@ typedef struct IncomingHandler {
     alloc_msg_recv_buf_proc alloc_msg_buf;
     on_incoming_error_proc on_error; // recv error or handle_message error
     release_msg_recv_buf_proc release_msg_buf; // for errors
+    // The following is an optional alternative to handle_message, used if not null
+    spice_parse_channel_func_t parser;
+    handle_parsed_proc handle_parsed;
+    int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX
 } IncomingHandler;
 
 typedef int (*get_outgoing_msg_size_proc)(void *opaque);
@@ -89,6 +95,8 @@ typedef struct RedChannel RedChannel;
 
 typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel,
                                                     SpiceDataHeader *msg_header);
+typedef int (*channel_handle_parsed_proc)(RedChannel *channel, size_t size, uint32_t type,
+                                        void *message);
 typedef int (*channel_handle_message_proc)(RedChannel *channel,
                                            SpiceDataHeader *header, uint8_t *msg);
 typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel,
@@ -98,6 +106,8 @@ typedef int (*channel_configure_socket_proc)(RedChannel *channel);
 typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item);
 typedef void (*channel_release_pipe_item_proc)(RedChannel *channel,
                                                PipeItem *item, int item_pushed);
+typedef void (*channel_on_incoming_error_proc)(RedChannel *channel);
+typedef void (*channel_on_outgoing_error_proc)(RedChannel *channel);
 
 struct RedChannel {
     RedsStreamContext *peer;
@@ -137,6 +147,11 @@ struct RedChannel {
     channel_release_pipe_item_proc release_item;
 
     int during_send;
+    /* Stuff below added for Main and Inputs channels switch to RedChannel
+     * (might be removed later) */
+    channel_on_incoming_error_proc on_incoming_error; /* alternative to disconnect */
+    channel_on_outgoing_error_proc on_outgoing_error;
+    int shut; /* signal channel is to be closed */
 };
 
 /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
@@ -152,6 +167,21 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
                                channel_send_pipe_item_proc send_item,
                                channel_release_pipe_item_proc release_item);
 
+/* alternative constructor, meant for marshaller based (inputs,main) channels,
+ * will become default eventually */
+RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
+                               SpiceCoreInterface *core,
+                               int migrate, int handle_acks,
+                               channel_configure_socket_proc config_socket,
+                               spice_parse_channel_func_t parser,
+                               channel_handle_parsed_proc handle_parsed,
+                               channel_alloc_msg_recv_buf_proc alloc_recv_buf,
+                               channel_release_msg_recv_buf_proc release_recv_buf,
+                               channel_send_pipe_item_proc send_item,
+                               channel_release_pipe_item_proc release_item,
+                               channel_on_incoming_error_proc incoming_error,
+                               channel_on_outgoing_error_proc outgoing_error);
+
 void red_channel_destroy(RedChannel *channel);
 
 void red_channel_shutdown(RedChannel *channel);
-- 
1.7.3.2



More information about the Spice-devel mailing list