[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files
Christophe de Dinechin
christophe.de.dinechin at gmail.com
Thu Mar 1 16:57:19 UTC 2018
> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky at redhat.com> wrote:
>
> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>> From: Christophe de Dinechin <dinechin at redhat.com>
>>
>> Doing this change will make it possible to move the capture loop to the
>> concrete-agent.cpp file.
>>
>> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
>> ---
>> include/spice-streaming-agent/errors.hpp | 2 +
>> src/Makefile.am | 2 +
>> src/message.hpp | 41 ++++++
>> src/spice-streaming-agent.cpp | 209 +------------------------------
>> src/stream.cpp | 172 +++++++++++++++++++++++++
>> src/stream.hpp | 55 ++++++++
>> 6 files changed, 276 insertions(+), 205 deletions(-)
>> create mode 100644 src/message.hpp
>> create mode 100644 src/stream.cpp
>> create mode 100644 src/stream.hpp
>>
>> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
>> index 870a0fd..62ae010 100644
>> --- a/include/spice-streaming-agent/errors.hpp
>> +++ b/include/spice-streaming-agent/errors.hpp
>> @@ -90,4 +90,6 @@ protected:
>>
>> }} // namespace spice::streaming_agent
>>
>> +extern bool quit_requested;
>
> Putting quit_requested into errors.hpp? Why?
Because errors.hpp deals with error conditions, and you need to be able to make other threads quit on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
>
>> +
>> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
>> diff --git a/src/Makefile.am b/src/Makefile.am
>> index 2507844..923a103 100644
>> --- a/src/Makefile.am
>> +++ b/src/Makefile.am
>> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>> mjpeg-fallback.hpp \
>> jpeg.cpp \
>> jpeg.hpp \
>> + stream.cpp \
>> + stream.hpp \
>> errors.cpp \
>> $(NULL)
>> diff --git a/src/message.hpp b/src/message.hpp
>> new file mode 100644
>> index 0000000..28b3e28
>> --- /dev/null
>> +++ b/src/message.hpp
>> @@ -0,0 +1,41 @@
>> +/* Formatting messages
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
>> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
>> +
>> +#include <spice/stream-device.h>
>> +
>> +namespace spice
>> +{
>> +namespace streaming_agent
>> +{
>> +
>> +template <typename Payload, typename Info, unsigned Type>
>> +class Message
>> +{
>> +public:
>> + template <typename ...PayloadArgs>
>> + Message(PayloadArgs... payload)
>> + : hdr(StreamDevHeader {
>> + .protocol_version = STREAM_DEVICE_PROTOCOL,
>> + .padding = 0, // Workaround GCC bug "sorry: not implemented"
>> + .type = Type,
>> + .size = (uint32_t) Info::size(payload...)
>> + })
>> + { }
>> + void write_header(Stream &stream)
>> + {
>> + stream.write_all("header", &hdr, sizeof(hdr));
>> + }
>> +
>> +protected:
>> + StreamDevHeader hdr;
>> + typedef Payload payload_t;
>> +};
>> +
>> +}} // namespace spice::streaming_agent
>> +
>> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index 35e65bb..c401a34 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -5,6 +5,8 @@
>> */
>>
>> #include "concrete-agent.hpp"
>> +#include "stream.hpp"
>> +#include "message.hpp"
>> #include "hexdump.h"
>> #include "mjpeg-fallback.hpp"
>>
>> @@ -21,11 +23,9 @@
>> #include <inttypes.h>
>> #include <string.h>
>> #include <getopt.h>
>> -#include <unistd.h>
>> #include <errno.h>
>> -#include <fcntl.h>
>> +#include <unistd.h>
>> #include <sys/time.h>
>> -#include <poll.h>
>> #include <syslog.h>
>> #include <signal.h>
>> #include <exception>
>> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>>
>> }
>>
>> -class Stream
>> -{
>> - typedef std::set<SpiceVideoCodecType> codecs_t;
>> -
>> -public:
>> - Stream(const char *name)
>> - : codecs()
>> - {
>> - streamfd = open(name, O_RDWR);
>> - if (streamfd < 0) {
>> - throw IOError("failed to open streaming device", errno);
>> - }
>> - }
>> - ~Stream()
>> - {
>> - close(streamfd);
>> - }
>> -
>> - const codecs_t &client_codecs() { return codecs; }
>> - bool streaming_requested() { return is_streaming; }
>> -
>> - template <typename Message, typename ...PayloadArgs>
>> - void send(PayloadArgs... payload)
>> - {
>> - Message message(payload...);
>> - std::lock_guard<std::mutex> stream_guard(mutex);
>> - message.write_header(*this);
>> - message.write_message_body(*this, payload...);
>> - }
>> -
>> - int read_command(bool blocking);
>> - void write_all(const char *operation, const void *buf, const size_t len);
>> -
>> -private:
>> - int have_something_to_read(int timeout);
>> - void handle_stream_start_stop(uint32_t len);
>> - void handle_stream_capabilities(uint32_t len);
>> - void handle_stream_error(uint32_t len);
>> - void read_command_from_device(void);
>> -
>> -private:
>> - std::mutex mutex;
>> - codecs_t codecs;
>> - int streamfd = -1;
>> - bool is_streaming = false;
>> -};
>> -
>> -template <typename Payload, typename Info, unsigned Type>
>> -class Message
>> -{
>> -public:
>> - template <typename ...PayloadArgs>
>> - Message(PayloadArgs... payload)
>> - : hdr(StreamDevHeader {
>> - .protocol_version = STREAM_DEVICE_PROTOCOL,
>> - .padding = 0, // Workaround GCC bug "sorry: not implemented"
>> - .type = Type,
>> - .size = (uint32_t) Info::size(payload...)
>> - })
>> - { }
>> - void write_header(Stream &stream)
>> - {
>> - stream.write_all("header", &hdr, sizeof(hdr));
>> - }
>> -
>> -protected:
>> - StreamDevHeader hdr;
>> - typedef Payload payload_t;
>> -};
>> -
>> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>> {
>> public:
>> @@ -156,20 +86,6 @@ public:
>> }
>> };
>>
>> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>> -{
>> -public:
>> - CapabilitiesMessage() : Message() {}
>> - static size_t size()
>> - {
>> - return sizeof(payload_t);
>> - }
>> - void write_message_body(Stream &stream)
>> - {
>> - /* No body for capabilities message */
>> - }
>> -};
>> -
>> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>> {
>> public:
>> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>>
>> }} // namespace spice::streaming_agent
>>
>> -static bool quit_requested = false;
>> -
>> -int Stream::have_something_to_read(int timeout)
>> -{
>> - struct pollfd pollfd = {streamfd, POLLIN, 0};
>> -
>> - if (poll(&pollfd, 1, timeout) < 0) {
>> - syslog(LOG_ERR, "poll FAILED\n");
>> - return -1;
>> - }
>> -
>> - if (pollfd.revents == POLLIN) {
>> - return 1;
>> - }
>> -
>> - return 0;
>> -}
>> -
>> -void Stream::handle_stream_start_stop(uint32_t len)
>> -{
>> - uint8_t msg[256];
>> -
>> - if (len >= sizeof(msg)) {
>> - throw MessageDataError("message is too long", len, sizeof(msg));
>> - }
>> - int n = read(streamfd, &msg, len);
>> - if (n != (int) len) {
>> - throw MessageDataError("read start/stop command from device failed", n, len, errno);
>> - }
>> - is_streaming = (msg[0] != 0); /* num_codecs */
>> - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>> - is_streaming ? "START" : "STOP");
>> - codecs.clear();
>> - for (int i = 1; i <= msg[0]; ++i) {
>> - codecs.insert((SpiceVideoCodecType) msg[i]);
>> - }
>> -}
>> -
>> -void Stream::handle_stream_capabilities(uint32_t len)
>> -{
>> - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> -
>> - if (len > sizeof(caps)) {
>> - throw MessageDataError("capability message too long", len, sizeof(caps));
>> - }
>> - int n = read(streamfd, caps, len);
>> - if (n != (int) len) {
>> - throw MessageDataError("read capabilities from device failed", n, len, errno);
>> - }
>> -
>> - // we currently do not support extensions so just reply so
>> - send<CapabilitiesMessage>();
>> -}
>> -
>> -void Stream::handle_stream_error(uint32_t len)
>> -{
>> - // TODO read message and use it
>> - throw ProtocolError("got an error message from server");
>> -}
>> -
>> -void Stream::read_command_from_device()
>> -{
>> - StreamDevHeader hdr;
>> - int n;
>> -
>> - std::lock_guard<std::mutex> stream_guard(mutex);
>> - n = read(streamfd, &hdr, sizeof(hdr));
>> - if (n != sizeof(hdr)) {
>> - throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>> - }
>> - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>> - throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>> - }
>> -
>> - switch (hdr.type) {
>> - case STREAM_TYPE_CAPABILITIES:
>> - return handle_stream_capabilities(hdr.size);
>> - case STREAM_TYPE_NOTIFY_ERROR:
>> - return handle_stream_error(hdr.size);
>> - case STREAM_TYPE_START_STOP:
>> - return handle_stream_start_stop(hdr.size);
>> - }
>> - throw MessageDataError("unknown message type", hdr.type, 0);
>> -}
>> -
>> -int Stream::read_command(bool blocking)
>> -{
>> - int timeout = blocking?-1:0;
>> - while (!quit_requested) {
>> - if (!have_something_to_read(timeout)) {
>> - if (!blocking) {
>> - return 0;
>> - }
>> - sleep(1);
>> - continue;
>> - }
>> - read_command_from_device();
>> - break;
>> - }
>> -
>> - return 1;
>> -}
>> -
>> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> -{
>> - size_t written = 0;
>> - while (written < len) {
>> - int l = write(streamfd, (const char *) buf + written, len - written);
>> - if (l < 0) {
>> - if (errno == EINTR) {
>> - continue;
>> - }
>> - throw WriteError("write failed", operation, errno).syslog();
>> - }
>> - written += l;
>> - }
>> - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> -}
>> +bool quit_requested = false;
>>
>> static void handle_interrupt(int intr)
>> {
>> diff --git a/src/stream.cpp b/src/stream.cpp
>> new file mode 100644
>> index 0000000..f756097
>> --- /dev/null
>> +++ b/src/stream.cpp
>> @@ -0,0 +1,172 @@
>> +/* Encapsulation of the stream used to communicate between agent and server
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +
>> +#include "stream.hpp"
>> +#include "message.hpp"
>> +
>> +#include <spice/stream-device.h>
>> +
>> +#include <spice-streaming-agent/errors.hpp>
>> +
>> +#include <sys/types.h>
>> +#include <sys/stat.h>
>> +#include <fcntl.h>
>> +#include <poll.h>
>> +#include <syslog.h>
>> +#include <unistd.h>
>> +
>> +namespace spice
>> +{
>> +namespace streaming_agent
>> +{
>> +
>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
>> + STREAM_TYPE_CAPABILITIES>
>> +{
>> +public:
>> + CapabilitiesMessage() : Message() {}
>> + static size_t size()
>> + {
>> + return sizeof(payload_t);
>> + }
>> + void write_message_body(Stream &stream)
>> + {
>> + /* No body for capabilities message */
>> + }
>> +};
>
> Not sure I like scattering the messages across source files that happen
> to use them, though I suppose you did it because each message (like the
> X11Cursor) may require different header files included? Perhaps it is
> the way to go…
No, it’s really to decouple things; it is also a good way to check whether the encapsulation is correct.
>
>> +
>> +Stream::Stream(const char *name)
>> + : codecs()
>> +{
>> + streamfd = open(name, O_RDWR);
>> + if (streamfd < 0) {
>> + throw IOError("failed to open streaming device", errno);
>> + }
>> +}
>> +
>> +Stream::~Stream()
>> +{
>> + close(streamfd);
>> +}
>> +
>> +int Stream::have_something_to_read(int timeout)
>> +{
>> + struct pollfd pollfd = {streamfd, POLLIN, 0};
>> +
>> + if (poll(&pollfd, 1, timeout) < 0) {
>> + syslog(LOG_ERR, "poll FAILED\n");
>> + return -1;
>> + }
>> +
>> + if (pollfd.revents == POLLIN) {
>> + return 1;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +void Stream::handle_stream_start_stop(uint32_t len)
>> +{
>> + uint8_t msg[256];
>> +
>> + if (len >= sizeof(msg)) {
>> + throw MessageDataError("message is too long", len, sizeof(msg));
>> + }
>> + int n = read(streamfd, &msg, len);
>> + if (n != (int) len) {
>> + throw MessageDataError("read start/stop command from device failed", n, len, errno);
>> + }
>> + is_streaming = (msg[0] != 0); /* num_codecs */
>> + syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>> + is_streaming ? "START" : "STOP");
>> + codecs.clear();
>> + for (int i = 1; i <= msg[0]; ++i) {
>> + codecs.insert((SpiceVideoCodecType) msg[i]);
>> + }
>> +}
>> +
>> +void Stream::handle_stream_capabilities(uint32_t len)
>> +{
>> + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> +
>> + if (len > sizeof(caps)) {
>> + throw MessageDataError("capability message too long", len, sizeof(caps));
>> + }
>> + int n = read(streamfd, caps, len);
>> + if (n != (int) len) {
>> + throw MessageDataError("read capabilities from device failed", n, len, errno);
>> + }
>> +
>> + // we currently do not support extensions so just reply so
>> + send<CapabilitiesMessage>();
>> +}
>> +
>> +void Stream::handle_stream_error(uint32_t len)
>> +{
>> + // TODO read message and use it
>> + throw ProtocolError("got an error message from server");
>> +}
>> +
>> +void Stream::read_command_from_device()
>> +{
>> + StreamDevHeader hdr;
>> + int n;
>> +
>> + std::lock_guard<std::mutex> stream_guard(mutex);
>> + n = read(streamfd, &hdr, sizeof(hdr));
>> + if (n != sizeof(hdr)) {
>> + throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>> + }
>> + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>> + throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>> + }
>> +
>> + switch (hdr.type) {
>> + case STREAM_TYPE_CAPABILITIES:
>> + return handle_stream_capabilities(hdr.size);
>> + case STREAM_TYPE_NOTIFY_ERROR:
>> + return handle_stream_error(hdr.size);
>> + case STREAM_TYPE_START_STOP:
>> + return handle_stream_start_stop(hdr.size);
>> + }
>> + throw MessageDataError("unknown message type", hdr.type, 0);
>> +}
>> +
>> +int Stream::read_command(bool blocking)
>> +{
>> + int timeout = blocking?-1:0;
>> + while (!quit_requested) {
>> + if (!have_something_to_read(timeout)) {
>> + if (!blocking) {
>> + return 0;
>> + }
>> + sleep(1);
>> + continue;
>> + }
>> + read_command_from_device();
>> + break;
>> + }
>> +
>> + return 1;
>> +}
>> +
>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> +{
>> + size_t written = 0;
>> + while (written < len) {
>> + int l = write(streamfd, (const char *) buf + written, len - written);
>> + if (l < 0) {
>> + if (errno == EINTR) {
>> + continue;
>> + }
>> + throw WriteError("write failed", operation, errno).syslog();
>> + }
>> + written += l;
>> + }
>> + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> +}
>> +
>> +}} // namespace spice::streaming_agent
>> diff --git a/src/stream.hpp b/src/stream.hpp
>> new file mode 100644
>> index 0000000..b689f36
>> --- /dev/null
>> +++ b/src/stream.hpp
>> @@ -0,0 +1,55 @@
>> +/* Encapsulation of the stream used to communicate between agent and server
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
>> +#define SPICE_STREAMING_AGENT_STREAM_HPP
>> +
>> +#include <spice/enums.h>
>> +#include <set>
>> +#include <mutex>
>> +
>> +namespace spice {
>> +namespace streaming_agent {
>> +
>> +class Stream
>> +{
>> + typedef std::set<SpiceVideoCodecType> codecs_t;
>> +
>> +public:
>> + Stream(const char *name);
>> + ~Stream();
>> +
>> + const codecs_t &client_codecs() { return codecs; }
>> + bool streaming_requested() { return is_streaming; }
>> +
>> + template <typename Message, typename ...PayloadArgs>
>> + void send(PayloadArgs... payload)
>> + {
>> + Message message(payload...);
>> + std::lock_guard<std::mutex> stream_guard(mutex);
>> + message.write_header(*this);
>> + message.write_message_body(*this, payload...);
>> + }
>> +
>> + int read_command(bool blocking);
>> + void write_all(const char *operation, const void *buf, const size_t len);
>> +
>> +private:
>> + int have_something_to_read(int timeout);
>> + void handle_stream_start_stop(uint32_t len);
>> + void handle_stream_capabilities(uint32_t len);
>> + void handle_stream_error(uint32_t len);
>> + void read_command_from_device(void);
>> +
>> +private:
>> + std::mutex mutex;
>> + codecs_t codecs;
>> + int streamfd = -1;
>> + bool is_streaming = false;
>> +};
>> +
>> +}} // namespace spice::streaming_agent
>> +
>> +#endif // SPICE_STREAMING_AGENT_STREAM_HPP
More information about the Spice-devel mailing list