[Libreoffice-commits] online.git: loolwsd/DocumentBroker.cpp loolwsd/DocumentBroker.hpp loolwsd/LOOLKit.cpp loolwsd/LOOLWSD.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Apr 4 04:08:19 UTC 2016
loolwsd/DocumentBroker.cpp | 9 +++
loolwsd/DocumentBroker.hpp | 81 ++++++++++++++++++++++++++++++++++-
loolwsd/LOOLKit.cpp | 68 +++++++++++++++++++++++++++++
loolwsd/LOOLWSD.cpp | 103 +++++++++++----------------------------------
4 files changed, 182 insertions(+), 79 deletions(-)
New commits:
commit d5e2f647901bf6b764afdae4c6b2922527718b7f
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Apr 3 21:40:14 2016 -0400
loolwsd: WSD <-> Child direct communication
WSD now communicates on a WebSocket directly
with kit processes. ChildProcess encapsulates
kit processes and the control WS, which itself
is owned by DocumentBroker.
Change-Id: Ica209aaa07974739b8e51a14e11325d084e193f6
Reviewed-on: https://gerrit.libreoffice.org/23789
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/DocumentBroker.cpp b/loolwsd/DocumentBroker.cpp
index c36067e..393ba9a 100644
--- a/loolwsd/DocumentBroker.cpp
+++ b/loolwsd/DocumentBroker.cpp
@@ -64,11 +64,13 @@ std::string DocumentBroker::getDocKey(const Poco::URI& uri)
DocumentBroker::DocumentBroker(const Poco::URI& uriPublic,
const std::string& docKey,
- const std::string& childRoot) :
+ const std::string& childRoot,
+ std::shared_ptr<ChildProcess> childProcess) :
_uriPublic(uriPublic),
_docKey(docKey),
_childRoot(childRoot),
_cacheRoot(getCachePath(uriPublic.toString())),
+ _childProcess(childProcess),
_sessionsCount(0)
{
assert(!_docKey.empty());
@@ -163,6 +165,11 @@ void DocumentBroker::addWSSession(const std::string id, std::shared_ptr<MasterPr
{
Log::warn("DocumentBroker: Trying to add already existed session.");
}
+
+ // Request a new session from the child kit.
+ const std::string aMessage = "session " + id + " " + _docKey + "\n";
+ Log::debug("DocBroker to Child: " + aMessage.substr(0, aMessage.length() - 1));
+ _childProcess->getWebSocket()->sendFrame(aMessage.data(), aMessage.size());
}
void DocumentBroker::removeWSSession(const std::string id)
diff --git a/loolwsd/DocumentBroker.hpp b/loolwsd/DocumentBroker.hpp
index 3d848de..458090f 100644
--- a/loolwsd/DocumentBroker.hpp
+++ b/loolwsd/DocumentBroker.hpp
@@ -10,6 +10,8 @@
#ifndef INCLUDED_DOCUMENTBROKER_HPP
#define INCLUDED_DOCUMENTBROKER_HPP
+#include <signal.h>
+
#include <atomic>
#include <memory>
#include <mutex>
@@ -25,6 +27,81 @@
class StorageBase;
class TileCache;
+/// Represents a new LOK child that is ready
+/// to host a document. Holds the child's pid and its control WebSocket; the destructor calls close(true).
+class ChildProcess
+{
+public:
+ ChildProcess() :
+ _pid(-1) // -1 marks "no child attached"; close() skips the kill for this value.
+ {
+ }
+
+ /// pid is the process ID of the child.
+ /// ws is the control WebSocket to the child.
+ ChildProcess(const Poco::Process::PID pid, const std::shared_ptr<Poco::Net::WebSocket>& ws) :
+ _pid(pid),
+ _ws(ws)
+ {
+ Log::info("ChildProcess ctor [" + std::to_string(_pid) + "].");
+ }
+
+ ChildProcess(ChildProcess&& other) : // Takes over other's pid and WebSocket.
+ _pid(other._pid),
+ _ws(other._ws)
+ {
+ Log::info("ChildProcess move ctor [" + std::to_string(_pid) + "].");
+ other._pid = -1; // Leave the source empty so its destructor does not signal the child.
+ other._ws.reset();
+ }
+
+ const ChildProcess& operator=(ChildProcess&& other) // NOTE(review): overwrites _pid/_ws without calling close() on the child currently held, and has no self-assignment guard - confirm this is intended.
+ {
+ Log::info("ChildProcess assign [" + std::to_string(_pid) + "].");
+ _pid = other._pid;
+ other._pid = -1;
+ _ws = other._ws;
+ other._ws.reset();
+
+ return *this;
+ }
+
+ ~ChildProcess()
+ {
+ Log::info("~ChildProcess dtor [" + std::to_string(_pid) + "].");
+ close(true);
+ }
+
+ void close(const bool rude) // Sends SIGINT to the child (if any), then drops the pid and the WebSocket.
+ {
+ Log::info("Closing child [" + std::to_string(_pid) + "].");
+ if (_pid != -1)
+ {
+ if (kill(_pid, SIGINT) != 0 && rude && kill(_pid, 0) != 0) // Logs only when SIGINT failed, rude is set, and the signal-0 probe fails too.
+ {
+ Log::error("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning.");
+ }
+
+ //TODO: Notify Admin.
+ std::ostringstream message; // Built but currently unused: the write below is commented out.
+ message << "rmdoc" << " "
+ << _pid << " "
+ << "\n";
+ //IoUtil::writeFIFO(WriterNotify, message.str());
+ _pid = -1;
+ }
+
+ _ws.reset();
+ }
+
+ Poco::Process::PID getPid() const { return _pid; }
+ std::shared_ptr<Poco::Net::WebSocket> getWebSocket() const { return _ws; } // Returns a copy of the shared_ptr; empty after close().
+
+private:
+ Poco::Process::PID _pid;
+ std::shared_ptr<Poco::Net::WebSocket> _ws;
+};
+
/// DocumentBroker is responsible for setting up a document
/// in jail and brokering loading it from Storage
/// and saving it back.
@@ -43,7 +120,8 @@ public:
DocumentBroker(const Poco::URI& uriPublic,
const std::string& docKey,
- const std::string& childRoot);
+ const std::string& childRoot,
+ std::shared_ptr<ChildProcess> childProcess);
~DocumentBroker()
{
@@ -94,6 +172,7 @@ private:
std::string _filename;
std::unique_ptr<StorageBase> _storage;
std::unique_ptr<TileCache> _tileCache;
+ std::shared_ptr<ChildProcess> _childProcess;
std::mutex _mutex;
std::atomic<unsigned> _sessionsCount;
};
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index cdf2ca0..149ddff 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -1016,7 +1016,75 @@ void lokit_main(const std::string& childRoot,
Log::info("loolkit [" + std::to_string(Process::id()) + "] is ready.");
if (doBenchmark)
+ {
IoUtil::writeFIFO(writerBroker, "started\n");
+ }
+
+ // Open websocket connection between the child process and WSD.
+ Poco::Net::HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER);
+ cs.setTimeout(0);
+ HTTPRequest request(HTTPRequest::HTTP_GET, std::string(NEW_CHILD_URI) + "pid=" + pid);
+ Poco::Net::HTTPResponse response;
+ auto ws = std::make_shared<WebSocket>(cs, request, response);
+ ws->setReceiveTimeout(0);
+
+ const std::string socketName = "ChildControllerWS";
+ IoUtil::SocketProcessor(ws, response, [&socketName, &ws, &document, &loKit](const std::vector<char>& data)
+ {
+ const std::string message(data.data(), data.size());
+ Log::debug(socketName + ": recv [" + message + "].");
+ StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+ auto response = std::to_string(Process::id()) + " ";
+
+ if (TerminationFlag)
+ {
+ // Too late, we're going down.
+ response += "down\n";
+ }
+ else if (tokens[0] == "session")
+ {
+ const std::string& sessionId = tokens[1];
+ const unsigned intSessionId = Util::decodeId(sessionId);
+ const std::string& docKey = tokens[2];
+
+ std::string url;
+ Poco::URI::decode(docKey, url);
+ Log::info("New session [" + sessionId + "] request on url [" + url + "].");
+
+ if (!document)
+ {
+ document = std::make_shared<Document>(loKit, jailId, docKey, url);
+ }
+
+ // Validate and create session.
+ if (url == document->getUrl() &&
+ document->createSession(sessionId, intSessionId))
+ {
+ response += "ok\n";
+ }
+ else
+ {
+ response += "bad\n";
+ }
+ }
+ else if (document && document->canDiscard())
+ {
+ TerminationFlag = true;
+ response += "down\n";
+ }
+ else
+ {
+ response += "bad unknown token [" + tokens[0] + "]\n";
+ }
+
+ //FIXME: Do we really need to respond here?
+ Log::trace("KitToDocBroker: " + response.substr(0, response.length()-2));
+ ws->sendFrame(response.data(), response.size());
+
+ return true;
+ },
+ [](){ return TerminationFlag; },
+ socketName);
char buffer[READ_BUFFER_SIZE];
std::string message;
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index ded9089..0332e48 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -173,76 +173,6 @@ using Poco::XML::InputSource;
using Poco::XML::Node;
using Poco::XML::NodeList;
-/// Represents a new LOK child that is read
-/// to host a document.
-class ChildProcess
-{
-public:
- ChildProcess() :
- _pid(-1)
- {
- }
-
- /// pid is the process ID of the child.
- /// ws is the control WebSocket to the child.
- ChildProcess(const Poco::Process::PID pid, const std::shared_ptr<Poco::Net::WebSocket>& ws) :
- _pid(pid),
- _ws(ws)
- {
- }
-
- ChildProcess(ChildProcess&& other) :
- _pid(other._pid),
- _ws(other._ws)
- {
- other._pid = -1;
- other._ws.reset();
- }
-
- const ChildProcess& operator=(ChildProcess&& other)
- {
- _pid = other._pid;
- other._pid = -1;
- _ws = other._ws;
- other._ws.reset();
-
- return *this;
- }
-
- ~ChildProcess()
- {
- close(true);
- }
-
- void close(const bool rude)
- {
- if (_pid != -1)
- {
- if (kill(_pid, SIGINT) != 0 && rude && kill(_pid, 0) != 0)
- {
- Log::error("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning.");
- }
-
- //TODO: Notify Admin.
- std::ostringstream message;
- message << "rmdoc" << " "
- << _pid << " "
- << "\n";
- //IoUtil::writeFIFO(WriterNotify, message.str());
- _pid = -1;
- }
-
- _ws.reset();
- }
-
- Poco::Process::PID getPid() const { return _pid; }
- std::shared_ptr<Poco::Net::WebSocket> getWebSocket() const { return _ws; }
-
-private:
- Poco::Process::PID _pid;
- std::shared_ptr<Poco::Net::WebSocket> _ws;
-};
-
/// New LOK child processes ready to host documents.
static std::vector<std::shared_ptr<ChildProcess>> newChilds;
static std::mutex newChildsMutex;
@@ -337,9 +267,21 @@ private:
if (!format.empty())
{
Log::info("Conversion request for URI [" + fromPath + "].");
+
+ // Request a kit process for this doc.
+ auto child = getNewChild();
+ if (!child)
+ {
+ // Let the client know we can't serve now.
+ response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
+ response.setContentLength(0);
+ response.send();
+ return;
+ }
+
auto uriPublic = DocumentBroker::sanitizeURI(fromPath);
const auto docKey = DocumentBroker::getDocKey(uriPublic);
- auto docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot);
+ auto docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child);
// This lock could become a bottleneck.
// In that case, we can use a pool and index by publicPath.
@@ -517,9 +459,20 @@ private:
}
else
{
+ // Request a kit process for this doc.
+ auto child = getNewChild();
+ if (!child)
+ {
+ // Let the client know we can't serve now.
+ response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
+ response.setContentLength(0);
+ response.send();
+ return;
+ }
+
// Set one we just created.
Log::debug("New DocumentBroker for docKey [" + docKey + "].");
- docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot);
+ docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child);
docBrokers.emplace(docKey, docBroker);
}
@@ -543,11 +496,6 @@ private:
if (wsSessionsCount == 1)
session->setEditLock(true);
- // Request a kit process for this doc.
- const std::string aMessage = "request " + id + " " + docKey + "\n";
- Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1));
- IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage);
-
QueueHandler handler(queue, session, "wsd_queue_" + session->getId());
Thread queueHandlerThread;
@@ -722,6 +670,7 @@ public:
auto ws = std::make_shared<WebSocket>(request, response);
std::unique_lock<std::mutex> lock(newChildsMutex);
newChilds.emplace_back(std::make_shared<ChildProcess>(pid, ws));
+ Log::info("Have " + std::to_string(newChilds.size()) + " childs.");
return;
}
More information about the Libreoffice-commits
mailing list