[Libreoffice-commits] online.git: loolwsd/LOOLBroker.cpp loolwsd/Util.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Sun Jan 24 12:59:58 PST 2016
loolwsd/LOOLBroker.cpp | 150 ++++++++++++++++++++++++-------------------------
loolwsd/Util.cpp | 5 -
2 files changed, 76 insertions(+), 79 deletions(-)
New commits:
commit ef2ec0b2e2197d64dc563dcaa856ec60eb39e1dc
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Fri Jan 22 09:49:05 2016 -0500
loolwsd: fixes to broker threading and communication
Broker cache cleanup is now done only during searching,
since that is where we loop over the processes and request
status from each child.
Internal helpers are simplified, and termination is done
in a single removeChild helper.
Change-Id: I31f7df5429f0737d352842d5c0f6a02b91b8078f
Reviewed-on: https://gerrit.libreoffice.org/21751
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 62ae29e..7647dcc 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -87,6 +87,14 @@ namespace
return (it != _childProcesses.end() ? it->second : -1);
}
+ void requestAbnormalTermination(const Process::PID aPID)
+ {
+ if (kill(aPID, SIGTERM) != 0 && kill(aPID, 0) != 0)
+ {
+ Log::info("Cannot terminate lokit [" + std::to_string(aPID) + "].");
+ }
+ }
+
/// Safely removes a child process and
/// invalidates the URL cache.
void removeChild(const Process::PID pid)
@@ -96,6 +104,7 @@ namespace
if (it != _childProcesses.end())
{
// Close the write pipe.
+ requestAbnormalTermination(pid);
close(it->second);
_childProcesses.erase(it);
_cacheURL.clear();
@@ -178,14 +187,6 @@ namespace
if (nftw(source.c_str(), linkOrCopyFunction, 10, FTW_DEPTH) == -1)
Log::error("linkOrCopy: nftw() failed for '" + source + "'");
}
-
- void requestAbnormalTermination(const Process::PID aPID)
- {
- if (kill(aPID, SIGTERM) != 0)
- {
- Log::info("Cannot terminate lokit [" + std::to_string(aPID) + "]");
- }
- }
}
class PipeRunnable: public Runnable
@@ -238,21 +239,27 @@ public:
}
catch (const std::exception& exc)
{
- Log::error(std::string("Exception: ") + exc.what());
+ Log::error() << "Exception while reading from pipe ["
+ << nPipeReader << "]: " << exc.what() << Log::end;
return -1;
}
return nBytes;
}
- bool isOKResponse(int nPID)
+ bool createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL)
{
+ const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n";
+ if (Util::writeFIFO(getChildPipe(nPID), aMessage) < 0)
+ {
+ Log::error("Error sending thread message to child [" + std::to_string(nPID) + "].");
+ return false;
+ }
+
std::string aResponse;
if (getResponseLine(readerChild, aResponse) < 0)
{
- Log::error("Error reading child response: " + std::to_string(nPID) + ". Clearing cache.");
- requestAbnormalTermination(nPID);
- _cacheURL.clear();
+ Log::error("Error reading response to thread message from child [" + std::to_string(nPID) + "].");
return false;
}
@@ -260,51 +267,40 @@ public:
return (tokens.count() == 2 && tokens[1] == "ok");
}
- ssize_t sendMessage(int nPipeWriter, const std::string& aMessage)
- {
- const ssize_t nBytes = Util::writeFIFO(nPipeWriter, aMessage);
- if ( nBytes < 0 )
- Log::error("Error writting to child pipe.");
-
- return nBytes;
- }
-
- ssize_t createThread(const Process::PID nPID, const std::string& aTID, const std::string& aURL)
- {
- const std::string aMessage = "thread " + aTID + " " + aURL + "\r\n";
- return sendMessage(getChildPipe(nPID), aMessage);
- }
-
void verifyChilds()
{
std::lock_guard<std::recursive_mutex> lock(forkMutex);
+ // Sanitize cache.
Log::trace("Verifying Childs.");
- std::string aMessage;
- bool bError = false;
-
- // sanity cache
for (auto it = _cacheURL.cbegin(); it != _cacheURL.cend(); )
{
- aMessage = "search " + it->first + "\r\n";
- if (sendMessage(getChildPipe(it->second), aMessage) < 0)
+ const auto aMessage = "search " + it->first + "\r\n";
+ if (Util::writeFIFO(getChildPipe(it->second), aMessage) < 0)
+ {
+ Log::error("Error sending search message to child [" + std::to_string(it->second) + "]. Clearing cache.");
+ _cacheURL.clear();
+ break;
+ }
+
+ std::string aResponse;
+ if (getResponseLine(readerChild, aResponse) < 0)
{
- bError = true;
+ Log::error("Error reading response to thread message from child [" + std::to_string(it->second) + "]. Clearing cache.");
+ _cacheURL.clear();
break;
}
- if (!isOKResponse(it->second))
+ StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+ if (tokens.count() != 2 || tokens[1] != "ok")
{
Log::debug() << "Removed expired Kit [" << it->second << "] hosts URL [" << it->first << "]." << Log::end;
- _cacheURL.erase(it++);
+ it = _cacheURL.erase(it);
continue;
}
- it++;
+ ++it;
}
-
- if (bError)
- _cacheURL.clear();
}
Process::PID searchURL(const std::string& aURL)
@@ -313,45 +309,47 @@ public:
const std::string aMessage = "search " + aURL + "\r\n";
Process::PID nPID = -1;
- for (auto& it : _childProcesses)
+ for (auto it = _childProcesses.cbegin(); it != _childProcesses.cend(); )
{
- assert(it.first > 0 && it.second > 0);
+ assert(it->first > 0 && it->second > 0);
- Log::trace("Query to kit [" + std::to_string(it.first) + "]: " + aMessage);
- ssize_t nBytes = Util::writeFIFO(it.second, aMessage);
+ Log::trace("Query to kit [" + std::to_string(it->first) + "]: " + aMessage);
+ ssize_t nBytes = Util::writeFIFO(it->second, aMessage);
if ( nBytes < 0 )
{
- Log::error("Error writting to child pipe: " + std::to_string(it.first) + ". Clearing cache.");
- requestAbnormalTermination(it.first);
- _cacheURL.clear();
- break;
+ Log::error("Error sending search message to child pipe: " + std::to_string(it->first) + ". Terminating.");
+ removeChild(it->second);
+ it = _childProcesses.cbegin();
+ continue;
}
std::string aResponse;
nBytes = getResponseLine(readerChild, aResponse);
- Log::trace("Response from kit [" + std::to_string(it.first) + "]: " + aResponse);
+ Log::trace("Response from kit [" + std::to_string(it->first) + "]: " + aResponse);
if ( nBytes < 0 )
{
- Log::error("Error reading child response: " + std::to_string(it.first) + ". Clearing cache.");
- requestAbnormalTermination(it.first);
- _cacheURL.clear();
- break;
+ Log::error("Error reading response to search message from child [" + std::to_string(it->first) + "]. Terminating.");
+ removeChild(it->second);
+ it = _childProcesses.cbegin();
+ continue;
}
StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
if (tokens[1] == "ok")
{
// Found, but find all empty instances.
- nPID = it.first;
+ nPID = it->first;
Log::debug("Kit [" + std::to_string(nPID) + "] hosts URL [" + aURL + "].");
break;
}
else if (tokens[1] == "empty")
{
// Remember the last empty.
- nPID = it.first;
+ nPID = it->first;
Log::debug("Kit [" + std::to_string(nPID) + "] is empty.");
}
+
+ ++it;
}
return nPID;
@@ -369,40 +367,40 @@ public:
Log::debug("Finding kit for URL [" + aURL + "] on thread [" + aTID + "].");
- // check cache
+ // Check the cache first.
const auto aIterURL = _cacheURL.find(aURL);
if ( aIterURL != _cacheURL.end() )
{
Log::debug("Cache found URL [" + aURL + "] hosted on child [" + std::to_string(aIterURL->second) +
"]. Creating view for thread [" + aTID + "].");
- if (createThread(aIterURL->second, aTID, aURL) < 0)
- Log::error("Cache: Error creating thread.");
-
- if (!isOKResponse(aIterURL->second))
- Log::error("Cache Failed: Creating view for thread [" + aTID + "].");
+ if (createThread(aIterURL->second, aTID, aURL))
+ return;
- return;
+ Log::error("Cache: Error creating thread [" + aTID + "] for URL [" + aURL + "]. Will search.");
+ }
+ else
+ {
+ // Not found in cache, do a full search.
+ Log::debug("URL [" + aURL + "] is not in cache. Will search.");
}
- // not found in cache, full search.
- Log::debug("URL [" + aURL + "] is not in cache");
const Process::PID nPID = searchURL(aURL);
if ( nPID > 0 )
{
- Log::debug("Creating view for URL [" + aURL + "] for thread [" +
- aTID + "] on kit [" + std::to_string(nPID) + "].");
+ Log::debug("Search found child [" + std::to_string(aIterURL->second) +
+ "] to host URL [" + aURL +
+ "]. Creating view for thread [" + aTID + "].");
if (createThread(nPID, aTID, aURL) < 0)
- Log::error("Search: Error creating thread.");
- else if (isOKResponse(nPID))
+ {
_cacheURL[aURL] = nPID;
- else
- Log::error("Failed: Creating view for thread [" + aTID + "].");
- }
- else
- {
- Log::info("No children available.");
- ++forkCounter;
+ return;
+ }
+
+ Log::error("Search: Error creating thread [" + aTID + "] for URL [" + aURL + "].");
}
+
+ Log::info("No children available. Creating more.");
+ ++forkCounter;
}
}
diff --git a/loolwsd/Util.cpp b/loolwsd/Util.cpp
index 872219a..ee2b5d1 100644
--- a/loolwsd/Util.cpp
+++ b/loolwsd/Util.cpp
@@ -396,7 +396,6 @@ namespace Util
ssize_t readMessage(int nPipe, char* pBuffer, ssize_t nSize)
{
- ssize_t nBytes = -1;
struct pollfd aPoll;
aPoll.fd = nPipe;
@@ -411,9 +410,9 @@ namespace Util
errno = ETIME;
if( (aPoll.revents & POLLIN) != 0 )
- nBytes = readFIFO(nPipe, pBuffer, nSize);
+ return readFIFO(nPipe, pBuffer, nSize);
- return nBytes;
+ return -1;
}
static
More information about the Libreoffice-commits
mailing list