[Spice-devel] [PATCH spice-streaming-agent 1/3] Introduce InboundMessages for the StreamPort class

Frediano Ziglio fziglio at redhat.com
Mon Oct 8 11:03:44 UTC 2018


> 
> Wraps the deserialization of the received messages in an InboundMessages
> class. The class is created with the deserialized header and the raw
> data of the message. A template function get_payload() returns the
> struct of the concrete message. The function is specialized for each
> incoming message.
> 
> While this leaves the responsibility to call the get_payload() function
> with the message according to the type in the header to the caller, the
> solution preserves the efficiency of the original implementation without
> introducing too much complexity around the separation of the code.
> 
> Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> ---
>  src/spice-streaming-agent.cpp | 115 +++++++++-------------------------
>  src/stream-port.cpp           |  70 ++++++++++++++++++++-
>  src/stream-port.hpp           |  41 +++++++++++-
>  3 files changed, 139 insertions(+), 87 deletions(-)
> 
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index a9baf4d..a89ba3f 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -77,92 +77,39 @@ static bool have_something_to_read(StreamPort
> &stream_port, bool blocking)
>      return false;
>  }
>  
> -static void handle_stream_start_stop(StreamPort &stream_port, uint32_t len)
> -{
> -    uint8_t msg[256];
> -
> -    if (len >= sizeof(msg)) {
> -        throw std::runtime_error("msg size (" + std::to_string(len) + ") is
> too long "
> -                                 "(longer than " +
> std::to_string(sizeof(msg)) + ")");
> -    }
> -
> -    stream_port.read(msg, len);
> -    streaming_requested = (msg[0] != 0); /* num_codecs */
> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming",
> -           streaming_requested ? "START" : "STOP");
> -    client_codecs.clear();
> -    for (int i = 1; i <= msg[0]; ++i) {
> -        client_codecs.insert((SpiceVideoCodecType) msg[i]);
> -    }
> -}
> -
> -static void handle_stream_capabilities(StreamPort &stream_port, uint32_t
> len)
> -{
> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> -
> -    if (len > sizeof(caps)) {
> -        throw std::runtime_error("capability message too long");
> -    }
> -
> -    stream_port.read(caps, len);
> -    // we currently do not support extensions so just reply so
> -    StreamDevHeader hdr = {
> -        STREAM_DEVICE_PROTOCOL,
> -        0,
> -        STREAM_TYPE_CAPABILITIES,
> -        0
> -    };
> -
> -    stream_port.write(&hdr, sizeof(hdr));
> -}
> -
> -static void handle_stream_error(StreamPort &stream_port, size_t len)
> -{
> -    if (len < sizeof(StreamMsgNotifyError)) {
> -        throw std::runtime_error("Received NotifyError message size " +
> std::to_string(len) +
> -                                 " is too small (smaller than " +
> -
> std::to_string(sizeof(StreamMsgNotifyError))
> + ")");
> -    }
> -
> -    struct StreamMsgNotifyError1K : StreamMsgNotifyError {
> -        uint8_t msg[1024];
> -    } msg;
> -
> -    size_t len_to_read = std::min(len, sizeof(msg) - 1);
> -
> -    stream_port.read(&msg, len_to_read);
> -    msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0';
> -
> -    syslog(LOG_ERR, "Received NotifyError message from the server: %d - %s",
> -        msg.error_code, msg.msg);
> -
> -    if (len_to_read < len) {
> -        throw std::runtime_error("Received NotifyError message size " +
> std::to_string(len) +
> -                                 " is too big (bigger than " +
> std::to_string(sizeof(msg)) + ")");
> -    }
> -}
> -
>  static void read_command_from_device(StreamPort &stream_port)
>  {
> -    StreamDevHeader hdr;
> -
> -    std::lock_guard<std::mutex> guard(stream_port.mutex);
> -    stream_port.read(&hdr, sizeof(hdr));
> -
> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> -        throw std::runtime_error("BAD VERSION " +
> std::to_string(hdr.protocol_version) +
> -                                 " (expected is " +
> std::to_string(STREAM_DEVICE_PROTOCOL) + ")");
> -    }
> -
> -    switch (hdr.type) {
> -    case STREAM_TYPE_CAPABILITIES:
> -        return handle_stream_capabilities(stream_port, hdr.size);
> -    case STREAM_TYPE_NOTIFY_ERROR:
> -        return handle_stream_error(stream_port, hdr.size);
> -    case STREAM_TYPE_START_STOP:
> -        return handle_stream_start_stop(stream_port, hdr.size);
> -    }
> -    throw std::runtime_error("UNKNOWN msg of type " +
> std::to_string(hdr.type));
> +    InboundMessage in_message = stream_port.receive();
> +
> +    switch (in_message.header.type) {
> +    case STREAM_TYPE_CAPABILITIES: {
> +        StreamDevHeader hdr = {
> +            STREAM_DEVICE_PROTOCOL,
> +            0,
> +            STREAM_TYPE_CAPABILITIES,
> +            0
> +        };
> +
> +        std::lock_guard<std::mutex> guard(stream_port.mutex);
> +        stream_port.write(&hdr, sizeof(hdr));
> +        return;
> +    } case STREAM_TYPE_NOTIFY_ERROR: {

Not against this line style but case are not aligned and we never
used this style.

> +        NotifyErrorMessage msg =
> in_message.get_payload<NotifyErrorMessage>();
> +
> +        syslog(LOG_ERR, "Received NotifyError message from the server: %d -
> %s",
> +               msg.error_code, msg.message);
> +        return;
> +    } case STREAM_TYPE_START_STOP: {
> +        StartStopMessage msg = in_message.get_payload<StartStopMessage>();
> +        streaming_requested = msg.start_streaming;
> +        client_codecs = msg.client_codecs;
> +
> +        syslog(LOG_INFO, "GOT START_STOP message -- request to %s
> streaming",
> +               streaming_requested ? "START" : "STOP");
> +        return;
> +    }}
> +
> +    throw std::runtime_error("UNKNOWN msg of type " +
> std::to_string(in_message.header.type));
>  }
>  
>  static void read_command(StreamPort &stream_port, bool blocking)
> diff --git a/src/stream-port.cpp b/src/stream-port.cpp
> index 5528854..56747fd 100644
> --- a/src/stream-port.cpp
> +++ b/src/stream-port.cpp
> @@ -19,6 +19,58 @@
>  namespace spice {
>  namespace streaming_agent {
>  
> +InboundMessage::InboundMessage(const StreamDevHeader &header,
> std::unique_ptr<uint8_t[]> &&data) :
> +    header(header),
> +    data(std::move(data))
> +{}
> +
> +template<>
> +StartStopMessage InboundMessage::get_payload<StartStopMessage>()
> +{

why not also checking that the message is really the right type
instead of assuming the caller is doing the right thing?

> +    StartStopMessage msg;
> +
> +    msg.start_streaming = data[0]; // num_codecs
> +
> +    for (size_t i = 1; i <= data[0]; ++i) {
> +        msg.client_codecs.insert((SpiceVideoCodecType) data[i]);
> +    }
> +
> +    return msg;
> +}
> +
> +template<>
> +InCapabilitiesMessage InboundMessage::get_payload<InCapabilitiesMessage>()
> +{
> +    // no capabilities yet
> +    return InCapabilitiesMessage();
> +}
> +
> +template<>
> +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>()
> +{
> +    if (header.size < sizeof(StreamMsgNotifyError)) {
> +        throw std::runtime_error("Received NotifyError message size " +
> std::to_string(header.size) +
> +                                 " is too small (smaller than " +
> +
> std::to_string(sizeof(StreamMsgNotifyError))
> + ")");
> +    }
> +
> +    size_t msg_len = header.size - sizeof(StreamMsgNotifyError);
> +    if (msg_len > 1024) {
> +        throw std::runtime_error("Received NotifyError message is too long
> (" +
> +                                 std::to_string(msg_len) + " > 1024)");
> +    }
> +
> +    StreamMsgNotifyError *raw_message =
> reinterpret_cast<StreamMsgNotifyError*>(data.get());
> +
> +    NotifyErrorMessage msg;
> +    msg.error_code = raw_message->error_code;
> +    strncpy(msg.message, reinterpret_cast<char*>(raw_message->msg),
> msg_len);
> +    // make sure the string is terminated
> +    msg.message[msg_len] = '\0';
> +
> +    return msg;
> +}
> +
>  StreamPort::StreamPort(const std::string &port_name) :
>  fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK))
>  {
>      if (fd < 0) {
> @@ -31,9 +83,23 @@ StreamPort::~StreamPort()
>      close(fd);
>  }
>  
> -void StreamPort::read(void *buf, size_t len)
> +InboundMessage StreamPort::receive()
>  {
> -    read_all(fd, buf, len);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +
> +    StreamDevHeader header;
> +    read_all(fd, &header, sizeof(header));
> +
> +    if (header.protocol_version != STREAM_DEVICE_PROTOCOL) {
> +        throw std::runtime_error("Bad protocol version: " +
> std::to_string(header.protocol_version) +
> +                                 ", expected: " +
> std::to_string(STREAM_DEVICE_PROTOCOL));
> +    }
> +
> +    // TODO should we limit the maximum message size?

This is a regression from previous code, should be added again.

> +    std::unique_ptr<uint8_t[]> data(new uint8_t[header.size]);
> +    read_all(fd, data.get(), header.size);
> +
> +    return InboundMessage(header, std::move(data));
>  }
>  
>  void StreamPort::write(const void *buf, size_t len)
> diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> index 9187cf5..090930b 100644
> --- a/src/stream-port.hpp
> +++ b/src/stream-port.hpp
> @@ -7,20 +7,59 @@
>  #ifndef SPICE_STREAMING_AGENT_STREAM_PORT_HPP
>  #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP
>  
> +#include <spice/stream-device.h>
> +#include <spice/enums.h>
> +
>  #include <cstddef>
>  #include <string>
> +#include <memory>
>  #include <mutex>
> +#include <set>
>  
>  
>  namespace spice {
>  namespace streaming_agent {
>  
> +struct StartStopMessage
> +{
> +    bool start_streaming = false;

This has not much to do with the message but with the usage of it,
the agent should start streaming if there are any codec supported,
so should test client_codecs only (actually start_streaming ==
!client_codecs.empty() as currently the agent assume the list
contains a codecs it can handle, which can be wrong).

> +    std::set<SpiceVideoCodecType> client_codecs;
> +};
> +
> +struct InCapabilitiesMessage {};
> +
> +struct NotifyErrorMessage
> +{
> +    uint32_t error_code;
> +    char message[1025];
> +};
> +
> +class InboundMessage
> +{
> +public:
> +    InboundMessage(const StreamDevHeader &header, std::unique_ptr<uint8_t[]>
> &&data);
> +
> +    template<class Payload> Payload get_payload();
> +
> +    const StreamDevHeader header;
> +private:
> +    std::unique_ptr<uint8_t[]> data;
> +};
> +
> +template<>
> +StartStopMessage InboundMessage::get_payload<StartStopMessage>();
> +template<>
> +InCapabilitiesMessage InboundMessage::get_payload<InCapabilitiesMessage>();
> +template<>
> +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>();
> +
>  class StreamPort {
>  public:
>      StreamPort(const std::string &port_name);
>      ~StreamPort();
>  
> -    void read(void *buf, size_t len);
> +    InboundMessage receive();
> +
>      void write(const void *buf, size_t len);
>  
>      int fd;

Frediano


More information about the Spice-devel mailing list