[Libreoffice-commits] online.git: 3 commits - wsd/Admin.cpp wsd/Admin.hpp wsd/AdminModel.cpp wsd/AdminModel.hpp wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp
Michael Meeks
michael.meeks at collabora.com
Mon Apr 3 19:47:16 UTC 2017
wsd/Admin.cpp | 36 ++++++++++++-----------------
wsd/Admin.hpp | 5 +---
wsd/AdminModel.cpp | 60 ++++++++++++++++++++++++++++++++++++++++++++-----
wsd/AdminModel.hpp | 12 ++++++++-
wsd/DocumentBroker.cpp | 7 ++++-
wsd/DocumentBroker.hpp | 3 ++
wsd/LOOLWSD.cpp | 7 ++++-
7 files changed, 97 insertions(+), 33 deletions(-)
New commits:
commit 0806986c8cd8147b925aeb167ff0708fac6f9725
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Mon Apr 3 18:21:20 2017 +0100
Admin model locking - major cleanup.
Do everything in the Admin Model in the AdminPoll thread.
Everything else can push work there safely through callbacks.
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 4ab3d821..78da7ac1 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -57,7 +57,6 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */,
return;
}
- std::unique_lock<std::mutex> modelLock(_admin->getLock());
AdminModel& model = _admin->getModel();
if (tokens[0] == "auth")
@@ -262,14 +261,13 @@ bool AdminSocketHandler::handleInitialRequest(
Admin &admin = Admin::instance();
auto handler = std::make_shared<AdminSocketHandler>(&admin, socketWeak, request);
socket->setHandler(handler);
-
- { // FIXME: weird locking around subscribe ...
- std::unique_lock<std::mutex> modelLock(admin.getLock());
- // Subscribe the websocket of any AdminModel updates
- AdminModel& model = admin.getModel();
+ admin.addCallback([handler, sessionId]
+ {
+ Admin &adminIn = Admin::instance();
+ AdminModel& model = adminIn.getModel();
handler->_sessionId = sessionId;
model.subscribe(sessionId, handler);
- }
+ });
return true;
}
@@ -293,7 +291,6 @@ Admin::Admin() :
{
LOG_INF("Admin ctor.");
- std::unique_lock<std::mutex> modelLock(getLock());
const auto totalMem = getTotalMemoryUsage();
LOG_TRC("Total memory used: " << totalMem);
_model.addMemStats(totalMem);
@@ -308,6 +305,8 @@ void Admin::pollingThread()
{
std::chrono::steady_clock::time_point lastCPU, lastMem;
+ _model.setThreadOwner(std::this_thread::get_id());
+
lastCPU = std::chrono::steady_clock::now();
lastMem = lastCPU;
@@ -326,7 +325,6 @@ void Admin::pollingThread()
std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
if (memWait <= 0)
{
- std::unique_lock<std::mutex> modelLock(getLock());
const auto totalMem = getTotalMemoryUsage();
if (totalMem != _lastTotalMemory)
{
@@ -349,21 +347,20 @@ void Admin::pollingThread()
void Admin::addDoc(const std::string& docKey, Poco::Process::PID pid, const std::string& filename, const std::string& sessionId)
{
- std::unique_lock<std::mutex> modelLock(_modelMutex);
- _model.addDocument(docKey, pid, filename, sessionId);
+ addCallback([this, docKey, pid, filename, sessionId]
+ { _model.addDocument(docKey, pid, filename, sessionId); });
}
void Admin::rmDoc(const std::string& docKey, const std::string& sessionId)
{
- std::unique_lock<std::mutex> modelLock(_modelMutex);
- _model.removeDocument(docKey, sessionId);
+ addCallback([this, docKey, sessionId]
+ { _model.removeDocument(docKey, sessionId); });
}
void Admin::rmDoc(const std::string& docKey)
{
- std::unique_lock<std::mutex> modelLock(_modelMutex);
LOG_INF("Removing complete doc [" << docKey << "] from Admin.");
- _model.removeDocument(docKey);
+ addCallback([this, docKey]{ _model.removeDocument(docKey); });
}
void Admin::rescheduleMemTimer(unsigned interval)
@@ -382,8 +379,6 @@ void Admin::rescheduleCpuTimer(unsigned interval)
unsigned Admin::getTotalMemoryUsage()
{
- Util::assertIsLocked(_modelMutex);
-
// To simplify and clarify this; since load, link and pre-init all
// inside the forkit - we should account all of our fixed cost of
// memory to the forkit; and then count only dirty pages in the clients
@@ -413,14 +408,13 @@ AdminModel& Admin::getModel()
void Admin::updateLastActivityTime(const std::string& docKey)
{
- std::unique_lock<std::mutex> modelLock(_modelMutex);
- _model.updateLastActivityTime(docKey);
+ addCallback([this, docKey]{ _model.updateLastActivityTime(docKey); });
}
void Admin::updateMemoryDirty(const std::string& docKey, int dirty)
{
- std::unique_lock<std::mutex> modelLock(_modelMutex);
- _model.updateMemoryDirty(docKey, dirty);
+ addCallback([this, docKey, dirty]
+ { _model.updateMemoryDirty(docKey, dirty); });
}
void Admin::dumpState(std::ostream& os)
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index a816b636..033619a7 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -96,16 +96,15 @@ public:
void rescheduleCpuTimer(unsigned interval);
- std::unique_lock<std::mutex> getLock() { return std::unique_lock<std::mutex>(_modelMutex); }
-
void updateLastActivityTime(const std::string& docKey);
void updateMemoryDirty(const std::string& docKey, int dirty);
void dumpState(std::ostream& os) override;
private:
+ /// The model is accessed only during startup & in
+ /// the Admin Poll thread.
AdminModel _model;
- std::mutex _modelMutex;
int _forKitPid;
long _lastTotalMemory;
diff --git a/wsd/AdminModel.cpp b/wsd/AdminModel.cpp
index 5e1aefc7..1f3e6dcc 100644
--- a/wsd/AdminModel.cpp
+++ b/wsd/AdminModel.cpp
@@ -94,8 +94,26 @@ void Subscriber::unsubscribe(const std::string& command)
_subscriptions.erase(command);
}
+bool AdminModel::isCorrectThread() const
+{
+#if ENABLE_DEBUG
+ // FIXME: share this code [!]
+ const bool sameThread = std::this_thread::get_id() == _owner;
+ if (!sameThread)
+ LOG_WRN("Admin command invoked from foreign thread. Expected: 0x" << std::hex <<
+ _owner << " but called from 0x" << std::this_thread::get_id() << " (" <<
+ std::dec << Util::getThreadId() << ").");
+
+ return sameThread;
+#else
+ return true;
+#endif
+}
+
std::string AdminModel::query(const std::string& command)
{
+ assert (isCorrectThread());
+
const auto token = LOOLProtocol::getFirstToken(command);
if (token == "documents")
{
@@ -132,6 +150,8 @@ std::string AdminModel::query(const std::string& command)
/// Returns memory consumed by all active loolkit processes
unsigned AdminModel::getKitsMemoryUsage()
{
+ assert (isCorrectThread());
+
unsigned totalMem = 0;
unsigned docs = 0;
for (const auto& it : _documents)
@@ -158,6 +178,8 @@ unsigned AdminModel::getKitsMemoryUsage()
void AdminModel::subscribe(int sessionId, const std::weak_ptr<WebSocketHandler>& ws)
{
+ assert (isCorrectThread());
+
const auto ret = _subscribers.emplace(sessionId, Subscriber(sessionId, ws));
if (!ret.second)
{
@@ -167,6 +189,8 @@ void AdminModel::subscribe(int sessionId, const std::weak_ptr<WebSocketHandler>&
void AdminModel::subscribe(int sessionId, const std::string& command)
{
+ assert (isCorrectThread());
+
auto subscriber = _subscribers.find(sessionId);
if (subscriber != _subscribers.end())
{
@@ -176,37 +200,39 @@ void AdminModel::subscribe(int sessionId, const std::string& command)
void AdminModel::unsubscribe(int sessionId, const std::string& command)
{
+ assert (isCorrectThread());
+
auto subscriber = _subscribers.find(sessionId);
if (subscriber != _subscribers.end())
- {
subscriber->second.unsubscribe(command);
- }
}
void AdminModel::addMemStats(unsigned memUsage)
{
+ assert (isCorrectThread());
+
_memStats.push_back(memUsage);
if (_memStats.size() > _memStatsSize)
- {
_memStats.pop_front();
- }
notify("mem_stats " + std::to_string(memUsage));
}
void AdminModel::addCpuStats(unsigned cpuUsage)
{
+ assert (isCorrectThread());
+
_cpuStats.push_back(cpuUsage);
if (_cpuStats.size() > _cpuStatsSize)
- {
_cpuStats.pop_front();
- }
notify("cpu_stats " + std::to_string(cpuUsage));
}
void AdminModel::setCpuStatsSize(unsigned size)
{
+ assert (isCorrectThread());
+
int wasteValuesLen = _cpuStats.size() - size;
while (wasteValuesLen-- > 0)
{
@@ -219,6 +245,8 @@ void AdminModel::setCpuStatsSize(unsigned size)
void AdminModel::setMemStatsSize(unsigned size)
{
+ assert (isCorrectThread());
+
int wasteValuesLen = _memStats.size() - size;
while (wasteValuesLen-- > 0)
{
@@ -231,6 +259,8 @@ void AdminModel::setMemStatsSize(unsigned size)
void AdminModel::notify(const std::string& message)
{
+ assert (isCorrectThread());
+
if (!_subscribers.empty())
{
LOG_TRC("Message to admin console: " << message);
@@ -251,6 +281,8 @@ void AdminModel::notify(const std::string& message)
void AdminModel::addDocument(const std::string& docKey, Poco::Process::PID pid,
const std::string& filename, const std::string& sessionId)
{
+ assert (isCorrectThread());
+
const auto ret = _documents.emplace(docKey, Document(docKey, pid, filename));
ret.first->second.addView(sessionId);
LOG_DBG("Added admin document [" << docKey << "].");
@@ -289,6 +321,8 @@ void AdminModel::addDocument(const std::string& docKey, Poco::Process::PID pid,
void AdminModel::removeDocument(const std::string& docKey, const std::string& sessionId)
{
+ assert (isCorrectThread());
+
auto docIt = _documents.find(docKey);
if (docIt != _documents.end() && !docIt->second.isExpired())
{
@@ -311,6 +345,8 @@ void AdminModel::removeDocument(const std::string& docKey, const std::string& se
void AdminModel::removeDocument(const std::string& docKey)
{
+ assert (isCorrectThread());
+
auto docIt = _documents.find(docKey);
if (docIt != _documents.end())
{
@@ -332,6 +368,8 @@ void AdminModel::removeDocument(const std::string& docKey)
std::string AdminModel::getMemStats()
{
+ assert (isCorrectThread());
+
std::ostringstream oss;
for (const auto& i: _memStats)
{
@@ -343,6 +381,8 @@ std::string AdminModel::getMemStats()
std::string AdminModel::getCpuStats()
{
+ assert (isCorrectThread());
+
std::ostringstream oss;
for (const auto& i: _cpuStats)
{
@@ -354,6 +394,8 @@ std::string AdminModel::getCpuStats()
unsigned AdminModel::getTotalActiveViews()
{
+ assert (isCorrectThread());
+
unsigned numTotalViews = 0;
for (const auto& it: _documents)
{
@@ -368,6 +410,8 @@ unsigned AdminModel::getTotalActiveViews()
std::string AdminModel::getDocuments() const
{
+ assert (isCorrectThread());
+
std::ostringstream oss;
for (const auto& it: _documents)
{
@@ -389,6 +433,8 @@ std::string AdminModel::getDocuments() const
void AdminModel::updateLastActivityTime(const std::string& docKey)
{
+ assert (isCorrectThread());
+
auto docIt = _documents.find(docKey);
if (docIt != _documents.end())
{
@@ -410,6 +456,8 @@ bool Document::updateMemoryDirty(int dirty)
void AdminModel::updateMemoryDirty(const std::string& docKey, int dirty)
{
+ assert (isCorrectThread());
+
auto docIt = _documents.find(docKey);
if (docIt != _documents.end() &&
docIt->second.updateMemoryDirty(dirty))
diff --git a/wsd/AdminModel.hpp b/wsd/AdminModel.hpp
index 8250687c..57bd702b 100644
--- a/wsd/AdminModel.hpp
+++ b/wsd/AdminModel.hpp
@@ -138,7 +138,8 @@ private:
class AdminModel
{
public:
- AdminModel()
+ AdminModel() :
+ _owner(std::this_thread::get_id())
{
LOG_INF("AdminModel ctor.");
}
@@ -148,6 +149,12 @@ public:
LOG_INF("AdminModel dtor.");
}
+ /// All methods here must be called from the Admin socket-poll
+ void setThreadOwner(const std::thread::id &id) { _owner = id; }
+
+ /// In debug mode check that code is running in the correct thread.
+ bool isCorrectThread() const;
+
std::string query(const std::string& command);
/// Returns memory consumed by all active loolkit processes
@@ -199,6 +206,9 @@ private:
std::list<unsigned> _cpuStats;
unsigned _cpuStatsSize = 100;
+
+ // always enabled to avoid ABI change in debug mode ...
+ std::thread::id _owner;
};
#endif
commit 94022e90d9f76b71c2dfde711188736d4c9f8b08
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Fri Mar 31 20:58:33 2017 +0100
Join threads to force a reasonably sensible shutdown sequence.
ie. actually wait until documents are saved and sessions closed.
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index aab632bb..483e7c5e 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -295,6 +295,11 @@ DocumentBroker::~DocumentBroker()
_childProcess.reset();
}
+void DocumentBroker::joinThread()
+{
+ _poll->joinThread();
+}
+
bool DocumentBroker::load(const std::shared_ptr<ClientSession>& session, const std::string& jailId)
{
assert(isCorrectThread());
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 91dfb63a..168734cc 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -225,6 +225,9 @@ public:
//TODO: Take reason to broadcast to clients.
void stop() { _stop = true; }
+ /// Thread safe termination of this broker if it has a lingering thread
+ void joinThread();
+
/// Loads a document from the public URI into the jail.
bool load(const std::shared_ptr<ClientSession>& session, const std::string& jailId);
bool isLoaded() const { return _isLoaded; }
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index a47a3511..1ac9efa2 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2179,6 +2179,8 @@ public:
{
_stop = true;
SocketPoll::wakeupWorld();
+ _acceptPoll.joinThread();
+ WebServerPoll.joinThread();
}
void dumpState(std::ostream& os)
@@ -2458,10 +2460,11 @@ int LOOLWSD::innerMain()
// Wait until documents are saved and sessions closed.
srv.stop();
- WebServerPoll.stop();
// atexit handlers tend to free Admin before Documents
LOG_INF("Cleaning up lingering documents.");
+ for (auto& docBrokerIt : DocBrokers)
+ docBrokerIt.second->joinThread();
DocBrokers.clear();
#ifndef KIT_IN_PROCESS
@@ -2470,6 +2473,8 @@ int LOOLWSD::innerMain()
SigUtil::killChild(ForKitProcId);
#endif
+ PrisonerPoll.joinThread();
+
// Terminate child processes
LOG_INF("Requesting child processes to terminate.");
for (auto& child : NewChildren)
commit 3e1351ec79d5a5804f15ce32f3324ceb93750f47
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Fri Mar 31 17:55:26 2017 +0100
Correct obsolete method name.
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 910f328f..aab632bb 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -759,7 +759,7 @@ size_t DocumentBroker::addSession(const std::shared_ptr<ClientSession>& session)
throw;
}
- // Below values are recalculated when startDestroy() is called (before destroying the
+ // Below values are recalculated when destroyIfLastEditor() is called (before destroying the
// document). It is safe to reset their values to their defaults whenever a new session is added.
_lastEditableSession = false;
_markToDestroy = false;
More information about the Libreoffice-commits
mailing list