[Libreoffice-commits] online.git: loolwsd/DocumentBroker.cpp loolwsd/DocumentBroker.hpp loolwsd/LOOLKit.cpp loolwsd/LOOLWSD.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Apr 4 04:08:19 UTC 2016


 loolwsd/DocumentBroker.cpp |    9 +++
 loolwsd/DocumentBroker.hpp |   81 ++++++++++++++++++++++++++++++++++-
 loolwsd/LOOLKit.cpp        |   68 +++++++++++++++++++++++++++++
 loolwsd/LOOLWSD.cpp        |  103 +++++++++++----------------------------------
 4 files changed, 182 insertions(+), 79 deletions(-)

New commits:
commit d5e2f647901bf6b764afdae4c6b2922527718b7f
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Apr 3 21:40:14 2016 -0400

    loolwsd: WSD <-> Child direct communication
    
    WSD now communicates on a WebSocket directly
    with kit processes. ChildProcess encapsulates
    kit processes and the control WS, which itself
    is owned by DocumentBroker.
    
    Change-Id: Ica209aaa07974739b8e51a14e11325d084e193f6
    Reviewed-on: https://gerrit.libreoffice.org/23789
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/DocumentBroker.cpp b/loolwsd/DocumentBroker.cpp
index c36067e..393ba9a 100644
--- a/loolwsd/DocumentBroker.cpp
+++ b/loolwsd/DocumentBroker.cpp
@@ -64,11 +64,13 @@ std::string DocumentBroker::getDocKey(const Poco::URI& uri)
 
 DocumentBroker::DocumentBroker(const Poco::URI& uriPublic,
                                const std::string& docKey,
-                               const std::string& childRoot) :
+                               const std::string& childRoot,
+                               std::shared_ptr<ChildProcess> childProcess) :
     _uriPublic(uriPublic),
     _docKey(docKey),
     _childRoot(childRoot),
     _cacheRoot(getCachePath(uriPublic.toString())),
+    _childProcess(childProcess),
     _sessionsCount(0)
 {
     assert(!_docKey.empty());
@@ -163,6 +165,11 @@ void DocumentBroker::addWSSession(const std::string id, std::shared_ptr<MasterPr
     {
         Log::warn("DocumentBroker: Trying to add already existed session.");
     }
+
+    // Request a new session from the child kit.
+    const std::string aMessage = "session " + id + " " + _docKey + "\n";
+    Log::debug("DocBroker to Child: " + aMessage.substr(0, aMessage.length() - 1));
+    _childProcess->getWebSocket()->sendFrame(aMessage.data(), aMessage.size());
 }
 
 void DocumentBroker::removeWSSession(const std::string id)
diff --git a/loolwsd/DocumentBroker.hpp b/loolwsd/DocumentBroker.hpp
index 3d848de..458090f 100644
--- a/loolwsd/DocumentBroker.hpp
+++ b/loolwsd/DocumentBroker.hpp
@@ -10,6 +10,8 @@
 #ifndef INCLUDED_DOCUMENTBROKER_HPP
 #define INCLUDED_DOCUMENTBROKER_HPP
 
+#include <signal.h>
+
 #include <atomic>
 #include <memory>
 #include <mutex>
@@ -25,6 +27,81 @@
 class StorageBase;
 class TileCache;
 
+/// Represents a new LOK child that is ready
+/// to host a document.
+class ChildProcess
+{
+public:
+    ChildProcess() :
+        _pid(-1) // -1 is the "no child" marker that close() tests for.
+    {
+    }
+
+    /// Wraps the child whose process ID is pid;
+    /// ws is the control WebSocket to that child (the shared_ptr is copied).
+    ChildProcess(const Poco::Process::PID pid, const std::shared_ptr<Poco::Net::WebSocket>& ws) :
+        _pid(pid),
+        _ws(ws)
+    {
+        Log::info("ChildProcess ctor [" + std::to_string(_pid) + "].");
+    }
+
+    ChildProcess(ChildProcess&& other) : // Takes over other's pid and socket.
+        _pid(other._pid),
+        _ws(other._ws)
+    {
+        Log::info("ChildProcess move ctor [" + std::to_string(_pid) + "].");
+        other._pid = -1; // Leave other with no pid so its close() will not signal the child.
+        other._ws.reset();
+    }
+
+    const ChildProcess& operator=(ChildProcess&& other) // NOTE(review): no close() of the child already held, and no self-assignment check.
+    {
+        Log::info("ChildProcess assign [" + std::to_string(_pid) + "]."); // Logs the pid held before the assignment.
+        _pid = other._pid; // The previous _pid is overwritten without being signalled.
+        other._pid = -1;
+        _ws = other._ws;
+        other._ws.reset();
+
+        return *this;
+    }
+
+    ~ChildProcess()
+    {
+        Log::info("~ChildProcess dtor [" + std::to_string(_pid) + "].");
+        close(true); // Destruction always closes with rude set.
+    }
+
+    void close(const bool rude) // Sends SIGINT to the child (if any), forgets its pid and drops the socket reference.
+    {
+        Log::info("Closing child [" + std::to_string(_pid) + "].");
+        if (_pid != -1)
+        {
+            if (kill(_pid, SIGINT) != 0 && rude && kill(_pid, 0) != 0) // NOTE(review): error is logged only when SIGINT failed, rude is set and the signal-0 probe also failed - confirm intended.
+            {
+                Log::error("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning.");
+            }
+
+            //TODO: Notify Admin. The message below is built but not sent while the write stays commented out.
+            std::ostringstream message;
+            message << "rmdoc" << " "
+                    << _pid << " "
+                    << "\n";
+            //IoUtil::writeFIFO(WriterNotify, message.str());
+           _pid = -1; // With the pid cleared, a later close() skips the kill branch.
+        }
+
+        _ws.reset(); // Done on every call, including when there was no pid.
+    }
+
+    Poco::Process::PID getPid() const { return _pid; } // -1 when no child is held.
+    std::shared_ptr<Poco::Net::WebSocket> getWebSocket() const { return _ws; } // Returns a copy of the shared_ptr; empty after close().
+
+private:
+    Poco::Process::PID _pid; // Child process ID, or -1 for none.
+    std::shared_ptr<Poco::Net::WebSocket> _ws; // Control WebSocket to the child.
+};
+
 /// DocumentBroker is responsible for setting up a document
 /// in jail and brokering loading it from Storage
 /// and saving it back.
@@ -43,7 +120,8 @@ public:
 
     DocumentBroker(const Poco::URI& uriPublic,
                    const std::string& docKey,
-                   const std::string& childRoot);
+                   const std::string& childRoot,
+                   std::shared_ptr<ChildProcess> childProcess);
 
     ~DocumentBroker()
     {
@@ -94,6 +172,7 @@ private:
     std::string _filename;
     std::unique_ptr<StorageBase> _storage;
     std::unique_ptr<TileCache> _tileCache;
+    std::shared_ptr<ChildProcess> _childProcess;
     std::mutex _mutex;
     std::atomic<unsigned> _sessionsCount;
 };
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index cdf2ca0..149ddff 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -1016,7 +1016,75 @@ void lokit_main(const std::string& childRoot,
 
         Log::info("loolkit [" + std::to_string(Process::id()) + "] is ready.");
         if (doBenchmark)
+        {
             IoUtil::writeFIFO(writerBroker, "started\n");
+        }
+
+        // Open websocket connection between the child process and WSD.
+        Poco::Net::HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER);
+        cs.setTimeout(0);
+        HTTPRequest request(HTTPRequest::HTTP_GET, std::string(NEW_CHILD_URI) + "pid=" + pid);
+        Poco::Net::HTTPResponse response;
+        auto ws = std::make_shared<WebSocket>(cs, request, response);
+        ws->setReceiveTimeout(0);
+
+        const std::string socketName = "ChildControllerWS";
+        IoUtil::SocketProcessor(ws, response, [&socketName, &ws, &document, &loKit](const std::vector<char>& data)
+                {
+                    const std::string message(data.data(), data.size());
+                    Log::debug(socketName + ": recv [" + message + "].");
+                    StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+                    auto response = std::to_string(Process::id()) + " ";
+
+                    if (TerminationFlag)
+                    {
+                        // Too late, we're going down.
+                        response += "down\n";
+                    }
+                    else if (tokens[0] == "session")
+                    {
+                        const std::string& sessionId = tokens[1];
+                        const unsigned intSessionId = Util::decodeId(sessionId);
+                        const std::string& docKey = tokens[2];
+
+                        std::string url;
+                        Poco::URI::decode(docKey, url);
+                        Log::info("New session [" + sessionId + "] request on url [" + url + "].");
+
+                        if (!document)
+                        {
+                            document = std::make_shared<Document>(loKit, jailId, docKey, url);
+                        }
+
+                        // Validate and create session.
+                        if (url == document->getUrl() &&
+                            document->createSession(sessionId, intSessionId))
+                        {
+                            response += "ok\n";
+                        }
+                        else
+                        {
+                            response += "bad\n";
+                        }
+                    }
+                    else if (document && document->canDiscard())
+                    {
+                        TerminationFlag = true;
+                        response += "down\n";
+                    }
+                    else
+                    {
+                        response += "bad unknown token [" + tokens[0] + "]\n";
+                    }
+
+                    //FIXME: Do we really need to respond here?
+                    Log::trace("KitToDocBroker: " + response.substr(0, response.length()-2));
+                    ws->sendFrame(response.data(), response.size());
+
+                    return true;
+                },
+                [](){ return TerminationFlag; },
+                socketName);
 
         char buffer[READ_BUFFER_SIZE];
         std::string message;
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index ded9089..0332e48 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -173,76 +173,6 @@ using Poco::XML::InputSource;
 using Poco::XML::Node;
 using Poco::XML::NodeList;
 
-/// Represents a new LOK child that is read
-/// to host a document.
-class ChildProcess
-{
-public:
-    ChildProcess() :
-        _pid(-1)
-    {
-    }
-
-    /// pid is the process ID of the child.
-    /// ws is the control WebSocket to the child.
-    ChildProcess(const Poco::Process::PID pid, const std::shared_ptr<Poco::Net::WebSocket>& ws) :
-        _pid(pid),
-        _ws(ws)
-    {
-    }
-
-    ChildProcess(ChildProcess&& other) :
-        _pid(other._pid),
-        _ws(other._ws)
-    {
-        other._pid = -1;
-        other._ws.reset();
-    }
-
-    const ChildProcess& operator=(ChildProcess&& other)
-    {
-        _pid = other._pid;
-        other._pid = -1;
-        _ws = other._ws;
-        other._ws.reset();
-
-        return *this;
-    }
-
-    ~ChildProcess()
-    {
-        close(true);
-    }
-
-    void close(const bool rude)
-    {
-        if (_pid != -1)
-        {
-            if (kill(_pid, SIGINT) != 0 && rude && kill(_pid, 0) != 0)
-            {
-                Log::error("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning.");
-            }
-
-            //TODO: Notify Admin.
-            std::ostringstream message;
-            message << "rmdoc" << " "
-                    << _pid << " "
-                    << "\n";
-            //IoUtil::writeFIFO(WriterNotify, message.str());
-           _pid = -1;
-        }
-
-        _ws.reset();
-    }
-
-    Poco::Process::PID getPid() const { return _pid; }
-    std::shared_ptr<Poco::Net::WebSocket> getWebSocket() const { return _ws; }
-
-private:
-    Poco::Process::PID _pid;
-    std::shared_ptr<Poco::Net::WebSocket> _ws;
-};
-
 /// New LOK child processes ready to host documents.
 static std::vector<std::shared_ptr<ChildProcess>> newChilds;
 static std::mutex newChildsMutex;
@@ -337,9 +267,21 @@ private:
                 if (!format.empty())
                 {
                     Log::info("Conversion request for URI [" + fromPath + "].");
+
+                    // Request a kit process for this doc.
+                    auto child = getNewChild();
+                    if (!child)
+                    {
+                        // Let the client know we can't serve now.
+                        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
+                        response.setContentLength(0);
+                        response.send();
+                        return;
+                    }
+
                     auto uriPublic = DocumentBroker::sanitizeURI(fromPath);
                     const auto docKey = DocumentBroker::getDocKey(uriPublic);
-                    auto docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot);
+                    auto docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child);
 
                     // This lock could become a bottleneck.
                     // In that case, we can use a pool and index by publicPath.
@@ -517,9 +459,20 @@ private:
         }
         else
         {
+            // Request a kit process for this doc.
+            auto child = getNewChild();
+            if (!child)
+            {
+                // Let the client know we can't serve now.
+                response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
+                response.setContentLength(0);
+                response.send();
+                return;
+            }
+
             // Set one we just created.
             Log::debug("New DocumentBroker for docKey [" + docKey + "].");
-            docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot);
+            docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child);
             docBrokers.emplace(docKey, docBroker);
         }
 
@@ -543,11 +496,6 @@ private:
         if (wsSessionsCount == 1)
             session->setEditLock(true);
 
-        // Request a kit process for this doc.
-        const std::string aMessage = "request " + id + " " + docKey + "\n";
-        Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1));
-        IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage);
-
         QueueHandler handler(queue, session, "wsd_queue_" + session->getId());
 
         Thread queueHandlerThread;
@@ -722,6 +670,7 @@ public:
             auto ws = std::make_shared<WebSocket>(request, response);
             std::unique_lock<std::mutex> lock(newChildsMutex);
             newChilds.emplace_back(std::make_shared<ChildProcess>(pid, ws));
+            Log::info("Have " + std::to_string(newChilds.size()) + " childs.");
             return;
         }
 


More information about the Libreoffice-commits mailing list