[Libreoffice-commits] online.git: 13 commits - common/Util.cpp net/loolnb.cpp net/Socket.cpp net/Socket.hpp net/SslSocket.hpp net/WebSocketHandler.hpp wsd/Admin.cpp wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Mar 20 04:47:27 UTC 2017
common/Util.cpp | 4 +
net/Socket.cpp | 13 +++
net/Socket.hpp | 20 +++--
net/SslSocket.hpp | 3
net/WebSocketHandler.hpp | 15 ++--
net/loolnb.cpp | 9 +-
wsd/Admin.cpp | 8 +-
wsd/ClientSession.cpp | 34 ++++++++-
wsd/ClientSession.hpp | 18 -----
wsd/DocumentBroker.cpp | 44 ++++++++----
wsd/DocumentBroker.hpp | 15 ++--
wsd/LOOLWSD.cpp | 167 +++++++++++++++++++++++------------------------
12 files changed, 203 insertions(+), 147 deletions(-)
New commits:
commit d0bb5cbdc7a2a76877b17bbc4283a76ff8cc4d52
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Mon Mar 20 00:41:43 2017 -0400
wsd: restore forkit after crash
Change-Id: Iacfcbfbf922897ea1bb9896d01a9a8afd4e194cc
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index e096ade4..226fbf58 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -288,7 +288,7 @@ bool cleanupDocBrokers()
/// Forks as many children as requested.
/// Returns the number of children requested to spawn,
/// -1 for error.
-static bool forkChildren(const int number)
+static int forkChildren(const int number)
{
Util::assertIsLocked(NewChildrenMutex);
@@ -417,7 +417,20 @@ std::shared_ptr<ChildProcess> getNewChild_Blocks()
if (rebalanceChildren(numPreSpawn) < 0)
{
// Fatal. Let's fail and retry at a higher level.
- LOG_DBG("getNewChild: rebalancing of children failed.");
+ LOG_DBG("getNewChild: rebalancing of children failed. Checking and restoring forkit.");
+
+ lockb.unlock();
+ locka.unlock();
+ LOOLWSD::checkAndRestoreForKit();
+ if (chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - startTime).count() <
+ CHILD_TIMEOUT_MS * 4)
+ {
+ // Try again.
+ locka.lock();
+ lockb.lock();
+ continue;
+ }
+
return nullptr;
}
@@ -1025,7 +1038,7 @@ bool LOOLWSD::checkAndRestoreForKit()
{
// Should never fail.
LOG_FTL("Failed to spawn loolforkit.");
- return Application::EXIT_SOFTWARE;
+ SigUtil::requestShutdown();
}
}
commit 53da72a1dc10f4f5f121bc167680493a123c0b88
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 21:45:53 2017 -0400
wsd: fix hot looping the poll
When not sending ping the ping time is not set
which results in the setting the poll timeout to
a negative value, forcing it to return immediately.
This happens when sending ping before upgrading
to WebSocket, which isn't common. One way to
reproduce it, however, is to connect to the
admin console with an unauthenticated socket.
Change-Id: I9f3db1a02b8f8e2781d23d843e848068ad434958
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 72466d8f..9fe7aab6 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -262,6 +262,7 @@ public:
if (_wsState == WSState::WS)
{
LOG_WRN("Attempted ping on non-upgraded websocket!");
+ _pingSent = now; // Pretend we sent it to avoid timing out immediately.
return;
}
LOG_TRC("Send ping message");
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 19819e26..bb235156 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -64,8 +64,8 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */,
{
if (tokens.count() < 2)
{
+ LOG_DBG("Auth command without any token");
sendFrame("InvalidAuthToken");
- LOG_TRC("Auth command without any token");
shutdown();
return;
}
@@ -84,8 +84,8 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */,
}
else
{
+ LOG_DBG("Invalid auth token");
sendFrame("InvalidAuthToken");
- LOG_TRC("Invalid auth token");
shutdown();
return;
}
@@ -93,10 +93,10 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */,
if (!_isAuthenticated)
{
+ LOG_DBG("Not authenticated - message is '" << firstLine << "' " <<
+ tokens.count() << " first: '" << tokens[0] << "'");
sendFrame("NotAuthenticated");
shutdown();
- LOG_TRC("Not authenticated - message is '" << firstLine << "' "
- << tokens.count() << " first: '" << tokens[0] << "'");
return;
}
else if (tokens[0] == "documents" ||
commit e9675ed6e1022e449c13802b9e2929f059a7fdf1
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 19:07:42 2017 -0400
wsd: close socket when WS close handshake is complete
We shouldn't send any more data after the client
shuts down, or after we initiate shutdown.
Change-Id: Ibf0cf61dcabe9d02ddcb7eb40b2df23712c5a136
diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp
index 6f134e89..3654c4b7 100644
--- a/net/SslSocket.hpp
+++ b/net/SslSocket.hpp
@@ -70,6 +70,9 @@ public:
// Complete the bidirectional shutdown.
SSL_shutdown(_ssl);
}
+
+ // Close the TCP Socket.
+ Socket::shutdown();
}
bool readIncomingData() override
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 8a535cb9..72466d8f 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -225,7 +225,7 @@ public:
}
// TCP Close.
- socket->shutdown();
+ socket->closeConnection();
break;
default:
handleMessage(fin, code, _wsPayload);
commit 5a3b81216716b3be63e109295247026fae6f0600
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 19:01:03 2017 -0400
wsd: stop DocBroker poll and terminate child when no more sessions
Normally the last session either stopped and terminated
the DocBroker upon saving the doc, or immediately upon
removal when nothing to save.
However, saving could fail, or the session could be
disconnected by the client, or saving could timeout, etc.
In all those failure scenarios the DocBroker should
not linger as a zombie (alive but without sessions).
Here we detect that we are left with no sessions
and terminate correctly.
Change-Id: I31862234e321f63e686f54fa69daacc1fa06ae75
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 2fd40d6e..de09bccb 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -252,8 +252,18 @@ void DocumentBroker::pollThread()
autoSave(false);
last30SecCheckTime = std::chrono::steady_clock::now();
}
+
+ // If all sessions have been removed, no reason to linger.
+ if (_sessions.empty())
+ {
+ LOG_INF("No more sessions in doc [" << _docKey << "]. Terminating.");
+ _stop = true;
+ }
}
+ // Terminate properly while we can.
+ auto lock = getLock();
+ terminateChild(lock, "", false);
LOG_INF("Finished docBroker polling thread for docKey [" << _docKey << "].");
}
commit 47bbbbb2dcf613e1f7816ab0b963427341c643bd
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 18:57:31 2017 -0400
wsd: correct prespawning of children and simplify
The prisoner poll should wake every so often
to check and rebalance the new children.
However this didn't happen correctly and
WSD would starve of children every so often.
The frequency of checking and rebalancing of
children should be reviewed and optimized.
Also simplified the code to avoid rebalancing
DocBrokers and only do NewChildren.
Change-Id: Id3be34ed3a47c739b606ee7969088397d3807e7a
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index f61c3381..e096ade4 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -290,7 +290,6 @@ bool cleanupDocBrokers()
/// -1 for error.
static bool forkChildren(const int number)
{
- Util::assertIsLocked(DocBrokersMutex);
Util::assertIsLocked(NewChildrenMutex);
if (number > 0)
@@ -328,19 +327,17 @@ static bool cleanupChildren()
{
Util::assertIsLocked(NewChildrenMutex);
- bool removed = false;
- for (int i = NewChildren.size() - 1; i >= 0; --i)
+ const int count = NewChildren.size();
+ for (int i = count - 1; i >= 0; --i)
{
if (!NewChildren[i]->isAlive())
{
LOG_WRN("Removing dead spare child [" << NewChildren[i]->getPid() << "].");
-
NewChildren.erase(NewChildren.begin() + i);
- removed = true;
}
}
- return removed;
+ return static_cast<int>(NewChildren.size()) != count;
}
/// Decides how many children need spawning and spanws.
@@ -349,7 +346,6 @@ static bool cleanupChildren()
/// -1 for error.
static int rebalanceChildren(int balance)
{
- Util::assertIsLocked(DocBrokersMutex);
Util::assertIsLocked(NewChildrenMutex);
// Do the cleanup first.
@@ -386,26 +382,9 @@ static int rebalanceChildren(int balance)
/// Returns true only if at least one child was requested to spawn.
static bool prespawnChildren()
{
-#if 1 // FIXME: why re-balance DockBrokers here ? ...
- // First remove dead DocBrokers, if possible.
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex, std::defer_lock);
- if (!docBrokersLock.try_lock())
- {
- // Busy, try again later.
- return false;
- }
-
- cleanupDocBrokers();
-#endif
-
+ // Rebalance if not forking already.
std::unique_lock<std::mutex> lock(NewChildrenMutex, std::defer_lock);
- if (!lock.try_lock())
- {
- // We are forking already? Try later.
- return false;
- }
-
- return rebalanceChildren(LOOLWSD::NumPreSpawnedChildren) > 0;
+ return lock.try_lock() && (rebalanceChildren(LOOLWSD::NumPreSpawnedChildren) > 0);
}
static size_t addNewChild(const std::shared_ptr<ChildProcess>& child)
@@ -1220,10 +1199,6 @@ bool LOOLWSD::createForKit()
// Init the Admin manager
Admin::instance().setForKitPid(ForKitProcId);
- // Wake the prisoner poll to spawn some children, if necessary.
- PrisonerPoll.wakeup();
- // FIXME: horrors with try_lock in prespawnChildren ...
-
return (ForKitProcId != -1);
#endif
}
@@ -2482,14 +2457,20 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
mainWait.poll(SocketPoll::DefaultPollTimeoutMs * 2);
+ // Wake the prisoner poll to spawn some children, if necessary.
+ PrisonerPoll.wakeup();
+
// Unit test timeout
if (std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - startStamp).count() <
UnitWSD::get().getTimeoutMilliSeconds())
UnitWSD::get().timeout();
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
- cleanupDocBrokers();
+ std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex, std::defer_lock);
+ if (docBrokersLock.try_lock())
+ {
+ cleanupDocBrokers();
+ }
#if ENABLE_DEBUG
if (careerSpanSeconds > 0 && time(nullptr) > startTimeSpan + careerSpanSeconds)
commit 6283dbd9ccbd450d362a9d6d9c575726640df514
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 18:54:07 2017 -0400
wsd: copy and un-mask web-socket data at the same time
Change-Id: I2a4831065ae0a81f20d0513b0772d7d427ffc4ea
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 9cb6a1d6..8a535cb9 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -185,11 +185,11 @@ public:
if (hasMask)
{
+ const size_t end = _wsPayload.size();
+ _wsPayload.resize(end + payloadLen);
+ char* wsData = &_wsPayload[end];
for (size_t i = 0; i < payloadLen; ++i)
- data[i] = data[i] ^ mask[i % 4];
-
- // FIXME: copy and un-mask at the same time ...
- _wsPayload.insert(_wsPayload.end(), data, data + payloadLen);
+ *wsData++ = data[i] ^ mask[i % 4];
} else
_wsPayload.insert(_wsPayload.end(), data, data + payloadLen);
commit 714eeac3aae724c25160c6b38ae8a4cf6bafbd35
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 14:49:52 2017 -0400
wsd: simplify DocBroker creation
Change-Id: Icc93af2e32ce544c42cc65bbea83b9539c044db9
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 75b766b6..2fd40d6e 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -159,7 +159,8 @@ DocumentBroker::DocumentBroker(const std::string& uri,
assert(!_docKey.empty());
assert(!_childRoot.empty());
- LOG_INF("DocumentBroker [" << _uriPublic.toString() << "] created. DocKey: [" << _docKey << "]");
+ LOG_INF("DocumentBroker [" << _uriPublic.toString() <<
+ "] created with docKey [" << _docKey << "] and root [" << _childRoot << "]");
}
void DocumentBroker::startThread()
@@ -202,6 +203,7 @@ void DocumentBroker::pollThread()
}
_childProcess->setDocumentBroker(shared_from_this());
+ LOG_INF("Doc [" << _docKey << "] attached to child [" << _childProcess->getPid() << "].");
auto last30SecCheckTime = std::chrono::steady_clock::now();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 8508776b..f61c3381 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1232,30 +1232,6 @@ bool LOOLWSD::createForKit()
std::mutex Connection::Mutex;
#endif
-static std::shared_ptr<DocumentBroker> createDocBroker(WebSocketHandler& ws,
- const std::string& uri,
- const std::string& docKey,
- const Poco::URI& uriPublic)
-{
- Util::assertIsLocked(DocBrokersMutex);
-
- static_assert(MAX_DOCUMENTS > 0, "MAX_DOCUMENTS must be positive");
- if (DocBrokers.size() + 1 > MAX_DOCUMENTS)
- {
- LOG_ERR("Maximum number of open documents reached.");
- shutdownLimitReached(ws);
- return nullptr;
- }
-
- // Set the one we just created.
- LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
- auto docBroker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, LOOLWSD::ChildRoot);
- DocBrokers.emplace(docKey, docBroker);
- LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "].");
-
- return docBroker;
-}
-
/// Find the DocumentBroker for the given docKey, if one exists.
/// Otherwise, creates and adds a new one to DocBrokers.
/// May return null if terminating or MaxDocuments limit is reached.
@@ -1312,7 +1288,23 @@ static std::shared_ptr<DocumentBroker> findOrCreateDocBroker(WebSocketHandler& w
ws.sendFrame(statusConnect);
if (!docBroker)
- docBroker = createDocBroker(ws, uri, docKey, uriPublic);
+ {
+ Util::assertIsLocked(DocBrokersMutex);
+
+ static_assert(MAX_DOCUMENTS > 0, "MAX_DOCUMENTS must be positive");
+ if (DocBrokers.size() + 1 > MAX_DOCUMENTS)
+ {
+ LOG_ERR("Maximum number of open documents reached.");
+ shutdownLimitReached(ws);
+ return nullptr;
+ }
+
+ // Set the one we just created.
+ LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
+ docBroker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, LOOLWSD::ChildRoot);
+ DocBrokers.emplace(docKey, docBroker);
+ LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "].");
+ }
return docBroker;
}
commit 0d3ea2bbfdc3a619228beb171688885376082065
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 13:22:12 2017 -0400
wsd: flag thread start before creating thread
This prevents a race where the thread is started
a second time before the first gets a chance to
set the flag.
Change-Id: Ib106aa0626cdfa403b321822180b0545d3aa9139
diff --git a/net/Socket.cpp b/net/Socket.cpp
index 7d165bb9..0e5d6eab 100644
--- a/net/Socket.cpp
+++ b/net/Socket.cpp
@@ -89,8 +89,17 @@ void SocketPoll::startThread()
{
if (!_threadStarted)
{
- _thread = std::thread(&SocketPoll::pollingThreadEntry, this);
- _owner = _thread.get_id();
+ _threadStarted = true;
+ try
+ {
+ _thread = std::thread(&SocketPoll::pollingThreadEntry, this);
+ _owner = _thread.get_id();
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Failed to start poll thread: " << exc.what());
+ _threadStarted = false;
+ }
}
}
diff --git a/net/Socket.hpp b/net/Socket.hpp
index fa5c0971..c40802f1 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -464,8 +464,6 @@ private:
/// Used to set the thread name and mark the thread as stopped when done.
void pollingThreadEntry()
{
- _threadStarted = true;
-
try
{
Util::setThreadName(_name);
commit 2ba2e213bbe4f0561bd0916b4fc9a61777824e32
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 19 10:25:06 2017 -0400
wsd: support rude termination of documents
Termination should normally be initiated by the
DocumentBroker in question, so sending of termination
message on the sockets come from the correct thread.
When termination happens from elsewhere
(f.e. cleanupDocBrokers) we cannot send socket
messages, and have to resort to rude termination.
Change-Id: I94acb7b314f5dbdc45c57049fc1ac8527ba72fb9
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index af0265c3..181fae67 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -605,7 +605,7 @@ bool ClientSession::handleKitToClientMessage(const char* buffer, const int lengt
// Now terminate.
auto lock = docBroker->getLock();
- docBroker->terminateChild(lock, "");
+ docBroker->terminateChild(lock, "", true);
}
return true;
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index e1362ec6..75b766b6 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -1252,7 +1252,7 @@ void DocumentBroker::childSocketTerminated()
}
}
-void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason)
+void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason, const bool rude)
{
Util::assertIsLocked(_mutex);
Util::assertIsLocked(lock);
@@ -1260,15 +1260,18 @@ void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const st
LOG_INF("Terminating doc [" << _docKey << "].");
// Close all running sessions
- for (const auto& pair : _sessions)
+ if (!rude)
{
- try
- {
- pair.second->shutdown(WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY, closeReason);
- }
- catch (const std::exception& ex)
+ for (const auto& pair : _sessions)
{
- LOG_ERR("Error while terminating client connection [" << pair.first << "]: " << ex.what());
+ try
+ {
+ pair.second->shutdown(WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY, closeReason);
+ }
+ catch (const std::exception& ex)
+ {
+ LOG_ERR("Error while terminating client connection [" << pair.first << "]: " << ex.what());
+ }
}
}
@@ -1278,12 +1281,15 @@ void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const st
// First flag to stop as it might be waiting on our lock
// to process some incoming message.
- _childProcess->stop();
+ if (!rude)
+ {
+ _childProcess->stop();
+ }
// Release the lock and wait for the thread to finish.
lock.unlock();
- _childProcess->close(false);
+ _childProcess->close(rude);
}
// Stop the polling thread.
@@ -1295,7 +1301,7 @@ void DocumentBroker::closeDocument(const std::string& reason)
auto lock = getLock();
LOG_DBG("Closing DocumentBroker for docKey [" << _docKey << "] with reason: " << reason);
- terminateChild(lock, reason);
+ terminateChild(lock, reason, true);
}
void DocumentBroker::updateLastActivityTime()
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 6762ad8d..744cf102 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -115,12 +115,15 @@ public:
{
LOG_DBG("Closing ChildProcess [" << _pid << "].");
- // First mark to stop the thread so it knows it's intentional.
- stop();
+ if (!rude)
+ {
+ // First mark to stop the thread so it knows it's intentional.
+ stop();
- // Shutdown the socket to break the thread if blocked on it.
- if (_ws)
- _ws->shutdown();
+ // Shutdown the socket.
+ if (_ws)
+ _ws->shutdown();
+ }
}
catch (const std::exception& ex)
{
@@ -317,7 +320,7 @@ public:
/// We must be called under lock and it must be
/// passed to us so we unlock before waiting on
/// the ChildProcess thread, which can take our lock.
- void terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason);
+ void terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason, const bool rude);
/// Get the PID of the associated child process
Poco::Process::PID getPid() const { return _childProcess->getPid(); }
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 7b24449b..8508776b 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -249,7 +249,7 @@ bool cleanupDocBrokers()
{
LOG_INF("Terminating " << (idle ? "idle" : "dead") <<
" DocumentBroker for docKey [" << it->first << "].");
- docBroker->terminateChild(lock, idle ? "idle" : "");
+ docBroker->terminateChild(lock, idle ? "idle" : "", true);
// Remove only when not alive.
if (!docBroker->isAlive())
@@ -1335,7 +1335,7 @@ static void removeDocBrokerSession(const std::shared_ptr<DocumentBroker>& docBro
{
LOG_INF("Removing unloaded DocumentBroker for docKey [" << docKey << "].");
DocBrokers.erase(docKey);
- docBroker->terminateChild(lock, "");
+ docBroker->terminateChild(lock, "", true);
}
}
commit 14779f5cd4a966590442bda4fc8878a086d84df3
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sat Mar 18 23:22:56 2017 -0400
wsd: return moved socket state to stop any IO processing
Once a socket has changed ownership to a new
poll it will assert thread affinity with said
new poll. So we cannot do any IO on the old
poll's thread at that point and on.
Change-Id: I662f188dea7c377a18f3e546839ec43f2875dc7b
diff --git a/net/Socket.hpp b/net/Socket.hpp
index bcdb518d..fa5c0971 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -79,7 +79,7 @@ public:
int &timeoutMaxMs) = 0;
/// Handle results of events returned from poll
- enum class HandleResult { CONTINUE, SOCKET_CLOSED };
+ enum class HandleResult { CONTINUE, SOCKET_CLOSED, MOVED };
virtual HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) = 0;
/// manage latency issues around packet aggregation
@@ -518,8 +518,14 @@ public:
/// Will be called exactly once.
virtual void onConnect(const std::weak_ptr<StreamSocket>& socket) = 0;
+ enum class SocketOwnership
+ {
+ UNCHANGED, //< Same socket poll, business as usual.
+ MOVED //< The socket poll is now different.
+ };
+
/// Called after successful socket reads.
- virtual void handleIncomingMessage() = 0;
+ virtual SocketHandlerInterface::SocketOwnership handleIncomingMessage() = 0;
/// Prepare our poll record; adjust @timeoutMaxMs downwards
/// for timeouts, based on current time @now.
@@ -708,7 +714,8 @@ protected:
while (!_inBuffer.empty() && oldSize != _inBuffer.size())
{
oldSize = _inBuffer.size();
- _socketHandler->handleIncomingMessage();
+ if (_socketHandler->handleIncomingMessage() == SocketHandlerInterface::SocketOwnership::MOVED)
+ return Socket::HandleResult::MOVED;
}
do
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 8bdd04f6..9cb6a1d6 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -238,10 +238,12 @@ public:
}
/// Implementation of the SocketHandlerInterface.
- virtual void handleIncomingMessage() override
+ virtual SocketHandlerInterface::SocketOwnership handleIncomingMessage() override
{
while (handleOneIncomingMessage())
; // can have multiple msgs in one recv'd packet.
+
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
int getPollEvents(std::chrono::steady_clock::time_point now,
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 566ab1eb..a014173a 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -45,7 +45,7 @@ public:
{
}
- virtual void handleIncomingMessage() override
+ virtual SocketHandlerInterface::SocketOwnership handleIncomingMessage() override
{
LOG_TRC("incoming WebSocket message");
if (_wsState == WSState::HTTP)
@@ -89,17 +89,16 @@ public:
std::string str = oss.str();
socket->_outBuffer.insert(socket->_outBuffer.end(), str.begin(), str.end());
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
else if (tokens.count() == 2 && tokens[1] == "ws")
{
-
upgradeToWebSocket(req);
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
}
- WebSocketHandler::handleIncomingMessage();
+ return WebSocketHandler::handleIncomingMessage();
}
virtual void handleMessage(const bool fin, const WSOpCode code, std::vector<char> &data) override
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 08adefd0..af0265c3 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -51,12 +51,13 @@ ClientSession::~ClientSession()
stop();
}
-void ClientSession::handleIncomingMessage()
+SocketHandlerInterface::SocketOwnership ClientSession::handleIncomingMessage()
{
if (UnitWSD::get().filterHandleRequest(
UnitWSD::TestRequest::Client, *this))
- return;
- Session::handleIncomingMessage();
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
+
+ return Session::handleIncomingMessage();
}
bool ClientSession::_handleInput(const char *buffer, int length)
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index 664f30d5..19aee2af 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -30,7 +30,7 @@ public:
virtual ~ClientSession();
- void handleIncomingMessage() override;
+ SocketHandlerInterface::SocketOwnership handleIncomingMessage() override;
void setReadOnly();
bool isReadOnly() const { return _isReadOnly; }
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 2e786a0d..7b24449b 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1424,17 +1424,16 @@ private:
}
/// Called after successful socket reads.
- void handleIncomingMessage() override
+ SocketHandlerInterface::SocketOwnership handleIncomingMessage() override
{
if (UnitWSD::get().filterHandleRequest(
UnitWSD::TestRequest::Prisoner, *this))
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
if (_childProcess.lock())
{
// FIXME: inelegant etc. - derogate to websocket code
- WebSocketHandler::handleIncomingMessage();
- return;
+ return WebSocketHandler::handleIncomingMessage();
}
auto socket = _socket.lock();
@@ -1447,7 +1446,7 @@ private:
if (itBody == in.end())
{
LOG_TRC("#" << socket->getFD() << " doesn't have enough data yet.");
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
// Skip the marker.
@@ -1479,7 +1478,7 @@ private:
if (request.getURI().find(NEW_CHILD_URI) != 0)
{
LOG_ERR("Invalid incoming URI.");
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
// New Child is spawned.
@@ -1500,7 +1499,7 @@ private:
if (pid <= 0)
{
LOG_ERR("Invalid PID in child URI [" << request.getURI() << "].");
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
LOG_INF("New child [" << pid << "].");
@@ -1522,8 +1521,9 @@ private:
{
// Probably don't have enough data just yet.
// TODO: timeout if we never get enough.
- return;
}
+
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
/// Prisoner websocket fun ... (for now)
@@ -1586,9 +1586,10 @@ private:
LOG_ERR("onDisconnect");
}
- void handleIncomingMessage() override
+ SocketHandlerInterface::SocketOwnership handleIncomingMessage() override
{
LOG_ERR("handleIncomingMessage");
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
int getPollEvents(std::chrono::steady_clock::time_point /* now */,
@@ -1626,7 +1627,7 @@ private:
}
/// Called after successful socket reads.
- void handleIncomingMessage() override
+ SocketHandlerInterface::SocketOwnership handleIncomingMessage() override
{
auto socket = _socket.lock();
std::vector<char>& in = socket->_inBuffer;
@@ -1638,7 +1639,7 @@ private:
if (itBody == in.end())
{
LOG_TRC("#" << socket->getFD() << " doesn't have enough data yet.");
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
// Skip the marker.
@@ -1673,16 +1674,17 @@ private:
if (contentLength != Poco::Net::HTTPMessage::UNKNOWN_CONTENT_LENGTH && available < contentLength)
{
LOG_DBG("Not enough content yet: ContentLength: " << contentLength << ", available: " << available);
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
}
catch (const std::exception& exc)
{
// Probably don't have enough data just yet.
// TODO: timeout if we never get enough.
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
+ SocketHandlerInterface::SocketOwnership socketOwnership = SocketHandlerInterface::SocketOwnership::UNCHANGED;
try
{
// Routing
@@ -1732,7 +1734,7 @@ private:
}
else if (reqPathTokens.count() > 2 && reqPathTokens[0] == "lool" && reqPathTokens[2] == "ws")
{
- handleClientWsUpgrade(request, reqPathTokens[1]);
+ socketOwnership = handleClientWsUpgrade(request, reqPathTokens[1]);
}
else
{
@@ -1759,6 +1761,8 @@ private:
// TODO: Send back failure.
// NOTE: Check _wsState to choose between HTTP response or WebSocket (app-level) error.
}
+
+ return socketOwnership;
}
int getPollEvents(std::chrono::steady_clock::time_point /* now */,
@@ -2097,7 +2101,7 @@ private:
throw BadRequestException("Invalid or unknown request.");
}
- void handleClientWsUpgrade(const Poco::Net::HTTPRequest& request, const std::string& url)
+ SocketHandlerInterface::SocketOwnership handleClientWsUpgrade(const Poco::Net::HTTPRequest& request, const std::string& url)
{
// requestHandler = new ClientRequestHandler();
LOG_INF("Client WS request: " << request.getURI() << ", url: " << url);
@@ -2109,7 +2113,7 @@ private:
{
LOG_ERR("Limit on maximum number of connections of " << MAX_CONNECTIONS << " reached.");
shutdownLimitReached(ws);
- return;
+ return SocketHandlerInterface::SocketOwnership::UNCHANGED;
}
LOG_INF("Starting GET request handler for session [" << _id << "] on url [" << url << "].");
@@ -2137,6 +2141,8 @@ private:
LOG_INF("URL [" << url << "] is " << (isReadOnly ? "readonly" : "writable") << ".");
+ SocketHandlerInterface::SocketOwnership socketOwnership = SocketHandlerInterface::SocketOwnership::UNCHANGED;
+
// Request a kit process for this doc.
auto docBroker = findOrCreateDocBroker(ws, url, docKey, _id, uriPublic);
if (docBroker)
@@ -2149,12 +2155,13 @@ private:
auto socket = _socket.lock();
if (socket)
{
+ // Set the ClientSession to handle Socket events.
+ socket->setHandler(clientSession);
+
// Move the socket into DocBroker.
WebServerPoll.releaseSocket(socket);
docBroker->addSocketToPoll(socket);
-
- // Set the ClientSession to handle Socket events.
- socket->setHandler(clientSession);
+ socketOwnership = SocketHandlerInterface::SocketOwnership::MOVED;
}
docBroker->startThread();
}
@@ -2163,6 +2170,8 @@ private:
}
else
LOG_WRN("Failed to create DocBroker with docKey [" << docKey << "].");
+
+ return socketOwnership;
}
private:
commit 6e7bd4bcf039a7ef86c56ccff566c7391ad07a4c
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sat Mar 18 22:21:29 2017 -0400
wsd: cannot broadcast alert messages from random thread
Will have to come up with a different solution
to broadcasting alerts to all users.
Change-Id: I00260402f71c516f4335c592b10dee7555dc67a6
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index af5854f1..2e786a0d 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -215,10 +215,11 @@ void alertAllUsersInternal(const std::string& msg)
LOG_INF("Alerting all users: [" << msg << "]");
- for (auto& brokerIt : DocBrokers)
+ //FIXME loolnb: due to thread-affinity of sockets we can't send from here.
+ // for (auto& brokerIt : DocBrokers)
{
- auto lock = brokerIt.second->getLock();
- brokerIt.second->alertAllUsers(msg);
+ // auto lock = brokerIt.second->getLock();
+ // brokerIt.second->alertAllUsers(msg);
}
}
}
commit 7096133f07ddaac2c7c0895ac6f26e043cffdbb3
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sat Mar 18 22:19:41 2017 -0400
wsd: log thread affinity violations
Change-Id: Ib1317bc71f9162f005e0ce9b8c715bbce656db73
diff --git a/common/Util.cpp b/common/Util.cpp
index ce1730b0..16335fe3 100644
--- a/common/Util.cpp
+++ b/common/Util.cpp
@@ -264,6 +264,10 @@ namespace Util
{
LOG_SYS("Cannot set thread name to " << s << ".");
}
+ else
+ {
+ LOG_INF("Thread " << std::hex << std::this_thread::get_id() << std::dec << " is now called " << s);
+ }
}
void getVersionInfo(std::string& version, std::string& hash)
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 50e84a39..bcdb518d 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -193,7 +193,10 @@ public:
virtual bool isCorrectThread(bool hard = false)
{
#if ENABLE_DEBUG
- bool sameThread = std::this_thread::get_id() == _owner;
+ const bool sameThread = std::this_thread::get_id() == _owner;
+ if (!sameThread)
+ LOG_WRN("#" << _fd << " invoked from foreign thread. Expected: " <<
+ std::hex << _owner << std::dec);
if (hard)
return sameThread;
else
commit b1609a0087fcf4ecd074cd21d8d35e05740e7f5c
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sat Mar 18 16:36:32 2017 -0400
wsd: cleanup session and docBroker after convert-to
Change-Id: I7d9c8eeef61c23cc3f4f902b15953abd5ec6851a
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 83024cfd..08adefd0 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -581,7 +581,32 @@ bool ClientSession::handleKitToClientMessage(const char* buffer, const int lengt
}
}
- setSaveAsUrl(url);
+ if (_saveAsSocket)
+ {
+ Poco::URI resultURL(url);
+ LOG_TRC("Save-as URL: " << resultURL.toString());
+
+ // TODO: Send back error when there is no output.
+ if (!resultURL.getPath().empty())
+ {
+ const std::string mimeType = "application/octet-stream";
+ std::string encodedFilePath;
+ Poco::URI::encode(resultURL.getPath(), "", encodedFilePath);
+ LOG_TRC("Sending file: " << encodedFilePath);
+ HttpHelper::sendFile(_saveAsSocket, encodedFilePath, mimeType);
+ }
+
+ // Conversion is done, cleanup this fake session.
+ LOG_TRC("Removing save-as ClientSession after conversion.");
+
+ // Remove us.
+ docBroker->removeSession(getId());
+
+ // Now terminate.
+ auto lock = docBroker->getLock();
+ docBroker->terminateChild(lock, "");
+ }
+
return true;
}
else if (tokens.size() == 2 && tokens[0] == "statechanged:")
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index 90558b72..664f30d5 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -89,26 +89,12 @@ public:
_senderQueue.stop();
}
+ /// Set the save-as socket which is used to send convert-to results.
void setSaveAsSocket(const std::shared_ptr<StreamSocket>& socket)
{
_saveAsSocket = socket;
}
- void setSaveAsUrl(const std::string& url)
- {
- Poco::URI resultURL(url);
- LOG_TRC("Save-as URL: " << resultURL.toString());
-
- if (!resultURL.getPath().empty())
- {
- const std::string mimeType = "application/octet-stream";
- std::string encodedFilePath;
- Poco::URI::encode(resultURL.getPath(), "", encodedFilePath);
- LOG_TRC("Sending file: " << encodedFilePath);
- HttpHelper::sendFile(_saveAsSocket, encodedFilePath, mimeType);
- }
- }
-
std::shared_ptr<DocumentBroker> getDocumentBroker() const { return _docBroker.lock(); }
/// Exact URI (including query params - access tokens etc.) with which
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 10a89233..e1362ec6 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -1191,7 +1191,7 @@ bool DocumentBroker::forwardToClient(const std::shared_ptr<Message>& payload)
const auto& data = payload->data().data();
const auto& size = payload->size();
- std::unique_lock<std::mutex> lock(_mutex);
+ // std::unique_lock<std::mutex> lock(_mutex);
if (sid == "all")
{
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 55e52ccd..af5854f1 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1539,7 +1539,8 @@ private:
{
// We should never destroy the broker, since
// it owns us and will wait on this thread.
- // FIXME: loolnb - check that comment !
+ // This is true with non-blocking since this is
+ // called from DocumentBroker::pollThread.
assert(docBroker.use_count() > 1);
docBroker->handleInput(data);
return;
More information about the Libreoffice-commits
mailing list