[Libreoffice-commits] online.git: loolwsd/DocumentBroker.cpp loolwsd/DocumentBroker.hpp loolwsd/LOOLWSD.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Apr 25 02:16:34 UTC 2016


 loolwsd/DocumentBroker.cpp |   17 +++++++
 loolwsd/DocumentBroker.hpp |    2 
 loolwsd/LOOLWSD.cpp        |  106 ++++++++++++---------------------------------
 3 files changed, 48 insertions(+), 77 deletions(-)

New commits:
commit 4f7b911066bd29d5901b2724b85aae77258b73eb
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Apr 24 22:09:13 2016 -0400

    loolwsd: simplified the bridging between client and prisoner sessions
    
    Change-Id: I1335060963eda3356312f42060da229f43d239d8
    Reviewed-on: https://gerrit.libreoffice.org/24358
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/DocumentBroker.cpp b/loolwsd/DocumentBroker.cpp
index 31de15e..fc7d887 100644
--- a/loolwsd/DocumentBroker.cpp
+++ b/loolwsd/DocumentBroker.cpp
@@ -292,6 +292,23 @@ size_t DocumentBroker::addSession(std::shared_ptr<MasterProcessSession>& session
     return _sessions.size();
 }
 
+bool DocumentBroker::connectPeers(std::shared_ptr<MasterProcessSession>& session)
+{
+    const auto id = session->getId();
+
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    auto it = _sessions.find(id);
+    if (it != _sessions.end())
+    {
+        it->second->setPeer(session);
+        session->setPeer(it->second);
+        return true;
+    }
+
+    return false;
+}
+
 size_t DocumentBroker::removeSession(const std::string& id)
 {
     std::lock_guard<std::mutex> lock(_mutex);
diff --git a/loolwsd/DocumentBroker.hpp b/loolwsd/DocumentBroker.hpp
index dcf2d2a..5a5298f 100644
--- a/loolwsd/DocumentBroker.hpp
+++ b/loolwsd/DocumentBroker.hpp
@@ -194,6 +194,8 @@ public:
 
     /// Add a new session. Returns the new number of sessions.
     size_t addSession(std::shared_ptr<MasterProcessSession>& session);
+    /// Connect a prison session to its client peer.
+    bool connectPeers(std::shared_ptr<MasterProcessSession>& session);
     /// Removes a session by ID. Returns the new number of sessions.
     size_t removeSession(const std::string& id);
 
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 675444e..068f25c 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -271,51 +271,28 @@ public:
 class ClientRequestHandler: public HTTPRequestHandler
 {
 private:
-
-    static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& clientSession,
-                                    const std::shared_ptr<DocumentBroker>& docBroker)
+    static void waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& session)
     {
-        int retries = 5;
         bool isFound = false;
-
-        // Wait until the client has connected with a prison socket.
-        std::shared_ptr<MasterProcessSession> prisonSession;
         std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
+        Log::debug() << "Waiting for client session [" << session->getId() << "] to connect." << Log::end;
+        AvailableChildSessionCV.wait_for(
+            lock,
+            std::chrono::milliseconds(COMMAND_TIMEOUT_MS),
+            [&isFound, &session]
+            {
+                return (isFound = AvailableChildSessions.find(session->getId()) != AvailableChildSessions.end());
+            });
 
-        Log::debug() << "Waiting for client session [" << clientSession->getId() << "] to connect." << Log::end;
-        while (!TerminationFlag && retries-- && !isFound)
-        {
-            AvailableChildSessionCV.wait_for(
-                lock,
-                std::chrono::milliseconds(COMMAND_TIMEOUT_MS),
-                [&isFound, &clientSession]
-                {
-                    return (isFound = AvailableChildSessions.find(clientSession->getId()) != AvailableChildSessions.end());
-                });
-
-                if (!isFound)
-                {
-                    //FIXME: outdated!
-                    Log::info() << "Retrying client permission... " << retries << Log::end;
-                    // request again new URL session
-                    const std::string message = "request " + clientSession->getId() + " " + docBroker->getDocKey() + '\n';
-                    Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
-                    IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message);
-                }
-        }
-
-        if (isFound)
+        if (!isFound)
         {
-            Log::debug("Waiting child session permission, done!");
-            prisonSession = AvailableChildSessions[clientSession->getId()];
-            AvailableChildSessions.erase(clientSession->getId());
-
-            clientSession->setPeer(prisonSession);
-            prisonSession->setPeer(clientSession);
-            Log::debug("Connected " + clientSession->getName() + " - " + prisonSession->getName() + ".");
+            // Let the client know we can't serve now.
+            Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now.");
+            throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR);
         }
 
-        return isFound;
+        Log::debug("Waiting child session permission, done!");
+        AvailableChildSessions.erase(session->getId());
     }
 
     /// Handle POST requests.
@@ -362,15 +339,16 @@ private:
                     // Load the document.
                     std::shared_ptr<WebSocket> ws;
                     auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, nullptr);
+
+                    // Request the child to connect to us and add this session.
                     auto sessionsCount = docBroker->addSession(session);
+                    Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount));
+
                     lock.unlock();
                     Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount));
 
-                    if (!waitBridgeCompleted(session, docBroker))
-                    {
-                        // Let the client know we can't serve now.
-                        throw std::runtime_error("Failed to connect to lokit child.");
-                    }
+                    // Wait until the client has connected with a prison socket.
+                    waitBridgeCompleted(session);
                     // Now the bridge between the client and kit processes is connected
                     // Let messages flow
 
@@ -588,6 +566,8 @@ private:
             // thread to pump them. This is to empty the queue when we get a "canceltiles" message.
             auto queue = std::make_shared<BasicTileQueue>();
             session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, queue);
+
+            // Request the child to connect to us and add this session.
             const auto sessionsCount = docBroker->addSession(session);
             Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount));
 
@@ -595,15 +575,11 @@ private:
             status = "statusindicator: connect";
             ws->sendFrame(status.data(), (int) status.size());
 
-            if (!waitBridgeCompleted(session, docBroker))
-            {
-                // Let the client know we can't serve now.
-                Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now.");
-                throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR);
-            }
-
+            // Wait until the client has connected with a prison socket.
+            waitBridgeCompleted(session);
             // Now the bridge beetween the client and kit process is connected
             // Let messages flow
+
             status = "statusindicator: ready";
             ws->sendFrame(status.data(), (int) status.size());
 
@@ -826,24 +802,6 @@ class PrisonerRequestHandler: public HTTPRequestHandler
 {
 public:
 
-    static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& prisonSession)
-    {
-        // time to live, if the kit process cannot connect to a client session.
-        int ttl = 180;
-        bool isFound = true;
-        // Wait until the prison has connected with a client socket.
-        Log::debug() << "Waiting for prison session [" << prisonSession->getId() << "] to connect." << Log::end;
-        while (!TerminationFlag &&
-                (isFound = AvailableChildSessions.find(prisonSession->getId()) != AvailableChildSessions.end()) &&
-                ttl--)
-        {
-            std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS));
-            Log::debug() << "Sleeping prison session [" << prisonSession->getId() << "] to connect." << Log::end;
-        }
-
-        return isFound;
-    }
-
     void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
     {
         if (UnitWSD::get().filterHandleRequest(
@@ -959,6 +917,9 @@ public:
             auto ws = std::make_shared<WebSocket>(request, response);
             auto session = std::make_shared<MasterProcessSession>(sessionId, LOOLSession::Kind::ToPrisoner, ws, docBroker, nullptr);
 
+            // Connect the prison session to the client.
+            docBroker->connectPeers(session);
+
             std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
             AvailableChildSessions.emplace(sessionId, session);
 
@@ -971,15 +932,6 @@ public:
             Log::info("Adding doc " + docKey + " to Admin");
             Admin::instance().addDoc(docKey, pid, docBroker->getFilename(), sessionId);
 
-            if (waitBridgeCompleted(session))
-            {
-                ws->shutdown();
-                throw WebSocketException("Failed to connect to client session", WebSocket::WS_ENDPOINT_GOING_AWAY);
-            }
-            Log::debug("Connected " + session->getName() + ".");
-            // Now the bridge beetween the prison and the client is connected
-            // Let messages flow
-
             UnitWSD::get().onChildConnected(pid, sessionId);
 
             IoUtil::SocketProcessor(ws,


More information about the Libreoffice-commits mailing list