[Spice-devel] [RFC 2/3] server: introduce dispatcher

Yonit Halperin yhalperi at redhat.com
Wed Nov 2 07:24:05 PDT 2011


On 11/01/2011 10:10 AM, Alon Levy wrote:
> used for main_dispatcher only in this patch.
>
> Dispatcher is meant to be used for Main<->any low frequency messages.
>
> It's interface is meant to include the red_dispatcher usage:
>   fixed size messages per message type
>   some messages require an ack
>
> Some methods are added to be used by RedDispatcher later:
>   dispatcher_handle_read - to be called directly by RedDispatcher epoll
>    based loop
>   dispatcher_set_opaque - to be set from red_worker pthread
>   dispatcher_init - allow NULL core as used by red_worker
Can you add comment that by default a dispatcher is watched by the "main 
thread" and that if the user don't want this to happen, core must be 
NULL? Maybe it will be safer to just make it the dispatcher user 
responsibility and not start the watch in dispatcher_init.
> ---
>   server/Makefile.am       |    2 +
>   server/dispatcher.c      |  162 ++++++++++++++++++++++++++++++++++++++++++++++
>   server/dispatcher.h      |   78 ++++++++++++++++++++++
>   server/main_dispatcher.c |  105 +++++++-----------------------
>   4 files changed, 265 insertions(+), 82 deletions(-)
>   create mode 100644 server/dispatcher.c
>   create mode 100644 server/dispatcher.h
>
> diff --git a/server/Makefile.am b/server/Makefile.am
> index 418d707..34a6b47 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -78,6 +78,8 @@ libspice_server_la_SOURCES =			\
>   	red_client_cache.h			\
>   	red_client_shared_cache.h		\
>   	red_common.h				\
> +	dispatcher.c				\
> +	dispatcher.h				\
>   	red_dispatcher.c			\
>   	red_dispatcher.h			\
>   	main_dispatcher.c			\
> diff --git a/server/dispatcher.c b/server/dispatcher.c
> new file mode 100644
> index 0000000..ca111b4
> --- /dev/null
> +++ b/server/dispatcher.c
> @@ -0,0 +1,162 @@
> +#include<unistd.h>
> +#include<errno.h>
> +#include<assert.h>
> +#include<string.h>
> +#include<pthread.h>
> +
> +#include "mem.h"
> +#include "spice_common.h"
> +#include "dispatcher.h"
> +
> +#define ACK 0xffffffff
> +
> +/*
> + * read_with_eintr
> + * helper. reads until size bytes accumulated in buf, if an error other then
> + * EINTR is encountered returns -1, otherwise returns 0.
> + */
static
> +int read_with_eintr(int fd, void *buf, size_t size)
> +{
> +    int read_size = 0;
> +    int ret;
> +
> +    while (read_size<  size) {
> +        ret = read(fd, buf + read_size, size - read_size);
> +        if (ret == -1) {
> +            if (errno != EINTR) {
> +                return -1;
> +            }
> +            continue;
> +        }
> +        read_size += size;
should be += ret
> +    }
> +    return 0;
> +}
> +
static
> +int write_with_eintr(int fd, void *buf, size_t size)
> +{
> +    /* according to the man page EINTR will mean no data written, so
> +     * no need to take account of short writes. */
this is when -1 is returned and not if (write(fd, buf, size) > 0) and 
!size. Then, you should still track short writes.
> +    while (write(fd, buf, size) != size) {
> +        if (errno != EINTR) {
> +            return -1;
> +        }
> +    }
> +    return 0;
> +}
> +/*
> + * dispatcher_handle_read
> + * doesn't handle being in the middle of a message. all reads are blocking.
> + */
> +void dispatcher_handle_read(int fd, int event, void *opaque)
> +{
> +    Dispatcher *dispatcher = opaque;
> +    uint32_t type;
> +    DispatcherMessage *msg = NULL;
> +    uint8_t *payload = dispatcher->payload;
> +    uint32_t ack = ACK;
> +
> +    if (read_with_eintr(dispatcher->recv_fd,&type, sizeof(type)) == -1) {
> +        red_printf("error reading from dispatcher: %d\n", errno);
> +        return;
> +    }
> +    assert(type<  dispatcher->max_message_type);
> +    msg =&dispatcher->messages[type];
> +    if (read_with_eintr(dispatcher->recv_fd, payload, msg->size) == -1) {
> +        red_printf("error reading from dispatcher: %d\n", errno);
> +        /* TODO: close socketpair? */
> +        return;
> +    }
> +    if (msg->handler) {
> +        msg->handler(dispatcher->opaque, (void *)payload);
> +    } else {
> +        red_printf("error: no handler for message type %d\n", type);
> +    }
> +    if (msg->send_ack&&  write_with_eintr(dispatcher->recv_fd,
> +&ack, sizeof(ack)) == -1) {
> +        red_printf("error writing ack for message %d\n", type);
> +        /* TODO: close socketpair? */
> +    }
> +}
> +
Looks like a problem (we already had), that we read only one message 
when we are waked up, where it is possible that there will be several 
messages in the pipe. What if several display channels disconnected and 
sent channel_event through the main_dispatcher? Another example is any 
message written to the red_dispatcher without waiting for Ack.
(already talked with Alon - the fd will be non blocking)
> +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> +                             void *payload)
> +{
> +    DispatcherMessage *msg;
> +    uint32_t ack;
> +    int send_fd = dispatcher->send_fd;
> +
> +    assert(dispatcher->max_message_type>  message_type);
> +    assert(dispatcher->messages[message_type].handler);
> +    msg =&dispatcher->messages[message_type];
> +    pthread_mutex_lock(&dispatcher->lock);
> +    if (write_with_eintr(send_fd,&message_type, sizeof(message_type)) == -1) {
> +        red_printf("error: failed to send message type for message %d\n",
> +                   message_type);
> +        goto unlock;
> +    }
> +    if (write_with_eintr(send_fd, payload, msg->size) == -1) {
> +        red_printf("error: failed to send message body for message %d\n",
> +                   message_type);
> +        goto unlock;
> +    }
> +    if (msg->send_ack) {
> +        if (read_with_eintr(send_fd,&ack, sizeof(ack)) == -1) {
> +        }
> +        if (ack != ACK) {
> +            red_printf("error: got wrong ack value in dispatcher "
> +                       "for message %d\n", message_type);
> +            /* TODO handling error? */
> +        }
> +    }
> +unlock:
> +    pthread_mutex_unlock(&dispatcher->lock);
> +}
> +
> +void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> +                                 dispatcher_handle_message handler, size_t size,
> +                                 int send_ack)
> +{
> +    DispatcherMessage *msg;
> +
> +    assert(message_type<  dispatcher->max_message_type);
> +    assert(dispatcher->messages[message_type].handler == 0);
> +    msg =&dispatcher->messages[message_type];
> +    msg->handler = handler;
> +    msg->size = size;
> +    msg->send_ack = send_ack;
> +    if (msg->size>  dispatcher->payload_size) {
> +        dispatcher->payload = realloc(dispatcher->payload, msg->size);
> +        dispatcher->payload_size = msg->size;
> +    }
> +}
> +
> +void dispatcher_init(Dispatcher *dispatcher, SpiceCoreInterface *recv_core,
> +                     size_t max_message_type, void *opaque)
> +{
> +    int channels[2];
> +
> +    dispatcher->recv_core = recv_core;
> +    dispatcher->opaque = opaque;
> +    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> +        red_error("socketpair failed %s", strerror(errno));
> +        return;
> +    }
> +    pthread_mutex_init(&dispatcher->lock, NULL);
> +    dispatcher->recv_fd = channels[0];
> +    dispatcher->send_fd = channels[1];
> +    dispatcher->self = pthread_self();
> +
> +    dispatcher->messages = spice_malloc0_n(max_message_type,
> +                                           sizeof(dispatcher->messages[0]));
> +    dispatcher->max_message_type = max_message_type;
> +    if (recv_core) {
> +        recv_core->watch_add(dispatcher->recv_fd, SPICE_WATCH_EVENT_READ,
> +                             dispatcher_handle_read, dispatcher);
> +    }
> +}
> +
> +void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
> +{
> +    dispatcher->opaque = opaque;
> +}
> diff --git a/server/dispatcher.h b/server/dispatcher.h
> new file mode 100644
> index 0000000..9212ca6
> --- /dev/null
> +++ b/server/dispatcher.h
> @@ -0,0 +1,78 @@
> +#ifndef MAIN_DISPATCHER_H
> +#define MAIN_DISPATCHER_H
> +
> +#include<spice.h>
> +
> +typedef struct Dispatcher Dispatcher;
> +
> +typedef void (*dispatcher_handle_message)(void *opaque,
> +                                          void *payload);
> +
> +typedef struct DispatcherMessage {
> +    size_t size;
> +    int send_ack;
a bit confusing. s/send_ack/wait_ack ?
> +    dispatcher_handle_message handler;
> +} DispatcherMessage;
> +
> +struct Dispatcher {
> +    SpiceCoreInterface *recv_core;
> +    int recv_fd;
> +    int send_fd;
> +    pthread_t self;
> +    pthread_mutex_t lock;
> +    DispatcherMessage *messages;
> +    int stage;  /* message parser stage - sender has no stages */
> +    size_t max_message_type;
> +    void *payload; /* allocated as max of message sizes */
> +    size_t payload_size; /* used to track realloc calls */
> +    void *opaque;
> +};
> +
> +/*
> + * dispatcher_send_message
> + * @message_type: message type
> + * @payload:      payload
> + */
> +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> +                             void *payload);
> +
> +/*
> + * dispatcher_init
> + * @recv_core: core interface instance used by receiver
> + * @max_message_type: number of message types. Allows upfront allocation
> + *  of a DispatcherMessage list.
> + * up front, and registration in any order wanted.
> + */
> +void dispatcher_init(Dispatcher *dispatcher, SpiceCoreInterface *recv_core,
> +                     size_t max_message_type, void *opaque);
> +
> +/*
> + * dispatcher_register_handler
> + * @dispatcher:           dispatcher
> + * @messsage_type:  message type
> + * @handler:        message handler
> + * @size:           message size. Each type has a fixed associated size.
> + * @send_ack:       send an ack. This is per message type - you can't send the same
> + *                  message type with and without. Register two different messages
> + *                  if that is what you want.
> + */
> +void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> +                                 dispatcher_handle_message handler, size_t size,
> +                                 int send_ack);
> +
> +/*
> + *  dispatcher_handle_read
> + *  @fd: fd triggered. Value is ignored
> + *  @event: event (read/write). Value is ignored
> + *  @opaque: must be Dispatcher instance
> + */
> +void dispatcher_handle_read(int fd, int event, void *opaque);
> +
> +/*
> + * dispatcher_set_opaque
> + * @dispatcher: Dispatcher instance
> + * @opaque: opaque to use for callbacks
> + */
> +void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
> +
> +#endif //MAIN_DISPATCHER_H
> diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> index 73856bf..5cddab7 100644
> --- a/server/main_dispatcher.c
> +++ b/server/main_dispatcher.c
> @@ -5,6 +5,7 @@
>   #include<assert.h>
>
>   #include "red_common.h"
> +#include "dispatcher.h"
>   #include "main_dispatcher.h"
>
>   /*
> @@ -28,11 +29,7 @@
>    */
>
>   typedef struct {
> -    SpiceCoreInterface *core;
> -    int main_fd;
> -    int other_fd;
> -    pthread_t self;
> -    pthread_mutex_t lock;
> +    Dispatcher base;
>   } MainDispatcher;
>
>   MainDispatcher main_dispatcher;
> @@ -43,103 +40,47 @@ enum {
>       MAIN_DISPATCHER_NUM_MESSAGES
>   };
>
> -typedef struct MainDispatcherMessage {
> -    uint32_t type;
> -    union {
> -        struct {
> -            int event;
> -            SpiceChannelEventInfo *info;
> -        } channel_event;
> -    } data;
> -} MainDispatcherMessage;
> +typedef struct MainDispatcherChannelEventMessage {
> +    int event;
> +    SpiceChannelEventInfo *info;
> +} MainDispatcherChannelEventMessage;
>
>   /* channel_event - calls core->channel_event, must be done in main thread */
>   static void main_dispatcher_self_handle_channel_event(
>                                                   int event,
>                                                   SpiceChannelEventInfo *info)
>   {
> -    main_dispatcher.core->channel_event(event, info);
> +    main_dispatcher.base.recv_core->channel_event(event, info);
>   }
>
> -static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> +static void main_dispatcher_handle_channel_event(void *opaque,
> +                                                 void *payload)
>   {
> -    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> -                                              msg->data.channel_event.info);
> +    MainDispatcherChannelEventMessage *channel_event = payload;
> +
> +    main_dispatcher_self_handle_channel_event(channel_event->event,
> +                                              channel_event->info);
>   }
>
>   void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
>   {
> -    MainDispatcherMessage msg;
> -    ssize_t written = 0;
> -    ssize_t ret;
> +    MainDispatcherChannelEventMessage msg;
>
> -    if (pthread_self() == main_dispatcher.self) {
> +    if (pthread_self() == main_dispatcher.base.self) {
>           main_dispatcher_self_handle_channel_event(event, info);
>           return;
>       }
> -    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> -    msg.data.channel_event.event = event;
> -    msg.data.channel_event.info = info;
> -    pthread_mutex_lock(&main_dispatcher.lock);
> -    while (written<  sizeof(msg)) {
> -        ret = write(main_dispatcher.other_fd,&msg + written,
> -                    sizeof(msg) - written);
> -        if (ret == -1) {
> -            assert(errno == EINTR);
> -            continue;
> -        }
> -        written += ret;
> -    }
> -    pthread_mutex_unlock(&main_dispatcher.lock);
> -}
> -
> -
> -static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> -{
> -    int ret;
> -    MainDispatcher *md = opaque;
> -    MainDispatcherMessage msg;
> -    int read_size = 0;
> -
> -    while (read_size<  sizeof(msg)) {
> -        /* blocks until sizeof(msg) is read */
> -        ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size);
> -        if (ret == -1) {
> -            if (errno != EINTR) {
> -                red_printf("error reading from main dispatcher: %d\n", errno);
> -                /* TODO: close channel? */
> -                return;
> -            }
> -            continue;
> -        }
> -        read_size += ret;
> -    }
> -    switch (msg.type) {
> -        case MAIN_DISPATCHER_CHANNEL_EVENT:
> -            main_dispatcher_handle_channel_event(&msg);
> -            break;
> -        default:
> -            red_printf("error: unhandled main dispatcher message type %d\n",
> -                       msg.type);
> -    }
> +    msg.event = event;
> +    msg.info = info;
> +    dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> +&msg);
>   }
>
>   void main_dispatcher_init(SpiceCoreInterface *core)
>   {
> -    int channels[2];
> -
>       memset(&main_dispatcher, 0, sizeof(main_dispatcher));
> -    main_dispatcher.core = core;
> -
> -    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> -        red_error("socketpair failed %s", strerror(errno));
> -        return;
> -    }
> -    pthread_mutex_init(&main_dispatcher.lock, NULL);
> -    main_dispatcher.main_fd = channels[0];
> -    main_dispatcher.other_fd = channels[1];
> -    main_dispatcher.self = pthread_self();
> -
> -    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> -                    main_dispatcher_handle_read,&main_dispatcher);
> +    dispatcher_init(&main_dispatcher.base, core, MAIN_DISPATCHER_NUM_MESSAGES,&main_dispatcher.base);
> +    dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> +                                main_dispatcher_handle_channel_event,
> +                                sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
>   }



More information about the Spice-devel mailing list