[Libreoffice-commits] online.git: loolwsd/Common.hpp loolwsd/LOOLBroker.cpp loolwsd/Util.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Thu Jan 14 05:56:58 PST 2016


 loolwsd/Common.hpp     |    1 
 loolwsd/LOOLBroker.cpp |  255 +++++++++++++++++++++++++++----------------------
 loolwsd/Util.cpp       |    5 
 3 files changed, 148 insertions(+), 113 deletions(-)

New commits:
commit 0632da5edba5dc876ead45ea7ffcd4c79d2f6f41
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Wed Jan 13 19:28:51 2016 -0500

    loolwsd: reworked child management and load balancing in broker
    
    Change-Id: I92874d1aeb8fe46f3bbc1cb9d3b2b9d46632f4b9
    Reviewed-on: https://gerrit.libreoffice.org/21473
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/Common.hpp b/loolwsd/Common.hpp
index 10c3d7e..87585d3 100644
--- a/loolwsd/Common.hpp
+++ b/loolwsd/Common.hpp
@@ -17,6 +17,7 @@ constexpr int DEFAULT_CLIENT_PORT_NUMBER = 9980;
 constexpr int MASTER_PORT_NUMBER = 9981;
 constexpr int INTERVAL_PROBES = 10;
 constexpr int MAINTENANCE_INTERVAL = 1;
+constexpr int CHILD_TIMEOUT_SECS = 10;
 constexpr int POLL_TIMEOUT = 1000000;
 // Pipe buffer is in function of URL size, a big URL will be handled in several
 // work loads.
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 575338a..eefa8e9 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -68,9 +68,9 @@ static int readerChild = -1;
 static int readerBroker = -1;
 
 static std::atomic<unsigned> forkCounter;
-static std::atomic<std::chrono::seconds> maintenance;
+static std::chrono::steady_clock::time_point lastMaintenanceTime = std::chrono::steady_clock::now();
 static unsigned int childCounter = 0;
-static unsigned int numPreSpawnedChildren = 0;
+static signed numPreSpawnedChildren = 0;
 
 static std::mutex forkMutex;
 static std::map<Process::PID, int> _childProcesses;
@@ -78,6 +78,31 @@ static std::map<std::string, Process::PID> _cacheURL;
 
 namespace
 {
+    /// Safely looks up the pipe descriptor
+    /// of a child. Returns -1 on error.
+    int getChildPipe(const Process::PID pid)
+    {
+        std::lock_guard<std::mutex> lock(forkMutex);
+        const auto it = _childProcesses.find(pid);
+        return (it != _childProcesses.end() ? it->second : -1);
+    }
+
+    /// Safely removes a child process and
+    /// invalidates the URL cache.
+    void removeChild(const Process::PID pid)
+    {
+        std::lock_guard<std::mutex> lock(forkMutex);
+        const auto it = _childProcesses.find(pid);
+        if (it != _childProcesses.end())
+        {
+            // Close the write pipe.
+            close(it->second);
+            _childProcesses.erase(it);
+            _cacheURL.clear();
+            ++forkCounter;
+        }
+    }
+
     ThreadLocal<std::string> sourceForLinkOrCopy;
     ThreadLocal<Path> destinationForLinkOrCopy;
 
@@ -177,37 +202,45 @@ public:
         ssize_t nBytes = -1;
         aLine.clear();
 
-        while (true)
+        try
         {
-            if ( _pStart == _pEnd )
+            while (true)
             {
-                nBytes = Util::readMessage(nPipeReader, _aBuffer, sizeof(_aBuffer));
-                if ( nBytes < 0 )
+                if ( _pStart == _pEnd )
                 {
-                    _pStart = _pEnd = nullptr;
-                    break;
-                }
-
-                _pStart = _aBuffer;
-                _pEnd   = _aBuffer + nBytes;
-            }
+                    nBytes = Util::readMessage(nPipeReader, _aBuffer, sizeof(_aBuffer));
+                    if ( nBytes < 0 )
+                    {
+                        _pStart = _pEnd = nullptr;
+                        break;
+                    }
 
-            if ( _pStart != _pEnd )
-            {
-                char aChar = *_pStart++;
-                while (_pStart != _pEnd && aChar != '\r' && aChar != '\n')
-                {
-                    aLine += aChar;
-                    aChar  = *_pStart++;
+                    _pStart = _aBuffer;
+                    _pEnd   = _aBuffer + nBytes;
                 }
 
-                if ( aChar == '\r' && *_pStart == '\n')
+                if ( _pStart != _pEnd )
                 {
-                    _pStart++;
-                    break;
+                    char aChar = *_pStart++;
+                    while (_pStart != _pEnd && aChar != '\r' && aChar != '\n')
+                    {
+                        aLine += aChar;
+                        aChar  = *_pStart++;
+                    }
+
+                    if ( aChar == '\r' && *_pStart == '\n')
+                    {
+                        _pStart++;
+                        break;
+                    }
                 }
             }
         }
+        catch (const std::exception& exc)
+        {
+            Log::error(std::string("Exception: ") + exc.what());
+            return -1;
+        }
 
         return nBytes;
     }
@@ -238,22 +271,23 @@ public:
         return nBytes;
     }
 
-    ssize_t createThread(Process::PID nPID, const std::string& aTID, const std::string& aURL)
+    ssize_t createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL)
     {
         const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n";
-        return sendMessage(_childProcesses[nPID], aMessage);
+        return sendMessage(getChildPipe(nPID), aMessage);
     }
 
     void verifyChilds()
     {
+        Log::trace("Verifying Childs.");
         std::string aMessage;
         bool bError = false;
 
         // sanity cache
-        for (auto it =_cacheURL.cbegin(); it != _cacheURL.cend(); )
+        for (auto it = _cacheURL.cbegin(); it != _cacheURL.cend(); )
         {
             aMessage = "search " + it->first + "\r\n";
-            if (sendMessage(_childProcesses[it->second], aMessage) < 0)
+            if (sendMessage(getChildPipe(it->second), aMessage) < 0)
             {
                 bError = true;
                 break;
@@ -346,13 +380,9 @@ public:
 
                 return;
             }
-            else
-            {
-                Log::debug("URL [" + aURL + "] is not in cache, searching " +
-                           std::to_string(_childProcesses.size()) + " kits.");
-            }
 
             // not found in cache, full search.
+            Log::debug("URL [" + aURL + "] is not in cache");
             const Process::PID nPID = searchURL(aURL);
             if ( nPID > 0 )
             {
@@ -367,8 +397,8 @@ public:
             }
             else
             {
-                Log::info("No children available, creating [" + std::to_string(numPreSpawnedChildren) + "] childs");
-                forkCounter = numPreSpawnedChildren;
+                Log::info("No children available.");
+                ++forkCounter;
             }
         }
     }
@@ -440,15 +470,16 @@ public:
                 {
                     pStart++;
 
-                    forkMutex.lock();
-                    if (maintenance.load() > std::chrono::seconds(10))
+                    Log::trace("Recv: " + aMessage);
+                    const auto duration = (std::chrono::steady_clock::now() - lastMaintenanceTime);
+                    if (duration >= std::chrono::seconds(10))
                     {
-                        maintenance = std::chrono::seconds::zero();
                         verifyChilds();
+                        lastMaintenanceTime = std::chrono::steady_clock::now();
                     }
+
                     handleInput(aMessage);
                     aMessage.clear();
-                    forkMutex.unlock();
                 }
             }
         }
@@ -493,7 +524,8 @@ static bool globalPreinit(const std::string &loSubPath)
     return preInit(("/" + loSubPath + "/program").c_str(), "file:///user") == 0;
 }
 
-static int createLibreOfficeKit(const bool sharePages, const std::string& loSubPath,
+static int createLibreOfficeKit(const bool sharePages,
+                                const std::string& loSubPath,
                                 const std::string& jailId)
 {
     Poco::UInt64 child;
@@ -556,45 +588,25 @@ static int createLibreOfficeKit(const bool sharePages, const std::string& loSubP
         return -1;
     }
 
-    Log::info() << "Adding Kit #" << childCounter << " PID " << child << Log::end;
+    Log::info() << "Adding Kit #" << childCounter << ", PID: " << child << Log::end;
     _childProcesses[child] = nFIFOWriter;
+    ++forkCounter;
     return child;
 }
 
-static int startupLibreOfficeKit(const bool sharePages, const int nLOKits,
-                                 const std::string& loSubPath, const std::string& jailId)
+static bool waitForTerminationChild(const Process::PID aPID, signed count = CHILD_TIMEOUT_SECS)
 {
-    Process::PID pId = -1;
-
-    Log::info() << "Starting " << nLOKits << " LoKit instaces." << Log::end;
-    for (int nCntr = nLOKits; nCntr; nCntr--)
-    {
-        if ((pId = createLibreOfficeKit(sharePages, loSubPath, jailId)) < 0)
-        {
-            Log::error("Error: failed to create LibreOfficeKit.");
-            break;
-        }
-    }
-
-    return pId;
-}
-
-
-static bool waitForTerminationChild(const Process::PID aPID)
-{
-    int status;
-    short nCntr = 3;
-
-    while (nCntr-- > 0)
+    while (count-- > 0)
     {
+        int status;
         waitpid(aPID, &status, WUNTRACED | WNOHANG);
         if (WIFEXITED(status))
-            break;
+            return true;
 
         sleep(MAINTENANCE_INTERVAL);
     }
 
-    return nCntr;
+    return false;
 }
 
 // Broker process
@@ -691,7 +703,7 @@ int main(int argc, char** argv)
         exit(-1);
     }
 
-    if ( !numPreSpawnedChildren )
+    if (numPreSpawnedChildren < 1)
     {
         Log::error("Error: --numprespawns is 0");
         exit(-1);
@@ -801,9 +813,11 @@ int main(int argc, char** argv)
         exit(-1);
     }
 
+    // Initialize LoKit and hope we can fork and save memory by sharing pages.
     const bool sharePages = globalPreinit(loSubPath);
 
-    if ( startupLibreOfficeKit(sharePages, numPreSpawnedChildren, loSubPath, jailId) < 0 )
+    // We must have at least one child, more is created dynamically.
+    if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
     {
         Log::error("Error: failed to create children.");
         exit(-1);
@@ -823,74 +837,95 @@ int main(int argc, char** argv)
     Log::info("loolbroker is ready.");
 
     unsigned timeoutCounter = 0;
-    while (!TerminationFlag && !_childProcesses.empty())
+    while (!TerminationFlag)
     {
+        if (forkCounter > 0)
+        {
+            std::lock_guard<std::mutex> lock(forkMutex);
+
+            // Figure out how many children we need.
+            const signed total = _childProcesses.size();
+            const signed used = _cacheURL.size();
+            const signed extra = total - used;
+            if (extra < numPreSpawnedChildren)
+            {
+                if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
+                    Log::error("Error: fork failed.");
+            }
+            else
+                forkCounter = 0;
+        }
+
         int status;
         const pid_t pid = waitpid(-1, &status, WUNTRACED | WNOHANG);
         if (pid > 0)
         {
-            if ( _childProcesses.find(pid) != _childProcesses.end() )
+            if (WIFEXITED(status))
             {
-                if ((WIFEXITED(status) || WIFSIGNALED(status) || WTERMSIG(status) ) )
-                {
-                    Log::error("Child [" + std::to_string(pid) + "] processes died.");
-
-                    forkMutex.lock();
-                    _childProcesses.erase(pid);
-                    _cacheURL.clear();
-                    forkMutex.unlock();
-                }
-
-                if ( WCOREDUMP(status) )
-                    Log::error("Child [" + std::to_string(pid) + "] produced a core dump.");
-
-                if ( WIFSTOPPED(status) )
-                    Log::error("Child [" + std::to_string(pid) + "] process was stopped by delivery of a signal.");
+                Log::info() << "Child process [" << pid << "] exited with code: "
+                            << WEXITSTATUS(status) << "." << Log::end;
 
-                if ( WSTOPSIG(status) )
-                    Log::error("Child [" + std::to_string(pid) + "] process was stopped.");
+                removeChild(pid);
+            }
+            else
+            if (WIFSIGNALED(status))
+            {
+                std::string fate = "died";
+#ifdef WCOREDUMP
+                if (WCOREDUMP(status))
+                    fate = "core-dumped";
+#endif
+                Log::error() << "Child process [" << pid << "] " << fate
+                             << " with " << Util::signalName(WTERMSIG(status))
+                             << " signal. " << Log::end;
 
-                if ( WIFCONTINUED(status) )
-                    Log::error("Child [" + std::to_string(pid) + "] process was resumed.");
+                removeChild(pid);
+            }
+            else if (WIFSTOPPED(status))
+            {
+                Log::info() << "Child process [" << pid << "] stopped with "
+                            << Util::signalName(WSTOPSIG(status))
+                            << " signal. " << Log::end;
+            }
+            else if (WIFCONTINUED(status))
+            {
+                Log::info() << "Child process [" << pid << "] resumed with SIGCONT."
+                            << Log::end;
             }
             else
             {
-                Log::error("None of our known child processes died. PID: " + std::to_string(pid));
+                Log::warn() << "Unknown status returned by waitpid: "
+                            << std::hex << status << "." << Log::end;
             }
         }
         else if (pid < 0)
-            Log::error("Error: Child error.");
-
-        if (forkCounter > 0)
-        {
-            forkMutex.lock();
-            --forkCounter;
-
-            if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
-                Log::error("Error: fork failed.");
-
-            forkMutex.unlock();
-        }
+            Log::error("Error: waitpid failed.");
 
         if (timeoutCounter++ == INTERVAL_PROBES)
         {
             timeoutCounter = 0;
             sleep(MAINTENANCE_INTERVAL);
-            maintenance.store( ++maintenance.load() );
         }
     }
 
     // Terminate child processes
-    for (auto i : _childProcesses)
+    for (auto& it : _childProcesses)
     {
-        Log::info("Requesting child process " + std::to_string(i.first) + " to terminate.");
-        close(i.second);
-        Process::requestTermination(i.first);
-        if (!waitForTerminationChild(i.first))
+        Log::info("Requesting child process " + std::to_string(it.first) + " to terminate.");
+        Process::requestTermination(it.first);
+    }
+
+    // Wait and kill child processes
+    for (auto& it : _childProcesses)
+    {
+        if (!waitForTerminationChild(it.first))
         {
-            Log::info("Forcing a child process " + std::to_string(i.first) + " to terminate.");
-            Process::kill(i.first);
+            Log::info("Forcing child process " + std::to_string(it.first) + " to terminate.");
+            Process::kill(it.first);
         }
+
+        // Close the write pipe.
+        close(it.second);
     }
 
     aPipe.join();
diff --git a/loolwsd/Util.cpp b/loolwsd/Util.cpp
index af263f0..5da99de 100644
--- a/loolwsd/Util.cpp
+++ b/loolwsd/Util.cpp
@@ -402,9 +402,9 @@ namespace Util
         aPoll.events = POLLIN;
         aPoll.revents = 0;
 
-        int nPoll = poll(&aPoll, 1, 3000);
+        const int nPoll = poll(&aPoll, 1, CHILD_TIMEOUT_SECS * 1000);
         if ( nPoll < 0 )
-            goto ErrorPoll;
+            return -1;
 
         if ( nPoll == 0 )
             errno = ETIME;
@@ -412,7 +412,6 @@ namespace Util
         if( (aPoll.revents & POLLIN) != 0 )
             nBytes = readFIFO(nPipe, pBuffer, nSize);
 
-    ErrorPoll:
         return nBytes;
     }
 


More information about the Libreoffice-commits mailing list