[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp net/socket.hpp
Michael Meeks
michael.meeks at collabora.com
Tue Feb 14 11:29:51 UTC 2017
net/loolnb.cpp | 38 ++++++++++++++++++--------------------
net/socket.hpp | 32 ++++++++++++++++----------------
2 files changed, 34 insertions(+), 36 deletions(-)
New commits:
commit 3f64e2b0b098e8bb88515875f79d547688752bbb
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Tue Feb 14 11:10:52 2017 +0000
Cleanup comments, naming related warnings etc.
Avoid using 'poll' as a member function, and a local variable.
Avoid using 'fd' as a member function, and a parameter.
Add assertions around wake pipe.
Always setup sockets for polling, strobing events is expected.
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 94404b2..952c44a 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -16,6 +16,7 @@
#include <iostream>
#include <mutex>
#include <thread>
+#include <assert.h>
#include <Poco/Net/SocketAddress.h>
@@ -42,12 +43,11 @@ public:
// Create the wakeup fd.
if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
{
- //FIXME: Can't have wakeup pipe, should we exit?
+ // FIXME: Can't have wakeup pipe, should we exit?
+ // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess.
_wakeup[0] = -1;
_wakeup[1] = -1;
}
-
- createPollFds();
}
~SocketPoll()
@@ -61,6 +61,9 @@ public:
{
const size_t size = _pollSockets.size();
+ // The events to poll on change each spin of the loop.
+ setupPollFds();
+
int rc;
do
{
@@ -88,20 +91,13 @@ public:
// Add new sockets first.
addNewSocketsToPoll();
- // Recreate the poll fds array.
- createPollFds();
-
// Clear the data.
int dump;
- if (::read(_wakeup[0], &dump, sizeof(4)) == -1)
+ if (::read(_wakeup[0], &dump, sizeof(dump)) == -1)
{
// Nothing to do.
}
}
- else if (_pollFds.size() != (_pollSockets.size() + 1))
- {
- createPollFds();
- }
}
/// Insert a new socket to be polled.
@@ -115,7 +111,8 @@ public:
// wakeup the main-loop.
if (::write(_wakeup[1], "w", 1) == -1)
{
- // No wake up then.
+ // wakeup pipe is already full.
+ assert(errno == EAGAIN || errno == EWOULDBLOCK);
}
}
@@ -131,8 +128,8 @@ private:
_newSockets.clear();
}
- /// Create the poll fds array.
- void createPollFds()
+ /// Initialize the poll fds array with the right events
+ void setupPollFds()
{
const size_t size = _pollSockets.size();
@@ -140,8 +137,9 @@ private:
for (size_t i = 0; i < size; ++i)
{
- _pollFds[i].fd = _pollSockets[i]->fd();
- _pollFds[i].events = POLLIN | POLLOUT; //TODO: Get from the socket.
+ _pollFds[i].fd = _pollSockets[i]->getFD();
+ //TODO: Get from the socket.
+ _pollFds[i].events = POLLIN | POLLOUT;
_pollFds[i].revents = 0;
}
@@ -208,7 +206,7 @@ void client(const int timeoutMs)
throw std::runtime_error(msg + std::strerror(errno) + ")");
}
- std::cout << "Connected " << client->fd() << std::endl;
+ std::cout << "Connected " << client->getFD() << std::endl;
client->send("1", 1);
int sent = 1;
@@ -259,7 +257,7 @@ void server(SocketPoll<ClientSocket>& poller)
throw std::runtime_error(msg + std::strerror(errno) + ")");
}
- std::cout << "Accepted client #" << clientSocket->fd() << std::endl;
+ std::cout << "Accepted client #" << clientSocket->getFD() << std::endl;
poller.insertNewSocket(clientSocket);
}
}
@@ -288,7 +286,7 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
const int num = stoi(msg);
if ((num % (1<<16)) == 1)
{
- std::cout << "Client #" << socket->fd() << ": " << msg << std::endl;
+ std::cout << "Client #" << socket->getFD() << ": " << msg << std::endl;
}
const std::string new_msg = std::to_string(num + 1);
const int sent = socket->send(new_msg.data(), new_msg.size());
@@ -301,7 +299,7 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
else
{
// Normally we'd buffer the response, but for now...
- std::cerr << "Client #" << socket->fd()
+ std::cerr << "Client #" << socket->getFD()
<< ": ERROR - socket not ready for write." << std::endl;
}
}
diff --git a/net/socket.hpp b/net/socket.hpp
index b2891a3..83202b4 100644
--- a/net/socket.hpp
+++ b/net/socket.hpp
@@ -38,7 +38,7 @@ public:
}
// Returns the OS native socket fd.
- int fd() const { return _fd; }
+ int getFD() const { return _fd; }
/// Sets the send buffer in size bytes.
/// Must be called before accept or connect.
@@ -114,18 +114,18 @@ public:
// Use poll(2) as it has lower overhead for up to
// a few hundred sockets compared to epoll(2).
// Also it has a more intuitive API and portable.
- pollfd poll;
- memset(&poll, 0, sizeof(poll));
+ pollfd pollfd;
+ memset(&pollfd, 0, sizeof(pollfd));
- poll.fd = fd();
- poll.events |= events;
+ pollfd.fd = getFD();
+ pollfd.events |= events;
int rc;
do
{
// Technically, on retrying we should wait
// the _remaining_ time, alas simplicity wins.
- rc = ::poll(&poll, 1, timeoutMs);
+ rc = ::poll(&pollfd, 1, timeoutMs);
}
while (rc < 0 && errno == EINTR);
@@ -137,19 +137,19 @@ public:
int revents = 0;
if (rc == 1)
{
- if (poll.revents & (POLLERR|POLLHUP|POLLNVAL))
+ if (pollfd.revents & (POLLERR|POLLHUP|POLLNVAL))
{
// Probe socket for error.
return -1;
}
- if (poll.revents & (POLLIN|POLLPRI))
+ if (pollfd.revents & (POLLIN|POLLPRI))
{
// Data ready to read.
revents |= POLLIN;
}
- if (poll.revents & POLLOUT)
+ if (pollfd.revents & POLLOUT)
{
// Ready for write.
revents |= POLLOUT;
@@ -206,7 +206,7 @@ public:
/// only when the latter returns 0 we are connected.
bool connect(const Poco::Net::SocketAddress& address, const int timeoutMs = 0)
{
- const int rc = ::connect(fd(), address.addr(), address.length());
+ const int rc = ::connect(getFD(), address.addr(), address.length());
if (rc == 0)
{
return true;
@@ -229,7 +229,7 @@ public:
int send(const void* buf, const size_t len)
{
// Don't SIGPIPE when the other end closes.
- const int rc = ::send(fd(), buf, len, MSG_NOSIGNAL);
+ const int rc = ::send(getFD(), buf, len, MSG_NOSIGNAL);
return rc;
}
@@ -238,7 +238,7 @@ public:
/// and 0 when the peer has performed an orderly shutdown.
int recv(void* buf, const size_t len)
{
- const int rc = ::recv(fd(), buf, len, 0);
+ const int rc = ::recv(getFD(), buf, len, 0);
return rc;
}
@@ -266,9 +266,9 @@ public:
//TODO: Might be worth refactoring out.
const int reuseAddress = 1;
constexpr unsigned int len = sizeof(reuseAddress);
- ::setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
+ ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
- const int rc = ::bind(fd(), address.addr(), address.length());
+ const int rc = ::bind(getFD(), address.addr(), address.length());
return (rc == 0);
}
@@ -277,7 +277,7 @@ public:
/// Returns true on success only.
bool listen(const int backlog = 64)
{
- const int rc = ::listen(fd(), backlog);
+ const int rc = ::listen(getFD(), backlog);
return (rc == 0);
}
@@ -288,7 +288,7 @@ public:
{
// Accept a connection (if any) and set it to non-blocking.
// We don't care about the client's address, so ignored.
- const int rc = ::accept4(fd(), nullptr, nullptr, SOCK_NONBLOCK);
+ const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
}
};
More information about the Libreoffice-commits
mailing list