[Spice-devel] [PATCH spice-streaming-agent v2 3/3] Introduce OutboundMessages for the StreamPort class

Lukáš Hrázký lhrazky at redhat.com
Fri Oct 12 10:04:21 UTC 2018


Heavily based on code by Christophe de Dinechin.

Wraps the serialization code in the OutboundMessage class and its
descendants for each specific message. Uses Cruiously Recurring Template
Pattern (CRTP) to avoid runtime overhead of polymorphism.

The messages are placed along with the code that sends them, this helps
to avoid header proliferation, e.g. the CursorMessage requires X11
headers for it's interface.

Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
---
 src/cursor-updater.cpp        | 117 +++++++++++++++++++---------------
 src/spice-streaming-agent.cpp | 103 +++++++++++++++---------------
 src/stream-port.hpp           |  33 ++++++++++
 3 files changed, 149 insertions(+), 104 deletions(-)

diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp
index 9f4e893..8e4952c 100644
--- a/src/cursor-updater.cpp
+++ b/src/cursor-updater.cpp
@@ -12,55 +12,65 @@
 #include <spice/stream-device.h>
 #include <spice/enums.h>
 
-#include <cstring>
-#include <functional>
 #include <memory>
+#include <unistd.h>
 #include <X11/extensions/Xfixes.h>
 
 
 namespace spice {
 namespace streaming_agent {
 
-namespace {
+class CursorError : public Error
+{
+    using Error::Error;
+};
 
-void send_cursor(StreamPort &stream_port, XFixesCursorImage *cursor)
+class CursorMessage : public OutboundMessage<StreamMsgCursorSet, CursorMessage, STREAM_TYPE_CURSOR_SET>
 {
-    if (cursor->width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
-        cursor->height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
+public:
+    CursorMessage(XFixesCursorImage *cursor) : OutboundMessage(cursor) {}
+    static size_t size(XFixesCursorImage *cursor)
     {
-        return;
+        return sizeof(PayloadType) + sizeof(uint32_t) * pixel_count(cursor);
     }
 
-    size_t cursor_size =
-        sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) +
-        cursor->width * cursor->height * sizeof(uint32_t);
-    std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]);
-
-    StreamDevHeader &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get()));
-    memset(&dev_hdr, 0, sizeof(dev_hdr));
-    dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    dev_hdr.type = STREAM_TYPE_CURSOR_SET;
-    dev_hdr.size = cursor_size - sizeof(StreamDevHeader);
-
-    StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet *>(msg.get() + sizeof(StreamDevHeader)));
-    memset(&cursor_msg, 0, sizeof(cursor_msg));
-
-    cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA;
-    cursor_msg.width = cursor->width;
-    cursor_msg.height = cursor->height;
-    cursor_msg.hot_spot_x = cursor->xhot;
-    cursor_msg.hot_spot_y = cursor->yhot;
-
-    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
-    for (unsigned i = 0; i < cursor->width * cursor->height; ++i) {
-        pixels[i] = cursor->pixels[i];
+    void write_message_body(StreamPort &stream_port, XFixesCursorImage *cursor)
+    {
+        StreamMsgCursorSet msg{};
+        msg.width = cursor->width;
+        msg.height = cursor->height;
+        msg.hot_spot_x = cursor->xhot;
+        msg.hot_spot_y = cursor->yhot;
+        msg.type = SPICE_CURSOR_TYPE_ALPHA;
+
+        size_t pixcount = pixel_count(cursor);
+        size_t pixsize = pixcount * sizeof(uint32_t);
+        std::unique_ptr<uint32_t[]> pixels(new uint32_t[pixcount]);
+
+        for (size_t i = 0; i < pixcount; ++i) {
+            pixels[i] = cursor->pixels[i];
+        }
+
+        stream_port.write(&msg, sizeof(msg));
+        stream_port.write(pixels.get(), pixsize);
     }
 
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(msg.get(), cursor_size);
-}
+private:
+    static size_t pixel_count(XFixesCursorImage *cursor)
+    {
+        if (cursor->width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) {
+            throw CursorError("Cursor width " + std::to_string(cursor->width) +
+                " too big (limit is " + std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")");
+        }
+
+        if (cursor->height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
+            throw CursorError("Cursor height " + std::to_string(cursor->height) +
+                " too big (limit is " + std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")");
+        }
 
-} // namespace
+        return cursor->width * cursor->height;
+    }
+};
 
 CursorUpdater::CursorUpdater(StreamPort *stream_port) : stream_port(stream_port)
 {
@@ -82,24 +92,29 @@ void CursorUpdater::operator()()
     unsigned long last_serial = 0;
 
     while (1) {
-        XEvent event;
-        XNextEvent(display, &event);
-        if (event.type != xfixes_event_base + 1) {
-            continue;
-        }
-
-        XFixesCursorImage *cursor = XFixesGetCursorImage(display);
-        if (!cursor) {
-            continue;
+        try {
+            XEvent event;
+            XNextEvent(display, &event);
+            if (event.type != xfixes_event_base + 1) {
+                continue;
+            }
+
+            XFixesCursorImage *cursor = XFixesGetCursorImage(display);
+            if (!cursor) {
+                continue;
+            }
+
+            if (cursor->cursor_serial == last_serial) {
+                continue;
+            }
+
+            last_serial = cursor->cursor_serial;
+
+            stream_port->send<CursorMessage>(cursor);
+        } catch (const std::exception &e) {
+            ::syslog(LOG_ERR, "Error in cursor updater thread: %s", e.what());
+            sleep(1); // rate-limit the error
         }
-
-        if (cursor->cursor_serial == last_serial) {
-            continue;
-        }
-
-        last_serial = cursor->cursor_serial;
-
-        send_cursor(*stream_port, cursor);
     }
 }
 
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index b6e77de..2d318d2 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -41,16 +41,57 @@ using namespace spice::streaming_agent;
 
 static ConcreteAgent agent;
 
-struct SpiceStreamFormatMessage
+class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
 {
-    StreamDevHeader hdr;
-    StreamMsgFormat msg;
+public:
+    FormatMessage(unsigned w, unsigned h, uint8_t c) : OutboundMessage(w, h, c) {}
+
+    static size_t size(unsigned width, unsigned height, uint8_t codec)
+    {
+        return sizeof(PayloadType);
+    }
+
+    void write_message_body(StreamPort &stream_port, unsigned w, unsigned h, uint8_t c)
+    {
+        StreamMsgFormat msg{};
+        msg.width = w;
+        msg.height = h;
+        msg.codec = c;
+
+        stream_port.write(&msg, sizeof(msg));
+    }
 };
 
-struct SpiceStreamDataMessage
+class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage, STREAM_TYPE_DATA>
 {
-    StreamDevHeader hdr;
-    StreamMsgData msg;
+public:
+    FrameMessage(const void *frame, size_t length) : OutboundMessage(frame, length) {}
+
+    static size_t size(const void *frame, size_t length)
+    {
+        return sizeof(PayloadType) + length;
+    }
+
+    void write_message_body(StreamPort &stream_port, const void *frame, size_t length)
+    {
+        stream_port.write(frame, length);
+    }
+};
+
+class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities, CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES>
+{
+public:
+    CapabilitiesOutMessage() : OutboundMessage() {}
+
+    static size_t size()
+    {
+        return sizeof(PayloadType);
+    }
+
+    void write_message_body(StreamPort &stream_port)
+    {
+        // No body for capabilities message
+    }
 };
 
 static bool streaming_requested = false;
@@ -83,15 +124,7 @@ static void read_command_from_device(StreamPort &stream_port)
 
     switch (in_message.header.type) {
     case STREAM_TYPE_CAPABILITIES: {
-        StreamDevHeader hdr = {
-            STREAM_DEVICE_PROTOCOL,
-            0,
-            STREAM_TYPE_CAPABILITIES,
-            0
-        };
-
-        std::lock_guard<std::mutex> guard(stream_port.mutex);
-        stream_port.write(&hdr, sizeof(hdr));
+        stream_port.send<CapabilitiesOutMessage>();
         return;
     }
     case STREAM_TYPE_NOTIFY_ERROR: {
@@ -130,42 +163,6 @@ static void read_command(StreamPort &stream_port, bool blocking)
     }
 }
 
-static void spice_stream_send_format(StreamPort &stream_port, unsigned w, unsigned h, unsigned c)
-{
-
-    SpiceStreamFormatMessage msg;
-    const size_t msgsize = sizeof(msg);
-    const size_t hdrsize  = sizeof(msg.hdr);
-    memset(&msg, 0, msgsize);
-    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    msg.hdr.type = STREAM_TYPE_FORMAT;
-    msg.hdr.size = msgsize - hdrsize; /* includes only the body? */
-    msg.msg.width = w;
-    msg.msg.height = h;
-    msg.msg.codec = c;
-
-    syslog(LOG_DEBUG, "writing format");
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(&msg, msgsize);
-}
-
-static void spice_stream_send_frame(StreamPort &stream_port, const void *buf, const unsigned size)
-{
-    SpiceStreamDataMessage msg;
-    const size_t msgsize = sizeof(msg);
-
-    memset(&msg, 0, msgsize);
-    msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
-    msg.hdr.type = STREAM_TYPE_DATA;
-    msg.hdr.size = size; /* includes only the body? */
-
-    std::lock_guard<std::mutex> guard(stream_port.mutex);
-    stream_port.write(&msg, msgsize);
-    stream_port.write(buf, size);
-
-    syslog(LOG_DEBUG, "Sent a frame of size %u", size);
-}
-
 static void handle_interrupt(int intr)
 {
     syslog(LOG_INFO, "Got signal %d, exiting", intr);
@@ -251,13 +248,13 @@ do_capture(StreamPort &stream_port, FrameLog &frame_log)
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u", width, height, codec);
                 frame_log.log_stat("Started new stream wXh %uX%u codec=%u", width, height, codec);
 
-                spice_stream_send_format(stream_port, width, height, codec);
+                stream_port.send<FormatMessage>(width, height, codec);
             }
             frame_log.log_stat("Frame of %zu bytes", frame.buffer_size);
             frame_log.log_frame(frame.buffer, frame.buffer_size);
 
             try {
-                spice_stream_send_frame(stream_port, frame.buffer, frame.buffer_size);
+                stream_port.send<FrameMessage>(frame.buffer, frame.buffer_size);
             } catch (const WriteError& e) {
                 syslog(e);
                 break;
diff --git a/src/stream-port.hpp b/src/stream-port.hpp
index 136ff25..41ac04b 100644
--- a/src/stream-port.hpp
+++ b/src/stream-port.hpp
@@ -60,12 +60,45 @@ public:
 
     InboundMessage receive();
 
+    template <typename Message, typename ...PayloadArgs>
+    void send(PayloadArgs... payload_args)
+    {
+        Message message(payload_args...);
+        std::lock_guard<std::mutex> stream_guard(mutex);
+        message.write_header(*this);
+        message.write_message_body(*this, payload_args...);
+    }
+
     void write(const void *buf, size_t len);
 
     const int fd;
+
+private:
     std::mutex mutex;
 };
 
+template <typename Payload, typename Message, unsigned Type>
+class OutboundMessage
+{
+public:
+    template <typename ...PayloadArgs>
+    OutboundMessage(PayloadArgs... payload_args)
+    {
+        hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
+        hdr.type = Type;
+        hdr.size = (uint32_t) Message::size(payload_args...);
+    }
+
+    void write_header(StreamPort &stream_port)
+    {
+        stream_port.write(&hdr, sizeof(hdr));
+    }
+
+protected:
+    StreamDevHeader hdr{};
+    using PayloadType = Payload;
+};
+
 void read_all(int fd, void *buf, size_t len);
 void write_all(int fd, const void *buf, size_t len);
 
-- 
2.19.1



More information about the Spice-devel mailing list