[Libreoffice-commits] online.git: loolwsd/Common.hpp loolwsd/LOOLBroker.cpp loolwsd/Util.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Thu Jan 14 05:56:58 PST 2016
loolwsd/Common.hpp | 1
loolwsd/LOOLBroker.cpp | 255 +++++++++++++++++++++++++++----------------------
loolwsd/Util.cpp | 5
3 files changed, 148 insertions(+), 113 deletions(-)
New commits:
commit 0632da5edba5dc876ead45ea7ffcd4c79d2f6f41
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Wed Jan 13 19:28:51 2016 -0500
loolwsd: reworked child management and load balancing in broker
Change-Id: I92874d1aeb8fe46f3bbc1cb9d3b2b9d46632f4b9
Reviewed-on: https://gerrit.libreoffice.org/21473
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/Common.hpp b/loolwsd/Common.hpp
index 10c3d7e..87585d3 100644
--- a/loolwsd/Common.hpp
+++ b/loolwsd/Common.hpp
@@ -17,6 +17,7 @@ constexpr int DEFAULT_CLIENT_PORT_NUMBER = 9980;
constexpr int MASTER_PORT_NUMBER = 9981;
constexpr int INTERVAL_PROBES = 10;
constexpr int MAINTENANCE_INTERVAL = 1;
+constexpr int CHILD_TIMEOUT_SECS = 10;
constexpr int POLL_TIMEOUT = 1000000;
// Pipe buffer is in function of URL size, a big URL will be handled in several
// work loads.
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 575338a..eefa8e9 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -68,9 +68,9 @@ static int readerChild = -1;
static int readerBroker = -1;
static std::atomic<unsigned> forkCounter;
-static std::atomic<std::chrono::seconds> maintenance;
+static std::chrono::steady_clock::time_point lastMaintenanceTime = std::chrono::steady_clock::now();
static unsigned int childCounter = 0;
-static unsigned int numPreSpawnedChildren = 0;
+static signed numPreSpawnedChildren = 0;
static std::mutex forkMutex;
static std::map<Process::PID, int> _childProcesses;
@@ -78,6 +78,31 @@ static std::map<std::string, Process::PID> _cacheURL;
namespace
{
+ /// Safely looks up the pipe descriptor
+ /// of a child. Returns -1 on error.
+ int getChildPipe(const Process::PID pid)
+ {
+ std::lock_guard<std::mutex> lock(forkMutex);
+ const auto it = _childProcesses.find(pid);
+ return (it != _childProcesses.end() ? it->second : -1);
+ }
+
+ /// Safely removes a child process and
+ /// invalidates the URL cache.
+ void removeChild(const Process::PID pid)
+ {
+ std::lock_guard<std::mutex> lock(forkMutex);
+ const auto it = _childProcesses.find(pid);
+ if (it != _childProcesses.end())
+ {
+ // Close the write pipe.
+ close(it->second);
+ _childProcesses.erase(it);
+ _cacheURL.clear();
+ ++forkCounter;
+ }
+ }
+
ThreadLocal<std::string> sourceForLinkOrCopy;
ThreadLocal<Path> destinationForLinkOrCopy;
@@ -177,37 +202,45 @@ public:
ssize_t nBytes = -1;
aLine.clear();
- while (true)
+ try
{
- if ( _pStart == _pEnd )
+ while (true)
{
- nBytes = Util::readMessage(nPipeReader, _aBuffer, sizeof(_aBuffer));
- if ( nBytes < 0 )
+ if ( _pStart == _pEnd )
{
- _pStart = _pEnd = nullptr;
- break;
- }
-
- _pStart = _aBuffer;
- _pEnd = _aBuffer + nBytes;
- }
+ nBytes = Util::readMessage(nPipeReader, _aBuffer, sizeof(_aBuffer));
+ if ( nBytes < 0 )
+ {
+ _pStart = _pEnd = nullptr;
+ break;
+ }
- if ( _pStart != _pEnd )
- {
- char aChar = *_pStart++;
- while (_pStart != _pEnd && aChar != '\r' && aChar != '\n')
- {
- aLine += aChar;
- aChar = *_pStart++;
+ _pStart = _aBuffer;
+ _pEnd = _aBuffer + nBytes;
}
- if ( aChar == '\r' && *_pStart == '\n')
+ if ( _pStart != _pEnd )
{
- _pStart++;
- break;
+ char aChar = *_pStart++;
+ while (_pStart != _pEnd && aChar != '\r' && aChar != '\n')
+ {
+ aLine += aChar;
+ aChar = *_pStart++;
+ }
+
+ if ( aChar == '\r' && *_pStart == '\n')
+ {
+ _pStart++;
+ break;
+ }
}
}
}
+ catch (const std::exception& exc)
+ {
+ Log::error(std::string("Exception: ") + exc.what());
+ return -1;
+ }
return nBytes;
}
@@ -238,22 +271,23 @@ public:
return nBytes;
}
- ssize_t createThread(Process::PID nPID, const std::string& aTID, const std::string& aURL)
+ ssize_t createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL)
{
const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n";
- return sendMessage(_childProcesses[nPID], aMessage);
+ return sendMessage(getChildPipe(nPID), aMessage);
}
void verifyChilds()
{
+ Log::trace("Verifying Childs.");
std::string aMessage;
bool bError = false;
// sanity cache
- for (auto it =_cacheURL.cbegin(); it != _cacheURL.cend(); )
+ for (auto it = _cacheURL.cbegin(); it != _cacheURL.cend(); )
{
aMessage = "search " + it->first + "\r\n";
- if (sendMessage(_childProcesses[it->second], aMessage) < 0)
+ if (sendMessage(getChildPipe(it->second), aMessage) < 0)
{
bError = true;
break;
@@ -346,13 +380,9 @@ public:
return;
}
- else
- {
- Log::debug("URL [" + aURL + "] is not in cache, searching " +
- std::to_string(_childProcesses.size()) + " kits.");
- }
// not found in cache, full search.
+ Log::debug("URL [" + aURL + "] is not in cache");
const Process::PID nPID = searchURL(aURL);
if ( nPID > 0 )
{
@@ -367,8 +397,8 @@ public:
}
else
{
- Log::info("No children available, creating [" + std::to_string(numPreSpawnedChildren) + "] childs");
- forkCounter = numPreSpawnedChildren;
+ Log::info("No children available.");
+ ++forkCounter;
}
}
}
@@ -440,15 +470,16 @@ public:
{
pStart++;
- forkMutex.lock();
- if (maintenance.load() > std::chrono::seconds(10))
+ Log::trace("Recv: " + aMessage);
+ const auto duration = (std::chrono::steady_clock::now() - lastMaintenanceTime);
+ if (duration >= std::chrono::seconds(10))
{
- maintenance = std::chrono::seconds::zero();
verifyChilds();
+ lastMaintenanceTime = std::chrono::steady_clock::now();
}
+
handleInput(aMessage);
aMessage.clear();
- forkMutex.unlock();
}
}
}
@@ -493,7 +524,8 @@ static bool globalPreinit(const std::string &loSubPath)
return preInit(("/" + loSubPath + "/program").c_str(), "file:///user") == 0;
}
-static int createLibreOfficeKit(const bool sharePages, const std::string& loSubPath,
+static int createLibreOfficeKit(const bool sharePages,
+ const std::string& loSubPath,
const std::string& jailId)
{
Poco::UInt64 child;
@@ -556,45 +588,25 @@ static int createLibreOfficeKit(const bool sharePages, const std::string& loSubP
return -1;
}
- Log::info() << "Adding Kit #" << childCounter << " PID " << child << Log::end;
+ Log::info() << "Adding Kit #" << childCounter << ", PID: " << child << Log::end;
_childProcesses[child] = nFIFOWriter;
+ ++forkCounter;
return child;
}
-static int startupLibreOfficeKit(const bool sharePages, const int nLOKits,
- const std::string& loSubPath, const std::string& jailId)
+static bool waitForTerminationChild(const Process::PID aPID, signed count = CHILD_TIMEOUT_SECS)
{
- Process::PID pId = -1;
-
- Log::info() << "Starting " << nLOKits << " LoKit instaces." << Log::end;
- for (int nCntr = nLOKits; nCntr; nCntr--)
- {
- if ((pId = createLibreOfficeKit(sharePages, loSubPath, jailId)) < 0)
- {
- Log::error("Error: failed to create LibreOfficeKit.");
- break;
- }
- }
-
- return pId;
-}
-
-
-static bool waitForTerminationChild(const Process::PID aPID)
-{
- int status;
- short nCntr = 3;
-
- while (nCntr-- > 0)
+ while (count-- > 0)
{
+ int status;
waitpid(aPID, &status, WUNTRACED | WNOHANG);
if (WIFEXITED(status))
- break;
+ return true;
sleep(MAINTENANCE_INTERVAL);
}
- return nCntr;
+ return false;
}
// Broker process
@@ -691,7 +703,7 @@ int main(int argc, char** argv)
exit(-1);
}
- if ( !numPreSpawnedChildren )
+ if (numPreSpawnedChildren < 1)
{
Log::error("Error: --numprespawns is 0");
exit(-1);
@@ -801,9 +813,11 @@ int main(int argc, char** argv)
exit(-1);
}
+ // Initialize LoKit and hope we can fork and save memory by sharing pages.
const bool sharePages = globalPreinit(loSubPath);
- if ( startupLibreOfficeKit(sharePages, numPreSpawnedChildren, loSubPath, jailId) < 0 )
+ // We must have at least one child, more is created dynamically.
+ if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
{
Log::error("Error: failed to create children.");
exit(-1);
@@ -823,74 +837,95 @@ int main(int argc, char** argv)
Log::info("loolbroker is ready.");
unsigned timeoutCounter = 0;
- while (!TerminationFlag && !_childProcesses.empty())
+ while (!TerminationFlag)
{
+ if (forkCounter > 0)
+ {
+ std::lock_guard<std::mutex> lock(forkMutex);
+
+ // Figure out how many children we need.
+ const signed total = _childProcesses.size();
+ const signed used = _cacheURL.size();
+ const signed extra = total - used;
+ if (extra < numPreSpawnedChildren)
+ {
+ if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
+ Log::error("Error: fork failed.");
+ }
+ else
+ forkCounter = 0;
+ }
+
int status;
const pid_t pid = waitpid(-1, &status, WUNTRACED | WNOHANG);
if (pid > 0)
{
- if ( _childProcesses.find(pid) != _childProcesses.end() )
+ if (WIFEXITED(status))
{
- if ((WIFEXITED(status) || WIFSIGNALED(status) || WTERMSIG(status) ) )
- {
- Log::error("Child [" + std::to_string(pid) + "] processes died.");
-
- forkMutex.lock();
- _childProcesses.erase(pid);
- _cacheURL.clear();
- forkMutex.unlock();
- }
-
- if ( WCOREDUMP(status) )
- Log::error("Child [" + std::to_string(pid) + "] produced a core dump.");
-
- if ( WIFSTOPPED(status) )
- Log::error("Child [" + std::to_string(pid) + "] process was stopped by delivery of a signal.");
+ Log::info() << "Child process [" << pid << "] exited with code: "
+ << WEXITSTATUS(status) << "." << Log::end;
- if ( WSTOPSIG(status) )
- Log::error("Child [" + std::to_string(pid) + "] process was stopped.");
+ removeChild(pid);
+ }
+ else
+ if (WIFSIGNALED(status))
+ {
+ std::string fate = "died";
+#ifdef WCOREDUMP
+ if (WCOREDUMP(status))
+ fate = "core-dumped";
+#endif
+ Log::error() << "Child process [" << pid << "] " << fate
+ << " with " << Util::signalName(WTERMSIG(status))
+ << " signal. " << Log::end;
- if ( WIFCONTINUED(status) )
- Log::error("Child [" + std::to_string(pid) + "] process was resumed.");
+ removeChild(pid);
+ }
+ else if (WIFSTOPPED(status))
+ {
+ Log::info() << "Child process [" << pid << "] stopped with "
+ << Util::signalName(WSTOPSIG(status))
+ << " signal. " << Log::end;
+ }
+ else if (WIFCONTINUED(status))
+ {
+ Log::info() << "Child process [" << pid << "] resumed with SIGCONT."
+ << Log::end;
}
else
{
- Log::error("None of our known child processes died. PID: " + std::to_string(pid));
+ Log::warn() << "Unknown status returned by waitpid: "
+ << std::hex << status << "." << Log::end;
}
}
else if (pid < 0)
- Log::error("Error: Child error.");
-
- if (forkCounter > 0)
- {
- forkMutex.lock();
- --forkCounter;
-
- if (createLibreOfficeKit(sharePages, loSubPath, jailId) < 0)
- Log::error("Error: fork failed.");
-
- forkMutex.unlock();
- }
+ Log::error("Error: waitpid failed.");
if (timeoutCounter++ == INTERVAL_PROBES)
{
timeoutCounter = 0;
sleep(MAINTENANCE_INTERVAL);
- maintenance.store( ++maintenance.load() );
}
}
// Terminate child processes
- for (auto i : _childProcesses)
+ for (auto& it : _childProcesses)
{
- Log::info("Requesting child process " + std::to_string(i.first) + " to terminate.");
- close(i.second);
- Process::requestTermination(i.first);
- if (!waitForTerminationChild(i.first))
+ Log::info("Requesting child process " + std::to_string(it.first) + " to terminate.");
+ Process::requestTermination(it.first);
+ }
+
+ // Wait and kill child processes
+ for (auto& it : _childProcesses)
+ {
+ if (!waitForTerminationChild(it.first))
{
- Log::info("Forcing a child process " + std::to_string(i.first) + " to terminate.");
- Process::kill(i.first);
+ Log::info("Forcing child process " + std::to_string(it.first) + " to terminate.");
+ Process::kill(it.first);
}
+
+ // Close the write pipe.
+ close(it.second);
}
aPipe.join();
diff --git a/loolwsd/Util.cpp b/loolwsd/Util.cpp
index af263f0..5da99de 100644
--- a/loolwsd/Util.cpp
+++ b/loolwsd/Util.cpp
@@ -402,9 +402,9 @@ namespace Util
aPoll.events = POLLIN;
aPoll.revents = 0;
- int nPoll = poll(&aPoll, 1, 3000);
+ const int nPoll = poll(&aPoll, 1, CHILD_TIMEOUT_SECS * 1000);
if ( nPoll < 0 )
- goto ErrorPoll;
+ return -1;
if ( nPoll == 0 )
errno = ETIME;
@@ -412,7 +412,6 @@ namespace Util
if( (aPoll.revents & POLLIN) != 0 )
nBytes = readFIFO(nPipe, pBuffer, nSize);
- ErrorPoll:
return nBytes;
}
More information about the Libreoffice-commits
mailing list