[Libreoffice-commits] online.git: loolwsd/IoUtil.cpp loolwsd/IoUtil.hpp loolwsd/LOOLBroker.cpp loolwsd/LOOLKit.cpp loolwsd/LOOLWSD.cpp loolwsd/MasterProcessSession.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Wed Mar 30 02:03:11 UTC 2016
loolwsd/IoUtil.cpp | 19 ++++++----
loolwsd/IoUtil.hpp | 2 -
loolwsd/LOOLBroker.cpp | 73 +++++----------------------------------
loolwsd/LOOLKit.cpp | 18 ++++-----
loolwsd/LOOLWSD.cpp | 12 +++---
loolwsd/MasterProcessSession.cpp | 8 +++-
6 files changed, 46 insertions(+), 86 deletions(-)
New commits:
commit fe69c4d5b647d0191748956ffc2c5255b914b858
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Mon Mar 28 16:07:02 2016 -0400
loolwsd: pipe plumbing cleanup
Change-Id: I5519235a4601e1e38cedc3f06ffe9386434a292d
Reviewed-on: https://gerrit.libreoffice.org/23645
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp
index b2bd61e..c7cc54c 100644
--- a/loolwsd/IoUtil.cpp
+++ b/loolwsd/IoUtil.cpp
@@ -207,14 +207,14 @@ void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws)
ssize_t writeFIFO(int pipe, const char* buffer, ssize_t size)
{
- ssize_t bytes = -1;
ssize_t count = 0;
-
while(true)
{
- bytes = write(pipe, buffer + count, size - count);
+ Log::trace("Writing to pipe. Data: [" + std::string(buffer, size) + "].");
+ const auto bytes = write(pipe, buffer + count, size - count);
if (bytes < 0)
{
+ Log::error("Failed to write to pipe. Retrying. Data: [" + std::string(buffer, size) + "].");
if (errno == EINTR || errno == EAGAIN)
continue;
@@ -281,6 +281,8 @@ int PipeReader::readLine(std::string& line,
// We have a line cached, return it.
line += std::string(_data.data(), endOfLine);
_data.erase(0, endOfLine - _data.data() + 1); // Including the '\n'.
+ Log::trace() << "Read existing line from pipe: " << _name << ", line: ["
+ << line << "], data: [" << _data << "]." << Log::end;
return 1;
}
@@ -291,7 +293,7 @@ int PipeReader::readLine(std::string& line,
{
if (stopPredicate())
{
- Log::info() << "Spot requested for pipe: " << _name << Log::end;
+ Log::info() << "Stop requested for pipe: " << _name << '.' << Log::end;
return -1;
}
@@ -300,6 +302,7 @@ int PipeReader::readLine(std::string& line,
pipe.events = POLLIN;
pipe.revents = 0;
const int ready = poll(&pipe, 1, pollTimeoutMs);
+ Log::trace() << "Poll for pipe: " << _name << " returned: " << ready << Log::end;
if (ready == 0)
{
// Timeout.
@@ -314,6 +317,7 @@ int PipeReader::readLine(std::string& line,
{
char buffer[READ_BUFFER_SIZE];
const auto bytes = readFIFO(_pipe, buffer, sizeof(buffer));
+ Log::trace() << "readFIFO for pipe: " << _name << " returned: " << bytes << Log::end;
if (bytes < 0)
{
return -1;
@@ -324,15 +328,18 @@ int PipeReader::readLine(std::string& line,
{
// Got end of line.
line = _data;
- auto tail = std::string(static_cast<const char*>(buffer), endOfLine);
+ const auto tail = std::string(static_cast<const char*>(buffer), endOfLine);
line += tail;
- _data = std::string(endOfLine, bytes - tail.size() - 1); // Exclude the '\n'.
+ _data = std::string(endOfLine + 1, bytes - tail.size() - 1); // Exclude the '\n'.
+ Log::trace() << "Read line from pipe: " << _name << ", line: [" << line
+ << "], data: [" << _data << "]." << Log::end;
return 1;
}
else
{
// More data, keep going.
_data += std::string(buffer, bytes);
+ Log::trace() << "data appended to pipe: " << _name << ", data: " << _data << Log::end;
}
}
else if (pipe.revents & (POLLERR | POLLHUP | POLLNVAL))
diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp
index c7a4e00..016a808 100644
--- a/loolwsd/IoUtil.hpp
+++ b/loolwsd/IoUtil.hpp
@@ -59,7 +59,7 @@ namespace IoUtil
/// On success, line will contain the read message.
int readLine(std::string& line,
std::function<bool()> stopPredicate,
- const size_t timeoutMs);
+ const size_t timeoutMs = POLL_TIMEOUT_MS);
void process(std::function<bool(std::string& message)> handler,
std::function<bool()> stopPredicate,
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index a551365..fdb43a3 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -184,65 +184,14 @@ namespace
class PipeRunnable: public Runnable
{
public:
- PipeRunnable()
- : _start(nullptr),
- _end(nullptr)
+ PipeRunnable() :
+ _childPipeReader("child_pipe_rd", readerChild)
{
}
- ssize_t getResponseLine(const int pipeReader, std::string& response)
- {
- ssize_t bytes = -1;
- response.clear();
-
- try
- {
- for (;;)
- {
- if (_start == _end)
- {
- bytes = IoUtil::readMessage(pipeReader, _buffer, sizeof(_buffer));
- if (bytes < 0)
- {
- _start = _end = nullptr;
- break;
- }
-
- _start = _buffer;
- _end = _buffer + bytes;
- }
-
- if (_start != _end)
- {
- char byteChar = *_start++;
- while (_start != _end && byteChar != '\r' && byteChar != '\n')
- {
- response += byteChar;
- byteChar = *_start++;
- }
-
- if (byteChar == '\r' && *_start == '\n')
- {
- ++_start;
- break;
- }
- }
- }
- }
- catch (const std::exception& exc)
- {
- Log::error() << "Exception while reading from pipe ["
- << pipeReader << "]: " << exc.what() << Log::end;
- return -1;
- }
-
- Log::debug("Recv child response: [" + response + "].");
- return bytes;
- }
-
bool createSession(const Process::PID pid, const std::string& session, const std::string& url)
{
- const std::string message = "session " + session + " " + url + "\r\n";
+ const std::string message = "session " + session + " " + url + "\n";
if (IoUtil::writeFIFO(getChildPipe(pid), message) < 0)
{
Log::error("Error sending session message to child [" + std::to_string(pid) + "].");
@@ -250,7 +199,7 @@ public:
}
std::string response;
- if (getResponseLine(readerChild, response) < 0)
+ if (_childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0)
{
Log::error("Error reading response to session message from child [" + std::to_string(pid) + "].");
return false;
@@ -268,10 +217,10 @@ public:
size_t empty_count = 0;
for (auto it = _childProcesses.begin(); it != _childProcesses.end(); )
{
- const auto message = "query url \r\n";
+ const auto message = "query url\n";
std::string response;
if (IoUtil::writeFIFO(it->second->getWritePipe(), message) < 0 ||
- getResponseLine(readerChild, response) < 0)
+ _childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0)
{
auto log = Log::error();
log << "Error querying child [" << std::to_string(it->second->getPid()) << "].";
@@ -288,7 +237,7 @@ public:
}
StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
- if (tokens.count() == 2 && tokens[0] == std::to_string(it->second->getPid()))
+ if (tokens.count() >= 2 && tokens[0] == std::to_string(it->second->getPid()))
{
Log::debug("Child [" + std::to_string(it->second->getPid()) + "] hosts [" + tokens[1] + "].");
if (tokens[1] == "empty")
@@ -304,7 +253,7 @@ public:
else
{
Log::error("Unexpected response from child [" + std::to_string(it->second->getPid()) +
- "] to query: [" + tokens[1] + "].");
+ "] to url query: [" + response + "].");
}
++it;
@@ -317,6 +266,8 @@ public:
void handleInput(const std::string& message)
{
+ Log::info("Broker command: [" + message + "].");
+
StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
std::lock_guard<std::mutex> lock(forkMutex);
@@ -388,9 +339,7 @@ public:
}
private:
- char* _start;
- char* _end;
- char _buffer[READ_BUFFER_SIZE];
+ IoUtil::PipeReader _childPipeReader;
};
/// Initializes LibreOfficeKit for cross-fork re-use.
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 299af4d..2bce3e8 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -997,15 +997,14 @@ void lokit_main(const std::string& childRoot,
if (start != end)
{
char byteChar = *start++;
- while (start != end && byteChar != '\r' && byteChar != '\n')
+ while (start != end && byteChar != '\n')
{
message += byteChar;
byteChar = *start++;
}
- if (byteChar == '\r' && *start == '\n')
+ if (byteChar == '\n')
{
- start++;
Log::trace("Recv: " + message);
StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
auto response = std::to_string(Process::id()) + " ";
@@ -1013,7 +1012,7 @@ void lokit_main(const std::string& childRoot,
if (TerminationFlag)
{
// Too late, we're going down.
- response += "down \r\n";
+ response += "down\n";
}
else if (tokens[0] == "session")
{
@@ -1034,29 +1033,29 @@ void lokit_main(const std::string& childRoot,
if (url == document->getUrl() &&
document->createSession(sessionId, intSessionId))
{
- response += "ok \r\n";
+ response += "ok\n";
}
else
{
- response += "bad \r\n";
+ response += "bad\n";
}
}
else if (document && document->canDiscard())
{
TerminationFlag = true;
- response += "down \r\n";
+ response += "down\n";
}
else if (tokens[0] == "query" && tokens.count() > 1)
{
if (tokens[1] == "url")
{
response += (document ? document->getUrl() : "empty");
- response += " \r\n";
+ response += "\n";
}
}
else
{
- response += "bad unknown token [" + tokens[0] + "] \r\n";
+ response += "bad unknown token [" + tokens[0] + "]\n";
}
IoUtil::writeFIFO(writerBroker, response);
@@ -1064,7 +1063,6 @@ void lokit_main(const std::string& childRoot,
// Don't log the CR LF at end
assert(response.length() > 2);
assert(response[response.length()-1] == '\n');
- assert(response[response.length()-2] == '\r');
Log::trace("KitToBroker: " + response.substr(0, response.length()-2));
message.clear();
}
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 650b1e1..db673cc 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -272,6 +272,7 @@ private:
session->handleInput(saveas.data(), saveas.size());
// Send it back to the client.
+ //TODO: Should have timeout to avoid waiting forever.
Poco::URI resultURL(session->getSaveAs());
if (!resultURL.getPath().empty())
{
@@ -459,8 +460,8 @@ private:
sessionsLock.unlock();
// Request a kit process for this doc.
- const std::string aMessage = "request " + id + " " + docKey + "\r\n";
- Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 2));
+ const std::string aMessage = "request " + id + " " + docKey + "\n";
+ Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1));
IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage);
// For ToClient sessions, we store incoming messages in a queue and have a separate
@@ -495,8 +496,9 @@ private:
if (docBroker->getSessionsCount() == 1 && !normalShutdown)
{
- //TODO: This really should move to the kit, where it
- // knows if a doc is unsaved, and if other views are open.
+ //TODO: This isn't this simple. We need to wait for the notification
+ // of save so Storage can persist the save (if necessary).
+ // In addition, we shouldn't issue save when opening of the doc fails.
Log::info("Non-deliberate shutdown of the last session, saving the document before tearing down.");
queue.put("uno .uno:Save");
}
@@ -1360,7 +1362,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
threadPool.joinAll();
// Terminate child processes
- IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\r\n");
+ IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\n");
Log::info("Requesting child process " + std::to_string(brokerPid) + " to terminate");
Util::requestTermination(brokerPid);
diff --git a/loolwsd/MasterProcessSession.cpp b/loolwsd/MasterProcessSession.cpp
index f274a0e..90a82af 100644
--- a/loolwsd/MasterProcessSession.cpp
+++ b/loolwsd/MasterProcessSession.cpp
@@ -441,6 +441,7 @@ bool MasterProcessSession::loadDocument(const char* /*buffer*/, int /*length*/,
// Finally, wait for the Child to connect to Master,
// link the document in jail and dispatch load to child.
+ Log::trace("Dispatching child to handle [load].");
dispatchChild();
return true;
@@ -463,7 +464,10 @@ bool MasterProcessSession::getStatus(const char *buffer, int length)
}
if (_peer.expired())
+ {
+ Log::trace("Dispatching child to handle [getStatus].");
dispatchChild();
+ }
forwardToPeer(buffer, length);
return true;
}
@@ -757,8 +761,8 @@ void MasterProcessSession::dispatchChild()
{
Log::info() << "Retrying child permission... " << retries << Log::end;
// request again new URL session
- const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + "\r\n";
- Log::trace("MasterToBroker: " + message.substr(0, message.length()-2));
+ const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + '\n';
+ Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, message);
}
}
More information about the Libreoffice-commits
mailing list