[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Tue Feb 14 03:21:00 UTC 2017


 net/loolnb.cpp |  129 ++++++++++++++++++++++++++++++---------------------------
 1 file changed, 70 insertions(+), 59 deletions(-)

New commits:
commit 404c1fab379650bf9cde78ae1398c5800bb232e9
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Mon Feb 13 19:40:41 2017 -0500

    nb: poll wakeup pipe and simplified polling
    
    Change-Id: I2e688b985d4a9bf7cbe8eef5df10f67bfc96f91c
    Reviewed-on: https://gerrit.libreoffice.org/34233
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index e7ce4f0..78e61b1 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -306,91 +306,102 @@ class SocketPoll
 public:
     SocketPoll()
     {
+        // Create the wakeup fd.
+        if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
+        {
+            //FIXME: Can't have wakeup pipe, should we exit?
+            _wakeup[0] = -1;
+            _wakeup[1] = -1;
+        }
     }
 
-    void add(const std::shared_ptr<T>& socket)
-    {
-        const int fd = socket->fd();
-        pollfd poll;
-        memset(&poll, 0, sizeof(pollfd));
-        poll.fd = fd;
-        poll.events = (POLLIN | POLLOUT);
-
-        std::lock_guard<std::mutex> lock(_mutex);
-
-        _sockets.emplace(fd, socket);
-        _pollDesc.emplace_back(poll);
-    }
-
-    void remove(const std::shared_ptr<T>& socket)
+    /// Poll the sockets for available data to read or buffer to write.
+    void poll(const int timeoutMs, const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
     {
-        const int fd = socket->fd();
+        const size_t size = _pollSockets.size();
+        std::vector<pollfd> pollFds(size + 1); // + wakeup fd
 
-        std::lock_guard<std::mutex> lock(_mutex);
+        for (size_t i = 0; i < size; ++i)
+        {
+            pollFds[i].fd = _pollSockets[i]->fd();
+            pollFds[i].events = POLLIN | POLLOUT; //TODO: Get from the socket.
+            pollFds[i].revents = 0;
+        }
 
-        // Hold reference to socket so it doesn't
-        // close while we are in poll(2).
-        _socketsDead.emplace(fd, socket);
-        _sockets.erase(fd);
-    }
+        // Add the read-end of the wake pipe.
+        pollFds[size].fd = _wakeup[0];
+        pollFds[size].events = POLLIN;
+        pollFds[size].revents = 0;
 
-    /// Poll the sockets for available data to read or buffer to write.
-    void poll(const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
-    {
         int rc;
         do
         {
-            // See note in class doc.
-            rc = ::poll(&_pollDesc[0], _pollDesc.size(), 0);
+            rc = ::poll(&pollFds[0], pollFds.size(), timeoutMs);
         }
         while (rc < 0 && errno == EINTR);
 
-        for (const pollfd& poll : _pollDesc)
+        // Fire the callback and remove dead fds.
+        for (int i = static_cast<int>(size) - 1; i >= 0; --i)
         {
-            if (poll.revents)
+            if (pollFds[i].revents)
             {
-                std::lock_guard<std::mutex> lock(_mutex);
-                const auto it = _sockets.find(poll.fd);
-                if (it != _sockets.end() && it->second)
+                if (!handler(_pollSockets[i], pollFds[i].revents))
                 {
-                    if (!handler(it->second, poll.revents))
-                    {
-                        std::cout << "Removing: " << poll.fd << std::endl;
-                        _socketsDead.emplace(poll.fd, it->second);
-                        _sockets.erase(poll.fd);
-                    }
+                    std::cout << "Removing: " << pollFds[i].fd << std::endl;
+                    _pollSockets.erase(_pollSockets.begin() + i);
                 }
             }
         }
 
-        // Now clear the dead sockets to close/free them.
-        std::lock_guard<std::mutex> lock(_mutex);
-
-        // Remove the pollfd of these sockets as well.
-        size_t size = 0;
-        for (size_t i = 0; i < _pollDesc.size(); ++i)
+        if (pollFds[size].revents)
         {
-            const auto it = _socketsDead.find(_pollDesc[i].fd);
-            if (it != _socketsDead.end())
-            {
-                // Move to the end.
-                std::swap(_pollDesc[i], _pollDesc[_pollDesc.size() - 1]);
-            }
-            else
+            // Process new sockets first.
+            addNewSocketsToPoll();
+
+            // Clear the data.
+            int dump;
+            if (::read(_wakeup[0], &dump, sizeof(4)) == -1)
             {
-                ++size;
+                // Nothing to do.
             }
         }
+    }
+
+    /// Insert a new socket to be polled.
+    /// Sockets are removed only when the handler return false.
+    void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        _newSockets.emplace_back(newSocket);
+
+        // wakeup the main-loop.
+        if (::write(_wakeup[1], "w", 1) == -1)
+        {
+            // No wake up then.
+        }
+    }
+
+private:
+
+    /// Add the new sockets to list of those to poll.
+    void addNewSocketsToPoll()
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
 
-        _pollDesc.resize(size);
-        _socketsDead.clear();
+        // Copy the new sockets over and clear.
+        _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
+        _newSockets.clear();
     }
 
 private:
+    /// main-loop wakeup pipe
+    int _wakeup[2];
+    /// The sockets we're controlling
+    std::vector<std::shared_ptr<Socket>> _pollSockets;
+    /// Protects _newSockets
     std::mutex _mutex;
-    std::map<int, std::shared_ptr<T>> _sockets;
-    std::map<int, std::shared_ptr<T>> _socketsDead;
-    std::vector<pollfd> _pollDesc;
+    std::vector<std::shared_ptr<Socket>> _newSockets;
 };
 
 /// Generic thread class.
@@ -482,7 +493,7 @@ int main(int argc, const char**)
     {
         while (!stop)
         {
-            poller.poll([](const std::shared_ptr<Socket>& socket, const int events)
+            poller.poll(5000, [](const std::shared_ptr<Socket>& socket, const int events)
             {
                 if (events & POLLIN)
                 {
@@ -550,7 +561,7 @@ int main(int argc, const char**)
             }
 
             std::cout << "Accepted client #" << clientSocket->fd() << std::endl;
-            poller.add(clientSocket);
+            poller.insertNewSocket(clientSocket);
         }
     }
 


More information about the Libreoffice-commits mailing list