[Libreoffice-commits] online.git: 3 commits - wsd/Admin.cpp wsd/Admin.hpp wsd/AdminModel.cpp wsd/AdminModel.hpp wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp

Michael Meeks michael.meeks at collabora.com
Mon Apr 3 19:47:16 UTC 2017


 wsd/Admin.cpp          |   36 ++++++++++++-----------------
 wsd/Admin.hpp          |    5 +---
 wsd/AdminModel.cpp     |   60 ++++++++++++++++++++++++++++++++++++++++++++-----
 wsd/AdminModel.hpp     |   12 ++++++++-
 wsd/DocumentBroker.cpp |    7 ++++-
 wsd/DocumentBroker.hpp |    3 ++
 wsd/LOOLWSD.cpp        |    7 ++++-
 7 files changed, 97 insertions(+), 33 deletions(-)

New commits:
commit 0806986c8cd8147b925aeb167ff0708fac6f9725
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Mon Apr 3 18:21:20 2017 +0100

    Admin model locking - major cleanup.
    
    Do everything in the Admin Model in the AdminPoll thread.
    Everything else can push work there safely through callbacks.

diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 4ab3d821..78da7ac1 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -57,7 +57,6 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */,
         return;
     }
 
-    std::unique_lock<std::mutex> modelLock(_admin->getLock());
     AdminModel& model = _admin->getModel();
 
     if (tokens[0] == "auth")
@@ -262,14 +261,13 @@ bool AdminSocketHandler::handleInitialRequest(
         Admin &admin = Admin::instance();
         auto handler = std::make_shared<AdminSocketHandler>(&admin, socketWeak, request);
         socket->setHandler(handler);
-
-        { // FIXME: weird locking around subscribe ...
-            std::unique_lock<std::mutex> modelLock(admin.getLock());
-            // Subscribe the websocket of any AdminModel updates
-            AdminModel& model = admin.getModel();
+        admin.addCallback([handler, sessionId]
+        {
+            Admin &adminIn = Admin::instance();
+            AdminModel& model = adminIn.getModel();
             handler->_sessionId = sessionId;
             model.subscribe(sessionId, handler);
-        }
+        });
         return true;
     }
 
@@ -293,7 +291,6 @@ Admin::Admin() :
 {
     LOG_INF("Admin ctor.");
 
-    std::unique_lock<std::mutex> modelLock(getLock());
     const auto totalMem = getTotalMemoryUsage();
     LOG_TRC("Total memory used: " << totalMem);
     _model.addMemStats(totalMem);
@@ -308,6 +305,8 @@ void Admin::pollingThread()
 {
     std::chrono::steady_clock::time_point lastCPU, lastMem;
 
+    _model.setThreadOwner(std::this_thread::get_id());
+
     lastCPU = std::chrono::steady_clock::now();
     lastMem = lastCPU;
 
@@ -326,7 +325,6 @@ void Admin::pollingThread()
             std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
         if (memWait <= 0)
         {
-            std::unique_lock<std::mutex> modelLock(getLock());
             const auto totalMem = getTotalMemoryUsage();
             if (totalMem != _lastTotalMemory)
             {
@@ -349,21 +347,20 @@ void Admin::pollingThread()
 
 void Admin::addDoc(const std::string& docKey, Poco::Process::PID pid, const std::string& filename, const std::string& sessionId)
 {
-    std::unique_lock<std::mutex> modelLock(_modelMutex);
-    _model.addDocument(docKey, pid, filename, sessionId);
+    addCallback([this, docKey, pid, filename, sessionId]
+                 { _model.addDocument(docKey, pid, filename, sessionId); });
 }
 
 void Admin::rmDoc(const std::string& docKey, const std::string& sessionId)
 {
-    std::unique_lock<std::mutex> modelLock(_modelMutex);
-    _model.removeDocument(docKey, sessionId);
+    addCallback([this, docKey, sessionId]
+                 { _model.removeDocument(docKey, sessionId); });
 }
 
 void Admin::rmDoc(const std::string& docKey)
 {
-    std::unique_lock<std::mutex> modelLock(_modelMutex);
     LOG_INF("Removing complete doc [" << docKey << "] from Admin.");
-    _model.removeDocument(docKey);
+    addCallback([this, docKey]{ _model.removeDocument(docKey); });
 }
 
 void Admin::rescheduleMemTimer(unsigned interval)
@@ -382,8 +379,6 @@ void Admin::rescheduleCpuTimer(unsigned interval)
 
 unsigned Admin::getTotalMemoryUsage()
 {
-    Util::assertIsLocked(_modelMutex);
-
     // To simplify and clarify this; since load, link and pre-init all
     // inside the forkit - we should account all of our fixed cost of
     // memory to the forkit; and then count only dirty pages in the clients
@@ -413,14 +408,13 @@ AdminModel& Admin::getModel()
 
 void Admin::updateLastActivityTime(const std::string& docKey)
 {
-    std::unique_lock<std::mutex> modelLock(_modelMutex);
-    _model.updateLastActivityTime(docKey);
+    addCallback([this, docKey]{ _model.updateLastActivityTime(docKey); });
 }
 
 void Admin::updateMemoryDirty(const std::string& docKey, int dirty)
 {
-    std::unique_lock<std::mutex> modelLock(_modelMutex);
-    _model.updateMemoryDirty(docKey, dirty);
+    addCallback([this, docKey, dirty]
+                 { _model.updateMemoryDirty(docKey, dirty); });
 }
 
 void Admin::dumpState(std::ostream& os)
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index a816b636..033619a7 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -96,16 +96,15 @@ public:
 
     void rescheduleCpuTimer(unsigned interval);
 
-    std::unique_lock<std::mutex> getLock() { return std::unique_lock<std::mutex>(_modelMutex); }
-
     void updateLastActivityTime(const std::string& docKey);
     void updateMemoryDirty(const std::string& docKey, int dirty);
 
     void dumpState(std::ostream& os) override;
 
 private:
+    /// The model is accessed only during startup & in
+    /// the Admin Poll thread.
     AdminModel _model;
-    std::mutex _modelMutex;
     int _forKitPid;
     long _lastTotalMemory;
 
diff --git a/wsd/AdminModel.cpp b/wsd/AdminModel.cpp
index 5e1aefc7..1f3e6dcc 100644
--- a/wsd/AdminModel.cpp
+++ b/wsd/AdminModel.cpp
@@ -94,8 +94,26 @@ void Subscriber::unsubscribe(const std::string& command)
     _subscriptions.erase(command);
 }
 
+bool AdminModel::isCorrectThread() const
+{
+#if ENABLE_DEBUG
+    // FIXME: share this code [!]
+    const bool sameThread = std::this_thread::get_id() == _owner;
+    if (!sameThread)
+        LOG_WRN("Admin command invoked from foreign thread. Expected: 0x" << std::hex <<
+        _owner << " but called from 0x" << std::this_thread::get_id() << " (" <<
+        std::dec << Util::getThreadId() << ").");
+
+    return sameThread;
+#else
+    return true;
+#endif
+}
+
 std::string AdminModel::query(const std::string& command)
 {
+    assert (isCorrectThread());
+
     const auto token = LOOLProtocol::getFirstToken(command);
     if (token == "documents")
     {
@@ -132,6 +150,8 @@ std::string AdminModel::query(const std::string& command)
 /// Returns memory consumed by all active loolkit processes
 unsigned AdminModel::getKitsMemoryUsage()
 {
+    assert (isCorrectThread());
+
     unsigned totalMem = 0;
     unsigned docs = 0;
     for (const auto& it : _documents)
@@ -158,6 +178,8 @@ unsigned AdminModel::getKitsMemoryUsage()
 
 void AdminModel::subscribe(int sessionId, const std::weak_ptr<WebSocketHandler>& ws)
 {
+    assert (isCorrectThread());
+
     const auto ret = _subscribers.emplace(sessionId, Subscriber(sessionId, ws));
     if (!ret.second)
     {
@@ -167,6 +189,8 @@ void AdminModel::subscribe(int sessionId, const std::weak_ptr<WebSocketHandler>&
 
 void AdminModel::subscribe(int sessionId, const std::string& command)
 {
+    assert (isCorrectThread());
+
     auto subscriber = _subscribers.find(sessionId);
     if (subscriber != _subscribers.end())
     {
@@ -176,37 +200,39 @@ void AdminModel::subscribe(int sessionId, const std::string& command)
 
 void AdminModel::unsubscribe(int sessionId, const std::string& command)
 {
+    assert (isCorrectThread());
+
     auto subscriber = _subscribers.find(sessionId);
     if (subscriber != _subscribers.end())
-    {
         subscriber->second.unsubscribe(command);
-    }
 }
 
 void AdminModel::addMemStats(unsigned memUsage)
 {
+    assert (isCorrectThread());
+
     _memStats.push_back(memUsage);
     if (_memStats.size() > _memStatsSize)
-    {
         _memStats.pop_front();
-    }
 
     notify("mem_stats " + std::to_string(memUsage));
 }
 
 void AdminModel::addCpuStats(unsigned cpuUsage)
 {
+    assert (isCorrectThread());
+
     _cpuStats.push_back(cpuUsage);
     if (_cpuStats.size() > _cpuStatsSize)
-    {
         _cpuStats.pop_front();
-    }
 
     notify("cpu_stats " + std::to_string(cpuUsage));
 }
 
 void AdminModel::setCpuStatsSize(unsigned size)
 {
+    assert (isCorrectThread());
+
     int wasteValuesLen = _cpuStats.size() - size;
     while (wasteValuesLen-- > 0)
     {
@@ -219,6 +245,8 @@ void AdminModel::setCpuStatsSize(unsigned size)
 
 void AdminModel::setMemStatsSize(unsigned size)
 {
+    assert (isCorrectThread());
+
     int wasteValuesLen = _memStats.size() - size;
     while (wasteValuesLen-- > 0)
     {
@@ -231,6 +259,8 @@ void AdminModel::setMemStatsSize(unsigned size)
 
 void AdminModel::notify(const std::string& message)
 {
+    assert (isCorrectThread());
+
     if (!_subscribers.empty())
     {
         LOG_TRC("Message to admin console: " << message);
@@ -251,6 +281,8 @@ void AdminModel::notify(const std::string& message)
 void AdminModel::addDocument(const std::string& docKey, Poco::Process::PID pid,
                              const std::string& filename, const std::string& sessionId)
 {
+    assert (isCorrectThread());
+
     const auto ret = _documents.emplace(docKey, Document(docKey, pid, filename));
     ret.first->second.addView(sessionId);
     LOG_DBG("Added admin document [" << docKey << "].");
@@ -289,6 +321,8 @@ void AdminModel::addDocument(const std::string& docKey, Poco::Process::PID pid,
 
 void AdminModel::removeDocument(const std::string& docKey, const std::string& sessionId)
 {
+    assert (isCorrectThread());
+
     auto docIt = _documents.find(docKey);
     if (docIt != _documents.end() && !docIt->second.isExpired())
     {
@@ -311,6 +345,8 @@ void AdminModel::removeDocument(const std::string& docKey, const std::string& se
 
 void AdminModel::removeDocument(const std::string& docKey)
 {
+    assert (isCorrectThread());
+
     auto docIt = _documents.find(docKey);
     if (docIt != _documents.end())
     {
@@ -332,6 +368,8 @@ void AdminModel::removeDocument(const std::string& docKey)
 
 std::string AdminModel::getMemStats()
 {
+    assert (isCorrectThread());
+
     std::ostringstream oss;
     for (const auto& i: _memStats)
     {
@@ -343,6 +381,8 @@ std::string AdminModel::getMemStats()
 
 std::string AdminModel::getCpuStats()
 {
+    assert (isCorrectThread());
+
     std::ostringstream oss;
     for (const auto& i: _cpuStats)
     {
@@ -354,6 +394,8 @@ std::string AdminModel::getCpuStats()
 
 unsigned AdminModel::getTotalActiveViews()
 {
+    assert (isCorrectThread());
+
     unsigned numTotalViews = 0;
     for (const auto& it: _documents)
     {
@@ -368,6 +410,8 @@ unsigned AdminModel::getTotalActiveViews()
 
 std::string AdminModel::getDocuments() const
 {
+    assert (isCorrectThread());
+
     std::ostringstream oss;
     for (const auto& it: _documents)
     {
@@ -389,6 +433,8 @@ std::string AdminModel::getDocuments() const
 
 void AdminModel::updateLastActivityTime(const std::string& docKey)
 {
+    assert (isCorrectThread());
+
     auto docIt = _documents.find(docKey);
     if (docIt != _documents.end())
     {
@@ -410,6 +456,8 @@ bool Document::updateMemoryDirty(int dirty)
 
 void AdminModel::updateMemoryDirty(const std::string& docKey, int dirty)
 {
+    assert (isCorrectThread());
+
     auto docIt = _documents.find(docKey);
     if (docIt != _documents.end() &&
         docIt->second.updateMemoryDirty(dirty))
diff --git a/wsd/AdminModel.hpp b/wsd/AdminModel.hpp
index 8250687c..57bd702b 100644
--- a/wsd/AdminModel.hpp
+++ b/wsd/AdminModel.hpp
@@ -138,7 +138,8 @@ private:
 class AdminModel
 {
 public:
-    AdminModel()
+    AdminModel() :
+        _owner(std::this_thread::get_id())
     {
         LOG_INF("AdminModel ctor.");
     }
@@ -148,6 +149,12 @@ public:
         LOG_INF("AdminModel dtor.");
     }
 
+    /// All methods here must be called from the Admin socket-poll thread.
+    void setThreadOwner(const std::thread::id &id) { _owner = id; }
+
+    /// In debug mode check that code is running in the correct thread.
+    bool isCorrectThread() const;
+
     std::string query(const std::string& command);
 
     /// Returns memory consumed by all active loolkit processes
@@ -199,6 +206,9 @@ private:
 
     std::list<unsigned> _cpuStats;
     unsigned _cpuStatsSize = 100;
+
+    // always enabled to avoid ABI change in debug mode ...
+    std::thread::id _owner;
 };
 
 #endif
commit 94022e90d9f76b71c2dfde711188736d4c9f8b08
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Fri Mar 31 20:58:33 2017 +0100

    Join threads to force a reasonably sensible shutdown sequence.
    
    i.e. actually wait until documents are saved and sessions closed.

diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index aab632bb..483e7c5e 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -295,6 +295,11 @@ DocumentBroker::~DocumentBroker()
     _childProcess.reset();
 }
 
+void DocumentBroker::joinThread()
+{
+    _poll->joinThread();
+}
+
 bool DocumentBroker::load(const std::shared_ptr<ClientSession>& session, const std::string& jailId)
 {
     assert(isCorrectThread());
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 91dfb63a..168734cc 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -225,6 +225,9 @@ public:
     //TODO: Take reason to broadcast to clients.
     void stop() { _stop = true; }
 
+    /// Thread-safe termination of this broker if it has a lingering thread
+    void joinThread();
+
     /// Loads a document from the public URI into the jail.
     bool load(const std::shared_ptr<ClientSession>& session, const std::string& jailId);
     bool isLoaded() const { return _isLoaded; }
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index a47a3511..1ac9efa2 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2179,6 +2179,8 @@ public:
     {
         _stop = true;
         SocketPoll::wakeupWorld();
+        _acceptPoll.joinThread();
+        WebServerPoll.joinThread();
     }
 
     void dumpState(std::ostream& os)
@@ -2458,10 +2460,11 @@ int LOOLWSD::innerMain()
 
     // Wait until documents are saved and sessions closed.
     srv.stop();
-    WebServerPoll.stop();
 
     // atexit handlers tend to free Admin before Documents
     LOG_INF("Cleaning up lingering documents.");
+    for (auto& docBrokerIt : DocBrokers)
+        docBrokerIt.second->joinThread();
     DocBrokers.clear();
 
 #ifndef KIT_IN_PROCESS
@@ -2470,6 +2473,8 @@ int LOOLWSD::innerMain()
     SigUtil::killChild(ForKitProcId);
 #endif
 
+    PrisonerPoll.joinThread();
+
     // Terminate child processes
     LOG_INF("Requesting child processes to terminate.");
     for (auto& child : NewChildren)
commit 3e1351ec79d5a5804f15ce32f3324ceb93750f47
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Fri Mar 31 17:55:26 2017 +0100

    Correct obsolete method name.

diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 910f328f..aab632bb 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -759,7 +759,7 @@ size_t DocumentBroker::addSession(const std::shared_ptr<ClientSession>& session)
         throw;
     }
 
-    // Below values are recalculated when startDestroy() is called (before destroying the
+    // Below values are recalculated when destroyIfLastEditor() is called (before destroying the
     // document). It is safe to reset their values to their defaults whenever a new session is added.
     _lastEditableSession = false;
     _markToDestroy = false;


More information about the Libreoffice-commits mailing list