[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files

Christophe de Dinechin christophe.de.dinechin at gmail.com
Thu Mar 1 16:57:19 UTC 2018



> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky at redhat.com> wrote:
> 
> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>> From: Christophe de Dinechin <dinechin at redhat.com>
>> 
>> This change makes it possible to move the capture loop to the
>> concrete-agent.cpp file.
>> 
>> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
>> ---
>> include/spice-streaming-agent/errors.hpp |   2 +
>> src/Makefile.am                          |   2 +
>> src/message.hpp                          |  41 ++++++
>> src/spice-streaming-agent.cpp            | 209 +------------------------------
>> src/stream.cpp                           | 172 +++++++++++++++++++++++++
>> src/stream.hpp                           |  55 ++++++++
>> 6 files changed, 276 insertions(+), 205 deletions(-)
>> create mode 100644 src/message.hpp
>> create mode 100644 src/stream.cpp
>> create mode 100644 src/stream.hpp
>> 
>> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
>> index 870a0fd..62ae010 100644
>> --- a/include/spice-streaming-agent/errors.hpp
>> +++ b/include/spice-streaming-agent/errors.hpp
>> @@ -90,4 +90,6 @@ protected:
>> 
>> }} // namespace spice::streaming_agent
>> 
>> +extern bool quit_requested;
> 
> Putting quit_requested into errors.hpp? Why?

Because errors.hpp deals with error conditions, and quit_requested is part of that: other threads need to be told to quit on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.


> 
>> +
>> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
>> diff --git a/src/Makefile.am b/src/Makefile.am
>> index 2507844..923a103 100644
>> --- a/src/Makefile.am
>> +++ b/src/Makefile.am
>> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>> 	mjpeg-fallback.hpp \
>> 	jpeg.cpp \
>> 	jpeg.hpp \
>> +	stream.cpp \
>> +	stream.hpp \
>> 	errors.cpp \
>> 	$(NULL)
>> diff --git a/src/message.hpp b/src/message.hpp
>> new file mode 100644
>> index 0000000..28b3e28
>> --- /dev/null
>> +++ b/src/message.hpp
>> @@ -0,0 +1,41 @@
>> +/* Formatting messages
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
>> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
>> +
>> +#include <spice/stream-device.h>
>> +
>> +namespace spice
>> +{
>> +namespace streaming_agent
>> +{
>> +
>> +template <typename Payload, typename Info, unsigned Type>
>> +class Message
>> +{
>> +public:
>> +    template <typename ...PayloadArgs>
>> +    Message(PayloadArgs... payload)
>> +        : hdr(StreamDevHeader {
>> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
>> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>> +              .type = Type,
>> +              .size = (uint32_t) Info::size(payload...)
>> +          })
>> +    { }
>> +    void write_header(Stream &stream)
>> +    {
>> +        stream.write_all("header", &hdr, sizeof(hdr));
>> +    }
>> +
>> +protected:
>> +    StreamDevHeader hdr;
>> +    typedef Payload payload_t;
>> +};
>> +
>> +}} // namespace spice::streaming_agent
>> +
>> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index 35e65bb..c401a34 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -5,6 +5,8 @@
>>  */
>> 
>> #include "concrete-agent.hpp"
>> +#include "stream.hpp"
>> +#include "message.hpp"
>> #include "hexdump.h"
>> #include "mjpeg-fallback.hpp"
>> 
>> @@ -21,11 +23,9 @@
>> #include <inttypes.h>
>> #include <string.h>
>> #include <getopt.h>
>> -#include <unistd.h>
>> #include <errno.h>
>> -#include <fcntl.h>
>> +#include <unistd.h>
>> #include <sys/time.h>
>> -#include <poll.h>
>> #include <syslog.h>
>> #include <signal.h>
>> #include <exception>
>> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>> 
>> }
>> 
>> -class Stream
>> -{
>> -    typedef std::set<SpiceVideoCodecType> codecs_t;
>> -
>> -public:
>> -    Stream(const char *name)
>> -        : codecs()
>> -    {
>> -        streamfd = open(name, O_RDWR);
>> -        if (streamfd < 0) {
>> -            throw IOError("failed to open streaming device", errno);
>> -        }
>> -    }
>> -    ~Stream()
>> -    {
>> -        close(streamfd);
>> -    }
>> -
>> -    const codecs_t &client_codecs() { return codecs; }
>> -    bool streaming_requested() { return is_streaming; }
>> -
>> -    template <typename Message, typename ...PayloadArgs>
>> -    void send(PayloadArgs... payload)
>> -    {
>> -        Message message(payload...);
>> -        std::lock_guard<std::mutex> stream_guard(mutex);
>> -        message.write_header(*this);
>> -        message.write_message_body(*this, payload...);
>> -    }
>> -
>> -    int read_command(bool blocking);
>> -    void write_all(const char *operation, const void *buf, const size_t len);
>> -
>> -private:
>> -    int have_something_to_read(int timeout);
>> -    void handle_stream_start_stop(uint32_t len);
>> -    void handle_stream_capabilities(uint32_t len);
>> -    void handle_stream_error(uint32_t len);
>> -    void read_command_from_device(void);
>> -
>> -private:
>> -    std::mutex mutex;
>> -    codecs_t codecs;
>> -    int streamfd = -1;
>> -    bool is_streaming = false;
>> -};
>> -
>> -template <typename Payload, typename Info, unsigned Type>
>> -class Message
>> -{
>> -public:
>> -    template <typename ...PayloadArgs>
>> -    Message(PayloadArgs... payload)
>> -        : hdr(StreamDevHeader {
>> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
>> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>> -              .type = Type,
>> -              .size = (uint32_t) Info::size(payload...)
>> -          })
>> -    { }
>> -    void write_header(Stream &stream)
>> -    {
>> -        stream.write_all("header", &hdr, sizeof(hdr));
>> -    }
>> -
>> -protected:
>> -    StreamDevHeader hdr;
>> -    typedef Payload payload_t;
>> -};
>> -
>> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>> {
>> public:
>> @@ -156,20 +86,6 @@ public:
>>     }
>> };
>> 
>> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>> -{
>> -public:
>> -    CapabilitiesMessage() : Message() {}
>> -    static size_t size()
>> -    {
>> -        return sizeof(payload_t);
>> -    }
>> -    void write_message_body(Stream &stream)
>> -    {
>> -        /* No body for capabilities message */
>> -    }
>> -};
>> -
>> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>> {
>> public:
>> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>> 
>> }} // namespace spice::streaming_agent
>> 
>> -static bool quit_requested = false;
>> -
>> -int Stream::have_something_to_read(int timeout)
>> -{
>> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
>> -
>> -    if (poll(&pollfd, 1, timeout) < 0) {
>> -        syslog(LOG_ERR, "poll FAILED\n");
>> -        return -1;
>> -    }
>> -
>> -    if (pollfd.revents == POLLIN) {
>> -        return 1;
>> -    }
>> -
>> -    return 0;
>> -}
>> -
>> -void Stream::handle_stream_start_stop(uint32_t len)
>> -{
>> -    uint8_t msg[256];
>> -
>> -    if (len >= sizeof(msg)) {
>> -        throw MessageDataError("message is too long", len, sizeof(msg));
>> -    }
>> -    int n = read(streamfd, &msg, len);
>> -    if (n != (int) len) {
>> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>> -    }
>> -    is_streaming = (msg[0] != 0); /* num_codecs */
>> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>> -           is_streaming ? "START" : "STOP");
>> -    codecs.clear();
>> -    for (int i = 1; i <= msg[0]; ++i) {
>> -        codecs.insert((SpiceVideoCodecType) msg[i]);
>> -    }
>> -}
>> -
>> -void Stream::handle_stream_capabilities(uint32_t len)
>> -{
>> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> -
>> -    if (len > sizeof(caps)) {
>> -        throw MessageDataError("capability message too long", len, sizeof(caps));
>> -    }
>> -    int n = read(streamfd, caps, len);
>> -    if (n != (int) len) {
>> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
>> -    }
>> -
>> -    // we currently do not support extensions so just reply so
>> -    send<CapabilitiesMessage>();
>> -}
>> -
>> -void Stream::handle_stream_error(uint32_t len)
>> -{
>> -    // TODO read message and use it
>> -    throw ProtocolError("got an error message from server");
>> -}
>> -
>> -void Stream::read_command_from_device()
>> -{
>> -    StreamDevHeader hdr;
>> -    int n;
>> -
>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>> -    n = read(streamfd, &hdr, sizeof(hdr));
>> -    if (n != sizeof(hdr)) {
>> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>> -    }
>> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>> -    }
>> -
>> -    switch (hdr.type) {
>> -    case STREAM_TYPE_CAPABILITIES:
>> -        return handle_stream_capabilities(hdr.size);
>> -    case STREAM_TYPE_NOTIFY_ERROR:
>> -        return handle_stream_error(hdr.size);
>> -    case STREAM_TYPE_START_STOP:
>> -        return handle_stream_start_stop(hdr.size);
>> -    }
>> -    throw MessageDataError("unknown message type", hdr.type, 0);
>> -}
>> -
>> -int Stream::read_command(bool blocking)
>> -{
>> -    int timeout = blocking?-1:0;
>> -    while (!quit_requested) {
>> -        if (!have_something_to_read(timeout)) {
>> -            if (!blocking) {
>> -                return 0;
>> -            }
>> -            sleep(1);
>> -            continue;
>> -        }
>> -        read_command_from_device();
>> -        break;
>> -    }
>> -
>> -    return 1;
>> -}
>> -
>> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> -{
>> -    size_t written = 0;
>> -    while (written < len) {
>> -        int l = write(streamfd, (const char *) buf + written, len - written);
>> -        if (l < 0) {
>> -            if (errno == EINTR) {
>> -                continue;
>> -            }
>> -            throw WriteError("write failed", operation, errno).syslog();
>> -        }
>> -        written += l;
>> -    }
>> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> -}
>> +bool quit_requested = false;
>> 
>> static void handle_interrupt(int intr)
>> {
>> diff --git a/src/stream.cpp b/src/stream.cpp
>> new file mode 100644
>> index 0000000..f756097
>> --- /dev/null
>> +++ b/src/stream.cpp
>> @@ -0,0 +1,172 @@
>> +/* Encapsulation of the stream used to communicate between agent and server
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +
>> +#include "stream.hpp"
>> +#include "message.hpp"
>> +
>> +#include <spice/stream-device.h>
>> +
>> +#include <spice-streaming-agent/errors.hpp>
>> +
>> +#include <sys/types.h>
>> +#include <sys/stat.h>
>> +#include <fcntl.h>
>> +#include <poll.h>
>> +#include <syslog.h>
>> +#include <unistd.h>
>> +
>> +namespace spice
>> +{
>> +namespace streaming_agent
>> +{
>> +
>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
>> +                                           STREAM_TYPE_CAPABILITIES>
>> +{
>> +public:
>> +    CapabilitiesMessage() : Message() {}
>> +    static size_t size()
>> +    {
>> +        return sizeof(payload_t);
>> +    }
>> +    void write_message_body(Stream &stream)
>> +    {
>> +        /* No body for capabilities message */
>> +    }
>> +};
> 
> Not sure I like scattering the messages across source files that happen
> to use them, though I suppose you did it because each message (like the
> X11Cursor) may require different header files included? Perhaps it is
> the way to go…

No, it’s really to decouple things; it is a good way to check whether the encapsulation is correct.


> 
>> +
>> +Stream::Stream(const char *name)
>> +    : codecs()
>> +{
>> +    streamfd = open(name, O_RDWR);
>> +    if (streamfd < 0) {
>> +        throw IOError("failed to open streaming device", errno);
>> +    }
>> +}
>> +
>> +Stream::~Stream()
>> +{
>> +    close(streamfd);
>> +}
>> +
>> +int Stream::have_something_to_read(int timeout)
>> +{
>> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
>> +
>> +    if (poll(&pollfd, 1, timeout) < 0) {
>> +        syslog(LOG_ERR, "poll FAILED\n");
>> +        return -1;
>> +    }
>> +
>> +    if (pollfd.revents == POLLIN) {
>> +        return 1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +void Stream::handle_stream_start_stop(uint32_t len)
>> +{
>> +    uint8_t msg[256];
>> +
>> +    if (len >= sizeof(msg)) {
>> +        throw MessageDataError("message is too long", len, sizeof(msg));
>> +    }
>> +    int n = read(streamfd, &msg, len);
>> +    if (n != (int) len) {
>> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>> +    }
>> +    is_streaming = (msg[0] != 0); /* num_codecs */
>> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>> +           is_streaming ? "START" : "STOP");
>> +    codecs.clear();
>> +    for (int i = 1; i <= msg[0]; ++i) {
>> +        codecs.insert((SpiceVideoCodecType) msg[i]);
>> +    }
>> +}
>> +
>> +void Stream::handle_stream_capabilities(uint32_t len)
>> +{
>> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> +
>> +    if (len > sizeof(caps)) {
>> +        throw MessageDataError("capability message too long", len, sizeof(caps));
>> +    }
>> +    int n = read(streamfd, caps, len);
>> +    if (n != (int) len) {
>> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
>> +    }
>> +
>> +    // we currently do not support extensions so just reply so
>> +    send<CapabilitiesMessage>();
>> +}
>> +
>> +void Stream::handle_stream_error(uint32_t len)
>> +{
>> +    // TODO read message and use it
>> +    throw ProtocolError("got an error message from server");
>> +}
>> +
>> +void Stream::read_command_from_device()
>> +{
>> +    StreamDevHeader hdr;
>> +    int n;
>> +
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    n = read(streamfd, &hdr, sizeof(hdr));
>> +    if (n != sizeof(hdr)) {
>> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>> +    }
>> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>> +    }
>> +
>> +    switch (hdr.type) {
>> +    case STREAM_TYPE_CAPABILITIES:
>> +        return handle_stream_capabilities(hdr.size);
>> +    case STREAM_TYPE_NOTIFY_ERROR:
>> +        return handle_stream_error(hdr.size);
>> +    case STREAM_TYPE_START_STOP:
>> +        return handle_stream_start_stop(hdr.size);
>> +    }
>> +    throw MessageDataError("unknown message type", hdr.type, 0);
>> +}
>> +
>> +int Stream::read_command(bool blocking)
>> +{
>> +    int timeout = blocking?-1:0;
>> +    while (!quit_requested) {
>> +        if (!have_something_to_read(timeout)) {
>> +            if (!blocking) {
>> +                return 0;
>> +            }
>> +            sleep(1);
>> +            continue;
>> +        }
>> +        read_command_from_device();
>> +        break;
>> +    }
>> +
>> +    return 1;
>> +}
>> +
>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> +{
>> +    size_t written = 0;
>> +    while (written < len) {
>> +        int l = write(streamfd, (const char *) buf + written, len - written);
>> +        if (l < 0) {
>> +            if (errno == EINTR) {
>> +                continue;
>> +            }
>> +            throw WriteError("write failed", operation, errno).syslog();
>> +        }
>> +        written += l;
>> +    }
>> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> +}
>> +
>> +}} // namespace spice::streaming_agent
>> diff --git a/src/stream.hpp b/src/stream.hpp
>> new file mode 100644
>> index 0000000..b689f36
>> --- /dev/null
>> +++ b/src/stream.hpp
>> @@ -0,0 +1,55 @@
>> +/* Encapsulation of the stream used to communicate between agent and server
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
>> +#define SPICE_STREAMING_AGENT_STREAM_HPP
>> +
>> +#include <spice/enums.h>
>> +#include <set>
>> +#include <mutex>
>> +
>> +namespace spice {
>> +namespace streaming_agent {
>> +
>> +class Stream
>> +{
>> +    typedef std::set<SpiceVideoCodecType> codecs_t;
>> +
>> +public:
>> +    Stream(const char *name);
>> +    ~Stream();
>> +
>> +    const codecs_t &client_codecs() { return codecs; }
>> +    bool streaming_requested() { return is_streaming; }
>> +
>> +    template <typename Message, typename ...PayloadArgs>
>> +    void send(PayloadArgs... payload)
>> +    {
>> +        Message message(payload...);
>> +        std::lock_guard<std::mutex> stream_guard(mutex);
>> +        message.write_header(*this);
>> +        message.write_message_body(*this, payload...);
>> +    }
>> +
>> +    int read_command(bool blocking);
>> +    void write_all(const char *operation, const void *buf, const size_t len);
>> +
>> +private:
>> +    int have_something_to_read(int timeout);
>> +    void handle_stream_start_stop(uint32_t len);
>> +    void handle_stream_capabilities(uint32_t len);
>> +    void handle_stream_error(uint32_t len);
>> +    void read_command_from_device(void);
>> +
>> +private:
>> +    std::mutex mutex;
>> +    codecs_t codecs;
>> +    int streamfd = -1;
>> +    bool is_streaming = false;
>> +};
>> +
>> +}} // namespace spice::streaming_agent
>> +
>> +#endif // SPICE_STREAMING_AGENT_STREAM_HPP



More information about the Spice-devel mailing list