[Libreoffice-commits] online.git: loolwsd/LOOLBroker.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Sun Jan 31 21:04:56 PST 2016


 loolwsd/LOOLBroker.cpp |   89 +++++++++++++++++++++++++++++--------------------
 1 file changed, 54 insertions(+), 35 deletions(-)

New commits:
commit 57240993b68dbc86d21e5a2bfb1a94171f6d7b35
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sat Jan 30 18:58:05 2016 -0500

    loolwsd: removed url cache in favor of unified child cache
    
    Change-Id: I44d07ea3e03a78a6038322c8110f66dd9c161957
    Reviewed-on: https://gerrit.libreoffice.org/21975
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index ce829da..f58a4f3 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -73,7 +73,6 @@ static unsigned int childCounter = 0;
 static signed numPreSpawnedChildren = 0;
 
 static std::recursive_mutex forkMutex;
-static std::map<std::string, Process::PID> _cacheURL;
 
 namespace
 {
@@ -188,18 +187,16 @@ namespace
         return (it != _childProcesses.end() ? it->second->getWritePipe() : -1);
     }
 
-    /// Safely removes a child process and
-    /// invalidates the URL cache.
+    /// Safely removes a child process.
     void removeChild(const Process::PID pid)
     {
         std::lock_guard<std::recursive_mutex> lock(forkMutex);
         const auto it = _childProcesses.find(pid);
         if (it != _childProcesses.end())
         {
-            // Close the child.
+            // Close the child resources.
             it->second->close();
             _childProcesses.erase(it);
-            ++forkCounter;
         }
     }
 
@@ -358,38 +355,57 @@ public:
         return (tokens.count() == 2 && tokens[0] == std::to_string(nPID) && tokens[1] == "ok");
     }
 
-    void verifyChilds()
+    /// Sync ChildProcess instances with its child.
+    /// Returns the number of empty childs.
+    size_t syncChilds()
     {
         std::lock_guard<std::recursive_mutex> lock(forkMutex);
 
-        // Sanitize cache.
-        Log::trace("Verifying Childs.");
-        for (auto& it : _childProcesses)
+        Log::trace("Synching Childs.");
+        size_t empty_count = 0;
+        for (auto it = _childProcesses.begin(); it != _childProcesses.end(); )
         {
             const auto aMessage = "query url \r\n";
-            if (Util::writeFIFO(it.second->getWritePipe(), aMessage) < 0)
-            {
-                Log::error("Error sending query message to child [" + std::to_string(it.second->getPid()) + "]. Clearing cache.");
-                _cacheURL.clear();
-                break;
-            }
-
             std::string aResponse;
-            if (getResponseLine(readerChild, aResponse) < 0)
+            if (Util::writeFIFO(it->second->getWritePipe(), aMessage) < 0 ||
+                getResponseLine(readerChild, aResponse) < 0)
             {
-                Log::error("Error reading response to thread message from child [" + std::to_string(it.second->getPid()) + "]. Clearing cache.");
-                _cacheURL.clear();
-                break;
+                auto log = Log::error();
+                log << "Error querying child [" << std::to_string(it->second->getPid()) << "].";
+                if (it->second->getUrl().empty())
+                {
+                    log << " Removing empty child.";
+                    it = _childProcesses.erase(it);
+                }
+
+                log << Log::end;
+                continue;
             }
 
             StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-            if (tokens.count() != 2 || tokens[0] != std::to_string(it.second->getPid()) || tokens[1] != "ok")
+            if (tokens.count() == 2 && tokens[0] == std::to_string(it->second->getPid()))
             {
-                Log::debug() << "Removed expired Kit [" << it.second->getPid() << "] hosts URL [" << it.second->getUrl() << "]." << Log::end;
-                //it = _cacheURL.erase(it);
-                continue;
+                Log::debug("Child [" + std::to_string(it->second->getPid()) + "] hosts [" + tokens[1] + "].");
+                if (tokens[1] == "empty")
+                {
+                    it->second->setUrl("");
+                    ++empty_count;
+                }
+                else
+                {
+                    it->second->setUrl(tokens[1]);
+                }
+            }
+            else
+            {
+                Log::error("Unexpected response from child [" + std::to_string(it->second->getPid()) +
+                           "] to query: [" + tokens[1] + "].");
             }
+
+            ++it;
         }
+
+        return empty_count;
     }
 
     void handleInput(const std::string& aMessage)
@@ -410,12 +426,14 @@ public:
                 if (child->getUrl() == aURL)
                     Log::debug("Found URL [" + aURL + "] hosted on child [" + std::to_string(child->getPid()) + "].");
                 else
-                    Log::debug("URL [" + aURL + "] is not hosted. Using empty child[" + std::to_string(child->getPid()) + "].");
+                    Log::debug("URL [" + aURL + "] is not hosted. Using empty child [" + std::to_string(child->getPid()) + "].");
 
                 if (!createThread(child->getPid(), aTID, aURL))
                 {
-                    Log::error("Cache: Error creating thread [" + aTID + "] for URL [" + aURL + "].");
+                    Log::error("Error creating thread [" + aTID + "] for URL [" + aURL + "].");
                 }
+
+                child->setUrl(aURL);
             }
             else
             {
@@ -499,7 +517,7 @@ public:
                     const auto duration = (std::chrono::steady_clock::now() - lastMaintenanceTime);
                     if (duration >= std::chrono::seconds(10))
                     {
-                        verifyChilds();
+                        syncChilds();
                         lastMaintenanceTime = std::chrono::steady_clock::now();
                     }
 
@@ -874,15 +892,14 @@ int main(int argc, char** argv)
         {
             std::lock_guard<std::recursive_mutex> lock(forkMutex);
 
-            pipeHandler.verifyChilds();
-
-            // Figure out how many children we need.
+            const signed empty = pipeHandler.syncChilds();
             const signed total = _childProcesses.size();
-            const signed used = _cacheURL.size();
-            const signed extra = total - used;
-            signed spawn = std::min(static_cast<int>(forkCounter), numPreSpawnedChildren);
-            Log::debug() << "Spawning " << spawn << " children. Current Total: " << total
-                         << ", used: " << used << ", extra: " << extra << Log::end;
+
+            // Figure out how many children we need. Always create at least as many
+            // as configured pre-spawn or one more than requested (whichever is larger).
+            signed spawn = std::max(static_cast<int>(forkCounter) + 1, numPreSpawnedChildren);
+            Log::debug() << "Creating " << spawn << " childs. Current Total: "
+                         << total << ", Empty: " << empty << Log::end;
             do
             {
                 if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
@@ -935,6 +952,8 @@ int main(int argc, char** argv)
                 Log::warn() << "Unknown status returned by waitpid: "
                             << std::hex << status << "." << Log::end;
             }
+
+            pipeHandler.syncChilds();
         }
         else if (pid < 0)
             Log::error("Error: waitpid failed.");


More information about the Libreoffice-commits mailing list