[Spice-devel] [PATCH spice-streaming-agent 2/3] Introduce OutboundMessages for the StreamPort class
Lukáš Hrázký
lhrazky at redhat.com
Mon Oct 8 13:01:19 UTC 2018
On Mon, 2018-10-08 at 07:46 -0400, Frediano Ziglio wrote:
> >
> > Heavily based on code by Christophe de Dinechin.
> >
> > Wraps the serialization code in the OutboundMessage class and its
> > descendants for each specific message. Uses Curiously Recurring Template
> > Pattern (CRTP) to avoid runtime overhead of polymorphism.
> >
> > The messages are placed along with the code that sends them, this helps
> > to avoid header proliferation, e.g. the CursorMessage requires X11
> > headers for its interface.
> >
> > Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> > ---
> > src/cursor-updater.cpp | 128 ++++++++++++++++++++--------------
> > src/spice-streaming-agent.cpp | 97 ++++++++++++--------------
> > src/stream-port.hpp | 35 ++++++++++
> > 3 files changed, 154 insertions(+), 106 deletions(-)
> >
> > diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp
> > index 8f65e83..9edb010 100644
> > --- a/src/cursor-updater.cpp
> > +++ b/src/cursor-updater.cpp
> > @@ -12,52 +12,62 @@
> > #include <spice/stream-device.h>
> > #include <spice/enums.h>
> >
> > -#include <cstring>
> > -#include <functional>
> > #include <memory>
> > +#include <vector>
> > +#include <unistd.h>
> > #include <X11/extensions/Xfixes.h>
> >
> >
> > namespace spice {
> > namespace streaming_agent {
> >
> > -namespace {
> > -
> > -void send_cursor(StreamPort &stream_port, unsigned width, unsigned height,
> > int hotspot_x, int hotspot_y,
> > - std::function<void(uint32_t *)> fill_cursor)
> > +class CursorError : public Error
> > {
> > - if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >=
> > STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> > - return;
> > - }
> > -
> > - size_t cursor_size =
> > - sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) +
> > - width * height * sizeof(uint32_t);
> > - std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]);
> > + using Error::Error;
> > +};
> >
> > - StreamDevHeader
> > &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get()));
> > - memset(&dev_hdr, 0, sizeof(dev_hdr));
> > - dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> > - dev_hdr.type = STREAM_TYPE_CURSOR_SET;
> > - dev_hdr.size = cursor_size - sizeof(StreamDevHeader);
> > -
> > - StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet
> > *>(msg.get() + sizeof(StreamDevHeader)));
> > - memset(&cursor_msg, 0, sizeof(cursor_msg));
> > +class CursorMessage : public OutboundMessage<StreamMsgCursorSet,
> > CursorMessage, STREAM_TYPE_CURSOR_SET>
> > +{
> > +public:
> > + CursorMessage(uint16_t width, uint16_t height, uint16_t xhot, uint16_t
> > yhot,
> > + const std::vector<uint32_t> &pixels)
> > + :
> > + OutboundMessage(pixels)
> > + {
> > + if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) {
> > + throw CursorError("Cursor width " + std::to_string(width) +
> > + " too big (limit is " +
> > std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")");
> > + }
> >
> > - cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA;
> > - cursor_msg.width = width;
> > - cursor_msg.height = height;
> > - cursor_msg.hot_spot_x = hotspot_x;
> > - cursor_msg.hot_spot_y = hotspot_y;
> > + if (height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> > + throw CursorError("Cursor height " + std::to_string(height) +
> > + " too big (limit is " +
> > std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")");
> > + }
> > + }
> >
> > - uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
> > - fill_cursor(pixels);
> > + static size_t size(const std::vector<uint32_t> &pixels)
> > + {
> > + return sizeof(PayloadType) + sizeof(uint32_t) * pixels.size();
> > + }
> >
> > - std::lock_guard<std::mutex> guard(stream_port.mutex);
> > - stream_port.write(msg.get(), cursor_size);
> > -}
> > + void write_message_body(StreamPort &stream_port,
> > + uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
> > + const std::vector<uint32_t> &pixels)
> > + {
> > + StreamMsgCursorSet msg = {
> > + .width = width,
> > + .height = height,
> > + .hot_spot_x = xhot,
> > + .hot_spot_y = yhot,
> > + .type = SPICE_CURSOR_TYPE_ALPHA,
> > + .padding1 = {},
> > + .data = {}
> > + };
>
> Do not use C++20 features; we agreed on C++11.
I used this based on a previous discussion about designated
initializers, but it seems I misremembered the outcome of that
discussion. Looking it up now, it seems the use of designated
initializers was mostly rejected. I'll rewrite this to use the classic
initialization.
> > -} // namespace
> > + stream_port.write(&msg, sizeof(msg));
> > + stream_port.write(pixels.data(), sizeof(uint32_t) * pixels.size());
> > + }
> > +};
> >
> > CursorUpdater::CursorUpdater(StreamPort *stream_port) :
> > stream_port(stream_port)
> > {
> > @@ -79,27 +89,39 @@ CursorUpdater::CursorUpdater(StreamPort *stream_port) :
> > stream_port(stream_port)
> > unsigned long last_serial = 0;
> >
> > while (1) {
> > - XEvent event;
> > - XNextEvent(display, &event);
> > - if (event.type != xfixes_event_base + 1) {
> > - continue;
> > - }
> > -
> > - XFixesCursorImage *cursor = XFixesGetCursorImage(display);
> > - if (!cursor) {
> > - continue;
> > + try {
> > + XEvent event;
> > + XNextEvent(display, &event);
> > + if (event.type != xfixes_event_base + 1) {
> > + continue;
> > + }
> > +
> > + XFixesCursorImage *cursor = XFixesGetCursorImage(display);
> > + if (!cursor) {
> > + continue;
> > + }
> > +
> > + if (cursor->cursor_serial == last_serial) {
> > + continue;
> > + }
> > +
> > + last_serial = cursor->cursor_serial;
> > +
> > + // the X11 cursor data may be in a wrong format, copy them to an
> > uint32_t array
> > + size_t pixcount = cursor->width * cursor->height;
> > + std::vector<uint32_t> pixels;
> > + pixels.reserve(pixcount);
> > +
> > + for (size_t i = 0; i < pixcount; ++i) {
> > + pixels.push_back(cursor->pixels[i]);
> > + }
> > +
> > + stream_port->send<CursorMessage>(cursor->width, cursor->height,
> > + cursor->xhot, cursor->yhot,
> > pixels);
> > + } catch (const std::exception &e) {
> > + ::syslog(LOG_ERR, "Error in cursor updater thread: %s",
> > e.what());
> > + sleep(1); // rate-limit the error
> > }
> > -
> > - if (cursor->cursor_serial == last_serial) {
> > - continue;
> > - }
> > -
> > - last_serial = cursor->cursor_serial;
> > - auto fill_cursor = [cursor](uint32_t *pixels) {
> > - for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> > - pixels[i] = cursor->pixels[i];
> > - };
> > - send_cursor(*stream_port, cursor->width, cursor->height,
> > cursor->xhot, cursor->yhot, fill_cursor);
> > }
> > }
> >
> > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > index a89ba3f..39c53bd 100644
> > --- a/src/spice-streaming-agent.cpp
> > +++ b/src/spice-streaming-agent.cpp
> > @@ -41,16 +41,51 @@ using namespace spice::streaming_agent;
> >
> > static ConcreteAgent agent;
> >
> > -struct SpiceStreamFormatMessage
> > +class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage,
> > STREAM_TYPE_FORMAT>
> > {
> > - StreamDevHeader hdr;
> > - StreamMsgFormat msg;
> > +public:
> > + FormatMessage(unsigned w, unsigned h, uint8_t c) {}
> > +
> > + static size_t size()
> > + {
> > + return sizeof(PayloadType);
> > + }
> > +
> > + void write_message_body(StreamPort &stream_port, unsigned w, unsigned h,
> > uint8_t c)
> > + {
> > + StreamMsgFormat msg = { .width = w, .height = h, .codec = c,
> > .padding1 = {} };
> > + stream_port.write(&msg, sizeof(msg));
> > + }
> > +};
> > +
> > +class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage,
> > STREAM_TYPE_DATA>
> > +{
> > +public:
> > + FrameMessage(const void *frame, size_t length) : OutboundMessage(length)
> > {}
> > +
> > + static size_t size(size_t length)
> > + {
> > + return sizeof(PayloadType) + length;
> > + }
> > +
> > + void write_message_body(StreamPort &stream_port, const void *frame,
> > size_t length)
> > + {
> > + stream_port.write(frame, length);
> > + }
> > };
> >
> > -struct SpiceStreamDataMessage
> > +class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities,
> > CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES>
> > {
> > - StreamDevHeader hdr;
> > - StreamMsgData msg;
> > +public:
> > + static size_t size()
> > + {
> > + return sizeof(PayloadType);
> > + }
> > +
> > + void write_message_body(StreamPort &stream_port)
> > + {
> > + // No body for capabilities message
> > + }
> > };
> >
> > static bool streaming_requested = false;
> > @@ -83,15 +118,7 @@ static void read_command_from_device(StreamPort
> > &stream_port)
> >
> > switch (in_message.header.type) {
> > case STREAM_TYPE_CAPABILITIES: {
> > - StreamDevHeader hdr = {
> > - STREAM_DEVICE_PROTOCOL,
> > - 0,
> > - STREAM_TYPE_CAPABILITIES,
> > - 0
> > - };
> > -
> > - std::lock_guard<std::mutex> guard(stream_port.mutex);
> > - stream_port.write(&hdr, sizeof(hdr));
> > + stream_port.send<CapabilitiesOutMessage>();
> > return;
> > } case STREAM_TYPE_NOTIFY_ERROR: {
> > NotifyErrorMessage msg =
> > in_message.get_payload<NotifyErrorMessage>();
> > @@ -128,42 +155,6 @@ static void read_command(StreamPort &stream_port, bool
> > blocking)
> > }
> > }
> >
> > -static void spice_stream_send_format(StreamPort &stream_port, unsigned w,
> > unsigned h, unsigned c)
> > -{
> > -
> > - SpiceStreamFormatMessage msg;
> > - const size_t msgsize = sizeof(msg);
> > - const size_t hdrsize = sizeof(msg.hdr);
> > - memset(&msg, 0, msgsize);
> > - msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> > - msg.hdr.type = STREAM_TYPE_FORMAT;
> > - msg.hdr.size = msgsize - hdrsize; /* includes only the body? */
> > - msg.msg.width = w;
> > - msg.msg.height = h;
> > - msg.msg.codec = c;
> > -
> > - syslog(LOG_DEBUG, "writing format");
> > - std::lock_guard<std::mutex> guard(stream_port.mutex);
> > - stream_port.write(&msg, msgsize);
> > -}
> > -
> > -static void spice_stream_send_frame(StreamPort &stream_port, const void
> > *buf, const unsigned size)
> > -{
> > - SpiceStreamDataMessage msg;
> > - const size_t msgsize = sizeof(msg);
> > -
> > - memset(&msg, 0, msgsize);
> > - msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> > - msg.hdr.type = STREAM_TYPE_DATA;
> > - msg.hdr.size = size; /* includes only the body? */
> > -
> > - std::lock_guard<std::mutex> guard(stream_port.mutex);
> > - stream_port.write(&msg, msgsize);
> > - stream_port.write(buf, size);
> > -
> > - syslog(LOG_DEBUG, "Sent a frame of size %u", size);
> > -}
> > -
> > static void handle_interrupt(int intr)
> > {
> > syslog(LOG_INFO, "Got signal %d, exiting", intr);
> > @@ -249,13 +240,13 @@ do_capture(StreamPort &stream_port, FrameLog
> > &frame_log)
> > syslog(LOG_DEBUG, "wXh %uX%u codec=%u", width, height,
> > codec);
> > frame_log.log_stat("Started new stream wXh %uX%u codec=%u",
> > width, height, codec);
> >
> > - spice_stream_send_format(stream_port, width, height, codec);
> > + stream_port.send<FormatMessage>(width, height, codec);
> > }
> > frame_log.log_stat("Frame of %zu bytes", frame.buffer_size);
> > frame_log.log_frame(frame.buffer, frame.buffer_size);
> >
> > try {
> > - spice_stream_send_frame(stream_port, frame.buffer,
> > frame.buffer_size);
> > + stream_port.send<FrameMessage>(frame.buffer,
> > frame.buffer_size);
> > } catch (const WriteError& e) {
> > syslog(e);
> > break;
> > diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> > index 090930b..6cb516b 100644
> > --- a/src/stream-port.hpp
> > +++ b/src/stream-port.hpp
> > @@ -60,12 +60,47 @@ public:
> >
> > InboundMessage receive();
> >
> > + template <typename Message, typename ...PayloadArgs>
> > + void send(PayloadArgs&&... payload_args)
> > + {
> > + Message message(payload_args...);
> > + std::lock_guard<std::mutex> stream_guard(mutex);
> > + message.write_header(*this);
> > + message.write_message_body(*this, payload_args...);
> > + }
> > +
> > void write(const void *buf, size_t len);
> >
> > int fd;
> > +
> > +private:
>
> OT: This looks great. I wonder whether, in the end, write and fd should
> also be private.
I think it's on topic :) One of the reasons to do this refactor.
The fd is needed for the poll. We could use a getter for that. I
remember actually wanting to make it const, but in the end I forgot, so
I'll do at least that in the next iteration. I think it being const is
enough and don't see the need for a getter.
The write() method is used by the OutboundMessage class and I don't see
a good way around it (the interaction between OutboundMessage and
StreamPort is a bit twisted, but it allows for separation and at the
same time zero copies of the data). We could use friend classes, but
then again, I'd just keep it simple...
Thanks,
Lukas
> > std::mutex mutex;
> > };
> >
> > +template <typename Payload, typename Message, unsigned Type>
> > +class OutboundMessage
> > +{
> > +public:
> > + template <typename ...PayloadArgs>
> > + OutboundMessage(PayloadArgs&&... payload_args) :
> > + hdr(StreamDevHeader {
> > + .protocol_version = STREAM_DEVICE_PROTOCOL,
> > + .padding = 0, // Workaround GCC bug "sorry: not implemented"
> > + .type = Type,
> > + .size = (uint32_t) Message::size(payload_args...)
> > + })
> > + {}
> > +
> > + void write_header(StreamPort &stream_port)
> > + {
> > + stream_port.write(&hdr, sizeof(hdr));
> > + }
> > +
> > +protected:
> > + StreamDevHeader hdr;
> > + using PayloadType = Payload;
> > +};
> > +
> > void read_all(int fd, void *buf, size_t len);
> > void write_all(int fd, const void *buf, size_t len);
> >
>
> Frediano
More information about the Spice-devel
mailing list