[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files
Lukáš Hrázký
lhrazky at redhat.com
Fri Mar 2 13:07:39 UTC 2018
On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky at redhat.com> wrote:
> >
> > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > From: Christophe de Dinechin <dinechin at redhat.com>
> > >
> > > Doing this change will make it possible to move the capture loop to the
> > > concrete-agent.cpp file.
> > >
> > > Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> > > ---
> > > include/spice-streaming-agent/errors.hpp | 2 +
> > > src/Makefile.am | 2 +
> > > src/message.hpp | 41 ++++++
> > > src/spice-streaming-agent.cpp | 209 +------------------------------
> > > src/stream.cpp | 172 +++++++++++++++++++++++++
> > > src/stream.hpp | 55 ++++++++
> > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > create mode 100644 src/message.hpp
> > > create mode 100644 src/stream.cpp
> > > create mode 100644 src/stream.hpp
> > >
> > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> > > index 870a0fd..62ae010 100644
> > > --- a/include/spice-streaming-agent/errors.hpp
> > > +++ b/include/spice-streaming-agent/errors.hpp
> > > @@ -90,4 +90,6 @@ protected:
> > >
> > > }} // namespace spice::streaming_agent
> > >
> > > +extern bool quit_requested;
> >
> > Putting quit_requested into errors.hpp? Why?
>
> Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
I don't think the flag belongs in the errors header at all, let alone a
public one. It's a generic control flow mechanism to signal the
termination of the program. The only relation to errors is that in case
of errors you want to (usually) also quit.
Therefore the quit flag should not be coupled with errors and should
instead be used at the main control-flow point where global error
handling takes place (and then in the signal handler). As a static
variable, it should also proliferate through the program as little as
possible.
I've actually had the following idea for the quit flag, which I think
promotes the locality of the flag in the class design:
// file agent.{hpp,cpp}
class Agent {
    // Termination request for the capture loop; starts out false and is
    // only changed by code that obtained it through quit_flag().
    bool quit_requested = false;

public:
    // Runs the capture loop for as long as the quit flag stays false.
    void do_capture() {
        while (!quit_requested) {
            // ...
        }
    }

    // Exposes the quit flag by reference so that code outside the class
    // can set it and thereby stop do_capture().
    bool& quit_flag() {
        return quit_requested;
    }
};
// file main.cpp
// File-local pointer to the flag that handle_sigterm() sets; it stays
// nullptr until something assigns it.
static bool* quit_requested = nullptr;
// Sets the pointed-to flag to true. There is no null check here, so the
// pointer must already have been assigned when this runs.
// NOTE(review): presumably installed as the SIGTERM handler; the
// registration is not shown in this sketch.
void handle_sigterm() {
*quit_requested = true;
}
// Creates the Agent and points the file-local quit_requested pointer at the
// agent's own flag, obtained through quit_flag().
int main() {
Agent agent;
quit_requested = &agent.quit_flag();
// The remainder of main() is elided in this sketch.
...
}
Some corner case handling (the quit_requested pointer not yet set, etc.)
was left out for clarity.
This keeps the flag local to the loop in which it is relevant and the
static variable local to the main.cpp file. Thus it increases
modularity (which arguably we do not need that much here).
I would also use a different quit flag for the cursor thread (again,
local to the X11CursorUpdater) and take care of it in main() after the
Agent::do_capture() loop quits.
>
> >
> > > +
> > > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > index 2507844..923a103 100644
> > > --- a/src/Makefile.am
> > > +++ b/src/Makefile.am
> > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > mjpeg-fallback.hpp \
> > > jpeg.cpp \
> > > jpeg.hpp \
> > > + stream.cpp \
> > > + stream.hpp \
> > > errors.cpp \
> > > $(NULL)
> > > diff --git a/src/message.hpp b/src/message.hpp
> > > new file mode 100644
> > > index 0000000..28b3e28
> > > --- /dev/null
> > > +++ b/src/message.hpp
> > > @@ -0,0 +1,41 @@
> > > +/* Formatting messages
> > > + *
> > > + * \copyright
> > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > + */
> > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > +
> > > +#include <spice/stream-device.h>
> > > +
> > > +namespace spice
> > > +{
> > > +namespace streaming_agent
> > > +{
> > > +
> > > +template <typename Payload, typename Info, unsigned Type>
> > > +class Message
> > > +{
> > > +public:
> > > + template <typename ...PayloadArgs>
> > > + Message(PayloadArgs... payload)
> > > + : hdr(StreamDevHeader {
> > > + .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > + .padding = 0, // Workaround GCC bug "sorry: not implemented"
> > > + .type = Type,
> > > + .size = (uint32_t) Info::size(payload...)
> > > + })
> > > + { }
> > > + void write_header(Stream &stream)
> > > + {
> > > + stream.write_all("header", &hdr, sizeof(hdr));
> > > + }
> > > +
> > > +protected:
> > > + StreamDevHeader hdr;
> > > + typedef Payload payload_t;
> > > +};
> > > +
> > > +}} // namespace spice::streaming_agent
> > > +
> > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > index 35e65bb..c401a34 100644
> > > --- a/src/spice-streaming-agent.cpp
> > > +++ b/src/spice-streaming-agent.cpp
> > > @@ -5,6 +5,8 @@
> > > */
> > >
> > > #include "concrete-agent.hpp"
> > > +#include "stream.hpp"
> > > +#include "message.hpp"
> > > #include "hexdump.h"
> > > #include "mjpeg-fallback.hpp"
> > >
> > > @@ -21,11 +23,9 @@
> > > #include <inttypes.h>
> > > #include <string.h>
> > > #include <getopt.h>
> > > -#include <unistd.h>
> > > #include <errno.h>
> > > -#include <fcntl.h>
> > > +#include <unistd.h>
> > > #include <sys/time.h>
> > > -#include <poll.h>
> > > #include <syslog.h>
> > > #include <signal.h>
> > > #include <exception>
> > > @@ -57,76 +57,6 @@ static uint64_t get_time(void)
> > >
> > > }
> > >
> > > -class Stream
> > > -{
> > > - typedef std::set<SpiceVideoCodecType> codecs_t;
> > > -
> > > -public:
> > > - Stream(const char *name)
> > > - : codecs()
> > > - {
> > > - streamfd = open(name, O_RDWR);
> > > - if (streamfd < 0) {
> > > - throw IOError("failed to open streaming device", errno);
> > > - }
> > > - }
> > > - ~Stream()
> > > - {
> > > - close(streamfd);
> > > - }
> > > -
> > > - const codecs_t &client_codecs() { return codecs; }
> > > - bool streaming_requested() { return is_streaming; }
> > > -
> > > - template <typename Message, typename ...PayloadArgs>
> > > - void send(PayloadArgs... payload)
> > > - {
> > > - Message message(payload...);
> > > - std::lock_guard<std::mutex> stream_guard(mutex);
> > > - message.write_header(*this);
> > > - message.write_message_body(*this, payload...);
> > > - }
> > > -
> > > - int read_command(bool blocking);
> > > - void write_all(const char *operation, const void *buf, const size_t len);
> > > -
> > > -private:
> > > - int have_something_to_read(int timeout);
> > > - void handle_stream_start_stop(uint32_t len);
> > > - void handle_stream_capabilities(uint32_t len);
> > > - void handle_stream_error(uint32_t len);
> > > - void read_command_from_device(void);
> > > -
> > > -private:
> > > - std::mutex mutex;
> > > - codecs_t codecs;
> > > - int streamfd = -1;
> > > - bool is_streaming = false;
> > > -};
> > > -
> > > -template <typename Payload, typename Info, unsigned Type>
> > > -class Message
> > > -{
> > > -public:
> > > - template <typename ...PayloadArgs>
> > > - Message(PayloadArgs... payload)
> > > - : hdr(StreamDevHeader {
> > > - .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > - .padding = 0, // Workaround GCC bug "sorry: not implemented"
> > > - .type = Type,
> > > - .size = (uint32_t) Info::size(payload...)
> > > - })
> > > - { }
> > > - void write_header(Stream &stream)
> > > - {
> > > - stream.write_all("header", &hdr, sizeof(hdr));
> > > - }
> > > -
> > > -protected:
> > > - StreamDevHeader hdr;
> > > - typedef Payload payload_t;
> > > -};
> > > -
> > > class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> > > {
> > > public:
> > > @@ -156,20 +86,6 @@ public:
> > > }
> > > };
> > >
> > > -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> > > -{
> > > -public:
> > > - CapabilitiesMessage() : Message() {}
> > > - static size_t size()
> > > - {
> > > - return sizeof(payload_t);
> > > - }
> > > - void write_message_body(Stream &stream)
> > > - {
> > > - /* No body for capabilities message */
> > > - }
> > > -};
> > > -
> > > class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> > > {
> > > public:
> > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
> > >
> > > }} // namespace spice::streaming_agent
> > >
> > > -static bool quit_requested = false;
> > > -
> > > -int Stream::have_something_to_read(int timeout)
> > > -{
> > > - struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > -
> > > - if (poll(&pollfd, 1, timeout) < 0) {
> > > - syslog(LOG_ERR, "poll FAILED\n");
> > > - return -1;
> > > - }
> > > -
> > > - if (pollfd.revents == POLLIN) {
> > > - return 1;
> > > - }
> > > -
> > > - return 0;
> > > -}
> > > -
> > > -void Stream::handle_stream_start_stop(uint32_t len)
> > > -{
> > > - uint8_t msg[256];
> > > -
> > > - if (len >= sizeof(msg)) {
> > > - throw MessageDataError("message is too long", len, sizeof(msg));
> > > - }
> > > - int n = read(streamfd, &msg, len);
> > > - if (n != (int) len) {
> > > - throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > - }
> > > - is_streaming = (msg[0] != 0); /* num_codecs */
> > > - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > - is_streaming ? "START" : "STOP");
> > > - codecs.clear();
> > > - for (int i = 1; i <= msg[0]; ++i) {
> > > - codecs.insert((SpiceVideoCodecType) msg[i]);
> > > - }
> > > -}
> > > -
> > > -void Stream::handle_stream_capabilities(uint32_t len)
> > > -{
> > > - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > -
> > > - if (len > sizeof(caps)) {
> > > - throw MessageDataError("capability message too long", len, sizeof(caps));
> > > - }
> > > - int n = read(streamfd, caps, len);
> > > - if (n != (int) len) {
> > > - throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > - }
> > > -
> > > - // we currently do not support extensions so just reply so
> > > - send<CapabilitiesMessage>();
> > > -}
> > > -
> > > -void Stream::handle_stream_error(uint32_t len)
> > > -{
> > > - // TODO read message and use it
> > > - throw ProtocolError("got an error message from server");
> > > -}
> > > -
> > > -void Stream::read_command_from_device()
> > > -{
> > > - StreamDevHeader hdr;
> > > - int n;
> > > -
> > > - std::lock_guard<std::mutex> stream_guard(mutex);
> > > - n = read(streamfd, &hdr, sizeof(hdr));
> > > - if (n != sizeof(hdr)) {
> > > - throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > - }
> > > - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > - throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > - }
> > > -
> > > - switch (hdr.type) {
> > > - case STREAM_TYPE_CAPABILITIES:
> > > - return handle_stream_capabilities(hdr.size);
> > > - case STREAM_TYPE_NOTIFY_ERROR:
> > > - return handle_stream_error(hdr.size);
> > > - case STREAM_TYPE_START_STOP:
> > > - return handle_stream_start_stop(hdr.size);
> > > - }
> > > - throw MessageDataError("unknown message type", hdr.type, 0);
> > > -}
> > > -
> > > -int Stream::read_command(bool blocking)
> > > -{
> > > - int timeout = blocking?-1:0;
> > > - while (!quit_requested) {
> > > - if (!have_something_to_read(timeout)) {
> > > - if (!blocking) {
> > > - return 0;
> > > - }
> > > - sleep(1);
> > > - continue;
> > > - }
> > > - read_command_from_device();
> > > - break;
> > > - }
> > > -
> > > - return 1;
> > > -}
> > > -
> > > -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > -{
> > > - size_t written = 0;
> > > - while (written < len) {
> > > - int l = write(streamfd, (const char *) buf + written, len - written);
> > > - if (l < 0) {
> > > - if (errno == EINTR) {
> > > - continue;
> > > - }
> > > - throw WriteError("write failed", operation, errno).syslog();
> > > - }
> > > - written += l;
> > > - }
> > > - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > -}
> > > +bool quit_requested = false;
> > >
> > > static void handle_interrupt(int intr)
> > > {
> > > diff --git a/src/stream.cpp b/src/stream.cpp
> > > new file mode 100644
> > > index 0000000..f756097
> > > --- /dev/null
> > > +++ b/src/stream.cpp
> > > @@ -0,0 +1,172 @@
> > > +/* Encapsulation of the stream used to communicate between agent and server
> > > + *
> > > + * \copyright
> > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > + */
> > > +
> > > +#include "stream.hpp"
> > > +#include "message.hpp"
> > > +
> > > +#include <spice/stream-device.h>
> > > +
> > > +#include <spice-streaming-agent/errors.hpp>
> > > +
> > > +#include <sys/types.h>
> > > +#include <sys/stat.h>
> > > +#include <fcntl.h>
> > > +#include <poll.h>
> > > +#include <syslog.h>
> > > +#include <unistd.h>
> > > +
> > > +namespace spice
> > > +{
> > > +namespace streaming_agent
> > > +{
> > > +
> > > +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> > > + STREAM_TYPE_CAPABILITIES>
> > > +{
> > > +public:
> > > + CapabilitiesMessage() : Message() {}
> > > + static size_t size()
> > > + {
> > > + return sizeof(payload_t);
> > > + }
> > > + void write_message_body(Stream &stream)
> > > + {
> > > + /* No body for capabilities message */
> > > + }
> > > +};
> >
> > Not sure I like scattering the messages across source files that happen
> > to use them, though I suppose you did it because each message (like the
> > X11Cursor) may require different header files included? Perhaps it is
> > the way to go…
>
> No, it’s really to de-couple things, a good way to check if encapsulation was correct.
>
>
> >
> > > +
> > > +Stream::Stream(const char *name)
> > > + : codecs()
> > > +{
> > > + streamfd = open(name, O_RDWR);
> > > + if (streamfd < 0) {
> > > + throw IOError("failed to open streaming device", errno);
> > > + }
> > > +}
> > > +
> > > +Stream::~Stream()
> > > +{
> > > + close(streamfd);
> > > +}
> > > +
> > > +int Stream::have_something_to_read(int timeout)
> > > +{
> > > + struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > +
> > > + if (poll(&pollfd, 1, timeout) < 0) {
> > > + syslog(LOG_ERR, "poll FAILED\n");
> > > + return -1;
> > > + }
> > > +
> > > + if (pollfd.revents == POLLIN) {
> > > + return 1;
> > > + }
> > > +
> > > + return 0;
> > > +}
> > > +
> > > +void Stream::handle_stream_start_stop(uint32_t len)
> > > +{
> > > + uint8_t msg[256];
> > > +
> > > + if (len >= sizeof(msg)) {
> > > + throw MessageDataError("message is too long", len, sizeof(msg));
> > > + }
> > > + int n = read(streamfd, &msg, len);
> > > + if (n != (int) len) {
> > > + throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > + }
> > > + is_streaming = (msg[0] != 0); /* num_codecs */
> > > + syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > + is_streaming ? "START" : "STOP");
> > > + codecs.clear();
> > > + for (int i = 1; i <= msg[0]; ++i) {
> > > + codecs.insert((SpiceVideoCodecType) msg[i]);
> > > + }
> > > +}
> > > +
> > > +void Stream::handle_stream_capabilities(uint32_t len)
> > > +{
> > > + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > +
> > > + if (len > sizeof(caps)) {
> > > + throw MessageDataError("capability message too long", len, sizeof(caps));
> > > + }
> > > + int n = read(streamfd, caps, len);
> > > + if (n != (int) len) {
> > > + throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > + }
> > > +
> > > + // we currently do not support extensions so just reply so
> > > + send<CapabilitiesMessage>();
> > > +}
> > > +
> > > +void Stream::handle_stream_error(uint32_t len)
> > > +{
> > > + // TODO read message and use it
> > > + throw ProtocolError("got an error message from server");
> > > +}
> > > +
> > > +void Stream::read_command_from_device()
> > > +{
> > > + StreamDevHeader hdr;
> > > + int n;
> > > +
> > > + std::lock_guard<std::mutex> stream_guard(mutex);
> > > + n = read(streamfd, &hdr, sizeof(hdr));
> > > + if (n != sizeof(hdr)) {
> > > + throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > + }
> > > + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > + throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > + }
> > > +
> > > + switch (hdr.type) {
> > > + case STREAM_TYPE_CAPABILITIES:
> > > + return handle_stream_capabilities(hdr.size);
> > > + case STREAM_TYPE_NOTIFY_ERROR:
> > > + return handle_stream_error(hdr.size);
> > > + case STREAM_TYPE_START_STOP:
> > > + return handle_stream_start_stop(hdr.size);
> > > + }
> > > + throw MessageDataError("unknown message type", hdr.type, 0);
> > > +}
> > > +
> > > +int Stream::read_command(bool blocking)
> > > +{
> > > + int timeout = blocking?-1:0;
> > > + while (!quit_requested) {
> > > + if (!have_something_to_read(timeout)) {
> > > + if (!blocking) {
> > > + return 0;
> > > + }
> > > + sleep(1);
> > > + continue;
> > > + }
> > > + read_command_from_device();
> > > + break;
> > > + }
> > > +
> > > + return 1;
> > > +}
> > > +
> > > +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > +{
> > > + size_t written = 0;
> > > + while (written < len) {
> > > + int l = write(streamfd, (const char *) buf + written, len - written);
> > > + if (l < 0) {
> > > + if (errno == EINTR) {
> > > + continue;
> > > + }
> > > + throw WriteError("write failed", operation, errno).syslog();
> > > + }
> > > + written += l;
> > > + }
> > > + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > +}
> > > +
> > > +}} // namespace spice::streaming_agent
> > > diff --git a/src/stream.hpp b/src/stream.hpp
> > > new file mode 100644
> > > index 0000000..b689f36
> > > --- /dev/null
> > > +++ b/src/stream.hpp
> > > @@ -0,0 +1,55 @@
> > > +/* Encapsulation of the stream used to communicate between agent and server
> > > + *
> > > + * \copyright
> > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > + */
> > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> > > +#define SPICE_STREAMING_AGENT_STREAM_HPP
> > > +
> > > +#include <spice/enums.h>
> > > +#include <set>
> > > +#include <mutex>
> > > +
> > > +namespace spice {
> > > +namespace streaming_agent {
> > > +
> > > +class Stream
> > > +{
> > > + typedef std::set<SpiceVideoCodecType> codecs_t;
> > > +
> > > +public:
> > > + Stream(const char *name);
> > > + ~Stream();
> > > +
> > > + const codecs_t &client_codecs() { return codecs; }
> > > + bool streaming_requested() { return is_streaming; }
> > > +
> > > + template <typename Message, typename ...PayloadArgs>
> > > + void send(PayloadArgs... payload)
> > > + {
> > > + Message message(payload...);
> > > + std::lock_guard<std::mutex> stream_guard(mutex);
> > > + message.write_header(*this);
> > > + message.write_message_body(*this, payload...);
> > > + }
> > > +
> > > + int read_command(bool blocking);
> > > + void write_all(const char *operation, const void *buf, const size_t len);
> > > +
> > > +private:
> > > + int have_something_to_read(int timeout);
> > > + void handle_stream_start_stop(uint32_t len);
> > > + void handle_stream_capabilities(uint32_t len);
> > > + void handle_stream_error(uint32_t len);
> > > + void read_command_from_device(void);
> > > +
> > > +private:
> > > + std::mutex mutex;
> > > + codecs_t codecs;
> > > + int streamfd = -1;
> > > + bool is_streaming = false;
> > > +};
> > > +
> > > +}} // namespace spice::streaming_agent
> > > +
> > > +#endif // SPICE_STREAMING_AGENT_STREAM_HPP
>
>
More information about the Spice-devel
mailing list