[Spice-devel] [PATCH spice-server v2 01/12] dispatcher: Allows to manage messages without registering them
Frediano Ziglio
fziglio at redhat.com
Thu Mar 28 15:38:31 UTC 2019
> On Tue, 2019-03-26 at 19:10 +0000, Frediano Ziglio wrote:
> > The only way to add new message to Dispatcher was to register
> > using a number. These numbers corresponded to array indexes.
> > This is good if the list of messages is allocated statically
> > and contiguously, on the contrary this method is not that
> > flexible.
> > Writing a header of 4 or 16 bytes using system call does not
> > make much difference so pass all message information in the
> > payload header.
> > A new dispatcher_send_message_custom function allows to send
> > a message passing all message information, including the
> > pointer to the handler.
> > This will allow for instance a Dispatcher associate to a given
> > thread to be reused by different classes.
> >
> > Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> > ---
> > server/dispatcher.c | 77 +++++++++++++++++++++++++++++++----------
> > ----
> > server/dispatcher.h | 15 +++++++++
> > 2 files changed, 68 insertions(+), 24 deletions(-)
> >
> > diff --git a/server/dispatcher.c b/server/dispatcher.c
> > index 5f839ec4..ed4037aa 100644
> > --- a/server/dispatcher.c
> > +++ b/server/dispatcher.c
> > @@ -38,10 +38,19 @@
> > static void setup_dummy_signal_handler(void);
> > #endif
> >
> > +#define DISPATCHER_CUSTOM_TYPE 0x7fffffffu
>
> Can I suggest DISPATCHER_MESSAGE_TYPE_CUSTOM ?
>
Fine for me, I'll change
>
> > +
> > +/* structure to store message header information.
> > + * That structure is sent through a socketpair so it's optimized
> > + * to be transfered via sockets.
> > + * Is also packaged to not leave holes in both 32 and 64
> > environments
> > + * so memory instrumentation tools should not find uninitialised
> > bytes.
> > + */
> > typedef struct DispatcherMessage {
> > - size_t size;
> > - bool ack;
> > dispatcher_handle_message handler;
> > + uint32_t size;
> > + uint32_t type:31;
> > + uint32_t ack:1;
> > } DispatcherMessage;
> >
> > struct DispatcherPrivate {
> > @@ -249,12 +258,11 @@ static int write_safe(int fd, uint8_t *buf,
> > size_t size)
> > static int dispatcher_handle_single_read(Dispatcher *dispatcher)
> > {
> > int ret;
> > - uint32_t type;
> > - DispatcherMessage *msg = NULL;
> > - uint8_t *payload = dispatcher->priv->payload;
> > + DispatcherMessage msg[1];
> > + uint8_t *payload;
> > uint32_t ack = ACK;
> >
> > - if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type,
> > sizeof(type), 0)) == -1) {
> > + if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg,
> > sizeof(msg), 0)) == -1) {
> > g_warning("error reading from dispatcher: %d", errno);
> > return 0;
> > }
> > @@ -262,28 +270,28 @@ static int
> > dispatcher_handle_single_read(Dispatcher *dispatcher)
> > /* no message */
> > return 0;
> > }
> > - if (type >= dispatcher->priv->max_message_type) {
> > - spice_error("Invalid message type for this dispatcher: %u",
> > type);
> > - return 0;
> > + if (G_UNLIKELY(msg->size > dispatcher->priv->payload_size)) {
> > + dispatcher->priv->payload = g_realloc(dispatcher->priv-
> > >payload, msg->size);
> > + dispatcher->priv->payload_size = msg->size;
> > }
> > - msg = &dispatcher->priv->messages[type];
> > + payload = dispatcher->priv->payload;
> > if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1)
> > == -1) {
> > g_warning("error reading from dispatcher: %d", errno);
> > /* TODO: close socketpair? */
> > return 0;
> > }
> > - if (dispatcher->priv->any_handler) {
> > - dispatcher->priv->any_handler(dispatcher->priv->opaque,
> > type, payload);
> > + if (dispatcher->priv->any_handler && msg->type !=
> > DISPATCHER_CUSTOM_TYPE) {
> > + dispatcher->priv->any_handler(dispatcher->priv->opaque, msg-
> > >type, payload);
> > }
> > if (msg->handler) {
> > msg->handler(dispatcher->priv->opaque, payload);
> > } else {
> > - g_warning("error: no handler for message type %d", type);
> > + g_warning("error: no handler for message type %d", msg-
> > >type);
> > }
> > if (msg->ack) {
> > if (write_safe(dispatcher->priv->recv_fd,
> > (uint8_t*)&ack, sizeof(ack)) == -1) {
> > - g_warning("error writing ack for message %d", type);
> > + g_warning("error writing ack for message %d", msg-
> > >type);
> > /* TODO: close socketpair? */
> > }
> > }
> > @@ -300,25 +308,22 @@ void dispatcher_handle_recv_read(Dispatcher
> > *dispatcher)
> > }
> > }
> >
> > -void dispatcher_send_message(Dispatcher *dispatcher, uint32_t
> > message_type,
> > - void *payload)
> > +static void
> > +dispatcher_send_message_internal(Dispatcher *dispatcher, const
> > DispatcherMessage*msg,
> > + void *payload)
> > {
> > - DispatcherMessage *msg;
> > uint32_t ack;
> > int send_fd = dispatcher->priv->send_fd;
> >
> > - assert(dispatcher->priv->max_message_type > message_type);
> > - assert(dispatcher->priv->messages[message_type].handler);
>
> You've removed a bunch of asserts here, but do we want to assert (or at
> least g_return_if_fail) if msg is NULL?
>
They are not removed, just moved in dispatcher_send_message
(this function is now dispatcher_send_message_internal without
message_type).
> > - msg = &dispatcher->priv->messages[message_type];
> > pthread_mutex_lock(&dispatcher->priv->lock);
> > - if (write_safe(send_fd, (uint8_t*)&message_type,
> > sizeof(message_type)) == -1) {
> > + if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
> > g_warning("error: failed to send message type for message
> > %d",
>
> Perhaps this warning message should be changed from "failed to send
> message type" to "failed to send message header"?
>
I assume "error: failed to send message header for message %d",
I'll change
> > - message_type);
> > + msg->type);
> > goto unlock;
> > }
> > if (write_safe(send_fd, payload, msg->size) == -1) {
> > g_warning("error: failed to send message body for message
> > %d",
> > - message_type);
> > + msg->type);
> > goto unlock;
> > }
> > if (msg->ack) {
> > @@ -326,7 +331,7 @@ void dispatcher_send_message(Dispatcher
> > *dispatcher, uint32_t message_type,
> > g_warning("error: failed to read ack");
> > } else if (ack != ACK) {
> > g_warning("error: got wrong ack value in dispatcher "
> > - "for message %d\n", message_type);
> > + "for message %d\n", msg->type);
> > /* TODO handling error? */
> > }
> > }
> > @@ -334,6 +339,29 @@ unlock:
> > pthread_mutex_unlock(&dispatcher->priv->lock);
> > }
> >
> > +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t
> > message_type,
> > + void *payload)
> > +{
> > + DispatcherMessage *msg;
> > +
> > + assert(dispatcher->priv->max_message_type > message_type);
> > + assert(dispatcher->priv->messages[message_type].handler);
> > + msg = &dispatcher->priv->messages[message_type];
> > + dispatcher_send_message_internal(dispatcher, msg, payload);
> > +}
> > +
> > +void dispatcher_send_message_custom(Dispatcher *dispatcher,
> > dispatcher_handle_message handler,
> > + void *payload, uint32_t
> > payload_size, bool ack)
> > +{
> > + DispatcherMessage msg = {
> > + .handler = handler,
> > + .size = payload_size,
> > + .type = DISPATCHER_CUSTOM_TYPE,
> > + .ack = ack,
> > + };
> > + dispatcher_send_message_internal(dispatcher, &msg, payload);
> > +}
> > +
> > void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t
> > message_type,
> > dispatcher_handle_message handler,
> > size_t size, bool ack)
> > @@ -345,6 +373,7 @@ void dispatcher_register_handler(Dispatcher
> > *dispatcher, uint32_t message_type,
> > msg = &dispatcher->priv->messages[message_type];
> > msg->handler = handler;
> > msg->size = size;
> > + msg->type = message_type;
> > msg->ack = ack;
> > if (msg->size > dispatcher->priv->payload_size) {
> > dispatcher->priv->payload = g_realloc(dispatcher->priv-
> > >payload, msg->size);
> > diff --git a/server/dispatcher.h b/server/dispatcher.h
> > index bb968e56..a7aaebed 100644
> > --- a/server/dispatcher.h
> > +++ b/server/dispatcher.h
> > @@ -99,6 +99,21 @@ typedef void (*dispatcher_handle_any_message)(void
> > *opaque,
> > void dispatcher_send_message(Dispatcher *dispatcher, uint32_t
> > message_type,
> > void *payload);
> >
> > +/* dispatcher_send_message_custom
> > + *
> > + * Sends a message to the receiving thread.
> > + *
> > + * If the sent message is a message type requires an ACK, this
> > function will
> > + * block until it receives an ACK from the receiving thread.
>
> The comment above is a little bit confusing because in this case the
> message type doesn't determine whether an ACK is required. The @ack
> argument determines whether an ACK is required. Maybe remove the phrase
> "is a message type ". So it becomes:
>
> "If the sent message requires an ACK, this function..."
>
I'll do.
> > + *
> > + * @handler: callback to handle message
> > + * @payload: payload
> > + * @payload_size: size of payload
> > + * @ack: acknowledge required. Make message synchronous
> > + */
> > +void dispatcher_send_message_custom(Dispatcher *dispatcher,
> > dispatcher_handle_message handler,
> > + void *payload, uint32_t
> > payload_size, bool ack);
> > +
> > /* dispatcher_register_handler
> > *
> > * This function registers a message type with the dispatcher, and
> > registers
>
>
> with minor comments above:
>
> Acked-by: Jonathon Jongsma <jjongsma at redhat.com>
>
>
More information about the Spice-devel
mailing list