[Libreoffice-commits] online.git: 2 commits - loolwsd/LOOLKit.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Oct 10 06:32:37 UTC 2016
loolwsd/LOOLKit.cpp | 254 ++++++++--------------------------------------------
1 file changed, 42 insertions(+), 212 deletions(-)
New commits:
commit e0ff6eef6b636789b0f7e849df77a6047fec5bae
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Oct 9 17:18:29 2016 -0400
loolwsd: kill Connection object in child
Change-Id: Ic4d0d3e7286272a0765b299824dfa3556fe56f4b
Reviewed-on: https://gerrit.libreoffice.org/29652
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 95d3f8c..852be7a 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -258,123 +258,6 @@ namespace
#endif
}
-/// Connection thread with a client (via WSD).
-class Connection: public Runnable
-{
-public:
- Connection(std::shared_ptr<ChildSession> session,
- std::shared_ptr<WebSocket> ws) :
- _sessionId(session->getId()),
- _session(std::move(session)),
- _ws(std::move(ws)),
- _threadMutex(),
- _joined(false)
- {
- Log::info("Connection ctor in child for " + _sessionId);
- }
-
- ~Connection()
- {
- Log::info("~Connection dtor in child for " + _sessionId);
- stop();
- join();
- }
-
- const std::string& getSessionId() const { return _sessionId; };
- std::shared_ptr<WebSocket> getWebSocket() const { return _ws; }
- std::shared_ptr<ChildSession> getSession() { return _session; }
-
- void start()
- {
- _thread.start(*this);
-
- // Busy-wait until we run.
- // This is important to make sure we can process
- // callbacks, which if we're late to start will
- // be dropped. No need for async notification here.
- constexpr auto delay = COMMAND_TIMEOUT_MS / 20;
- for (auto i = 0; i < 20 && !isRunning(); ++i)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(delay));
- }
- }
-
- bool isRunning()
- {
- return _thread.isRunning();
- }
-
- void stop()
- {
- // What should we do here?
- }
-
- void join()
- {
- // The thread is joinable only once.
- std::unique_lock<std::mutex> lock(_threadMutex);
- if (!_joined)
- {
- _thread.join();
- _joined = true;
- }
- }
-
- void run() override
- {
- Util::setThreadName("kit_ws_" + _sessionId);
-
- Log::debug("Thread started.");
- try
- {
- IoUtil::SocketProcessor(_ws,
- [this](const std::vector<char>& payload)
- {
- if (!_session->handleInput(payload.data(), payload.size()))
- {
- Log::info("Socket handler flagged for finishing.");
- return false;
- }
-
- return true;
- },
- [this]() { _session->closeFrame(); },
- []() { return !!TerminationFlag; });
-
- if (_session->isCloseFrame())
- {
- Log::trace("Normal close handshake.");
- _ws->shutdown();
- }
- else
- {
- Log::trace("Abnormal close handshake.");
- _ws->shutdown(WebSocket::WS_ENDPOINT_GOING_AWAY, SERVICE_UNAVALABLE_INTERNAL_ERROR);
- }
- }
- catch (const Exception& exc)
- {
- Log::error() << "Connection::run: Exception: " << exc.displayText()
- << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
- << Log::end;
- }
- catch (const std::exception& exc)
- {
- Log::error(std::string("Connection::run: Exception: ") + exc.what());
- }
-
- Log::debug("Thread finished.");
- }
-
-private:
- const std::string _sessionId;
- Thread _thread;
- std::shared_ptr<ChildSession> _session;
- std::shared_ptr<WebSocket> _ws;
- std::mutex _threadMutex;
- std::atomic<bool> _joined;
-};
-
/// A document container.
/// Owns LOKitDocument instance and connections.
/// Manages the lifetime of a document.
@@ -464,10 +347,10 @@ public:
if (!_sessions.emplace(sessionId, session).second)
{
- Log::error("Connection already exists for child: " + _jailId + ", session: " + sessionId);
+ Log::error("Session already exists for child: " + _jailId + ", session: " + sessionId);
}
- Log::debug("Connections: " + std::to_string(_sessions.size()));
+ Log::debug("Sessions: " + std::to_string(_sessions.size()));
return true;
}
catch (const std::exception& ex)
@@ -539,7 +422,7 @@ public:
/// Returns true if at least one *live* connection exists.
/// Does not consider user activity, just socket status.
- bool hasConnections()
+ bool hasSessions()
{
// -ve values for failure.
return purgeSessions() != 0;
@@ -550,7 +433,7 @@ public:
bool canDiscard()
{
//TODO: Implement proper time-out on inactivity.
- return !hasConnections();
+ return !hasSessions();
}
/// Set Document password for given URL
@@ -1296,7 +1179,7 @@ private:
}
else
{
- Log::error() << "Connection thread for session " << session->getId() << " for view "
+ Log::error() << "Session thread for session " << session->getId() << " for view "
<< viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type)
<< "] payload [" << payload << "]." << Log::end;
}
commit e33eff4abdc4a3145b5bf7a9b6def2fce63d64a6
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Oct 9 17:05:24 2016 -0400
loolwsd: cleanup of Connection in ChildSession
Change-Id: I07636163df7b2973dada55b9704abf7105ad285f
Reviewed-on: https://gerrit.libreoffice.org/29651
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 17f7f3f..95d3f8c 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -428,62 +428,21 @@ public:
_tileQueue->put("eof");
_callbackThread.join();
-
- // Flag all connections to stop.
- for (auto aIterator : _connections)
- {
- aIterator.second->stop();
- }
-
- // Destroy all connections and views.
- for (auto aIterator : _connections)
- {
- try
- {
- // stop all websockets
- if (aIterator.second->isRunning())
- {
- std::shared_ptr<WebSocket> ws = aIterator.second->getWebSocket();
- if (ws)
- {
- ws->shutdownReceive();
- aIterator.second->join();
- }
- }
- }
- catch(NetException& exc)
- {
- Log::error() << "Document::~Document: " << exc.displayText()
- << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
- << Log::end;
- }
- }
-
- // Destroy all connections and views.
- _connections.clear();
}
const std::string& getUrl() const { return _url; }
- bool createSession(const std::string& sessionId, const unsigned intSessionId)
+ bool createSession(const std::string& sessionId)
{
std::unique_lock<std::mutex> lock(_mutex);
try
{
- const auto& it = _connections.find(intSessionId);
- if (it != _connections.end())
+ const auto& it = _sessions.find(sessionId);
+ if (it != _sessions.end())
{
- // found item, check if still running
- if (it->second->isRunning())
- {
- Log::warn("Session [" + sessionId + "] is already running.");
- return true;
- }
-
- // Restore thread. TODO: Review this logic.
- Log::warn("Session [" + sessionId + "] is not running. Restoring.");
- _connections.erase(intSessionId);
+ Log::warn("Session [" + sessionId + "] is already running.");
+ return true;
}
Log::info() << "Creating " << (_clientViews ? "new" : "first")
@@ -503,18 +462,12 @@ public:
auto session = std::make_shared<ChildSession>(sessionId, ws, _jailId, *this);
- auto thread = std::make_shared<Connection>(session, ws);
- const auto aInserted = _connections.emplace(intSessionId, thread);
- if (aInserted.second)
- {
- thread->start();
- }
- else
+ if (!_sessions.emplace(sessionId, session).second)
{
Log::error("Connection already exists for child: " + _jailId + ", session: " + sessionId);
}
- Log::debug("Connections: " + std::to_string(_connections.size()));
+ Log::debug("Connections: " + std::to_string(_sessions.size()));
return true;
}
catch (const std::exception& ex)
@@ -531,7 +484,7 @@ public:
{
std::vector<std::shared_ptr<ChildSession>> deadSessions;
size_t numRunning = 0;
- size_t num_connections = 0;
+ size_t num_sessions = 0;
{
std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
if (!lock.try_lock())
@@ -544,20 +497,20 @@ public:
// bluntly exit, no need to clean up our own data structures. Also, there is a bug that
// causes the deadSessions.clear() call below to crash in some situations when the last
// session is being removed.
- for (auto it = _connections.cbegin(); it != _connections.cend(); ++it)
+ for (auto it = _sessions.cbegin(); it != _sessions.cend(); ++it)
{
- if (it->second->isRunning())
+ if (!it->second->isCloseFrame())
numRunning++;
}
if (numRunning > 0)
{
- for (auto it = _connections.cbegin(); it != _connections.cend(); )
+ for (auto it = _sessions.cbegin(); it != _sessions.cend(); )
{
- if (!it->second->isRunning())
+ if (it->second->isCloseFrame())
{
- deadSessions.push_back(it->second->getSession());
- it = _connections.erase(it);
+ deadSessions.push_back(it->second);
+ it = _sessions.erase(it);
}
else
{
@@ -566,7 +519,7 @@ public:
}
}
- num_connections = _connections.size();
+ num_sessions = _sessions.size();
}
if (numRunning == 0)
@@ -581,7 +534,7 @@ public:
// and the dtor tries to take its lock (which is taken).
deadSessions.clear();
- return num_connections;
+ return num_sessions;
}
/// Returns true if at least one *live* connection exists.
@@ -1027,11 +980,11 @@ private:
std::unique_lock<std::mutex> lock(_mutex);
std::map<int, std::string> viewInfo;
- for (auto& connection : _connections)
+ for (auto& pair : _sessions)
{
- if (connection.second->isRunning())
+ const auto session = pair.second;
+ if (!session->isCloseFrame())
{
- const auto session = connection.second->getSession();
const auto viewId = session->getViewId();
viewInfo[viewId] = session->getViewUserName();
}
@@ -1083,15 +1036,12 @@ private:
viewInfoArray->stringify(ossViewInfo);
// Broadcast updated viewinfo to all _active_ connections
- for (auto& connectionIt: _connections)
+ for (auto& pair : _sessions)
{
- if (connectionIt.second->isRunning())
+ const auto session = pair.second;
+ if (!session->isCloseFrame() && session->isActive())
{
- auto session = connectionIt.second->getSession();
- if (session->isActive())
- {
- session->sendTextFrame("viewinfo: " + ossViewInfo.str());
- }
+ session->sendTextFrame("viewinfo: " + ossViewInfo.str());
}
}
}
@@ -1105,15 +1055,14 @@ private:
const std::string& renderOpts,
const bool haveDocPassword)
{
- const unsigned intSessionId = Util::decodeId(sessionId);
- const auto it = _connections.find(intSessionId);
- if (it == _connections.end() || !it->second)
+ const auto it = _sessions.find(sessionId);
+ if (it == _sessions.end() || !it->second)
{
Log::error("Cannot find session [" + sessionId + "].");
return nullptr;
}
- auto session = it->second->getSession();
+ auto session = it->second;
int viewId = 0;
std::unique_lock<std::mutex> lockLokDoc;
@@ -1257,21 +1206,20 @@ private:
Log::trace("Forwarding payload to " + prefix + ' ' + message);
std::string name;
- std::string value;
- if (LOOLProtocol::parseNameValuePair(prefix, name, value, '-') && name == "child")
+ std::string viewId;
+ if (LOOLProtocol::parseNameValuePair(prefix, name, viewId, '-') && name == "child")
{
- const unsigned viewId = Util::decodeId(value);
- const auto it = _connections.find(viewId);
- if (it != _connections.end())
+ const auto it = _sessions.find(viewId);
+ if (it != _sessions.end())
{
if (message == "disconnect")
{
- Log::debug("Removing ChildSession " + value);
- _connections.erase(it);
+ Log::debug("Removing ChildSession " + viewId);
+ _sessions.erase(it);
return true;
}
- auto session = it->second->getSession();
+ auto session = it->second;
if (session)
{
return session->handleInput(message.data(), message.size());
@@ -1336,19 +1284,19 @@ private:
// Forward the callback to the same view, demultiplexing is done by the LibreOffice core.
// TODO: replace with a map to be faster.
bool isFound = false;
- for (auto& it : _connections)
+ for (auto& it : _sessions)
{
- auto session = it.second->getSession();
+ auto session = it.second;
if (session && ((session->getViewId() == viewId) || (viewId == -1)))
{
- if (it.second->isRunning())
+ if (!it.second->isCloseFrame())
{
isFound = true;
session->loKitCallback(type, payload);
}
else
{
- Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view "
+ Log::error() << "Connection thread for session " << session->getId() << " for view "
<< viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type)
<< "] payload [" << payload << "]." << Log::end;
}
@@ -1404,7 +1352,7 @@ private:
std::condition_variable _cvLoading;
std::atomic_size_t _isLoading;
std::map<int, std::unique_ptr<CallbackDescriptor>> _viewIdToCallbackDescr;
- std::map<unsigned, std::shared_ptr<Connection>> _connections;
+ std::map<std::string, std::shared_ptr<ChildSession>> _sessions;
Poco::Thread _callbackThread;
std::atomic_size_t _clientViews;
};
@@ -1628,7 +1576,6 @@ void lokit_main(const std::string& childRoot,
else if (tokens[0] == "session")
{
const std::string& sessionId = tokens[1];
- const unsigned intSessionId = Util::decodeId(sessionId);
const std::string& docKey = tokens[2];
std::string url;
@@ -1642,7 +1589,7 @@ void lokit_main(const std::string& childRoot,
// Validate and create session.
if (!(url == document->getUrl() &&
- document->createSession(sessionId, intSessionId)))
+ document->createSession(sessionId)))
{
Log::debug("CreateSession failed.");
}
More information about the Libreoffice-commits mailing list