[Libreoffice-commits] online.git: loolwsd/IoUtil.cpp loolwsd/IoUtil.hpp loolwsd/LOOLBroker.cpp loolwsd/LOOLKit.cpp loolwsd/LOOLWSD.cpp loolwsd/MasterProcessSession.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Mar 30 02:03:11 UTC 2016


 loolwsd/IoUtil.cpp               |   19 ++++++----
 loolwsd/IoUtil.hpp               |    2 -
 loolwsd/LOOLBroker.cpp           |   73 +++++----------------------------------
 loolwsd/LOOLKit.cpp              |   18 ++++-----
 loolwsd/LOOLWSD.cpp              |   12 +++---
 loolwsd/MasterProcessSession.cpp |    8 +++-
 6 files changed, 46 insertions(+), 86 deletions(-)

New commits:
commit fe69c4d5b647d0191748956ffc2c5255b914b858
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Mon Mar 28 16:07:02 2016 -0400

    loolwsd: pipe plumbing cleanup
    
    Change-Id: I5519235a4601e1e38cedc3f06ffe9386434a292d
    Reviewed-on: https://gerrit.libreoffice.org/23645
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp
index b2bd61e..c7cc54c 100644
--- a/loolwsd/IoUtil.cpp
+++ b/loolwsd/IoUtil.cpp
@@ -207,14 +207,14 @@ void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws)
 
 ssize_t writeFIFO(int pipe, const char* buffer, ssize_t size)
 {
-    ssize_t bytes = -1;
     ssize_t count = 0;
-
     while(true)
     {
-        bytes = write(pipe, buffer + count, size - count);
+        Log::trace("Writing to pipe. Data: [" + std::string(buffer, size) + "].");
+        const auto bytes = write(pipe, buffer + count, size - count);
         if (bytes < 0)
         {
+            Log::error("Failed to write to pipe. Retrying. Data: [" + std::string(buffer, size) + "].");
             if (errno == EINTR || errno == EAGAIN)
                 continue;
 
@@ -281,6 +281,8 @@ int PipeReader::readLine(std::string& line,
         // We have a line cached, return it.
         line += std::string(_data.data(), endOfLine);
         _data.erase(0, endOfLine - _data.data() + 1); // Including the '\n'.
+        Log::trace() << "Read existing line from pipe: " << _name << ", line: ["
+                     << line << "], data: [" << _data << "]." << Log::end;
         return 1;
     }
 
@@ -291,7 +293,7 @@ int PipeReader::readLine(std::string& line,
     {
         if (stopPredicate())
         {
-            Log::info() << "Spot requested for pipe: " << _name << Log::end;
+            Log::info() << "Stop requested for pipe: " << _name << '.' << Log::end;
             return -1;
         }
 
@@ -300,6 +302,7 @@ int PipeReader::readLine(std::string& line,
         pipe.events = POLLIN;
         pipe.revents = 0;
         const int ready = poll(&pipe, 1, pollTimeoutMs);
+        Log::trace() << "Poll for pipe: " << _name << " returned: " << ready << Log::end;
         if (ready == 0)
         {
             // Timeout.
@@ -314,6 +317,7 @@ int PipeReader::readLine(std::string& line,
         {
             char buffer[READ_BUFFER_SIZE];
             const auto bytes = readFIFO(_pipe, buffer, sizeof(buffer));
+            Log::trace() << "readFIFO for pipe: " << _name << " returned: " << bytes << Log::end;
             if (bytes < 0)
             {
                 return -1;
@@ -324,15 +328,18 @@ int PipeReader::readLine(std::string& line,
             {
                 // Got end of line.
                 line = _data;
-                auto tail = std::string(static_cast<const char*>(buffer), endOfLine);
+                const auto tail = std::string(static_cast<const char*>(buffer), endOfLine);
                 line += tail;
-                _data = std::string(endOfLine, bytes - tail.size() - 1); // Exclude the '\n'.
+                _data = std::string(endOfLine + 1, bytes - tail.size() - 1); // Exclude the '\n'.
+                Log::trace() << "Read line from pipe: " << _name << ", line: [" << line
+                            << "], data: [" << _data << "]." << Log::end;
                 return 1;
             }
             else
             {
                 // More data, keep going.
                 _data += std::string(buffer, bytes);
+                Log::trace() << "data appended to pipe: " << _name << ", data: " << _data << Log::end;
             }
         }
         else if (pipe.revents & (POLLERR | POLLHUP | POLLNVAL))
diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp
index c7a4e00..016a808 100644
--- a/loolwsd/IoUtil.hpp
+++ b/loolwsd/IoUtil.hpp
@@ -59,7 +59,7 @@ namespace IoUtil
         /// On success, line will contain the read message.
         int readLine(std::string& line,
                      std::function<bool()> stopPredicate,
-                     const size_t timeoutMs);
+                     const size_t timeoutMs = POLL_TIMEOUT_MS);
 
         void process(std::function<bool(std::string& message)> handler,
                      std::function<bool()> stopPredicate,
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index a551365..fdb43a3 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -184,65 +184,14 @@ namespace
 class PipeRunnable: public Runnable
 {
 public:
-    PipeRunnable()
-      : _start(nullptr),
-        _end(nullptr)
+    PipeRunnable() :
+        _childPipeReader("child_pipe_rd", readerChild)
     {
     }
 
-    ssize_t getResponseLine(const int pipeReader, std::string& response)
-    {
-        ssize_t bytes = -1;
-        response.clear();
-
-        try
-        {
-            for (;;)
-            {
-                if (_start == _end)
-                {
-                    bytes = IoUtil::readMessage(pipeReader, _buffer, sizeof(_buffer));
-                    if (bytes < 0)
-                    {
-                        _start = _end = nullptr;
-                        break;
-                    }
-
-                    _start = _buffer;
-                    _end = _buffer + bytes;
-                }
-
-                if (_start != _end)
-                {
-                    char byteChar = *_start++;
-                    while (_start != _end && byteChar != '\r' && byteChar != '\n')
-                    {
-                        response += byteChar;
-                        byteChar = *_start++;
-                    }
-
-                    if (byteChar == '\r' && *_start == '\n')
-                    {
-                        ++_start;
-                        break;
-                    }
-                }
-            }
-        }
-        catch (const std::exception& exc)
-        {
-            Log::error() << "Exception while reading from pipe ["
-                         << pipeReader << "]: " << exc.what() << Log::end;
-            return -1;
-        }
-
-        Log::debug("Recv child response: [" + response + "].");
-        return bytes;
-    }
-
     bool createSession(const Process::PID pid, const std::string& session, const std::string& url)
     {
-        const std::string message = "session " + session + " " + url + "\r\n";
+        const std::string message = "session " + session + " " + url + "\n";
         if (IoUtil::writeFIFO(getChildPipe(pid), message) < 0)
         {
             Log::error("Error sending session message to child [" + std::to_string(pid) + "].");
@@ -250,7 +199,7 @@ public:
         }
 
         std::string response;
-        if (getResponseLine(readerChild, response) < 0)
+        if (_childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0)
         {
             Log::error("Error reading response to session message from child [" + std::to_string(pid) + "].");
             return false;
@@ -268,10 +217,10 @@ public:
         size_t empty_count = 0;
         for (auto it = _childProcesses.begin(); it != _childProcesses.end(); )
         {
-            const auto message = "query url \r\n";
+            const auto message = "query url\n";
             std::string response;
             if (IoUtil::writeFIFO(it->second->getWritePipe(), message) < 0 ||
-                getResponseLine(readerChild, response) < 0)
+                _childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0)
             {
                 auto log = Log::error();
                 log << "Error querying child [" << std::to_string(it->second->getPid()) << "].";
@@ -288,7 +237,7 @@ public:
             }
 
             StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-            if (tokens.count() == 2 && tokens[0] == std::to_string(it->second->getPid()))
+            if (tokens.count() >= 2 && tokens[0] == std::to_string(it->second->getPid()))
             {
                 Log::debug("Child [" + std::to_string(it->second->getPid()) + "] hosts [" + tokens[1] + "].");
                 if (tokens[1] == "empty")
@@ -304,7 +253,7 @@ public:
             else
             {
                 Log::error("Unexpected response from child [" + std::to_string(it->second->getPid()) +
-                           "] to query: [" + tokens[1] + "].");
+                           "] to url query: [" + response + "].");
             }
 
             ++it;
@@ -317,6 +266,8 @@ public:
 
     void handleInput(const std::string& message)
     {
+        Log::info("Broker command: [" + message + "].");
+
         StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
 
         std::lock_guard<std::mutex> lock(forkMutex);
@@ -388,9 +339,7 @@ public:
     }
 
 private:
-    char* _start;
-    char* _end;
-    char  _buffer[READ_BUFFER_SIZE];
+    IoUtil::PipeReader _childPipeReader;
 };
 
 /// Initializes LibreOfficeKit for cross-fork re-use.
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 299af4d..2bce3e8 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -997,15 +997,14 @@ void lokit_main(const std::string& childRoot,
             if (start != end)
             {
                 char byteChar = *start++;
-                while (start != end && byteChar != '\r' && byteChar != '\n')
+                while (start != end && byteChar != '\n')
                 {
                     message += byteChar;
                     byteChar = *start++;
                 }
 
-                if (byteChar == '\r' && *start == '\n')
+                if (byteChar == '\n')
                 {
-                    start++;
                     Log::trace("Recv: " + message);
                     StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
                     auto response = std::to_string(Process::id()) + " ";
@@ -1013,7 +1012,7 @@ void lokit_main(const std::string& childRoot,
                     if (TerminationFlag)
                     {
                         // Too late, we're going down.
-                        response += "down \r\n";
+                        response += "down\n";
                     }
                     else if (tokens[0] == "session")
                     {
@@ -1034,29 +1033,29 @@ void lokit_main(const std::string& childRoot,
                         if (url == document->getUrl() &&
                             document->createSession(sessionId, intSessionId))
                         {
-                            response += "ok \r\n";
+                            response += "ok\n";
                         }
                         else
                         {
-                            response += "bad \r\n";
+                            response += "bad\n";
                         }
                     }
                     else if (document && document->canDiscard())
                     {
                         TerminationFlag = true;
-                        response += "down \r\n";
+                        response += "down\n";
                     }
                     else if (tokens[0] == "query" && tokens.count() > 1)
                     {
                         if (tokens[1] == "url")
                         {
                             response += (document ? document->getUrl() : "empty");
-                            response += " \r\n";
+                            response += "\n";
                         }
                     }
                     else
                     {
-                        response += "bad unknown token [" + tokens[0] + "] \r\n";
+                        response += "bad unknown token [" + tokens[0] + "]\n";
                     }
 
                     IoUtil::writeFIFO(writerBroker, response);
@@ -1064,7 +1063,6 @@ void lokit_main(const std::string& childRoot,
                     // Don't log the CR LF at end
                     assert(response.length() > 2);
                     assert(response[response.length()-1] == '\n');
-                    assert(response[response.length()-2] == '\r');
                     Log::trace("KitToBroker: " + response.substr(0, response.length()-2));
                     message.clear();
                 }
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 650b1e1..db673cc 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -272,6 +272,7 @@ private:
                     session->handleInput(saveas.data(), saveas.size());
 
                     // Send it back to the client.
+                    //TODO: Should have timeout to avoid waiting forever.
                     Poco::URI resultURL(session->getSaveAs());
                     if (!resultURL.getPath().empty())
                     {
@@ -459,8 +460,8 @@ private:
         sessionsLock.unlock();
 
         // Request a kit process for this doc.
-        const std::string aMessage = "request " + id + " " + docKey + "\r\n";
-        Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 2));
+        const std::string aMessage = "request " + id + " " + docKey + "\n";
+        Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1));
         IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage);
 
         // For ToClient sessions, we store incoming messages in a queue and have a separate
@@ -495,8 +496,9 @@ private:
 
         if (docBroker->getSessionsCount() == 1 && !normalShutdown)
         {
-            //TODO: This really should move to the kit, where it
-            // knows if a doc is unsaved, and if other views are open.
+            //TODO: This isn't this simple. We need to wait for the notification
+            // of save so Storage can persist the save (if necessary).
+            // In addition, we shouldn't issue save when opening of the doc fails.
             Log::info("Non-deliberate shutdown of the last session, saving the document before tearing down.");
             queue.put("uno .uno:Save");
         }
@@ -1360,7 +1362,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
     threadPool.joinAll();
 
     // Terminate child processes
-    IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\r\n");
+    IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\n");
     Log::info("Requesting child process " + std::to_string(brokerPid) + " to terminate");
     Util::requestTermination(brokerPid);
 
diff --git a/loolwsd/MasterProcessSession.cpp b/loolwsd/MasterProcessSession.cpp
index f274a0e..90a82af 100644
--- a/loolwsd/MasterProcessSession.cpp
+++ b/loolwsd/MasterProcessSession.cpp
@@ -441,6 +441,7 @@ bool MasterProcessSession::loadDocument(const char* /*buffer*/, int /*length*/,
 
         // Finally, wait for the Child to connect to Master,
         // link the document in jail and dispatch load to child.
+        Log::trace("Dispatching child to handle [load].");
         dispatchChild();
 
         return true;
@@ -463,7 +464,10 @@ bool MasterProcessSession::getStatus(const char *buffer, int length)
     }
 
     if (_peer.expired())
+    {
+        Log::trace("Dispatching child to handle [getStatus].");
         dispatchChild();
+    }
     forwardToPeer(buffer, length);
     return true;
 }
@@ -757,8 +761,8 @@ void MasterProcessSession::dispatchChild()
         {
             Log::info() << "Retrying child permission... " << retries << Log::end;
             // request again new URL session
-            const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + "\r\n";
-            Log::trace("MasterToBroker: " + message.substr(0, message.length()-2));
+            const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + '\n';
+            Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
             IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, message);
         }
     }


More information about the Libreoffice-commits mailing list