[Spice-devel] [PATCH v4] [0.8 branch] server: add main_dispatcher
Yonit Halperin
yhalperi at redhat.com
Mon Oct 24 07:18:49 PDT 2011
ACK
On 10/24/2011 02:25 PM, Alon Levy wrote:
> add main_dispatcher, a message passing mechanism for sending messages to
> the main thread. The main thread is the thread that implements
> SpiceCoreInterface, which is assumed to be a single thread.
>
> Similar to the async operation of red_worker, a socket pair is created
> and used to pass messages. The messages are a fixed size to ease
> parsing. A single message is defined to pass a channel_event.
>
> RHBZ: 746950
> FDBZ: 41858
>
> This patch is 0.8 branch only, for the master branch there should be a
> better approach to share code with red_dispatcher and ready the way for
> later adding more threads.
> ---
> v3->v4:
> handle EINTR, not EPIPE or EAGAIN (yonit)
> fix whitespace (yonit)
>
> v2->v3:
> Resending with the write loop change.
>
> v1->v2:
> channel_event: push a pointer, not a copy, like the old code did.
> add a lock.
> loop on write until all bytes written.
> loop on read until all bytes read.
>
> server/Makefile.am | 2 +
> server/main_dispatcher.c | 145 ++++++++++++++++++++++++++++++++++++++++++++++
> server/main_dispatcher.h | 9 +++
> server/reds.c | 4 +-
> 4 files changed, 159 insertions(+), 1 deletions(-)
> create mode 100644 server/main_dispatcher.c
> create mode 100644 server/main_dispatcher.h
>
> diff --git a/server/Makefile.am b/server/Makefile.am
> index 93ed312..ebb0d3f 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -114,6 +114,8 @@ libspice_server_la_SOURCES = \
> red_common.h \
> red_dispatcher.c \
> red_dispatcher.h \
> + main_dispatcher.c \
> + main_dispatcher.h \
> red_memslots.c \
> red_memslots.h \
> red_parse_qxl.c \
> diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> new file mode 100644
> index 0000000..73856bf
> --- /dev/null
> +++ b/server/main_dispatcher.c
> @@ -0,0 +1,145 @@
> +#include<unistd.h>
> +#include<string.h>
> +#include<errno.h>
> +#include<pthread.h>
> +#include<assert.h>
> +
> +#include "red_common.h"
> +#include "main_dispatcher.h"
> +
> +/*
> + * Main Dispatcher
> + * ===============
> + *
> + * Communication channel between any non main thread and the main thread.
> + *
> + * The main thread is that from which spice_server_init is called.
> + *
> + * Messages are single sized, sent from the non-main thread to the main-thread.
> + * No acknowledge is sent back. This prevents a possible deadlock with the main
> + * thread already waiting on a response for the existing red_dispatcher used
> + * by the worker thread.
> + *
> + * All events have three functions:
> + * main_dispatcher_<event_name> - non static, public function
> + * main_dispatcher_self_<event_name> - handler for main thread
> + * main_dispatcher_handle_<event_name> - handler for callback from main thread
> + * seperate from self because it may send an ack or do other work in the future.
> + */
> +
> +typedef struct {
> + SpiceCoreInterface *core;
> + int main_fd;
> + int other_fd;
> + pthread_t self;
> + pthread_mutex_t lock;
> +} MainDispatcher;
> +
> +MainDispatcher main_dispatcher;
> +
> +enum {
> + MAIN_DISPATCHER_CHANNEL_EVENT = 0,
> +
> + MAIN_DISPATCHER_NUM_MESSAGES
> +};
> +
> +typedef struct MainDispatcherMessage {
> + uint32_t type;
> + union {
> + struct {
> + int event;
> + SpiceChannelEventInfo *info;
> + } channel_event;
> + } data;
> +} MainDispatcherMessage;
> +
> +/* channel_event - calls core->channel_event, must be done in main thread */
> +static void main_dispatcher_self_handle_channel_event(
> + int event,
> + SpiceChannelEventInfo *info)
> +{
> + main_dispatcher.core->channel_event(event, info);
> +}
> +
> +static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
> +{
> + main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
> + msg->data.channel_event.info);
> +}
> +
> +void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
> +{
> + MainDispatcherMessage msg;
> + ssize_t written = 0;
> + ssize_t ret;
> +
> + if (pthread_self() == main_dispatcher.self) {
> + main_dispatcher_self_handle_channel_event(event, info);
> + return;
> + }
> + msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
> + msg.data.channel_event.event = event;
> + msg.data.channel_event.info = info;
> + pthread_mutex_lock(&main_dispatcher.lock);
> + while (written< sizeof(msg)) {
> + ret = write(main_dispatcher.other_fd,&msg + written,
> + sizeof(msg) - written);
> + if (ret == -1) {
> + assert(errno == EINTR);
> + continue;
> + }
> + written += ret;
> + }
> + pthread_mutex_unlock(&main_dispatcher.lock);
> +}
> +
> +
> +static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> +{
> + int ret;
> + MainDispatcher *md = opaque;
> + MainDispatcherMessage msg;
> + int read_size = 0;
> +
> + while (read_size< sizeof(msg)) {
> + /* blocks until sizeof(msg) is read */
> + ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size);
> + if (ret == -1) {
> + if (errno != EINTR) {
> + red_printf("error reading from main dispatcher: %d\n", errno);
> + /* TODO: close channel? */
> + return;
> + }
> + continue;
> + }
> + read_size += ret;
> + }
> + switch (msg.type) {
> + case MAIN_DISPATCHER_CHANNEL_EVENT:
> + main_dispatcher_handle_channel_event(&msg);
> + break;
> + default:
> + red_printf("error: unhandled main dispatcher message type %d\n",
> + msg.type);
> + }
> +}
> +
> +void main_dispatcher_init(SpiceCoreInterface *core)
> +{
> + int channels[2];
> +
> + memset(&main_dispatcher, 0, sizeof(main_dispatcher));
> + main_dispatcher.core = core;
> +
> + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
> + red_error("socketpair failed %s", strerror(errno));
> + return;
> + }
> + pthread_mutex_init(&main_dispatcher.lock, NULL);
> + main_dispatcher.main_fd = channels[0];
> + main_dispatcher.other_fd = channels[1];
> + main_dispatcher.self = pthread_self();
> +
> + core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
> + main_dispatcher_handle_read,&main_dispatcher);
> +}
> diff --git a/server/main_dispatcher.h b/server/main_dispatcher.h
> new file mode 100644
> index 0000000..2c201c7
> --- /dev/null
> +++ b/server/main_dispatcher.h
> @@ -0,0 +1,9 @@
> +#ifndef MAIN_DISPATCHER_H
> +#define MAIN_DISPATCHER_H
> +
> +#include<spice.h>
> +
> +void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info);
> +void main_dispatcher_init(SpiceCoreInterface *core);
> +
> +#endif //MAIN_DISPATCHER_H
> diff --git a/server/reds.c b/server/reds.c
> index 8e83b99..0bb7e96 100644
> --- a/server/reds.c
> +++ b/server/reds.c
> @@ -53,6 +53,7 @@
>
> #include "red_common.h"
> #include "red_dispatcher.h"
> +#include "main_dispatcher.h"
> #include "snd_worker.h"
> #include<spice/stats.h>
> #include "stat.h"
> @@ -415,7 +416,7 @@ static void reds_channel_event(RedsStream *stream, int event)
> {
> if (core->base.minor_version< 3 || core->channel_event == NULL)
> return;
> - core->channel_event(event,&stream->info);
> + main_dispatcher_channel_event(event,&stream->info);
> }
>
> static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
> @@ -4685,6 +4686,7 @@ static int do_spice_init(SpiceCoreInterface *core_interface)
> reds->outgoing.vec = reds->outgoing.vec_buf;
>
> init_vd_agent_resources();
> + main_dispatcher_init(core);
>
> if (!(reds->mig_timer = core->timer_add(migrate_timeout, NULL))) {
> red_error("migration timer create failed");
More information about the Spice-devel
mailing list