[Spice-devel] [PATCH spice-streaming-agent 2/3] Introduce OutboundMessages for the StreamPort class
Frediano Ziglio
fziglio at redhat.com
Mon Oct 8 11:46:09 UTC 2018
>
> Heavily based on code by Christophe de Dinechin.
>
> Wraps the serialization code in the OutboundMessage class and its
> descendants for each specific message. Uses the Curiously Recurring Template
> Pattern (CRTP) to avoid runtime overhead of polymorphism.
>
> The messages are placed along with the code that sends them; this helps
> to avoid header proliferation, e.g. the CursorMessage requires X11
> headers for its interface.
>
> Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> ---
> src/cursor-updater.cpp | 128 ++++++++++++++++++++--------------
> src/spice-streaming-agent.cpp | 97 ++++++++++++--------------
> src/stream-port.hpp | 35 ++++++++++
> 3 files changed, 154 insertions(+), 106 deletions(-)
>
> diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp
> index 8f65e83..9edb010 100644
> --- a/src/cursor-updater.cpp
> +++ b/src/cursor-updater.cpp
> @@ -12,52 +12,62 @@
> #include <spice/stream-device.h>
> #include <spice/enums.h>
>
> -#include <cstring>
> -#include <functional>
> #include <memory>
> +#include <vector>
> +#include <unistd.h>
> #include <X11/extensions/Xfixes.h>
>
>
> namespace spice {
> namespace streaming_agent {
>
> -namespace {
> -
> -void send_cursor(StreamPort &stream_port, unsigned width, unsigned height,
> int hotspot_x, int hotspot_y,
> - std::function<void(uint32_t *)> fill_cursor)
> +class CursorError : public Error
> {
> - if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >=
> STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> - return;
> - }
> -
> - size_t cursor_size =
> - sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) +
> - width * height * sizeof(uint32_t);
> - std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]);
> + using Error::Error;
> +};
>
> - StreamDevHeader
> &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get()));
> - memset(&dev_hdr, 0, sizeof(dev_hdr));
> - dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> - dev_hdr.type = STREAM_TYPE_CURSOR_SET;
> - dev_hdr.size = cursor_size - sizeof(StreamDevHeader);
> -
> - StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet
> *>(msg.get() + sizeof(StreamDevHeader)));
> - memset(&cursor_msg, 0, sizeof(cursor_msg));
> +class CursorMessage : public OutboundMessage<StreamMsgCursorSet,
> CursorMessage, STREAM_TYPE_CURSOR_SET>
> +{
> +public:
> + CursorMessage(uint16_t width, uint16_t height, uint16_t xhot, uint16_t
> yhot,
> + const std::vector<uint32_t> &pixels)
> + :
> + OutboundMessage(pixels)
> + {
> + if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) {
> + throw CursorError("Cursor width " + std::to_string(width) +
> + " too big (limit is " +
> std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")");
> + }
>
> - cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA;
> - cursor_msg.width = width;
> - cursor_msg.height = height;
> - cursor_msg.hot_spot_x = hotspot_x;
> - cursor_msg.hot_spot_y = hotspot_y;
> + if (height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> + throw CursorError("Cursor height " + std::to_string(height) +
> + " too big (limit is " +
> std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")");
> + }
> + }
>
> - uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
> - fill_cursor(pixels);
> + static size_t size(const std::vector<uint32_t> &pixels)
> + {
> + return sizeof(PayloadType) + sizeof(uint32_t) * pixels.size();
> + }
>
> - std::lock_guard<std::mutex> guard(stream_port.mutex);
> - stream_port.write(msg.get(), cursor_size);
> -}
> + void write_message_body(StreamPort &stream_port,
> + uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
> + const std::vector<uint32_t> &pixels)
> + {
> + StreamMsgCursorSet msg = {
> + .width = width,
> + .height = height,
> + .hot_spot_x = xhot,
> + .hot_spot_y = yhot,
> + .type = SPICE_CURSOR_TYPE_ALPHA,
> + .padding1 = {},
> + .data = {}
> + };
Do not use C++20 features (the designated initializers above); we agreed on C++11.
>
> -} // namespace
> + stream_port.write(&msg, sizeof(msg));
> + stream_port.write(pixels.data(), sizeof(uint32_t) * pixels.size());
> + }
> +};
>
> CursorUpdater::CursorUpdater(StreamPort *stream_port) :
> stream_port(stream_port)
> {
> @@ -79,27 +89,39 @@ CursorUpdater::CursorUpdater(StreamPort *stream_port) :
> stream_port(stream_port)
> unsigned long last_serial = 0;
>
> while (1) {
> - XEvent event;
> - XNextEvent(display, &event);
> - if (event.type != xfixes_event_base + 1) {
> - continue;
> - }
> -
> - XFixesCursorImage *cursor = XFixesGetCursorImage(display);
> - if (!cursor) {
> - continue;
> + try {
> + XEvent event;
> + XNextEvent(display, &event);
> + if (event.type != xfixes_event_base + 1) {
> + continue;
> + }
> +
> + XFixesCursorImage *cursor = XFixesGetCursorImage(display);
> + if (!cursor) {
> + continue;
> + }
> +
> + if (cursor->cursor_serial == last_serial) {
> + continue;
> + }
> +
> + last_serial = cursor->cursor_serial;
> +
> + // the X11 cursor data may be in a wrong format, copy them to an
> uint32_t array
> + size_t pixcount = cursor->width * cursor->height;
> + std::vector<uint32_t> pixels;
> + pixels.reserve(pixcount);
> +
> + for (size_t i = 0; i < pixcount; ++i) {
> + pixels.push_back(cursor->pixels[i]);
> + }
> +
> + stream_port->send<CursorMessage>(cursor->width, cursor->height,
> + cursor->xhot, cursor->yhot,
> pixels);
> + } catch (const std::exception &e) {
> + ::syslog(LOG_ERR, "Error in cursor updater thread: %s",
> e.what());
> + sleep(1); // rate-limit the error
> }
> -
> - if (cursor->cursor_serial == last_serial) {
> - continue;
> - }
> -
> - last_serial = cursor->cursor_serial;
> - auto fill_cursor = [cursor](uint32_t *pixels) {
> - for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> - pixels[i] = cursor->pixels[i];
> - };
> - send_cursor(*stream_port, cursor->width, cursor->height,
> cursor->xhot, cursor->yhot, fill_cursor);
> }
> }
>
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index a89ba3f..39c53bd 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -41,16 +41,51 @@ using namespace spice::streaming_agent;
>
> static ConcreteAgent agent;
>
> -struct SpiceStreamFormatMessage
> +class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage,
> STREAM_TYPE_FORMAT>
> {
> - StreamDevHeader hdr;
> - StreamMsgFormat msg;
> +public:
> + FormatMessage(unsigned w, unsigned h, uint8_t c) {}
> +
> + static size_t size()
> + {
> + return sizeof(PayloadType);
> + }
> +
> + void write_message_body(StreamPort &stream_port, unsigned w, unsigned h,
> uint8_t c)
> + {
> + StreamMsgFormat msg = { .width = w, .height = h, .codec = c,
> .padding1 = {} };
> + stream_port.write(&msg, sizeof(msg));
> + }
> +};
> +
> +class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage,
> STREAM_TYPE_DATA>
> +{
> +public:
> + FrameMessage(const void *frame, size_t length) : OutboundMessage(length)
> {}
> +
> + static size_t size(size_t length)
> + {
> + return sizeof(PayloadType) + length;
> + }
> +
> + void write_message_body(StreamPort &stream_port, const void *frame,
> size_t length)
> + {
> + stream_port.write(frame, length);
> + }
> };
>
> -struct SpiceStreamDataMessage
> +class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities,
> CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES>
> {
> - StreamDevHeader hdr;
> - StreamMsgData msg;
> +public:
> + static size_t size()
> + {
> + return sizeof(PayloadType);
> + }
> +
> + void write_message_body(StreamPort &stream_port)
> + {
> + // No body for capabilities message
> + }
> };
>
> static bool streaming_requested = false;
> @@ -83,15 +118,7 @@ static void read_command_from_device(StreamPort
> &stream_port)
>
> switch (in_message.header.type) {
> case STREAM_TYPE_CAPABILITIES: {
> - StreamDevHeader hdr = {
> - STREAM_DEVICE_PROTOCOL,
> - 0,
> - STREAM_TYPE_CAPABILITIES,
> - 0
> - };
> -
> - std::lock_guard<std::mutex> guard(stream_port.mutex);
> - stream_port.write(&hdr, sizeof(hdr));
> + stream_port.send<CapabilitiesOutMessage>();
> return;
> } case STREAM_TYPE_NOTIFY_ERROR: {
> NotifyErrorMessage msg =
> in_message.get_payload<NotifyErrorMessage>();
> @@ -128,42 +155,6 @@ static void read_command(StreamPort &stream_port, bool
> blocking)
> }
> }
>
> -static void spice_stream_send_format(StreamPort &stream_port, unsigned w,
> unsigned h, unsigned c)
> -{
> -
> - SpiceStreamFormatMessage msg;
> - const size_t msgsize = sizeof(msg);
> - const size_t hdrsize = sizeof(msg.hdr);
> - memset(&msg, 0, msgsize);
> - msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> - msg.hdr.type = STREAM_TYPE_FORMAT;
> - msg.hdr.size = msgsize - hdrsize; /* includes only the body? */
> - msg.msg.width = w;
> - msg.msg.height = h;
> - msg.msg.codec = c;
> -
> - syslog(LOG_DEBUG, "writing format");
> - std::lock_guard<std::mutex> guard(stream_port.mutex);
> - stream_port.write(&msg, msgsize);
> -}
> -
> -static void spice_stream_send_frame(StreamPort &stream_port, const void
> *buf, const unsigned size)
> -{
> - SpiceStreamDataMessage msg;
> - const size_t msgsize = sizeof(msg);
> -
> - memset(&msg, 0, msgsize);
> - msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> - msg.hdr.type = STREAM_TYPE_DATA;
> - msg.hdr.size = size; /* includes only the body? */
> -
> - std::lock_guard<std::mutex> guard(stream_port.mutex);
> - stream_port.write(&msg, msgsize);
> - stream_port.write(buf, size);
> -
> - syslog(LOG_DEBUG, "Sent a frame of size %u", size);
> -}
> -
> static void handle_interrupt(int intr)
> {
> syslog(LOG_INFO, "Got signal %d, exiting", intr);
> @@ -249,13 +240,13 @@ do_capture(StreamPort &stream_port, FrameLog
> &frame_log)
> syslog(LOG_DEBUG, "wXh %uX%u codec=%u", width, height,
> codec);
> frame_log.log_stat("Started new stream wXh %uX%u codec=%u",
> width, height, codec);
>
> - spice_stream_send_format(stream_port, width, height, codec);
> + stream_port.send<FormatMessage>(width, height, codec);
> }
> frame_log.log_stat("Frame of %zu bytes", frame.buffer_size);
> frame_log.log_frame(frame.buffer, frame.buffer_size);
>
> try {
> - spice_stream_send_frame(stream_port, frame.buffer,
> frame.buffer_size);
> + stream_port.send<FrameMessage>(frame.buffer,
> frame.buffer_size);
> } catch (const WriteError& e) {
> syslog(e);
> break;
> diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> index 090930b..6cb516b 100644
> --- a/src/stream-port.hpp
> +++ b/src/stream-port.hpp
> @@ -60,12 +60,47 @@ public:
>
> InboundMessage receive();
>
> + template <typename Message, typename ...PayloadArgs>
> + void send(PayloadArgs&&... payload_args)
> + {
> + Message message(payload_args...);
> + std::lock_guard<std::mutex> stream_guard(mutex);
> + message.write_header(*this);
> + message.write_message_body(*this, payload_args...);
> + }
> +
> void write(const void *buf, size_t len);
>
> int fd;
> +
> +private:
OT: This looks great. I wonder whether, in the end, write and fd should
also be private.
> std::mutex mutex;
> };
>
> +template <typename Payload, typename Message, unsigned Type>
> +class OutboundMessage
> +{
> +public:
> + template <typename ...PayloadArgs>
> + OutboundMessage(PayloadArgs&&... payload_args) :
> + hdr(StreamDevHeader {
> + .protocol_version = STREAM_DEVICE_PROTOCOL,
> + .padding = 0, // Workaround GCC bug "sorry: not implemented"
> + .type = Type,
> + .size = (uint32_t) Message::size(payload_args...)
> + })
> + {}
> +
> + void write_header(StreamPort &stream_port)
> + {
> + stream_port.write(&hdr, sizeof(hdr));
> + }
> +
> +protected:
> + StreamDevHeader hdr;
> + using PayloadType = Payload;
> +};
> +
> void read_all(int fd, void *buf, size_t len);
> void write_all(int fd, const void *buf, size_t len);
>
Frediano
More information about the Spice-devel
mailing list