[Spice-devel] [PATCH spice-server 02/10] dispatcher: Allows to manage message without registering them

Frediano Ziglio fziglio at redhat.com
Wed Mar 20 09:59:11 UTC 2019


Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
---
 server/dispatcher.c | 71 ++++++++++++++++++++++++++++++---------------
 server/dispatcher.h | 15 ++++++++++
 2 files changed, 62 insertions(+), 24 deletions(-)

diff --git a/server/dispatcher.c b/server/dispatcher.c
index 5f839ec4..0b18b32d 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -38,10 +38,13 @@
 static void setup_dummy_signal_handler(void);
 #endif
 
+#define DISPATCHER_GENERIC_TYPE 0x7fffffffu
+
 typedef struct DispatcherMessage {
-    size_t size;
-    bool ack;
     dispatcher_handle_message handler;
+    uint32_t size;
+    uint32_t type:31;
+    uint32_t ack:1;
 } DispatcherMessage;
 
 struct DispatcherPrivate {
@@ -249,12 +252,11 @@ static int write_safe(int fd, uint8_t *buf, size_t size)
 static int dispatcher_handle_single_read(Dispatcher *dispatcher)
 {
     int ret;
-    uint32_t type;
-    DispatcherMessage *msg = NULL;
-    uint8_t *payload = dispatcher->priv->payload;
+    DispatcherMessage msg[1];
+    uint8_t *payload;
     uint32_t ack = ACK;
 
-    if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) {
+    if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg, sizeof(msg), 0)) == -1) {
         g_warning("error reading from dispatcher: %d", errno);
         return 0;
     }
@@ -262,28 +264,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
         /* no message */
         return 0;
     }
-    if (type >= dispatcher->priv->max_message_type) {
-        spice_error("Invalid message type for this dispatcher: %u", type);
-        return 0;
+    if (SPICE_UNLIKELY(msg->size > dispatcher->priv->payload_size)) {
+        dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
+        dispatcher->priv->payload_size = msg->size;
     }
-    msg = &dispatcher->priv->messages[type];
+    payload = dispatcher->priv->payload;
     if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) {
         g_warning("error reading from dispatcher: %d", errno);
         /* TODO: close socketpair? */
         return 0;
     }
-    if (dispatcher->priv->any_handler) {
-        dispatcher->priv->any_handler(dispatcher->priv->opaque, type, payload);
+    if (dispatcher->priv->any_handler && msg->type != DISPATCHER_GENERIC_TYPE) {
+        dispatcher->priv->any_handler(dispatcher->priv->opaque, msg->type, payload);
     }
     if (msg->handler) {
         msg->handler(dispatcher->priv->opaque, payload);
     } else {
-        g_warning("error: no handler for message type %d", type);
+        g_warning("error: no handler for message type %d", msg->type);
     }
     if (msg->ack) {
         if (write_safe(dispatcher->priv->recv_fd,
                        (uint8_t*)&ack, sizeof(ack)) == -1) {
-            g_warning("error writing ack for message %d", type);
+            g_warning("error writing ack for message %d", msg->type);
             /* TODO: close socketpair? */
         }
     }
@@ -300,25 +302,22 @@ void dispatcher_handle_recv_read(Dispatcher *dispatcher)
     }
 }
 
-void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
-                             void *payload)
+static void
+dispatcher_send_message_internal(Dispatcher *dispatcher, const DispatcherMessage*msg,
+                                 void *payload)
 {
-    DispatcherMessage *msg;
     uint32_t ack;
     int send_fd = dispatcher->priv->send_fd;
 
-    assert(dispatcher->priv->max_message_type > message_type);
-    assert(dispatcher->priv->messages[message_type].handler);
-    msg = &dispatcher->priv->messages[message_type];
     pthread_mutex_lock(&dispatcher->priv->lock);
-    if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) {
+    if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
         g_warning("error: failed to send message type for message %d",
-                  message_type);
+                  msg->type);
         goto unlock;
     }
     if (write_safe(send_fd, payload, msg->size) == -1) {
         g_warning("error: failed to send message body for message %d",
-                  message_type);
+                  msg->type);
         goto unlock;
     }
     if (msg->ack) {
@@ -326,7 +325,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
             g_warning("error: failed to read ack");
         } else if (ack != ACK) {
             g_warning("error: got wrong ack value in dispatcher "
-                      "for message %d\n", message_type);
+                      "for message %d\n", msg->type);
             /* TODO handling error? */
         }
     }
@@ -334,6 +333,29 @@ unlock:
     pthread_mutex_unlock(&dispatcher->priv->lock);
 }
 
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+                             void *payload)
+{
+    DispatcherMessage *msg;
+
+    assert(dispatcher->priv->max_message_type > message_type);
+    assert(dispatcher->priv->messages[message_type].handler);
+    msg = &dispatcher->priv->messages[message_type];
+    dispatcher_send_message_internal(dispatcher, msg, payload);
+}
+
+void dispatcher_send_message_generic(Dispatcher *dispatcher, dispatcher_handle_message handler,
+                                     void *payload, uint32_t payload_size, bool ack)
+{
+    DispatcherMessage msg = {
+        .handler = handler,
+        .size = payload_size,
+        .type = DISPATCHER_GENERIC_TYPE,
+        .ack = ack,
+    };
+    dispatcher_send_message_internal(dispatcher, &msg, payload);
+}
+
 void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
                                  dispatcher_handle_message handler,
                                  size_t size, bool ack)
@@ -345,6 +367,7 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
     msg = &dispatcher->priv->messages[message_type];
     msg->handler = handler;
     msg->size = size;
+    msg->type = message_type;
     msg->ack = ack;
     if (msg->size > dispatcher->priv->payload_size) {
         dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
diff --git a/server/dispatcher.h b/server/dispatcher.h
index bb968e56..b8339c06 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -99,6 +99,21 @@ typedef void (*dispatcher_handle_any_message)(void *opaque,
 void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
                              void *payload);
 
+/* dispatcher_send_message_generic
+ *
+ * Sends a message to the receiving thread.
+ *
+ * If the sent message is a message type requires an ACK, this function will
+ * block until it receives an ACK from the receiving thread.
+ *
+ * @handler:      callback to handle message
+ * @payload:      payload
+ * @payload_size: size of payload
+ * @ack:          acknowledge required. Make message synchronous
+ */
+void dispatcher_send_message_generic(Dispatcher *dispatcher, dispatcher_handle_message handler,
+                                     void *payload, uint32_t payload_size, bool ack);
+
 /* dispatcher_register_handler
  *
  * This function registers a message type with the dispatcher, and registers
-- 
2.20.1



More information about the Spice-devel mailing list