[Spice-devel] [PATCH spice-streaming-agent v2 7/9] Change the mode to NONBLOCK for the virtio port
Lukáš Hrázký
lhrazky at redhat.com
Wed May 16 16:26:05 UTC 2018
In blocking mode, the IO operations block indefinitely if the server
closes the virtio port on it's side. Change to non-blocking mode, so
that we can quit the streaming agent in case the port gets closed.
Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
---
src/error.hpp | 2 ++
src/spice-streaming-agent.cpp | 4 ++--
src/stream-port.cpp | 49 ++++++++++++++++++++++++++++++++++----
src/unittests/test-stream-port.cpp | 22 +++++++++++++----
4 files changed, 66 insertions(+), 11 deletions(-)
diff --git a/src/error.hpp b/src/error.hpp
index 333a481..e30990f 100644
--- a/src/error.hpp
+++ b/src/error.hpp
@@ -24,6 +24,8 @@ public:
class IOError : public Error
{
public:
+ using Error::Error;
+
IOError(const std::string &msg, int sys_errno);
};
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 26258d0..692f067 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -72,7 +72,7 @@ static int have_something_to_read(int timeout)
return -1;
}
- if (pollfd.revents == POLLIN) {
+ if (pollfd.revents & POLLIN) {
return 1;
}
@@ -330,7 +330,7 @@ static void cursor_changes(Display *display, int event_base)
static void
do_capture(const char *streamport, FILE *f_log)
{
- streamfd = open(streamport, O_RDWR);
+ streamfd = open(streamport, O_RDWR | O_NONBLOCK);
if (streamfd < 0) {
throw std::runtime_error("failed to open the streaming device (" +
std::string(streamport) + "): "
diff --git a/src/stream-port.cpp b/src/stream-port.cpp
index cee63ac..72364bd 100644
--- a/src/stream-port.cpp
+++ b/src/stream-port.cpp
@@ -8,6 +8,7 @@
#include "error.hpp"
#include <errno.h>
+#include <poll.h>
#include <string.h>
#include <syslog.h>
#include <unistd.h>
@@ -22,9 +23,31 @@ void read_all(int fd, void *buf, size_t len)
while (len > 0) {
ssize_t n = read(fd, buf, len);
+ if (n == 0) {
+ throw ReadError("Reading message from device failed: read() returned 0, device is closed.");
+ }
+
if (n < 0) {
- if (errno == EINTR) {
- continue;
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+ struct pollfd pollfd = {fd, POLLIN, 0};
+ if (poll(&pollfd, 1, -1) < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ throw ReadError("poll failed while reading message from device", errno);
+ }
+
+ if (pollfd.revents & POLLIN) {
+ continue;
+ }
+
+ if (pollfd.revents & POLLHUP) {
+ throw ReadError("Reading message from device failed: The device is closed.");
+ }
+
+ throw ReadError("Reading message from device failed: poll returned " +
+ std::to_string(pollfd.revents));
}
throw ReadError("Reading message from device failed", errno);
}
@@ -40,8 +63,26 @@ void write_all(int fd, const void *buf, size_t len)
ssize_t n = write(fd, buf, len);
if (n < 0) {
- if (errno == EINTR) {
- continue;
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+ struct pollfd pollfd = {fd, POLLOUT, 0};
+ if (poll(&pollfd, 1, -1) < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ throw WriteError("poll failed while writing message to device", errno);
+ }
+
+ if (pollfd.revents & POLLOUT) {
+ continue;
+ }
+
+ if (pollfd.revents & POLLHUP) {
+ throw WriteError("Writing message to device failed: The device is closed.");
+ }
+
+ throw WriteError("Writing message to device failed: poll returned " +
+ std::to_string(pollfd.revents));
}
throw WriteError("Writing message to device failed", errno);
}
diff --git a/src/unittests/test-stream-port.cpp b/src/unittests/test-stream-port.cpp
index 3f9dadf..eb02142 100644
--- a/src/unittests/test-stream-port.cpp
+++ b/src/unittests/test-stream-port.cpp
@@ -1,8 +1,10 @@
#define CATCH_CONFIG_MAIN
#include <catch/catch.hpp>
#include <sys/socket.h>
+#include <signal.h>
#include "stream-port.hpp"
+#include "error.hpp"
namespace ssa = spice::streaming_agent;
@@ -12,12 +14,18 @@ namespace ssa = spice::streaming_agent;
* that is actually used for the real interface.
*/
SCENARIO("test basic IO on the stream port", "[port][io]") {
+ // When trying to write to a socket that was closed on the other side, the
+ // process receives a SIGPIPE, which is a difference to the virtio port,
+ // which returns EAGAIN from write(). By ignoring the SIGPIPE we get EPIPE
+ // from write() instead.
+ signal(SIGPIPE, SIG_IGN);
+
GIVEN("An open port (socketpair)") {
int fd[2];
const char *src_buf = "brekeke";
const size_t src_size = strlen(src_buf);
- socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
+ socketpair(AF_LOCAL, SOCK_STREAM | SOCK_NONBLOCK, 0, fd);
WHEN("reading data in one go") {
CHECK(write(fd[0], src_buf, src_size) == src_size);
@@ -50,16 +58,20 @@ SCENARIO("test basic IO on the stream port", "[port][io]") {
CHECK(close(fd[0]) == 0);
ssa::read_all(fd[1], buf, 4);
CHECK(std::string(buf, 4) == "keke");
- // TODO blocks infinitely, we should recognize the remote end is closed
- //ssa::read_all(fd[1], buf, 1);
+ CHECK_THROWS_AS(ssa::read_all(fd[1], buf, 1), ssa::ReadError);
}
+ // This test behaves differently with socketpair than it does with the virtio port:
+ // real case:
+ // - write() on the virtio port returns EAGAIN
+ // - subsequent poll() on the port returns POLLHUP, which throws WriteError
+ // test case:
+ // - write() on the socketpair returns EPIPE, which throws WriteError
WHEN("closing the remote end and trying to write") {
ssa::write_all(fd[1], src_buf, src_size);
char buf[10];
CHECK(close(fd[0]) == 0);
- // TODO causes a SIGPIPE
- //ssa::write_all(fd[1], src_buf, src_size);
+ CHECK_THROWS_AS(ssa::write_all(fd[1], src_buf, src_size), ssa::WriteError);
}
// clean up the descriptors in case they are still open
--
2.16.2
More information about the Spice-devel
mailing list