[Spice-devel] [PATCH spice-streaming-agent 2/3] Introduce OutboundMessages for the StreamPort class

Frediano Ziglio fziglio at redhat.com
Mon Oct 8 11:46:09 UTC 2018


> 
> Heavily based on code by Christophe de Dinechin.
> 
> Wraps the serialization code in the OutboundMessage class and its
> descendants for each specific message. Uses Curiously Recurring Template
> Pattern (CRTP) to avoid runtime overhead of polymorphism.
> 
> The messages are placed along with the code that sends them, this helps
> to avoid header proliferation, e.g. the CursorMessage requires X11
> headers for its interface.
> 
> Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> ---
>  src/cursor-updater.cpp        | 128 ++++++++++++++++++++--------------
>  src/spice-streaming-agent.cpp |  97 ++++++++++++--------------
>  src/stream-port.hpp           |  35 ++++++++++
>  3 files changed, 154 insertions(+), 106 deletions(-)
> 
> diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp
> index 8f65e83..9edb010 100644
> --- a/src/cursor-updater.cpp
> +++ b/src/cursor-updater.cpp
> @@ -12,52 +12,62 @@
>  #include <spice/stream-device.h>
>  #include <spice/enums.h>
>  
> -#include <cstring>
> -#include <functional>
>  #include <memory>
> +#include <vector>
> +#include <unistd.h>
>  #include <X11/extensions/Xfixes.h>
>  
>  
>  namespace spice {
>  namespace streaming_agent {
>  
> -namespace {
> -
> -void send_cursor(StreamPort &stream_port, unsigned width, unsigned height,
> int hotspot_x, int hotspot_y,
> -                 std::function<void(uint32_t *)> fill_cursor)
> +class CursorError : public Error
>  {
> -    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >=
> STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> -        return;
> -    }
> -
> -    size_t cursor_size =
> -        sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) +
> -        width * height * sizeof(uint32_t);
> -    std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]);
> +    using Error::Error;
> +};
>  
> -    StreamDevHeader
> &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get()));
> -    memset(&dev_hdr, 0, sizeof(dev_hdr));
> -    dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> -    dev_hdr.type = STREAM_TYPE_CURSOR_SET;
> -    dev_hdr.size = cursor_size - sizeof(StreamDevHeader);
> -
> -    StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet
> *>(msg.get() + sizeof(StreamDevHeader)));
> -    memset(&cursor_msg, 0, sizeof(cursor_msg));
> +class CursorMessage : public OutboundMessage<StreamMsgCursorSet,
> CursorMessage, STREAM_TYPE_CURSOR_SET>
> +{
> +public:
> +    CursorMessage(uint16_t width, uint16_t height, uint16_t xhot, uint16_t
> yhot,
> +        const std::vector<uint32_t> &pixels)
> +    :
> +        OutboundMessage(pixels)
> +    {
> +        if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) {
> +            throw CursorError("Cursor width " + std::to_string(width) +
> +                " too big (limit is " +
> std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")");
> +        }
>  
> -    cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA;
> -    cursor_msg.width = width;
> -    cursor_msg.height = height;
> -    cursor_msg.hot_spot_x = hotspot_x;
> -    cursor_msg.hot_spot_y = hotspot_y;
> +        if (height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> +            throw CursorError("Cursor height " + std::to_string(height) +
> +                " too big (limit is " +
> std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")");
> +        }
> +    }
>  
> -    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
> -    fill_cursor(pixels);
> +    static size_t size(const std::vector<uint32_t> &pixels)
> +    {
> +        return sizeof(PayloadType) + sizeof(uint32_t) * pixels.size();
> +    }
>  
> -    std::lock_guard<std::mutex> guard(stream_port.mutex);
> -    stream_port.write(msg.get(), cursor_size);
> -}
> +    void write_message_body(StreamPort &stream_port,
> +        uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
> +        const std::vector<uint32_t> &pixels)
> +    {
> +        StreamMsgCursorSet msg = {
> +            .width = width,
> +            .height = height,
> +            .hot_spot_x = xhot,
> +            .hot_spot_y = yhot,
> +            .type = SPICE_CURSOR_TYPE_ALPHA,
> +            .padding1 = {},
> +            .data = {}
> +        };

Do not use C++20 features (designated initializers) here; we agreed on C++11.

>  
> -} // namespace
> +        stream_port.write(&msg, sizeof(msg));
> +        stream_port.write(pixels.data(), sizeof(uint32_t) * pixels.size());
> +    }
> +};
>  
>  CursorUpdater::CursorUpdater(StreamPort *stream_port) :
>  stream_port(stream_port)
>  {
> @@ -79,27 +89,39 @@ CursorUpdater::CursorUpdater(StreamPort *stream_port) :
> stream_port(stream_port)
>      unsigned long last_serial = 0;
>  
>      while (1) {
> -        XEvent event;
> -        XNextEvent(display, &event);
> -        if (event.type != xfixes_event_base + 1) {
> -            continue;
> -        }
> -
> -        XFixesCursorImage *cursor = XFixesGetCursorImage(display);
> -        if (!cursor) {
> -            continue;
> +        try {
> +            XEvent event;
> +            XNextEvent(display, &event);
> +            if (event.type != xfixes_event_base + 1) {
> +                continue;
> +            }
> +
> +            XFixesCursorImage *cursor = XFixesGetCursorImage(display);
> +            if (!cursor) {
> +                continue;
> +            }
> +
> +            if (cursor->cursor_serial == last_serial) {
> +                continue;
> +            }
> +
> +            last_serial = cursor->cursor_serial;
> +
> +            // the X11 cursor data may be in a wrong format, copy them to an
> uint32_t array
> +            size_t pixcount = cursor->width * cursor->height;
> +            std::vector<uint32_t> pixels;
> +            pixels.reserve(pixcount);
> +
> +            for (size_t i = 0; i < pixcount; ++i) {
> +                pixels.push_back(cursor->pixels[i]);
> +            }
> +
> +            stream_port->send<CursorMessage>(cursor->width, cursor->height,
> +                                             cursor->xhot, cursor->yhot,
> pixels);
> +        } catch (const std::exception &e) {
> +            ::syslog(LOG_ERR, "Error in cursor updater thread: %s",
> e.what());
> +            sleep(1); // rate-limit the error
>          }
> -
> -        if (cursor->cursor_serial == last_serial) {
> -            continue;
> -        }
> -
> -        last_serial = cursor->cursor_serial;
> -        auto fill_cursor = [cursor](uint32_t *pixels) {
> -            for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> -                pixels[i] = cursor->pixels[i];
> -        };
> -        send_cursor(*stream_port, cursor->width, cursor->height,
> cursor->xhot, cursor->yhot, fill_cursor);
>      }
>  }
>  
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index a89ba3f..39c53bd 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -41,16 +41,51 @@ using namespace spice::streaming_agent;
>  
>  static ConcreteAgent agent;
>  
> -struct SpiceStreamFormatMessage
> +class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage,
> STREAM_TYPE_FORMAT>
>  {
> -    StreamDevHeader hdr;
> -    StreamMsgFormat msg;
> +public:
> +    FormatMessage(unsigned w, unsigned h, uint8_t c) {}
> +
> +    static size_t size()
> +    {
> +        return sizeof(PayloadType);
> +    }
> +
> +    void write_message_body(StreamPort &stream_port, unsigned w, unsigned h,
> uint8_t c)
> +    {
> +        StreamMsgFormat msg = { .width = w, .height = h, .codec = c,
> .padding1 = {} };
> +        stream_port.write(&msg, sizeof(msg));
> +    }
> +};
> +
> +class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage,
> STREAM_TYPE_DATA>
> +{
> +public:
> +    FrameMessage(const void *frame, size_t length) : OutboundMessage(length)
> {}
> +
> +    static size_t size(size_t length)
> +    {
> +        return sizeof(PayloadType) + length;
> +    }
> +
> +    void write_message_body(StreamPort &stream_port, const void *frame,
> size_t length)
> +    {
> +        stream_port.write(frame, length);
> +    }
>  };
>  
> -struct SpiceStreamDataMessage
> +class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities,
> CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES>
>  {
> -    StreamDevHeader hdr;
> -    StreamMsgData msg;
> +public:
> +    static size_t size()
> +    {
> +        return sizeof(PayloadType);
> +    }
> +
> +    void write_message_body(StreamPort &stream_port)
> +    {
> +        // No body for capabilities message
> +    }
>  };
>  
>  static bool streaming_requested = false;
> @@ -83,15 +118,7 @@ static void read_command_from_device(StreamPort
> &stream_port)
>  
>      switch (in_message.header.type) {
>      case STREAM_TYPE_CAPABILITIES: {
> -        StreamDevHeader hdr = {
> -            STREAM_DEVICE_PROTOCOL,
> -            0,
> -            STREAM_TYPE_CAPABILITIES,
> -            0
> -        };
> -
> -        std::lock_guard<std::mutex> guard(stream_port.mutex);
> -        stream_port.write(&hdr, sizeof(hdr));
> +        stream_port.send<CapabilitiesOutMessage>();
>          return;
>      } case STREAM_TYPE_NOTIFY_ERROR: {
>          NotifyErrorMessage msg =
>          in_message.get_payload<NotifyErrorMessage>();
> @@ -128,42 +155,6 @@ static void read_command(StreamPort &stream_port, bool
> blocking)
>      }
>  }
>  
> -static void spice_stream_send_format(StreamPort &stream_port, unsigned w,
> unsigned h, unsigned c)
> -{
> -
> -    SpiceStreamFormatMessage msg;
> -    const size_t msgsize = sizeof(msg);
> -    const size_t hdrsize  = sizeof(msg.hdr);
> -    memset(&msg, 0, msgsize);
> -    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> -    msg.hdr.type = STREAM_TYPE_FORMAT;
> -    msg.hdr.size = msgsize - hdrsize; /* includes only the body? */
> -    msg.msg.width = w;
> -    msg.msg.height = h;
> -    msg.msg.codec = c;
> -
> -    syslog(LOG_DEBUG, "writing format");
> -    std::lock_guard<std::mutex> guard(stream_port.mutex);
> -    stream_port.write(&msg, msgsize);
> -}
> -
> -static void spice_stream_send_frame(StreamPort &stream_port, const void
> *buf, const unsigned size)
> -{
> -    SpiceStreamDataMessage msg;
> -    const size_t msgsize = sizeof(msg);
> -
> -    memset(&msg, 0, msgsize);
> -    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
> -    msg.hdr.type = STREAM_TYPE_DATA;
> -    msg.hdr.size = size; /* includes only the body? */
> -
> -    std::lock_guard<std::mutex> guard(stream_port.mutex);
> -    stream_port.write(&msg, msgsize);
> -    stream_port.write(buf, size);
> -
> -    syslog(LOG_DEBUG, "Sent a frame of size %u", size);
> -}
> -
>  static void handle_interrupt(int intr)
>  {
>      syslog(LOG_INFO, "Got signal %d, exiting", intr);
> @@ -249,13 +240,13 @@ do_capture(StreamPort &stream_port, FrameLog
> &frame_log)
>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u", width, height,
>                  codec);
>                  frame_log.log_stat("Started new stream wXh %uX%u codec=%u",
>                  width, height, codec);
>  
> -                spice_stream_send_format(stream_port, width, height, codec);
> +                stream_port.send<FormatMessage>(width, height, codec);
>              }
>              frame_log.log_stat("Frame of %zu bytes", frame.buffer_size);
>              frame_log.log_frame(frame.buffer, frame.buffer_size);
>  
>              try {
> -                spice_stream_send_frame(stream_port, frame.buffer,
> frame.buffer_size);
> +                stream_port.send<FrameMessage>(frame.buffer,
> frame.buffer_size);
>              } catch (const WriteError& e) {
>                  syslog(e);
>                  break;
> diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> index 090930b..6cb516b 100644
> --- a/src/stream-port.hpp
> +++ b/src/stream-port.hpp
> @@ -60,12 +60,47 @@ public:
>  
>      InboundMessage receive();
>  
> +    template <typename Message, typename ...PayloadArgs>
> +    void send(PayloadArgs&&... payload_args)
> +    {
> +        Message message(payload_args...);
> +        std::lock_guard<std::mutex> stream_guard(mutex);
> +        message.write_header(*this);
> +        message.write_message_body(*this, payload_args...);
> +    }
> +
>      void write(const void *buf, size_t len);
>  
>      int fd;
> +
> +private:

OT: This looks great. I wonder whether, in the end, write and fd should
also be private.

>      std::mutex mutex;
>  };
>  
> +template <typename Payload, typename Message, unsigned Type>
> +class OutboundMessage
> +{
> +public:
> +    template <typename ...PayloadArgs>
> +    OutboundMessage(PayloadArgs&&... payload_args) :
> +        hdr(StreamDevHeader {
> +            .protocol_version = STREAM_DEVICE_PROTOCOL,
> +            .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> +            .type = Type,
> +            .size = (uint32_t) Message::size(payload_args...)
> +        })
> +    {}
> +
> +    void write_header(StreamPort &stream_port)
> +    {
> +        stream_port.write(&hdr, sizeof(hdr));
> +    }
> +
> +protected:
> +    StreamDevHeader hdr;
> +    using PayloadType = Payload;
> +};
> +
>  void read_all(int fd, void *buf, size_t len);
>  void write_all(int fd, const void *buf, size_t len);
>  

Frediano


More information about the Spice-devel mailing list