[Spice-devel] [PATCH spice-streaming-agent 2/3] Introduce OutboundMessages for the StreamPort class

Lukáš Hrázký lhrazky at redhat.com
Tue Jul 10 14:51:59 UTC 2018


Heavily based on code by Christophe de Dinechin.

Wraps the serialization code in the OutboundMessage class and its
descendants for each specific message. Uses Cruiously Recurring Template
Pattern (CRTP) to avoid runtime overhead of polymorphism.

The messages are placed along with the code that sends them, this helps
to avoid header proliferation, e.g. the CursorMessage requires X11
headers for it's interface.

Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
---
 src/cursor-updater.cpp        | 128 ++++++++++++++++++++--------------
 src/spice-streaming-agent.cpp |  97 ++++++++++++--------------
 src/stream-port.hpp           |  35 ++++++++++
 3 files changed, 154 insertions(+), 106 deletions(-)

diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp
index 8f65e83..9edb010 100644
--- a/src/cursor-updater.cpp
+++ b/src/cursor-updater.cpp
@@ -12,52 +12,62 @@
 #include <spice/stream-device.h>
 #include <spice/enums.h>
 
-#include <cstring>
-#include <functional>
 #include <memory>
+#include <vector>
+#include <unistd.h>
 #include <X11/extensions/Xfixes.h>
 
 
 namespace spice {
 namespace streaming_agent {
 
-namespace {
-
-void send_cursor(StreamPort &stream_port, unsigned width, unsigned height, int hotspot_x, int hotspot_y,
-                 std::function<void(uint32_t *)> fill_cursor)
+class CursorError : public Error
 {
-    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
-        return;
-    }
-
-    size_t cursor_size =
-        sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) +
-        width * height * sizeof(uint32_t);
-    std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]);
+    using Error::Error;
+};
 
-    StreamDevHeader &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get()));
-    memset(&dev_hdr, 0, sizeof(dev_hdr));
-    dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    dev_hdr.type = STREAM_TYPE_CURSOR_SET;
-    dev_hdr.size = cursor_size - sizeof(StreamDevHeader);
-
-    StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet *>(msg.get() + sizeof(StreamDevHeader)));
-    memset(&cursor_msg, 0, sizeof(cursor_msg));
+class CursorMessage : public OutboundMessage<StreamMsgCursorSet, CursorMessage, STREAM_TYPE_CURSOR_SET>
+{
+public:
+    CursorMessage(uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
+        const std::vector<uint32_t> &pixels)
+    :
+        OutboundMessage(pixels)
+    {
+        if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) {
+            throw CursorError("Cursor width " + std::to_string(width) +
+                " too big (limit is " + std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")");
+        }
 
-    cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA;
-    cursor_msg.width = width;
-    cursor_msg.height = height;
-    cursor_msg.hot_spot_x = hotspot_x;
-    cursor_msg.hot_spot_y = hotspot_y;
+        if (height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
+            throw CursorError("Cursor height " + std::to_string(height) +
+                " too big (limit is " + std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")");
+        }
+    }
 
-    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
-    fill_cursor(pixels);
+    static size_t size(const std::vector<uint32_t> &pixels)
+    {
+        return sizeof(PayloadType) + sizeof(uint32_t) * pixels.size();
+    }
 
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(msg.get(), cursor_size);
-}
+    void write_message_body(StreamPort &stream_port,
+        uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
+        const std::vector<uint32_t> &pixels)
+    {
+        StreamMsgCursorSet msg = {
+            .width = width,
+            .height = height,
+            .hot_spot_x = xhot,
+            .hot_spot_y = yhot,
+            .type = SPICE_CURSOR_TYPE_ALPHA,
+            .padding1 = {},
+            .data = {}
+        };
 
-} // namespace
+        stream_port.write(&msg, sizeof(msg));
+        stream_port.write(pixels.data(), sizeof(uint32_t) * pixels.size());
+    }
+};
 
 CursorUpdater::CursorUpdater(StreamPort *stream_port) : stream_port(stream_port)
 {
@@ -79,27 +89,39 @@ CursorUpdater::CursorUpdater(StreamPort *stream_port) : stream_port(stream_port)
     unsigned long last_serial = 0;
 
     while (1) {
-        XEvent event;
-        XNextEvent(display, &event);
-        if (event.type != xfixes_event_base + 1) {
-            continue;
-        }
-
-        XFixesCursorImage *cursor = XFixesGetCursorImage(display);
-        if (!cursor) {
-            continue;
+        try {
+            XEvent event;
+            XNextEvent(display, &event);
+            if (event.type != xfixes_event_base + 1) {
+                continue;
+            }
+
+            XFixesCursorImage *cursor = XFixesGetCursorImage(display);
+            if (!cursor) {
+                continue;
+            }
+
+            if (cursor->cursor_serial == last_serial) {
+                continue;
+            }
+
+            last_serial = cursor->cursor_serial;
+
+            // the X11 cursor data may be in a wrong format, copy them to an uint32_t array
+            size_t pixcount = cursor->width * cursor->height;
+            std::vector<uint32_t> pixels;
+            pixels.reserve(pixcount);
+
+            for (size_t i = 0; i < pixcount; ++i) {
+                pixels.push_back(cursor->pixels[i]);
+            }
+
+            stream_port->send<CursorMessage>(cursor->width, cursor->height,
+                                             cursor->xhot, cursor->yhot, pixels);
+        } catch (const std::exception &e) {
+            ::syslog(LOG_ERR, "Error in cursor updater thread: %s", e.what());
+            sleep(1); // rate-limit the error
         }
-
-        if (cursor->cursor_serial == last_serial) {
-            continue;
-        }
-
-        last_serial = cursor->cursor_serial;
-        auto fill_cursor = [cursor](uint32_t *pixels) {
-            for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
-                pixels[i] = cursor->pixels[i];
-        };
-        send_cursor(*stream_port, cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
     }
 }
 
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index a89ba3f..39c53bd 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -41,16 +41,51 @@ using namespace spice::streaming_agent;
 
 static ConcreteAgent agent;
 
-struct SpiceStreamFormatMessage
+class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
 {
-    StreamDevHeader hdr;
-    StreamMsgFormat msg;
+public:
+    FormatMessage(unsigned w, unsigned h, uint8_t c) {}
+
+    static size_t size()
+    {
+        return sizeof(PayloadType);
+    }
+
+    void write_message_body(StreamPort &stream_port, unsigned w, unsigned h, uint8_t c)
+    {
+        StreamMsgFormat msg = { .width = w, .height = h, .codec = c, .padding1 = {} };
+        stream_port.write(&msg, sizeof(msg));
+    }
+};
+
+class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage, STREAM_TYPE_DATA>
+{
+public:
+    FrameMessage(const void *frame, size_t length) : OutboundMessage(length) {}
+
+    static size_t size(size_t length)
+    {
+        return sizeof(PayloadType) + length;
+    }
+
+    void write_message_body(StreamPort &stream_port, const void *frame, size_t length)
+    {
+        stream_port.write(frame, length);
+    }
 };
 
-struct SpiceStreamDataMessage
+class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities, CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES>
 {
-    StreamDevHeader hdr;
-    StreamMsgData msg;
+public:
+    static size_t size()
+    {
+        return sizeof(PayloadType);
+    }
+
+    void write_message_body(StreamPort &stream_port)
+    {
+        // No body for capabilities message
+    }
 };
 
 static bool streaming_requested = false;
@@ -83,15 +118,7 @@ static void read_command_from_device(StreamPort &stream_port)
 
     switch (in_message.header.type) {
     case STREAM_TYPE_CAPABILITIES: {
-        StreamDevHeader hdr = {
-            STREAM_DEVICE_PROTOCOL,
-            0,
-            STREAM_TYPE_CAPABILITIES,
-            0
-        };
-
-        std::lock_guard<std::mutex> guard(stream_port.mutex);
-        stream_port.write(&hdr, sizeof(hdr));
+        stream_port.send<CapabilitiesOutMessage>();
         return;
     } case STREAM_TYPE_NOTIFY_ERROR: {
         NotifyErrorMessage msg = in_message.get_payload<NotifyErrorMessage>();
@@ -128,42 +155,6 @@ static void read_command(StreamPort &stream_port, bool blocking)
     }
 }
 
-static void spice_stream_send_format(StreamPort &stream_port, unsigned w, unsigned h, unsigned c)
-{
-
-    SpiceStreamFormatMessage msg;
-    const size_t msgsize = sizeof(msg);
-    const size_t hdrsize  = sizeof(msg.hdr);
-    memset(&msg, 0, msgsize);
-    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    msg.hdr.type = STREAM_TYPE_FORMAT;
-    msg.hdr.size = msgsize - hdrsize; /* includes only the body? */
-    msg.msg.width = w;
-    msg.msg.height = h;
-    msg.msg.codec = c;
-
-    syslog(LOG_DEBUG, "writing format");
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(&msg, msgsize);
-}
-
-static void spice_stream_send_frame(StreamPort &stream_port, const void *buf, const unsigned size)
-{
-    SpiceStreamDataMessage msg;
-    const size_t msgsize = sizeof(msg);
-
-    memset(&msg, 0, msgsize);
-    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    msg.hdr.type = STREAM_TYPE_DATA;
-    msg.hdr.size = size; /* includes only the body? */
-
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(&msg, msgsize);
-    stream_port.write(buf, size);
-
-    syslog(LOG_DEBUG, "Sent a frame of size %u", size);
-}
-
 static void handle_interrupt(int intr)
 {
     syslog(LOG_INFO, "Got signal %d, exiting", intr);
@@ -249,13 +240,13 @@ do_capture(StreamPort &stream_port, FrameLog &frame_log)
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u", width, height, codec);
                 frame_log.log_stat("Started new stream wXh %uX%u codec=%u", width, height, codec);
 
-                spice_stream_send_format(stream_port, width, height, codec);
+                stream_port.send<FormatMessage>(width, height, codec);
             }
             frame_log.log_stat("Frame of %zu bytes", frame.buffer_size);
             frame_log.log_frame(frame.buffer, frame.buffer_size);
 
             try {
-                spice_stream_send_frame(stream_port, frame.buffer, frame.buffer_size);
+                stream_port.send<FrameMessage>(frame.buffer, frame.buffer_size);
             } catch (const WriteError& e) {
                 syslog(e);
                 break;
diff --git a/src/stream-port.hpp b/src/stream-port.hpp
index 090930b..6cb516b 100644
--- a/src/stream-port.hpp
+++ b/src/stream-port.hpp
@@ -60,12 +60,47 @@ public:
 
     InboundMessage receive();
 
+    template <typename Message, typename ...PayloadArgs>
+    void send(PayloadArgs&&... payload_args)
+    {
+        Message message(payload_args...);
+        std::lock_guard<std::mutex> stream_guard(mutex);
+        message.write_header(*this);
+        message.write_message_body(*this, payload_args...);
+    }
+
     void write(const void *buf, size_t len);
 
     int fd;
+
+private:
     std::mutex mutex;
 };
 
+template <typename Payload, typename Message, unsigned Type>
+class OutboundMessage
+{
+public:
+    template <typename ...PayloadArgs>
+    OutboundMessage(PayloadArgs&&... payload_args) :
+        hdr(StreamDevHeader {
+            .protocol_version = STREAM_DEVICE_PROTOCOL,
+            .padding = 0,     // Workaround GCC bug "sorry: not implemented"
+            .type = Type,
+            .size = (uint32_t) Message::size(payload_args...)
+        })
+    {}
+
+    void write_header(StreamPort &stream_port)
+    {
+        stream_port.write(&hdr, sizeof(hdr));
+    }
+
+protected:
+    StreamDevHeader hdr;
+    using PayloadType = Payload;
+};
+
 void read_all(int fd, void *buf, size_t len);
 void write_all(int fd, const void *buf, size_t len);
 
-- 
2.17.1



More information about the Spice-devel mailing list