[Spice-devel] [PATCH spice-streaming-agent 1/3] Introduce InboundMessages for the StreamPort class
Frediano Ziglio
fziglio at redhat.com
Tue Oct 9 10:03:35 UTC 2018
>
> On Tue, 2018-10-09 at 04:02 -0400, Frediano Ziglio wrote:
> > > On Mon, 2018-10-08 at 07:03 -0400, Frediano Ziglio wrote:
> > > > >
> > > > > Wraps the deserialization of the received messages in an
> > > > > InboundMessages
> > > > > class. The class is created with the deserialized header and the raw
> > > > > data of the message. A template function get_payload() returns the
> > > > > struct of the concrete message. The function is specialized for each
> > > > > incoming message.
> > > > >
> > > > > While this leaves the responsibility to call the get_payload()
> > > > > function
> > > > > with the message according to the type in the header to the caller,
> > > > > the
> > > > > solution preserves the efficiency of the original implementation
> > > > > without
> > > > > introducing too much complexity around the separation of the code.
> > > > >
> > > > > Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> > > > > ---
> > > > > src/spice-streaming-agent.cpp | 115
> > > > > +++++++++-------------------------
> > > > > src/stream-port.cpp | 70 ++++++++++++++++++++-
> > > > > src/stream-port.hpp | 41 +++++++++++-
> > > > > 3 files changed, 139 insertions(+), 87 deletions(-)
> > > > >
> > > > > diff --git a/src/spice-streaming-agent.cpp
> > > > > b/src/spice-streaming-agent.cpp
> > > > > index a9baf4d..a89ba3f 100644
> > > > > --- a/src/spice-streaming-agent.cpp
> > > > > +++ b/src/spice-streaming-agent.cpp
> > > > > @@ -77,92 +77,39 @@ static bool have_something_to_read(StreamPort
> > > > > &stream_port, bool blocking)
> > > > > return false;
> > > > > }
> > > > >
> > > > > -static void handle_stream_start_stop(StreamPort &stream_port,
> > > > > uint32_t
> > > > > len)
> > > > > -{
> > > > > - uint8_t msg[256];
> > > > > -
> > > > > - if (len >= sizeof(msg)) {
> > > > > - throw std::runtime_error("msg size (" + std::to_string(len)
> > > > > + ")
> > > > > is
> > > > > too long "
> > > > > - "(longer than " +
> > > > > std::to_string(sizeof(msg)) + ")");
> > > > > - }
> > > > > -
> > > > > - stream_port.read(msg, len);
> > > > > - streaming_requested = (msg[0] != 0); /* num_codecs */
> > > > > - syslog(LOG_INFO, "GOT START_STOP message -- request to %s
> > > > > streaming",
> > > > > - streaming_requested ? "START" : "STOP");
> > > > > - client_codecs.clear();
> > > > > - for (int i = 1; i <= msg[0]; ++i) {
> > > > > - client_codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > - }
> > > > > -}
> > > > > -
> > > > > -static void handle_stream_capabilities(StreamPort &stream_port,
> > > > > uint32_t
> > > > > len)
> > > > > -{
> > > > > - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > -
> > > > > - if (len > sizeof(caps)) {
> > > > > - throw std::runtime_error("capability message too long");
> > > > > - }
> > > > > -
> > > > > - stream_port.read(caps, len);
> > > > > - // we currently do not support extensions so just reply so
> > > > > - StreamDevHeader hdr = {
> > > > > - STREAM_DEVICE_PROTOCOL,
> > > > > - 0,
> > > > > - STREAM_TYPE_CAPABILITIES,
> > > > > - 0
> > > > > - };
> > > > > -
> > > > > - stream_port.write(&hdr, sizeof(hdr));
> > > > > -}
> > > > > -
> > > > > -static void handle_stream_error(StreamPort &stream_port, size_t len)
> > > > > -{
> > > > > - if (len < sizeof(StreamMsgNotifyError)) {
> > > > > - throw std::runtime_error("Received NotifyError message size
> > > > > " +
> > > > > std::to_string(len) +
> > > > > - " is too small (smaller than " +
> > > > > -
> > > > > std::to_string(sizeof(StreamMsgNotifyError))
> > > > > + ")");
> > > > > - }
> > > > > -
> > > > > - struct StreamMsgNotifyError1K : StreamMsgNotifyError {
> > > > > - uint8_t msg[1024];
> > > > > - } msg;
> > > > > -
> > > > > - size_t len_to_read = std::min(len, sizeof(msg) - 1);
> > > > > -
> > > > > - stream_port.read(&msg, len_to_read);
> > > > > - msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0';
> > > > > -
> > > > > - syslog(LOG_ERR, "Received NotifyError message from the server:
> > > > > %d -
> > > > > %s",
> > > > > - msg.error_code, msg.msg);
> > > > > -
> > > > > - if (len_to_read < len) {
> > > > > - throw std::runtime_error("Received NotifyError message size
> > > > > " +
> > > > > std::to_string(len) +
> > > > > - " is too big (bigger than " +
> > > > > std::to_string(sizeof(msg)) + ")");
> > > > > - }
> > > > > -}
> > > > > -
> > > > > static void read_command_from_device(StreamPort &stream_port)
> > > > > {
> > > > > - StreamDevHeader hdr;
> > > > > -
> > > > > - std::lock_guard<std::mutex> guard(stream_port.mutex);
> > > > > - stream_port.read(&hdr, sizeof(hdr));
> > > > > -
> > > > > - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > - throw std::runtime_error("BAD VERSION " +
> > > > > std::to_string(hdr.protocol_version) +
> > > > > - " (expected is " +
> > > > > std::to_string(STREAM_DEVICE_PROTOCOL) + ")");
> > > > > - }
> > > > > -
> > > > > - switch (hdr.type) {
> > > > > - case STREAM_TYPE_CAPABILITIES:
> > > > > - return handle_stream_capabilities(stream_port, hdr.size);
> > > > > - case STREAM_TYPE_NOTIFY_ERROR:
> > > > > - return handle_stream_error(stream_port, hdr.size);
> > > > > - case STREAM_TYPE_START_STOP:
> > > > > - return handle_stream_start_stop(stream_port, hdr.size);
> > > > > - }
> > > > > - throw std::runtime_error("UNKNOWN msg of type " +
> > > > > std::to_string(hdr.type));
> > > > > + InboundMessage in_message = stream_port.receive();
> > > > > +
> > > > > + switch (in_message.header.type) {
> > > > > + case STREAM_TYPE_CAPABILITIES: {
> > > > > + StreamDevHeader hdr = {
> > > > > + STREAM_DEVICE_PROTOCOL,
> > > > > + 0,
> > > > > + STREAM_TYPE_CAPABILITIES,
> > > > > + 0
> > > > > + };
> > > > > +
> > > > > + std::lock_guard<std::mutex> guard(stream_port.mutex);
> > > > > + stream_port.write(&hdr, sizeof(hdr));
> > > > > + return;
> > > > > + } case STREAM_TYPE_NOTIFY_ERROR: {
> > > >
> > > > Not against this line style but case are not aligned and we never
> > > > used this style.
> > >
> > > Checking the coding style guide, it should be:
> > >
> > > // ...
> > > }
> > > case STREAM_TYPE_NOTIFY_ERROR: {
> > > // ...
> > >
> > > Is this what you meant? I'll fix it in the next version.
> > >
> >
> > Yes, was that
> >
> > > > > + NotifyErrorMessage msg =
> > > > > in_message.get_payload<NotifyErrorMessage>();
> > > > > +
> > > > > + syslog(LOG_ERR, "Received NotifyError message from the
> > > > > server:
> > > > > %d -
> > > > > %s",
> > > > > + msg.error_code, msg.message);
> > > > > + return;
> > > > > + } case STREAM_TYPE_START_STOP: {
> > > > > + StartStopMessage msg =
> > > > > in_message.get_payload<StartStopMessage>();
> > > > > + streaming_requested = msg.start_streaming;
> > > > > + client_codecs = msg.client_codecs;
> > > > > +
> > > > > + syslog(LOG_INFO, "GOT START_STOP message -- request to %s
> > > > > streaming",
> > > > > + streaming_requested ? "START" : "STOP");
> > > > > + return;
> > > > > + }}
> > > > > +
> > > > > + throw std::runtime_error("UNKNOWN msg of type " +
> > > > > std::to_string(in_message.header.type));
> > > > > }
> > > > >
> > > > > static void read_command(StreamPort &stream_port, bool blocking)
> > > > > diff --git a/src/stream-port.cpp b/src/stream-port.cpp
> > > > > index 5528854..56747fd 100644
> > > > > --- a/src/stream-port.cpp
> > > > > +++ b/src/stream-port.cpp
> > > > > @@ -19,6 +19,58 @@
> > > > > namespace spice {
> > > > > namespace streaming_agent {
> > > > >
> > > > > +InboundMessage::InboundMessage(const StreamDevHeader &header,
> > > > > std::unique_ptr<uint8_t[]> &&data) :
> > > > > + header(header),
> > > > > + data(std::move(data))
> > > > > +{}
> > > > > +
> > > > > +template<>
> > > > > +StartStopMessage InboundMessage::get_payload<StartStopMessage>()
> > > > > +{
> > > >
> > > > why not also checking that the message is really the right type
> > > > instead of assuming the caller is doing the right thing?
> > >
> > > I didn't intend to make a safe and clean API for the stream-port
> > > module. I aimed for simplicity instead, treating it as an integral part
> > > of the streaming agent. If I wanted a safe and clean API, I'd probably
> > > need to abstract other parts of the interface too, which would add
> > > boilerplate, if not overhead.
> > >
> > > I can add the checks in the methods if you like, it just seems
> > > redundant to me in the current rather simple situation and the tight
> > > coupling between the stream-port module and its usage.
> > >
> >
> > Don't mine, fine as it it.
> >
> > > > > + StartStopMessage msg;
> > > > > +
> > > > > + msg.start_streaming = data[0]; // num_codecs
> > > > > +
> > > > > + for (size_t i = 1; i <= data[0]; ++i) {
> > > > > + msg.client_codecs.insert((SpiceVideoCodecType) data[i]);
> > > > > + }
> > > > > +
> > > > > + return msg;
> > > > > +}
> > > > > +
> > > > > +template<>
> > > > > +InCapabilitiesMessage
> > > > > InboundMessage::get_payload<InCapabilitiesMessage>()
> > > > > +{
> > > > > + // no capabilities yet
> > > > > + return InCapabilitiesMessage();
> > > > > +}
> > > > > +
> > > > > +template<>
> > > > > +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>()
> > > > > +{
> > > > > + if (header.size < sizeof(StreamMsgNotifyError)) {
> > > > > + throw std::runtime_error("Received NotifyError message size
> > > > > " +
> > > > > std::to_string(header.size) +
> > > > > + " is too small (smaller than " +
> > > > > +
> > > > > std::to_string(sizeof(StreamMsgNotifyError))
> > > > > + ")");
> > > > > + }
> > > > > +
> > > > > + size_t msg_len = header.size - sizeof(StreamMsgNotifyError);
> > > > > + if (msg_len > 1024) {
> > > > > + throw std::runtime_error("Received NotifyError message is
> > > > > too
> > > > > long
> > > > > (" +
> > > > > + std::to_string(msg_len) + " >
> > > > > 1024)");
> > > > > + }
> > > > > +
> > > > > + StreamMsgNotifyError *raw_message =
> > > > > reinterpret_cast<StreamMsgNotifyError*>(data.get());
> > > > > +
> > > > > + NotifyErrorMessage msg;
> > > > > + msg.error_code = raw_message->error_code;
> > > > > + strncpy(msg.message, reinterpret_cast<char*>(raw_message->msg),
> > > > > msg_len);
> > > > > + // make sure the string is terminated
> > > > > + msg.message[msg_len] = '\0';
> > > > > +
> > > > > + return msg;
> > > > > +}
> > > > > +
> > > > > StreamPort::StreamPort(const std::string &port_name) :
> > > > > fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK))
> > > > > {
> > > > > if (fd < 0) {
> > > > > @@ -31,9 +83,23 @@ StreamPort::~StreamPort()
> > > > > close(fd);
> > > > > }
> > > > >
> > > > > -void StreamPort::read(void *buf, size_t len)
> > > > > +InboundMessage StreamPort::receive()
> > > > > {
> > > > > - read_all(fd, buf, len);
> > > > > + std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +
> > > > > + StreamDevHeader header;
> > > > > + read_all(fd, &header, sizeof(header));
> > > > > +
> > > > > + if (header.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > + throw std::runtime_error("Bad protocol version: " +
> > > > > std::to_string(header.protocol_version) +
> > > > > + ", expected: " +
> > > > > std::to_string(STREAM_DEVICE_PROTOCOL));
> > > > > + }
> > > > > +
> > > > > + // TODO should we limit the maximum message size?
> > > >
> > > > This is a regression from previous code, should be added again.
> > >
> > > Yes, the difference is, in the previous code each message had it's own
> > > custom limit. That is not possible anymore with this code (a small
> > > disadvantage, although the modularization is well worth it imo).
> > >
> >
> > A switch won't kill it. Modularity should not be a limit.
>
> Well, it wouldn't kill it, but it would be another place where you have
> to switch over all of the message types. Right now there is only one
> such place, which I find very nice and convenient...
>
> > > So we need to add a limit that is sufficient for the biggest message
> > > that can be received. Which currently seems to be the
> > > NotifyErrorMessage with a size of 1028B. What shall we use as the limit
> > > here? 1028? Add some room for the future and use 2048? Or even bigger?
> > >
> >
> > That make sense too, I think 4K won't kill either.
>
> I'll do that. I think one common message size limit doesn't hurt
> anything here.
>
> > > > > + std::unique_ptr<uint8_t[]> data(new uint8_t[header.size]);
> > > > > + read_all(fd, data.get(), header.size);
> > > > > +
> > > > > + return InboundMessage(header, std::move(data));
> > > > > }
> > > > >
> > > > > void StreamPort::write(const void *buf, size_t len)
> > > > > diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> > > > > index 9187cf5..090930b 100644
> > > > > --- a/src/stream-port.hpp
> > > > > +++ b/src/stream-port.hpp
> > > > > @@ -7,20 +7,59 @@
> > > > > #ifndef SPICE_STREAMING_AGENT_STREAM_PORT_HPP
> > > > > #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP
> > > > >
> > > > > +#include <spice/stream-device.h>
> > > > > +#include <spice/enums.h>
> > > > > +
> > > > > #include <cstddef>
> > > > > #include <string>
> > > > > +#include <memory>
> > > > > #include <mutex>
> > > > > +#include <set>
> > > > >
> > > > >
> > > > > namespace spice {
> > > > > namespace streaming_agent {
> > > > >
> > > > > +struct StartStopMessage
> > > > > +{
> > > > > + bool start_streaming = false;
> > > >
> > > > This has not much to do with the message but with the usage of it,
> > > > the agent should start streaming if there are any codec supported,
> > > > so should test client_codecs only (actually start_streaming ==
> > > > !client_codecs.empty() as currently the agent assume the list
> > > > contains a codecs it can handle, which can be wrong).
> > >
> > > In my opinion the information to start/stop streaming is different from
> > > the information contained in the list of codecs and should be kept
> > > separate. Using the codec list to represent this information is an
> > > abuse.
> > >
> >
> > Yes, is different and is not something the message can say, how the
> > message can know the list of plugin the agent has? Is the agent that
> > knows that, not the message, the agent derive it from its knowledge
> > about plugins and this message.
>
> I'm not sure I understand, do you mean plugins or codecs here? I'm not
> sure how plugins are related, if you mean codecs, I'm still not sure
> what your point is. I'm not arguing about the need for the list of
> codecs and matching them against what's available on the agent.
>
> > > I realize this is already done on the protocol level, but I wanted to
> > > keep the information separate at least here.
> > >
> >
> > What is at protocol level? The protocol just contains a list of
> > codecs supported by the client(s). Is up to the user of the message
> > to decide what to do.
>
> The message is called StartStopStreaming, the purpose of it is to tell
> the agent whether to start or to stop the streaming. The way this
> information is conveyed is that if there are any codecs in the
> client_codecs list, the command is "start", if there are no codecs in
> the list, the command is "stop". So at the protocol level, the contents
> of the client_codecs list is "abused" to infer the command.
>
> Yes, you can (ab)use it this way, because the start/stop command
> coincides with the contents of the client_codecs list. But especially
> with regards to how difficult it is for us to change the protocol, I
> think it should be as clean and as explicit as possible.
>
Oh, make sense, I would put something like (comment and boolean)
// No codecs present indicates to stop streaming
msg.start_streaming = (data[0] != 0);
(or !! instead of != 0)
> Cheers,
> Lukas
>
> > > Thanks for the review,
> > > Lukas
> > >
> > > > > + std::set<SpiceVideoCodecType> client_codecs;
> > > > > +};
> > > > > +
> > > > > +struct InCapabilitiesMessage {};
> > > > > +
> > > > > +struct NotifyErrorMessage
> > > > > +{
> > > > > + uint32_t error_code;
> > > > > + char message[1025];
> > > > > +};
> > > > > +
> > > > > +class InboundMessage
> > > > > +{
> > > > > +public:
> > > > > + InboundMessage(const StreamDevHeader &header,
> > > > > std::unique_ptr<uint8_t[]>
> > > > > &&data);
> > > > > +
> > > > > + template<class Payload> Payload get_payload();
> > > > > +
> > > > > + const StreamDevHeader header;
> > > > > +private:
> > > > > + std::unique_ptr<uint8_t[]> data;
> > > > > +};
> > > > > +
> > > > > +template<>
> > > > > +StartStopMessage InboundMessage::get_payload<StartStopMessage>();
> > > > > +template<>
> > > > > +InCapabilitiesMessage
> > > > > InboundMessage::get_payload<InCapabilitiesMessage>();
> > > > > +template<>
> > > > > +NotifyErrorMessage
> > > > > InboundMessage::get_payload<NotifyErrorMessage>();
> > > > > +
> > > > > class StreamPort {
> > > > > public:
> > > > > StreamPort(const std::string &port_name);
> > > > > ~StreamPort();
> > > > >
> > > > > - void read(void *buf, size_t len);
> > > > > + InboundMessage receive();
> > > > > +
> > > > > void write(const void *buf, size_t len);
> > > > >
> > > > > int fd;
> > > >
> > > > Frediano
>
More information about the Spice-devel
mailing list