[Libreoffice-commits] online.git: Branch 'private/hcvcastro/forking' - loolwsd/LOOLBroker.cpp
Henry Castro
hcastro at collabora.com
Sun Sep 27 15:04:34 PDT 2015
loolwsd/LOOLBroker.cpp | 278 ++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 274 insertions(+), 4 deletions(-)
New commits:
commit 85de9ac86f0ca858439ad14492f1df1ec1e26a4e
Author: Henry Castro <hcastro at collabora.com>
Date: Sun Sep 27 18:04:12 2015 -0400
loolwsd: add pipe thread handler
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 9097595..4737cc4 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -21,6 +21,7 @@
#include <cassert>
#include <iostream>
#include <fstream>
+#include <deque>
#include <Poco/Types.h>
#include <Poco/Random.h>
@@ -64,12 +65,17 @@ const std::string BROKER_PREFIX = "/tmp/lokit";
static int readerChild = -1;
static int readerBroker = -1;
+static int timeoutCounter = 0;
static unsigned int forkCounter = 0;
static unsigned int childCounter = 0;
static std::mutex forkMutex;
-static std::map<Poco::Process::PID, int> _childProcesses;
+static std::deque<Process::PID> _emptyURL;
+static std::map<Process::PID, int> _childProcesses;
+static std::map<std::string, Process::PID> _cacheURL;
+
+Poco::NamedMutex _namedMutexLOOL("loolwsd");
namespace
{
@@ -232,6 +238,266 @@ namespace
}
}
+/// Runnable that reads "\r\n"-terminated messages from the readerBroker pipe
+/// and dispatches "request <tid> <url>" messages to the child processes
+/// registered in _childProcesses.
+///
+/// handleInput() (and everything it calls) touches the file-level containers
+/// _childProcesses, _cacheURL and _emptyURL; run() invokes it with forkMutex
+/// held.
+class PipeRunnable: public Runnable
+{
+public:
+    PipeRunnable() :
+        _pStart(nullptr),
+        _pEnd(nullptr)
+    {
+    }
+
+    /// Reads one "\r\n"-terminated line from nPipeReader into aLine.
+    ///
+    /// Data left over in the internal buffer from a previous call is used
+    /// before the pipe is read again. Stray '\r' or '\n' bytes that do not
+    /// form a "\r\n" pair are discarded. The terminator is not stored.
+    ///
+    /// @param nPipeReader descriptor handed to Util::readMessage.
+    /// @param aLine       receives the line; cleared on entry.
+    /// @return number of bytes consumed for this line, terminator included
+    ///         (always > 0 on success), or the negative value returned by
+    ///         Util::readMessage on failure.
+    ssize_t getResponseLine(int nPipeReader, std::string& aLine)
+    {
+        ssize_t nConsumed = 0;
+        bool bPendingCR = false;
+        aLine.clear();
+
+        while (true)
+        {
+            if ( _pStart == _pEnd )
+            {
+                // NOTE(review): a read of 0 bytes simply retries, as before;
+                // confirm Util::readMessage cannot return 0 forever (EOF).
+                const ssize_t nBytes = Util::readMessage(nPipeReader, aBuffer, sizeof(aBuffer));
+                if ( nBytes < 0 )
+                {
+                    _pStart = _pEnd = nullptr;
+                    return nBytes;
+                }
+
+                _pStart = aBuffer;
+                _pEnd = aBuffer + nBytes;
+            }
+
+            const char* pBefore = _pStart;
+            const bool bComplete = extractLine(_pStart, _pEnd, aLine, bPendingCR);
+            nConsumed += _pStart - pBefore;
+            if ( bComplete )
+                return nConsumed;
+        }
+    }
+
+    /// Writes aMessage to nPipeWriter via Util::writeFIFO and logs a failure.
+    ///
+    /// @return the value returned by Util::writeFIFO (negative on error).
+    ssize_t sendMessage(int nPipeWriter, const std::string& aMessage)
+    {
+        const ssize_t nBytes = Util::writeFIFO(nPipeWriter, aMessage.c_str(), aMessage.length());
+        if ( nBytes < 0 )
+            std::cout << Util::logPrefix() << "Error writting child: " << strerror(errno) << std::endl;
+
+        return nBytes;
+    }
+
+    /// Sends "thread <aTID>" to the child registered under nPID.
+    ///
+    /// @return result of the write, or -1 (errno = ESRCH) if nPID is unknown.
+    ssize_t createThread(Process::PID nPID, const std::string& aTID)
+    {
+        return sendToChild(nPID, "thread " + aTID + "\r\n");
+    }
+
+    /// Sends "url <aURL>" to the child registered under nPID.
+    ///
+    /// @return result of the write, or -1 (errno = ESRCH) if nPID is unknown.
+    ssize_t updateURL(Process::PID nPID, const std::string& aURL)
+    {
+        return sendToChild(nPID, "url " + aURL + "\r\n");
+    }
+
+    /// Asks every registered child whether it serves aURL.
+    ///
+    /// Each child with a positive PID and descriptor is sent "search <aURL>"
+    /// and one response line is read from readerChild. A second token of
+    /// "ok" selects that child; "empty" queues the child in _emptyURL.
+    /// Responses with fewer than two tokens are ignored. If a write or read
+    /// fails the scan stops and both _cacheURL and _emptyURL are cleared.
+    ///
+    /// @return the PID that answered "ok", 0 if none did, or -1 when the
+    ///         last pipe operation failed or no child was queried.
+    Process::PID searchURL(const std::string& aURL)
+    {
+        ssize_t nBytes = -1;
+        Process::PID nPID = 0;
+        std::string aResponse;
+        const std::string aMessage = "search " + aURL + "\r\n";
+
+        auto aIterator = _childProcesses.begin();
+        for ( ; aIterator != _childProcesses.end(); ++aIterator)
+        {
+            if ( aIterator->first <= 0 || aIterator->second <= 0 )
+                continue;
+
+            nBytes = Util::writeFIFO(aIterator->second, aMessage.c_str(), aMessage.length());
+            if ( nBytes < 0 )
+            {
+                std::cout << Util::logPrefix() << "Error writting child: " << aIterator->first << " " << strerror(errno) << std::endl;
+                break;
+            }
+
+            nBytes = getResponseLine(readerChild, aResponse);
+            if ( nBytes < 0 )
+            {
+                std::cout << Util::logPrefix() << "Error reading child: " << aIterator->first << " " << strerror(errno) << std::endl;
+                break;
+            }
+
+            StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+            // Indexing past count() throws; a short response is neither
+            // "ok" nor "empty", so skip it.
+            if ( tokens.count() < 2 )
+                continue;
+
+            if (tokens[1] == "ok")
+                nPID = aIterator->first;
+            else if (tokens[1] == "empty")
+                rememberEmptyChild(aIterator->first);
+        }
+
+        // The loop only stops early on a pipe error.
+        if ( aIterator != _childProcesses.end() )
+        {
+            _cacheURL.clear();
+            _emptyURL.clear();
+        }
+
+        return (nBytes > 0 ? nPID : -1);
+    }
+
+    /// Handles one message read from the broker pipe.
+    ///
+    /// Only "request <tid> <url>" (exactly three tokens) is acted upon:
+    ///  1. a _cacheURL hit sends "thread" to the cached child; if that fails
+    ///     the cache entry is dropped so the next request searches again;
+    ///  2. otherwise searchURL() is run and a match is cached;
+    ///  3. otherwise the first child in _emptyURL is given the URL;
+    ///  4. finally, when _emptyURL is empty, forkCounter is incremented.
+    void handleInput(const std::string& aMessage)
+    {
+        StringTokenizer tokens(aMessage, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+        // Check the count first: tokens[0] throws on an empty message.
+        if ( tokens.count() != 3 || tokens[0] != "request" )
+            return;
+
+        const std::string aTID = tokens[1];
+        const std::string aURL = tokens[2];
+
+        // check cache
+        auto aIterURL = _cacheURL.find(aURL);
+        if ( aIterURL != _cacheURL.end() )
+        {
+            std::cout << Util::logPrefix() << "Cache Found: " << aIterURL->first << std::endl;
+            if (createThread(aIterURL->second, aTID) < 0)
+            {
+                std::cout << Util::logPrefix() << "Cache: Error creating thread: " << strerror(errno) << std::endl;
+                _cacheURL.erase(aIterURL);
+            }
+
+            return;
+        }
+
+        // not found in cache, full search.
+        const Process::PID nPID = searchURL(aURL);
+        if ( nPID < 0 )
+            return;
+
+        if ( nPID > 0 )
+        {
+            std::cout << Util::logPrefix() << "Search Found: " << nPID << std::endl;
+            if (createThread(nPID, aTID) < 0)
+                std::cout << Util::logPrefix() << "Search: Error creating thread: " << strerror(errno) << std::endl;
+            else
+                _cacheURL[aURL] = nPID;
+
+            return;
+        }
+
+        // not found, new URL session.
+        if ( !_emptyURL.empty() )
+        {
+            const auto aItem = _emptyURL.front();
+            std::cout << Util::logPrefix() << "Not Found: " << aItem << std::endl;
+            if (updateURL(aItem, aURL) < 0)
+            {
+                std::cout << Util::logPrefix() << "New: Error update URL: " << strerror(errno) << std::endl;
+                return;
+            }
+
+            if (createThread(aItem, aTID) < 0)
+            {
+                std::cout << Util::logPrefix() << "New: Error creating thread: " << strerror(errno) << std::endl;
+                return;
+            }
+            _emptyURL.pop_front();
+            _cacheURL[aURL] = aItem;
+        }
+
+        if ( _emptyURL.empty() )
+        {
+            std::cout << Util::logPrefix() << "No available childs, fork new one" << std::endl;
+            forkCounter++;
+        }
+    }
+
+    /// Thread body: polls readerBroker forever, splits the incoming bytes
+    /// into "\r\n"-terminated messages and passes each one to handleInput()
+    /// while holding forkMutex.
+    void run() override
+    {
+        std::string aMessage;
+        char aInput[1024*2];
+        char* pStart = aInput;
+        char* pEnd = aInput;
+        // Remembers a '\r' seen as the last byte of one read so that a '\n'
+        // at the start of the next read still terminates the message.
+        bool bPendingCR = false;
+
+        struct pollfd aPoll;
+        aPoll.fd = readerBroker;
+        aPoll.events = POLLIN;
+        aPoll.revents = 0;
+
+#ifdef __linux
+        if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>("pipe_reader"), 0, 0, 0) != 0)
+            std::cout << Util::logPrefix() << "Cannot set thread name :" << strerror(errno) << std::endl;
+#endif
+
+        while (true)
+        {
+            if ( pStart == pEnd )
+            {
+                // Reset revents so a failed poll() cannot leave a stale
+                // POLLIN from the previous iteration behind.
+                aPoll.revents = 0;
+                if ( poll(&aPoll, 1, -1) <= 0 )
+                    continue;
+
+                // NOTE(review): POLLHUP/POLLERR without POLLIN makes this
+                // loop spin, as it did before; decide how the thread should
+                // react when the writing side goes away.
+                if ( (aPoll.revents & POLLIN) != 0 )
+                {
+                    const ssize_t nBytes = Util::readFIFO(readerBroker, aInput, sizeof(aInput));
+                    if (nBytes < 0)
+                    {
+                        pStart = pEnd = nullptr;
+                        std::cout << Util::logPrefix() << "Error reading message :" << strerror(errno) << std::endl;
+                        continue;
+                    }
+                    pStart = aInput;
+                    pEnd = aInput + nBytes;
+                }
+            }
+
+            if ( extractLine(pStart, pEnd, aMessage, bPendingCR) )
+            {
+                // Scoped lock: released even if handleInput() throws.
+                std::lock_guard<std::mutex> aLock(forkMutex);
+                handleInput(aMessage);
+                aMessage.clear();
+            }
+        }
+    }
+
+private:
+    /// Consumes bytes from [pCursor, pLimit), appending payload bytes to
+    /// aLine, until a "\r\n" pair has been consumed or the range is empty.
+    ///
+    /// Never dereferences pLimit. '\r' and '\n' bytes are never appended.
+    /// bPendingCR carries "last byte was '\r'" across calls so a terminator
+    /// split over two reads is still recognised.
+    ///
+    /// @return true when a complete line terminator was consumed.
+    static bool extractLine(char*& pCursor, const char* pLimit, std::string& aLine, bool& bPendingCR)
+    {
+        while ( pCursor != pLimit )
+        {
+            const char aChar = *pCursor++;
+            if ( bPendingCR && aChar == '\n' )
+            {
+                bPendingCR = false;
+                return true;
+            }
+
+            bPendingCR = (aChar == '\r');
+            if ( aChar != '\r' && aChar != '\n' )
+                aLine += aChar;
+        }
+
+        return false;
+    }
+
+    /// Sends aMessage to the pipe registered for nPID.
+    ///
+    /// Uses find() rather than operator[] so that an unknown PID does not
+    /// insert a bogus entry (descriptor 0) into _childProcesses.
+    ///
+    /// @return result of sendMessage(), or -1 with errno = ESRCH when nPID
+    ///         is not registered.
+    ssize_t sendToChild(Process::PID nPID, const std::string& aMessage)
+    {
+        const auto aChild = _childProcesses.find(nPID);
+        if ( aChild == _childProcesses.end() )
+        {
+            errno = ESRCH;
+            return -1;
+        }
+
+        return sendMessage(aChild->second, aMessage);
+    }
+
+    /// Appends nPID to _emptyURL unless it is already queued, so repeated
+    /// searches do not hand the same child out twice.
+    static void rememberEmptyChild(Process::PID nPID)
+    {
+        for (const auto& nQueued : _emptyURL)
+        {
+            if ( nQueued == nPID )
+                return;
+        }
+
+        _emptyURL.push_back(nPID);
+    }
+
+    char* _pStart;
+    char* _pEnd;
+    char aBuffer[1024];
+};
+
+
/// Initializes LibreOfficeKit for cross-fork re-use.
static bool globalPreinit(const std::string &loSubPath)
{
@@ -326,9 +592,6 @@ static int startupLibreOfficeKit(bool sharePages, int nLOKits,
return pId;
}
-static int timeoutCounter = 0;
-Poco::NamedMutex _namedMutexLOOL("loolwsd");
-
// Broker process
int main(int argc, char** argv)
{
@@ -498,6 +761,13 @@ int main(int argc, char** argv)
exit(-1);
}
+ PipeRunnable pipeHandler;
+ Poco::Thread aPipe;
+
+ aPipe.start(pipeHandler);
+
+ std::cout << Util::logPrefix() << "loolwsd ready!" << std::endl;
+
while (_childProcesses.size() > 0)
{
int status;
More information about the Libreoffice-commits
mailing list