[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files

Christophe de Dinechin christophe at dinechin.org
Wed Feb 28 15:43:22 UTC 2018


From: Christophe de Dinechin <dinechin at redhat.com>

Doing this change will make it possible to move the capture loop to the
concrete-agent.cpp file.

Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
---
 include/spice-streaming-agent/errors.hpp |   2 +
 src/Makefile.am                          |   2 +
 src/message.hpp                          |  41 ++++++
 src/spice-streaming-agent.cpp            | 209 +------------------------------
 src/stream.cpp                           | 172 +++++++++++++++++++++++++
 src/stream.hpp                           |  55 ++++++++
 6 files changed, 276 insertions(+), 205 deletions(-)
 create mode 100644 src/message.hpp
 create mode 100644 src/stream.cpp
 create mode 100644 src/stream.hpp

diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
index 870a0fd..62ae010 100644
--- a/include/spice-streaming-agent/errors.hpp
+++ b/include/spice-streaming-agent/errors.hpp
@@ -90,4 +90,6 @@ protected:
 
 }} // namespace spice::streaming_agent
 
+extern bool quit_requested;
+
 #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
diff --git a/src/Makefile.am b/src/Makefile.am
index 2507844..923a103 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
 	mjpeg-fallback.hpp \
 	jpeg.cpp \
 	jpeg.hpp \
+	stream.cpp \
+	stream.hpp \
 	errors.cpp \
 	$(NULL)
diff --git a/src/message.hpp b/src/message.hpp
new file mode 100644
index 0000000..28b3e28
--- /dev/null
+++ b/src/message.hpp
@@ -0,0 +1,41 @@
+/* Formatting messages
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
+#define SPICE_STREAMING_AGENT_MESSAGE_HPP
+
+#include <spice/stream-device.h>
+
+namespace spice
+{
+namespace streaming_agent
+{
+
+template <typename Payload, typename Info, unsigned Type>
+class Message
+{
+public:
+    template <typename ...PayloadArgs>
+    Message(PayloadArgs... payload)
+        : hdr(StreamDevHeader {
+              .protocol_version = STREAM_DEVICE_PROTOCOL,
+              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
+              .type = Type,
+              .size = (uint32_t) Info::size(payload...)
+          })
+    { }
+    void write_header(Stream &stream)
+    {
+        stream.write_all("header", &hdr, sizeof(hdr));
+    }
+
+protected:
+    StreamDevHeader hdr;
+    typedef Payload payload_t;
+};
+
+}} // namespace spice::streaming_agent
+
+#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 35e65bb..c401a34 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -5,6 +5,8 @@
  */
 
 #include "concrete-agent.hpp"
+#include "stream.hpp"
+#include "message.hpp"
 #include "hexdump.h"
 #include "mjpeg-fallback.hpp"
 
@@ -21,11 +23,9 @@
 #include <inttypes.h>
 #include <string.h>
 #include <getopt.h>
-#include <unistd.h>
 #include <errno.h>
-#include <fcntl.h>
+#include <unistd.h>
 #include <sys/time.h>
-#include <poll.h>
 #include <syslog.h>
 #include <signal.h>
 #include <exception>
@@ -57,76 +57,6 @@ static uint64_t get_time(void)
 
 }
 
-class Stream
-{
-    typedef std::set<SpiceVideoCodecType> codecs_t;
-
-public:
-    Stream(const char *name)
-        : codecs()
-    {
-        streamfd = open(name, O_RDWR);
-        if (streamfd < 0) {
-            throw IOError("failed to open streaming device", errno);
-        }
-    }
-    ~Stream()
-    {
-        close(streamfd);
-    }
-
-    const codecs_t &client_codecs() { return codecs; }
-    bool streaming_requested() { return is_streaming; }
-
-    template <typename Message, typename ...PayloadArgs>
-    void send(PayloadArgs... payload)
-    {
-        Message message(payload...);
-        std::lock_guard<std::mutex> stream_guard(mutex);
-        message.write_header(*this);
-        message.write_message_body(*this, payload...);
-    }
-
-    int read_command(bool blocking);
-    void write_all(const char *operation, const void *buf, const size_t len);
-
-private:
-    int have_something_to_read(int timeout);
-    void handle_stream_start_stop(uint32_t len);
-    void handle_stream_capabilities(uint32_t len);
-    void handle_stream_error(uint32_t len);
-    void read_command_from_device(void);
-
-private:
-    std::mutex mutex;
-    codecs_t codecs;
-    int streamfd = -1;
-    bool is_streaming = false;
-};
-
-template <typename Payload, typename Info, unsigned Type>
-class Message
-{
-public:
-    template <typename ...PayloadArgs>
-    Message(PayloadArgs... payload)
-        : hdr(StreamDevHeader {
-              .protocol_version = STREAM_DEVICE_PROTOCOL,
-              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
-              .type = Type,
-              .size = (uint32_t) Info::size(payload...)
-          })
-    { }
-    void write_header(Stream &stream)
-    {
-        stream.write_all("header", &hdr, sizeof(hdr));
-    }
-
-protected:
-    StreamDevHeader hdr;
-    typedef Payload payload_t;
-};
-
 class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
 {
 public:
@@ -156,20 +86,6 @@ public:
     }
 };
 
-class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
-{
-public:
-    CapabilitiesMessage() : Message() {}
-    static size_t size()
-    {
-        return sizeof(payload_t);
-    }
-    void write_message_body(Stream &stream)
-    {
-        /* No body for capabilities message */
-    }
-};
-
 class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
 {
 public:
@@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
 
 }} // namespace spice::streaming_agent
 
-static bool quit_requested = false;
-
-int Stream::have_something_to_read(int timeout)
-{
-    struct pollfd pollfd = {streamfd, POLLIN, 0};
-
-    if (poll(&pollfd, 1, timeout) < 0) {
-        syslog(LOG_ERR, "poll FAILED\n");
-        return -1;
-    }
-
-    if (pollfd.revents == POLLIN) {
-        return 1;
-    }
-
-    return 0;
-}
-
-void Stream::handle_stream_start_stop(uint32_t len)
-{
-    uint8_t msg[256];
-
-    if (len >= sizeof(msg)) {
-        throw MessageDataError("message is too long", len, sizeof(msg));
-    }
-    int n = read(streamfd, &msg, len);
-    if (n != (int) len) {
-        throw MessageDataError("read start/stop command from device failed", n, len, errno);
-    }
-    is_streaming = (msg[0] != 0); /* num_codecs */
-    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
-           is_streaming ? "START" : "STOP");
-    codecs.clear();
-    for (int i = 1; i <= msg[0]; ++i) {
-        codecs.insert((SpiceVideoCodecType) msg[i]);
-    }
-}
-
-void Stream::handle_stream_capabilities(uint32_t len)
-{
-    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
-
-    if (len > sizeof(caps)) {
-        throw MessageDataError("capability message too long", len, sizeof(caps));
-    }
-    int n = read(streamfd, caps, len);
-    if (n != (int) len) {
-        throw MessageDataError("read capabilities from device failed", n, len, errno);
-    }
-
-    // we currently do not support extensions so just reply so
-    send<CapabilitiesMessage>();
-}
-
-void Stream::handle_stream_error(uint32_t len)
-{
-    // TODO read message and use it
-    throw ProtocolError("got an error message from server");
-}
-
-void Stream::read_command_from_device()
-{
-    StreamDevHeader hdr;
-    int n;
-
-    std::lock_guard<std::mutex> stream_guard(mutex);
-    n = read(streamfd, &hdr, sizeof(hdr));
-    if (n != sizeof(hdr)) {
-        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
-    }
-    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
-        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
-    }
-
-    switch (hdr.type) {
-    case STREAM_TYPE_CAPABILITIES:
-        return handle_stream_capabilities(hdr.size);
-    case STREAM_TYPE_NOTIFY_ERROR:
-        return handle_stream_error(hdr.size);
-    case STREAM_TYPE_START_STOP:
-        return handle_stream_start_stop(hdr.size);
-    }
-    throw MessageDataError("unknown message type", hdr.type, 0);
-}
-
-int Stream::read_command(bool blocking)
-{
-    int timeout = blocking?-1:0;
-    while (!quit_requested) {
-        if (!have_something_to_read(timeout)) {
-            if (!blocking) {
-                return 0;
-            }
-            sleep(1);
-            continue;
-        }
-        read_command_from_device();
-        break;
-    }
-
-    return 1;
-}
-
-void Stream::write_all(const char *operation, const void *buf, const size_t len)
-{
-    size_t written = 0;
-    while (written < len) {
-        int l = write(streamfd, (const char *) buf + written, len - written);
-        if (l < 0) {
-            if (errno == EINTR) {
-                continue;
-            }
-            throw WriteError("write failed", operation, errno).syslog();
-        }
-        written += l;
-    }
-    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
-}
+bool quit_requested = false;
 
 static void handle_interrupt(int intr)
 {
diff --git a/src/stream.cpp b/src/stream.cpp
new file mode 100644
index 0000000..f756097
--- /dev/null
+++ b/src/stream.cpp
@@ -0,0 +1,172 @@
+/* Encapsulation of the stream used to communicate between agent and server
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+
+#include "stream.hpp"
+#include "message.hpp"
+
+#include <spice/stream-device.h>
+
+#include <spice-streaming-agent/errors.hpp>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <syslog.h>
+#include <unistd.h>
+
+namespace spice
+{
+namespace streaming_agent
+{
+
+class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
+                                           STREAM_TYPE_CAPABILITIES>
+{
+public:
+    CapabilitiesMessage() : Message() {}
+    static size_t size()
+    {
+        return sizeof(payload_t);
+    }
+    void write_message_body(Stream &stream)
+    {
+        /* No body for capabilities message */
+    }
+};
+
+Stream::Stream(const char *name)
+    : codecs()
+{
+    streamfd = open(name, O_RDWR);
+    if (streamfd < 0) {
+        throw IOError("failed to open streaming device", errno);
+    }
+}
+
+Stream::~Stream()
+{
+    close(streamfd);
+}
+
+int Stream::have_something_to_read(int timeout)
+{
+    struct pollfd pollfd = {streamfd, POLLIN, 0};
+
+    if (poll(&pollfd, 1, timeout) < 0) {
+        syslog(LOG_ERR, "poll FAILED\n");
+        return -1;
+    }
+
+    if (pollfd.revents == POLLIN) {
+        return 1;
+    }
+
+    return 0;
+}
+
+void Stream::handle_stream_start_stop(uint32_t len)
+{
+    uint8_t msg[256];
+
+    if (len >= sizeof(msg)) {
+        throw MessageDataError("message is too long", len, sizeof(msg));
+    }
+    int n = read(streamfd, &msg, len);
+    if (n != (int) len) {
+        throw MessageDataError("read start/stop command from device failed", n, len, errno);
+    }
+    is_streaming = (msg[0] != 0); /* num_codecs */
+    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
+           is_streaming ? "START" : "STOP");
+    codecs.clear();
+    for (int i = 1; i <= msg[0]; ++i) {
+        codecs.insert((SpiceVideoCodecType) msg[i]);
+    }
+}
+
+void Stream::handle_stream_capabilities(uint32_t len)
+{
+    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
+
+    if (len > sizeof(caps)) {
+        throw MessageDataError("capability message too long", len, sizeof(caps));
+    }
+    int n = read(streamfd, caps, len);
+    if (n != (int) len) {
+        throw MessageDataError("read capabilities from device failed", n, len, errno);
+    }
+
+    // we currently do not support extensions so just reply so
+    send<CapabilitiesMessage>();
+}
+
+void Stream::handle_stream_error(uint32_t len)
+{
+    // TODO read message and use it
+    throw ProtocolError("got an error message from server");
+}
+
+void Stream::read_command_from_device()
+{
+    StreamDevHeader hdr;
+    int n;
+
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    n = read(streamfd, &hdr, sizeof(hdr));
+    if (n != sizeof(hdr)) {
+        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
+    }
+    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
+        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
+    }
+
+    switch (hdr.type) {
+    case STREAM_TYPE_CAPABILITIES:
+        return handle_stream_capabilities(hdr.size);
+    case STREAM_TYPE_NOTIFY_ERROR:
+        return handle_stream_error(hdr.size);
+    case STREAM_TYPE_START_STOP:
+        return handle_stream_start_stop(hdr.size);
+    }
+    throw MessageDataError("unknown message type", hdr.type, 0);
+}
+
+int Stream::read_command(bool blocking)
+{
+    int timeout = blocking?-1:0;
+    while (!quit_requested) {
+        if (!have_something_to_read(timeout)) {
+            if (!blocking) {
+                return 0;
+            }
+            sleep(1);
+            continue;
+        }
+        read_command_from_device();
+        break;
+    }
+
+    return 1;
+}
+
+void Stream::write_all(const char *operation, const void *buf, const size_t len)
+{
+    size_t written = 0;
+    while (written < len) {
+        int l = write(streamfd, (const char *) buf + written, len - written);
+        if (l < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            throw WriteError("write failed", operation, errno).syslog();
+        }
+        written += l;
+    }
+    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
+}
+
+}} // namespace spice::streaming_agent
diff --git a/src/stream.hpp b/src/stream.hpp
new file mode 100644
index 0000000..b689f36
--- /dev/null
+++ b/src/stream.hpp
@@ -0,0 +1,55 @@
+/* Encapsulation of the stream used to communicate between agent and server
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
+#define SPICE_STREAMING_AGENT_STREAM_HPP
+
+#include <spice/enums.h>
+#include <set>
+#include <mutex>
+
+namespace spice {
+namespace streaming_agent {
+
+class Stream
+{
+    typedef std::set<SpiceVideoCodecType> codecs_t;
+
+public:
+    Stream(const char *name);
+    ~Stream();
+
+    const codecs_t &client_codecs() { return codecs; }
+    bool streaming_requested() { return is_streaming; }
+
+    template <typename Message, typename ...PayloadArgs>
+    void send(PayloadArgs... payload)
+    {
+        Message message(payload...);
+        std::lock_guard<std::mutex> stream_guard(mutex);
+        message.write_header(*this);
+        message.write_message_body(*this, payload...);
+    }
+
+    int read_command(bool blocking);
+    void write_all(const char *operation, const void *buf, const size_t len);
+
+private:
+    int have_something_to_read(int timeout);
+    void handle_stream_start_stop(uint32_t len);
+    void handle_stream_capabilities(uint32_t len);
+    void handle_stream_error(uint32_t len);
+    void read_command_from_device(void);
+
+private:
+    std::mutex mutex;
+    codecs_t codecs;
+    int streamfd = -1;
+    bool is_streaming = false;
+};
+
+}} // namespace spice::streaming_agent
+
+#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
-- 
2.13.5 (Apple Git-94)



More information about the Spice-devel mailing list