[Libreoffice-commits] online.git: loolwsd/Admin.cpp loolwsd/IoUtil.cpp loolwsd/IoUtil.hpp loolwsd/LOOLBroker.cpp loolwsd/LOOLKit.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Mar 30 01:59:49 UTC 2016


 loolwsd/Admin.cpp      |   12 +---
 loolwsd/IoUtil.cpp     |  135 +++++++++++++++++++++++++++++++++----------------
 loolwsd/IoUtil.hpp     |   29 +++++++++-
 loolwsd/LOOLBroker.cpp |   11 +--
 loolwsd/LOOLKit.cpp    |    4 -
 5 files changed, 128 insertions(+), 63 deletions(-)

New commits:
commit ecce874315984f01f6baa6ebcdb2a347d291c3c4
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Mon Mar 28 14:22:18 2016 -0400

    loolwsd: new PipeReader class to poll and tokenize pipe messages
    
    Change-Id: I5676b313ca4c7e711ead04c1491fe36591a00531
    Reviewed-on: https://gerrit.libreoffice.org/23644
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/Admin.cpp b/loolwsd/Admin.cpp
index be9c93a..1bbfc22 100644
--- a/loolwsd/Admin.cpp
+++ b/loolwsd/Admin.cpp
@@ -481,12 +481,6 @@ void Admin::run()
     _cpuStatsTask = new CpuStats(this);
     _cpuStatsTimer.schedule(_cpuStatsTask, _cpuStatsTaskInterval, _cpuStatsTaskInterval);
 
-    // Start listening for data changes
-    struct pollfd pollPipeNotify;
-    pollPipeNotify.fd = NotifyPipe;
-    pollPipeNotify.events = POLLIN;
-    pollPipeNotify.revents = 0;
-
     static const std::string thread_name = "admin_thread";
 
     if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
@@ -494,8 +488,10 @@ void Admin::run()
 
     Log::info("Thread [" + thread_name + "] started.");
 
-    IoUtil::pollPipeForReading(pollPipeNotify, FIFO_NOTIFY, NotifyPipe,
-                            [this](std::string& message) { return handleInput(message); } );
+    // Start listening for data changes.
+    IoUtil::PipeReader pipeReader(FIFO_NOTIFY, NotifyPipe);
+    pipeReader.process([this](std::string& message) { handleInput(message); return true; },
+                       []() { return TerminationFlag; });
 
     _memStatsTimer.cancel();
     _cpuStatsTimer.cancel();
diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp
index 70e386c..b2bd61e 100644
--- a/loolwsd/IoUtil.cpp
+++ b/loolwsd/IoUtil.cpp
@@ -268,62 +268,113 @@ ssize_t readMessage(const int pipe, char* buffer, const ssize_t size, const size
     return -1;
 }
 
-void pollPipeForReading(pollfd& pollPipe, const std::string& targetPipeName , const int& targetPipe,
-                        std::function<void(std::string& message)> handler)
+/// Reads one '\n'-terminated line from the pipe, polling in short slices so stopPredicate is checked often.
+/// Returns 0 for timeout, <0 for error (or when stopPredicate asks to stop), and >0 on success.
+/// On success, line is set to the message without its '\n'; any bytes after it stay cached in _data.
+int PipeReader::readLine(std::string& line,
+                         std::function<bool()> stopPredicate,
+                         const size_t timeoutMs)
 {
-    std::string message;
-    char buffer[READ_BUFFER_SIZE];
-    char* start = buffer;
-    char* end = buffer;
-    ssize_t bytes = -1;
+    const char *endOfLine = static_cast<const char *>(std::memchr(_data.data(), '\n', _data.size()));
+    if (endOfLine != nullptr)
+    {
+        // A complete line is already cached from an earlier read; hand it out without touching the pipe.
+        line = std::string(_data.data(), endOfLine); // Assign (not append), matching the freshly-read path below.
+        _data.erase(0, endOfLine - _data.data() + 1); // Including the '\n'.
+        return 1;
+    }
 
-    while (!TerminationFlag)
+    // Poll in short intervals to check for stop condition.
+    const auto pollTimeoutMs = 500;
+    auto maxPollCount = timeoutMs / pollTimeoutMs;
+    while (maxPollCount-- > 0)
     {
-        if (start == end)
+        if (stopPredicate())
+        {
+            Log::info() << "Stop requested for pipe: " << _name << Log::end;
+            return -1;
+        }
+
+        struct pollfd pipe;
+        pipe.fd = _pipe;
+        pipe.events = POLLIN;
+        pipe.revents = 0;
+        const int ready = poll(&pipe, 1, pollTimeoutMs);
+        if (ready == 0)
+        {
+            // This poll slice timed out; loop to re-check the stop condition.
+            continue;
+        }
+        else if (ready < 0)
+        {
+            // poll() failed; propagate its negative result as the error code.
+            return ready;
+        }
+        else if (pipe.revents & (POLLIN | POLLPRI))
         {
-            if (poll(&pollPipe, 1, POLL_TIMEOUT_MS) < 0)
+            char buffer[READ_BUFFER_SIZE];
+            const auto bytes = readFIFO(_pipe, buffer, sizeof(buffer));
+            if (bytes < 0)
             {
-                Log::error("Failed to poll pipe [" + targetPipeName + "].");
-                continue;
+                return -1;
             }
-            else if (pollPipe.revents & (POLLIN | POLLPRI))
+
+            const char *endOfLine = static_cast<const char *>(std::memchr(buffer, '\n', bytes));
+            if (endOfLine != nullptr)
             {
-                bytes = readFIFO(targetPipe, buffer, sizeof(buffer));
-                if (bytes < 0)
-                {
-                    start = end = nullptr;
-                    Log::error("Error reading message from pipe [" + targetPipeName + "].");
-                    continue;
-                }
-                start = buffer;
-                end = buffer + bytes;
+                // Got end of line: the message is the cached prefix plus this chunk up to the '\n'.
+                line = _data;
+                auto tail = std::string(static_cast<const char*>(buffer), endOfLine);
+                line += tail;
+                _data = std::string(endOfLine + 1, bytes - tail.size() - 1); // Cache only what follows the '\n'.
+                return 1;
             }
-            else if (pollPipe.revents & (POLLERR | POLLHUP))
+            else
             {
-                Log::error("Broken pipe [" + targetPipeName + "] with wsd.");
-                break;
+                // No '\n' yet: accumulate the partial line and keep polling.
+                _data += std::string(buffer, bytes);
             }
         }
-
-        if (start != end)
+        else if (pipe.revents & (POLLERR | POLLHUP | POLLNVAL))
         {
-            char byteChar = *start++;
-            while (start != end && byteChar != '\r' && byteChar != '\n')
-            {
-                message += byteChar;
-                byteChar = *start++;
-            }
+            return -1;
+        }
+    }
 
-            if (byteChar == '\r' && *start == '\n')
-            {
-                start++;
-                Log::debug(targetPipeName + " recv: " + message);
-                if (message == "eof")
-                    break;
+    // Overall timeout: no complete line arrived within timeoutMs.
+    return 0;
+}
 
-                handler(message);
-                message.clear();
-            }
+void PipeReader::process(std::function<bool(std::string& message)> handler, // Called with each line read; returning false ends the loop.
+                         std::function<bool()> stopPredicate, // Checked before every read; returning true ends the loop.
+                         const size_t pollTimeoutMs) // Passed to readLine() as its timeout.
+{
+    bool stop = false;
+    for (;;)
+    {
+        stop = stopPredicate();
+        if (stop)
+        {
+            Log::info("Termination flagged for pipe [" + _name + "].");
+            break;
+        }
+
+        std::string line;
+        const auto ready = readLine(line, stopPredicate, pollTimeoutMs);
+        if (ready == 0)
+        {
+            // Timeout: no complete line yet; loop to re-check stopPredicate.
+            continue;
+        }
+        else if (ready < 0)
+        {
+            Log::error("Error reading from pipe [" + _name + "].");
+            continue; // NOTE(review): errors are not fatal here, so a persistent error is retried until stopPredicate fires - confirm intended.
+        }
+        else if (!handler(line))
+        {
+            Log::info("Pipe [" + _name + "] handler requested to finish.");
+            break;
         }
     }
 }
diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp
index 8a0f53d..c7a4e00 100644
--- a/loolwsd/IoUtil.hpp
+++ b/loolwsd/IoUtil.hpp
@@ -45,9 +45,32 @@ namespace IoUtil
     ssize_t readMessage(const int pipe, char* buffer, const ssize_t size,
                         const size_t timeoutSec = CHILD_TIMEOUT_SECS);
 
-    void pollPipeForReading(pollfd& pollPipe, const std::string& targetPipeName , const int& targetPipe,
-                            std::function<void(std::string& message)> handler);
-};
+    class PipeReader // Reads '\n'-terminated messages from a pipe, keeping unconsumed bytes between calls.
+    {
+    public:
+        PipeReader(const std::string& name, const int pipe) : // Stores the name and descriptor as given.
+            _name(name),
+            _pipe(pipe)
+        {
+        }
+
+        /// Reads a single line from the pipe.
+        /// Returns 0 for timeout, <0 for error, and >0 on success.
+        /// On success, line will contain the read message.
+        int readLine(std::string& line,
+                     std::function<bool()> stopPredicate,
+                     const size_t timeoutMs);
+        /// Reads lines in a loop, passing each to handler, until stopPredicate returns true or handler returns false.
+        void process(std::function<bool(std::string& message)> handler,
+                     std::function<bool()> stopPredicate,
+                     const size_t pollTimeoutMs = POLL_TIMEOUT_MS);
+
+    private:
+        const std::string _name; // Pipe name, used in log messages.
+        const int _pipe; // Descriptor that readLine() polls and reads from.
+        std::string _data; // Bytes read from the pipe but not yet returned as a line.
+    };
+}
 
 #endif
 
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index eab8777..a551365 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -373,12 +373,6 @@ public:
 
     void run() override
     {
-        struct pollfd pollPipeBroker;
-
-        pollPipeBroker.fd = readerBroker;
-        pollPipeBroker.events = POLLIN;
-        pollPipeBroker.revents = 0;
-
         static const std::string thread_name = "brk_pipe_reader";
 
         if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
@@ -386,8 +380,9 @@ public:
 
         Log::debug("Thread [" + thread_name + "] started.");
 
-        IoUtil::pollPipeForReading(pollPipeBroker, FIFO_LOOLWSD, readerBroker,
-                                 [this](std::string& message) { return handleInput(message); } );
+        IoUtil::PipeReader pipeReader(FIFO_LOOLWSD, readerBroker);
+        pipeReader.process([this](std::string& message) { handleInput(message); return true; },
+                           []() { return TerminationFlag; });
 
         Log::debug("Thread [" + thread_name + "] finished.");
     }
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 8c9ef9f..299af4d 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -943,6 +943,8 @@ void lokit_main(const std::string& childRoot,
 
         Log::info("loolkit [" + std::to_string(Process::id()) + "] is ready.");
 
+        char buffer[READ_BUFFER_SIZE];
+        std::string message;
         char* start = nullptr;
         char* end = nullptr;
 
@@ -974,7 +976,6 @@ void lokit_main(const std::string& childRoot,
                 else
                 if (pollPipeBroker.revents & (POLLIN | POLLPRI))
                 {
-                    char buffer[READ_BUFFER_SIZE];
                     const auto bytes = IoUtil::readFIFO(readerBroker, buffer, sizeof(buffer));
                     if (bytes < 0)
                     {
@@ -995,7 +996,6 @@ void lokit_main(const std::string& childRoot,
 
             if (start != end)
             {
-                std::string message;
                 char byteChar = *start++;
                 while (start != end && byteChar != '\r' && byteChar != '\n')
                 {


More information about the Libreoffice-commits mailing list