[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files
Christophe de Dinechin
christophe at dinechin.org
Wed Feb 28 15:43:22 UTC 2018
From: Christophe de Dinechin <dinechin at redhat.com>
This change makes it possible to move the capture loop into the
concrete-agent.cpp file.
Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
---
include/spice-streaming-agent/errors.hpp | 2 +
src/Makefile.am | 2 +
src/message.hpp | 41 ++++++
src/spice-streaming-agent.cpp | 209 +------------------------------
src/stream.cpp | 172 +++++++++++++++++++++++++
src/stream.hpp | 55 ++++++++
6 files changed, 276 insertions(+), 205 deletions(-)
create mode 100644 src/message.hpp
create mode 100644 src/stream.cpp
create mode 100644 src/stream.hpp
diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
index 870a0fd..62ae010 100644
--- a/include/spice-streaming-agent/errors.hpp
+++ b/include/spice-streaming-agent/errors.hpp
@@ -90,4 +90,6 @@ protected:
}} // namespace spice::streaming_agent
+extern bool quit_requested;
+
#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
diff --git a/src/Makefile.am b/src/Makefile.am
index 2507844..923a103 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
mjpeg-fallback.hpp \
jpeg.cpp \
jpeg.hpp \
+ stream.cpp \
+ stream.hpp \
errors.cpp \
$(NULL)
diff --git a/src/message.hpp b/src/message.hpp
new file mode 100644
index 0000000..28b3e28
--- /dev/null
+++ b/src/message.hpp
@@ -0,0 +1,41 @@
+/* Formatting messages
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
+#define SPICE_STREAMING_AGENT_MESSAGE_HPP
+
+#include <spice/stream-device.h>
+
+namespace spice
+{
+namespace streaming_agent
+{
+
+template <typename Payload, typename Info, unsigned Type>
+class Message
+{
+public:
+ template <typename ...PayloadArgs>
+ Message(PayloadArgs... payload)
+ : hdr(StreamDevHeader {
+ .protocol_version = STREAM_DEVICE_PROTOCOL,
+ .padding = 0, // Workaround GCC bug "sorry: not implemented"
+ .type = Type,
+ .size = (uint32_t) Info::size(payload...)
+ })
+ { }
+ void write_header(Stream &stream)
+ {
+ stream.write_all("header", &hdr, sizeof(hdr));
+ }
+
+protected:
+ StreamDevHeader hdr;
+ typedef Payload payload_t;
+};
+
+}} // namespace spice::streaming_agent
+
+#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 35e65bb..c401a34 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -5,6 +5,8 @@
*/
#include "concrete-agent.hpp"
+#include "stream.hpp"
+#include "message.hpp"
#include "hexdump.h"
#include "mjpeg-fallback.hpp"
@@ -21,11 +23,9 @@
#include <inttypes.h>
#include <string.h>
#include <getopt.h>
-#include <unistd.h>
#include <errno.h>
-#include <fcntl.h>
+#include <unistd.h>
#include <sys/time.h>
-#include <poll.h>
#include <syslog.h>
#include <signal.h>
#include <exception>
@@ -57,76 +57,6 @@ static uint64_t get_time(void)
}
-class Stream
-{
- typedef std::set<SpiceVideoCodecType> codecs_t;
-
-public:
- Stream(const char *name)
- : codecs()
- {
- streamfd = open(name, O_RDWR);
- if (streamfd < 0) {
- throw IOError("failed to open streaming device", errno);
- }
- }
- ~Stream()
- {
- close(streamfd);
- }
-
- const codecs_t &client_codecs() { return codecs; }
- bool streaming_requested() { return is_streaming; }
-
- template <typename Message, typename ...PayloadArgs>
- void send(PayloadArgs... payload)
- {
- Message message(payload...);
- std::lock_guard<std::mutex> stream_guard(mutex);
- message.write_header(*this);
- message.write_message_body(*this, payload...);
- }
-
- int read_command(bool blocking);
- void write_all(const char *operation, const void *buf, const size_t len);
-
-private:
- int have_something_to_read(int timeout);
- void handle_stream_start_stop(uint32_t len);
- void handle_stream_capabilities(uint32_t len);
- void handle_stream_error(uint32_t len);
- void read_command_from_device(void);
-
-private:
- std::mutex mutex;
- codecs_t codecs;
- int streamfd = -1;
- bool is_streaming = false;
-};
-
-template <typename Payload, typename Info, unsigned Type>
-class Message
-{
-public:
- template <typename ...PayloadArgs>
- Message(PayloadArgs... payload)
- : hdr(StreamDevHeader {
- .protocol_version = STREAM_DEVICE_PROTOCOL,
- .padding = 0, // Workaround GCC bug "sorry: not implemented"
- .type = Type,
- .size = (uint32_t) Info::size(payload...)
- })
- { }
- void write_header(Stream &stream)
- {
- stream.write_all("header", &hdr, sizeof(hdr));
- }
-
-protected:
- StreamDevHeader hdr;
- typedef Payload payload_t;
-};
-
class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
{
public:
@@ -156,20 +86,6 @@ public:
}
};
-class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
-{
-public:
- CapabilitiesMessage() : Message() {}
- static size_t size()
- {
- return sizeof(payload_t);
- }
- void write_message_body(Stream &stream)
- {
- /* No body for capabilities message */
- }
-};
-
class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
{
public:
@@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
}} // namespace spice::streaming_agent
-static bool quit_requested = false;
-
-int Stream::have_something_to_read(int timeout)
-{
- struct pollfd pollfd = {streamfd, POLLIN, 0};
-
- if (poll(&pollfd, 1, timeout) < 0) {
- syslog(LOG_ERR, "poll FAILED\n");
- return -1;
- }
-
- if (pollfd.revents == POLLIN) {
- return 1;
- }
-
- return 0;
-}
-
-void Stream::handle_stream_start_stop(uint32_t len)
-{
- uint8_t msg[256];
-
- if (len >= sizeof(msg)) {
- throw MessageDataError("message is too long", len, sizeof(msg));
- }
- int n = read(streamfd, &msg, len);
- if (n != (int) len) {
- throw MessageDataError("read start/stop command from device failed", n, len, errno);
- }
- is_streaming = (msg[0] != 0); /* num_codecs */
- syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
- is_streaming ? "START" : "STOP");
- codecs.clear();
- for (int i = 1; i <= msg[0]; ++i) {
- codecs.insert((SpiceVideoCodecType) msg[i]);
- }
-}
-
-void Stream::handle_stream_capabilities(uint32_t len)
-{
- uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
-
- if (len > sizeof(caps)) {
- throw MessageDataError("capability message too long", len, sizeof(caps));
- }
- int n = read(streamfd, caps, len);
- if (n != (int) len) {
- throw MessageDataError("read capabilities from device failed", n, len, errno);
- }
-
- // we currently do not support extensions so just reply so
- send<CapabilitiesMessage>();
-}
-
-void Stream::handle_stream_error(uint32_t len)
-{
- // TODO read message and use it
- throw ProtocolError("got an error message from server");
-}
-
-void Stream::read_command_from_device()
-{
- StreamDevHeader hdr;
- int n;
-
- std::lock_guard<std::mutex> stream_guard(mutex);
- n = read(streamfd, &hdr, sizeof(hdr));
- if (n != sizeof(hdr)) {
- throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
- }
- if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
- throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
- }
-
- switch (hdr.type) {
- case STREAM_TYPE_CAPABILITIES:
- return handle_stream_capabilities(hdr.size);
- case STREAM_TYPE_NOTIFY_ERROR:
- return handle_stream_error(hdr.size);
- case STREAM_TYPE_START_STOP:
- return handle_stream_start_stop(hdr.size);
- }
- throw MessageDataError("unknown message type", hdr.type, 0);
-}
-
-int Stream::read_command(bool blocking)
-{
- int timeout = blocking?-1:0;
- while (!quit_requested) {
- if (!have_something_to_read(timeout)) {
- if (!blocking) {
- return 0;
- }
- sleep(1);
- continue;
- }
- read_command_from_device();
- break;
- }
-
- return 1;
-}
-
-void Stream::write_all(const char *operation, const void *buf, const size_t len)
-{
- size_t written = 0;
- while (written < len) {
- int l = write(streamfd, (const char *) buf + written, len - written);
- if (l < 0) {
- if (errno == EINTR) {
- continue;
- }
- throw WriteError("write failed", operation, errno).syslog();
- }
- written += l;
- }
- syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
-}
+bool quit_requested = false;
static void handle_interrupt(int intr)
{
diff --git a/src/stream.cpp b/src/stream.cpp
new file mode 100644
index 0000000..f756097
--- /dev/null
+++ b/src/stream.cpp
@@ -0,0 +1,172 @@
+/* Encapsulation of the stream used to communicate between agent and server
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+
+#include "stream.hpp"
+#include "message.hpp"
+
+#include <spice/stream-device.h>
+
+#include <spice-streaming-agent/errors.hpp>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <syslog.h>
+#include <unistd.h>
+
+namespace spice
+{
+namespace streaming_agent
+{
+
+class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
+ STREAM_TYPE_CAPABILITIES>
+{
+public:
+ CapabilitiesMessage() : Message() {}
+ static size_t size()
+ {
+ return sizeof(payload_t);
+ }
+ void write_message_body(Stream &stream)
+ {
+ /* No body for capabilities message */
+ }
+};
+
+Stream::Stream(const char *name)
+ : codecs()
+{
+ streamfd = open(name, O_RDWR);
+ if (streamfd < 0) {
+ throw IOError("failed to open streaming device", errno);
+ }
+}
+
+Stream::~Stream()
+{
+ close(streamfd);
+}
+
+int Stream::have_something_to_read(int timeout)
+{
+ struct pollfd pollfd = {streamfd, POLLIN, 0};
+
+ if (poll(&pollfd, 1, timeout) < 0) {
+ syslog(LOG_ERR, "poll FAILED\n");
+ return -1;
+ }
+
+ if (pollfd.revents == POLLIN) {
+ return 1;
+ }
+
+ return 0;
+}
+
+void Stream::handle_stream_start_stop(uint32_t len)
+{
+ uint8_t msg[256];
+
+ if (len >= sizeof(msg)) {
+ throw MessageDataError("message is too long", len, sizeof(msg));
+ }
+ int n = read(streamfd, &msg, len);
+ if (n != (int) len) {
+ throw MessageDataError("read start/stop command from device failed", n, len, errno);
+ }
+ is_streaming = (msg[0] != 0); /* num_codecs */
+ syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
+ is_streaming ? "START" : "STOP");
+ codecs.clear();
+ for (int i = 1; i <= msg[0]; ++i) {
+ codecs.insert((SpiceVideoCodecType) msg[i]);
+ }
+}
+
+void Stream::handle_stream_capabilities(uint32_t len)
+{
+ uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
+
+ if (len > sizeof(caps)) {
+ throw MessageDataError("capability message too long", len, sizeof(caps));
+ }
+ int n = read(streamfd, caps, len);
+ if (n != (int) len) {
+ throw MessageDataError("read capabilities from device failed", n, len, errno);
+ }
+
+ // we currently do not support extensions, so just reply to say so
+ send<CapabilitiesMessage>();
+}
+
+void Stream::handle_stream_error(uint32_t len)
+{
+ // TODO read message and use it
+ throw ProtocolError("got an error message from server");
+}
+
+void Stream::read_command_from_device()
+{
+ StreamDevHeader hdr;
+ int n;
+
+ std::lock_guard<std::mutex> stream_guard(mutex);
+ n = read(streamfd, &hdr, sizeof(hdr));
+ if (n != sizeof(hdr)) {
+ throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
+ }
+ if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
+ throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
+ }
+
+ switch (hdr.type) {
+ case STREAM_TYPE_CAPABILITIES:
+ return handle_stream_capabilities(hdr.size);
+ case STREAM_TYPE_NOTIFY_ERROR:
+ return handle_stream_error(hdr.size);
+ case STREAM_TYPE_START_STOP:
+ return handle_stream_start_stop(hdr.size);
+ }
+ throw MessageDataError("unknown message type", hdr.type, 0);
+}
+
+int Stream::read_command(bool blocking)
+{
+ int timeout = blocking?-1:0;
+ while (!quit_requested) {
+ if (!have_something_to_read(timeout)) {
+ if (!blocking) {
+ return 0;
+ }
+ sleep(1);
+ continue;
+ }
+ read_command_from_device();
+ break;
+ }
+
+ return 1;
+}
+
+void Stream::write_all(const char *operation, const void *buf, const size_t len)
+{
+ size_t written = 0;
+ while (written < len) {
+ int l = write(streamfd, (const char *) buf + written, len - written);
+ if (l < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ throw WriteError("write failed", operation, errno).syslog();
+ }
+ written += l;
+ }
+ syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
+}
+
+}} // namespace spice::streaming_agent
diff --git a/src/stream.hpp b/src/stream.hpp
new file mode 100644
index 0000000..b689f36
--- /dev/null
+++ b/src/stream.hpp
@@ -0,0 +1,55 @@
+/* Encapsulation of the stream used to communicate between agent and server
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
+#define SPICE_STREAMING_AGENT_STREAM_HPP
+
+#include <spice/enums.h>
+#include <set>
+#include <mutex>
+
+namespace spice {
+namespace streaming_agent {
+
+class Stream
+{
+ typedef std::set<SpiceVideoCodecType> codecs_t;
+
+public:
+ Stream(const char *name);
+ ~Stream();
+
+ const codecs_t &client_codecs() { return codecs; }
+ bool streaming_requested() { return is_streaming; }
+
+ template <typename Message, typename ...PayloadArgs>
+ void send(PayloadArgs... payload)
+ {
+ Message message(payload...);
+ std::lock_guard<std::mutex> stream_guard(mutex);
+ message.write_header(*this);
+ message.write_message_body(*this, payload...);
+ }
+
+ int read_command(bool blocking);
+ void write_all(const char *operation, const void *buf, const size_t len);
+
+private:
+ int have_something_to_read(int timeout);
+ void handle_stream_start_stop(uint32_t len);
+ void handle_stream_capabilities(uint32_t len);
+ void handle_stream_error(uint32_t len);
+ void read_command_from_device(void);
+
+private:
+ std::mutex mutex;
+ codecs_t codecs;
+ int streamfd = -1;
+ bool is_streaming = false;
+};
+
+}} // namespace spice::streaming_agent
+
+#endif // SPICE_STREAMING_AGENT_STREAM_HPP
--
2.13.5 (Apple Git-94)
More information about the Spice-devel
mailing list