[Spice-devel] [PATCH spice-server v2 01/12] dispatcher: Allows to manage messages without registering them
Frediano Ziglio
fziglio at redhat.com
Tue Mar 26 19:10:27 UTC 2019
The only way to add new message to Dispatcher was to register
using a number. These numbers corresponded to array indexes.
This is good if the list of messages is allocated statically
and contiguously, on the contrary this method is not that
flexible.
Writing a header of 4 or 16 bytes using system call does not
make much difference so pass all message information in the
payload header.
A new dispatcher_send_message_custom function allows to send
a message passing all message information, including the
pointer to the handler.
This will allow for instance a Dispatcher associate to a given
thread to be reused by different classes.
Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
---
server/dispatcher.c | 77 +++++++++++++++++++++++++++++++--------------
server/dispatcher.h | 15 +++++++++
2 files changed, 68 insertions(+), 24 deletions(-)
diff --git a/server/dispatcher.c b/server/dispatcher.c
index 5f839ec4..ed4037aa 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -38,10 +38,19 @@
static void setup_dummy_signal_handler(void);
#endif
+#define DISPATCHER_CUSTOM_TYPE 0x7fffffffu
+
+/* structure to store message header information.
+ * That structure is sent through a socketpair so it's optimized
+ * to be transfered via sockets.
+ * Is also packaged to not leave holes in both 32 and 64 environments
+ * so memory instrumentation tools should not find uninitialised bytes.
+ */
typedef struct DispatcherMessage {
- size_t size;
- bool ack;
dispatcher_handle_message handler;
+ uint32_t size;
+ uint32_t type:31;
+ uint32_t ack:1;
} DispatcherMessage;
struct DispatcherPrivate {
@@ -249,12 +258,11 @@ static int write_safe(int fd, uint8_t *buf, size_t size)
static int dispatcher_handle_single_read(Dispatcher *dispatcher)
{
int ret;
- uint32_t type;
- DispatcherMessage *msg = NULL;
- uint8_t *payload = dispatcher->priv->payload;
+ DispatcherMessage msg[1];
+ uint8_t *payload;
uint32_t ack = ACK;
- if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) {
+ if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg, sizeof(msg), 0)) == -1) {
g_warning("error reading from dispatcher: %d", errno);
return 0;
}
@@ -262,28 +270,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
/* no message */
return 0;
}
- if (type >= dispatcher->priv->max_message_type) {
- spice_error("Invalid message type for this dispatcher: %u", type);
- return 0;
+ if (G_UNLIKELY(msg->size > dispatcher->priv->payload_size)) {
+ dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
+ dispatcher->priv->payload_size = msg->size;
}
- msg = &dispatcher->priv->messages[type];
+ payload = dispatcher->priv->payload;
if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) {
g_warning("error reading from dispatcher: %d", errno);
/* TODO: close socketpair? */
return 0;
}
- if (dispatcher->priv->any_handler) {
- dispatcher->priv->any_handler(dispatcher->priv->opaque, type, payload);
+ if (dispatcher->priv->any_handler && msg->type != DISPATCHER_CUSTOM_TYPE) {
+ dispatcher->priv->any_handler(dispatcher->priv->opaque, msg->type, payload);
}
if (msg->handler) {
msg->handler(dispatcher->priv->opaque, payload);
} else {
- g_warning("error: no handler for message type %d", type);
+ g_warning("error: no handler for message type %d", msg->type);
}
if (msg->ack) {
if (write_safe(dispatcher->priv->recv_fd,
(uint8_t*)&ack, sizeof(ack)) == -1) {
- g_warning("error writing ack for message %d", type);
+ g_warning("error writing ack for message %d", msg->type);
/* TODO: close socketpair? */
}
}
@@ -300,25 +308,22 @@ void dispatcher_handle_recv_read(Dispatcher *dispatcher)
}
}
-void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
- void *payload)
+static void
+dispatcher_send_message_internal(Dispatcher *dispatcher, const DispatcherMessage*msg,
+ void *payload)
{
- DispatcherMessage *msg;
uint32_t ack;
int send_fd = dispatcher->priv->send_fd;
- assert(dispatcher->priv->max_message_type > message_type);
- assert(dispatcher->priv->messages[message_type].handler);
- msg = &dispatcher->priv->messages[message_type];
pthread_mutex_lock(&dispatcher->priv->lock);
- if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) {
+ if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
g_warning("error: failed to send message type for message %d",
- message_type);
+ msg->type);
goto unlock;
}
if (write_safe(send_fd, payload, msg->size) == -1) {
g_warning("error: failed to send message body for message %d",
- message_type);
+ msg->type);
goto unlock;
}
if (msg->ack) {
@@ -326,7 +331,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
g_warning("error: failed to read ack");
} else if (ack != ACK) {
g_warning("error: got wrong ack value in dispatcher "
- "for message %d\n", message_type);
+ "for message %d\n", msg->type);
/* TODO handling error? */
}
}
@@ -334,6 +339,29 @@ unlock:
pthread_mutex_unlock(&dispatcher->priv->lock);
}
+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
+ void *payload)
+{
+ DispatcherMessage *msg;
+
+ assert(dispatcher->priv->max_message_type > message_type);
+ assert(dispatcher->priv->messages[message_type].handler);
+ msg = &dispatcher->priv->messages[message_type];
+ dispatcher_send_message_internal(dispatcher, msg, payload);
+}
+
+void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_message handler,
+ void *payload, uint32_t payload_size, bool ack)
+{
+ DispatcherMessage msg = {
+ .handler = handler,
+ .size = payload_size,
+ .type = DISPATCHER_CUSTOM_TYPE,
+ .ack = ack,
+ };
+ dispatcher_send_message_internal(dispatcher, &msg, payload);
+}
+
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
dispatcher_handle_message handler,
size_t size, bool ack)
@@ -345,6 +373,7 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
msg = &dispatcher->priv->messages[message_type];
msg->handler = handler;
msg->size = size;
+ msg->type = message_type;
msg->ack = ack;
if (msg->size > dispatcher->priv->payload_size) {
dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
diff --git a/server/dispatcher.h b/server/dispatcher.h
index bb968e56..a7aaebed 100644
--- a/server/dispatcher.h
+++ b/server/dispatcher.h
@@ -99,6 +99,21 @@ typedef void (*dispatcher_handle_any_message)(void *opaque,
void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
void *payload);
+/* dispatcher_send_message_custom
+ *
+ * Sends a message to the receiving thread.
+ *
+ * If the sent message is a message type requires an ACK, this function will
+ * block until it receives an ACK from the receiving thread.
+ *
+ * @handler: callback to handle message
+ * @payload: payload
+ * @payload_size: size of payload
+ * @ack: acknowledge required. Make message synchronous
+ */
+void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_message handler,
+ void *payload, uint32_t payload_size, bool ack);
+
/* dispatcher_register_handler
*
* This function registers a message type with the dispatcher, and registers
--
2.20.1
More information about the Spice-devel
mailing list