[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files
Lukáš Hrázký
lhrazky at redhat.com
Thu Mar 1 15:11:57 UTC 2018
On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> From: Christophe de Dinechin <dinechin at redhat.com>
>
> Doing this change will make it possible to move the capture loop to the
> concrete-agent.cpp file.
>
> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> ---
> include/spice-streaming-agent/errors.hpp | 2 +
> src/Makefile.am | 2 +
> src/message.hpp | 41 ++++++
> src/spice-streaming-agent.cpp | 209 +------------------------------
> src/stream.cpp | 172 +++++++++++++++++++++++++
> src/stream.hpp | 55 ++++++++
> 6 files changed, 276 insertions(+), 205 deletions(-)
> create mode 100644 src/message.hpp
> create mode 100644 src/stream.cpp
> create mode 100644 src/stream.hpp
>
> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> index 870a0fd..62ae010 100644
> --- a/include/spice-streaming-agent/errors.hpp
> +++ b/include/spice-streaming-agent/errors.hpp
> @@ -90,4 +90,6 @@ protected:
>
> }} // namespace spice::streaming_agent
>
> +extern bool quit_requested;
Why is quit_requested being declared in errors.hpp?
> +
> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 2507844..923a103 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> mjpeg-fallback.hpp \
> jpeg.cpp \
> jpeg.hpp \
> + stream.cpp \
> + stream.hpp \
> errors.cpp \
> $(NULL)
> diff --git a/src/message.hpp b/src/message.hpp
> new file mode 100644
> index 0000000..28b3e28
> --- /dev/null
> +++ b/src/message.hpp
> @@ -0,0 +1,41 @@
> +/* Formatting messages
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> +
> +#include <spice/stream-device.h>
> +
> +namespace spice
> +{
> +namespace streaming_agent
> +{
> +
> +template <typename Payload, typename Info, unsigned Type>
> +class Message
> +{
> +public:
> + template <typename ...PayloadArgs>
> + Message(PayloadArgs... payload)
> + : hdr(StreamDevHeader {
> + .protocol_version = STREAM_DEVICE_PROTOCOL,
> + .padding = 0, // Workaround GCC bug "sorry: not implemented"
> + .type = Type,
> + .size = (uint32_t) Info::size(payload...)
> + })
> + { }
> + void write_header(Stream &stream)
> + {
> + stream.write_all("header", &hdr, sizeof(hdr));
> + }
> +
> +protected:
> + StreamDevHeader hdr;
> + typedef Payload payload_t;
> +};
> +
> +}} // namespace spice::streaming_agent
> +
> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 35e65bb..c401a34 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -5,6 +5,8 @@
> */
>
> #include "concrete-agent.hpp"
> +#include "stream.hpp"
> +#include "message.hpp"
> #include "hexdump.h"
> #include "mjpeg-fallback.hpp"
>
> @@ -21,11 +23,9 @@
> #include <inttypes.h>
> #include <string.h>
> #include <getopt.h>
> -#include <unistd.h>
> #include <errno.h>
> -#include <fcntl.h>
> +#include <unistd.h>
> #include <sys/time.h>
> -#include <poll.h>
> #include <syslog.h>
> #include <signal.h>
> #include <exception>
> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>
> }
>
> -class Stream
> -{
> - typedef std::set<SpiceVideoCodecType> codecs_t;
> -
> -public:
> - Stream(const char *name)
> - : codecs()
> - {
> - streamfd = open(name, O_RDWR);
> - if (streamfd < 0) {
> - throw IOError("failed to open streaming device", errno);
> - }
> - }
> - ~Stream()
> - {
> - close(streamfd);
> - }
> -
> - const codecs_t &client_codecs() { return codecs; }
> - bool streaming_requested() { return is_streaming; }
> -
> - template <typename Message, typename ...PayloadArgs>
> - void send(PayloadArgs... payload)
> - {
> - Message message(payload...);
> - std::lock_guard<std::mutex> stream_guard(mutex);
> - message.write_header(*this);
> - message.write_message_body(*this, payload...);
> - }
> -
> - int read_command(bool blocking);
> - void write_all(const char *operation, const void *buf, const size_t len);
> -
> -private:
> - int have_something_to_read(int timeout);
> - void handle_stream_start_stop(uint32_t len);
> - void handle_stream_capabilities(uint32_t len);
> - void handle_stream_error(uint32_t len);
> - void read_command_from_device(void);
> -
> -private:
> - std::mutex mutex;
> - codecs_t codecs;
> - int streamfd = -1;
> - bool is_streaming = false;
> -};
> -
> -template <typename Payload, typename Info, unsigned Type>
> -class Message
> -{
> -public:
> - template <typename ...PayloadArgs>
> - Message(PayloadArgs... payload)
> - : hdr(StreamDevHeader {
> - .protocol_version = STREAM_DEVICE_PROTOCOL,
> - .padding = 0, // Workaround GCC bug "sorry: not implemented"
> - .type = Type,
> - .size = (uint32_t) Info::size(payload...)
> - })
> - { }
> - void write_header(Stream &stream)
> - {
> - stream.write_all("header", &hdr, sizeof(hdr));
> - }
> -
> -protected:
> - StreamDevHeader hdr;
> - typedef Payload payload_t;
> -};
> -
> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> {
> public:
> @@ -156,20 +86,6 @@ public:
> }
> };
>
> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> -{
> -public:
> - CapabilitiesMessage() : Message() {}
> - static size_t size()
> - {
> - return sizeof(payload_t);
> - }
> - void write_message_body(Stream &stream)
> - {
> - /* No body for capabilities message */
> - }
> -};
> -
> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> {
> public:
> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>
> }} // namespace spice::streaming_agent
>
> -static bool quit_requested = false;
> -
> -int Stream::have_something_to_read(int timeout)
> -{
> - struct pollfd pollfd = {streamfd, POLLIN, 0};
> -
> - if (poll(&pollfd, 1, timeout) < 0) {
> - syslog(LOG_ERR, "poll FAILED\n");
> - return -1;
> - }
> -
> - if (pollfd.revents == POLLIN) {
> - return 1;
> - }
> -
> - return 0;
> -}
> -
> -void Stream::handle_stream_start_stop(uint32_t len)
> -{
> - uint8_t msg[256];
> -
> - if (len >= sizeof(msg)) {
> - throw MessageDataError("message is too long", len, sizeof(msg));
> - }
> - int n = read(streamfd, &msg, len);
> - if (n != (int) len) {
> - throw MessageDataError("read start/stop command from device failed", n, len, errno);
> - }
> - is_streaming = (msg[0] != 0); /* num_codecs */
> - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> - is_streaming ? "START" : "STOP");
> - codecs.clear();
> - for (int i = 1; i <= msg[0]; ++i) {
> - codecs.insert((SpiceVideoCodecType) msg[i]);
> - }
> -}
> -
> -void Stream::handle_stream_capabilities(uint32_t len)
> -{
> - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> -
> - if (len > sizeof(caps)) {
> - throw MessageDataError("capability message too long", len, sizeof(caps));
> - }
> - int n = read(streamfd, caps, len);
> - if (n != (int) len) {
> - throw MessageDataError("read capabilities from device failed", n, len, errno);
> - }
> -
> - // we currently do not support extensions so just reply so
> - send<CapabilitiesMessage>();
> -}
> -
> -void Stream::handle_stream_error(uint32_t len)
> -{
> - // TODO read message and use it
> - throw ProtocolError("got an error message from server");
> -}
> -
> -void Stream::read_command_from_device()
> -{
> - StreamDevHeader hdr;
> - int n;
> -
> - std::lock_guard<std::mutex> stream_guard(mutex);
> - n = read(streamfd, &hdr, sizeof(hdr));
> - if (n != sizeof(hdr)) {
> - throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> - }
> - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> - throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> - }
> -
> - switch (hdr.type) {
> - case STREAM_TYPE_CAPABILITIES:
> - return handle_stream_capabilities(hdr.size);
> - case STREAM_TYPE_NOTIFY_ERROR:
> - return handle_stream_error(hdr.size);
> - case STREAM_TYPE_START_STOP:
> - return handle_stream_start_stop(hdr.size);
> - }
> - throw MessageDataError("unknown message type", hdr.type, 0);
> -}
> -
> -int Stream::read_command(bool blocking)
> -{
> - int timeout = blocking?-1:0;
> - while (!quit_requested) {
> - if (!have_something_to_read(timeout)) {
> - if (!blocking) {
> - return 0;
> - }
> - sleep(1);
> - continue;
> - }
> - read_command_from_device();
> - break;
> - }
> -
> - return 1;
> -}
> -
> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> -{
> - size_t written = 0;
> - while (written < len) {
> - int l = write(streamfd, (const char *) buf + written, len - written);
> - if (l < 0) {
> - if (errno == EINTR) {
> - continue;
> - }
> - throw WriteError("write failed", operation, errno).syslog();
> - }
> - written += l;
> - }
> - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> -}
> +bool quit_requested = false;
>
> static void handle_interrupt(int intr)
> {
> diff --git a/src/stream.cpp b/src/stream.cpp
> new file mode 100644
> index 0000000..f756097
> --- /dev/null
> +++ b/src/stream.cpp
> @@ -0,0 +1,172 @@
> +/* Encapsulation of the stream used to communicate between agent and server
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +
> +#include "stream.hpp"
> +#include "message.hpp"
> +
> +#include <spice/stream-device.h>
> +
> +#include <spice-streaming-agent/errors.hpp>
> +
> +#include <sys/types.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +#include <poll.h>
> +#include <syslog.h>
> +#include <unistd.h>
> +
> +namespace spice
> +{
> +namespace streaming_agent
> +{
> +
> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> + STREAM_TYPE_CAPABILITIES>
> +{
> +public:
> + CapabilitiesMessage() : Message() {}
> + static size_t size()
> + {
> + return sizeof(payload_t);
> + }
> + void write_message_body(Stream &stream)
> + {
> + /* No body for capabilities message */
> + }
> +};
I'm not sure I like scattering the message classes across the source files
that happen to use them, though I suppose you did it because each message
(like X11CursorMessage) may require different header files to be included?
Perhaps it is the way to go...
> +
> +Stream::Stream(const char *name)
> + : codecs()
> +{
> + streamfd = open(name, O_RDWR);
> + if (streamfd < 0) {
> + throw IOError("failed to open streaming device", errno);
> + }
> +}
> +
> +Stream::~Stream()
> +{
> + close(streamfd);
> +}
> +
> +int Stream::have_something_to_read(int timeout)
> +{
> + struct pollfd pollfd = {streamfd, POLLIN, 0};
> +
> + if (poll(&pollfd, 1, timeout) < 0) {
> + syslog(LOG_ERR, "poll FAILED\n");
> + return -1;
> + }
> +
> + if (pollfd.revents == POLLIN) {
> + return 1;
> + }
> +
> + return 0;
> +}
> +
> +void Stream::handle_stream_start_stop(uint32_t len)
> +{
> + uint8_t msg[256];
> +
> + if (len >= sizeof(msg)) {
> + throw MessageDataError("message is too long", len, sizeof(msg));
> + }
> + int n = read(streamfd, &msg, len);
> + if (n != (int) len) {
> + throw MessageDataError("read start/stop command from device failed", n, len, errno);
> + }
> + is_streaming = (msg[0] != 0); /* num_codecs */
> + syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> + is_streaming ? "START" : "STOP");
> + codecs.clear();
> + for (int i = 1; i <= msg[0]; ++i) {
> + codecs.insert((SpiceVideoCodecType) msg[i]);
> + }
> +}
> +
> +void Stream::handle_stream_capabilities(uint32_t len)
> +{
> + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> +
> + if (len > sizeof(caps)) {
> + throw MessageDataError("capability message too long", len, sizeof(caps));
> + }
> + int n = read(streamfd, caps, len);
> + if (n != (int) len) {
> + throw MessageDataError("read capabilities from device failed", n, len, errno);
> + }
> +
> + // we currently do not support extensions so just reply so
> + send<CapabilitiesMessage>();
> +}
> +
> +void Stream::handle_stream_error(uint32_t len)
> +{
> + // TODO read message and use it
> + throw ProtocolError("got an error message from server");
> +}
> +
> +void Stream::read_command_from_device()
> +{
> + StreamDevHeader hdr;
> + int n;
> +
> + std::lock_guard<std::mutex> stream_guard(mutex);
> + n = read(streamfd, &hdr, sizeof(hdr));
> + if (n != sizeof(hdr)) {
> + throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> + }
> + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> + throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> + }
> +
> + switch (hdr.type) {
> + case STREAM_TYPE_CAPABILITIES:
> + return handle_stream_capabilities(hdr.size);
> + case STREAM_TYPE_NOTIFY_ERROR:
> + return handle_stream_error(hdr.size);
> + case STREAM_TYPE_START_STOP:
> + return handle_stream_start_stop(hdr.size);
> + }
> + throw MessageDataError("unknown message type", hdr.type, 0);
> +}
> +
> +int Stream::read_command(bool blocking)
> +{
> + int timeout = blocking?-1:0;
> + while (!quit_requested) {
> + if (!have_something_to_read(timeout)) {
> + if (!blocking) {
> + return 0;
> + }
> + sleep(1);
> + continue;
> + }
> + read_command_from_device();
> + break;
> + }
> +
> + return 1;
> +}
> +
> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> +{
> + size_t written = 0;
> + while (written < len) {
> + int l = write(streamfd, (const char *) buf + written, len - written);
> + if (l < 0) {
> + if (errno == EINTR) {
> + continue;
> + }
> + throw WriteError("write failed", operation, errno).syslog();
> + }
> + written += l;
> + }
> + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> +}
> +
> +}} // namespace spice::streaming_agent
> diff --git a/src/stream.hpp b/src/stream.hpp
> new file mode 100644
> index 0000000..b689f36
> --- /dev/null
> +++ b/src/stream.hpp
> @@ -0,0 +1,55 @@
> +/* Encapsulation of the stream used to communicate between agent and server
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> +#define SPICE_STREAMING_AGENT_STREAM_HPP
> +
> +#include <spice/enums.h>
> +#include <set>
> +#include <mutex>
> +
> +namespace spice {
> +namespace streaming_agent {
> +
> +class Stream
> +{
> + typedef std::set<SpiceVideoCodecType> codecs_t;
> +
> +public:
> + Stream(const char *name);
> + ~Stream();
> +
> + const codecs_t &client_codecs() { return codecs; }
> + bool streaming_requested() { return is_streaming; }
> +
> + template <typename Message, typename ...PayloadArgs>
> + void send(PayloadArgs... payload)
> + {
> + Message message(payload...);
> + std::lock_guard<std::mutex> stream_guard(mutex);
> + message.write_header(*this);
> + message.write_message_body(*this, payload...);
> + }
> +
> + int read_command(bool blocking);
> + void write_all(const char *operation, const void *buf, const size_t len);
> +
> +private:
> + int have_something_to_read(int timeout);
> + void handle_stream_start_stop(uint32_t len);
> + void handle_stream_capabilities(uint32_t len);
> + void handle_stream_error(uint32_t len);
> + void read_command_from_device(void);
> +
> +private:
> + std::mutex mutex;
> + codecs_t codecs;
> + int streamfd = -1;
> + bool is_streaming = false;
> +};
> +
> +}} // namespace spice::streaming_agent
> +
> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
More information about the Spice-devel
mailing list