[Spice-devel] [PATCH spice-streaming-agent 1/3] Introduce InboundMessages for the StreamPort class

Frediano Ziglio fziglio at redhat.com
Tue Oct 9 08:02:34 UTC 2018


> On Mon, 2018-10-08 at 07:03 -0400, Frediano Ziglio wrote:
> > > 
> > > Wraps the deserialization of the received messages in an InboundMessages
> > > class. The class is created with the deserialized header and the raw
> > > data of the message. A template function get_payload() returns the
> > > struct of the concrete message. The function is specialized for each
> > > incoming message.
> > > 
> > > While this leaves the responsibility to call the get_payload() function
> > > with the message according to the type in the header to the caller, the
> > > solution preserves the efficiency of the original implementation without
> > > introducing too much complexity around the separation of the code.
> > > 
> > > Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> > > ---
> > >  src/spice-streaming-agent.cpp | 115 +++++++++-------------------------
> > >  src/stream-port.cpp           |  70 ++++++++++++++++++++-
> > >  src/stream-port.hpp           |  41 +++++++++++-
> > >  3 files changed, 139 insertions(+), 87 deletions(-)
> > > 
> > > diff --git a/src/spice-streaming-agent.cpp
> > > b/src/spice-streaming-agent.cpp
> > > index a9baf4d..a89ba3f 100644
> > > --- a/src/spice-streaming-agent.cpp
> > > +++ b/src/spice-streaming-agent.cpp
> > > @@ -77,92 +77,39 @@ static bool have_something_to_read(StreamPort
> > > &stream_port, bool blocking)
> > >      return false;
> > >  }
> > >  
> > > -static void handle_stream_start_stop(StreamPort &stream_port, uint32_t
> > > len)
> > > -{
> > > -    uint8_t msg[256];
> > > -
> > > -    if (len >= sizeof(msg)) {
> > > -        throw std::runtime_error("msg size (" + std::to_string(len) + ")
> > > is
> > > too long "
> > > -                                 "(longer than " +
> > > std::to_string(sizeof(msg)) + ")");
> > > -    }
> > > -
> > > -    stream_port.read(msg, len);
> > > -    streaming_requested = (msg[0] != 0); /* num_codecs */
> > > -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s
> > > streaming",
> > > -           streaming_requested ? "START" : "STOP");
> > > -    client_codecs.clear();
> > > -    for (int i = 1; i <= msg[0]; ++i) {
> > > -        client_codecs.insert((SpiceVideoCodecType) msg[i]);
> > > -    }
> > > -}
> > > -
> > > -static void handle_stream_capabilities(StreamPort &stream_port, uint32_t
> > > len)
> > > -{
> > > -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > -
> > > -    if (len > sizeof(caps)) {
> > > -        throw std::runtime_error("capability message too long");
> > > -    }
> > > -
> > > -    stream_port.read(caps, len);
> > > -    // we currently do not support extensions so just reply so
> > > -    StreamDevHeader hdr = {
> > > -        STREAM_DEVICE_PROTOCOL,
> > > -        0,
> > > -        STREAM_TYPE_CAPABILITIES,
> > > -        0
> > > -    };
> > > -
> > > -    stream_port.write(&hdr, sizeof(hdr));
> > > -}
> > > -
> > > -static void handle_stream_error(StreamPort &stream_port, size_t len)
> > > -{
> > > -    if (len < sizeof(StreamMsgNotifyError)) {
> > > -        throw std::runtime_error("Received NotifyError message size " +
> > > std::to_string(len) +
> > > -                                 " is too small (smaller than " +
> > > -
> > > std::to_string(sizeof(StreamMsgNotifyError))
> > > + ")");
> > > -    }
> > > -
> > > -    struct StreamMsgNotifyError1K : StreamMsgNotifyError {
> > > -        uint8_t msg[1024];
> > > -    } msg;
> > > -
> > > -    size_t len_to_read = std::min(len, sizeof(msg) - 1);
> > > -
> > > -    stream_port.read(&msg, len_to_read);
> > > -    msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0';
> > > -
> > > -    syslog(LOG_ERR, "Received NotifyError message from the server: %d -
> > > %s",
> > > -        msg.error_code, msg.msg);
> > > -
> > > -    if (len_to_read < len) {
> > > -        throw std::runtime_error("Received NotifyError message size " +
> > > std::to_string(len) +
> > > -                                 " is too big (bigger than " +
> > > std::to_string(sizeof(msg)) + ")");
> > > -    }
> > > -}
> > > -
> > >  static void read_command_from_device(StreamPort &stream_port)
> > >  {
> > > -    StreamDevHeader hdr;
> > > -
> > > -    std::lock_guard<std::mutex> guard(stream_port.mutex);
> > > -    stream_port.read(&hdr, sizeof(hdr));
> > > -
> > > -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > -        throw std::runtime_error("BAD VERSION " +
> > > std::to_string(hdr.protocol_version) +
> > > -                                 " (expected is " +
> > > std::to_string(STREAM_DEVICE_PROTOCOL) + ")");
> > > -    }
> > > -
> > > -    switch (hdr.type) {
> > > -    case STREAM_TYPE_CAPABILITIES:
> > > -        return handle_stream_capabilities(stream_port, hdr.size);
> > > -    case STREAM_TYPE_NOTIFY_ERROR:
> > > -        return handle_stream_error(stream_port, hdr.size);
> > > -    case STREAM_TYPE_START_STOP:
> > > -        return handle_stream_start_stop(stream_port, hdr.size);
> > > -    }
> > > -    throw std::runtime_error("UNKNOWN msg of type " +
> > > std::to_string(hdr.type));
> > > +    InboundMessage in_message = stream_port.receive();
> > > +
> > > +    switch (in_message.header.type) {
> > > +    case STREAM_TYPE_CAPABILITIES: {
> > > +        StreamDevHeader hdr = {
> > > +            STREAM_DEVICE_PROTOCOL,
> > > +            0,
> > > +            STREAM_TYPE_CAPABILITIES,
> > > +            0
> > > +        };
> > > +
> > > +        std::lock_guard<std::mutex> guard(stream_port.mutex);
> > > +        stream_port.write(&hdr, sizeof(hdr));
> > > +        return;
> > > +    } case STREAM_TYPE_NOTIFY_ERROR: {
> > 
> > Not against this line style but case are not aligned and we never
> > used this style.
> 
> Checking the coding style guide, it should be:
> 
>     // ...
> }
> case STREAM_TYPE_NOTIFY_ERROR: {
>     // ...
> 
> Is this what you meant? I'll fix it in the next version.
> 

Yes, was that

> > > +        NotifyErrorMessage msg =
> > > in_message.get_payload<NotifyErrorMessage>();
> > > +
> > > +        syslog(LOG_ERR, "Received NotifyError message from the server:
> > > %d -
> > > %s",
> > > +               msg.error_code, msg.message);
> > > +        return;
> > > +    } case STREAM_TYPE_START_STOP: {
> > > +        StartStopMessage msg =
> > > in_message.get_payload<StartStopMessage>();
> > > +        streaming_requested = msg.start_streaming;
> > > +        client_codecs = msg.client_codecs;
> > > +
> > > +        syslog(LOG_INFO, "GOT START_STOP message -- request to %s
> > > streaming",
> > > +               streaming_requested ? "START" : "STOP");
> > > +        return;
> > > +    }}
> > > +
> > > +    throw std::runtime_error("UNKNOWN msg of type " +
> > > std::to_string(in_message.header.type));
> > >  }
> > >  
> > >  static void read_command(StreamPort &stream_port, bool blocking)
> > > diff --git a/src/stream-port.cpp b/src/stream-port.cpp
> > > index 5528854..56747fd 100644
> > > --- a/src/stream-port.cpp
> > > +++ b/src/stream-port.cpp
> > > @@ -19,6 +19,58 @@
> > >  namespace spice {
> > >  namespace streaming_agent {
> > >  
> > > +InboundMessage::InboundMessage(const StreamDevHeader &header,
> > > std::unique_ptr<uint8_t[]> &&data) :
> > > +    header(header),
> > > +    data(std::move(data))
> > > +{}
> > > +
> > > +template<>
> > > +StartStopMessage InboundMessage::get_payload<StartStopMessage>()
> > > +{
> > 
> > why not also checking that the message is really the right type
> > instead of assuming the caller is doing the right thing?
> 
> I didn't intend to make a safe and clean API for the stream-port
> module. I aimed for simplicity instead, treating it as an integral part
> of the streaming agent. If I wanted a safe and clean API, I'd probably
> need to abstract other parts of the interface too, which would add
> boilerplate, if not overhead.
> 
> I can add the checks in the methods if you like, it just seems
> redundant to me in the current rather simple situation and the tight
> coupling between the stream-port module and its usage.
> 

Don't mine, fine as it it.

> > > +    StartStopMessage msg;
> > > +
> > > +    msg.start_streaming = data[0]; // num_codecs
> > > +
> > > +    for (size_t i = 1; i <= data[0]; ++i) {
> > > +        msg.client_codecs.insert((SpiceVideoCodecType) data[i]);
> > > +    }
> > > +
> > > +    return msg;
> > > +}
> > > +
> > > +template<>
> > > +InCapabilitiesMessage
> > > InboundMessage::get_payload<InCapabilitiesMessage>()
> > > +{
> > > +    // no capabilities yet
> > > +    return InCapabilitiesMessage();
> > > +}
> > > +
> > > +template<>
> > > +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>()
> > > +{
> > > +    if (header.size < sizeof(StreamMsgNotifyError)) {
> > > +        throw std::runtime_error("Received NotifyError message size " +
> > > std::to_string(header.size) +
> > > +                                 " is too small (smaller than " +
> > > +
> > > std::to_string(sizeof(StreamMsgNotifyError))
> > > + ")");
> > > +    }
> > > +
> > > +    size_t msg_len = header.size - sizeof(StreamMsgNotifyError);
> > > +    if (msg_len > 1024) {
> > > +        throw std::runtime_error("Received NotifyError message is too
> > > long
> > > (" +
> > > +                                 std::to_string(msg_len) + " > 1024)");
> > > +    }
> > > +
> > > +    StreamMsgNotifyError *raw_message =
> > > reinterpret_cast<StreamMsgNotifyError*>(data.get());
> > > +
> > > +    NotifyErrorMessage msg;
> > > +    msg.error_code = raw_message->error_code;
> > > +    strncpy(msg.message, reinterpret_cast<char*>(raw_message->msg),
> > > msg_len);
> > > +    // make sure the string is terminated
> > > +    msg.message[msg_len] = '\0';
> > > +
> > > +    return msg;
> > > +}
> > > +
> > >  StreamPort::StreamPort(const std::string &port_name) :
> > >  fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK))
> > >  {
> > >      if (fd < 0) {
> > > @@ -31,9 +83,23 @@ StreamPort::~StreamPort()
> > >      close(fd);
> > >  }
> > >  
> > > -void StreamPort::read(void *buf, size_t len)
> > > +InboundMessage StreamPort::receive()
> > >  {
> > > -    read_all(fd, buf, len);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +
> > > +    StreamDevHeader header;
> > > +    read_all(fd, &header, sizeof(header));
> > > +
> > > +    if (header.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > +        throw std::runtime_error("Bad protocol version: " +
> > > std::to_string(header.protocol_version) +
> > > +                                 ", expected: " +
> > > std::to_string(STREAM_DEVICE_PROTOCOL));
> > > +    }
> > > +
> > > +    // TODO should we limit the maximum message size?
> > 
> > This is a regression from previous code, should be added again.
> 
> Yes, the difference is, in the previous code each message had it's own
> custom limit. That is not possible anymore with this code (a small
> disadvantage, although the modularization is well worth it imo).
> 

A switch won't kill it. Modularity should not be a limit.

> So we need to add a limit that is sufficient for the biggest message
> that can be received. Which currently seems to be the
> NotifyErrorMessage with a size of 1028B. What shall we use as the limit
> here? 1028? Add some room for the future and use 2048? Or even bigger?
> 

That make sense too, I think 4K won't kill either.

> > > +    std::unique_ptr<uint8_t[]> data(new uint8_t[header.size]);
> > > +    read_all(fd, data.get(), header.size);
> > > +
> > > +    return InboundMessage(header, std::move(data));
> > >  }
> > >  
> > >  void StreamPort::write(const void *buf, size_t len)
> > > diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> > > index 9187cf5..090930b 100644
> > > --- a/src/stream-port.hpp
> > > +++ b/src/stream-port.hpp
> > > @@ -7,20 +7,59 @@
> > >  #ifndef SPICE_STREAMING_AGENT_STREAM_PORT_HPP
> > >  #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP
> > >  
> > > +#include <spice/stream-device.h>
> > > +#include <spice/enums.h>
> > > +
> > >  #include <cstddef>
> > >  #include <string>
> > > +#include <memory>
> > >  #include <mutex>
> > > +#include <set>
> > >  
> > >  
> > >  namespace spice {
> > >  namespace streaming_agent {
> > >  
> > > +struct StartStopMessage
> > > +{
> > > +    bool start_streaming = false;
> > 
> > This has not much to do with the message but with the usage of it,
> > the agent should start streaming if there are any codec supported,
> > so should test client_codecs only (actually start_streaming ==
> > !client_codecs.empty() as currently the agent assume the list
> > contains a codecs it can handle, which can be wrong).
> 
> In my opinion the information to start/stop streaming is different from
> the information contained in the list of codecs and should be kept
> separate. Using the codec list to represent this information is an
> abuse.
> 

Yes, is different and is not something the message can say, how the
message can know the list of plugin the agent has? Is the agent that
knows that, not the message, the agent derive it from its knowledge
about plugins and this message.

> I realize this is already done on the protocol level, but I wanted to
> keep the information separate at least here.
> 

What is at protocol level? The protocol just contains a list of
codecs supported by the client(s). Is up to the user of the message
to decide what to do.

> Thanks for the review,
> Lukas
> 
> > > +    std::set<SpiceVideoCodecType> client_codecs;
> > > +};
> > > +
> > > +struct InCapabilitiesMessage {};
> > > +
> > > +struct NotifyErrorMessage
> > > +{
> > > +    uint32_t error_code;
> > > +    char message[1025];
> > > +};
> > > +
> > > +class InboundMessage
> > > +{
> > > +public:
> > > +    InboundMessage(const StreamDevHeader &header,
> > > std::unique_ptr<uint8_t[]>
> > > &&data);
> > > +
> > > +    template<class Payload> Payload get_payload();
> > > +
> > > +    const StreamDevHeader header;
> > > +private:
> > > +    std::unique_ptr<uint8_t[]> data;
> > > +};
> > > +
> > > +template<>
> > > +StartStopMessage InboundMessage::get_payload<StartStopMessage>();
> > > +template<>
> > > +InCapabilitiesMessage
> > > InboundMessage::get_payload<InCapabilitiesMessage>();
> > > +template<>
> > > +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>();
> > > +
> > >  class StreamPort {
> > >  public:
> > >      StreamPort(const std::string &port_name);
> > >      ~StreamPort();
> > >  
> > > -    void read(void *buf, size_t len);
> > > +    InboundMessage receive();
> > > +
> > >      void write(const void *buf, size_t len);
> > >  
> > >      int fd;
> > 
> > Frediano
> 


More information about the Spice-devel mailing list