[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Tue Feb 14 03:21:00 UTC 2017
net/loolnb.cpp | 129 ++++++++++++++++++++++++++++++---------------------------
1 file changed, 70 insertions(+), 59 deletions(-)
New commits:
commit 404c1fab379650bf9cde78ae1398c5800bb232e9
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Mon Feb 13 19:40:41 2017 -0500
nb: poll wakeup pipe and simplified polling
Change-Id: I2e688b985d4a9bf7cbe8eef5df10f67bfc96f91c
Reviewed-on: https://gerrit.libreoffice.org/34233
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index e7ce4f0..78e61b1 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -306,91 +306,102 @@ class SocketPoll
public:
SocketPoll()
{
+ // Create the wakeup fd.
+ if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
+ {
+ //FIXME: Can't have wakeup pipe, should we exit?
+ _wakeup[0] = -1;
+ _wakeup[1] = -1;
+ }
}
- void add(const std::shared_ptr<T>& socket)
- {
- const int fd = socket->fd();
- pollfd poll;
- memset(&poll, 0, sizeof(pollfd));
- poll.fd = fd;
- poll.events = (POLLIN | POLLOUT);
-
- std::lock_guard<std::mutex> lock(_mutex);
-
- _sockets.emplace(fd, socket);
- _pollDesc.emplace_back(poll);
- }
-
- void remove(const std::shared_ptr<T>& socket)
+ /// Poll the sockets for available data to read or buffer to write.
+ void poll(const int timeoutMs, const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
{
- const int fd = socket->fd();
+ const size_t size = _pollSockets.size();
+ std::vector<pollfd> pollFds(size + 1); // + wakeup fd
- std::lock_guard<std::mutex> lock(_mutex);
+ for (size_t i = 0; i < size; ++i)
+ {
+ pollFds[i].fd = _pollSockets[i]->fd();
+ pollFds[i].events = POLLIN | POLLOUT; //TODO: Get from the socket.
+ pollFds[i].revents = 0;
+ }
- // Hold reference to socket so it doesn't
- // close while we are in poll(2).
- _socketsDead.emplace(fd, socket);
- _sockets.erase(fd);
- }
+ // Add the read-end of the wake pipe.
+ pollFds[size].fd = _wakeup[0];
+ pollFds[size].events = POLLIN;
+ pollFds[size].revents = 0;
- /// Poll the sockets for available data to read or buffer to write.
- void poll(const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
- {
int rc;
do
{
- // See note in class doc.
- rc = ::poll(&_pollDesc[0], _pollDesc.size(), 0);
+ rc = ::poll(&pollFds[0], pollFds.size(), timeoutMs);
}
while (rc < 0 && errno == EINTR);
- for (const pollfd& poll : _pollDesc)
+ // Fire the callback and remove dead fds.
+ for (int i = static_cast<int>(size) - 1; i >= 0; --i)
{
- if (poll.revents)
+ if (pollFds[i].revents)
{
- std::lock_guard<std::mutex> lock(_mutex);
- const auto it = _sockets.find(poll.fd);
- if (it != _sockets.end() && it->second)
+ if (!handler(_pollSockets[i], pollFds[i].revents))
{
- if (!handler(it->second, poll.revents))
- {
- std::cout << "Removing: " << poll.fd << std::endl;
- _socketsDead.emplace(poll.fd, it->second);
- _sockets.erase(poll.fd);
- }
+ std::cout << "Removing: " << pollFds[i].fd << std::endl;
+ _pollSockets.erase(_pollSockets.begin() + i);
}
}
}
- // Now clear the dead sockets to close/free them.
- std::lock_guard<std::mutex> lock(_mutex);
-
- // Remove the pollfd of these sockets as well.
- size_t size = 0;
- for (size_t i = 0; i < _pollDesc.size(); ++i)
+ if (pollFds[size].revents)
{
- const auto it = _socketsDead.find(_pollDesc[i].fd);
- if (it != _socketsDead.end())
- {
- // Move to the end.
- std::swap(_pollDesc[i], _pollDesc[_pollDesc.size() - 1]);
- }
- else
+ // Process new sockets first.
+ addNewSocketsToPoll();
+
+ // Clear the data.
+ int dump;
+ if (::read(_wakeup[0], &dump, sizeof(4)) == -1)
{
- ++size;
+ // Nothing to do.
}
}
+ }
+
+ /// Insert a new socket to be polled.
+ /// Sockets are removed only when the handler return false.
+ void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ _newSockets.emplace_back(newSocket);
+
+ // wakeup the main-loop.
+ if (::write(_wakeup[1], "w", 1) == -1)
+ {
+ // No wake up then.
+ }
+ }
+
+private:
+
+ /// Add the new sockets to list of those to poll.
+ void addNewSocketsToPoll()
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
- _pollDesc.resize(size);
- _socketsDead.clear();
+ // Copy the new sockets over and clear.
+ _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
+ _newSockets.clear();
}
private:
+ /// main-loop wakeup pipe
+ int _wakeup[2];
+ /// The sockets we're controlling
+ std::vector<std::shared_ptr<Socket>> _pollSockets;
+ /// Protects _newSockets
std::mutex _mutex;
- std::map<int, std::shared_ptr<T>> _sockets;
- std::map<int, std::shared_ptr<T>> _socketsDead;
- std::vector<pollfd> _pollDesc;
+ std::vector<std::shared_ptr<Socket>> _newSockets;
};
/// Generic thread class.
@@ -482,7 +493,7 @@ int main(int argc, const char**)
{
while (!stop)
{
- poller.poll([](const std::shared_ptr<Socket>& socket, const int events)
+ poller.poll(5000, [](const std::shared_ptr<Socket>& socket, const int events)
{
if (events & POLLIN)
{
@@ -550,7 +561,7 @@ int main(int argc, const char**)
}
std::cout << "Accepted client #" << clientSocket->fd() << std::endl;
- poller.add(clientSocket);
+ poller.insertNewSocket(clientSocket);
}
}
More information about the Libreoffice-commits
mailing list