[Libreoffice-commits] online.git: wsd/SenderQueue.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Dec 12 05:22:49 UTC 2016
wsd/SenderQueue.cpp | 73 ++++++++++++++++++++++++++++++++++
wsd/SenderQueue.hpp | 110 ++++++++++++++++++++++++++++++++++++++++++++--------
wsd/TileCache.cpp | 2
3 files changed, 166 insertions(+), 19 deletions(-)
New commits:
commit f8d3576556b8ad56dad1f9853b961d9e54430de5
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Dec 11 11:59:46 2016 -0500
loolwsd: Sender Thread Pool
Change-Id: I36a0ad376dad1e6b0733ebfa930baf1dd5752a8c
Reviewed-on: https://gerrit.libreoffice.org/31885
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/wsd/SenderQueue.cpp b/wsd/SenderQueue.cpp
index 6a81758..f68255e 100644
--- a/wsd/SenderQueue.cpp
+++ b/wsd/SenderQueue.cpp
@@ -15,8 +15,9 @@
#include <Log.hpp>
SenderQueue SenderQueue::TheQueue;
+SenderThreadPool SenderThreadPool::ThePool;
-bool DispatchSendItem(const size_t timeoutMs)
+bool SenderThreadPool::dispatchItem(const size_t timeoutMs)
{
SendItem item;
if (SenderQueue::instance().waitDequeue(item, timeoutMs))
@@ -46,4 +47,74 @@ bool DispatchSendItem(const size_t timeoutMs)
return false;
}
+std::shared_ptr<SenderThreadPool::ThreadData> SenderThreadPool::createThread()
+{
+ if (!stopping())
+ {
+ std::shared_ptr<ThreadData> data(std::make_shared<ThreadData>());
+ std::thread thread([this, data]{ threadFunction(data); });
+ data->swap(thread);
+ return data;
+ }
+
+ return nullptr;
+}
+
+bool SenderThreadPool::rebalance()
+{
+ std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
+ if (!lock.try_lock())
+ {
+ // A sibling is likely rebalancing.
+ return false;
+ }
+
+ const auto threadCount = _threads.size();
+ LOG_DBG("SenderThreadPool: rebalancing " << threadCount << " threads.");
+
+ // First cleanup the non-joinables.
+ for (int i = _threads.size() - 1; i >= 0; --i)
+ {
+ if (!_threads[i]->joinable())
+ {
+ _threads.erase(_threads.begin() + i);
+ }
+ }
+
+ LOG_DBG("SenderThreadPool: removed " << threadCount - _threads.size() << " dead threads.");
+
+ while (_threads.size() < _optimalThreadCount && !stopping())
+ {
+ auto newThreadData = createThread();
+ if (newThreadData)
+ {
+ _threads.push_back(newThreadData);
+ }
+ }
+
+ // Need to reduce?
+ return _threads.size() > _optimalThreadCount;
+}
+
+void SenderThreadPool::threadFunction(const std::shared_ptr<ThreadData>& data)
+{
+ LOG_DBG("SenderThread started");
+ while (!stopping())
+ {
+ if (!dispatchItem(HousekeepIdleIntervalMs) && !stopping())
+ {
+ // We timed out. Seems we have more threads than work.
+ if (rebalance())
+ {
+ // We've been considered expendable.
+ LOG_DBG("SenderThread marked to die");
+ break;
+ }
+ }
+ }
+
+ data->detach();
+ LOG_DBG("SenderThread finished");
+}
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 4d80a0c..d95fae7 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -10,12 +10,11 @@
#ifndef INCLUDED_SENDERQUEUE_HPP
#define INCLUDED_SENDERQUEUE_HPP
-#include <functional>
-
-#include <Poco/NotificationQueue.h>
-#include <Poco/Runnable.h>
-#include <Poco/ThreadPool.h>
+#include <deque>
+#include <memory>
+#include <vector>
+#include "common/SigUtil.hpp"
#include "Session.hpp"
#include "Log.hpp"
@@ -34,9 +33,6 @@ public:
std::vector<char>& data() { return _data; }
- Type getType() const { return _type; }
- void setType(const Type type) { _type = type; }
-
/// Returns true if and only if the payload is considered Binary.
bool isBinary() const { return _type == Type::Binary; }
@@ -59,6 +55,13 @@ public:
static SenderQueue& instance() { return TheQueue; }
+ bool stopping() const { return _stop || TerminationFlag; }
+ void stop()
+ {
+ _stop = true;
+ _cv.notify_all();
+ }
+
size_t enqueue(const std::weak_ptr<LOOLSession>& session,
const std::shared_ptr<MessagePayload>& data)
{
@@ -80,16 +83,20 @@ public:
std::unique_lock<std::mutex> lock(_mutex);
- LOG_TRC("SenderQueue size: " << _queue.size());
if (!_queue.empty() ||
- _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty(); }))
+ _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); }))
{
- item = _queue.front();
- _queue.pop_front();
- return true;
+ if (!stopping())
+ {
+ item = _queue.front();
+ _queue.pop_front();
+ return true;
+ }
+
+ LOG_WRN("SenderQueue: stopping");
+ return false;
}
- LOG_WRN("SenderQueue: timeout");
return false;
}
@@ -103,13 +110,84 @@ private:
mutable std::mutex _mutex;
std::condition_variable _cv;
std::deque<SendItem> _queue;
+ std::atomic<bool> _stop;
/// The only SenderQueue instance.
static SenderQueue TheQueue;
};
-/// Dequeue a SendItem and send it.
-bool DispatchSendItem(const size_t timeoutMs = std::numeric_limits<size_t>::max());
+/// Pool of sender threads.
+/// These are dedicated threads that only dequeue from
+/// the SenderQueue and send to the target Session.
+/// This pool has long-running threads that grow
+/// only on congention and shrink otherwise.
+class SenderThreadPool
+{
+public:
+ SenderThreadPool() :
+ _optimalThreadCount(std::min(2U, std::thread::hardware_concurrency())),
+ _stop(false)
+ {
+ LOG_INF("Creating SenderThreadPool with " << _optimalThreadCount << " optimal threads.");
+ for (size_t i = 0; i < _optimalThreadCount; ++i)
+ {
+ _threads.push_back(createThread());
+ }
+ }
+
+ ~SenderThreadPool()
+ {
+ // Stop us and the queue.
+ stop();
+ SenderQueue::instance().stop();
+
+ for (const auto& threadData : _threads)
+ {
+ if (threadData && threadData->joinable())
+ {
+ threadData->join();
+ }
+ }
+ }
+
+ SenderThreadPool& instance() { return ThePool; }
+
+ void stop() { _stop = true; }
+ bool stopping() const { return _stop || TerminationFlag; }
+
+private:
+
+ typedef std::thread ThreadData;
+
+ /// Dequeue a SendItem and send it.
+ bool dispatchItem(const size_t timeoutMs);
+
+ /// Create a new thread and add to the pool.
+ std::shared_ptr<ThreadData> createThread();
+
+ /// Rebalance the number of threads.
+ /// Returns true if we need to reduce the threads.
+ bool rebalance();
+
+ /// The worker thread entry function.
+ void threadFunction(const std::shared_ptr<ThreadData>& data);
+
+private:
+ /// A minimum of 2, but ideally as many as cores.
+ const size_t _optimalThreadCount;
+
+ /// Stop condition to take the pool down.
+ std::atomic<bool> _stop;
+
+ std::vector<std::shared_ptr<ThreadData>> _threads;
+ mutable std::mutex _mutex;
+
+ /// How often to do housekeeping when we idle.
+ static constexpr size_t HousekeepIdleIntervalMs = 60000;
+
+ /// The only pool.
+ static SenderThreadPool ThePool;
+};
#endif
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index beb754c..e76e2ed 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -186,7 +186,6 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
auto& firstSubscriber = tileBeingRendered->_subscribers[0];
SenderQueue::instance().enqueue(firstSubscriber, payload);
- DispatchSendItem();
if (subscriberCount > 1)
{
@@ -205,7 +204,6 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
{
auto& subscriber = tileBeingRendered->_subscribers[i];
SenderQueue::instance().enqueue(subscriber, payload);
- DispatchSendItem();
}
}
}
More information about the Libreoffice-commits
mailing list