[Spice-devel] [PATCH spice-streaming-agent 7/9] Change the mode to NONBLOCK for the virtio port

Lukáš Hrázký lhrazky at redhat.com
Fri Apr 27 12:14:38 UTC 2018


In blocking mode, the IO operations block indefinitely if the server
closes the virtio port on its side. Change to non-blocking mode, so
that we can quit the streaming agent in case the port gets closed.

Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
---
 src/error.cpp                      |  6 +++++
 src/error.hpp                      |  3 +++
 src/spice-streaming-agent.cpp      |  4 ++--
 src/stream-port.cpp                | 49 ++++++++++++++++++++++++++++++++++----
 src/unittests/test-stream-port.cpp | 22 +++++++++++++----
 5 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/src/error.cpp b/src/error.cpp
index 4ef275c..1bbf750 100644
--- a/src/error.cpp
+++ b/src/error.cpp
@@ -19,12 +19,18 @@ const char* Error::what() const noexcept
     return message.c_str();
 }
 
+IOError::IOError(const std::string &msg) : Error(msg) {}
+
 IOError::IOError(const std::string &msg, int errno_) :
     Error(msg + ": " + std::to_string(errno_) + " - " + strerror(errno_))
 {}
 
+ReadError::ReadError(const std::string &msg) : IOError(msg) {}
+
 ReadError::ReadError(const std::string &msg, int errno_) : IOError(msg, errno_) {}
 
+WriteError::WriteError(const std::string &msg) : IOError(msg) {}
+
 WriteError::WriteError(const std::string &msg, int errno_) : IOError(msg, errno_) {}
 
 }} // namespace spice::streaming_agent
diff --git a/src/error.hpp b/src/error.hpp
index d69942c..46fd904 100644
--- a/src/error.hpp
+++ b/src/error.hpp
@@ -28,6 +28,7 @@ protected:
 class IOError : public Error
 {
 public:
+    IOError(const std::string &msg);
     IOError(const std::string &msg, int errno_);
 
 protected:
@@ -37,12 +38,14 @@ protected:
 class ReadError : public IOError
 {
 public:
+    ReadError(const std::string &msg);
     ReadError(const std::string &msg, int errno_);
 };
 
 class WriteError : public IOError
 {
 public:
+    WriteError(const std::string &msg);
     WriteError(const std::string &msg, int errno_);
 };
 
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 26258d0..692f067 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -72,7 +72,7 @@ static int have_something_to_read(int timeout)
         return -1;
     }
 
-    if (pollfd.revents == POLLIN) {
+    if (pollfd.revents & POLLIN) {
         return 1;
     }
 
@@ -330,7 +330,7 @@ static void cursor_changes(Display *display, int event_base)
 static void
 do_capture(const char *streamport, FILE *f_log)
 {
-    streamfd = open(streamport, O_RDWR);
+    streamfd = open(streamport, O_RDWR | O_NONBLOCK);
     if (streamfd < 0) {
         throw std::runtime_error("failed to open the streaming device (" +
                                  std::string(streamport) + "): "
diff --git a/src/stream-port.cpp b/src/stream-port.cpp
index cee63ac..ee85179 100644
--- a/src/stream-port.cpp
+++ b/src/stream-port.cpp
@@ -8,6 +8,7 @@
 #include "error.hpp"
 
 #include <errno.h>
+#include <poll.h>
 #include <string.h>
 #include <syslog.h>
 #include <unistd.h>
@@ -22,9 +23,31 @@ void read_all(int fd, void *buf, size_t len)
     while (len > 0) {
         ssize_t n = read(fd, buf, len);
 
+        if (n == 0) {
+            throw ReadError("Reading message from device failed: read() returned 0, device is closed.");
+        }
+
         if (n < 0) {
-            if (errno == EINTR) {
-                continue;
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+                struct pollfd pollfd = {fd, POLLIN, 0};
+                if (poll(&pollfd, 1, -1) < 0) {
+                    if (errno == EINTR) {
+                        continue;
+                    }
+
+                    throw ReadError("poll failed while reading message from device", errno);
+                }
+
+                if (pollfd.revents & POLLIN) {
+                    continue;
+                }
+
+                if (pollfd.revents & POLLHUP) {
+                    throw ReadError("Reading message from device failed: The device is closed.");
+                }
+
+                throw ReadError("Reading message from device failed: poll returned " +
+                                std::to_string(pollfd.revents));
             }
             throw ReadError("Reading message from device failed", errno);
         }
@@ -40,8 +63,26 @@ void write_all(int fd, const void *buf, size_t len)
         ssize_t n = write(fd, buf, len);
 
         if (n < 0) {
-            if (errno == EINTR) {
-                continue;
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+                struct pollfd pollfd = {fd, POLLOUT, 0};
+                if (poll(&pollfd, 1, -1) < 0) {
+                    if (errno == EINTR) {
+                        continue;
+                    }
+
+                    throw WriteError("poll failed while writin message to device", errno);
+                }
+
+                if (pollfd.revents & POLLOUT) {
+                    continue;
+                }
+
+                if (pollfd.revents & POLLHUP) {
+                    throw WriteError("Writing message to device failed: The device is closed.");
+                }
+
+                throw WriteError("Writing message to device failed: poll returned " +
+                                 std::to_string(pollfd.revents));
             }
             throw WriteError("Writing message to device failed", errno);
         }
diff --git a/src/unittests/test-stream-port.cpp b/src/unittests/test-stream-port.cpp
index 9added3..98009ac 100644
--- a/src/unittests/test-stream-port.cpp
+++ b/src/unittests/test-stream-port.cpp
@@ -1,8 +1,10 @@
 #define CATCH_CONFIG_MAIN
 #include <catch/catch.hpp>
 #include <sys/socket.h>
+#include <signal.h>
 
 #include "stream-port.hpp"
+#include "error.hpp"
 
 
 namespace ssa = spice::streaming_agent;
@@ -12,12 +14,18 @@ namespace ssa = spice::streaming_agent;
  * that is actually used for the real interface.
  */
 SCENARIO("test basic IO on the stream port", "[port][io]") {
+    // When trying to write to a socket that was closed on the other side, the
+    // process receives a SIGPIPE. This differs from the virtio port, which
+    // returns EAGAIN from write(). By ignoring the SIGPIPE we get EPIPE
+    // from write() instead.
+    signal(SIGPIPE, SIG_IGN);
+
     GIVEN("An open port (socketpair)") {
         int fd[2];
         const char *src_buf = "brekeke";
         const size_t src_size = strlen(src_buf) + 1;
 
-        socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
+        socketpair(AF_LOCAL, SOCK_STREAM | SOCK_NONBLOCK, 0, fd);
 
         WHEN("reading data in one go") {
             CHECK(write(fd[0], src_buf, src_size) == src_size);
@@ -50,16 +58,20 @@ SCENARIO("test basic IO on the stream port", "[port][io]") {
             CHECK(close(fd[0]) == 0);
             ssa::read_all(fd[1], buf, 5);
             CHECK(std::string(buf) == "keke");
-            // TODO loops infinitely, we should recognize the remote end is closed
-            //ssa::read_all(fd[1], buf, 1);
+            CHECK_THROWS_AS(ssa::read_all(fd[1], buf, 1), ssa::ReadError);
         }
 
+        // This test behaves differently with socketpair than it does with the virtio port:
+        // real case:
+        // - write() on the virtio port returns EAGAIN
+        // - subsequent poll() on the port returns POLLHUP, which throws WriteError
+        // test case:
+        // - write() on the socketpair returns EPIPE, which throws WriteError
         WHEN("closing the remote end and trying to write") {
             ssa::write_all(fd[1], src_buf, src_size);
             char buf[10];
             CHECK(close(fd[0]) == 0);
-            // TODO causes a SIGPIPE
-            //ssa::write_all(fd[1], src_buf, src_size);
+            CHECK_THROWS_AS(ssa::write_all(fd[1], src_buf, src_size), ssa::WriteError);
         }
 
         // clean up the descriptors in case they are still open
-- 
2.16.2



More information about the Spice-devel mailing list