[Spice-devel] [RFC] server: add main_dispatcher

Alon Levy alevy at redhat.com
Wed Sep 7 23:15:53 PDT 2011


On Thu, Sep 08, 2011 at 02:24:32AM +0200, Marc-André Lureau wrote:
> Hi Alon,
> 
> On Thu, Sep 8, 2011 at 1:17 AM, Alon Levy <alevy at redhat.com> wrote:
> > ---
> >  server/Makefile.am       |    2 +
> >  server/main_dispatcher.c |  132 ++++++++++++++++++++++++++++++++++++++++++++++
> >  server/main_dispatcher.h |    9 +++
> >  server/reds.c            |    5 ++-
> >  4 files changed, 147 insertions(+), 1 deletions(-)
> >  create mode 100644 server/main_dispatcher.c
> >  create mode 100644 server/main_dispatcher.h
> >
> > diff --git a/server/Makefile.am b/server/Makefile.am
> > index a7cdd84..f1d4052 100644
> > --- a/server/Makefile.am
> > +++ b/server/Makefile.am
> > @@ -80,6 +80,8 @@ libspice_server_la_SOURCES =                  \
> >        red_common.h                            \
> >        red_dispatcher.c                        \
> >        red_dispatcher.h                        \
> > +       main_dispatcher.c           \
> > +       main_dispatcher.h           \
> >        red_memslots.c                          \
> >        red_memslots.h                          \
> >        red_parse_qxl.c                         \
> > diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> > new file mode 100644
> > index 0000000..8cdb2e0
> > --- /dev/null
> > +++ b/server/main_dispatcher.c
> > @@ -0,0 +1,132 @@
> > +#include <unistd.h>
> > +#include <string.h>
> > +#include <errno.h>
> > +#include <pthread.h>
> > +
> > +#include "common/spice_common.h"
> > +#include "main_dispatcher.h"
> > +
> > +/*
> > + * Main Dispatcher
> > + * ===============
> > + *
> > + * Communication channel between any non main thread and the main thread.
> > + *
> > + * The main thread is that from which spice_server_init is called.
> > + *
> > + * Messages are single sized, sent from the non-main thread to the main-thread.
> > + * No acknowledge is sent back. This prevents a possible deadlock with the main
> > + * thread already waiting on a response for the existing red_dispatcher used
> > + * by the worker thread.
> > + *
> 
> How often do you expect it to be used? What will be the overhead of
> those messages, size & rate? From the description, it looks a lot like
> the async message queues in Pulseaudio (see src/pulsecore/asyncq.h).
> PA uses eventfd for notification/semaphore & lock-free queues (and a
> simple object system, not unions). Just an idea, could be changed
> later perhaps. socketpair is probably fine.
> 

First of all I see I forgot to add a mutex, since I said "any non main thread", right
now we only have one, so no need to lock, but the idea is that it can also be called from
others, and so we will need it later. (unless that write can be considered atomic, but
I don't think it can)

To answer your question - infrequently. So I don't want / think we need to invest in
optimizing this. Right now the only usage is the channel event (core->channel_event)
calls, which have to happen in the main thread, and are currently done also from the
worker thread.

Thanks for the reference, I'll definitely check it. We already use a single socketpair
btw in red_dispatcher, this is a simple copy from there, I didn't actually create a
base "dispatcher" class because there are some differences and there isn't that much
shared code (differences: dispatcher does an ack, here I don't because otherwise we
can deadlock with the dispatcher. Also dispatcher has non union messages, here I wanted
to do the least code even if it meant some messages are larger then neccessary. I can
always just send a pointer, but then I must have acks to avoid freeing it.)

> Lennart, I wonder what today you think about it, if there is any
> drawback or improvements. ie would you use it again?
> 
> cheers
> 
> > + * All events have three functions:
> > + * main_dispatcher_<event_name> - non static, public function
> > + * main_dispatcher_self_<event_name> - handler for main thread
> > + * main_dispatcher_handle_<event_name> - handler for callback from main thread
> > + *   seperate from self because it may send an ack or do other work in the future.
> > + */
> > +
> > +typedef struct {
> > +    SpiceCoreInterface *core;
> > +    int main_fd;
> > +    int other_fd;
> > +    pthread_t self;
> > +} MainDispatcher;
> > +
> > +MainDispatcher main_dispatcher;
> > +
> > +enum {
> > +    MAIN_DISPATCHER_CHANNEL_EVENT = 0,
> > +
> > +    MAIN_DISPATCHER_NUM_MESSAGES
> > +};
> > +
> > +typedef struct MainDispatcherMessage {
> > +    uint32_t type;
> > +    union {
> > +        struct {
> > +            int event;
> > +            SpiceChannelEventInfo info;
> > +        } channel_event;
> > +    } data;
> > +} MainDispatcherMessage;
> > +
> > +
> > +/* channel_event - calls core->channel_event, must be done in main thread */
> > +static void main_dispatcher_self_handle_channel_event(int event,
> > +    SpiceChannelEventInfo *info)
> > +{
> > +    main_dispatcher.core->channel_event(event, info);
> > +}
> > +
> > +static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> > +{
> > +    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> > +        &msg->data.channel_event.info);
> > +}
> > +
> > +void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
> > +{
> > +    MainDispatcherMessage msg;
> > +
> > +    if (pthread_self() == main_dispatcher.self) {
> > +        main_dispatcher_self_handle_channel_event(event, info);
> > +        return;
> > +    }
> > +    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> > +    msg.data.channel_event.event = event;
> > +    msg.data.channel_event.info = *info;
> > +    write(main_dispatcher.other_fd, &msg, sizeof(msg));
> > +}
> > +
> > +
> > +static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> > +{
> > +    int ret;
> > +    MainDispatcher *md = opaque;
> > +    MainDispatcherMessage msg;
> > +
> > +    ret = read(md->main_fd, &msg, sizeof(msg));
> > +    if (ret == -1) {
> > +        if (errno == EAGAIN) {
> > +            return;
> > +        } else {
> > +            red_printf("error reading from main dispatcher: %d\n", errno);
> > +            /* TODO: close channel? */
> > +            return;
> > +        }
> > +    }
> > +    if (ret != sizeof(msg)) {
> > +        red_printf("short read in main dispatcher: %d < %zd\n", ret, sizeof(msg));
> > +        /* TODO: close channel?  */
> > +        return;
> > +    }
> > +
> > +    switch (msg.type) {
> > +        case MAIN_DISPATCHER_CHANNEL_EVENT:
> > +            main_dispatcher_handle_channel_event(&msg);
> > +            break;
> > +        default:
> > +            red_printf("error: unhandled main dispatcher message type %d\n",
> > +                       msg.type);
> > +    }
> > +}
> > +
> > +void main_dispatcher_init(SpiceCoreInterface *core)
> > +{
> > +    int channels[2];
> > +
> > +    memset(&main_dispatcher, 0, sizeof(main_dispatcher));
> > +    main_dispatcher.core = core;
> > +
> > +    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> > +        red_error("socketpair failed %s", strerror(errno));
> > +        return;
> > +    }
> > +    main_dispatcher.main_fd = channels[0];
> > +    main_dispatcher.other_fd = channels[1];
> > +    main_dispatcher.self = pthread_self();
> > +
> > +    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> > +                    main_dispatcher_handle_read, &main_dispatcher);
> > +}
> > diff --git a/server/main_dispatcher.h b/server/main_dispatcher.h
> > new file mode 100644
> > index 0000000..2c201c7
> > --- /dev/null
> > +++ b/server/main_dispatcher.h
> > @@ -0,0 +1,9 @@
> > +#ifndef MAIN_DISPATCHER_H
> > +#define MAIN_DISPATCHER_H
> > +
> > +#include <spice.h>
> > +
> > +void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info);
> > +void main_dispatcher_init(SpiceCoreInterface *core);
> > +
> > +#endif //MAIN_DISPATCHER_H
> > diff --git a/server/reds.c b/server/reds.c
> > index 90779ff..ef9da4f 100644
> > --- a/server/reds.c
> > +++ b/server/reds.c
> > @@ -56,6 +56,7 @@
> >  #include "main_channel.h"
> >  #include "red_common.h"
> >  #include "red_dispatcher.h"
> > +#include "main_dispatcher.h"
> >  #include "snd_worker.h"
> >  #include <spice/stats.h>
> >  #include "stat.h"
> > @@ -322,7 +323,8 @@ static void reds_stream_channel_event(RedsStream *s, int event)
> >  {
> >     if (core->base.minor_version < 3 || core->channel_event == NULL)
> >         return;
> > -    core->channel_event(event, &s->info);
> > +
> > +    main_dispatcher_channel_event(event, &s->info);
> >  }
> >
> >  static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
> > @@ -3519,6 +3521,7 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
> >     init_vd_agent_resources();
> >     ring_init(&reds->clients);
> >     reds->num_clients = 0;
> > +    main_dispatcher_init(core);
> >
> >     if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
> >         red_error("migration timer create failed");
> > --
> > 1.7.6.1
> >
> > _______________________________________________
> > Spice-devel mailing list
> > Spice-devel at lists.freedesktop.org
> > http://lists.freedesktop.org/mailman/listinfo/spice-devel
> >
> 
> 
> 
> -- 
> Marc-André Lureau


More information about the Spice-devel mailing list