[Spice-devel] [PATCH v2 11/24] Move read, write and locking into the 'Stream' class

Christophe de Dinechin christophe at dinechin.org
Wed Feb 21 17:46:23 UTC 2018


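From: Christophe de Dinechin <dinechin at redhat.com>

Turn the free functions that read from and write to the streaming
device (have_something_to_read, handle_stream_*, read_command*,
write_all, send_format, send_frame, send_cursor) into member functions
of the 'Stream' class, so they no longer take the file descriptor as
an argument. The 'fd' member is renamed to 'streamfd', and the global
'stream_mtx' is replaced by a 'mutex' member of 'Stream'.
cursor_changes and do_capture now receive the Stream object instead of
a raw file descriptor.

Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>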
---
 src/spice-streaming-agent.cpp | 105 +++++++++++++++++++++++-------------------
 1 file changed, 57 insertions(+), 48 deletions(-)

diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 3c4fb67..9f4183c 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -41,8 +41,6 @@
 
 using namespace spice::streaming_agent;
 
-static size_t write_all(int fd, const void *buf, const size_t len);
-
 static ConcreteAgent agent;
 
 namespace spice
@@ -73,18 +71,33 @@ class Stream
 public:
     Stream(const char *name)
     {
-        fd = open(name, O_RDWR);
-        if (fd < 0)
+        streamfd = open(name, O_RDWR);
+        if (streamfd < 0)
             throw std::runtime_error("failed to open streaming device");
     }
     ~Stream()
     {
-        close(fd);
+        close(streamfd);
     }
-    operator int() { return fd; }
+    operator int() { return streamfd; }
+
+    int have_something_to_read(int timeout);
+    void handle_stream_start_stop(uint32_t len);
+    void handle_stream_capabilities(uint32_t len);
+    void handle_stream_error(uint32_t len);
+    void read_command_from_device(void);
+    int read_command(bool blocking);
+
+    size_t write_all(const void *buf, const size_t len);
+    int send_format(unsigned w, unsigned h, uint8_t c);
+    int send_frame(const void *buf, const unsigned size);
+    void send_cursor(uint16_t width, uint16_t height,
+                     uint16_t hotspot_x, uint16_t hotspot_y,
+                     std::function<void(uint32_t *)> fill_cursor);
 
 private:
-    int fd = -1;
+    int streamfd = -1;
+    std::mutex mutex;
 };
 
 }} // namespace spice::streaming_agent
@@ -94,9 +107,8 @@ static bool streaming_requested = false;
 static bool quit_requested = false;
 static bool log_binary = false;
 static std::set<SpiceVideoCodecType> client_codecs;
-static std::mutex stream_mtx;
 
-static int have_something_to_read(int streamfd, int timeout)
+int Stream::have_something_to_read(int timeout)
 {
     struct pollfd pollfd = {streamfd, POLLIN, 0};
 
@@ -112,7 +124,7 @@ static int have_something_to_read(int streamfd, int timeout)
     return 0;
 }
 
-static void handle_stream_start_stop(int streamfd, uint32_t len)
+void Stream::handle_stream_start_stop(uint32_t len)
 {
     uint8_t msg[256];
 
@@ -134,7 +146,7 @@ static void handle_stream_start_stop(int streamfd, uint32_t len)
     }
 }
 
-static void handle_stream_capabilities(int streamfd, uint32_t len)
+void Stream::handle_stream_capabilities(uint32_t len)
 {
     uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
 
@@ -154,23 +166,23 @@ static void handle_stream_capabilities(int streamfd, uint32_t len)
         STREAM_TYPE_CAPABILITIES,
         0
     };
-    if (write_all(streamfd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
+    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
         throw std::runtime_error("error writing capabilities");
     }
 }
 
-static void handle_stream_error(int streamfd, uint32_t len)
+void Stream::handle_stream_error(uint32_t len)
 {
     // TODO read message and use it
     throw std::runtime_error("got an error message from server");
 }
 
-static void read_command_from_device(int streamfd)
+void Stream::read_command_from_device()
 {
     StreamDevHeader hdr;
     int n;
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
+    std::lock_guard<std::mutex> stream_guard(mutex);
     n = read(streamfd, &hdr, sizeof(hdr));
     if (n != sizeof(hdr)) {
         throw std::runtime_error("read command from device FAILED -- read " + std::to_string(n) +
@@ -183,39 +195,38 @@ static void read_command_from_device(int streamfd)
 
     switch (hdr.type) {
     case STREAM_TYPE_CAPABILITIES:
-        return handle_stream_capabilities(streamfd, hdr.size);
+        return handle_stream_capabilities(hdr.size);
     case STREAM_TYPE_NOTIFY_ERROR:
-        return handle_stream_error(streamfd, hdr.size);
+        return handle_stream_error(hdr.size);
     case STREAM_TYPE_START_STOP:
-        return handle_stream_start_stop(streamfd, hdr.size);
+        return handle_stream_start_stop(hdr.size);
     }
     throw std::runtime_error("UNKNOWN msg of type " + std::to_string(hdr.type));
 }
 
-static int read_command(int streamfd, bool blocking)
+int Stream::read_command(bool blocking)
 {
     int timeout = blocking?-1:0;
     while (!quit_requested) {
-        if (!have_something_to_read(streamfd, timeout)) {
+        if (!have_something_to_read(timeout)) {
             if (!blocking) {
                 return 0;
             }
             sleep(1);
             continue;
         }
-        read_command_from_device(streamfd);
+        read_command_from_device();
         break;
     }
 
     return 1;
 }
 
-static size_t
-write_all(int fd, const void *buf, const size_t len)
+size_t Stream::write_all(const void *buf, const size_t len)
 {
     size_t written = 0;
     while (written < len) {
-        int l = write(fd, (const char *) buf + written, len - written);
+        int l = write(streamfd, (const char *) buf + written, len - written);
         if (l < 0) {
             if (errno == EINTR) {
                 continue;
@@ -229,7 +240,7 @@ write_all(int fd, const void *buf, const size_t len)
     return written;
 }
 
-static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
+int Stream::send_format(unsigned w, unsigned h, uint8_t c)
 {
     const size_t msgsize = sizeof(FormatMessage);
     const size_t hdrsize  = sizeof(StreamDevHeader);
@@ -248,14 +259,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
         }
     };
     syslog(LOG_DEBUG, "writing format\n");
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    if (write_all(streamfd, &msg, msgsize) != msgsize) {
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    if (write_all(&msg, msgsize) != msgsize) {
         return EXIT_FAILURE;
     }
     return EXIT_SUCCESS;
 }
 
-static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
+int Stream::send_frame(const void *buf, const unsigned size)
 {
     ssize_t n;
     const size_t msgsize = sizeof(FormatMessage);
@@ -269,8 +280,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
         .msg = {}
     };
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    n = write_all(streamfd, &msg, msgsize);
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    n = write_all(&msg, msgsize);
     syslog(LOG_DEBUG,
            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
            n, msg.hdr.size);
@@ -279,7 +290,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
                n, msgsize);
         return EXIT_FAILURE;
     }
-    n = write_all(streamfd, buf, size);
+    n = write_all(buf, size);
     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
     if (n != size) {
         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
@@ -332,11 +343,10 @@ static void usage(const char *progname)
     exit(1);
 }
 
-static void
-send_cursor(int streamfd,
-            uint16_t width, uint16_t height,
-            uint16_t hotspot_x, uint16_t hotspot_y,
-            std::function<void(uint32_t *)> fill_cursor)
+void
+Stream::send_cursor(uint16_t width, uint16_t height,
+                    uint16_t hotspot_x, uint16_t hotspot_y,
+                    std::function<void(uint32_t *)> fill_cursor)
 {
     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
@@ -370,11 +380,11 @@ send_cursor(int streamfd,
     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
     fill_cursor(pixels);
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, storage.get(), cursor_msgsize);
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    write_all(storage.get(), cursor_msgsize);
 }
 
-static void cursor_changes(int streamfd, Display *display, int event_base)
+static void cursor_changes(Stream *stream, Display *display, int event_base)
 {
     unsigned long last_serial = 0;
 
@@ -396,18 +406,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
                 pixels[i] = cursor->pixels[i];
         };
-        send_cursor(streamfd,
-                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
+        stream->send_cursor(cursor->width, cursor->height,
+                            cursor->xhot, cursor->yhot, fill_cursor);
     }
 }
 
 static void
-do_capture(int streamfd, const char *streamport, FILE *f_log)
+do_capture(Stream &stream, const char *streamport, FILE *f_log)
 {
     unsigned int frame_count = 0;
     while (!quit_requested) {
         while (!quit_requested && !streaming_requested) {
-            if (read_command(streamfd, true) < 0) {
+            if (stream.read_command(true) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -451,7 +461,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
 
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
 
-                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
+                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
                     throw std::runtime_error("FAILED to send format message");
             }
             if (f_log) {
@@ -463,13 +473,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
                     hexdump(frame.buffer, frame.buffer_size, f_log);
                 }
             }
-            if (spice_stream_send_frame(streamfd,
-                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
+            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
                 syslog(LOG_ERR, "FAILED to send a frame\n");
                 break;
             }
             //usleep(1);
-            if (read_command(streamfd, false) < 0) {
+            if (stream.read_command(false) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -561,7 +570,7 @@ int main(int argc, char* argv[])
     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
 
     Stream streamfd(streamport);
-    std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
+    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
     cursor_th.detach();
 
     int ret = EXIT_SUCCESS;
-- 
2.13.5 (Apple Git-94)



More information about the Spice-devel mailing list