[Libreoffice-commits] online.git: loolwsd/Admin.cpp loolwsd/IoUtil.cpp loolwsd/IoUtil.hpp loolwsd/LOOLBroker.cpp loolwsd/LOOLKit.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Wed Mar 30 01:59:49 UTC 2016
loolwsd/Admin.cpp | 12 +---
loolwsd/IoUtil.cpp | 135 +++++++++++++++++++++++++++++++++----------------
loolwsd/IoUtil.hpp | 29 +++++++++-
loolwsd/LOOLBroker.cpp | 11 +--
loolwsd/LOOLKit.cpp | 4 -
5 files changed, 128 insertions(+), 63 deletions(-)
New commits:
commit ecce874315984f01f6baa6ebcdb2a347d291c3c4
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Mon Mar 28 14:22:18 2016 -0400
loolwsd: new PipeReader class to poll and tokenize pipe messages
Change-Id: I5676b313ca4c7e711ead04c1491fe36591a00531
Reviewed-on: https://gerrit.libreoffice.org/23644
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/Admin.cpp b/loolwsd/Admin.cpp
index be9c93a..1bbfc22 100644
--- a/loolwsd/Admin.cpp
+++ b/loolwsd/Admin.cpp
@@ -481,12 +481,6 @@ void Admin::run()
_cpuStatsTask = new CpuStats(this);
_cpuStatsTimer.schedule(_cpuStatsTask, _cpuStatsTaskInterval, _cpuStatsTaskInterval);
- // Start listening for data changes
- struct pollfd pollPipeNotify;
- pollPipeNotify.fd = NotifyPipe;
- pollPipeNotify.events = POLLIN;
- pollPipeNotify.revents = 0;
-
static const std::string thread_name = "admin_thread";
if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
@@ -494,8 +488,10 @@ void Admin::run()
Log::info("Thread [" + thread_name + "] started.");
- IoUtil::pollPipeForReading(pollPipeNotify, FIFO_NOTIFY, NotifyPipe,
- [this](std::string& message) { return handleInput(message); } );
+ // Start listening for data changes.
+ IoUtil::PipeReader pipeReader(FIFO_NOTIFY, NotifyPipe);
+ pipeReader.process([this](std::string& message) { handleInput(message); return true; },
+ []() { return TerminationFlag; });
_memStatsTimer.cancel();
_cpuStatsTimer.cancel();
diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp
index 70e386c..b2bd61e 100644
--- a/loolwsd/IoUtil.cpp
+++ b/loolwsd/IoUtil.cpp
@@ -268,62 +268,113 @@ ssize_t readMessage(const int pipe, char* buffer, const ssize_t size, const size
return -1;
}
-void pollPipeForReading(pollfd& pollPipe, const std::string& targetPipeName , const int& targetPipe,
- std::function<void(std::string& message)> handler)
+/// Reads a single line from a pipe.
+/// Returns 0 for timeout, <0 for error, and >0 on success.
+/// On success, line will contain the read message.
+int PipeReader::readLine(std::string& line,
+ std::function<bool()> stopPredicate,
+ const size_t timeoutMs)
{
- std::string message;
- char buffer[READ_BUFFER_SIZE];
- char* start = buffer;
- char* end = buffer;
- ssize_t bytes = -1;
+ const char *endOfLine = static_cast<const char *>(std::memchr(_data.data(), '\n', _data.size()));
+ if (endOfLine != nullptr)
+ {
+ // We have a line cached, return it.
+ line += std::string(_data.data(), endOfLine);
+ _data.erase(0, endOfLine - _data.data() + 1); // Including the '\n'.
+ return 1;
+ }
- while (!TerminationFlag)
+ // Poll in short intervals to check for stop condition.
+ const auto pollTimeoutMs = 500;
+ auto maxPollCount = timeoutMs / pollTimeoutMs;
+ while (maxPollCount-- > 0)
{
- if (start == end)
+ if (stopPredicate())
+ {
+ Log::info() << "Spot requested for pipe: " << _name << Log::end;
+ return -1;
+ }
+
+ struct pollfd pipe;
+ pipe.fd = _pipe;
+ pipe.events = POLLIN;
+ pipe.revents = 0;
+ const int ready = poll(&pipe, 1, pollTimeoutMs);
+ if (ready == 0)
+ {
+ // Timeout.
+ continue;
+ }
+ else if (ready < 0)
+ {
+ // error.
+ return ready;
+ }
+ else if (pipe.revents & (POLLIN | POLLPRI))
{
- if (poll(&pollPipe, 1, POLL_TIMEOUT_MS) < 0)
+ char buffer[READ_BUFFER_SIZE];
+ const auto bytes = readFIFO(_pipe, buffer, sizeof(buffer));
+ if (bytes < 0)
{
- Log::error("Failed to poll pipe [" + targetPipeName + "].");
- continue;
+ return -1;
}
- else if (pollPipe.revents & (POLLIN | POLLPRI))
+
+ const char *endOfLine = static_cast<const char *>(std::memchr(buffer, '\n', bytes));
+ if (endOfLine != nullptr)
{
- bytes = readFIFO(targetPipe, buffer, sizeof(buffer));
- if (bytes < 0)
- {
- start = end = nullptr;
- Log::error("Error reading message from pipe [" + targetPipeName + "].");
- continue;
- }
- start = buffer;
- end = buffer + bytes;
+ // Got end of line.
+ line = _data;
+ auto tail = std::string(static_cast<const char*>(buffer), endOfLine);
+ line += tail;
+ _data = std::string(endOfLine, bytes - tail.size() - 1); // Exclude the '\n'.
+ return 1;
}
- else if (pollPipe.revents & (POLLERR | POLLHUP))
+ else
{
- Log::error("Broken pipe [" + targetPipeName + "] with wsd.");
- break;
+ // More data, keep going.
+ _data += std::string(buffer, bytes);
}
}
-
- if (start != end)
+ else if (pipe.revents & (POLLERR | POLLHUP | POLLNVAL))
{
- char byteChar = *start++;
- while (start != end && byteChar != '\r' && byteChar != '\n')
- {
- message += byteChar;
- byteChar = *start++;
- }
+ return -1;
+ }
+ }
- if (byteChar == '\r' && *start == '\n')
- {
- start++;
- Log::debug(targetPipeName + " recv: " + message);
- if (message == "eof")
- break;
+ // Timeout.
+ return 0;
+}
- handler(message);
- message.clear();
- }
+void PipeReader::process(std::function<bool(std::string& message)> handler,
+ std::function<bool()> stopPredicate,
+ const size_t pollTimeoutMs)
+{
+ bool stop = false;
+ for (;;)
+ {
+ stop = stopPredicate();
+ if (stop)
+ {
+ Log::info("Termination flagged for pipe [" + _name + "].");
+ break;
+ }
+
+ std::string line;
+ const auto ready = readLine(line, stopPredicate, pollTimeoutMs);
+ if (ready == 0)
+ {
+ // Timeout.
+ continue;
+ }
+ else if (ready < 0)
+ {
+ Log::error("Error reading from pipe [" + _name + "].");
+ continue;
+ }
+ else if (!handler(line))
+ {
+ Log::info("Pipe [" + _name + "] handler requested to finish.");
+ break;
}
}
}
diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp
index 8a0f53d..c7a4e00 100644
--- a/loolwsd/IoUtil.hpp
+++ b/loolwsd/IoUtil.hpp
@@ -45,9 +45,32 @@ namespace IoUtil
ssize_t readMessage(const int pipe, char* buffer, const ssize_t size,
const size_t timeoutSec = CHILD_TIMEOUT_SECS);
- void pollPipeForReading(pollfd& pollPipe, const std::string& targetPipeName , const int& targetPipe,
- std::function<void(std::string& message)> handler);
-};
+ class PipeReader
+ {
+ public:
+ PipeReader(const std::string& name, const int pipe) :
+ _name(name),
+ _pipe(pipe)
+ {
+ }
+
+ /// Reads a single line from the pipe.
+ /// Returns 0 for timeout, <0 for error, and >0 on success.
+ /// On success, line will contain the read message.
+ int readLine(std::string& line,
+ std::function<bool()> stopPredicate,
+ const size_t timeoutMs);
+
+ void process(std::function<bool(std::string& message)> handler,
+ std::function<bool()> stopPredicate,
+ const size_t pollTimeoutMs = POLL_TIMEOUT_MS);
+
+ private:
+ const std::string _name;
+ const int _pipe;
+ std::string _data;
+ };
+}
#endif
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index eab8777..a551365 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -373,12 +373,6 @@ public:
void run() override
{
- struct pollfd pollPipeBroker;
-
- pollPipeBroker.fd = readerBroker;
- pollPipeBroker.events = POLLIN;
- pollPipeBroker.revents = 0;
-
static const std::string thread_name = "brk_pipe_reader";
if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
@@ -386,8 +380,9 @@ public:
Log::debug("Thread [" + thread_name + "] started.");
- IoUtil::pollPipeForReading(pollPipeBroker, FIFO_LOOLWSD, readerBroker,
- [this](std::string& message) { return handleInput(message); } );
+ IoUtil::PipeReader pipeReader(FIFO_LOOLWSD, readerBroker);
+ pipeReader.process([this](std::string& message) { handleInput(message); return true; },
+ []() { return TerminationFlag; });
Log::debug("Thread [" + thread_name + "] finished.");
}
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 8c9ef9f..299af4d 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -943,6 +943,8 @@ void lokit_main(const std::string& childRoot,
Log::info("loolkit [" + std::to_string(Process::id()) + "] is ready.");
+ char buffer[READ_BUFFER_SIZE];
+ std::string message;
char* start = nullptr;
char* end = nullptr;
@@ -974,7 +976,6 @@ void lokit_main(const std::string& childRoot,
else
if (pollPipeBroker.revents & (POLLIN | POLLPRI))
{
- char buffer[READ_BUFFER_SIZE];
const auto bytes = IoUtil::readFIFO(readerBroker, buffer, sizeof(buffer));
if (bytes < 0)
{
@@ -995,7 +996,6 @@ void lokit_main(const std::string& childRoot,
if (start != end)
{
- std::string message;
char byteChar = *start++;
while (start != end && byteChar != '\r' && byteChar != '\n')
{
More information about the Libreoffice-commits
mailing list