[Spice-devel] [PATCH spice-streaming-agent 8/9] Encapsulate the stream port fd and locking

Lukáš Hrázký lhrazky at redhat.com
Fri Apr 27 12:14:39 UTC 2018


Wrap the streaming virtio port along with the mutex to lock it in a
class. Pass the class temporarily around to functions that need it until
the functions too are consolidated into the class.

Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
---
 src/spice-streaming-agent.cpp | 104 +++++++++++++++++-------------------------
 src/stream-port.cpp           |  25 ++++++++++
 src/stream-port.hpp           |  14 ++++++
 3 files changed, 81 insertions(+), 62 deletions(-)

diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 692f067..2fdd02f 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -32,7 +32,6 @@
 #include <exception>
 #include <stdexcept>
 #include <memory>
-#include <mutex>
 #include <thread>
 #include <vector>
 #include <string>
@@ -60,12 +59,10 @@ static bool streaming_requested = false;
 static bool quit_requested = false;
 static bool log_binary = false;
 static std::set<SpiceVideoCodecType> client_codecs;
-static int streamfd = -1;
-static std::mutex stream_mtx;
 
-static int have_something_to_read(int timeout)
+static int have_something_to_read(StreamPort &stream_port, int timeout)
 {
-    struct pollfd pollfd = {streamfd, POLLIN, 0};
+    struct pollfd pollfd = {stream_port.fd, POLLIN, 0};
 
     if (poll(&pollfd, 1, timeout) < 0) {
         syslog(LOG_ERR, "poll FAILED\n");
@@ -79,7 +76,7 @@ static int have_something_to_read(int timeout)
     return 0;
 }
 
-static void handle_stream_start_stop(uint32_t len)
+static void handle_stream_start_stop(StreamPort &stream_port, uint32_t len)
 {
     uint8_t msg[256];
 
@@ -88,7 +85,7 @@ static void handle_stream_start_stop(uint32_t len)
                                  "(longer than " + std::to_string(sizeof(msg)) + ")");
     }
 
-    read_all(streamfd, msg, len);
+    stream_port.read(msg, len);
     streaming_requested = (msg[0] != 0); /* num_codecs */
     syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
            streaming_requested ? "START" : "STOP");
@@ -98,7 +95,7 @@ static void handle_stream_start_stop(uint32_t len)
     }
 }
 
-static void handle_stream_capabilities(uint32_t len)
+static void handle_stream_capabilities(StreamPort &stream_port, uint32_t len)
 {
     uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
 
@@ -106,7 +103,7 @@ static void handle_stream_capabilities(uint32_t len)
         throw std::runtime_error("capability message too long");
     }
 
-    read_all(streamfd, caps, len);
+    stream_port.read(caps, len);
     // we currently do not support extensions so just reply so
     StreamDevHeader hdr = {
         STREAM_DEVICE_PROTOCOL,
@@ -114,10 +111,10 @@ static void handle_stream_capabilities(uint32_t len)
         STREAM_TYPE_CAPABILITIES,
         0
     };
-    write_all(streamfd, &hdr, sizeof(hdr));
+    stream_port.write(&hdr, sizeof(hdr));
 }
 
-static void handle_stream_error(size_t len)
+static void handle_stream_error(StreamPort &stream_port, size_t len)
 {
     if (len < sizeof(StreamMsgNotifyError)) {
         throw std::runtime_error("Received NotifyError message size " + std::to_string(len) +
@@ -131,7 +128,7 @@ static void handle_stream_error(size_t len)
 
     size_t len_to_read = std::min(len, sizeof(msg) - 1);
 
-    read_all(streamfd, &msg, len_to_read);
+    stream_port.read(&msg, len_to_read);
     msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0';
 
     syslog(LOG_ERR, "Received NotifyError message from the server: %d - %s\n",
@@ -143,13 +140,11 @@ static void handle_stream_error(size_t len)
     }
 }
 
-static void read_command_from_device(void)
+static void read_command_from_device(StreamPort &stream_port)
 {
     StreamDevHeader hdr;
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-
-    read_all(streamfd, &hdr, sizeof(hdr));
+    stream_port.read(&hdr, sizeof(hdr));
 
     if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
         throw std::runtime_error("BAD VERSION " + std::to_string(hdr.protocol_version) +
@@ -158,34 +153,34 @@ static void read_command_from_device(void)
 
     switch (hdr.type) {
     case STREAM_TYPE_CAPABILITIES:
-        return handle_stream_capabilities(hdr.size);
+        return handle_stream_capabilities(stream_port, hdr.size);
     case STREAM_TYPE_NOTIFY_ERROR:
-        return handle_stream_error(hdr.size);
+        return handle_stream_error(stream_port, hdr.size);
     case STREAM_TYPE_START_STOP:
-        return handle_stream_start_stop(hdr.size);
+        return handle_stream_start_stop(stream_port, hdr.size);
     }
     throw std::runtime_error("UNKNOWN msg of type " + std::to_string(hdr.type));
 }
 
-static int read_command(bool blocking)
+static int read_command(StreamPort &stream_port, bool blocking)
 {
     int timeout = blocking?-1:0;
     while (!quit_requested) {
-        if (!have_something_to_read(timeout)) {
+        if (!have_something_to_read(stream_port, timeout)) {
             if (!blocking) {
                 return 0;
             }
             sleep(1);
             continue;
         }
-        read_command_from_device();
+        read_command_from_device(stream_port);
         break;
     }
 
     return 1;
 }
 
-static void spice_stream_send_format(unsigned w, unsigned h, unsigned c)
+static void spice_stream_send_format(StreamPort &stream_port, unsigned w, unsigned h, unsigned c)
 {
 
     SpiceStreamFormatMessage msg;
@@ -199,11 +194,10 @@ static void spice_stream_send_format(unsigned w, unsigned h, unsigned c)
     msg.msg.height = h;
     msg.msg.codec = c;
     syslog(LOG_DEBUG, "writing format\n");
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, &msg, msgsize);
+    stream_port.write(&msg, msgsize);
 }
 
-static void spice_stream_send_frame(const void *buf, const unsigned size)
+static void spice_stream_send_frame(StreamPort &stream_port, const void *buf, const unsigned size)
 {
     SpiceStreamDataMessage msg;
     const size_t msgsize = sizeof(msg);
@@ -212,9 +206,8 @@ static void spice_stream_send_frame(const void *buf, const unsigned size)
     msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL;
     msg.hdr.type = STREAM_TYPE_DATA;
     msg.hdr.size = size; /* includes only the body? */
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, &msg, msgsize);
-    write_all(streamfd, buf, size);
+    stream_port.write(&msg, msgsize);
+    stream_port.write(buf, size);
 
     syslog(LOG_DEBUG, "Sent a frame of size %u\n", size);
 }
@@ -264,7 +257,7 @@ static void usage(const char *progname)
 }
 
 static void
-send_cursor(unsigned width, unsigned height, int hotspot_x, int hotspot_y,
+send_cursor(StreamPort &stream_port, unsigned width, unsigned height, int hotspot_x, int hotspot_y,
             std::function<void(uint32_t *)> fill_cursor)
 {
     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
@@ -294,11 +287,10 @@ send_cursor(unsigned width, unsigned height, int hotspot_x, int hotspot_y,
     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data);
     fill_cursor(pixels);
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, msg.get(), cursor_size);
+    stream_port.write(msg.get(), cursor_size);
 }
 
-static void cursor_changes(Display *display, int event_base)
+static void cursor_changes(StreamPort *stream_port, Display *display, int event_base)
 {
     unsigned long last_serial = 0;
 
@@ -323,26 +315,18 @@ static void cursor_changes(Display *display, int event_base)
             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
                 pixels[i] = cursor->pixels[i];
         };
-        send_cursor(cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
+        send_cursor(*stream_port, cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
     }
 }
 
 static void
-do_capture(const char *streamport, FILE *f_log)
+do_capture(StreamPort &stream_port, FILE *f_log)
 {
-    streamfd = open(streamport, O_RDWR | O_NONBLOCK);
-    if (streamfd < 0) {
-        throw std::runtime_error("failed to open the streaming device (" +
-                                 std::string(streamport) + "): "
-                                 + strerror(errno));
-    }
-
     unsigned int frame_count = 0;
     while (!quit_requested) {
         while (!quit_requested && !streaming_requested) {
-            if (read_command(true) < 0) {
-                syslog(LOG_ERR, "FAILED to read command\n");
-                goto done;
+            if (read_command(stream_port, true) < 0) {
+                throw std::runtime_error("FAILED to read command");
             }
         }
 
@@ -385,7 +369,7 @@ do_capture(const char *streamport, FILE *f_log)
 
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
 
-                spice_stream_send_format(width, height, codec);
+                spice_stream_send_format(stream_port, width, height, codec);
             }
             if (f_log) {
                 if (log_binary) {
@@ -398,32 +382,25 @@ do_capture(const char *streamport, FILE *f_log)
             }
 
             try {
-                spice_stream_send_frame(frame.buffer, frame.buffer_size);
+                spice_stream_send_frame(stream_port, frame.buffer, frame.buffer_size);
             } catch (const WriteError& e) {
                 syslog(e);
                 break;
             }
 
             //usleep(1);
-            if (read_command(false) < 0) {
-                syslog(LOG_ERR, "FAILED to read command\n");
-                goto done;
+            if (read_command(stream_port, false) < 0) {
+                throw std::runtime_error("FAILED to read command");
             }
         }
     }
-
-done:
-    if (streamfd >= 0) {
-        close(streamfd);
-        streamfd = -1;
-    }
 }
 
 #define arg_error(...) syslog(LOG_ERR, ## __VA_ARGS__);
 
 int main(int argc, char* argv[])
 {
-    const char *streamport = "/dev/virtio-ports/org.spice-space.stream.0";
+    const char *stream_port_name = "/dev/virtio-ports/org.spice-space.stream.0";
     int opt;
     const char *log_filename = NULL;
     int logmask = LOG_UPTO(LOG_WARNING);
@@ -454,7 +431,7 @@ int main(int argc, char* argv[])
             pluginsdir = optarg;
             break;
         case 'p':
-            streamport = optarg;
+            stream_port_name = optarg;
             break;
         case 'c': {
             char *p = strchr(optarg, '=');
@@ -512,12 +489,15 @@ int main(int argc, char* argv[])
     Window rootwindow = DefaultRootWindow(display);
     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
 
-    std::thread cursor_th(cursor_changes, display, event_base);
-    cursor_th.detach();
-
     int ret = EXIT_SUCCESS;
+
     try {
-        do_capture(streamport, f_log);
+        StreamPort stream_port(stream_port_name);
+
+        std::thread cursor_th(cursor_changes, &stream_port, display, event_base);
+        cursor_th.detach();
+
+        do_capture(stream_port, f_log);
     }
     catch (std::exception &err) {
         syslog(LOG_ERR, "%s\n", err.what());
diff --git a/src/stream-port.cpp b/src/stream-port.cpp
index ee85179..3cd4753 100644
--- a/src/stream-port.cpp
+++ b/src/stream-port.cpp
@@ -8,6 +8,7 @@
 #include "error.hpp"
 
 #include <errno.h>
+#include <fcntl.h>
 #include <poll.h>
 #include <string.h>
 #include <syslog.h>
@@ -18,6 +19,30 @@
 namespace spice {
 namespace streaming_agent {
 
+StreamPort::StreamPort(const std::string &port_name) : fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK))
+{
+    if (fd < 0) {
+        throw IOError("Failed to open the streaming device \"" + port_name + "\"", errno);
+    }
+}
+
+StreamPort::~StreamPort()
+{
+    close(fd);
+}
+
+void StreamPort::read(void *buf, size_t len)
+{
+    std::lock_guard<std::mutex> guard(mutex);
+    read_all(fd, buf, len);
+}
+
+void StreamPort::write(const void *buf, size_t len)
+{
+    std::lock_guard<std::mutex> guard(mutex);
+    write_all(fd, buf, len);
+}
+
 void read_all(int fd, void *buf, size_t len)
 {
     while (len > 0) {
diff --git a/src/stream-port.hpp b/src/stream-port.hpp
index b2d8352..9187cf5 100644
--- a/src/stream-port.hpp
+++ b/src/stream-port.hpp
@@ -8,11 +8,25 @@
 #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP
 
 #include <cstddef>
+#include <string>
+#include <mutex>
 
 
 namespace spice {
 namespace streaming_agent {
 
+class StreamPort {
+public:
+    StreamPort(const std::string &port_name);
+    ~StreamPort();
+
+    void read(void *buf, size_t len);
+    void write(const void *buf, size_t len);
+
+    int fd;
+    std::mutex mutex;
+};
+
 void read_all(int fd, void *buf, size_t len);
 void write_all(int fd, const void *buf, size_t len);
 
-- 
2.16.2



More information about the Spice-devel mailing list