[Spice-devel] [PATCH 09/22] Move read, write, handle and locking into the 'Stream' class

Christophe de Dinechin christophe at dinechin.org
Wed Feb 28 15:43:12 UTC 2018


From: Christophe de Dinechin <dinechin at redhat.com>

The 'Stream' class is designed to abstract file I/O. In a subsequent
patch, message formatting will be isolated out of the class, but in
order to minimize code changes, this intermediate step simply moves
the corresponding functions within the Stream class.

Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
---
 src/spice-streaming-agent.cpp | 108 +++++++++++++++++++++++-------------------
 1 file changed, 58 insertions(+), 50 deletions(-)

diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 21f9c31..4d24234 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -40,8 +40,6 @@
 
 using namespace spice::streaming_agent;
 
-static size_t write_all(int fd, const void *buf, const size_t len);
-
 static ConcreteAgent agent;
 
 namespace spice
@@ -72,31 +70,44 @@ class Stream
 public:
     Stream(const char *name)
     {
-        fd = open(name, O_RDWR);
-        if (fd < 0) {
+        streamfd = open(name, O_RDWR);
+        if (streamfd < 0) {
             throw std::runtime_error("failed to open streaming device");
         }
     }
     ~Stream()
     {
-        close(fd);
+        close(streamfd);
     }
-    int file_descriptor() { return fd; }
+
+    int read_command(bool blocking);
+    size_t write_all(const void *buf, const size_t len);
+    int send_format(unsigned w, unsigned h, uint8_t c);
+    int send_frame(const void *buf, const unsigned size);
+    void send_cursor(uint16_t width, uint16_t height,
+                     uint16_t hotspot_x, uint16_t hotspot_y,
+                     std::function<void(uint32_t *)> fill_cursor);
+
+private:
+    int have_something_to_read(int timeout);
+    void handle_stream_start_stop(uint32_t len);
+    void handle_stream_capabilities(uint32_t len);
+    void handle_stream_error(uint32_t len);
+    void read_command_from_device(void);
 
 private:
-    int fd = -1;
+    int streamfd = -1;
+    std::mutex mutex;
 };
 
 }} // namespace spice::streaming_agent
 
-
 static bool streaming_requested = false;
 static bool quit_requested = false;
 static bool log_binary = false;
 static std::set<SpiceVideoCodecType> client_codecs;
-static std::mutex stream_mtx;
 
-static int have_something_to_read(int streamfd, int timeout)
+int Stream::have_something_to_read(int timeout)
 {
     struct pollfd pollfd = {streamfd, POLLIN, 0};
 
@@ -112,7 +123,7 @@ static int have_something_to_read(int streamfd, int timeout)
     return 0;
 }
 
-static void handle_stream_start_stop(int streamfd, uint32_t len)
+void Stream::handle_stream_start_stop(uint32_t len)
 {
     uint8_t msg[256];
 
@@ -134,7 +145,7 @@ static void handle_stream_start_stop(int streamfd, uint32_t len)
     }
 }
 
-static void handle_stream_capabilities(int streamfd, uint32_t len)
+void Stream::handle_stream_capabilities(uint32_t len)
 {
     uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
 
@@ -154,23 +165,23 @@ static void handle_stream_capabilities(int streamfd, uint32_t len)
         STREAM_TYPE_CAPABILITIES,
         0
     };
-    if (write_all(streamfd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
+    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
         throw std::runtime_error("error writing capabilities");
     }
 }
 
-static void handle_stream_error(int streamfd, uint32_t len)
+void Stream::handle_stream_error(uint32_t len)
 {
     // TODO read message and use it
     throw std::runtime_error("got an error message from server");
 }
 
-static void read_command_from_device(int streamfd)
+void Stream::read_command_from_device()
 {
     StreamDevHeader hdr;
     int n;
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
+    std::lock_guard<std::mutex> stream_guard(mutex);
     n = read(streamfd, &hdr, sizeof(hdr));
     if (n != sizeof(hdr)) {
         throw std::runtime_error("read command from device FAILED -- read " + std::to_string(n) +
@@ -183,39 +194,38 @@ static void read_command_from_device(int streamfd)
 
     switch (hdr.type) {
     case STREAM_TYPE_CAPABILITIES:
-        return handle_stream_capabilities(streamfd, hdr.size);
+        return handle_stream_capabilities(hdr.size);
     case STREAM_TYPE_NOTIFY_ERROR:
-        return handle_stream_error(streamfd, hdr.size);
+        return handle_stream_error(hdr.size);
     case STREAM_TYPE_START_STOP:
-        return handle_stream_start_stop(streamfd, hdr.size);
+        return handle_stream_start_stop(hdr.size);
     }
     throw std::runtime_error("UNKNOWN msg of type " + std::to_string(hdr.type));
 }
 
-static int read_command(int streamfd, bool blocking)
+int Stream::read_command(bool blocking)
 {
     int timeout = blocking?-1:0;
     while (!quit_requested) {
-        if (!have_something_to_read(streamfd, timeout)) {
+        if (!have_something_to_read(timeout)) {
             if (!blocking) {
                 return 0;
             }
             sleep(1);
             continue;
         }
-        read_command_from_device(streamfd);
+        read_command_from_device();
         break;
     }
 
     return 1;
 }
 
-static size_t
-write_all(int fd, const void *buf, const size_t len)
+size_t Stream::write_all(const void *buf, const size_t len)
 {
     size_t written = 0;
     while (written < len) {
-        int l = write(fd, (const char *) buf + written, len - written);
+        int l = write(streamfd, (const char *) buf + written, len - written);
         if (l < 0) {
             if (errno == EINTR) {
                 continue;
@@ -229,7 +239,7 @@ write_all(int fd, const void *buf, const size_t len)
     return written;
 }
 
-static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
+int Stream::send_format(unsigned w, unsigned h, uint8_t c)
 {
     const size_t msgsize = sizeof(FormatMessage);
     const size_t hdrsize  = sizeof(StreamDevHeader);
@@ -248,14 +258,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
         }
     };
     syslog(LOG_DEBUG, "writing format\n");
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    if (write_all(streamfd, &msg, msgsize) != msgsize) {
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    if (write_all(&msg, msgsize) != msgsize) {
         return EXIT_FAILURE;
     }
     return EXIT_SUCCESS;
 }
 
-static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
+int Stream::send_frame(const void *buf, const unsigned size)
 {
     ssize_t n;
     const size_t msgsize = sizeof(FormatMessage);
@@ -269,8 +279,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
         .msg = {}
     };
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    n = write_all(streamfd, &msg, msgsize);
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    n = write_all(&msg, msgsize);
     syslog(LOG_DEBUG,
            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
            n, msg.hdr.size);
@@ -279,7 +289,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
                n, msgsize);
         return EXIT_FAILURE;
     }
-    n = write_all(streamfd, buf, size);
+    n = write_all(buf, size);
     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
     if (n != size) {
         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
@@ -333,11 +343,10 @@ static void usage(const char *progname)
     exit(1);
 }
 
-static void
-send_cursor(int streamfd,
-            uint16_t width, uint16_t height,
-            uint16_t hotspot_x, uint16_t hotspot_y,
-            std::function<void(uint32_t *)> fill_cursor)
+void
+Stream::send_cursor(uint16_t width, uint16_t height,
+                    uint16_t hotspot_x, uint16_t hotspot_y,
+                    std::function<void(uint32_t *)> fill_cursor)
 {
     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
         return;
@@ -370,11 +379,11 @@ send_cursor(int streamfd,
     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
     fill_cursor(pixels);
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, storage.get(), msgsize);
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    write_all(storage.get(), msgsize);
 }
 
-static void cursor_changes(int streamfd, Display *display, int event_base)
+static void cursor_changes(Stream *stream, Display *display, int event_base)
 {
     unsigned long last_serial = 0;
 
@@ -399,18 +408,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
                 pixels[i] = cursor->pixels[i];
         };
-        send_cursor(streamfd,
-                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
+        stream->send_cursor(cursor->width, cursor->height,
+                            cursor->xhot, cursor->yhot, fill_cursor);
     }
 }
 
 static void
-do_capture(int streamfd, const char *streamport, FILE *f_log)
+do_capture(Stream &stream, const char *streamport, FILE *f_log)
 {
     unsigned int frame_count = 0;
     while (!quit_requested) {
         while (!quit_requested && !streaming_requested) {
-            if (read_command(streamfd, true) < 0) {
+            if (stream.read_command(true) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -455,7 +464,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
 
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
 
-                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE) {
+                if (stream.send_format(width, height, codec) == EXIT_FAILURE) {
                     throw std::runtime_error("FAILED to send format message");
                 }
             }
@@ -468,13 +477,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
                     hexdump(frame.buffer, frame.buffer_size, f_log);
                 }
             }
-            if (spice_stream_send_frame(streamfd,
-                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
+            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
                 syslog(LOG_ERR, "FAILED to send a frame\n");
                 break;
             }
             //usleep(1);
-            if (read_command(streamfd, false) < 0) {
+            if (stream.read_command(false) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -576,12 +584,12 @@ int main(int argc, char* argv[])
     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
 
     Stream stream(streamport);
-    std::thread cursor_th(cursor_changes, stream.file_descriptor(), display, event_base);
+    std::thread cursor_th(cursor_changes, &stream, display, event_base);
     cursor_th.detach();
 
     int ret = EXIT_SUCCESS;
     try {
-        do_capture(stream.file_descriptor(), streamport, f_log);
+        do_capture(stream, streamport, f_log);
     }
     catch (std::runtime_error &err) {
         syslog(LOG_ERR, "%s\n", err.what());
-- 
2.13.5 (Apple Git-94)



More information about the Spice-devel mailing list