[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp net/socket.hpp
Michael Meeks
michael.meeks at collabora.com
Wed Feb 15 14:49:27 UTC 2017
net/loolnb.cpp | 250 ++++++++++++++++------------------------------------
net/socket.hpp | 273 +++++++++++++++++++++++++++++++--------------------------
2 files changed, 231 insertions(+), 292 deletions(-)
New commits:
commit ae35938a9dac2fe632b32b072a9cb5469bdbbdb7
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Wed Feb 15 14:48:48 2017 +0000
De-templatize and simplify.
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index da7b17a..bffc684 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -52,147 +52,6 @@ public:
}
};
-/// Handles non-blocking socket event polling.
-/// Only polls on N-Sockets and invokes callback and
-/// doesn't manage buffers or client data.
-/// Note: uses poll(2) since it has very good performance
-/// compared to epoll up to a few hundred sockets and
-/// doesn't suffer select(2)'s poor API. Since this will
-/// be used per-document we don't expect to have several
-/// hundred users on same document to suffer poll(2)'s
-/// scalability limit. Meanwhile, epoll(2)'s high
-/// overhead to adding/removing sockets is not helpful.
-template <typename T>
-class SocketPoll
-{
-public:
- SocketPoll()
- {
- // Create the wakeup fd.
- if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
- {
- // FIXME: Can't have wakeup pipe, should we exit?
- // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess.
- _wakeup[0] = -1;
- _wakeup[1] = -1;
- }
- }
-
- ~SocketPoll()
- {
- ::close(_wakeup[0]);
- ::close(_wakeup[1]);
- }
-
- /// Poll the sockets for available data to read or buffer to write.
- void poll(const int timeoutMs, const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
- {
- const size_t size = _pollSockets.size();
-
- // The events to poll on change each spin of the loop.
- setupPollFds();
-
- int rc;
- do
- {
- rc = ::poll(&_pollFds[0], size + 1, timeoutMs);
- }
- while (rc < 0 && errno == EINTR);
-
- // Fire the callback and remove dead fds.
- for (int i = static_cast<int>(size) - 1; i >= 0; --i)
- {
- if (_pollFds[i].revents)
- {
- if (!handler(_pollSockets[i], _pollFds[i].revents))
- {
- std::cout << "Removing: " << _pollFds[i].fd << std::endl;
- _pollSockets.erase(_pollSockets.begin() + i);
- // Don't remove from pollFds; we'll recreate below.
- }
- }
- }
-
- // Process the wakeup pipe (always the last entry).
- if (_pollFds[size].revents)
- {
- // Add new sockets first.
- addNewSocketsToPoll();
-
- // Clear the data.
- int dump;
- if (::read(_wakeup[0], &dump, sizeof(dump)) == -1)
- {
- // Nothing to do.
- }
- }
- }
-
- /// Insert a new socket to be polled.
- /// Sockets are removed only when the handler return false.
- void insertNewSocket(const std::shared_ptr<T>& newSocket)
- {
- std::lock_guard<std::mutex> lock(_mutex);
-
- _newSockets.emplace_back(newSocket);
-
- // wakeup the main-loop.
- if (::write(_wakeup[1], "w", 1) == -1)
- {
- // wakeup pipe is already full.
- assert(errno == EAGAIN || errno == EWOULDBLOCK);
- }
- }
-
-private:
-
- /// Add the new sockets to list of those to poll.
- void addNewSocketsToPoll()
- {
- std::lock_guard<std::mutex> lock(_mutex);
-
- // Copy the new sockets over and clear.
- _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
- _newSockets.clear();
- }
-
- void removeSocketFromPoll(const std::shared_ptr<T>& socket)
- {
- _pollSockets.erase(_pollSockets.find(socket));
- }
-
- /// Initialize the poll fds array with the right events
- void setupPollFds()
- {
- const size_t size = _pollSockets.size();
-
- _pollFds.resize(size + 1); // + wakeup pipe
-
- for (size_t i = 0; i < size; ++i)
- {
- _pollFds[i].fd = _pollSockets[i]->getFD();
- _pollFds[i].events = _pollSockets[i]->getPollEvents();
- _pollFds[i].revents = 0;
- }
-
- // Add the read-end of the wake pipe.
- _pollFds[size].fd = _wakeup[0];
- _pollFds[size].events = POLLIN;
- _pollFds[size].revents = 0;
- }
-
-private:
- /// main-loop wakeup pipe
- int _wakeup[2];
- /// The sockets we're controlling
- std::vector<std::shared_ptr<T>> _pollSockets;
- /// Protects _newSockets
- std::mutex _mutex;
- std::vector<std::shared_ptr<T>> _newSockets;
- /// The fds to poll.
- std::vector<pollfd> _pollFds;
-};
-
/// Generic thread class.
class Thread
{
@@ -229,10 +88,78 @@ private:
Poco::Net::SocketAddress addr("127.0.0.1", PortNumber);
-void server(SocketPoll<SimpleResponseClient>& poller)
+/// A non-blocking, streaming socket.
+class ServerSocket : public Socket
+{
+ SocketPoll& _clientPoller;
+public:
+ ServerSocket(SocketPoll& clientPoller)
+ : _clientPoller(clientPoller)
+ {
+ }
+
+ /// Binds to a local address (Servers only).
+ /// Does not retry on error.
+ /// Returns true on success only.
+ bool bind(const Poco::Net::SocketAddress& address)
+ {
+ // Enable address reuse to avoid stalling after
+ // recycling, when previous socket is TIME_WAIT.
+ //TODO: Might be worth refactoring out.
+ const int reuseAddress = 1;
+ constexpr unsigned int len = sizeof(reuseAddress);
+ ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
+
+ const int rc = ::bind(getFD(), address.addr(), address.length());
+ return (rc == 0);
+ }
+
+ /// Listen to incoming connections (Servers only).
+ /// Does not retry on error.
+ /// Returns true on success only.
+ bool listen(const int backlog = 64)
+ {
+ const int rc = ::listen(getFD(), backlog);
+ return (rc == 0);
+ }
+
+ /// Accepts an incoming connection (Servers only).
+ /// Does not retry on error.
+ /// Returns a valid Socket shared_ptr on success only.
+ template <typename T>
+ std::shared_ptr<T> accept()
+ {
+ // Accept a connection (if any) and set it to non-blocking.
+ // We don't care about the client's address, so ignored.
+ const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
+ return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
+ }
+
+ int getPollEvents() override
+ {
+ return POLLIN;
+ }
+
+ HandleResult handlePoll( int /* events */ ) override
+ {
+ std::shared_ptr<SimpleResponseClient> clientSocket = accept<SimpleResponseClient>();
+ if (!clientSocket)
+ {
+ const std::string msg = "Failed to accept. (errno: ";
+ throw std::runtime_error(msg + std::strerror(errno) + ")");
+ }
+
+ std::cout << "Accepted client #" << clientSocket->getFD() << std::endl;
+ _clientPoller.insertNewSocket(clientSocket);
+
+ return Socket::HandleResult::CONTINUE;
+ }
+};
+
+void server(SocketPoll& clientPoller)
{
// Start server.
- auto server = std::make_shared<ServerSocket>();
+ auto server = std::make_shared<ServerSocket>(clientPoller);
if (!server->bind(addr))
{
const std::string msg = "Failed to bind. (errno: ";
@@ -245,51 +172,30 @@ void server(SocketPoll<SimpleResponseClient>& poller)
throw std::runtime_error(msg + std::strerror(errno) + ")");
}
+ SocketPoll serverPoll;
+
+ serverPoll.insertNewSocket(server);
+
std::cout << "Listening." << std::endl;
for (;;)
{
- if (server->pollRead(30000))
- {
- std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>();
- if (!clientSocket)
- {
- const std::string msg = "Failed to accept. (errno: ";
- throw std::runtime_error(msg + std::strerror(errno) + ")");
- }
-
- std::cout << "Accepted client #" << clientSocket->getFD() << std::endl;
- poller.insertNewSocket(clientSocket);
- }
+ serverPoll.poll(30000);
}
}
/// Poll client sockets and do IO.
-void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop)
+void pollAndComm(SocketPoll& poller, std::atomic<bool>& stop)
{
while (!stop)
{
- poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events)
- {
- bool closeSocket = false;
-
- if (events & POLLIN)
- closeSocket = !socket->readIncomingData();
-
- if (events & POLLOUT)
- socket->writeOutgoingData();
-
- if (events & (POLLHUP | POLLERR | POLLNVAL))
- closeSocket = true;
-
- return !closeSocket;
- });
+ poller.poll(5000);
}
}
int main(int, const char**)
{
// Used to poll client sockets.
- SocketPoll<SimpleResponseClient> poller;
+ SocketPoll poller;
// Start the client polling thread.
Thread threadPoll([&poller](std::atomic<bool>& stop)
diff --git a/net/socket.hpp b/net/socket.hpp
index 17a3d9d..449f6e0 100644
--- a/net/socket.hpp
+++ b/net/socket.hpp
@@ -29,7 +29,7 @@ public:
{
}
- ~Socket()
+ virtual ~Socket()
{
//TODO: Should we shutdown here or up to the client?
@@ -40,6 +40,14 @@ public:
// Returns the OS native socket fd.
int getFD() const { return _fd; }
+ /// Return a mask of events we should be polling for
+ virtual int getPollEvents() = 0;
+
+ /// Handle results of events returned from poll
+ enum HandleResult { CONTINUE, SOCKET_CLOSED };
+ virtual HandleResult handlePoll( int events ) = 0;
+
+
/// Sets the send buffer in size bytes.
/// Must be called before accept or connect.
/// Note: TCP will allocate twice this size for admin purposes,
@@ -106,88 +114,164 @@ public:
return rc;
}
- /// Poll the socket for either read, write, or both.
- /// Returns -1 on failure/error (query socket error), 0 for timeout,
- /// otherwise, depending on events, the respective bits set.
- int poll(const int timeoutMs, const int events = POLLIN | POLLOUT)
+protected:
+
+ /// Construct based on an existing socket fd.
+ /// Used by accept() only.
+ Socket(const int fd) :
+ _fd(fd)
{
- // Use poll(2) as it has lower overhead for up to
- // a few hundred sockets compared to epoll(2).
- // Also it has a more intuitive API and portable.
- pollfd pollfd;
- memset(&pollfd, 0, sizeof(pollfd));
+ }
- pollfd.fd = getFD();
- pollfd.events |= events;
+private:
+ const int _fd;
+};
- int rc;
- do
+
+/// Handles non-blocking socket event polling.
+/// Only polls on N-Sockets, invoking each socket's handlePoll();
+/// doesn't manage buffers or client data.
+/// Note: uses poll(2) since it has very good performance
+/// compared to epoll up to a few hundred sockets and
+/// doesn't suffer select(2)'s poor API. Since this will
+/// be used per-document we don't expect to have several
+/// hundred users on same document to suffer poll(2)'s
+/// scalability limit. Meanwhile, epoll(2)'s high
+/// overhead to adding/removing sockets is not helpful.
+class SocketPoll
+{
+public:
+ SocketPoll()
+ {
+ // Create the wakeup fd.
+ if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
{
- // Technically, on retrying we should wait
- // the _remaining_ time, alas simplicity wins.
- rc = ::poll(&pollfd, 1, timeoutMs);
+ // FIXME: Can't have wakeup pipe, should we exit?
+ // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess.
+ _wakeup[0] = -1;
+ _wakeup[1] = -1;
}
- while (rc < 0 && errno == EINTR);
+ }
+
+ ~SocketPoll()
+ {
+ ::close(_wakeup[0]);
+ ::close(_wakeup[1]);
+ }
+
+ /// Poll the sockets for available data to read or buffer to write.
+ void poll(const int timeoutMs)
+ {
+ const size_t size = _pollSockets.size();
- if (rc <= 0)
+ // The events to poll on change each spin of the loop.
+ setupPollFds();
+
+ int rc;
+ do
{
- return rc;
+ rc = ::poll(&_pollFds[0], size + 1, timeoutMs);
}
+ while (rc < 0 && errno == EINTR);
- int revents = 0;
- if (rc == 1)
+ // Fire the callback and remove dead fds.
+ for (int i = static_cast<int>(size) - 1; i >= 0; --i)
{
- if (pollfd.revents & (POLLERR|POLLHUP|POLLNVAL))
+ if (_pollFds[i].revents)
{
- // Probe socket for error.
- return -1;
+ if (_pollSockets[i]->handlePoll(_pollFds[i].revents) ==
+ Socket::HandleResult::SOCKET_CLOSED)
+ {
+ std::cout << "Removing: " << _pollFds[i].fd << std::endl;
+ _pollSockets.erase(_pollSockets.begin() + i);
+ // Don't remove from _pollFds; setupPollFds() rebuilds it on the next poll().
+ }
}
+ }
- if (pollfd.revents & (POLLIN|POLLPRI))
- {
- // Data ready to read.
- revents |= POLLIN;
- }
+ // Process the wakeup pipe (always the last entry).
+ if (_pollFds[size].revents)
+ {
+ // Add new sockets first.
+ addNewSocketsToPoll();
- if (pollfd.revents & POLLOUT)
+ // Clear the data.
+ int dump;
+ if (::read(_wakeup[0], &dump, sizeof(dump)) == -1)
{
- // Ready for write.
- revents |= POLLOUT;
+ // Nothing to do.
}
}
-
- return revents;
}
- /// Poll the socket for readability.
- /// Returns true when there is data to read, otherwise false.
- bool pollRead(const int timeoutMs)
+ /// Insert a new socket to be polled.
+ /// Sockets are removed when their handlePoll() returns SOCKET_CLOSED.
+ void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
{
- const int rc = poll(timeoutMs, POLLIN);
- return (rc > 0 && (rc & POLLIN));
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ _newSockets.emplace_back(newSocket);
+
+ // wakeup the main-loop.
+ if (::write(_wakeup[1], "w", 1) == -1)
+ {
+ // wakeup pipe is already full.
+ assert(errno == EAGAIN || errno == EWOULDBLOCK);
+ }
}
- /// Poll the socket for writability.
- /// Returns true when socket is ready for writing, otherwise false.
- bool pollWrite(const int timeoutMs)
+private:
+
+ /// Add the new sockets to list of those to poll.
+ void addNewSocketsToPoll()
{
- const int rc = poll(timeoutMs, POLLOUT);
- return (rc > 0 && (rc & POLLOUT));
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ // Copy the new sockets over and clear.
+ _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
+ _newSockets.clear();
}
-protected:
+ void removeSocketFromPoll(const std::shared_ptr<Socket>& socket)
+ {
+ auto it = std::find(_pollSockets.begin(), _pollSockets.end(), socket);
+ assert (it != _pollSockets.end());
+ _pollSockets.erase(it);
+ }
- /// Construct based on an existing socket fd.
- /// Used by accept() only.
- Socket(const int fd) :
- _fd(fd)
+ /// Initialize the poll fds array with the right events
+ void setupPollFds()
{
+ const size_t size = _pollSockets.size();
+
+ _pollFds.resize(size + 1); // + wakeup pipe
+
+ for (size_t i = 0; i < size; ++i)
+ {
+ _pollFds[i].fd = _pollSockets[i]->getFD();
+ _pollFds[i].events = _pollSockets[i]->getPollEvents();
+ _pollFds[i].revents = 0;
+ }
+
+ // Add the read-end of the wake pipe.
+ _pollFds[size].fd = _wakeup[0];
+ _pollFds[size].events = POLLIN;
+ _pollFds[size].revents = 0;
}
private:
- const int _fd;
+ /// main-loop wakeup pipe
+ int _wakeup[2];
+ /// The sockets we're controlling
+ std::vector<std::shared_ptr<Socket>> _pollSockets;
+ /// Protects _newSockets
+ std::mutex _mutex;
+ std::vector<std::shared_ptr<Socket>> _newSockets;
+ /// The fds to poll.
+ std::vector<pollfd> _pollFds;
};
+
/// A non-blocking, client socket.
class ClientSocket : public Socket
{
@@ -197,36 +281,28 @@ public:
{
}
- /// Connect to a server address.
- /// Does not retry on error.
- /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever.
- /// Returns true on success only.
- /// Note: when succceeds, caller must check for
- /// EINPROGRESS and poll for write, then getError(),
- /// only when the latter returns 0 we are connected.
- bool connect(const Poco::Net::SocketAddress& address, const int timeoutMs = 0)
+ protected:
+ std::vector< unsigned char > _inBuffer;
+ std::vector< unsigned char > _outBuffer;
+ public:
+
+ HandleResult handlePoll( int events ) override
{
- const int rc = ::connect(getFD(), address.addr(), address.length());
- if (rc == 0)
- {
- return true;
- }
+ bool closeSocket = false;
- if (errno != EINPROGRESS)
- {
- return false;
- }
+ if (events & POLLIN)
+ closeSocket = !readIncomingData();
- // Wait for writable, then check again.
- pollWrite(timeoutMs);
+ if (events & POLLOUT)
+ writeOutgoingData();
- // Now check if we connected, not, or not yet.
- return (getError() == 0 || errno == EINPROGRESS);
+ if (events & (POLLHUP | POLLERR | POLLNVAL))
+ closeSocket = true;
+
+ return closeSocket ? HandleResult::SOCKET_CLOSED :
+ HandleResult::CONTINUE;
}
- protected:
- std::vector< unsigned char > _inBuffer;
- std::vector< unsigned char > _outBuffer;
- public:
+
bool readIncomingData()
{
ssize_t len;
@@ -260,7 +336,7 @@ public:
// else poll will handle errors
}
- int getPollEvents()
+ int getPollEvents() override
{
int pollFor = POLLIN;
if (_outBuffer.size() > 0)
@@ -279,47 +355,4 @@ protected:
friend class ServerSocket;
};
-/// A non-blocking, streaming socket.
-class ServerSocket : public Socket
-{
-public:
-
- /// Binds to a local address (Servers only).
- /// Does not retry on error.
- /// Returns true on success only.
- bool bind(const Poco::Net::SocketAddress& address)
- {
- // Enable address reuse to avoid stalling after
- // recycling, when previous socket is TIME_WAIT.
- //TODO: Might be worth refactoring out.
- const int reuseAddress = 1;
- constexpr unsigned int len = sizeof(reuseAddress);
- ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
-
- const int rc = ::bind(getFD(), address.addr(), address.length());
- return (rc == 0);
- }
-
- /// Listen to incoming connections (Servers only).
- /// Does not retry on error.
- /// Returns true on success only.
- bool listen(const int backlog = 64)
- {
- const int rc = ::listen(getFD(), backlog);
- return (rc == 0);
- }
-
- /// Accepts an incoming connection (Servers only).
- /// Does not retry on error.
- /// Returns a valid Socket shared_ptr on success only.
- template <typename T>
- std::shared_ptr<T> accept()
- {
- // Accept a connection (if any) and set it to non-blocking.
- // We don't care about the client's address, so ignored.
- const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
- return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
- }
-};
-
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
More information about the Libreoffice-commits
mailing list