[Libreoffice-commits] online.git: Branch 'private/hcvcastro/forking' - loolwsd/LOOLBroker.cpp

Henry Castro hcastro at collabora.com
Sun Sep 27 15:04:34 PDT 2015


 loolwsd/LOOLBroker.cpp |  278 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 274 insertions(+), 4 deletions(-)

New commits:
commit 85de9ac86f0ca858439ad14492f1df1ec1e26a4e
Author: Henry Castro <hcastro at collabora.com>
Date:   Sun Sep 27 18:04:12 2015 -0400

    loolwsd: add pipe thread handler

diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 9097595..4737cc4 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -21,6 +21,7 @@
 #include <cassert>
 #include <iostream>
 #include <fstream>
+#include <deque>
 
 #include <Poco/Types.h>
 #include <Poco/Random.h>
@@ -64,12 +65,17 @@ const std::string BROKER_PREFIX = "/tmp/lokit";
 
 static int readerChild = -1;
 static int readerBroker = -1;
+static int timeoutCounter = 0;
 
 static unsigned int forkCounter = 0;
 static unsigned int childCounter = 0;
 
 static std::mutex forkMutex;
-static std::map<Poco::Process::PID, int> _childProcesses;
+static std::deque<Process::PID> _emptyURL;
+static std::map<Process::PID, int> _childProcesses;
+static std::map<std::string, Process::PID> _cacheURL;
+
+Poco::NamedMutex _namedMutexLOOL("loolwsd");
 
 namespace
 {
@@ -232,6 +238,266 @@ namespace
     }
 }
 
+/// Worker that services the broker's request pipe.
+///
+/// run() reads CRLF-terminated lines from the file descriptor readerBroker and
+/// passes each one to handleInput() while holding forkMutex. A line of the form
+/// "request <TID> <URL>" is routed to a child process by writing "search",
+/// "url" and "thread" commands to the per-child descriptors stored in
+/// _childProcesses; replies to "search" are read back from readerChild.
+class PipeRunnable: public Runnable
+{
+public:
+    PipeRunnable()
+        : _pStart(nullptr)
+        , _pEnd(nullptr)
+    {
+    }
+
+    /// Reads one CRLF-terminated line from nPipeReader into aLine.
+    ///
+    /// Bytes left over from a previous read are consumed first; further data is
+    /// fetched with Util::readMessage() only when the buffer is exhausted.
+    /// CR and LF characters are never stored in aLine.
+    ///
+    /// @param nPipeReader descriptor handed to Util::readMessage().
+    /// @param aLine       receives the line; cleared on entry.
+    /// @return the number of bytes consumed for this line (terminator included,
+    ///         hence always > 0) or the negative value reported by
+    ///         Util::readMessage() on failure.
+    ssize_t getResponseLine(int nPipeReader, std::string& aLine)
+    {
+        bool bSeenCR = false;
+        ssize_t nConsumed = 0;
+        aLine.clear();
+
+        while (true)
+        {
+            if ( _pStart == _pEnd )
+            {
+                // NOTE(review): a return value of 0 is treated as "no data yet"
+                // and retried, as before; confirm Util::readMessage() cannot
+                // return 0 repeatedly (e.g. on end of stream).
+                const ssize_t nBytes = Util::readMessage(nPipeReader, aBuffer, sizeof(aBuffer));
+                if ( nBytes < 0 )
+                {
+                    _pStart = _pEnd = nullptr;
+                    return nBytes;
+                }
+
+                _pStart = aBuffer;
+                _pEnd   = aBuffer + nBytes;
+            }
+
+            const char* pBefore = _pStart;
+            const bool bComplete = extractLine(_pStart, _pEnd, bSeenCR, aLine);
+            nConsumed += _pStart - pBefore;
+
+            // Report the consumed size rather than the size of the last read:
+            // a line served entirely from buffered bytes performs no read at
+            // all and must still be reported as a success.
+            if ( bComplete )
+                return nConsumed;
+        }
+    }
+
+    /// Writes aMessage to nPipeWriter with Util::writeFIFO().
+    ///
+    /// @return the value returned by Util::writeFIFO(); a negative value is
+    ///         logged together with strerror(errno).
+    ssize_t sendMessage(int nPipeWriter, const std::string& aMessage)
+    {
+        const ssize_t nBytes = Util::writeFIFO(nPipeWriter, aMessage.c_str(), aMessage.length());
+        if ( nBytes < 0 )
+            std::cout << Util::logPrefix() << "Error writting child: " << strerror(errno) << std::endl;
+
+        return nBytes;
+    }
+
+    /// Sends "thread <aTID>" to the child registered under nPID.
+    ///
+    /// @return the result of sendMessage(), or -1 with errno set to ESRCH when
+    ///         nPID has no usable descriptor in _childProcesses.
+    ssize_t createThread(Process::PID nPID, const std::string& aTID)
+    {
+        return sendToChild(nPID, "thread " + aTID + "\r\n");
+    }
+
+    /// Sends "url <aURL>" to the child registered under nPID.
+    ///
+    /// @return the result of sendMessage(), or -1 with errno set to ESRCH when
+    ///         nPID has no usable descriptor in _childProcesses.
+    ssize_t updateURL(Process::PID nPID, const std::string& aURL)
+    {
+        return sendToChild(nPID, "url " + aURL + "\r\n");
+    }
+
+    /// Asks every registered child whether it already handles aURL.
+    ///
+    /// Each child with a positive PID and descriptor receives "search <aURL>"
+    /// and its one-line reply is read from readerChild. A second token of "ok"
+    /// selects that child; "empty" records the child in _emptyURL.
+    ///
+    /// @return the PID of the child that answered "ok", 0 when no child did,
+    ///         or -1 when no child could be queried or a pipe operation failed.
+    ///         On a pipe failure _cacheURL and _emptyURL are cleared.
+    Process::PID searchURL(const std::string& aURL)
+    {
+        ssize_t nBytes = -1;
+        Process::PID nPID = 0;
+        bool bFailed = false;
+        std::string aResponse;
+        const std::string aMessage = "search " + aURL + "\r\n";
+
+        // Every search rebuilds the list of idle children. Keeping entries from
+        // earlier searches would leave duplicates and children that have since
+        // been given a URL, which could then be handed a second one.
+        _emptyURL.clear();
+
+        for (const auto& aChild : _childProcesses)
+        {
+            if ( aChild.first <= 0 || aChild.second <= 0 )
+                continue;
+
+            nBytes = Util::writeFIFO(aChild.second, aMessage.c_str(), aMessage.length());
+            if ( nBytes < 0 )
+            {
+                std::cout << Util::logPrefix() << "Error writting child: " << aChild.first << " " << strerror(errno) << std::endl;
+                bFailed = true;
+                break;
+            }
+
+            nBytes = getResponseLine(readerChild, aResponse);
+            if ( nBytes < 0 )
+            {
+                std::cout << Util::logPrefix() << "Error reading child: " << aChild.first << " " << strerror(errno) << std::endl;
+                bFailed = true;
+                break;
+            }
+
+            StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
+            // StringTokenizer::operator[] throws when the index is out of
+            // range, so a short or empty reply is skipped instead of indexed.
+            if ( tokens.count() < 2 )
+                continue;
+
+            if (tokens[1] == "ok")
+                nPID = aChild.first;
+            else if (tokens[1] == "empty")
+                _emptyURL.push_back(aChild.first);
+        }
+
+        if ( bFailed )
+        {
+            _cacheURL.clear();
+            _emptyURL.clear();
+        }
+
+        return (nBytes > 0 ? nPID : -1);
+    }
+
+    /// Handles one request line read from the broker pipe.
+    ///
+    /// Only "request <TID> <URL>" is acted upon; anything else is ignored.
+    /// The URL is resolved through _cacheURL first, then through searchURL(),
+    /// and finally by assigning it to the first child listed in _emptyURL.
+    /// When no idle child remains afterwards, forkCounter is incremented.
+    void handleInput(const std::string& aMessage)
+    {
+        StringTokenizer tokens(aMessage, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
+        // Check the count before indexing: operator[] throws on an empty line.
+        if ( tokens.count() != 3 || tokens[0] != "request" )
+            return;
+
+        const std::string aTID = tokens[1];
+        const std::string aURL = tokens[2];
+
+        // check cache
+        auto aIterURL = _cacheURL.find(aURL);
+        if ( aIterURL != _cacheURL.end() )
+        {
+            std::cout << Util::logPrefix() << "Cache Found: " << aIterURL->first << std::endl;
+            if (createThread(aIterURL->second, aTID) < 0)
+            {
+                std::cout << Util::logPrefix() << "Cache: Error creating thread: " << strerror(errno) << std::endl;
+                // The cached child could not be reached; forget it so that the
+                // next request for this URL performs a full search again.
+                _cacheURL.erase(aIterURL);
+            }
+
+            return;
+        }
+
+        // not found in cache, full search.
+        const Process::PID nPID = searchURL(aURL);
+        if ( nPID < 0 )
+            return;
+
+        if ( nPID > 0 )
+        {
+            std::cout << Util::logPrefix() << "Search Found: " << nPID << std::endl;
+            if (createThread(nPID, aTID) < 0)
+                std::cout << Util::logPrefix() << "Search: Error creating thread: " << strerror(errno) << std::endl;
+            else
+                _cacheURL[aURL] = nPID;
+
+            return;
+        }
+
+        // not found, new URL session.
+        if ( !_emptyURL.empty() )
+        {
+            const Process::PID aItem = _emptyURL.front();
+            std::cout << Util::logPrefix() << "Not Found: " << aItem << std::endl;
+            if (updateURL(aItem, aURL) < 0)
+            {
+                std::cout << Util::logPrefix() << "New: Error update URL: " << strerror(errno) << std::endl;
+                return;
+            }
+
+            if (createThread(aItem, aTID) < 0)
+            {
+                std::cout << Util::logPrefix() << "New: Error creating thread: " << strerror(errno) << std::endl;
+                return;
+            }
+            _emptyURL.pop_front();
+            _cacheURL[aURL] = aItem;
+        }
+
+        if ( _emptyURL.empty() )
+        {
+            // NOTE(review): forkCounter is only incremented here; presumably
+            // the broker's main loop forks that many children - confirm.
+            std::cout << Util::logPrefix() << "No available childs, fork new one" << std::endl;
+            forkCounter++;
+        }
+    }
+
+    /// Thread body: waits for data on readerBroker, splits it into
+    /// CRLF-terminated lines and dispatches each line to handleInput() under
+    /// forkMutex. The loop never terminates.
+    void run() override
+    {
+        std::string aMessage;
+        // Separate from the member buffer, which holds pending child replies.
+        char  aRequestBuffer[1024*2];
+        char* pStart = aRequestBuffer;
+        char* pEnd   = aRequestBuffer;
+        // Lives outside the loop so a CRLF split across two reads is detected.
+        bool  bSeenCR = false;
+
+        struct pollfd aPoll;
+        aPoll.fd = readerBroker;
+        aPoll.events = POLLIN;
+        aPoll.revents = 0;
+
+#ifdef __linux
+        if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>("pipe_reader"), 0, 0, 0) != 0)
+            std::cout << Util::logPrefix() << "Cannot set thread name :" << strerror(errno) << std::endl;
+#endif
+
+        while (true)
+        {
+            if ( pStart == pEnd )
+            {
+                aPoll.revents = 0;
+                if ( poll(&aPoll, 1, -1) < 0 )
+                {
+                    // revents is not meaningful after a failed poll(); retry
+                    // instead of acting on stale flags.
+                    if ( errno != EINTR )
+                        std::cout << Util::logPrefix() << "Error polling pipe :" << strerror(errno) << std::endl;
+                    continue;
+                }
+
+                if ( (aPoll.revents & POLLIN) == 0 )
+                    continue;
+
+                const ssize_t nBytes = Util::readFIFO(readerBroker, aRequestBuffer, sizeof(aRequestBuffer));
+                if (nBytes < 0)
+                {
+                    pStart = pEnd = aRequestBuffer;
+                    std::cout << Util::logPrefix() << "Error reading message :" << strerror(errno) << std::endl;
+                    continue;
+                }
+                pStart = aRequestBuffer;
+                pEnd   = aRequestBuffer + nBytes;
+            }
+
+            if ( extractLine(pStart, pEnd, bSeenCR, aMessage) )
+            {
+                // Scoped lock: released even if handleInput() throws.
+                std::lock_guard<std::mutex> aGuard(forkMutex);
+                handleInput(aMessage);
+                aMessage.clear();
+            }
+        }
+    }
+
+private:
+    /// Moves characters from [pCursor, pLimit) into aLine until a CRLF pair
+    /// has been consumed or the range is exhausted.
+    ///
+    /// CR and LF are never appended; an LF that does not directly follow a CR
+    /// is discarded. bSeenCR carries the "previous character was CR" state
+    /// between calls so that a terminator split across two buffers is found.
+    ///
+    /// @return true when a complete line is available in aLine.
+    static bool extractLine(char*& pCursor, const char* pLimit, bool& bSeenCR, std::string& aLine)
+    {
+        while ( pCursor != pLimit )
+        {
+            const char aChar = *pCursor++;
+            if ( aChar == '\n' )
+            {
+                if ( bSeenCR )
+                {
+                    bSeenCR = false;
+                    return true;
+                }
+            }
+            else if ( aChar == '\r' )
+            {
+                bSeenCR = true;
+            }
+            else
+            {
+                bSeenCR = false;
+                aLine += aChar;
+            }
+        }
+
+        return false;
+    }
+
+    /// Writes aMessage to the descriptor registered for nPID.
+    ///
+    /// Uses find() rather than operator[] so that an unknown PID does not
+    /// insert a {PID, 0} entry into _childProcesses and write to descriptor 0.
+    ///
+    /// @return the result of sendMessage(), or -1 with errno set to ESRCH when
+    ///         nPID is not registered or its descriptor is not positive.
+    ssize_t sendToChild(Process::PID nPID, const std::string& aMessage)
+    {
+        const auto aChild = _childProcesses.find(nPID);
+        if ( aChild == _childProcesses.end() || aChild->second <= 0 )
+        {
+            errno = ESRCH;
+            return -1;
+        }
+
+        return sendMessage(aChild->second, aMessage);
+    }
+
+    char* _pStart;        ///< next unread byte of a child reply in aBuffer
+    char* _pEnd;          ///< one past the last valid byte in aBuffer
+    char  aBuffer[1024];  ///< bytes read from the child reply pipe
+};
+
 /// Initializes LibreOfficeKit for cross-fork re-use.
 static bool globalPreinit(const std::string &loSubPath)
 {
@@ -326,9 +592,6 @@ static int startupLibreOfficeKit(bool sharePages, int nLOKits,
    return pId;
 }
 
-static int timeoutCounter = 0;
-Poco::NamedMutex _namedMutexLOOL("loolwsd");
-
 // Broker process
 int main(int argc, char** argv)
 {
@@ -498,6 +761,13 @@ int main(int argc, char** argv)
         exit(-1);
     }
 
+    PipeRunnable pipeHandler;
+    Poco::Thread aPipe;
+
+    aPipe.start(pipeHandler);
+
+    std::cout << Util::logPrefix() << "loolwsd ready!" << std::endl;
+
     while (_childProcesses.size() > 0)
     {
         int status;


More information about the Libreoffice-commits mailing list