[Spice-devel] [PATCH v2 2/6] server: introduce dispatcher

Yonit Halperin yhalperi at redhat.com
Mon Nov 7 23:31:27 PST 2011


Hi,
2 comments below
On 11/07/2011 01:44 PM, Alon Levy wrote:
> used for main_dispatcher only in this patch.
>
> Dispatcher is meant to be used for Main<->any low frequency messages.
>
> Its interface is meant to include the red_dispatcher usage:
>   fixed size messages per message type
>   some messages require an ack
>
> Some methods are added to be used by RedDispatcher later:
>   dispatcher_handle_read - to be called directly by RedDispatcher epoll
>    based loop
>   dispatcher_set_opaque - to be set from red_worker pthread
>   dispatcher_init - allow NULL core as used by red_worker
>
> Read and Write behavior: (changed from RFC)
>   Sender: blocking write, blocking read for ack (if any).
>   Reader: non blocking check for 4 bytes (message type is uint32_t),
>    blocking read for the rest of the message, repeat until fail to
>    get message_type.
>
> FDO Bugzilla: 42463
>
> v1->v2:
>   drop O_NONBLOCK, use poll (Paolo Bonzini)
>   write_size/written_size (Paolo)
>   fold red_dispatcher change into dispatcher introduction
>
> RFC->v1:
>   read_safe, write_safe instead of read_with_eintr (Paolo Bonzini)
>   read_safe can be blocking or non blocking
>   statics where missing, fix read_safe arithmetic (Yonit Halperin)
>   dispatcher_handle_read: read until no data available. will block
>    if the start of a message is available. (Yonit)
>   renamed DispatcherMessage's send_ack to ack (Yonit)
> ---
>   server/Makefile.am       |    2 +
>   server/dispatcher.c      |  206 ++++++++++++++++++++++++++++++++++++++++++++++
>   server/dispatcher.h      |   81 ++++++++++++++++++
>   server/main_dispatcher.c |  102 ++++++-----------------
>   4 files changed, 316 insertions(+), 75 deletions(-)
>   create mode 100644 server/dispatcher.c
>   create mode 100644 server/dispatcher.h
>
> diff --git a/server/Makefile.am b/server/Makefile.am
> index 418d707..34a6b47 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -78,6 +78,8 @@ libspice_server_la_SOURCES =			\
>   	red_client_cache.h			\
>   	red_client_shared_cache.h		\
>   	red_common.h				\
> +	dispatcher.c				\
> +	dispatcher.h				\
>   	red_dispatcher.c			\
>   	red_dispatcher.h			\
>   	main_dispatcher.c			\
> diff --git a/server/dispatcher.c b/server/dispatcher.c
> new file mode 100644
> index 0000000..e2973a4
> --- /dev/null
> +++ b/server/dispatcher.c
> @@ -0,0 +1,206 @@
> +#include<unistd.h>
> +#include<errno.h>
> +#include<assert.h>
> +#include<string.h>
> +#include<pthread.h>
> +#include<fcntl.h>
> +#include<poll.h>
> +
> +#include "mem.h"
> +#include "spice_common.h"
> +#include "dispatcher.h"
> +
> +#define ACK 0xffffffff
> +
> +/*
> + * read_safe
> + * helper. reads until size bytes accumulated in buf, if an error other then
> + * EINTR is encountered returns -1, otherwise returns 0.
> + * @block if 1 the read will block (the fd is always blocking).
> + *        if 0 poll first, return immediately if no bytes available, otherwise
> + *         read size in blocking mode.
> + */
> +static int read_safe(int fd, void *buf, size_t size, int block)
> +{
> +    int read_size = 0;
> +    int ret;
> +    struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0};
> +
> +    if (!block) {
> +        ret = poll(&pollfd, 1, 0);
> +        if (ret == -1) {
What if (errno == EINTR)? Shouldn't you call poll again? If you don't
read from the fd now, will you receive another read event?
> +            return -1;
> +        }
> +        if (!(pollfd.revents&  POLLIN)) {
> +            return 0;
> +        }
> +    }
> +    while (read_size<  size) {
> +        ret = read(fd, buf + read_size, size - read_size);
> +        if (ret == -1) {
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +            return -1;
> +        }
> +        if (ret == 0) {
> +            red_error("broken pipe on read");
> +            errno = ECONNRESET; /* huh? */
> +            return -1;
> +        }
> +        read_size += ret;
> +    }
> +    return read_size;
> +}
> +
> +/*
> + * write_safe
> + * @return -1 for error, otherwise number of written bytes. may be zero.
> + */
> +static int write_safe(int fd, void *buf, size_t size)
> +{
> +    int written_size = 0;
> +    int ret;
> +
> +    while (written_size<  size) {
> +        ret = write(fd, buf + written_size, size - written_size);
> +        if (ret == -1) {
> +            if (errno != EINTR) {
> +                return -1;
> +            }
> +            continue;
> +        }
> +        written_size += ret;
> +    }
> +    return written_size;
> +}
> +
> +static int dispatcher_handle_single_read(Dispatcher *dispatcher)
> +{
> +    int ret;
> +    uint32_t type;
> +    DispatcherMessage *msg = NULL;
> +    uint8_t *payload = dispatcher->payload;
> +    uint32_t ack = ACK;
> +
> +    if ((ret = read_safe(dispatcher->recv_fd,&type, sizeof(type), 0)) == -1) {
> +        red_printf("error reading from dispatcher: %d\n", errno);
> +        return 0;
> +    }
> +    if (ret == 0) {
> +        /* no messsage */
> +        return 0;
> +    }
> +    msg =&dispatcher->messages[type];
> +    /* we call read_safe even if msg->size == 0, this has the side
> +     * effect of setting the fd to block, so the remaining send_data
> +     * users will get a blocking fd as expected. */
This comment is obsolete (and the poll is not necessary if msg->size == 0).
> +    if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
> +        red_printf("error reading from dispatcher: %d\n", errno);
> +        /* TODO: close socketpair? */
> +        return 0;
> +    }
> +    if (msg->handler) {
> +        msg->handler(dispatcher->opaque, (void *)payload);
> +    } else {
> +        red_printf("error: no handler for message type %d\n", type);
> +    }
> +    if (msg->ack&&  write_safe(dispatcher->recv_fd,
> +&ack, sizeof(ack)) == -1) {
> +        red_printf("error writing ack for message %d\n", type);
> +        /* TODO: close socketpair? */
> +    }
> +    return 1;
> +}
> +
> +/*
> + * dispatcher_handle_recv_read
> + * doesn't handle being in the middle of a message. all reads are blocking.
> + */
> +void dispatcher_handle_recv_read(Dispatcher *dispatcher)
> +{
> +    while (dispatcher_handle_single_read(dispatcher)) {
> +    }
> +}
> +
> +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> +                             void *payload)
> +{
> +    DispatcherMessage *msg;
> +    uint32_t ack;
> +    int send_fd = dispatcher->send_fd;
> +
> +    assert(dispatcher->max_message_type>  message_type);
> +    assert(dispatcher->messages[message_type].handler);
> +    msg =&dispatcher->messages[message_type];
> +    pthread_mutex_lock(&dispatcher->lock);
> +    if (write_safe(send_fd,&message_type, sizeof(message_type)) == -1) {
> +        red_printf("error: failed to send message type for message %d\n",
> +                   message_type);
> +        goto unlock;
> +    }
> +    if (write_safe(send_fd, payload, msg->size) == -1) {
> +        red_printf("error: failed to send message body for message %d\n",
> +                   message_type);
> +        goto unlock;
> +    }
> +    if (msg->ack) {
> +        if (read_safe(send_fd,&ack, sizeof(ack), 1) == -1) {
The error branch is empty: missing an error print, then goto unlock and return?
> +        }
> +        if (ack != ACK) {
> +            red_printf("error: got wrong ack value in dispatcher "
> +                       "for message %d\n", message_type);
> +            /* TODO handling error? */
> +        }
> +    }
> +unlock:
> +    pthread_mutex_unlock(&dispatcher->lock);
> +}
> +
> +void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> +                                 dispatcher_handle_message handler, size_t size,
> +                                 int ack)
> +{
> +    DispatcherMessage *msg;
> +
> +    assert(message_type<  dispatcher->max_message_type);
> +    assert(dispatcher->messages[message_type].handler == 0);
> +    msg =&dispatcher->messages[message_type];
> +    msg->handler = handler;
> +    msg->size = size;
> +    msg->ack = ack;
> +    if (msg->size>  dispatcher->payload_size) {
> +        dispatcher->payload = realloc(dispatcher->payload, msg->size);
> +        dispatcher->payload_size = msg->size;
> +    }
> +}
> +
> +void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
> +                     void *opaque)
> +{
> +    int channels[2];
> +
> +    dispatcher->opaque = opaque;
> +    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> +        red_error("socketpair failed %s", strerror(errno));
> +        return;
> +    }
> +    pthread_mutex_init(&dispatcher->lock, NULL);
> +    dispatcher->recv_fd = channels[0];
> +    dispatcher->send_fd = channels[1];
> +    dispatcher->self = pthread_self();
> +
> +    dispatcher->messages = spice_malloc0_n(max_message_type,
> +                                           sizeof(dispatcher->messages[0]));
> +    dispatcher->max_message_type = max_message_type;
> +}
> +
> +void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
> +{
> +    dispatcher->opaque = opaque;
> +}
> +
> +int dispatcher_get_recv_fd(Dispatcher *dispatcher)
> +{
> +    return dispatcher->recv_fd;
> +}
> diff --git a/server/dispatcher.h b/server/dispatcher.h
> new file mode 100644
> index 0000000..04e6b46
> --- /dev/null
> +++ b/server/dispatcher.h
> @@ -0,0 +1,81 @@
> +#ifndef MAIN_DISPATCHER_H
> +#define MAIN_DISPATCHER_H
> +
> +#include<spice.h>
> +
> +typedef struct Dispatcher Dispatcher;
> +
> +typedef void (*dispatcher_handle_message)(void *opaque,
> +                                          void *payload);
> +
> +typedef struct DispatcherMessage {
> +    size_t size;
> +    int ack;
> +    dispatcher_handle_message handler;
> +} DispatcherMessage;
> +
> +struct Dispatcher {
> +    SpiceCoreInterface *recv_core;
> +    int recv_fd;
> +    int send_fd;
> +    pthread_t self;
> +    pthread_mutex_t lock;
> +    DispatcherMessage *messages;
> +    int stage;  /* message parser stage - sender has no stages */
> +    size_t max_message_type;
> +    void *payload; /* allocated as max of message sizes */
> +    size_t payload_size; /* used to track realloc calls */
> +    void *opaque;
> +};
> +
> +/*
> + * dispatcher_send_message
> + * @message_type: message type
> + * @payload:      payload
> + */
> +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
> +                             void *payload);
> +
> +/*
> + * dispatcher_init
> + * @max_message_type: number of message types. Allows upfront allocation
> + *  of a DispatcherMessage list.
> + * up front, and registration in any order wanted.
> + */
> +void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
> +                     void *opaque);
> +
> +/*
> + * dispatcher_register_handler
> + * @dispatcher:           dispatcher
> + * @messsage_type:  message type
> + * @handler:        message handler
> + * @size:           message size. Each type has a fixed associated size.
> + * @ack:            send an ack. This is per message type - you can't send the
> + *                  same message type with and without. Register two different
> + *                  messages if that is what you want.
> + */
> +void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
> +                                 dispatcher_handle_message handler, size_t size,
> +                                 int ack);
> +
> +/*
> + *  dispatcher_handle_recv_read
> + *  @dispatcher: Dispatcher instance
> + */
> +void dispatcher_handle_recv_read(Dispatcher *);
> +
> +/*
> + *  dispatcher_get_recv_fd
> + *  @return: receive file descriptor of the dispatcher
> + */
> +int dispatcher_get_recv_fd(Dispatcher *);
> +
> +/*
> + * dispatcher_set_opaque
> + * @dispatcher: Dispatcher instance
> + * @opaque: opaque to use for callbacks
> + */
> +void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
> +
> +#endif //MAIN_DISPATCHER_H
> diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> index 73856bf..a5967fa 100644
> --- a/server/main_dispatcher.c
> +++ b/server/main_dispatcher.c
> @@ -5,6 +5,7 @@
>   #include<assert.h>
>
>   #include "red_common.h"
> +#include "dispatcher.h"
>   #include "main_dispatcher.h"
>
>   /*
> @@ -28,11 +29,8 @@
>    */
>
>   typedef struct {
> +    Dispatcher base;
>       SpiceCoreInterface *core;
> -    int main_fd;
> -    int other_fd;
> -    pthread_t self;
> -    pthread_mutex_t lock;
>   } MainDispatcher;
>
>   MainDispatcher main_dispatcher;
> @@ -43,15 +41,10 @@ enum {
>       MAIN_DISPATCHER_NUM_MESSAGES
>   };
>
> -typedef struct MainDispatcherMessage {
> -    uint32_t type;
> -    union {
> -        struct {
> -            int event;
> -            SpiceChannelEventInfo *info;
> -        } channel_event;
> -    } data;
> -} MainDispatcherMessage;
> +typedef struct MainDispatcherChannelEventMessage {
> +    int event;
> +    SpiceChannelEventInfo *info;
> +} MainDispatcherChannelEventMessage;
>
>   /* channel_event - calls core->channel_event, must be done in main thread */
>   static void main_dispatcher_self_handle_channel_event(
> @@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
>       main_dispatcher.core->channel_event(event, info);
>   }
>
> -static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> +static void main_dispatcher_handle_channel_event(void *opaque,
> +                                                 void *payload)
>   {
> -    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> -                                              msg->data.channel_event.info);
> +    MainDispatcherChannelEventMessage *channel_event = payload;
> +
> +    main_dispatcher_self_handle_channel_event(channel_event->event,
> +                                              channel_event->info);
>   }
>
>   void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
>   {
> -    MainDispatcherMessage msg;
> -    ssize_t written = 0;
> -    ssize_t ret;
> +    MainDispatcherChannelEventMessage msg;
>
> -    if (pthread_self() == main_dispatcher.self) {
> +    if (pthread_self() == main_dispatcher.base.self) {
>           main_dispatcher_self_handle_channel_event(event, info);
>           return;
>       }
> -    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> -    msg.data.channel_event.event = event;
> -    msg.data.channel_event.info = info;
> -    pthread_mutex_lock(&main_dispatcher.lock);
> -    while (written<  sizeof(msg)) {
> -        ret = write(main_dispatcher.other_fd,&msg + written,
> -                    sizeof(msg) - written);
> -        if (ret == -1) {
> -            assert(errno == EINTR);
> -            continue;
> -        }
> -        written += ret;
> -    }
> -    pthread_mutex_unlock(&main_dispatcher.lock);
> +    msg.event = event;
> +    msg.info = info;
> +    dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> +&msg);
>   }
>
> -
> -static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> +static void dispatcher_handle_read(int fd, int event, void *opaque)
>   {
> -    int ret;
> -    MainDispatcher *md = opaque;
> -    MainDispatcherMessage msg;
> -    int read_size = 0;
> +    Dispatcher *dispatcher = opaque;
>
> -    while (read_size<  sizeof(msg)) {
> -        /* blocks until sizeof(msg) is read */
> -        ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size);
> -        if (ret == -1) {
> -            if (errno != EINTR) {
> -                red_printf("error reading from main dispatcher: %d\n", errno);
> -                /* TODO: close channel? */
> -                return;
> -            }
> -            continue;
> -        }
> -        read_size += ret;
> -    }
> -    switch (msg.type) {
> -        case MAIN_DISPATCHER_CHANNEL_EVENT:
> -            main_dispatcher_handle_channel_event(&msg);
> -            break;
> -        default:
> -            red_printf("error: unhandled main dispatcher message type %d\n",
> -                       msg.type);
> -    }
> +    dispatcher_handle_recv_read(dispatcher);
>   }
>
>   void main_dispatcher_init(SpiceCoreInterface *core)
>   {
> -    int channels[2];
> -
>       memset(&main_dispatcher, 0, sizeof(main_dispatcher));
>       main_dispatcher.core = core;
> -
> -    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> -        red_error("socketpair failed %s", strerror(errno));
> -        return;
> -    }
> -    pthread_mutex_init(&main_dispatcher.lock, NULL);
> -    main_dispatcher.main_fd = channels[0];
> -    main_dispatcher.other_fd = channels[1];
> -    main_dispatcher.self = pthread_self();
> -
> -    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> -                    main_dispatcher_handle_read,&main_dispatcher);
> +    dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES,&main_dispatcher.base);
> +    core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
> +                    dispatcher_handle_read,&main_dispatcher.base);
> +    dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
> +                                main_dispatcher_handle_channel_event,
> +                                sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
>   }



More information about the Spice-devel mailing list