[Spice-devel] [PATCH spice-server 02/10] dispatcher: Allows to manage message without registering them
Frediano Ziglio
fziglio at redhat.com
Wed Mar 20 09:59:11 UTC 2019
Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
---
server/dispatcher.c | 71 ++++++++++++++++++++++++++++++---------------
server/dispatcher.h | 15 ++++++++++
2 files changed, 62 insertions(+), 24 deletions(-)
diff --git a/server/dispatcher.c b/server/dispatcher.c
index 5f839ec4..0b18b32d 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -38,10 +38,13 @@
static void setup_dummy_signal_handler(void);
#endif
+#define DISPATCHER_GENERIC_TYPE 0x7fffffffu
+
typedef struct DispatcherMessage {
- size_t size;
- bool ack;
dispatcher_handle_message handler;
+ uint32_t size;
+ uint32_t type:31;
+ uint32_t ack:1;
} DispatcherMessage;
struct DispatcherPrivate {
@@ -249,12 +252,11 @@ static int write_safe(int fd, uint8_t *buf, size_t size)
static int dispatcher_handle_single_read(Dispatcher *dispatcher)
{
int ret;
- uint32_t type;
- DispatcherMessage *msg = NULL;
- uint8_t *payload = dispatcher->priv->payload;
+ DispatcherMessage msg[1];
+ uint8_t *payload;
uint32_t ack = ACK;
- if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) {
+ if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg, sizeof(msg), 0)) == -1) {
g_warning("error reading from dispatcher: %d", errno);
return 0;
}
@@ -262,28 +264,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
/* no message */
return 0;
}
- if (type >= dispatcher->priv->max_message_type) {
- spice_error("Invalid message type for this dispatcher: %u", type);
- return 0;
+ if (SPICE_UNLIKELY(msg->size > dispatcher->priv->payload_size)) {
+ dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
+ dispatcher->priv->payload_size = msg->size;
}
- msg = &dispatcher->priv->messages[type];
+ payload = dispatcher->priv->payload;
if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) {
g_warning("error reading from dispatcher: %d", errno);
/* TODO: close socketpair? */
return 0;
}
- if (dispatcher->priv->any_handler) {
- dispatcher->priv->any_handler(dispatcher->priv->opaque, type, payload);
+ if (dispatcher->priv->any_handler && msg->type != DISPATCHER_GENERIC_TYPE) {
+ dispatcher->priv->any_handler(dispatcher->priv->opaque, msg->type, payload);
}
if (msg->handler) {
msg->handler(dispatcher->priv->opaque, payload);
} else {
- g_warning("error: no handler for message type %d", type);
+ g_warning("error: no handler for message type %d", msg->type);
}
if (msg->ack) {
if (write_safe(dispatcher->priv->recv_fd,
(uint8_t*)&ack, sizeof(ack)) == -1) {
- g_warning("error writing ack for message %d", type);
+ g_warning("error writing ack for message %d", msg->type);
/* TODO: close socketpair? */
}
}
@@ -300,25 +302,22 @@ void dispatcher_handle_recv_read(Dispatcher *dispatcher)
}
}
-void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
- void *payload)
+static void
+dispatcher_send_message_internal(Dispatcher *dispatcher, const DispatcherMessage*msg,
+ void *payload)
{
- DispatcherMessage *msg;
uint32_t ack;
int send_fd = dispatcher->priv->send_fd;
- assert(dispatcher->priv->max_message_type > message_type);
- assert(dispatcher->priv->messages[message_type].handler);
- msg = &dispatcher->priv->messages[message_type];
pthread_mutex_lock(&dispatcher->priv->lock);
- if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) {
+ if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
g_warning("error: failed to send message type for message %d",
- message_type);
+ msg->type);
goto unlock;
}
if (write_safe(send_fd, payload, msg->size) == -1) {
g_warning("error: failed to send message body for message %d",
- message_type);
+ msg->type);
goto unlock;
}
if (msg->ack) {
@@ -326,7 +325,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
g_warning("error: failed to read ack");
} else if (ack != ACK) {
g_warning("error: got wrong ack value in dispatcher "
- "for message %d\n", message_type);
+ "for message %d\n", msg->type);
/* TODO handling error? */
}
}
@@ -334,6 +333,29 @@ unlock:
pthread_mutex_unlock(&dispatcher->priv->lock);
}
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+ void *payload)
+{
+ DispatcherMessage *msg;
+
+ assert(dispatcher->priv->max_message_type > message_type);
+ assert(dispatcher->priv->messages[message_type].handler);
+ msg = &dispatcher->priv->messages[message_type];
+ dispatcher_send_message_internal(dispatcher, msg, payload);
+}
+
+void dispatcher_send_message_generic(Dispatcher *dispatcher, dispatcher_handle_message handler,
+ void *payload, uint32_t payload_size, bool ack)
+{
+ DispatcherMessage msg = {
+ .handler = handler,
+ .size = payload_size,
+ .type = DISPATCHER_GENERIC_TYPE,
+ .ack = ack,
+ };
+ dispatcher_send_message_internal(dispatcher, &msg, payload);
+}
+
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
dispatcher_handle_message handler,
size_t size, bool ack)
@@ -345,6 +367,7 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
msg = &dispatcher->priv->messages[message_type];
msg->handler = handler;
msg->size = size;
+ msg->type = message_type;
msg->ack = ack;
if (msg->size > dispatcher->priv->payload_size) {
dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
diff --git a/server/dispatcher.h b/server/dispatcher.h
index bb968e56..b8339c06 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -99,6 +99,21 @@ typedef void (*dispatcher_handle_any_message)(void *opaque,
void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
void *payload);
+/* dispatcher_send_message_generic
+ *
+ * Sends a message to the receiving thread.
+ *
+ * If the sent message is a message type requires an ACK, this function will
+ * block until it receives an ACK from the receiving thread.
+ *
+ * @handler: callback to handle message
+ * @payload: payload
+ * @payload_size: size of payload
+ * @ack: acknowledge required. Make message synchronous
+ */
+void dispatcher_send_message_generic(Dispatcher *dispatcher, dispatcher_handle_message handler,
+ void *payload, uint32_t payload_size, bool ack);
+
/* dispatcher_register_handler
*
* This function registers a message type with the dispatcher, and registers
--
2.20.1
More information about the Spice-devel
mailing list