[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files

Lukáš Hrázký lhrazky at redhat.com
Fri Mar 2 15:46:15 UTC 2018


On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
> > On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhrazky at redhat.com> wrote:
> > 
> > On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > > > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky at redhat.com> wrote:
> > > > 
> > > > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > > > From: Christophe de Dinechin <dinechin at redhat.com>
> > > > > 
> > > > > Doing this change will make it possible to move the capture loop to the
> > > > > concrete-agent.cpp file.
> > > > > 
> > > > > Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> > > > > ---
> > > > > include/spice-streaming-agent/errors.hpp |   2 +
> > > > > src/Makefile.am                          |   2 +
> > > > > src/message.hpp                          |  41 ++++++
> > > > > src/spice-streaming-agent.cpp            | 209 +------------------------------
> > > > > src/stream.cpp                           | 172 +++++++++++++++++++++++++
> > > > > src/stream.hpp                           |  55 ++++++++
> > > > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > > > create mode 100644 src/message.hpp
> > > > > create mode 100644 src/stream.cpp
> > > > > create mode 100644 src/stream.hpp
> > > > > 
> > > > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> > > > > index 870a0fd..62ae010 100644
> > > > > --- a/include/spice-streaming-agent/errors.hpp
> > > > > +++ b/include/spice-streaming-agent/errors.hpp
> > > > > @@ -90,4 +90,6 @@ protected:
> > > > > 
> > > > > }} // namespace spice::streaming_agent
> > > > > 
> > > > > +extern bool quit_requested;
> > > > 
> > > > Putting quit_requested into errors.hpp? Why?
> > > 
> > > Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
> > 
> > I don't think the flag belongs to the errors header at all, let alone a
> > public one. It's a generic control flow mechanism to signal the
> > termination of the program. The only relation to errors is that in case
> > of errors you want to (usually) also quit.
> 
> Well, ‘quit_requested’ is set for all “final” errors, whether we detect them using exceptions or signals.

But a termination signal is not an error, it is the natural way to end
the program. Actually, exceptions thrown inside the main program loop
are another, second way to exit the loop besides setting the quit flag.
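
To illustrate the two exits, here is a minimal sketch, not the agent
code. The names capture_once, run_loop and Exit are invented for the
example, and the flag is set from inside the loop only to simulate a
signal arriving:

```cpp
#include <cassert>
#include <stdexcept>

enum class Exit { QuitFlag, Error };

// Hypothetical single loop iteration; throws on iteration `fail_at`.
void capture_once(int iteration, int fail_at)
{
    if (iteration == fail_at) {
        throw std::runtime_error("capture failed");
    }
}

// Runs the loop and reports which of the two ways ended it.
// `quit_after` is the iteration at which the quit flag gets set
// (standing in for a termination signal).
Exit run_loop(int quit_after, int fail_at)
{
    assert(quit_after >= 0);
    bool quit_requested = false;
    try {
        for (int i = 0; !quit_requested; ++i) {
            capture_once(i, fail_at);
            if (i == quit_after) {
                quit_requested = true;   // normal, non-error termination
            }
        }
    } catch (const std::exception &) {
        return Exit::Error;              // exit through an exception
    }
    return Exit::QuitFlag;               // exit through the flag
}
```

Only the catch branch has anything to do with errors; the flag branch
is the ordinary way out.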

From another point of view, you include errors.hpp in each place where
you are either throwing or catching exceptions. But the quit flag is
only needed in the main loop and in the signal handler.

> > 
> > Therefore the quit flag should not be coupled with errors and instead
> > used in the main control flow spot where global error handling is
> > taking place (and then in the signal handler). As a static variable it
> > should also be proliferating through the program as little as possible.
> > 
> > I've actually had the following idea for the quit flag, which I think
> > promotes the locality of the flag in the class design:
> > 
> > 
> > // file agent.{hpp,cpp}
> > class Agent {
> > public:
> >    bool& quit_flag() {
> >        return quit_requested;
> >    }
> > 
> >    void do_capture() {
> >        while(!quit_requested) {
> >            // ...
> >        }
> >    }
> > private:
> >    bool quit_requested = false;
> > };
> > 
> > 
> > // file main.cpp
> > static bool* quit_requested = nullptr;
> > 
> > void handle_sigterm() {
> >    *quit_requested = true;
> > }
> > 
> > int main() {
> >    Agent agent;
> >    quit_requested = &agent.quit_flag();
> >    ...
> > }
> > 
> > 
> > Some corner case handling (the quit_requested pointer not yet set, etc.)
> > was left out for clarity.
> 
> I understand what you are trying to do, but you replace one global variable with one (and several if I follow the logic below), so how is that helping with the proliferation?

Only this one global variable that is in the example. And the only
reason it is global is the signal handler; there is no other way for
it to work. Therefore, I only make it "global locally" (excuse the
oxymoron :)) for the signal handler and limit what can access it as
much as I can.

(And it's just a pointer, the real flag is local to the loop which uses
it)
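
To spell the idea out, here is a compilable sketch, not the actual
agent code. The names install_quit_handler and quit_flag_for_signals
and the max_iterations bound are made up for illustration. I am also
assuming that std::atomic<bool> and the atomic pointer are lock-free
on the target, so that touching them from a signal handler is safe:

```cpp
#include <atomic>
#include <cassert>
#include <csignal>

// Hypothetical stand-in for the real agent class.
class Agent {
public:
    std::atomic<bool> &quit_flag() { return quit_requested; }

    // Loops until the flag is set and returns the number of iterations.
    // max_iterations only exists so that the sketch terminates.
    unsigned do_capture(unsigned max_iterations)
    {
        unsigned iterations = 0;
        while (!quit_requested.load() && iterations < max_iterations) {
            ++iterations;   // a real loop would capture a frame here
        }
        return iterations;
    }

private:
    std::atomic<bool> quit_requested{false};
};

// The only file-scope state: a pointer for the signal handler's use.
static std::atomic<std::atomic<bool> *> quit_flag_for_signals{nullptr};

extern "C" void handle_sigterm(int)
{
    std::atomic<bool> *flag = quit_flag_for_signals.load();
    if (flag != nullptr) {      // corner case: pointer not yet set
        flag->store(true);
    }
}

// Connects the handler to one agent before its capture loop starts.
void install_quit_handler(Agent &agent)
{
    assert(quit_flag_for_signals.load() == nullptr);  // register once
    quit_flag_for_signals.store(&agent.quit_flag());
    std::signal(SIGTERM, handle_sigterm);
}
```

In main() you would construct the Agent, call install_quit_handler()
on it and then run do_capture(). Nothing outside main.cpp ever sees
the pointer.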

> > 
> > This keeps the flag local to the loop in which it is relevant and the
> > static variable local to the main.cpp file. Thus it increases
> > modularity (which arguably we do not need that much here).
> > 
> > I would also use a different quit flag for the cursor thread (again,
> > local to the X11CursorUpdater) and take care of it in main() after the
> > Agent::do_capture() loop quits.
> 
> What benefits do you see in having multiple flags? Are there signals where we can quit the agent without interrupting the cursor thread or other activities?

No, I don't think there are. The reason is different. To me, one global
"quit all" flag suggests a poorly formed hierarchy of execution. For
one, it could introduce race conditions: if we generalize and say we
have several loosely dependent threads/processes and you signal them
all to quit, the order in which they do so is arbitrary. Of course you
can introduce mechanisms like join() etc. to synchronize.

But what I would consider a better design is one main loop in the
main thread that also reacts to the signals. When the main loop exits,
it tears down the other threads, signalling each in whatever way is
natural to the thread in question and waiting for it to finish. That
way you define the hierarchy: the main thread takes responsibility and
the subthreads are managed by it.
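
A rough sketch of that hierarchy, under my own assumptions:
CursorWorker is a hypothetical stand-in for something like the cursor
thread, run_main_loop and max_iterations are invented for the example,
and the worker's "natural" stop mechanism is simply its own private
flag:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical worker owning its thread and its private stop flag.
class CursorWorker {
public:
    CursorWorker() : thread([this] { run(); }) {}
    ~CursorWorker() { stop(); }

    // Ask the thread to finish and wait for it; harmless to call twice.
    void stop()
    {
        stop_requested.store(true);
        if (thread.joinable()) {
            thread.join();
        }
        assert(!thread.joinable());
    }

    bool finished() const { return done.load(); }

private:
    void run()
    {
        while (!stop_requested.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        done.store(true);
    }

    std::atomic<bool> stop_requested{false};
    std::atomic<bool> done{false};
    std::thread thread;   // declared last: the flags exist before run()
};

// The main thread's loop. It reacts to its own quit flag and, once the
// loop is over, tears down the worker it manages.
bool run_main_loop(std::atomic<bool> &quit_requested,
                   unsigned max_iterations)
{
    CursorWorker cursor;
    for (unsigned i = 0; i < max_iterations && !quit_requested.load(); ++i) {
        // the capture work would go here
    }
    cursor.stop();              // signal the subthread and join it
    return cursor.finished();
}
```

The worker never looks at the main quit flag, so the order of shutdown
is decided in one place.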

> To me, quit_requested is the archetypical example of when a global variable should be used. There is only one “quit”, and it’s for all threads and objects in the program. How each one of them deals with it is local, but the “we must quit” request is global.

As I explained. I'm not necessarily saying your approach is wrong, just
that:

1. I prefer to explicitly define a main thread that has the management
responsibility.

2. A static flag shared across classes simply seems to me a thing to
avoid; it breaks modularity and, for example, makes it a bit more
fiddly to use the modules in tests...

> > 
> > > 
> > > > 
> > > > > +
> > > > > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > > > index 2507844..923a103 100644
> > > > > --- a/src/Makefile.am
> > > > > +++ b/src/Makefile.am
> > > > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > > > 	mjpeg-fallback.hpp \
> > > > > 	jpeg.cpp \
> > > > > 	jpeg.hpp \
> > > > > +	stream.cpp \
> > > > > +	stream.hpp \
> > > > > 	errors.cpp \
> > > > > 	$(NULL)
> > > > > diff --git a/src/message.hpp b/src/message.hpp
> > > > > new file mode 100644
> > > > > index 0000000..28b3e28
> > > > > --- /dev/null
> > > > > +++ b/src/message.hpp
> > > > > @@ -0,0 +1,41 @@
> > > > > +/* Formatting messages
> > > > > + *
> > > > > + * \copyright
> > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > + */
> > > > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > +
> > > > > +#include <spice/stream-device.h>
> > > > > +
> > > > > +namespace spice
> > > > > +{
> > > > > +namespace streaming_agent
> > > > > +{
> > > > > +
> > > > > +template <typename Payload, typename Info, unsigned Type>
> > > > > +class Message
> > > > > +{
> > > > > +public:
> > > > > +    template <typename ...PayloadArgs>
> > > > > +    Message(PayloadArgs... payload)
> > > > > +        : hdr(StreamDevHeader {
> > > > > +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > > > +              .type = Type,
> > > > > +              .size = (uint32_t) Info::size(payload...)
> > > > > +          })
> > > > > +    { }
> > > > > +    void write_header(Stream &stream)
> > > > > +    {
> > > > > +        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > +    }
> > > > > +
> > > > > +protected:
> > > > > +    StreamDevHeader hdr;
> > > > > +    typedef Payload payload_t;
> > > > > +};
> > > > > +
> > > > > +}} // namespace spice::streaming_agent
> > > > > +
> > > > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > > > index 35e65bb..c401a34 100644
> > > > > --- a/src/spice-streaming-agent.cpp
> > > > > +++ b/src/spice-streaming-agent.cpp
> > > > > @@ -5,6 +5,8 @@
> > > > > */
> > > > > 
> > > > > #include "concrete-agent.hpp"
> > > > > +#include "stream.hpp"
> > > > > +#include "message.hpp"
> > > > > #include "hexdump.h"
> > > > > #include "mjpeg-fallback.hpp"
> > > > > 
> > > > > @@ -21,11 +23,9 @@
> > > > > #include <inttypes.h>
> > > > > #include <string.h>
> > > > > #include <getopt.h>
> > > > > -#include <unistd.h>
> > > > > #include <errno.h>
> > > > > -#include <fcntl.h>
> > > > > +#include <unistd.h>
> > > > > #include <sys/time.h>
> > > > > -#include <poll.h>
> > > > > #include <syslog.h>
> > > > > #include <signal.h>
> > > > > #include <exception>
> > > > > @@ -57,76 +57,6 @@ static uint64_t get_time(void)
> > > > > 
> > > > > }
> > > > > 
> > > > > -class Stream
> > > > > -{
> > > > > -    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > -
> > > > > -public:
> > > > > -    Stream(const char *name)
> > > > > -        : codecs()
> > > > > -    {
> > > > > -        streamfd = open(name, O_RDWR);
> > > > > -        if (streamfd < 0) {
> > > > > -            throw IOError("failed to open streaming device", errno);
> > > > > -        }
> > > > > -    }
> > > > > -    ~Stream()
> > > > > -    {
> > > > > -        close(streamfd);
> > > > > -    }
> > > > > -
> > > > > -    const codecs_t &client_codecs() { return codecs; }
> > > > > -    bool streaming_requested() { return is_streaming; }
> > > > > -
> > > > > -    template <typename Message, typename ...PayloadArgs>
> > > > > -    void send(PayloadArgs... payload)
> > > > > -    {
> > > > > -        Message message(payload...);
> > > > > -        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > -        message.write_header(*this);
> > > > > -        message.write_message_body(*this, payload...);
> > > > > -    }
> > > > > -
> > > > > -    int read_command(bool blocking);
> > > > > -    void write_all(const char *operation, const void *buf, const size_t len);
> > > > > -
> > > > > -private:
> > > > > -    int have_something_to_read(int timeout);
> > > > > -    void handle_stream_start_stop(uint32_t len);
> > > > > -    void handle_stream_capabilities(uint32_t len);
> > > > > -    void handle_stream_error(uint32_t len);
> > > > > -    void read_command_from_device(void);
> > > > > -
> > > > > -private:
> > > > > -    std::mutex mutex;
> > > > > -    codecs_t codecs;
> > > > > -    int streamfd = -1;
> > > > > -    bool is_streaming = false;
> > > > > -};
> > > > > -
> > > > > -template <typename Payload, typename Info, unsigned Type>
> > > > > -class Message
> > > > > -{
> > > > > -public:
> > > > > -    template <typename ...PayloadArgs>
> > > > > -    Message(PayloadArgs... payload)
> > > > > -        : hdr(StreamDevHeader {
> > > > > -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > > > -              .type = Type,
> > > > > -              .size = (uint32_t) Info::size(payload...)
> > > > > -          })
> > > > > -    { }
> > > > > -    void write_header(Stream &stream)
> > > > > -    {
> > > > > -        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > -    }
> > > > > -
> > > > > -protected:
> > > > > -    StreamDevHeader hdr;
> > > > > -    typedef Payload payload_t;
> > > > > -};
> > > > > -
> > > > > class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> > > > > {
> > > > > public:
> > > > > @@ -156,20 +86,6 @@ public:
> > > > >    }
> > > > > };
> > > > > 
> > > > > -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> > > > > -{
> > > > > -public:
> > > > > -    CapabilitiesMessage() : Message() {}
> > > > > -    static size_t size()
> > > > > -    {
> > > > > -        return sizeof(payload_t);
> > > > > -    }
> > > > > -    void write_message_body(Stream &stream)
> > > > > -    {
> > > > > -        /* No body for capabilities message */
> > > > > -    }
> > > > > -};
> > > > > -
> > > > > class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> > > > > {
> > > > > public:
> > > > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
> > > > > 
> > > > > }} // namespace spice::streaming_agent
> > > > > 
> > > > > -static bool quit_requested = false;
> > > > > -
> > > > > -int Stream::have_something_to_read(int timeout)
> > > > > -{
> > > > > -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > -
> > > > > -    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > -        syslog(LOG_ERR, "poll FAILED\n");
> > > > > -        return -1;
> > > > > -    }
> > > > > -
> > > > > -    if (pollfd.revents == POLLIN) {
> > > > > -        return 1;
> > > > > -    }
> > > > > -
> > > > > -    return 0;
> > > > > -}
> > > > > -
> > > > > -void Stream::handle_stream_start_stop(uint32_t len)
> > > > > -{
> > > > > -    uint8_t msg[256];
> > > > > -
> > > > > -    if (len >= sizeof(msg)) {
> > > > > -        throw MessageDataError("message is too long", len, sizeof(msg));
> > > > > -    }
> > > > > -    int n = read(streamfd, &msg, len);
> > > > > -    if (n != (int) len) {
> > > > > -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > > > -    }
> > > > > -    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > > > -           is_streaming ? "START" : "STOP");
> > > > > -    codecs.clear();
> > > > > -    for (int i = 1; i <= msg[0]; ++i) {
> > > > > -        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > -    }
> > > > > -}
> > > > > -
> > > > > -void Stream::handle_stream_capabilities(uint32_t len)
> > > > > -{
> > > > > -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > -
> > > > > -    if (len > sizeof(caps)) {
> > > > > -        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > > > -    }
> > > > > -    int n = read(streamfd, caps, len);
> > > > > -    if (n != (int) len) {
> > > > > -        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > > > -    }
> > > > > -
> > > > > -    // we currently do not support extensions so just reply so
> > > > > -    send<CapabilitiesMessage>();
> > > > > -}
> > > > > -
> > > > > -void Stream::handle_stream_error(uint32_t len)
> > > > > -{
> > > > > -    // TODO read message and use it
> > > > > -    throw ProtocolError("got an error message from server");
> > > > > -}
> > > > > -
> > > > > -void Stream::read_command_from_device()
> > > > > -{
> > > > > -    StreamDevHeader hdr;
> > > > > -    int n;
> > > > > -
> > > > > -    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > -    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > -    if (n != sizeof(hdr)) {
> > > > > -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > > > -    }
> > > > > -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > -    }
> > > > > -
> > > > > -    switch (hdr.type) {
> > > > > -    case STREAM_TYPE_CAPABILITIES:
> > > > > -        return handle_stream_capabilities(hdr.size);
> > > > > -    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > -        return handle_stream_error(hdr.size);
> > > > > -    case STREAM_TYPE_START_STOP:
> > > > > -        return handle_stream_start_stop(hdr.size);
> > > > > -    }
> > > > > -    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > -}
> > > > > -
> > > > > -int Stream::read_command(bool blocking)
> > > > > -{
> > > > > -    int timeout = blocking?-1:0;
> > > > > -    while (!quit_requested) {
> > > > > -        if (!have_something_to_read(timeout)) {
> > > > > -            if (!blocking) {
> > > > > -                return 0;
> > > > > -            }
> > > > > -            sleep(1);
> > > > > -            continue;
> > > > > -        }
> > > > > -        read_command_from_device();
> > > > > -        break;
> > > > > -    }
> > > > > -
> > > > > -    return 1;
> > > > > -}
> > > > > -
> > > > > -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > > > -{
> > > > > -    size_t written = 0;
> > > > > -    while (written < len) {
> > > > > -        int l = write(streamfd, (const char *) buf + written, len - written);
> > > > > -        if (l < 0) {
> > > > > -            if (errno == EINTR) {
> > > > > -                continue;
> > > > > -            }
> > > > > -            throw WriteError("write failed", operation, errno).syslog();
> > > > > -        }
> > > > > -        written += l;
> > > > > -    }
> > > > > -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > > > -}
> > > > > +bool quit_requested = false;
> > > > > 
> > > > > static void handle_interrupt(int intr)
> > > > > {
> > > > > diff --git a/src/stream.cpp b/src/stream.cpp
> > > > > new file mode 100644
> > > > > index 0000000..f756097
> > > > > --- /dev/null
> > > > > +++ b/src/stream.cpp
> > > > > @@ -0,0 +1,172 @@
> > > > > +/* Encapsulation of the stream used to communicate between agent and server
> > > > > + *
> > > > > + * \copyright
> > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > + */
> > > > > +
> > > > > +#include "stream.hpp"
> > > > > +#include "message.hpp"
> > > > > +
> > > > > +#include <spice/stream-device.h>
> > > > > +
> > > > > +#include <spice-streaming-agent/errors.hpp>
> > > > > +
> > > > > +#include <sys/types.h>
> > > > > +#include <sys/stat.h>
> > > > > +#include <fcntl.h>
> > > > > +#include <poll.h>
> > > > > +#include <syslog.h>
> > > > > +#include <unistd.h>
> > > > > +
> > > > > +namespace spice
> > > > > +{
> > > > > +namespace streaming_agent
> > > > > +{
> > > > > +
> > > > > +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> > > > > +                                           STREAM_TYPE_CAPABILITIES>
> > > > > +{
> > > > > +public:
> > > > > +    CapabilitiesMessage() : Message() {}
> > > > > +    static size_t size()
> > > > > +    {
> > > > > +        return sizeof(payload_t);
> > > > > +    }
> > > > > +    void write_message_body(Stream &stream)
> > > > > +    {
> > > > > +        /* No body for capabilities message */
> > > > > +    }
> > > > > +};
> > > > 
> > > > Not sure I like scattering the messages across source files that happen
> > > > to use them, though I suppose you did it because each message (like the
> > > > X11Cursor) may require different header files included? Perhaps it is
> > > > the way to go…
> > > 
> > > No, it’s really to de-couple things, a good way to check if encapsulation was correct.
> > > 
> > > 
> > > > 
> > > > > +
> > > > > +Stream::Stream(const char *name)
> > > > > +    : codecs()
> > > > > +{
> > > > > +    streamfd = open(name, O_RDWR);
> > > > > +    if (streamfd < 0) {
> > > > > +        throw IOError("failed to open streaming device", errno);
> > > > > +    }
> > > > > +}
> > > > > +
> > > > > +Stream::~Stream()
> > > > > +{
> > > > > +    close(streamfd);
> > > > > +}
> > > > > +
> > > > > +int Stream::have_something_to_read(int timeout)
> > > > > +{
> > > > > +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > +
> > > > > +    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > +        syslog(LOG_ERR, "poll FAILED\n");
> > > > > +        return -1;
> > > > > +    }
> > > > > +
> > > > > +    if (pollfd.revents == POLLIN) {
> > > > > +        return 1;
> > > > > +    }
> > > > > +
> > > > > +    return 0;
> > > > > +}
> > > > > +
> > > > > +void Stream::handle_stream_start_stop(uint32_t len)
> > > > > +{
> > > > > +    uint8_t msg[256];
> > > > > +
> > > > > +    if (len >= sizeof(msg)) {
> > > > > +        throw MessageDataError("message is too long", len, sizeof(msg));
> > > > > +    }
> > > > > +    int n = read(streamfd, &msg, len);
> > > > > +    if (n != (int) len) {
> > > > > +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > > > +    }
> > > > > +    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > > > +           is_streaming ? "START" : "STOP");
> > > > > +    codecs.clear();
> > > > > +    for (int i = 1; i <= msg[0]; ++i) {
> > > > > +        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > +    }
> > > > > +}
> > > > > +
> > > > > +void Stream::handle_stream_capabilities(uint32_t len)
> > > > > +{
> > > > > +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > +
> > > > > +    if (len > sizeof(caps)) {
> > > > > +        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > > > +    }
> > > > > +    int n = read(streamfd, caps, len);
> > > > > +    if (n != (int) len) {
> > > > > +        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > > > +    }
> > > > > +
> > > > > +    // we currently do not support extensions so just reply so
> > > > > +    send<CapabilitiesMessage>();
> > > > > +}
> > > > > +
> > > > > +void Stream::handle_stream_error(uint32_t len)
> > > > > +{
> > > > > +    // TODO read message and use it
> > > > > +    throw ProtocolError("got an error message from server");
> > > > > +}
> > > > > +
> > > > > +void Stream::read_command_from_device()
> > > > > +{
> > > > > +    StreamDevHeader hdr;
> > > > > +    int n;
> > > > > +
> > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > +    if (n != sizeof(hdr)) {
> > > > > +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > > > +    }
> > > > > +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > +    }
> > > > > +
> > > > > +    switch (hdr.type) {
> > > > > +    case STREAM_TYPE_CAPABILITIES:
> > > > > +        return handle_stream_capabilities(hdr.size);
> > > > > +    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > +        return handle_stream_error(hdr.size);
> > > > > +    case STREAM_TYPE_START_STOP:
> > > > > +        return handle_stream_start_stop(hdr.size);
> > > > > +    }
> > > > > +    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > +}
> > > > > +
> > > > > +int Stream::read_command(bool blocking)
> > > > > +{
> > > > > +    int timeout = blocking?-1:0;
> > > > > +    while (!quit_requested) {
> > > > > +        if (!have_something_to_read(timeout)) {
> > > > > +            if (!blocking) {
> > > > > +                return 0;
> > > > > +            }
> > > > > +            sleep(1);
> > > > > +            continue;
> > > > > +        }
> > > > > +        read_command_from_device();
> > > > > +        break;
> > > > > +    }
> > > > > +
> > > > > +    return 1;
> > > > > +}
> > > > > +
> > > > > +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > > > +{
> > > > > +    size_t written = 0;
> > > > > +    while (written < len) {
> > > > > +        int l = write(streamfd, (const char *) buf + written, len - written);
> > > > > +        if (l < 0) {
> > > > > +            if (errno == EINTR) {
> > > > > +                continue;
> > > > > +            }
> > > > > +            throw WriteError("write failed", operation, errno).syslog();
> > > > > +        }
> > > > > +        written += l;
> > > > > +    }
> > > > > +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > > > +}
> > > > > +
> > > > > +}} // namespace spice::streaming_agent
> > > > > diff --git a/src/stream.hpp b/src/stream.hpp
> > > > > new file mode 100644
> > > > > index 0000000..b689f36
> > > > > --- /dev/null
> > > > > +++ b/src/stream.hpp
> > > > > @@ -0,0 +1,55 @@
> > > > > +/* Encapsulation of the stream used to communicate between agent and server
> > > > > + *
> > > > > + * \copyright
> > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > + */
> > > > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > +#define SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > +
> > > > > +#include <spice/enums.h>
> > > > > +#include <set>
> > > > > +#include <mutex>
> > > > > +
> > > > > +namespace spice {
> > > > > +namespace streaming_agent {
> > > > > +
> > > > > +class Stream
> > > > > +{
> > > > > +    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > +
> > > > > +public:
> > > > > +    Stream(const char *name);
> > > > > +    ~Stream();
> > > > > +
> > > > > +    const codecs_t &client_codecs() { return codecs; }
> > > > > +    bool streaming_requested() { return is_streaming; }
> > > > > +
> > > > > +    template <typename Message, typename ...PayloadArgs>
> > > > > +    void send(PayloadArgs... payload)
> > > > > +    {
> > > > > +        Message message(payload...);
> > > > > +        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +        message.write_header(*this);
> > > > > +        message.write_message_body(*this, payload...);
> > > > > +    }
> > > > > +
> > > > > +    int read_command(bool blocking);
> > > > > +    void write_all(const char *operation, const void *buf, const size_t len);
> > > > > +
> > > > > +private:
> > > > > +    int have_something_to_read(int timeout);
> > > > > +    void handle_stream_start_stop(uint32_t len);
> > > > > +    void handle_stream_capabilities(uint32_t len);
> > > > > +    void handle_stream_error(uint32_t len);
> > > > > +    void read_command_from_device(void);
> > > > > +
> > > > > +private:
> > > > > +    std::mutex mutex;
> > > > > +    codecs_t codecs;
> > > > > +    int streamfd = -1;
> > > > > +    bool is_streaming = false;
> > > > > +};
> > > > > +
> > > > > +}} // namespace spice::streaming_agent
> > > > > +
> > > > > +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> 
> 

