[Spice-devel] [PATCH spice-streaming-agent v3 2/2] Introduce OutboundMessages for the StreamPort class

Lukáš Hrázký lhrazky at redhat.com
Fri Oct 12 12:07:11 UTC 2018


Heavily based on code by Christophe de Dinechin.

Wraps the serialization code in the OutboundMessage class and its
descendants for each specific message. Uses the Curiously Recurring
Template Pattern (CRTP) to avoid the runtime overhead of polymorphism.

The messages are placed along with the code that sends them; this helps
to avoid header proliferation, e.g. the CursorMessage requires X11
headers for its interface.

Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
---
 src/cursor-updater.cpp        | 127 +++++++++++++++++++---------------
 src/spice-streaming-agent.cpp | 101 +++++++++++++--------------
 src/stream-port.hpp           |  33 +++++++++
 3 files changed, 154 insertions(+), 107 deletions(-)

diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp
index 9f39c7b..f0412db 100644
--- a/src/cursor-updater.cpp
+++ b/src/cursor-updater.cpp
@@ -12,52 +12,59 @@
 #include <spice/stream-device.h>
 #include <spice/enums.h>
 
-#include <cstring>
-#include <functional>
 #include <memory>
+#include <vector>
+#include <unistd.h>
 #include <X11/extensions/Xfixes.h>
 
 
 namespace spice {
 namespace streaming_agent {
 
-namespace {
-
-void send_cursor(StreamPort &stream_port, unsigned width, unsigned height, int hotspot_x, int hotspot_y,
-                 std::function<void(uint32_t *)> fill_cursor)
+class CursorError : public Error
 {
-    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
-        return;
-    }
-
-    size_t cursor_size =
-        sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) +
-        width * height * sizeof(uint32_t);
-    std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]);
-
-    StreamDevHeader &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get()));
-    memset(&dev_hdr, 0, sizeof(dev_hdr));
-    dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    dev_hdr.type = STREAM_TYPE_CURSOR_SET;
-    dev_hdr.size = cursor_size - sizeof(StreamDevHeader);
-
-    StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet *>(msg.get() + sizeof(StreamDevHeader)));
-    memset(&cursor_msg, 0, sizeof(cursor_msg));
+    using Error::Error;
+};
 
-    cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA;
-    cursor_msg.width = width;
-    cursor_msg.height = height;
-    cursor_msg.hot_spot_x = hotspot_x;
-    cursor_msg.hot_spot_y = hotspot_y;
+class CursorMessage : public OutboundMessage<StreamMsgCursorSet, CursorMessage, STREAM_TYPE_CURSOR_SET>
+{
+public:
+    CursorMessage(uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
+        const std::vector<uint32_t> &pixels)
+    :
+        OutboundMessage(pixels)
+    {
+        if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) {
+            throw CursorError("Cursor width " + std::to_string(width) +
+                " too big (limit is " + std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")");
+        }
 
-    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
-    fill_cursor(pixels);
+        if (height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
+            throw CursorError("Cursor height " + std::to_string(height) +
+                " too big (limit is " + std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")");
+        }
+    }
 
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(msg.get(), cursor_size);
-}
+    static size_t size(const std::vector<uint32_t> &pixels)
+    {
+        return sizeof(PayloadType) + sizeof(uint32_t) * pixels.size();
+    }
 
-} // namespace
+    void write_message_body(StreamPort &stream_port,
+        uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot,
+        const std::vector<uint32_t> &pixels)
+    {
+        StreamMsgCursorSet msg{};
+        msg.type = SPICE_CURSOR_TYPE_ALPHA;
+        msg.width = width;
+        msg.height = height;
+        msg.hot_spot_x = xhot;
+        msg.hot_spot_y = yhot;
+
+        stream_port.write(&msg, sizeof(msg));
+        stream_port.write(pixels.data(), sizeof(uint32_t) * pixels.size());
+    }
+};
 
 CursorUpdater::CursorUpdater(StreamPort *stream_port) : stream_port(stream_port)
 {
@@ -79,27 +86,39 @@ void CursorUpdater::operator()()
     unsigned long last_serial = 0;
 
     while (1) {
-        XEvent event;
-        XNextEvent(display, &event);
-        if (event.type != xfixes_event_base + 1) {
-            continue;
-        }
-
-        XFixesCursorImage *cursor = XFixesGetCursorImage(display);
-        if (!cursor) {
-            continue;
-        }
-
-        if (cursor->cursor_serial == last_serial) {
-            continue;
+        try {
+            XEvent event;
+            XNextEvent(display, &event);
+            if (event.type != xfixes_event_base + 1) {
+                continue;
+            }
+
+            XFixesCursorImage *cursor = XFixesGetCursorImage(display);
+            if (!cursor) {
+                continue;
+            }
+
+            if (cursor->cursor_serial == last_serial) {
+                continue;
+            }
+
+            last_serial = cursor->cursor_serial;
+
+            // the X11 cursor data may be in a wrong format, copy them to an uint32_t array
+            size_t pixcount = cursor->width * cursor->height;
+            std::vector<uint32_t> pixels;
+            pixels.reserve(pixcount);
+
+            for (size_t i = 0; i < pixcount; ++i) {
+                pixels.push_back(cursor->pixels[i]);
+            }
+
+            stream_port->send<CursorMessage>(cursor->width, cursor->height,
+                                             cursor->xhot, cursor->yhot, pixels);
+        } catch (const std::exception &e) {
+            ::syslog(LOG_ERR, "Error in cursor updater thread: %s", e.what());
+            sleep(1); // rate-limit the error
         }
-
-        last_serial = cursor->cursor_serial;
-        auto fill_cursor = [cursor](uint32_t *pixels) {
-            for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
-                pixels[i] = cursor->pixels[i];
-        };
-        send_cursor(*stream_port, cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
     }
 }
 
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index b6e77de..36d0692 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -41,16 +41,55 @@ using namespace spice::streaming_agent;
 
 static ConcreteAgent agent;
 
-struct SpiceStreamFormatMessage
+class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
 {
-    StreamDevHeader hdr;
-    StreamMsgFormat msg;
+public:
+    FormatMessage(unsigned w, unsigned h, uint8_t c) {}
+
+    static size_t size()
+    {
+        return sizeof(PayloadType);
+    }
+
+    void write_message_body(StreamPort &stream_port, unsigned w, unsigned h, uint8_t c)
+    {
+        StreamMsgFormat msg{};
+        msg.width = w;
+        msg.height = h;
+        msg.codec = c;
+
+        stream_port.write(&msg, sizeof(msg));
+    }
 };
 
-struct SpiceStreamDataMessage
+class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage, STREAM_TYPE_DATA>
 {
-    StreamDevHeader hdr;
-    StreamMsgData msg;
+public:
+    FrameMessage(const void *frame, size_t length) : OutboundMessage(length) {}
+
+    static size_t size(size_t length)
+    {
+        return sizeof(PayloadType) + length;
+    }
+
+    void write_message_body(StreamPort &stream_port, const void *frame, size_t length)
+    {
+        stream_port.write(frame, length);
+    }
+};
+
+class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities, CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES>
+{
+public:
+    static size_t size()
+    {
+        return sizeof(PayloadType);
+    }
+
+    void write_message_body(StreamPort &stream_port)
+    {
+        // No body for capabilities message
+    }
 };
 
 static bool streaming_requested = false;
@@ -83,15 +122,7 @@ static void read_command_from_device(StreamPort &stream_port)
 
     switch (in_message.header.type) {
     case STREAM_TYPE_CAPABILITIES: {
-        StreamDevHeader hdr = {
-            STREAM_DEVICE_PROTOCOL,
-            0,
-            STREAM_TYPE_CAPABILITIES,
-            0
-        };
-
-        std::lock_guard<std::mutex> guard(stream_port.mutex);
-        stream_port.write(&hdr, sizeof(hdr));
+        stream_port.send<CapabilitiesOutMessage>();
         return;
     }
     case STREAM_TYPE_NOTIFY_ERROR: {
@@ -130,42 +161,6 @@ static void read_command(StreamPort &stream_port, bool blocking)
     }
 }
 
-static void spice_stream_send_format(StreamPort &stream_port, unsigned w, unsigned h, unsigned c)
-{
-
-    SpiceStreamFormatMessage msg;
-    const size_t msgsize = sizeof(msg);
-    const size_t hdrsize  = sizeof(msg.hdr);
-    memset(&msg, 0, msgsize);
-    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    msg.hdr.type = STREAM_TYPE_FORMAT;
-    msg.hdr.size = msgsize - hdrsize; /* includes only the body? */
-    msg.msg.width = w;
-    msg.msg.height = h;
-    msg.msg.codec = c;
-
-    syslog(LOG_DEBUG, "writing format");
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(&msg, msgsize);
-}
-
-static void spice_stream_send_frame(StreamPort &stream_port, const void *buf, const unsigned size)
-{
-    SpiceStreamDataMessage msg;
-    const size_t msgsize = sizeof(msg);
-
-    memset(&msg, 0, msgsize);
-    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    msg.hdr.type = STREAM_TYPE_DATA;
-    msg.hdr.size = size; /* includes only the body? */
-
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(&msg, msgsize);
-    stream_port.write(buf, size);
-
-    syslog(LOG_DEBUG, "Sent a frame of size %u", size);
-}
-
 static void handle_interrupt(int intr)
 {
     syslog(LOG_INFO, "Got signal %d, exiting", intr);
@@ -251,13 +246,13 @@ do_capture(StreamPort &stream_port, FrameLog &frame_log)
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u", width, height, codec);
                 frame_log.log_stat("Started new stream wXh %uX%u codec=%u", width, height, codec);
 
-                spice_stream_send_format(stream_port, width, height, codec);
+                stream_port.send<FormatMessage>(width, height, codec);
             }
             frame_log.log_stat("Frame of %zu bytes", frame.buffer_size);
             frame_log.log_frame(frame.buffer, frame.buffer_size);
 
             try {
-                spice_stream_send_frame(stream_port, frame.buffer, frame.buffer_size);
+                stream_port.send<FrameMessage>(frame.buffer, frame.buffer_size);
             } catch (const WriteError& e) {
                 syslog(e);
                 break;
diff --git a/src/stream-port.hpp b/src/stream-port.hpp
index 136ff25..cf010a4 100644
--- a/src/stream-port.hpp
+++ b/src/stream-port.hpp
@@ -60,12 +60,45 @@ public:
 
     InboundMessage receive();
 
+    template <typename Message, typename ...PayloadArgs>
+    void send(PayloadArgs&&... payload_args)
+    {
+        Message message(payload_args...);
+        std::lock_guard<std::mutex> stream_guard(mutex);
+        message.write_header(*this);
+        message.write_message_body(*this, payload_args...);
+    }
+
     void write(const void *buf, size_t len);
 
     const int fd;
+
+private:
     std::mutex mutex;
 };
 
+template <typename Payload, typename Message, unsigned Type>
+class OutboundMessage
+{
+public:
+    template <typename ...PayloadArgs>
+    OutboundMessage(PayloadArgs&&... payload_args)
+    {
+        hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
+        hdr.type = Type;
+        hdr.size = (uint32_t) Message::size(payload_args...);
+    }
+
+    void write_header(StreamPort &stream_port)
+    {
+        stream_port.write(&hdr, sizeof(hdr));
+    }
+
+protected:
+    StreamDevHeader hdr{};
+    using PayloadType = Payload;
+};
+
 void read_all(int fd, void *buf, size_t len);
 void write_all(int fd, const void *buf, size_t len);
 
-- 
2.19.1



More information about the Spice-devel mailing list