[Libreoffice-commits] online.git: wsd/DocumentBroker.cpp wsd/SenderQueue.cpp wsd/SenderQueue.hpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Dec 12 05:24:29 UTC 2016


 wsd/DocumentBroker.cpp |    6 ++++++
 wsd/SenderQueue.cpp    |   41 ++++++++++++++++++++++++++++++++++++++++-
 wsd/SenderQueue.hpp    |   42 ++++++++++++++++++++++++++++++++++++++----
 3 files changed, 84 insertions(+), 5 deletions(-)

New commits:
commit da97e2ac184d91e5be5382c34566fde35f532c7f
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Dec 11 18:07:26 2016 -0500

    loolwsd: grow the SenderThreadPool dynamically
    
    The pool is checked for expansion before processing
    items from the queue. By tracking the number of idle
    threads (i.e. those not currently sending data) we
    ensure that there is at least one idle thread before
    we invoke the potentially-blocking socket send.
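    
    As a compilable toy of that idle accounting (the main() driver
    is illustrative only; the guard mirrors the IdleCountGuard
    added to SenderQueue.hpp below):
    
        #include <atomic>
        #include <cstddef>
        #include <iostream>
    
        static std::atomic<size_t> idleThreadCount(0);
    
        // Busy for the guard's lifetime: decrement the idle count
        // on construction, restore it on destruction (even if the
        // send throws).
        class IdleCountGuard final
        {
        public:
            explicit IdleCountGuard(std::atomic<size_t>& count) :
                _count(count)
            {
                --_count;
            }
    
            ~IdleCountGuard() { ++_count; }
    
        private:
            std::atomic<size_t>& _count;
        };
    
        int main()
        {
            ++idleThreadCount;                         // worker starts out idle
            {
                IdleCountGuard guard(idleThreadCount); // busy while sending
                std::cout << idleThreadCount << '\n';  // prints 0
            }
            std::cout << idleThreadCount << '\n';      // prints 1 again
        }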
    
    If there are not enough idle threads at that point,
    a new thread is spawned, so long as we're below the
    limit. This guarantees that even if all the active
    threads block on the socket, we'd always have one
    more to process new data (until we reach the limit,
    which is as many client connections as we have).
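    
    The growth test itself is small; roughly (an illustrative
    helper, not the actual checkAndGrow(), which also takes the
    pool mutex, re-checks these conditions under it, and bails
    out when the pool is stopping):
    
        #include <cstddef>
    
        bool shouldGrow(size_t idleThreads, size_t queueSize,
                        size_t threadCount, size_t maxThreadCount)
        {
            // Grow only when we are down to at most one idle thread,
            // there is a backlog to serve, and we are still below the
            // per-session cap.
            return idleThreads <= 1 && queueSize > 1 &&
                   threadCount < maxThreadCount;
        }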
    
    Technically, a single slow connection could
    still monopolize all the threads if there are
    many messages to be sent to it. Avoiding that
    would require tracking and assigning one thread
    per connection, something we don't currently do.
    
    Change-Id: Ic8b5e064da068b37bcfa773005495b198763c31d
    Reviewed-on: https://gerrit.libreoffice.org/31886
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 01c4a8c..73de6b1 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -584,6 +584,9 @@ size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session)
     // Now we are ready to bridge between the kit and client.
     session->bridgePrisonerSession();
 
+    // Provision for another thread to service this session.
+    SenderThreadPool::instance().incMaxThreadCount();
+
     return count;
 }
 
@@ -595,6 +598,9 @@ size_t DocumentBroker::removeSession(const std::string& id)
     {
         Admin::instance().rmDoc(_docKey, id);
 
+        // Reduce thread provisioning.
+        SenderThreadPool::instance().decMaxThreadCount();
+
         auto it = _sessions.find(id);
         if (it != _sessions.end())
         {
diff --git a/wsd/SenderQueue.cpp b/wsd/SenderQueue.cpp
index f68255e..0feb463 100644
--- a/wsd/SenderQueue.cpp
+++ b/wsd/SenderQueue.cpp
@@ -25,8 +25,13 @@ bool SenderThreadPool::dispatchItem(const size_t timeoutMs)
         auto session = item.Session.lock();
         if (session)
         {
+            // Make sure we have extra threads before potentially getting stuck.
+            checkAndGrow();
+
             try
             {
+                IdleCountGuard guard(_idleThreadCount);
+
                 const std::vector<char>& data = item.Data->data();
                 if (item.Data->isBinary())
                 {
@@ -42,6 +47,10 @@ bool SenderThreadPool::dispatchItem(const size_t timeoutMs)
                 LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
             }
         }
+        else
+        {
+            LOG_WRN("Discarding send data for expired session.");
+        }
     }
 
     return false;
@@ -60,6 +69,31 @@ std::shared_ptr<SenderThreadPool::ThreadData> SenderThreadPool::createThread()
     return nullptr;
 }
 
+void SenderThreadPool::checkAndGrow()
+{
+    auto queueSize = SenderQueue::instance().size();
+    if (_idleThreadCount <= 1 && queueSize > 1)
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        // Check again, in case rebalancing already did the trick.
+        queueSize = SenderQueue::instance().size();
+        if (_idleThreadCount <= 1 && queueSize > 1 &&
+            _maxThreadCount > _threads.size() && !stopping())
+        {
+            LOG_TRC("SenderThreadPool: growing. Cur: " << _threads.size() << ", Max: " << _maxThreadCount <<
+                    ", Idles: " << _idleThreadCount << ", Q: " << queueSize);
+
+            // We have room to grow.
+            auto newThreadData = createThread();
+            if (newThreadData)
+            {
+                _threads.push_back(newThreadData);
+            }
+        }
+    }
+}
+
 bool SenderThreadPool::rebalance()
 {
     std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
@@ -81,7 +115,9 @@ bool SenderThreadPool::rebalance()
         }
     }
 
-    LOG_DBG("SenderThreadPool: removed " << threadCount - _threads.size() << " dead threads.");
+    const auto threadCountNew = _threads.size();
+    LOG_DBG("SenderThreadPool: removed " << threadCount - threadCountNew <<
+            " dead threads to have " << threadCountNew << ".");
 
     while (_threads.size() < _optimalThreadCount && !stopping())
     {
@@ -93,12 +129,15 @@ bool SenderThreadPool::rebalance()
     }
 
     // Need to reduce?
+    LOG_DBG("SenderThreadPool: threads: " << _threads.size());
     return _threads.size() > _optimalThreadCount;
 }
 
 void SenderThreadPool::threadFunction(const std::shared_ptr<ThreadData>& data)
 {
     LOG_DBG("SenderThread started");
+    ++_idleThreadCount;
+
     while (!stopping())
     {
         if (!dispatchItem(HousekeepIdleIntervalMs) && !stopping())
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index d95fae7..afd7d95 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -49,7 +49,7 @@ struct SendItem
 };
 
 /// A queue of data to send to certain Sessions.
-class SenderQueue
+class SenderQueue final
 {
 public:
 
@@ -93,7 +93,7 @@ public:
                 return true;
             }
 
-            LOG_WRN("SenderQueue: stopping");
+            LOG_INF("SenderQueue: stopping");
             return false;
         }
 
@@ -121,11 +121,13 @@ private:
 /// the SenderQueue and send to the target Session.
 /// This pool has long-running threads that grow
 /// only on congestion and shrink otherwise.
-class SenderThreadPool
+class SenderThreadPool final
 {
 public:
     SenderThreadPool() :
         _optimalThreadCount(std::min(2U, std::thread::hardware_concurrency())),
+        _maxThreadCount(_optimalThreadCount),
+        _idleThreadCount(0),
         _stop(false)
     {
         LOG_INF("Creating SenderThreadPool with " << _optimalThreadCount << " optimal threads.");
@@ -150,13 +152,36 @@ public:
         }
     }
 
-    SenderThreadPool& instance() { return ThePool; }
+    static SenderThreadPool& instance() { return ThePool; }
 
     void stop() { _stop = true; }
     bool stopping() const { return _stop || TerminationFlag; }
 
+    void incMaxThreadCount() { ++_maxThreadCount; }
+    void decMaxThreadCount() { --_maxThreadCount; }
+
 private:
 
+    /// Count idle threads safely.
+    /// Decrements count on ctor, and increments on dtor.
+    class IdleCountGuard final
+    {
+    public:
+        IdleCountGuard(std::atomic<size_t>& var) :
+            _var(var)
+        {
+            --_var;
+        }
+
+        ~IdleCountGuard()
+        {
+            ++_var;
+        }
+
+    private:
+        std::atomic<size_t>& _var;
+    };
+
     typedef std::thread ThreadData;
 
     /// Dequeue a SendItem and send it.
@@ -169,6 +194,9 @@ private:
     /// Returns true if we need to reduce the threads.
     bool rebalance();
 
+    /// Grow the pool if congestion is detected.
+    void checkAndGrow();
+
     /// The worker thread entry function.
     void threadFunction(const std::shared_ptr<ThreadData>& data);
 
@@ -176,6 +204,12 @@ private:
     /// A minimum of 2, but ideally as many as cores.
     const size_t _optimalThreadCount;
 
+    /// Never exceed this number of threads.
+    size_t _maxThreadCount;
+
+    /// The number of threads not sending data.
+    std::atomic<size_t> _idleThreadCount;
+
     /// Stop condition to take the pool down.
     std::atomic<bool> _stop;
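
As a worked example of the provisioning half of this change (the names
here are illustrative; in the commit the counter lives in
SenderThreadPool and is adjusted from DocumentBroker::addSession() and
removeSession()):

    #include <atomic>
    #include <cstddef>
    #include <iostream>

    int main()
    {
        const size_t optimalThreadCount = 2;       // pool's base size (assume 2)
        std::atomic<size_t> maxThreadCount(optimalThreadCount);

        for (int session = 0; session < 3; ++session)
            ++maxThreadCount;                      // incMaxThreadCount()
        std::cout << maxThreadCount << '\n';       // 5 = 2 base + 3 sessions

        --maxThreadCount;                          // decMaxThreadCount()
        std::cout << maxThreadCount << '\n';       // 4
    }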
 

