[Spice-devel] [PATCH 19/22] Put Stream and Message classes in separate files

Lukáš Hrázký lhrazky at redhat.com
Thu Mar 1 15:11:57 UTC 2018


On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> From: Christophe de Dinechin <dinechin at redhat.com>
> 
> Doing this change will make it possible to move the capture loop to the
> concrete-agent.cpp file.
> 
> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> ---
>  include/spice-streaming-agent/errors.hpp |   2 +
>  src/Makefile.am                          |   2 +
>  src/message.hpp                          |  41 ++++++
>  src/spice-streaming-agent.cpp            | 209 +------------------------------
>  src/stream.cpp                           | 172 +++++++++++++++++++++++++
>  src/stream.hpp                           |  55 ++++++++
>  6 files changed, 276 insertions(+), 205 deletions(-)
>  create mode 100644 src/message.hpp
>  create mode 100644 src/stream.cpp
>  create mode 100644 src/stream.hpp
> 
> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> index 870a0fd..62ae010 100644
> --- a/include/spice-streaming-agent/errors.hpp
> +++ b/include/spice-streaming-agent/errors.hpp
> @@ -90,4 +90,6 @@ protected:
>  
>  }} // namespace spice::streaming_agent
>  
> +extern bool quit_requested;

Why is the quit_requested declaration being put into errors.hpp?

> +
>  #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 2507844..923a103 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>  	mjpeg-fallback.hpp \
>  	jpeg.cpp \
>  	jpeg.hpp \
> +	stream.cpp \
> +	stream.hpp \
>  	errors.cpp \
>  	$(NULL)
> diff --git a/src/message.hpp b/src/message.hpp
> new file mode 100644
> index 0000000..28b3e28
> --- /dev/null
> +++ b/src/message.hpp
> @@ -0,0 +1,41 @@
> +/* Formatting messages
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> +
> +#include <spice/stream-device.h>
> +
> +namespace spice
> +{
> +namespace streaming_agent
> +{
> +
> +template <typename Payload, typename Info, unsigned Type>
> +class Message
> +{
> +public:
> +    template <typename ...PayloadArgs>
> +    Message(PayloadArgs... payload)
> +        : hdr(StreamDevHeader {
> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> +              .type = Type,
> +              .size = (uint32_t) Info::size(payload...)
> +          })
> +    { }
> +    void write_header(Stream &stream)
> +    {
> +        stream.write_all("header", &hdr, sizeof(hdr));
> +    }
> +
> +protected:
> +    StreamDevHeader hdr;
> +    typedef Payload payload_t;
> +};
> +
> +}} // namespace spice::streaming_agent
> +
> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 35e65bb..c401a34 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -5,6 +5,8 @@
>   */
>  
>  #include "concrete-agent.hpp"
> +#include "stream.hpp"
> +#include "message.hpp"
>  #include "hexdump.h"
>  #include "mjpeg-fallback.hpp"
>  
> @@ -21,11 +23,9 @@
>  #include <inttypes.h>
>  #include <string.h>
>  #include <getopt.h>
> -#include <unistd.h>
>  #include <errno.h>
> -#include <fcntl.h>
> +#include <unistd.h>
>  #include <sys/time.h>
> -#include <poll.h>
>  #include <syslog.h>
>  #include <signal.h>
>  #include <exception>
> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>  
>  }
>  
> -class Stream
> -{
> -    typedef std::set<SpiceVideoCodecType> codecs_t;
> -
> -public:
> -    Stream(const char *name)
> -        : codecs()
> -    {
> -        streamfd = open(name, O_RDWR);
> -        if (streamfd < 0) {
> -            throw IOError("failed to open streaming device", errno);
> -        }
> -    }
> -    ~Stream()
> -    {
> -        close(streamfd);
> -    }
> -
> -    const codecs_t &client_codecs() { return codecs; }
> -    bool streaming_requested() { return is_streaming; }
> -
> -    template <typename Message, typename ...PayloadArgs>
> -    void send(PayloadArgs... payload)
> -    {
> -        Message message(payload...);
> -        std::lock_guard<std::mutex> stream_guard(mutex);
> -        message.write_header(*this);
> -        message.write_message_body(*this, payload...);
> -    }
> -
> -    int read_command(bool blocking);
> -    void write_all(const char *operation, const void *buf, const size_t len);
> -
> -private:
> -    int have_something_to_read(int timeout);
> -    void handle_stream_start_stop(uint32_t len);
> -    void handle_stream_capabilities(uint32_t len);
> -    void handle_stream_error(uint32_t len);
> -    void read_command_from_device(void);
> -
> -private:
> -    std::mutex mutex;
> -    codecs_t codecs;
> -    int streamfd = -1;
> -    bool is_streaming = false;
> -};
> -
> -template <typename Payload, typename Info, unsigned Type>
> -class Message
> -{
> -public:
> -    template <typename ...PayloadArgs>
> -    Message(PayloadArgs... payload)
> -        : hdr(StreamDevHeader {
> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> -              .type = Type,
> -              .size = (uint32_t) Info::size(payload...)
> -          })
> -    { }
> -    void write_header(Stream &stream)
> -    {
> -        stream.write_all("header", &hdr, sizeof(hdr));
> -    }
> -
> -protected:
> -    StreamDevHeader hdr;
> -    typedef Payload payload_t;
> -};
> -
>  class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>  {
>  public:
> @@ -156,20 +86,6 @@ public:
>      }
>  };
>  
> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> -{
> -public:
> -    CapabilitiesMessage() : Message() {}
> -    static size_t size()
> -    {
> -        return sizeof(payload_t);
> -    }
> -    void write_message_body(Stream &stream)
> -    {
> -        /* No body for capabilities message */
> -    }
> -};
> -
>  class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>  {
>  public:
> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>  
>  }} // namespace spice::streaming_agent
>  
> -static bool quit_requested = false;
> -
> -int Stream::have_something_to_read(int timeout)
> -{
> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> -
> -    if (poll(&pollfd, 1, timeout) < 0) {
> -        syslog(LOG_ERR, "poll FAILED\n");
> -        return -1;
> -    }
> -
> -    if (pollfd.revents == POLLIN) {
> -        return 1;
> -    }
> -
> -    return 0;
> -}
> -
> -void Stream::handle_stream_start_stop(uint32_t len)
> -{
> -    uint8_t msg[256];
> -
> -    if (len >= sizeof(msg)) {
> -        throw MessageDataError("message is too long", len, sizeof(msg));
> -    }
> -    int n = read(streamfd, &msg, len);
> -    if (n != (int) len) {
> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> -    }
> -    is_streaming = (msg[0] != 0); /* num_codecs */
> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> -           is_streaming ? "START" : "STOP");
> -    codecs.clear();
> -    for (int i = 1; i <= msg[0]; ++i) {
> -        codecs.insert((SpiceVideoCodecType) msg[i]);
> -    }
> -}
> -
> -void Stream::handle_stream_capabilities(uint32_t len)
> -{
> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> -
> -    if (len > sizeof(caps)) {
> -        throw MessageDataError("capability message too long", len, sizeof(caps));
> -    }
> -    int n = read(streamfd, caps, len);
> -    if (n != (int) len) {
> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
> -    }
> -
> -    // we currently do not support extensions so just reply so
> -    send<CapabilitiesMessage>();
> -}
> -
> -void Stream::handle_stream_error(uint32_t len)
> -{
> -    // TODO read message and use it
> -    throw ProtocolError("got an error message from server");
> -}
> -
> -void Stream::read_command_from_device()
> -{
> -    StreamDevHeader hdr;
> -    int n;
> -
> -    std::lock_guard<std::mutex> stream_guard(mutex);
> -    n = read(streamfd, &hdr, sizeof(hdr));
> -    if (n != sizeof(hdr)) {
> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> -    }
> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> -    }
> -
> -    switch (hdr.type) {
> -    case STREAM_TYPE_CAPABILITIES:
> -        return handle_stream_capabilities(hdr.size);
> -    case STREAM_TYPE_NOTIFY_ERROR:
> -        return handle_stream_error(hdr.size);
> -    case STREAM_TYPE_START_STOP:
> -        return handle_stream_start_stop(hdr.size);
> -    }
> -    throw MessageDataError("unknown message type", hdr.type, 0);
> -}
> -
> -int Stream::read_command(bool blocking)
> -{
> -    int timeout = blocking?-1:0;
> -    while (!quit_requested) {
> -        if (!have_something_to_read(timeout)) {
> -            if (!blocking) {
> -                return 0;
> -            }
> -            sleep(1);
> -            continue;
> -        }
> -        read_command_from_device();
> -        break;
> -    }
> -
> -    return 1;
> -}
> -
> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> -{
> -    size_t written = 0;
> -    while (written < len) {
> -        int l = write(streamfd, (const char *) buf + written, len - written);
> -        if (l < 0) {
> -            if (errno == EINTR) {
> -                continue;
> -            }
> -            throw WriteError("write failed", operation, errno).syslog();
> -        }
> -        written += l;
> -    }
> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> -}
> +bool quit_requested = false;
>  
>  static void handle_interrupt(int intr)
>  {
> diff --git a/src/stream.cpp b/src/stream.cpp
> new file mode 100644
> index 0000000..f756097
> --- /dev/null
> +++ b/src/stream.cpp
> @@ -0,0 +1,172 @@
> +/* Encapsulation of the stream used to communicate between agent and server
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +
> +#include "stream.hpp"
> +#include "message.hpp"
> +
> +#include <spice/stream-device.h>
> +
> +#include <spice-streaming-agent/errors.hpp>
> +
> +#include <sys/types.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +#include <poll.h>
> +#include <syslog.h>
> +#include <unistd.h>
> +
> +namespace spice
> +{
> +namespace streaming_agent
> +{
> +
> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> +                                           STREAM_TYPE_CAPABILITIES>
> +{
> +public:
> +    CapabilitiesMessage() : Message() {}
> +    static size_t size()
> +    {
> +        return sizeof(payload_t);
> +    }
> +    void write_message_body(Stream &stream)
> +    {
> +        /* No body for capabilities message */
> +    }
> +};

I'm not sure I like scattering the messages across the source files that
happen to use them, though I suppose you did it because each message (like
the X11Cursor one) may require different header files to be included?
Perhaps it is the way to go...

> +
> +Stream::Stream(const char *name)
> +    : codecs()
> +{
> +    streamfd = open(name, O_RDWR);
> +    if (streamfd < 0) {
> +        throw IOError("failed to open streaming device", errno);
> +    }
> +}
> +
> +Stream::~Stream()
> +{
> +    close(streamfd);
> +}
> +
> +int Stream::have_something_to_read(int timeout)
> +{
> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> +
> +    if (poll(&pollfd, 1, timeout) < 0) {
> +        syslog(LOG_ERR, "poll FAILED\n");
> +        return -1;
> +    }
> +
> +    if (pollfd.revents == POLLIN) {
> +        return 1;
> +    }
> +
> +    return 0;
> +}
> +
> +void Stream::handle_stream_start_stop(uint32_t len)
> +{
> +    uint8_t msg[256];
> +
> +    if (len >= sizeof(msg)) {
> +        throw MessageDataError("message is too long", len, sizeof(msg));
> +    }
> +    int n = read(streamfd, &msg, len);
> +    if (n != (int) len) {
> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> +    }
> +    is_streaming = (msg[0] != 0); /* num_codecs */
> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> +           is_streaming ? "START" : "STOP");
> +    codecs.clear();
> +    for (int i = 1; i <= msg[0]; ++i) {
> +        codecs.insert((SpiceVideoCodecType) msg[i]);
> +    }
> +}
> +
> +void Stream::handle_stream_capabilities(uint32_t len)
> +{
> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> +
> +    if (len > sizeof(caps)) {
> +        throw MessageDataError("capability message too long", len, sizeof(caps));
> +    }
> +    int n = read(streamfd, caps, len);
> +    if (n != (int) len) {
> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
> +    }
> +
> +    // we currently do not support extensions so just reply so
> +    send<CapabilitiesMessage>();
> +}
> +
> +void Stream::handle_stream_error(uint32_t len)
> +{
> +    // TODO read message and use it
> +    throw ProtocolError("got an error message from server");
> +}
> +
> +void Stream::read_command_from_device()
> +{
> +    StreamDevHeader hdr;
> +    int n;
> +
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    n = read(streamfd, &hdr, sizeof(hdr));
> +    if (n != sizeof(hdr)) {
> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> +    }
> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> +    }
> +
> +    switch (hdr.type) {
> +    case STREAM_TYPE_CAPABILITIES:
> +        return handle_stream_capabilities(hdr.size);
> +    case STREAM_TYPE_NOTIFY_ERROR:
> +        return handle_stream_error(hdr.size);
> +    case STREAM_TYPE_START_STOP:
> +        return handle_stream_start_stop(hdr.size);
> +    }
> +    throw MessageDataError("unknown message type", hdr.type, 0);
> +}
> +
> +int Stream::read_command(bool blocking)
> +{
> +    int timeout = blocking?-1:0;
> +    while (!quit_requested) {
> +        if (!have_something_to_read(timeout)) {
> +            if (!blocking) {
> +                return 0;
> +            }
> +            sleep(1);
> +            continue;
> +        }
> +        read_command_from_device();
> +        break;
> +    }
> +
> +    return 1;
> +}
> +
> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> +{
> +    size_t written = 0;
> +    while (written < len) {
> +        int l = write(streamfd, (const char *) buf + written, len - written);
> +        if (l < 0) {
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +            throw WriteError("write failed", operation, errno).syslog();
> +        }
> +        written += l;
> +    }
> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> +}
> +
> +}} // namespace spice::streaming_agent
> diff --git a/src/stream.hpp b/src/stream.hpp
> new file mode 100644
> index 0000000..b689f36
> --- /dev/null
> +++ b/src/stream.hpp
> @@ -0,0 +1,55 @@
> +/* Encapsulation of the stream used to communicate between agent and server
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> +#define SPICE_STREAMING_AGENT_STREAM_HPP
> +
> +#include <spice/enums.h>
> +#include <set>
> +#include <mutex>
> +
> +namespace spice {
> +namespace streaming_agent {
> +
> +class Stream
> +{
> +    typedef std::set<SpiceVideoCodecType> codecs_t;
> +
> +public:
> +    Stream(const char *name);
> +    ~Stream();
> +
> +    const codecs_t &client_codecs() { return codecs; }
> +    bool streaming_requested() { return is_streaming; }
> +
> +    template <typename Message, typename ...PayloadArgs>
> +    void send(PayloadArgs... payload)
> +    {
> +        Message message(payload...);
> +        std::lock_guard<std::mutex> stream_guard(mutex);
> +        message.write_header(*this);
> +        message.write_message_body(*this, payload...);
> +    }
> +
> +    int read_command(bool blocking);
> +    void write_all(const char *operation, const void *buf, const size_t len);
> +
> +private:
> +    int have_something_to_read(int timeout);
> +    void handle_stream_start_stop(uint32_t len);
> +    void handle_stream_capabilities(uint32_t len);
> +    void handle_stream_error(uint32_t len);
> +    void read_command_from_device(void);
> +
> +private:
> +    std::mutex mutex;
> +    codecs_t codecs;
> +    int streamfd = -1;
> +    bool is_streaming = false;
> +};
> +
> +}} // namespace spice::streaming_agent
> +
> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP


More information about the Spice-devel mailing list