[Libreoffice-commits] online.git: loolwsd/DocumentBroker.cpp loolwsd/DocumentBroker.hpp loolwsd/LOOLWSD.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Apr 25 02:16:34 UTC 2016
loolwsd/DocumentBroker.cpp | 17 +++++++
loolwsd/DocumentBroker.hpp | 2
loolwsd/LOOLWSD.cpp | 106 ++++++++++++---------------------------------
3 files changed, 48 insertions(+), 77 deletions(-)
New commits:
commit 4f7b911066bd29d5901b2724b85aae77258b73eb
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Apr 24 22:09:13 2016 -0400
loolwsd: simplified the bridging between client and prisoner sessions
Change-Id: I1335060963eda3356312f42060da229f43d239d8
Reviewed-on: https://gerrit.libreoffice.org/24358
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/DocumentBroker.cpp b/loolwsd/DocumentBroker.cpp
index 31de15e..fc7d887 100644
--- a/loolwsd/DocumentBroker.cpp
+++ b/loolwsd/DocumentBroker.cpp
@@ -292,6 +292,23 @@ size_t DocumentBroker::addSession(std::shared_ptr<MasterProcessSession>& session
return _sessions.size();
}
+bool DocumentBroker::connectPeers(std::shared_ptr<MasterProcessSession>& session)
+{
+ const auto id = session->getId();
+
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ auto it = _sessions.find(id);
+ if (it != _sessions.end())
+ {
+ it->second->setPeer(session);
+ session->setPeer(it->second);
+ return true;
+ }
+
+ return false;
+}
+
size_t DocumentBroker::removeSession(const std::string& id)
{
std::lock_guard<std::mutex> lock(_mutex);
diff --git a/loolwsd/DocumentBroker.hpp b/loolwsd/DocumentBroker.hpp
index dcf2d2a..5a5298f 100644
--- a/loolwsd/DocumentBroker.hpp
+++ b/loolwsd/DocumentBroker.hpp
@@ -194,6 +194,8 @@ public:
/// Add a new session. Returns the new number of sessions.
size_t addSession(std::shared_ptr<MasterProcessSession>& session);
+ /// Connect a prison session to its client peer.
+ bool connectPeers(std::shared_ptr<MasterProcessSession>& session);
/// Removes a session by ID. Returns the new number of sessions.
size_t removeSession(const std::string& id);
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 675444e..068f25c 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -271,51 +271,28 @@ public:
class ClientRequestHandler: public HTTPRequestHandler
{
private:
-
- static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& clientSession,
- const std::shared_ptr<DocumentBroker>& docBroker)
+ static void waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& session)
{
- int retries = 5;
bool isFound = false;
-
- // Wait until the client has connected with a prison socket.
- std::shared_ptr<MasterProcessSession> prisonSession;
std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
+ Log::debug() << "Waiting for client session [" << session->getId() << "] to connect." << Log::end;
+ AvailableChildSessionCV.wait_for(
+ lock,
+ std::chrono::milliseconds(COMMAND_TIMEOUT_MS),
+ [&isFound, &session]
+ {
+ return (isFound = AvailableChildSessions.find(session->getId()) != AvailableChildSessions.end());
+ });
- Log::debug() << "Waiting for client session [" << clientSession->getId() << "] to connect." << Log::end;
- while (!TerminationFlag && retries-- && !isFound)
- {
- AvailableChildSessionCV.wait_for(
- lock,
- std::chrono::milliseconds(COMMAND_TIMEOUT_MS),
- [&isFound, &clientSession]
- {
- return (isFound = AvailableChildSessions.find(clientSession->getId()) != AvailableChildSessions.end());
- });
-
- if (!isFound)
- {
- //FIXME: outdated!
- Log::info() << "Retrying client permission... " << retries << Log::end;
- // request again new URL session
- const std::string message = "request " + clientSession->getId() + " " + docBroker->getDocKey() + '\n';
- Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
- IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message);
- }
- }
-
- if (isFound)
+ if (!isFound)
{
- Log::debug("Waiting child session permission, done!");
- prisonSession = AvailableChildSessions[clientSession->getId()];
- AvailableChildSessions.erase(clientSession->getId());
-
- clientSession->setPeer(prisonSession);
- prisonSession->setPeer(clientSession);
- Log::debug("Connected " + clientSession->getName() + " - " + prisonSession->getName() + ".");
+ // Let the client know we can't serve now.
+ Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now.");
+ throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR);
}
- return isFound;
+ Log::debug("Waiting child session permission, done!");
+ AvailableChildSessions.erase(session->getId());
}
/// Handle POST requests.
@@ -362,15 +339,16 @@ private:
// Load the document.
std::shared_ptr<WebSocket> ws;
auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, nullptr);
+
+ // Request the child to connect to us and add this session.
auto sessionsCount = docBroker->addSession(session);
+ Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount));
+
lock.unlock();
Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount));
- if (!waitBridgeCompleted(session, docBroker))
- {
- // Let the client know we can't serve now.
- throw std::runtime_error("Failed to connect to lokit child.");
- }
+ // Wait until the client has connected with a prison socket.
+ waitBridgeCompleted(session);
// Now the bridge between the client and kit processes is connected
// Let messages flow
@@ -588,6 +566,8 @@ private:
// thread to pump them. This is to empty the queue when we get a "canceltiles" message.
auto queue = std::make_shared<BasicTileQueue>();
session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, queue);
+
+ // Request the child to connect to us and add this session.
const auto sessionsCount = docBroker->addSession(session);
Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount));
@@ -595,15 +575,11 @@ private:
status = "statusindicator: connect";
ws->sendFrame(status.data(), (int) status.size());
- if (!waitBridgeCompleted(session, docBroker))
- {
- // Let the client know we can't serve now.
- Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now.");
- throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR);
- }
-
+ // Wait until the client has connected with a prison socket.
+ waitBridgeCompleted(session);
// Now the bridge beetween the client and kit process is connected
// Let messages flow
+
status = "statusindicator: ready";
ws->sendFrame(status.data(), (int) status.size());
@@ -826,24 +802,6 @@ class PrisonerRequestHandler: public HTTPRequestHandler
{
public:
- static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& prisonSession)
- {
- // time to live, if the kit process cannot connect to a client session.
- int ttl = 180;
- bool isFound = true;
- // Wait until the prison has connected with a client socket.
- Log::debug() << "Waiting for prison session [" << prisonSession->getId() << "] to connect." << Log::end;
- while (!TerminationFlag &&
- (isFound = AvailableChildSessions.find(prisonSession->getId()) != AvailableChildSessions.end()) &&
- ttl--)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS));
- Log::debug() << "Sleeping prison session [" << prisonSession->getId() << "] to connect." << Log::end;
- }
-
- return isFound;
- }
-
void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
{
if (UnitWSD::get().filterHandleRequest(
@@ -959,6 +917,9 @@ public:
auto ws = std::make_shared<WebSocket>(request, response);
auto session = std::make_shared<MasterProcessSession>(sessionId, LOOLSession::Kind::ToPrisoner, ws, docBroker, nullptr);
+ // Connect the prison session to the client.
+ docBroker->connectPeers(session);
+
std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
AvailableChildSessions.emplace(sessionId, session);
@@ -971,15 +932,6 @@ public:
Log::info("Adding doc " + docKey + " to Admin");
Admin::instance().addDoc(docKey, pid, docBroker->getFilename(), sessionId);
- if (waitBridgeCompleted(session))
- {
- ws->shutdown();
- throw WebSocketException("Failed to connect to client session", WebSocket::WS_ENDPOINT_GOING_AWAY);
- }
- Log::debug("Connected " + session->getName() + ".");
- // Now the bridge beetween the prison and the client is connected
- // Let messages flow
-
UnitWSD::get().onChildConnected(pid, sessionId);
IoUtil::SocketProcessor(ws,
More information about the Libreoffice-commits
mailing list