[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/Socket.hpp
Michael Meeks
michael.meeks at collabora.com
Fri Feb 24 23:14:50 UTC 2017
net/Socket.hpp | 64 +++++++++++++++++++++++++++++++++------------------------
1 file changed, 38 insertions(+), 26 deletions(-)
New commits:
commit 6a34272fdd10b2c4a5ba2ad6c7fb8a9e56e3dc6a
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Fri Feb 24 23:14:09 2017 +0000
Add thread-safe cross-thread callbacks, split out wakeup and simplify
diff --git a/net/Socket.hpp b/net/Socket.hpp
index d78e3c9..4b49b08 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -217,49 +217,60 @@ public:
// Process the wakeup pipe (always the last entry).
if (_pollFds[size].revents)
{
- // Add new sockets first.
- addNewSocketsToPoll();
+ std::vector<CallbackFn> invoke;
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ // Clear the data.
+ int dump = ::read(_wakeup[0], &dump, sizeof(dump));
- // Clear the data.
- int dump = ::read(_wakeup[0], &dump, sizeof(dump));
+ // Copy the new sockets over and clear.
+ _pollSockets.insert(_pollSockets.end(),
+ _newSockets.begin(), _newSockets.end());
+ _newSockets.clear();
+
+ // Extract list of callbacks to process
+ std::swap(_newCallbacks, invoke);
+ }
+
+ for (size_t i = 0; i < invoke.size(); ++i)
+ invoke[i]();
}
}
+ /// Wakeup the main polling loop in another thread
+ void wakeup()
+ {
+ // wakeup the main-loop.
+ int rc;
+ do {
+ rc = ::write(_wakeup[1], "w", 1);
+ } while (rc == -1 && errno == EINTR);
+
+ assert (rc != -1 || errno == EAGAIN || errno == EWOULDBLOCK);
+ }
+
/// Insert a new socket to be polled.
/// Sockets are removed only when the handler return false.
void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
{
std::lock_guard<std::mutex> lock(_mutex);
-
_newSockets.emplace_back(newSocket);
-
- // wakeup the main-loop.
- int rc;
- do
- {
- // wakeup pipe is already full.
- rc = ::write(_wakeup[1], "w", 1);
- }
- while (rc == -1 && errno == EINTR);
-
- if (rc == -1)
- {
- assert(errno == EAGAIN || errno == EWOULDBLOCK);
- }
+ wakeup();
}
-private:
+ typedef std::function<void()> CallbackFn;
- /// Add the new sockets to list of those to poll.
- void addNewSocketsToPoll()
+ /// Add a callback to be invoked in the polling thread
+ void addCallback(CallbackFn fn)
{
std::lock_guard<std::mutex> lock(_mutex);
-
- // Copy the new sockets over and clear.
- _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
- _newSockets.clear();
+ _newCallbacks.emplace_back(fn);
+ wakeup();
}
+private:
+
void removeSocketFromPoll(const std::shared_ptr<Socket>& socket)
{
auto it = std::find(_pollSockets.begin(), _pollSockets.end(), socket);
@@ -296,6 +307,7 @@ private:
/// Protects _newSockets
std::mutex _mutex;
std::vector<std::shared_ptr<Socket>> _newSockets;
+ std::vector<CallbackFn> _newCallbacks;
/// The fds to poll.
std::vector<pollfd> _pollFds;
};
More information about the Libreoffice-commits
mailing list