[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files

Christophe de Dinechin cdupontd at redhat.com
Fri Mar 2 18:25:58 UTC 2018



> On 2 Mar 2018, at 16:46, Lukáš Hrázký <lhrazky at redhat.com> wrote:
> 
> On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
>>> On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhrazky at redhat.com> wrote:
>>> 
>>> On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
>>>>> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky at redhat.com> wrote:
>>>>> 
>>>>> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>>>>>> From: Christophe de Dinechin <dinechin at redhat.com>
>>>>>> 
>>>>>> Doing this change will make it possible to move the capture loop to the
>>>>>> concrete-agent.cpp file.
>>>>>> 
>>>>>> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
>>>>>> ---
>>>>>> include/spice-streaming-agent/errors.hpp |   2 +
>>>>>> src/Makefile.am                          |   2 +
>>>>>> src/message.hpp                          |  41 ++++++
>>>>>> src/spice-streaming-agent.cpp            | 209 +------------------------------
>>>>>> src/stream.cpp                           | 172 +++++++++++++++++++++++++
>>>>>> src/stream.hpp                           |  55 ++++++++
>>>>>> 6 files changed, 276 insertions(+), 205 deletions(-)
>>>>>> create mode 100644 src/message.hpp
>>>>>> create mode 100644 src/stream.cpp
>>>>>> create mode 100644 src/stream.hpp
>>>>>> 
>>>>>> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
>>>>>> index 870a0fd..62ae010 100644
>>>>>> --- a/include/spice-streaming-agent/errors.hpp
>>>>>> +++ b/include/spice-streaming-agent/errors.hpp
>>>>>> @@ -90,4 +90,6 @@ protected:
>>>>>> 
>>>>>> }} // namespace spice::streaming_agent
>>>>>> 
>>>>>> +extern bool quit_requested;
>>>>> 
>>>>> Putting quit_requested into errors.hpp? Why?
>>>> 
>>>> Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
>>> 
>>> I don't think the flag belongs to the errors header at all, let alone a
>>> public one. It's a generic control flow mechanism to signal the
>>> termination of the program. The only relation to errors is that in case
>>> of errors you want to (usually) also quit.
>> 
>> Well, ‘quit_requested’ is set for all “final" errors, whether we detect them using exceptions or signals.
> 
> But a termination signal is not an error, it is the natural way to end
> the program. Actually, exceptions thrown inside the main program loop
> are another, second way to exit the loop besides setting the quit flag.

The termination signal in itself is not an error. But it leads to either an error in one of the system calls, or a graceful interruption if we are elsewhere, see below.

> 
> From another point of view, you include errors.h in each place where
> you are either throwing or catching exceptions. But the quit flag is
> only needed in the main loop and in the signal handler.

Actually, for correct operation, it’s also needed in read_all and poll, as these system calls may be interrupted. The current code is broken, I have another fix but that was not submitted yet. Something like: https://github.com/c3d/spice-streaming-agent/commit/de11d77e9b6fcb92f29eacc7da178624783aea6b (similar tests are needed in read_all and write_all, I believe). This is what fixes the “Control-C” problem.

Now, I see these as “OS errors” that are not reported through exception but using errno. The proper handling of this kind of OS errors requires a check of quit_requested.

Obviously, we can put this flag somewhere else if that really annoys you, but frankly, I think that errors.hpp is as good a place as any.

> 
>>> 
>>> Therefore the quit flag should not be coupled with errors and instead
>>> used in the main control flow spot where global error handling is
>>> taking place (and then in the signal handler). As a static variable it
>>> should also be proliferating through the program as little as possible.
>>> 
>>> I've actually had the following idea for the quit flag, which I think
>>> promotes the locality of the flag in the class design:
>>> 
>>> 
>>> // file agent.{hpp,cpp}
>>> class Agent {
>>> public:
>>>   bool& quit_flag() {
>>>       return quit_requested;
>>>   }
>>> 
>>>   void do_capture() {
>>>       while(!quit_requested) {
>>>           // ...
>>>       }
>>>   }
>>> private:
>>>   bool quit_requested = false;
>>> };
>>> 
>>> 
>>> // file main.cpp
>>> static bool* quit_requested = nullptr;
>>> 
>>> void handle_sigterm() {
>>>   *quit_requested = true;
>>> }
>>> 
>>> int main() {
>>>   Agent agent;
>>>   quit_requested = &agent.quit_flag();
>>>   ...
>>> }
>>> 
>>> 
>>> Some corner case handling (the quit_reqested pointer not yet set, etc.)
>>> was left out for clarity.
>> 
>> I understand what you are trying to do, but you replace one global variable with one (and several if I follow the logic below), so how is helping with the proliferation?
> 
> Only this one global variable that is in the example. And the only
> reason it is global is because of the signal handler, there is no other
> way for it work. Therefore, I only make it "global locally" (excuse the
> oxymoron :)) for the signal handler and limit what can access it as
> much as I can.
> 
> (And it's just a pointer, the real flag is local to the loop which uses
> it)

Adding one level of indirection here does not look like an improvement to me.

If you want some kind of encapsulation, why not make the signal handler and the quit flag static members of the agent instead? Would that work for you? Something like this: https://github.com/c3d/spice-streaming-agent/commit/077ad90ad2f923b90546a3be99988ddf45746ea7 ?

> 
>>> 
>>> This keeps the flag local to the loop in which it is relevant and the
>>> static variable local to the main.cpp file. Thus it increases
>>> modularity (which arguably we do not need that much here).
>>> 
>>> I would also use a different quit flag for the cursor thread (again,
>>> local to the X11CursorUpdater) and take care of it in main() after the
>>> Agent::do_capture() loop quits.
>> 
>> What benefits do you see in having multiple flags? Are there signals where we can quit the agent without interrupting the cursor thread or other activities?
> 
> No, I don't think there are. The reason is different. To me one global
> "quit all" flag seems like a not well formed hierarchy of execution,
> for one, it could introduce race conditions, if we generalize and say
> we have several losely dependant threads/processes and you signal them
> all to quit, the order they do so is arbitrary. Of course you can
> introduce mechanisms like join() etc. to synchronize.
 
We have such issues presently with the cursor thread. This is fixed in my series, it now exits cleanly in all test cases I threw at it instead of aborting or terminating.

> 
> But what I would consider a better design would be one main loop in the
> main thread, that also reacts to the signals. If the main loop exits,
> then it would tear down the other threads signalling them in whatever
> way is natural to the thread in question and waiting for them to
> finish. So you define the hierarchy, there is the main thread taking
> responsibility and subthreads that are managed by it.

This is more or less what I was describing with the quit flag and signal handler being static members of the agent. I’m OK with that.

> 
>> To me, quit_requested is the archetypical example of when a global variable should be used. There is only one “quit”, and it’s for all threads and objects in the program. How each one of them deals with it is local, but the “we must quit” request is global.
> 
> As I explained. I'm not necessarily saying your approach is wrong, just that:
> 
> 1. I prefer to explicitely define a main thread that has the management responsibility.

As I see it, that’s how the code after refactoring works.

> 2. The static flag shared across classes simply seems to me a thing to
> avoid, it breaks modularity and for example makes it a bit more fiddly
> to use the modules in tests for example…

Except for something that is truly global.

I don’t mind making it a static member of the agent.

I do mind having multiple “quit” flags, or a pointer to the quit flag. What happens if the signal handler is invoked before the quit flag pointer is initialized, for example?


> 
>>> 
>>>> 
>>>>> 
>>>>>> +
>>>>>> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
>>>>>> diff --git a/src/Makefile.am b/src/Makefile.am
>>>>>> index 2507844..923a103 100644
>>>>>> --- a/src/Makefile.am
>>>>>> +++ b/src/Makefile.am
>>>>>> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>>>>>> 	mjpeg-fallback.hpp \
>>>>>> 	jpeg.cpp \
>>>>>> 	jpeg.hpp \
>>>>>> +	stream.cpp \
>>>>>> +	stream.hpp \
>>>>>> 	errors.cpp \
>>>>>> 	$(NULL)
>>>>>> diff --git a/src/message.hpp b/src/message.hpp
>>>>>> new file mode 100644
>>>>>> index 0000000..28b3e28
>>>>>> --- /dev/null
>>>>>> +++ b/src/message.hpp
>>>>>> @@ -0,0 +1,41 @@
>>>>>> +/* Formatting messages
>>>>>> + *
>>>>>> + * \copyright
>>>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>>>> + */
>>>>>> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>>>> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>>>> +
>>>>>> +#include <spice/stream-device.h>
>>>>>> +
>>>>>> +namespace spice
>>>>>> +{
>>>>>> +namespace streaming_agent
>>>>>> +{
>>>>>> +
>>>>>> +template <typename Payload, typename Info, unsigned Type>
>>>>>> +class Message
>>>>>> +{
>>>>>> +public:
>>>>>> +    template <typename ...PayloadArgs>
>>>>>> +    Message(PayloadArgs... payload)
>>>>>> +        : hdr(StreamDevHeader {
>>>>>> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
>>>>>> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>>>>>> +              .type = Type,
>>>>>> +              .size = (uint32_t) Info::size(payload...)
>>>>>> +          })
>>>>>> +    { }
>>>>>> +    void write_header(Stream &stream)
>>>>>> +    {
>>>>>> +        stream.write_all("header", &hdr, sizeof(hdr));
>>>>>> +    }
>>>>>> +
>>>>>> +protected:
>>>>>> +    StreamDevHeader hdr;
>>>>>> +    typedef Payload payload_t;
>>>>>> +};
>>>>>> +
>>>>>> +}} // namespace spice::streaming_agent
>>>>>> +
>>>>>> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>>>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>>>>>> index 35e65bb..c401a34 100644
>>>>>> --- a/src/spice-streaming-agent.cpp
>>>>>> +++ b/src/spice-streaming-agent.cpp
>>>>>> @@ -5,6 +5,8 @@
>>>>>> */
>>>>>> 
>>>>>> #include "concrete-agent.hpp"
>>>>>> +#include "stream.hpp"
>>>>>> +#include "message.hpp"
>>>>>> #include "hexdump.h"
>>>>>> #include "mjpeg-fallback.hpp"
>>>>>> 
>>>>>> @@ -21,11 +23,9 @@
>>>>>> #include <inttypes.h>
>>>>>> #include <string.h>
>>>>>> #include <getopt.h>
>>>>>> -#include <unistd.h>
>>>>>> #include <errno.h>
>>>>>> -#include <fcntl.h>
>>>>>> +#include <unistd.h>
>>>>>> #include <sys/time.h>
>>>>>> -#include <poll.h>
>>>>>> #include <syslog.h>
>>>>>> #include <signal.h>
>>>>>> #include <exception>
>>>>>> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>>>>>> 
>>>>>> }
>>>>>> 
>>>>>> -class Stream
>>>>>> -{
>>>>>> -    typedef std::set<SpiceVideoCodecType> codecs_t;
>>>>>> -
>>>>>> -public:
>>>>>> -    Stream(const char *name)
>>>>>> -        : codecs()
>>>>>> -    {
>>>>>> -        streamfd = open(name, O_RDWR);
>>>>>> -        if (streamfd < 0) {
>>>>>> -            throw IOError("failed to open streaming device", errno);
>>>>>> -        }
>>>>>> -    }
>>>>>> -    ~Stream()
>>>>>> -    {
>>>>>> -        close(streamfd);
>>>>>> -    }
>>>>>> -
>>>>>> -    const codecs_t &client_codecs() { return codecs; }
>>>>>> -    bool streaming_requested() { return is_streaming; }
>>>>>> -
>>>>>> -    template <typename Message, typename ...PayloadArgs>
>>>>>> -    void send(PayloadArgs... payload)
>>>>>> -    {
>>>>>> -        Message message(payload...);
>>>>>> -        std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> -        message.write_header(*this);
>>>>>> -        message.write_message_body(*this, payload...);
>>>>>> -    }
>>>>>> -
>>>>>> -    int read_command(bool blocking);
>>>>>> -    void write_all(const char *operation, const void *buf, const size_t len);
>>>>>> -
>>>>>> -private:
>>>>>> -    int have_something_to_read(int timeout);
>>>>>> -    void handle_stream_start_stop(uint32_t len);
>>>>>> -    void handle_stream_capabilities(uint32_t len);
>>>>>> -    void handle_stream_error(uint32_t len);
>>>>>> -    void read_command_from_device(void);
>>>>>> -
>>>>>> -private:
>>>>>> -    std::mutex mutex;
>>>>>> -    codecs_t codecs;
>>>>>> -    int streamfd = -1;
>>>>>> -    bool is_streaming = false;
>>>>>> -};
>>>>>> -
>>>>>> -template <typename Payload, typename Info, unsigned Type>
>>>>>> -class Message
>>>>>> -{
>>>>>> -public:
>>>>>> -    template <typename ...PayloadArgs>
>>>>>> -    Message(PayloadArgs... payload)
>>>>>> -        : hdr(StreamDevHeader {
>>>>>> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
>>>>>> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>>>>>> -              .type = Type,
>>>>>> -              .size = (uint32_t) Info::size(payload...)
>>>>>> -          })
>>>>>> -    { }
>>>>>> -    void write_header(Stream &stream)
>>>>>> -    {
>>>>>> -        stream.write_all("header", &hdr, sizeof(hdr));
>>>>>> -    }
>>>>>> -
>>>>>> -protected:
>>>>>> -    StreamDevHeader hdr;
>>>>>> -    typedef Payload payload_t;
>>>>>> -};
>>>>>> -
>>>>>> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>>>>>> {
>>>>>> public:
>>>>>> @@ -156,20 +86,6 @@ public:
>>>>>>   }
>>>>>> };
>>>>>> 
>>>>>> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>>>>>> -{
>>>>>> -public:
>>>>>> -    CapabilitiesMessage() : Message() {}
>>>>>> -    static size_t size()
>>>>>> -    {
>>>>>> -        return sizeof(payload_t);
>>>>>> -    }
>>>>>> -    void write_message_body(Stream &stream)
>>>>>> -    {
>>>>>> -        /* No body for capabilities message */
>>>>>> -    }
>>>>>> -};
>>>>>> -
>>>>>> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>>>>>> {
>>>>>> public:
>>>>>> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>>>>>> 
>>>>>> }} // namespace spice::streaming_agent
>>>>>> 
>>>>>> -static bool quit_requested = false;
>>>>>> -
>>>>>> -int Stream::have_something_to_read(int timeout)
>>>>>> -{
>>>>>> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>>>> -
>>>>>> -    if (poll(&pollfd, 1, timeout) < 0) {
>>>>>> -        syslog(LOG_ERR, "poll FAILED\n");
>>>>>> -        return -1;
>>>>>> -    }
>>>>>> -
>>>>>> -    if (pollfd.revents == POLLIN) {
>>>>>> -        return 1;
>>>>>> -    }
>>>>>> -
>>>>>> -    return 0;
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::handle_stream_start_stop(uint32_t len)
>>>>>> -{
>>>>>> -    uint8_t msg[256];
>>>>>> -
>>>>>> -    if (len >= sizeof(msg)) {
>>>>>> -        throw MessageDataError("message is too long", len, sizeof(msg));
>>>>>> -    }
>>>>>> -    int n = read(streamfd, &msg, len);
>>>>>> -    if (n != (int) len) {
>>>>>> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>>>>>> -    }
>>>>>> -    is_streaming = (msg[0] != 0); /* num_codecs */
>>>>>> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>>>>>> -           is_streaming ? "START" : "STOP");
>>>>>> -    codecs.clear();
>>>>>> -    for (int i = 1; i <= msg[0]; ++i) {
>>>>>> -        codecs.insert((SpiceVideoCodecType) msg[i]);
>>>>>> -    }
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::handle_stream_capabilities(uint32_t len)
>>>>>> -{
>>>>>> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>>>>>> -
>>>>>> -    if (len > sizeof(caps)) {
>>>>>> -        throw MessageDataError("capability message too long", len, sizeof(caps));
>>>>>> -    }
>>>>>> -    int n = read(streamfd, caps, len);
>>>>>> -    if (n != (int) len) {
>>>>>> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
>>>>>> -    }
>>>>>> -
>>>>>> -    // we currently do not support extensions so just reply so
>>>>>> -    send<CapabilitiesMessage>();
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::handle_stream_error(uint32_t len)
>>>>>> -{
>>>>>> -    // TODO read message and use it
>>>>>> -    throw ProtocolError("got an error message from server");
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::read_command_from_device()
>>>>>> -{
>>>>>> -    StreamDevHeader hdr;
>>>>>> -    int n;
>>>>>> -
>>>>>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> -    n = read(streamfd, &hdr, sizeof(hdr));
>>>>>> -    if (n != sizeof(hdr)) {
>>>>>> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>>>>>> -    }
>>>>>> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>>>>>> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>>>>>> -    }
>>>>>> -
>>>>>> -    switch (hdr.type) {
>>>>>> -    case STREAM_TYPE_CAPABILITIES:
>>>>>> -        return handle_stream_capabilities(hdr.size);
>>>>>> -    case STREAM_TYPE_NOTIFY_ERROR:
>>>>>> -        return handle_stream_error(hdr.size);
>>>>>> -    case STREAM_TYPE_START_STOP:
>>>>>> -        return handle_stream_start_stop(hdr.size);
>>>>>> -    }
>>>>>> -    throw MessageDataError("unknown message type", hdr.type, 0);
>>>>>> -}
>>>>>> -
>>>>>> -int Stream::read_command(bool blocking)
>>>>>> -{
>>>>>> -    int timeout = blocking?-1:0;
>>>>>> -    while (!quit_requested) {
>>>>>> -        if (!have_something_to_read(timeout)) {
>>>>>> -            if (!blocking) {
>>>>>> -                return 0;
>>>>>> -            }
>>>>>> -            sleep(1);
>>>>>> -            continue;
>>>>>> -        }
>>>>>> -        read_command_from_device();
>>>>>> -        break;
>>>>>> -    }
>>>>>> -
>>>>>> -    return 1;
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
>>>>>> -{
>>>>>> -    size_t written = 0;
>>>>>> -    while (written < len) {
>>>>>> -        int l = write(streamfd, (const char *) buf + written, len - written);
>>>>>> -        if (l < 0) {
>>>>>> -            if (errno == EINTR) {
>>>>>> -                continue;
>>>>>> -            }
>>>>>> -            throw WriteError("write failed", operation, errno).syslog();
>>>>>> -        }
>>>>>> -        written += l;
>>>>>> -    }
>>>>>> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>>>>>> -}
>>>>>> +bool quit_requested = false;
>>>>>> 
>>>>>> static void handle_interrupt(int intr)
>>>>>> {
>>>>>> diff --git a/src/stream.cpp b/src/stream.cpp
>>>>>> new file mode 100644
>>>>>> index 0000000..f756097
>>>>>> --- /dev/null
>>>>>> +++ b/src/stream.cpp
>>>>>> @@ -0,0 +1,172 @@
>>>>>> +/* Encapsulation of the stream used to communicate between agent and server
>>>>>> + *
>>>>>> + * \copyright
>>>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>>>> + */
>>>>>> +
>>>>>> +#include "stream.hpp"
>>>>>> +#include "message.hpp"
>>>>>> +
>>>>>> +#include <spice/stream-device.h>
>>>>>> +
>>>>>> +#include <spice-streaming-agent/errors.hpp>
>>>>>> +
>>>>>> +#include <sys/types.h>
>>>>>> +#include <sys/stat.h>
>>>>>> +#include <fcntl.h>
>>>>>> +#include <poll.h>
>>>>>> +#include <syslog.h>
>>>>>> +#include <unistd.h>
>>>>>> +
>>>>>> +namespace spice
>>>>>> +{
>>>>>> +namespace streaming_agent
>>>>>> +{
>>>>>> +
>>>>>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
>>>>>> +                                           STREAM_TYPE_CAPABILITIES>
>>>>>> +{
>>>>>> +public:
>>>>>> +    CapabilitiesMessage() : Message() {}
>>>>>> +    static size_t size()
>>>>>> +    {
>>>>>> +        return sizeof(payload_t);
>>>>>> +    }
>>>>>> +    void write_message_body(Stream &stream)
>>>>>> +    {
>>>>>> +        /* No body for capabilities message */
>>>>>> +    }
>>>>>> +};
>>>>> 
>>>>> Not sure I like scattering the messages across source files that happen
>>>>> to use them, though I suppose you did it because each message (like the
>>>>> X11Cursor) may require different header files included? Perhaps it is
>>>>> the way to go…
>>>> 
>>>> No, it’s really to de-couple things, a good way to check if encapsulation was correct.
>>>> 
>>>> 
>>>>> 
>>>>>> +
>>>>>> +Stream::Stream(const char *name)
>>>>>> +    : codecs()
>>>>>> +{
>>>>>> +    streamfd = open(name, O_RDWR);
>>>>>> +    if (streamfd < 0) {
>>>>>> +        throw IOError("failed to open streaming device", errno);
>>>>>> +    }
>>>>>> +}
>>>>>> +
>>>>>> +Stream::~Stream()
>>>>>> +{
>>>>>> +    close(streamfd);
>>>>>> +}
>>>>>> +
>>>>>> +int Stream::have_something_to_read(int timeout)
>>>>>> +{
>>>>>> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>>>> +
>>>>>> +    if (poll(&pollfd, 1, timeout) < 0) {
>>>>>> +        syslog(LOG_ERR, "poll FAILED\n");
>>>>>> +        return -1;
>>>>>> +    }
>>>>>> +
>>>>>> +    if (pollfd.revents == POLLIN) {
>>>>>> +        return 1;
>>>>>> +    }
>>>>>> +
>>>>>> +    return 0;
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::handle_stream_start_stop(uint32_t len)
>>>>>> +{
>>>>>> +    uint8_t msg[256];
>>>>>> +
>>>>>> +    if (len >= sizeof(msg)) {
>>>>>> +        throw MessageDataError("message is too long", len, sizeof(msg));
>>>>>> +    }
>>>>>> +    int n = read(streamfd, &msg, len);
>>>>>> +    if (n != (int) len) {
>>>>>> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>>>>>> +    }
>>>>>> +    is_streaming = (msg[0] != 0); /* num_codecs */
>>>>>> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>>>>>> +           is_streaming ? "START" : "STOP");
>>>>>> +    codecs.clear();
>>>>>> +    for (int i = 1; i <= msg[0]; ++i) {
>>>>>> +        codecs.insert((SpiceVideoCodecType) msg[i]);
>>>>>> +    }
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::handle_stream_capabilities(uint32_t len)
>>>>>> +{
>>>>>> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>>>>>> +
>>>>>> +    if (len > sizeof(caps)) {
>>>>>> +        throw MessageDataError("capability message too long", len, sizeof(caps));
>>>>>> +    }
>>>>>> +    int n = read(streamfd, caps, len);
>>>>>> +    if (n != (int) len) {
>>>>>> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
>>>>>> +    }
>>>>>> +
>>>>>> +    // we currently do not support extensions so just reply so
>>>>>> +    send<CapabilitiesMessage>();
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::handle_stream_error(uint32_t len)
>>>>>> +{
>>>>>> +    // TODO read message and use it
>>>>>> +    throw ProtocolError("got an error message from server");
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::read_command_from_device()
>>>>>> +{
>>>>>> +    StreamDevHeader hdr;
>>>>>> +    int n;
>>>>>> +
>>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> +    n = read(streamfd, &hdr, sizeof(hdr));
>>>>>> +    if (n != sizeof(hdr)) {
>>>>>> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>>>>>> +    }
>>>>>> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>>>>>> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>>>>>> +    }
>>>>>> +
>>>>>> +    switch (hdr.type) {
>>>>>> +    case STREAM_TYPE_CAPABILITIES:
>>>>>> +        return handle_stream_capabilities(hdr.size);
>>>>>> +    case STREAM_TYPE_NOTIFY_ERROR:
>>>>>> +        return handle_stream_error(hdr.size);
>>>>>> +    case STREAM_TYPE_START_STOP:
>>>>>> +        return handle_stream_start_stop(hdr.size);
>>>>>> +    }
>>>>>> +    throw MessageDataError("unknown message type", hdr.type, 0);
>>>>>> +}
>>>>>> +
>>>>>> +int Stream::read_command(bool blocking)
>>>>>> +{
>>>>>> +    int timeout = blocking?-1:0;
>>>>>> +    while (!quit_requested) {
>>>>>> +        if (!have_something_to_read(timeout)) {
>>>>>> +            if (!blocking) {
>>>>>> +                return 0;
>>>>>> +            }
>>>>>> +            sleep(1);
>>>>>> +            continue;
>>>>>> +        }
>>>>>> +        read_command_from_device();
>>>>>> +        break;
>>>>>> +    }
>>>>>> +
>>>>>> +    return 1;
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>>>>>> +{
>>>>>> +    size_t written = 0;
>>>>>> +    while (written < len) {
>>>>>> +        int l = write(streamfd, (const char *) buf + written, len - written);
>>>>>> +        if (l < 0) {
>>>>>> +            if (errno == EINTR) {
>>>>>> +                continue;
>>>>>> +            }
>>>>>> +            throw WriteError("write failed", operation, errno).syslog();
>>>>>> +        }
>>>>>> +        written += l;
>>>>>> +    }
>>>>>> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>>>>>> +}
>>>>>> +
>>>>>> +}} // namespace spice::streaming_agent
>>>>>> diff --git a/src/stream.hpp b/src/stream.hpp
>>>>>> new file mode 100644
>>>>>> index 0000000..b689f36
>>>>>> --- /dev/null
>>>>>> +++ b/src/stream.hpp
>>>>>> @@ -0,0 +1,55 @@
>>>>>> +/* Encapsulation of the stream used to communicate between agent and server
>>>>>> + *
>>>>>> + * \copyright
>>>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>>>> + */
>>>>>> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
>>>>>> +#define SPICE_STREAMING_AGENT_STREAM_HPP
>>>>>> +
>>>>>> +#include <spice/enums.h>
>>>>>> +#include <set>
>>>>>> +#include <mutex>
>>>>>> +
>>>>>> +namespace spice {
>>>>>> +namespace streaming_agent {
>>>>>> +
>>>>>> +class Stream
>>>>>> +{
>>>>>> +    typedef std::set<SpiceVideoCodecType> codecs_t;
>>>>>> +
>>>>>> +public:
>>>>>> +    Stream(const char *name);
>>>>>> +    ~Stream();
>>>>>> +
>>>>>> +    const codecs_t &client_codecs() { return codecs; }
>>>>>> +    bool streaming_requested() { return is_streaming; }
>>>>>> +
>>>>>> +    template <typename Message, typename ...PayloadArgs>
>>>>>> +    void send(PayloadArgs... payload)
>>>>>> +    {
>>>>>> +        Message message(payload...);
>>>>>> +        std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> +        message.write_header(*this);
>>>>>> +        message.write_message_body(*this, payload...);
>>>>>> +    }
>>>>>> +
>>>>>> +    int read_command(bool blocking);
>>>>>> +    void write_all(const char *operation, const void *buf, const size_t len);
>>>>>> +
>>>>>> +private:
>>>>>> +    int have_something_to_read(int timeout);
>>>>>> +    void handle_stream_start_stop(uint32_t len);
>>>>>> +    void handle_stream_capabilities(uint32_t len);
>>>>>> +    void handle_stream_error(uint32_t len);
>>>>>> +    void read_command_from_device(void);
>>>>>> +
>>>>>> +private:
>>>>>> +    std::mutex mutex;
>>>>>> +    codecs_t codecs;
>>>>>> +    int streamfd = -1;
>>>>>> +    bool is_streaming = false;
>>>>>> +};
>>>>>> +
>>>>>> +}} // namespace spice::streaming_agent
>>>>>> +
>>>>>> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP



More information about the Spice-devel mailing list