[Spice-devel] [PATCH spice-server v2 01/12] dispatcher: Allows to manage messages without registering them

Frediano Ziglio fziglio at redhat.com
Tue Mar 26 19:10:27 UTC 2019


Until now, the only way to add a new message to a Dispatcher was to
register it using a number. These numbers corresponded to array indexes.
This works well if the list of messages is allocated statically
and contiguously; otherwise this method is not very
flexible.
Writing a header of 4 or 16 bytes using a system call does not
make much difference, so pass all message information in the
header that is sent before the payload.
A new dispatcher_send_message_custom function allows sending
a message by passing all message information, including the
pointer to the handler.
This will allow, for instance, a Dispatcher associated with a given
thread to be reused by different classes.

Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
---
 server/dispatcher.c | 77 +++++++++++++++++++++++++++++++--------------
 server/dispatcher.h | 15 +++++++++
 2 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/server/dispatcher.c b/server/dispatcher.c
index 5f839ec4..ed4037aa 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -38,10 +38,19 @@
 static void setup_dummy_signal_handler(void);
 #endif
 
+#define DISPATCHER_CUSTOM_TYPE 0x7fffffffu
+
+/* Structure to store message header information.
+ * This structure is sent through a socketpair, so it is optimized
+ * to be transferred via sockets.
+ * It is also laid out to leave no holes on both 32 and 64 bit environments,
+ * so memory instrumentation tools should not find uninitialised bytes.
+ */
 typedef struct DispatcherMessage {
-    size_t size;
-    bool ack;
     dispatcher_handle_message handler;
+    uint32_t size;
+    uint32_t type:31;
+    uint32_t ack:1;
 } DispatcherMessage;
 
 struct DispatcherPrivate {
@@ -249,12 +258,11 @@ static int write_safe(int fd, uint8_t *buf, size_t size)
 static int dispatcher_handle_single_read(Dispatcher *dispatcher)
 {
     int ret;
-    uint32_t type;
-    DispatcherMessage *msg = NULL;
-    uint8_t *payload = dispatcher->priv->payload;
+    DispatcherMessage msg[1];
+    uint8_t *payload;
     uint32_t ack = ACK;
 
-    if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) {
+    if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg, sizeof(msg), 0)) == -1) {
         g_warning("error reading from dispatcher: %d", errno);
         return 0;
     }
@@ -262,28 +270,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
         /* no message */
         return 0;
     }
-    if (type >= dispatcher->priv->max_message_type) {
-        spice_error("Invalid message type for this dispatcher: %u", type);
-        return 0;
+    if (G_UNLIKELY(msg->size > dispatcher->priv->payload_size)) {
+        dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
+        dispatcher->priv->payload_size = msg->size;
     }
-    msg = &dispatcher->priv->messages[type];
+    payload = dispatcher->priv->payload;
     if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) {
         g_warning("error reading from dispatcher: %d", errno);
         /* TODO: close socketpair? */
         return 0;
     }
-    if (dispatcher->priv->any_handler) {
-        dispatcher->priv->any_handler(dispatcher->priv->opaque, type, payload);
+    if (dispatcher->priv->any_handler && msg->type != DISPATCHER_CUSTOM_TYPE) {
+        dispatcher->priv->any_handler(dispatcher->priv->opaque, msg->type, payload);
     }
     if (msg->handler) {
         msg->handler(dispatcher->priv->opaque, payload);
     } else {
-        g_warning("error: no handler for message type %d", type);
+        g_warning("error: no handler for message type %d", msg->type);
     }
     if (msg->ack) {
         if (write_safe(dispatcher->priv->recv_fd,
                        (uint8_t*)&ack, sizeof(ack)) == -1) {
-            g_warning("error writing ack for message %d", type);
+            g_warning("error writing ack for message %d", msg->type);
             /* TODO: close socketpair? */
         }
     }
@@ -300,25 +308,22 @@ void dispatcher_handle_recv_read(Dispatcher *dispatcher)
     }
 }
 
-void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
-                             void *payload)
+static void
+dispatcher_send_message_internal(Dispatcher *dispatcher, const DispatcherMessage*msg,
+                                 void *payload)
 {
-    DispatcherMessage *msg;
     uint32_t ack;
     int send_fd = dispatcher->priv->send_fd;
 
-    assert(dispatcher->priv->max_message_type > message_type);
-    assert(dispatcher->priv->messages[message_type].handler);
-    msg = &dispatcher->priv->messages[message_type];
     pthread_mutex_lock(&dispatcher->priv->lock);
-    if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) {
+    if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
         g_warning("error: failed to send message type for message %d",
-                  message_type);
+                  msg->type);
         goto unlock;
     }
     if (write_safe(send_fd, payload, msg->size) == -1) {
         g_warning("error: failed to send message body for message %d",
-                  message_type);
+                  msg->type);
         goto unlock;
     }
     if (msg->ack) {
@@ -326,7 +331,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
             g_warning("error: failed to read ack");
         } else if (ack != ACK) {
             g_warning("error: got wrong ack value in dispatcher "
-                      "for message %d\n", message_type);
+                      "for message %d\n", msg->type);
             /* TODO handling error? */
         }
     }
@@ -334,6 +339,29 @@ unlock:
     pthread_mutex_unlock(&dispatcher->priv->lock);
 }
 
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload)
+{
+    DispatcherMessage *msg;
+
+    assert(dispatcher->priv->max_message_type > message_type);
+    assert(dispatcher->priv->messages[message_type].handler);
+    msg = &dispatcher->priv->messages[message_type];
+    dispatcher_send_message_internal(dispatcher, msg, payload);
+}
+
+void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_message handler,
+                                    void *payload, uint32_t payload_size, bool ack)
+{
+    DispatcherMessage msg = {
+        .handler = handler,
+        .size = payload_size,
+        .type = DISPATCHER_CUSTOM_TYPE,
+        .ack = ack,
+    };
+    dispatcher_send_message_internal(dispatcher, &msg, payload);
+}
+
 void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
                                  dispatcher_handle_message handler,
                                  size_t size, bool ack)
@@ -345,6 +373,7 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
     msg = &dispatcher->priv->messages[message_type];
     msg->handler = handler;
     msg->size = size;
+    msg->type = message_type;
     msg->ack = ack;
     if (msg->size > dispatcher->priv->payload_size) {
         dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
diff --git a/server/dispatcher.h b/server/dispatcher.h
index bb968e56..a7aaebed 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -99,6 +99,21 @@ typedef void (*dispatcher_handle_any_message)(void *opaque,
 void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
                              void *payload);
 
+/* dispatcher_send_message_custom
+ *
+ * Sends a message to the receiving thread.
+ *
+ * If the message requires an ACK (@ack is true), this function will
+ * block until it receives an ACK from the receiving thread.
+ *
+ * @handler:      callback to handle message
+ * @payload:      payload
+ * @payload_size: size of payload
+ * @ack:          acknowledgement required; makes the message synchronous
+ */
+void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_message handler,
+                                    void *payload, uint32_t payload_size, bool ack);
+
 /* dispatcher_register_handler
  *
  * This function registers a message type with the dispatcher, and registers
-- 
2.20.1



More information about the Spice-devel mailing list