[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/Socket.hpp

Michael Meeks michael.meeks at collabora.com
Fri Feb 24 23:14:50 UTC 2017


 net/Socket.hpp |   64 +++++++++++++++++++++++++++++++++------------------------
 1 file changed, 38 insertions(+), 26 deletions(-)

New commits:
commit 6a34272fdd10b2c4a5ba2ad6c7fb8a9e56e3dc6a
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Fri Feb 24 23:14:09 2017 +0000

    Add thread-safe cross-thread callbacks, split out wakeup and simplify

diff --git a/net/Socket.hpp b/net/Socket.hpp
index d78e3c9..4b49b08 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -217,49 +217,60 @@ public:
         // Process the wakeup pipe (always the last entry).
         if (_pollFds[size].revents)
         {
-            // Add new sockets first.
-            addNewSocketsToPoll();
+            std::vector<CallbackFn> invoke;
+            {
+                std::lock_guard<std::mutex> lock(_mutex);
+
+                // Clear the data.
+                int dump = ::read(_wakeup[0], &dump, sizeof(dump));
 
-            // Clear the data.
-            int dump = ::read(_wakeup[0], &dump, sizeof(dump));
+                // Copy the new sockets over and clear.
+                _pollSockets.insert(_pollSockets.end(),
+                                    _newSockets.begin(), _newSockets.end());
+                _newSockets.clear();
+
+                // Extract list of callbacks to process
+                std::swap(_newCallbacks, invoke);
+            }
+
+            for (size_t i = 0; i < invoke.size(); ++i)
+                invoke[i]();
         }
     }
 
+    /// Wakeup the main polling loop in another thread
+    void wakeup()
+    {
+        // wakeup the main-loop.
+        int rc;
+        do {
+            rc = ::write(_wakeup[1], "w", 1);
+        } while (rc == -1 && errno == EINTR);
+
+        assert (rc != -1 || errno == EAGAIN || errno == EWOULDBLOCK);
+    }
+
     /// Insert a new socket to be polled.
     /// Sockets are removed only when the handler return false.
     void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
     {
         std::lock_guard<std::mutex> lock(_mutex);
-
         _newSockets.emplace_back(newSocket);
-
-        // wakeup the main-loop.
-        int rc;
-        do
-        {
-            // wakeup pipe is already full.
-            rc = ::write(_wakeup[1], "w", 1);
-        }
-        while (rc == -1 && errno == EINTR);
-
-        if (rc == -1)
-        {
-            assert(errno == EAGAIN || errno == EWOULDBLOCK);
-        }
+        wakeup();
     }
 
-private:
+    typedef std::function<void()> CallbackFn;
 
-    /// Add the new sockets to list of those to poll.
-    void addNewSocketsToPoll()
+    /// Add a callback to be invoked in the polling thread
+    void addCallback(CallbackFn fn)
     {
         std::lock_guard<std::mutex> lock(_mutex);
-
-        // Copy the new sockets over and clear.
-        _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
-        _newSockets.clear();
+        _newCallbacks.emplace_back(fn);
+        wakeup();
     }
 
+private:
+
     void removeSocketFromPoll(const std::shared_ptr<Socket>& socket)
     {
         auto it = std::find(_pollSockets.begin(), _pollSockets.end(), socket);
@@ -296,6 +307,7 @@ private:
     /// Protects _newSockets
     std::mutex _mutex;
     std::vector<std::shared_ptr<Socket>> _newSockets;
+    std::vector<CallbackFn> _newCallbacks;
     /// The fds to poll.
     std::vector<pollfd> _pollFds;
 };


More information about the Libreoffice-commits mailing list