[Libreoffice-commits] online.git: loolwsd/LOOLBroker.cpp loolwsd/Util.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Sun Jan 24 12:59:58 PST 2016


 loolwsd/LOOLBroker.cpp |  150 ++++++++++++++++++++++++-------------------------
 loolwsd/Util.cpp       |    5 -
 2 files changed, 76 insertions(+), 79 deletions(-)

New commits:
commit ef2ec0b2e2197d64dc563dcaa856ec60eb39e1dc
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Fri Jan 22 09:49:05 2016 -0500

    loolwsd: fixes to broker threading and communication
    
    Broker cache clean up is now done only during searching
    as there we loop over processes and request status
    from each child.
    
    Internal helpers simplified and termination is done
    in a single removeChild helper.
    
    Change-Id: I31f7df5429f0737d352842d5c0f6a02b91b8078f
    Reviewed-on: https://gerrit.libreoffice.org/21751
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 62ae29e..7647dcc 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -87,6 +87,14 @@ namespace
         return (it != _childProcesses.end() ? it->second : -1);
     }
 
+    void requestAbnormalTermination(const Process::PID aPID)
+    {
+        if (kill(aPID, SIGTERM) != 0 && kill(aPID, 0) != 0)
+        {
+            Log::info("Cannot terminate lokit [" + std::to_string(aPID) + "].");
+        }
+    }
+
     /// Safely removes a child process and
     /// invalidates the URL cache.
     void removeChild(const Process::PID pid)
@@ -96,6 +104,7 @@ namespace
         if (it != _childProcesses.end())
         {
             // Close the write pipe.
+            requestAbnormalTermination(pid);
             close(it->second);
             _childProcesses.erase(it);
             _cacheURL.clear();
@@ -178,14 +187,6 @@ namespace
         if (nftw(source.c_str(), linkOrCopyFunction, 10, FTW_DEPTH) == -1)
             Log::error("linkOrCopy: nftw() failed for '" + source + "'");
     }
-
-    void requestAbnormalTermination(const Process::PID aPID)
-    {
-        if (kill(aPID, SIGTERM) != 0)
-        {
-            Log::info("Cannot terminate lokit [" + std::to_string(aPID) + "]");
-        }
-    }
 }
 
 class PipeRunnable: public Runnable
@@ -238,21 +239,27 @@ public:
         }
         catch (const std::exception& exc)
         {
-            Log::error(std::string("Exception: ") + exc.what());
+            Log::error() << "Exception while reading from pipe ["
+                         << nPipeReader << "]: " << exc.what() << Log::end;
             return -1;
         }
 
         return nBytes;
     }
 
-    bool isOKResponse(int nPID)
+    bool createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL)
     {
+        const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n";
+        if (Util::writeFIFO(getChildPipe(nPID), aMessage) < 0)
+        {
+            Log::error("Error sending thread message to child [" + std::to_string(nPID) + "].");
+            return false;
+        }
+
         std::string aResponse;
         if (getResponseLine(readerChild, aResponse) < 0)
         {
-            Log::error("Error reading child response: " + std::to_string(nPID) + ". Clearing cache.");
-            requestAbnormalTermination(nPID);
-            _cacheURL.clear();
+            Log::error("Error reading response to thread message from child [" + std::to_string(nPID) + "].");
             return false;
         }
 
@@ -260,51 +267,40 @@ public:
         return (tokens.count() == 2 && tokens[1] == "ok");
     }
 
-    ssize_t sendMessage(int nPipeWriter, const std::string& aMessage)
-    {
-        const ssize_t nBytes = Util::writeFIFO(nPipeWriter, aMessage);
-        if ( nBytes < 0 )
-            Log::error("Error writting to child pipe.");
-
-        return nBytes;
-    }
-
-    ssize_t createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL)
-    {
-        const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n";
-        return sendMessage(getChildPipe(nPID), aMessage);
-    }
-
     void verifyChilds()
     {
         std::lock_guard<std::recursive_mutex> lock(forkMutex);
 
+        // Sanitize cache.
         Log::trace("Verifying Childs.");
-        std::string aMessage;
-        bool bError = false;
-
-        // sanity cache
         for (auto it = _cacheURL.cbegin(); it != _cacheURL.cend(); )
         {
-            aMessage = "search " + it->first + "\r\n";
-            if (sendMessage(getChildPipe(it->second), aMessage) < 0)
+            const auto aMessage = "search " + it->first + "\r\n";
+            if (Util::writeFIFO(getChildPipe(it->second), aMessage) < 0)
+            {
+                Log::error("Error sending search message to child [" + std::to_string(it->second) + "]. Clearing cache.");
+                _cacheURL.clear();
+                break;
+            }
+
+            std::string aResponse;
+            if (getResponseLine(readerChild, aResponse) < 0)
             {
-                bError = true;
+                Log::error("Error reading response to thread message from child [" + std::to_string(it->second) + "]. Clearing cache.");
+                _cacheURL.clear();
                 break;
             }
 
-            if (!isOKResponse(it->second))
+            StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+            if (tokens.count() != 2 || tokens[1] != "ok")
             {
                 Log::debug() << "Removed expired Kit [" << it->second << "] hosts URL [" << it->first << "]." << Log::end;
-                _cacheURL.erase(it++);
+                it = _cacheURL.erase(it);
                 continue;
             }
 
-            it++;
+            ++it;
         }
-
-        if (bError)
-            _cacheURL.clear();
     }
 
     Process::PID searchURL(const std::string& aURL)
@@ -313,45 +309,47 @@ public:
 
         const std::string aMessage = "search " + aURL + "\r\n";
         Process::PID nPID = -1;
-        for (auto& it : _childProcesses)
+        for (auto it = _childProcesses.cbegin(); it != _childProcesses.cend(); )
         {
-            assert(it.first > 0 && it.second > 0);
+            assert(it->first > 0 && it->second > 0);
 
-            Log::trace("Query to kit [" + std::to_string(it.first) + "]: " + aMessage);
-            ssize_t nBytes = Util::writeFIFO(it.second, aMessage);
+            Log::trace("Query to kit [" + std::to_string(it->first) + "]: " + aMessage);
+            ssize_t nBytes = Util::writeFIFO(it->second, aMessage);
             if ( nBytes < 0 )
             {
-                Log::error("Error writting to child pipe: " + std::to_string(it.first) + ". Clearing cache.");
-                requestAbnormalTermination(it.first);
-                _cacheURL.clear();
-                break;
+                Log::error("Error sending search message to child pipe: " + std::to_string(it->first) + ". Terminating.");
+                removeChild(it->second);
+                it = _childProcesses.cbegin();
+                continue;
             }
 
             std::string aResponse;
             nBytes = getResponseLine(readerChild, aResponse);
-            Log::trace("Response from kit [" + std::to_string(it.first) + "]: " + aResponse);
+            Log::trace("Response from kit [" + std::to_string(it->first) + "]: " + aResponse);
             if ( nBytes < 0 )
             {
-                Log::error("Error reading child response: " + std::to_string(it.first) + ". Clearing cache.");
-                requestAbnormalTermination(it.first);
-                _cacheURL.clear();
-                break;
+                Log::error("Error reading response to search message from child [" + std::to_string(it->first) + "]. Terminating.");
+                removeChild(it->second);
+                it = _childProcesses.cbegin();
+                continue;
             }
 
             StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
             if (tokens[1] == "ok")
             {
                 // Found, but find all empty instances.
-                nPID = it.first;
+                nPID = it->first;
                 Log::debug("Kit [" + std::to_string(nPID) + "] hosts URL [" + aURL + "].");
                 break;
             }
             else if (tokens[1] == "empty")
             {
                 // Remember the last empty.
-                nPID = it.first;
+                nPID = it->first;
                 Log::debug("Kit [" + std::to_string(nPID) + "] is empty.");
             }
+
+            ++it;
         }
 
         return nPID;
@@ -369,40 +367,40 @@ public:
 
             Log::debug("Finding kit for URL [" + aURL + "] on thread [" + aTID + "].");
 
-            // check cache
+            // Check the cache first.
             const auto aIterURL = _cacheURL.find(aURL);
             if ( aIterURL != _cacheURL.end() )
             {
                 Log::debug("Cache found URL [" + aURL + "] hosted on child [" + std::to_string(aIterURL->second) +
                            "]. Creating view for thread [" + aTID + "].");
-                if (createThread(aIterURL->second, aTID, aURL) < 0)
-                    Log::error("Cache: Error creating thread.");
-
-                if (!isOKResponse(aIterURL->second))
-                    Log::error("Cache Failed: Creating view for thread [" + aTID + "].");
+                if (createThread(aIterURL->second, aTID, aURL))
+                    return;
 
-                return;
+                Log::error("Cache: Error creating thread [" + aTID + "] for URL [" + aURL + "]. Will search.");
+            }
+            else
+            {
+                // Not found in cache, do a full search.
+                Log::debug("URL [" + aURL + "] is not in cache. Will search.");
             }
 
-            // not found in cache, full search.
-            Log::debug("URL [" + aURL + "] is not in cache");
             const Process::PID nPID = searchURL(aURL);
             if ( nPID > 0 )
             {
-                Log::debug("Creating view for URL [" + aURL + "] for thread [" +
-                           aTID + "] on kit [" + std::to_string(nPID) + "].");
+                Log::debug("Search found child [" + std::to_string(aIterURL->second) +
+                           "] to host URL [" + aURL +
+                           "]. Creating view for thread [" + aTID + "].");
                 if (createThread(nPID, aTID, aURL) < 0)
-                    Log::error("Search: Error creating thread.");
-                else if (isOKResponse(nPID))
+                {
                     _cacheURL[aURL] = nPID;
-                else
-                    Log::error("Failed: Creating view for thread [" + aTID + "].");
-            }
-            else
-            {
-                Log::info("No children available.");
-                ++forkCounter;
+                    return;
+                }
+
+                Log::error("Search: Error creating thread [" + aTID + "] for URL [" + aURL + "].");
             }
+
+            Log::info("No children available. Creating more.");
+            ++forkCounter;
         }
     }
 
diff --git a/loolwsd/Util.cpp b/loolwsd/Util.cpp
index 872219a..ee2b5d1 100644
--- a/loolwsd/Util.cpp
+++ b/loolwsd/Util.cpp
@@ -396,7 +396,6 @@ namespace Util
 
     ssize_t readMessage(int nPipe, char* pBuffer, ssize_t nSize)
     {
-        ssize_t nBytes = -1;
         struct pollfd aPoll;
 
         aPoll.fd = nPipe;
@@ -411,9 +410,9 @@ namespace Util
             errno = ETIME;
 
         if( (aPoll.revents & POLLIN) != 0 )
-            nBytes = readFIFO(nPipe, pBuffer, nSize);
+            return readFIFO(nPipe, pBuffer, nSize);
 
-        return nBytes;
+        return -1;
     }
 
     static


More information about the Libreoffice-commits mailing list