[Spice-devel] [PATCH spice-server v2 01/12] dispatcher: Allows to manage messages without registering them

Jonathon Jongsma jjongsma at redhat.com
Thu Mar 28 15:23:35 UTC 2019


On Tue, 2019-03-26 at 19:10 +0000, Frediano Ziglio wrote:
> The only way to add new message to Dispatcher was to register
> using a number. These numbers corresponded to array indexes.
> This is good if the list of messages is allocated statically
> and contiguously, on the contrary this method is not that
> flexible.
> Writing a header of 4 or 16 bytes using system call does not
> make much difference so pass all message information in the
> payload header.
> A new dispatcher_send_message_custom function allows to send
> a message passing all message information, including the
> pointer to the handler.
> This will allow for instance a Dispatcher associate to a given
> thread to be reused by different classes.
> 
> Signed-off-by: Frediano Ziglio <fziglio at redhat.com>
> ---
>  server/dispatcher.c | 77 +++++++++++++++++++++++++++++++----------
> ----
>  server/dispatcher.h | 15 +++++++++
>  2 files changed, 68 insertions(+), 24 deletions(-)
> 
> diff --git a/server/dispatcher.c b/server/dispatcher.c
> index 5f839ec4..ed4037aa 100644
> --- a/server/dispatcher.c
> +++ b/server/dispatcher.c
> @@ -38,10 +38,19 @@
>  static void setup_dummy_signal_handler(void);
>  #endif
>  
> +#define DISPATCHER_CUSTOM_TYPE 0x7fffffffu

Can I suggest DISPATCHER_MESSAGE_TYPE_CUSTOM ?


> +
> +/* structure to store message header information.
> + * That structure is sent through a socketpair so it's optimized
> + * to be transfered via sockets.
> + * Is also packaged to not leave holes in both 32 and 64
> environments
> + * so memory instrumentation tools should not find uninitialised
> bytes.
> + */
>  typedef struct DispatcherMessage {
> -    size_t size;
> -    bool ack;
>      dispatcher_handle_message handler;
> +    uint32_t size;
> +    uint32_t type:31;
> +    uint32_t ack:1;
>  } DispatcherMessage;
>  
>  struct DispatcherPrivate {
> @@ -249,12 +258,11 @@ static int write_safe(int fd, uint8_t *buf,
> size_t size)
>  static int dispatcher_handle_single_read(Dispatcher *dispatcher)
>  {
>      int ret;
> -    uint32_t type;
> -    DispatcherMessage *msg = NULL;
> -    uint8_t *payload = dispatcher->priv->payload;
> +    DispatcherMessage msg[1];
> +    uint8_t *payload;
>      uint32_t ack = ACK;
>  
> -    if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type,
> sizeof(type), 0)) == -1) {
> +    if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg,
> sizeof(msg), 0)) == -1) {
>          g_warning("error reading from dispatcher: %d", errno);
>          return 0;
>      }
> @@ -262,28 +270,28 @@ static int
> dispatcher_handle_single_read(Dispatcher *dispatcher)
>          /* no message */
>          return 0;
>      }
> -    if (type >= dispatcher->priv->max_message_type) {
> -        spice_error("Invalid message type for this dispatcher: %u",
> type);
> -        return 0;
> +    if (G_UNLIKELY(msg->size > dispatcher->priv->payload_size)) {
> +        dispatcher->priv->payload = g_realloc(dispatcher->priv-
> >payload, msg->size);
> +        dispatcher->priv->payload_size = msg->size;
>      }
> -    msg = &dispatcher->priv->messages[type];
> +    payload = dispatcher->priv->payload;
>      if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1)
> == -1) {
>          g_warning("error reading from dispatcher: %d", errno);
>          /* TODO: close socketpair? */
>          return 0;
>      }
> -    if (dispatcher->priv->any_handler) {
> -        dispatcher->priv->any_handler(dispatcher->priv->opaque,
> type, payload);
> +    if (dispatcher->priv->any_handler && msg->type !=
> DISPATCHER_CUSTOM_TYPE) {
> +        dispatcher->priv->any_handler(dispatcher->priv->opaque, msg-
> >type, payload);
>      }
>      if (msg->handler) {
>          msg->handler(dispatcher->priv->opaque, payload);
>      } else {
> -        g_warning("error: no handler for message type %d", type);
> +        g_warning("error: no handler for message type %d", msg-
> >type);
>      }
>      if (msg->ack) {
>          if (write_safe(dispatcher->priv->recv_fd,
>                         (uint8_t*)&ack, sizeof(ack)) == -1) {
> -            g_warning("error writing ack for message %d", type);
> +            g_warning("error writing ack for message %d", msg-
> >type);
>              /* TODO: close socketpair? */
>          }
>      }
> @@ -300,25 +308,22 @@ void dispatcher_handle_recv_read(Dispatcher
> *dispatcher)
>      }
>  }
>  
> -void dispatcher_send_message(Dispatcher *dispatcher, uint32_t
> message_type,
> -                             void *payload)
> +static void
> +dispatcher_send_message_internal(Dispatcher *dispatcher, const
> DispatcherMessage*msg,
> +                                 void *payload)
>  {
> -    DispatcherMessage *msg;
>      uint32_t ack;
>      int send_fd = dispatcher->priv->send_fd;
>  
> -    assert(dispatcher->priv->max_message_type > message_type);
> -    assert(dispatcher->priv->messages[message_type].handler);

You've removed a bunch of asserts here, but do we want to assert (or at
least g_return_if_fail) if msg is NULL?

> -    msg = &dispatcher->priv->messages[message_type];
>      pthread_mutex_lock(&dispatcher->priv->lock);
> -    if (write_safe(send_fd, (uint8_t*)&message_type,
> sizeof(message_type)) == -1) {
> +    if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
>          g_warning("error: failed to send message type for message
> %d",

Perhaps this warning message should be changed from "failed to send
message type" to "failed to send message header"?

> -                  message_type);
> +                  msg->type);
>          goto unlock;
>      }
>      if (write_safe(send_fd, payload, msg->size) == -1) {
>          g_warning("error: failed to send message body for message
> %d",
> -                  message_type);
> +                  msg->type);
>          goto unlock;
>      }
>      if (msg->ack) {
> @@ -326,7 +331,7 @@ void dispatcher_send_message(Dispatcher
> *dispatcher, uint32_t message_type,
>              g_warning("error: failed to read ack");
>          } else if (ack != ACK) {
>              g_warning("error: got wrong ack value in dispatcher "
> -                      "for message %d\n", message_type);
> +                      "for message %d\n", msg->type);
>              /* TODO handling error? */
>          }
>      }
> @@ -334,6 +339,29 @@ unlock:
>      pthread_mutex_unlock(&dispatcher->priv->lock);
>  }
>  
> +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t
> message_type,
> +                             void *payload)
> +{
> +    DispatcherMessage *msg;
> +
> +    assert(dispatcher->priv->max_message_type > message_type);
> +    assert(dispatcher->priv->messages[message_type].handler);
> +    msg = &dispatcher->priv->messages[message_type];
> +    dispatcher_send_message_internal(dispatcher, msg, payload);
> +}
> +
> +void dispatcher_send_message_custom(Dispatcher *dispatcher,
> dispatcher_handle_message handler,
> +                                    void *payload, uint32_t
> payload_size, bool ack)
> +{
> +    DispatcherMessage msg = {
> +        .handler = handler,
> +        .size = payload_size,
> +        .type = DISPATCHER_CUSTOM_TYPE,
> +        .ack = ack,
> +    };
> +    dispatcher_send_message_internal(dispatcher, &msg, payload);
> +}
> +
>  void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t
> message_type,
>                                   dispatcher_handle_message handler,
>                                   size_t size, bool ack)
> @@ -345,6 +373,7 @@ void dispatcher_register_handler(Dispatcher
> *dispatcher, uint32_t message_type,
>      msg = &dispatcher->priv->messages[message_type];
>      msg->handler = handler;
>      msg->size = size;
> +    msg->type = message_type;
>      msg->ack = ack;
>      if (msg->size > dispatcher->priv->payload_size) {
>          dispatcher->priv->payload = g_realloc(dispatcher->priv-
> >payload, msg->size);
> diff --git a/server/dispatcher.h b/server/dispatcher.h
> index bb968e56..a7aaebed 100644
> --- a/server/dispatcher.h
> +++ b/server/dispatcher.h
> @@ -99,6 +99,21 @@ typedef void (*dispatcher_handle_any_message)(void
> *opaque,
>  void dispatcher_send_message(Dispatcher *dispatcher, uint32_t
> message_type,
>                               void *payload);
>  
> +/* dispatcher_send_message_custom
> + *
> + * Sends a message to the receiving thread.
> + *
> + * If the sent message is a message type requires an ACK, this 
> function will
> + * block until it receives an ACK from the receiving thread.

The comment above is a little bit confusing because in this case the
message type doesn't determine whether an ACK is required. The @ack
argument determines whether an ACK is required. Maybe remove the phrase
"is a message type ". So it becomes:

"If the sent message requires an ACK, this function..."

> + *
> + * @handler:      callback to handle message
> + * @payload:      payload
> + * @payload_size: size of payload
> + * @ack:          acknowledge required. Make message synchronous
> + */
> +void dispatcher_send_message_custom(Dispatcher *dispatcher,
> dispatcher_handle_message handler,
> +                                    void *payload, uint32_t
> payload_size, bool ack);
> +
>  /* dispatcher_register_handler
>   *
>   * This function registers a message type with the dispatcher, and
> registers


with minor comments above:

Acked-by: Jonathon Jongsma <jjongsma at redhat.com>



More information about the Spice-devel mailing list