[Libreoffice-commits] online.git: wsd/SenderQueue.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Dec 12 05:22:49 UTC 2016


 wsd/SenderQueue.cpp |   73 ++++++++++++++++++++++++++++++++++
 wsd/SenderQueue.hpp |  110 ++++++++++++++++++++++++++++++++++++++++++++--------
 wsd/TileCache.cpp   |    2 
 3 files changed, 166 insertions(+), 19 deletions(-)

New commits:
commit f8d3576556b8ad56dad1f9853b961d9e54430de5
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Dec 11 11:59:46 2016 -0500

    loolwsd: Sender Thread Pool
    
    Change-Id: I36a0ad376dad1e6b0733ebfa930baf1dd5752a8c
    Reviewed-on: https://gerrit.libreoffice.org/31885
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/wsd/SenderQueue.cpp b/wsd/SenderQueue.cpp
index 6a81758..f68255e 100644
--- a/wsd/SenderQueue.cpp
+++ b/wsd/SenderQueue.cpp
@@ -15,8 +15,9 @@
 #include <Log.hpp>
 
 SenderQueue SenderQueue::TheQueue;
+SenderThreadPool SenderThreadPool::ThePool;
 
-bool DispatchSendItem(const size_t timeoutMs)
+bool SenderThreadPool::dispatchItem(const size_t timeoutMs)
 {
     SendItem item;
     if (SenderQueue::instance().waitDequeue(item, timeoutMs))
@@ -46,4 +47,74 @@ bool DispatchSendItem(const size_t timeoutMs)
     return false;
 }
 
+std::shared_ptr<SenderThreadPool::ThreadData> SenderThreadPool::createThread()
+{
+    if (!stopping())
+    {
+        std::shared_ptr<ThreadData> data(std::make_shared<ThreadData>());
+        std::thread thread([this, data]{ threadFunction(data); });
+        data->swap(thread);
+        return data;
+    }
+
+    return nullptr;
+}
+
+bool SenderThreadPool::rebalance()
+{
+    std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
+    if (!lock.try_lock())
+    {
+        // A sibling is likely rebalancing.
+        return false;
+    }
+
+    const auto threadCount = _threads.size();
+    LOG_DBG("SenderThreadPool: rebalancing " << threadCount << " threads.");
+
+    // First cleanup the non-joinables.
+    for (int i = _threads.size() - 1; i >= 0; --i)
+    {
+        if (!_threads[i]->joinable())
+        {
+            _threads.erase(_threads.begin() + i);
+        }
+    }
+
+    LOG_DBG("SenderThreadPool: removed " << threadCount - _threads.size() << " dead threads.");
+
+    while (_threads.size() < _optimalThreadCount && !stopping())
+    {
+        auto newThreadData = createThread();
+        if (newThreadData)
+        {
+            _threads.push_back(newThreadData);
+        }
+    }
+
+    // Need to reduce?
+    return _threads.size() > _optimalThreadCount;
+}
+
+void SenderThreadPool::threadFunction(const std::shared_ptr<ThreadData>& data)
+{
+    LOG_DBG("SenderThread started");
+    while (!stopping())
+    {
+        if (!dispatchItem(HousekeepIdleIntervalMs) && !stopping())
+        {
+            // We timed out. Seems we have more threads than work.
+            if (rebalance())
+            {
+                // We've been considered expendable.
+                LOG_DBG("SenderThread marked to die");
+                break;
+            }
+        }
+    }
+
+    data->detach();
+    LOG_DBG("SenderThread finished");
+}
+
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 4d80a0c..d95fae7 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -10,12 +10,11 @@
 #ifndef INCLUDED_SENDERQUEUE_HPP
 #define INCLUDED_SENDERQUEUE_HPP
 
-#include <functional>
-
-#include <Poco/NotificationQueue.h>
-#include <Poco/Runnable.h>
-#include <Poco/ThreadPool.h>
+#include <deque>
+#include <memory>
+#include <vector>
 
+#include "common/SigUtil.hpp"
 #include "Session.hpp"
 #include "Log.hpp"
 
@@ -34,9 +33,6 @@ public:
 
     std::vector<char>& data() { return _data; }
 
-    Type getType() const { return _type; }
-    void setType(const Type type) { _type = type; }
-
     /// Returns true if and only if the payload is considered Binary.
     bool isBinary() const { return _type == Type::Binary; }
 
@@ -59,6 +55,13 @@ public:
 
     static SenderQueue& instance() { return TheQueue; }
 
+    bool stopping() const { return _stop || TerminationFlag; }
+    void stop()
+    {
+         _stop = true;
+         _cv.notify_all();
+    }
+
     size_t enqueue(const std::weak_ptr<LOOLSession>& session,
                    const std::shared_ptr<MessagePayload>& data)
     {
@@ -80,16 +83,20 @@ public:
 
         std::unique_lock<std::mutex> lock(_mutex);
 
-        LOG_TRC("SenderQueue size: " << _queue.size());
         if (!_queue.empty() ||
-            _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty(); }))
+            _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); }))
         {
-            item = _queue.front();
-            _queue.pop_front();
-            return true;
+            if (!stopping())
+            {
+                item = _queue.front();
+                _queue.pop_front();
+                return true;
+            }
+
+            LOG_WRN("SenderQueue: stopping");
+            return false;
         }
 
-        LOG_WRN("SenderQueue: timeout");
         return false;
     }
 
@@ -103,13 +110,84 @@ private:
     mutable std::mutex _mutex;
     std::condition_variable _cv;
     std::deque<SendItem> _queue;
+    std::atomic<bool> _stop;
 
     /// The only SenderQueue instance.
     static SenderQueue TheQueue;
 };
 
-/// Dequeue a SendItem and send it.
-bool DispatchSendItem(const size_t timeoutMs = std::numeric_limits<size_t>::max());
+/// Pool of sender threads.
+/// These are dedicated threads that only dequeue from
+/// the SenderQueue and send to the target Session.
+/// This pool has long-running threads that grow
+/// only on congention and shrink otherwise.
+class SenderThreadPool
+{
+public:
+    SenderThreadPool() :
+        _optimalThreadCount(std::min(2U, std::thread::hardware_concurrency())),
+        _stop(false)
+    {
+        LOG_INF("Creating SenderThreadPool with " << _optimalThreadCount << " optimal threads.");
+        for (size_t i = 0; i < _optimalThreadCount; ++i)
+        {
+            _threads.push_back(createThread());
+        }
+    }
+
+    ~SenderThreadPool()
+    {
+        // Stop us and the queue.
+        stop();
+        SenderQueue::instance().stop();
+
+        for (const auto& threadData : _threads)
+        {
+            if (threadData && threadData->joinable())
+            {
+                threadData->join();
+            }
+        }
+    }
+
+    SenderThreadPool& instance() { return ThePool; }
+
+    void stop() { _stop = true; }
+    bool stopping() const { return _stop || TerminationFlag; }
+
+private:
+
+    typedef std::thread ThreadData;
+
+    /// Dequeue a SendItem and send it.
+    bool dispatchItem(const size_t timeoutMs);
+
+    /// Create a new thread and add to the pool.
+    std::shared_ptr<ThreadData> createThread();
+
+    /// Rebalance the number of threads.
+    /// Returns true if we need to reduce the threads.
+    bool rebalance();
+
+    /// The worker thread entry function.
+    void threadFunction(const std::shared_ptr<ThreadData>& data);
+
+private:
+    /// A minimum of 2, but ideally as many as cores.
+    const size_t _optimalThreadCount;
+
+    /// Stop condition to take the pool down.
+    std::atomic<bool> _stop;
+
+    std::vector<std::shared_ptr<ThreadData>> _threads;
+    mutable std::mutex _mutex;
+
+    /// How often to do housekeeping when we idle.
+    static constexpr size_t HousekeepIdleIntervalMs = 60000;
+
+    /// The only pool.
+    static SenderThreadPool ThePool;
+};
 
 #endif
 
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index beb754c..e76e2ed 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -186,7 +186,6 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
 
             auto& firstSubscriber = tileBeingRendered->_subscribers[0];
             SenderQueue::instance().enqueue(firstSubscriber, payload);
-            DispatchSendItem();
 
             if (subscriberCount > 1)
             {
@@ -205,7 +204,6 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
                 {
                     auto& subscriber = tileBeingRendered->_subscribers[i];
                     SenderQueue::instance().enqueue(subscriber, payload);
-                    DispatchSendItem();
                 }
             }
         }


More information about the Libreoffice-commits mailing list