[Spice-devel] [PATCH spice-streaming-agent 7/9] Change the mode to NONBLOCK for the virtio port
Frediano Ziglio
fziglio at redhat.com
Tue May 15 21:01:37 UTC 2018
>
> In blocking mode, the IO operations block indefinitely if the server
> closes the virtio port on it's side. Change to non-blocking mode, so
> that we can quit the streaming agent in case the port gets closed.
>
> Signed-off-by: Lukáš Hrázký <lhrazky at redhat.com>
> ---
> src/error.cpp | 6 +++++
> src/error.hpp | 3 +++
> src/spice-streaming-agent.cpp | 4 ++--
> src/stream-port.cpp | 49
> ++++++++++++++++++++++++++++++++++----
> src/unittests/test-stream-port.cpp | 22 +++++++++++++----
> 5 files changed, 73 insertions(+), 11 deletions(-)
>
> diff --git a/src/error.cpp b/src/error.cpp
> index 4ef275c..1bbf750 100644
> --- a/src/error.cpp
> +++ b/src/error.cpp
> @@ -19,12 +19,18 @@ const char* Error::what() const noexcept
> return message.c_str();
> }
>
> +IOError::IOError(const std::string &msg) : Error(msg) {}
> +
> IOError::IOError(const std::string &msg, int errno_) :
> Error(msg + ": " + std::to_string(errno_) + " - " + strerror(errno_))
> {}
>
> +ReadError::ReadError(const std::string &msg) : IOError(msg) {}
> +
> ReadError::ReadError(const std::string &msg, int errno_) : IOError(msg,
> errno_) {}
>
> +WriteError::WriteError(const std::string &msg) : IOError(msg) {}
> +
> WriteError::WriteError(const std::string &msg, int errno_) : IOError(msg,
> errno_) {}
>
> }} // namespace spice::streaming_agent
> diff --git a/src/error.hpp b/src/error.hpp
> index d69942c..46fd904 100644
> --- a/src/error.hpp
> +++ b/src/error.hpp
> @@ -28,6 +28,7 @@ protected:
> class IOError : public Error
> {
> public:
> + IOError(const std::string &msg);
> IOError(const std::string &msg, int errno_);
>
> protected:
> @@ -37,12 +38,14 @@ protected:
> class ReadError : public IOError
> {
> public:
> + ReadError(const std::string &msg);
> ReadError(const std::string &msg, int errno_);
> };
>
> class WriteError : public IOError
> {
> public:
> + WriteError(const std::string &msg);
> WriteError(const std::string &msg, int errno_);
> };
>
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 26258d0..692f067 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -72,7 +72,7 @@ static int have_something_to_read(int timeout)
> return -1;
> }
>
> - if (pollfd.revents == POLLIN) {
> + if (pollfd.revents & POLLIN) {
> return 1;
> }
>
> @@ -330,7 +330,7 @@ static void cursor_changes(Display *display, int
> event_base)
> static void
> do_capture(const char *streamport, FILE *f_log)
> {
> - streamfd = open(streamport, O_RDWR);
> + streamfd = open(streamport, O_RDWR | O_NONBLOCK);
> if (streamfd < 0) {
> throw std::runtime_error("failed to open the streaming device (" +
> std::string(streamport) + "): "
> diff --git a/src/stream-port.cpp b/src/stream-port.cpp
> index cee63ac..ee85179 100644
> --- a/src/stream-port.cpp
> +++ b/src/stream-port.cpp
> @@ -8,6 +8,7 @@
> #include "error.hpp"
>
> #include <errno.h>
> +#include <poll.h>
> #include <string.h>
> #include <syslog.h>
> #include <unistd.h>
> @@ -22,9 +23,31 @@ void read_all(int fd, void *buf, size_t len)
> while (len > 0) {
> ssize_t n = read(fd, buf, len);
>
> + if (n == 0) {
> + throw ReadError("Reading message from device failed: read()
> returned 0, device is closed.");
> + }
> +
> if (n < 0) {
> - if (errno == EINTR) {
> - continue;
> + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
> + struct pollfd pollfd = {fd, POLLIN, 0};
> + if (poll(&pollfd, 1, -1) < 0) {
> + if (errno == EINTR) {
> + continue;
> + }
> +
> + throw ReadError("poll failed while reading message from
> device", errno);
> + }
> +
> + if (pollfd.revents & POLLIN) {
> + continue;
> + }
> +
> + if (pollfd.revents & POLLHUP) {
> + throw ReadError("Reading message from device failed: The
> device is closed.");
> + }
> +
> + throw ReadError("Reading message from device failed: poll
> returned " +
> + std::to_string(pollfd.revents));
> }
> throw ReadError("Reading message from device failed", errno);
> }
> @@ -40,8 +63,26 @@ void write_all(int fd, const void *buf, size_t len)
> ssize_t n = write(fd, buf, len);
>
> if (n < 0) {
> - if (errno == EINTR) {
> - continue;
> + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
> + struct pollfd pollfd = {fd, POLLOUT, 0};
> + if (poll(&pollfd, 1, -1) < 0) {
> + if (errno == EINTR) {
> + continue;
> + }
> +
> + throw WriteError("poll failed while writin message to
> device", errno);
typo: writing
> + }
> +
> + if (pollfd.revents & POLLOUT) {
> + continue;
> + }
> +
> + if (pollfd.revents & POLLHUP) {
> + throw WriteError("Writing message to device failed: The
> device is closed.");
> + }
> +
> + throw WriteError("Writing message to device failed: poll
> returned " +
> + std::to_string(pollfd.revents));
> }
> throw WriteError("Writing message to device failed", errno);
> }
> diff --git a/src/unittests/test-stream-port.cpp
> b/src/unittests/test-stream-port.cpp
> index 9added3..98009ac 100644
> --- a/src/unittests/test-stream-port.cpp
> +++ b/src/unittests/test-stream-port.cpp
> @@ -1,8 +1,10 @@
> #define CATCH_CONFIG_MAIN
> #include <catch/catch.hpp>
> #include <sys/socket.h>
> +#include <signal.h>
>
> #include "stream-port.hpp"
> +#include "error.hpp"
>
>
> namespace ssa = spice::streaming_agent;
> @@ -12,12 +14,18 @@ namespace ssa = spice::streaming_agent;
> * that is actually used for the real interface.
> */
> SCENARIO("test basic IO on the stream port", "[port][io]") {
> + // When trying to write to a socket that was closed on the other side,
> the
> + // process receives a SIGPIPE, which is a difference to the virtio port,
> + // which returns EAGAIN from write(). By ignoring the SIGPIPE we get
> EPIPE
> + // from write() instead.
> + signal(SIGPIPE, SIG_IGN);
> +
> GIVEN("An open port (socketpair)") {
> int fd[2];
> const char *src_buf = "brekeke";
> const size_t src_size = strlen(src_buf) + 1;
>
> - socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
> + socketpair(AF_LOCAL, SOCK_STREAM | SOCK_NONBLOCK, 0, fd);
>
> WHEN("reading data in one go") {
> CHECK(write(fd[0], src_buf, src_size) == src_size);
> @@ -50,16 +58,20 @@ SCENARIO("test basic IO on the stream port",
> "[port][io]") {
> CHECK(close(fd[0]) == 0);
> ssa::read_all(fd[1], buf, 5);
> CHECK(std::string(buf) == "keke");
> - // TODO loops infinitely, we should recognize the remote end is
> closed
> - //ssa::read_all(fd[1], buf, 1);
> + CHECK_THROWS_AS(ssa::read_all(fd[1], buf, 1), ssa::ReadError);
Ok, ignore comment on previous patch.
> }
>
> + // This test behaves differently with socketpair than it does with
> the virtio port:
> + // real case:
> + // - write() on the virtio port returns EAGAIN
> + // - subsequent poll() on the port returns POLLHUP, which throws
> WriteError
> + // test case:
> + // - write() on the socketpair returns EPIPE, which throws
> WriteError
> WHEN("closing the remote end and trying to write") {
> ssa::write_all(fd[1], src_buf, src_size);
> char buf[10];
> CHECK(close(fd[0]) == 0);
> - // TODO causes a SIGPIPE
> - //ssa::write_all(fd[1], src_buf, src_size);
> + CHECK_THROWS_AS(ssa::write_all(fd[1], src_buf, src_size),
> ssa::WriteError);
> }
>
> // clean up the descriptors in case they are still open
Acked-by: Frediano Ziglio <fziglio at redhat.com>
Frediano
More information about the Spice-devel
mailing list