[Spice-devel] [PATCH spice-streaming-agent 1/3] Introduce InboundMessages for the StreamPort class
Frediano Ziglio
fziglio at redhat.com
Mon Oct 8 11:03:44 UTC 2018
>
> Wraps the deserialization of the received messages in an InboundMessages
> class. The class is created with the deserialized header and the raw
> data of the message. A template function get_payload() returns the
> struct of the concrete message. The function is specialized for each
> incoming message.
>
> While this leaves the responsibility to call the get_payload() function
> with the message according to the type in the header to the caller, the
> solution preserves the efficiency of the original implementation without
> introducing too much complexity around the separation of the code.
>
> Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> ---
> src/spice-streaming-agent.cpp | 115 +++++++++-------------------------
> src/stream-port.cpp | 70 ++++++++++++++++++++-
> src/stream-port.hpp | 41 +++++++++++-
> 3 files changed, 139 insertions(+), 87 deletions(-)
>
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index a9baf4d..a89ba3f 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -77,92 +77,39 @@ static bool have_something_to_read(StreamPort
> &stream_port, bool blocking)
> return false;
> }
>
> -static void handle_stream_start_stop(StreamPort &stream_port, uint32_t len)
> -{
> - uint8_t msg[256];
> -
> - if (len >= sizeof(msg)) {
> - throw std::runtime_error("msg size (" + std::to_string(len) + ") is
> too long "
> - "(longer than " +
> std::to_string(sizeof(msg)) + ")");
> - }
> -
> - stream_port.read(msg, len);
> - streaming_requested = (msg[0] != 0); /* num_codecs */
> - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming",
> - streaming_requested ? "START" : "STOP");
> - client_codecs.clear();
> - for (int i = 1; i <= msg[0]; ++i) {
> - client_codecs.insert((SpiceVideoCodecType) msg[i]);
> - }
> -}
> -
> -static void handle_stream_capabilities(StreamPort &stream_port, uint32_t
> len)
> -{
> - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> -
> - if (len > sizeof(caps)) {
> - throw std::runtime_error("capability message too long");
> - }
> -
> - stream_port.read(caps, len);
> - // we currently do not support extensions so just reply so
> - StreamDevHeader hdr = {
> - STREAM_DEVICE_PROTOCOL,
> - 0,
> - STREAM_TYPE_CAPABILITIES,
> - 0
> - };
> -
> - stream_port.write(&hdr, sizeof(hdr));
> -}
> -
> -static void handle_stream_error(StreamPort &stream_port, size_t len)
> -{
> - if (len < sizeof(StreamMsgNotifyError)) {
> - throw std::runtime_error("Received NotifyError message size " +
> std::to_string(len) +
> - " is too small (smaller than " +
> -
> std::to_string(sizeof(StreamMsgNotifyError))
> + ")");
> - }
> -
> - struct StreamMsgNotifyError1K : StreamMsgNotifyError {
> - uint8_t msg[1024];
> - } msg;
> -
> - size_t len_to_read = std::min(len, sizeof(msg) - 1);
> -
> - stream_port.read(&msg, len_to_read);
> - msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0';
> -
> - syslog(LOG_ERR, "Received NotifyError message from the server: %d - %s",
> - msg.error_code, msg.msg);
> -
> - if (len_to_read < len) {
> - throw std::runtime_error("Received NotifyError message size " +
> std::to_string(len) +
> - " is too big (bigger than " +
> std::to_string(sizeof(msg)) + ")");
> - }
> -}
> -
> static void read_command_from_device(StreamPort &stream_port)
> {
> - StreamDevHeader hdr;
> -
> - std::lock_guard<std::mutex> guard(stream_port.mutex);
> - stream_port.read(&hdr, sizeof(hdr));
> -
> - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> - throw std::runtime_error("BAD VERSION " +
> std::to_string(hdr.protocol_version) +
> - " (expected is " +
> std::to_string(STREAM_DEVICE_PROTOCOL) + ")");
> - }
> -
> - switch (hdr.type) {
> - case STREAM_TYPE_CAPABILITIES:
> - return handle_stream_capabilities(stream_port, hdr.size);
> - case STREAM_TYPE_NOTIFY_ERROR:
> - return handle_stream_error(stream_port, hdr.size);
> - case STREAM_TYPE_START_STOP:
> - return handle_stream_start_stop(stream_port, hdr.size);
> - }
> - throw std::runtime_error("UNKNOWN msg of type " +
> std::to_string(hdr.type));
> + InboundMessage in_message = stream_port.receive();
> +
> + switch (in_message.header.type) {
> + case STREAM_TYPE_CAPABILITIES: {
> + StreamDevHeader hdr = {
> + STREAM_DEVICE_PROTOCOL,
> + 0,
> + STREAM_TYPE_CAPABILITIES,
> + 0
> + };
> +
> + std::lock_guard<std::mutex> guard(stream_port.mutex);
> + stream_port.write(&hdr, sizeof(hdr));
> + return;
> + } case STREAM_TYPE_NOTIFY_ERROR: {
Not against this line style but case are not aligned and we never
used this style.
> + NotifyErrorMessage msg =
> in_message.get_payload<NotifyErrorMessage>();
> +
> + syslog(LOG_ERR, "Received NotifyError message from the server: %d -
> %s",
> + msg.error_code, msg.message);
> + return;
> + } case STREAM_TYPE_START_STOP: {
> + StartStopMessage msg = in_message.get_payload<StartStopMessage>();
> + streaming_requested = msg.start_streaming;
> + client_codecs = msg.client_codecs;
> +
> + syslog(LOG_INFO, "GOT START_STOP message -- request to %s
> streaming",
> + streaming_requested ? "START" : "STOP");
> + return;
> + }}
> +
> + throw std::runtime_error("UNKNOWN msg of type " +
> std::to_string(in_message.header.type));
> }
>
> static void read_command(StreamPort &stream_port, bool blocking)
> diff --git a/src/stream-port.cpp b/src/stream-port.cpp
> index 5528854..56747fd 100644
> --- a/src/stream-port.cpp
> +++ b/src/stream-port.cpp
> @@ -19,6 +19,58 @@
> namespace spice {
> namespace streaming_agent {
>
> +InboundMessage::InboundMessage(const StreamDevHeader &header,
> std::unique_ptr<uint8_t[]> &&data) :
> + header(header),
> + data(std::move(data))
> +{}
> +
> +template<>
> +StartStopMessage InboundMessage::get_payload<StartStopMessage>()
> +{
why not also checking that the message is really the right type
instead of assuming the caller is doing the right thing?
> + StartStopMessage msg;
> +
> + msg.start_streaming = data[0]; // num_codecs
> +
> + for (size_t i = 1; i <= data[0]; ++i) {
> + msg.client_codecs.insert((SpiceVideoCodecType) data[i]);
> + }
> +
> + return msg;
> +}
> +
> +template<>
> +InCapabilitiesMessage InboundMessage::get_payload<InCapabilitiesMessage>()
> +{
> + // no capabilities yet
> + return InCapabilitiesMessage();
> +}
> +
> +template<>
> +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>()
> +{
> + if (header.size < sizeof(StreamMsgNotifyError)) {
> + throw std::runtime_error("Received NotifyError message size " +
> std::to_string(header.size) +
> + " is too small (smaller than " +
> +
> std::to_string(sizeof(StreamMsgNotifyError))
> + ")");
> + }
> +
> + size_t msg_len = header.size - sizeof(StreamMsgNotifyError);
> + if (msg_len > 1024) {
> + throw std::runtime_error("Received NotifyError message is too long
> (" +
> + std::to_string(msg_len) + " > 1024)");
> + }
> +
> + StreamMsgNotifyError *raw_message =
> reinterpret_cast<StreamMsgNotifyError*>(data.get());
> +
> + NotifyErrorMessage msg;
> + msg.error_code = raw_message->error_code;
> + strncpy(msg.message, reinterpret_cast<char*>(raw_message->msg),
> msg_len);
> + // make sure the string is terminated
> + msg.message[msg_len] = '\0';
> +
> + return msg;
> +}
> +
> StreamPort::StreamPort(const std::string &port_name) :
> fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK))
> {
> if (fd < 0) {
> @@ -31,9 +83,23 @@ StreamPort::~StreamPort()
> close(fd);
> }
>
> -void StreamPort::read(void *buf, size_t len)
> +InboundMessage StreamPort::receive()
> {
> - read_all(fd, buf, len);
> + std::lock_guard<std::mutex> stream_guard(mutex);
> +
> + StreamDevHeader header;
> + read_all(fd, &header, sizeof(header));
> +
> + if (header.protocol_version != STREAM_DEVICE_PROTOCOL) {
> + throw std::runtime_error("Bad protocol version: " +
> std::to_string(header.protocol_version) +
> + ", expected: " +
> std::to_string(STREAM_DEVICE_PROTOCOL));
> + }
> +
> + // TODO should we limit the maximum message size?
This is a regression from previous code, should be added again.
> + std::unique_ptr<uint8_t[]> data(new uint8_t[header.size]);
> + read_all(fd, data.get(), header.size);
> +
> + return InboundMessage(header, std::move(data));
> }
>
> void StreamPort::write(const void *buf, size_t len)
> diff --git a/src/stream-port.hpp b/src/stream-port.hpp
> index 9187cf5..090930b 100644
> --- a/src/stream-port.hpp
> +++ b/src/stream-port.hpp
> @@ -7,20 +7,59 @@
> #ifndef SPICE_STREAMING_AGENT_STREAM_PORT_HPP
> #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP
>
> +#include <spice/stream-device.h>
> +#include <spice/enums.h>
> +
> #include <cstddef>
> #include <string>
> +#include <memory>
> #include <mutex>
> +#include <set>
>
>
> namespace spice {
> namespace streaming_agent {
>
> +struct StartStopMessage
> +{
> + bool start_streaming = false;
This has not much to do with the message but with the usage of it,
the agent should start streaming if there are any codec supported,
so should test client_codecs only (actually start_streaming ==
!client_codecs.empty() as currently the agent assume the list
contains a codecs it can handle, which can be wrong).
> + std::set<SpiceVideoCodecType> client_codecs;
> +};
> +
> +struct InCapabilitiesMessage {};
> +
> +struct NotifyErrorMessage
> +{
> + uint32_t error_code;
> + char message[1025];
> +};
> +
> +class InboundMessage
> +{
> +public:
> + InboundMessage(const StreamDevHeader &header, std::unique_ptr<uint8_t[]>
> &&data);
> +
> + template<class Payload> Payload get_payload();
> +
> + const StreamDevHeader header;
> +private:
> + std::unique_ptr<uint8_t[]> data;
> +};
> +
> +template<>
> +StartStopMessage InboundMessage::get_payload<StartStopMessage>();
> +template<>
> +InCapabilitiesMessage InboundMessage::get_payload<InCapabilitiesMessage>();
> +template<>
> +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>();
> +
> class StreamPort {
> public:
> StreamPort(const std::string &port_name);
> ~StreamPort();
>
> - void read(void *buf, size_t len);
> + InboundMessage receive();
> +
> void write(const void *buf, size_t len);
>
> int fd;
Frediano
More information about the Spice-devel
mailing list