[Libreoffice-commits] online.git: common/Common.hpp kit/ForKit.cpp net/Socket.hpp wsd/Admin.cpp wsd/Admin.hpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/LOOLWSD.hpp
Gabriel Masei (via logerrit)
logerrit at kemper.freedesktop.org
Tue Apr 7 13:05:06 UTC 2020
common/Common.hpp | 1
kit/ForKit.cpp | 124 ++++++++++++++++++++---------------
net/Socket.hpp | 5 +
wsd/Admin.cpp | 7 -
wsd/Admin.hpp | 3
wsd/DocumentBroker.hpp | 124 +----------------------------------
wsd/LOOLWSD.cpp | 102 ++++++++++++++++++++++-------
wsd/LOOLWSD.hpp | 172 ++++++++++++++++++++++++++++++++++++++++++++++++-
8 files changed, 333 insertions(+), 205 deletions(-)
New commits:
commit 70af76e28cbca4a45869fcecfe221d21eb7a3790
Author: Gabriel Masei <gabriel.masei at 1and1.ro>
AuthorDate: Thu Apr 2 18:11:36 2020 +0300
Commit: Michael Meeks <michael.meeks at collabora.com>
CommitDate: Tue Apr 7 15:04:47 2020 +0200
Replaced pipe with websocket based on Unix socket in communication with ForKit
Change-Id: I80f1a4e84ca6820503966a8ee5d9958a150eac14
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/91585
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
diff --git a/common/Common.hpp b/common/Common.hpp
index 7a02dfaae..82f848579 100644
--- a/common/Common.hpp
+++ b/common/Common.hpp
@@ -36,6 +36,7 @@ constexpr const char JAILED_DOCUMENT_ROOT[] = "/user/docs/";
constexpr const char CHILD_URI[] = "/loolws/child?";
constexpr const char NEW_CHILD_URI[] = "/loolws/newchild";
constexpr const char LO_JAIL_SUBPATH[] = "lo";
+constexpr const char FORKIT_URI[] = "/loolws/forkit";
constexpr const char CAPABILITIES_END_POINT[] = "/hosting/capabilities";
diff --git a/kit/ForKit.cpp b/kit/ForKit.cpp
index 4ebd57907..8b18ef34e 100644
--- a/kit/ForKit.cpp
+++ b/kit/ForKit.cpp
@@ -35,6 +35,7 @@
#include <Log.hpp>
#include <Unit.hpp>
#include <Util.hpp>
+#include <WebSocketHandler.hpp>
#include <common/FileUtil.hpp>
#include <common/Seccomp.hpp>
@@ -64,75 +65,88 @@ int ClientPortNumber = DEFAULT_CLIENT_PORT_NUMBER;
std::string MasterLocation;
#endif
-/// Dispatcher class to demultiplex requests from
-/// WSD and handles them.
-class CommandDispatcher : public IoUtil::PipeReader
+class ServerWSHandler;
+
+// We have a single thread and a single connection so we won't bother with
+// access synchronization
+std::shared_ptr<ServerWSHandler> WSHandler;
+
+class ServerWSHandler final : public WebSocketHandler
{
+ std::string _socketName;
+
public:
- CommandDispatcher(const int pipe) :
- PipeReader("wsd_pipe_rd", pipe)
+ ServerWSHandler(const std::string& socketName) :
+ WebSocketHandler(/* isClient = */ true, /* isMasking */ false),
+ _socketName(socketName)
{
}
- /// Polls WSD commands and handles them.
- bool pollAndDispatch()
+protected:
+ void handleMessage(const std::vector<char>& data) override
{
- std::string message;
- const int ready = readLine(message, [](){ return SigUtil::getTerminationFlag(); });
- if (ready <= 0)
+ std::string message(data.data(), data.size());
+
+#if !MOBILEAPP
+ if (UnitKit::get().filterKitMessage(this, message))
+ return;
+#endif
+ StringVector tokens = LOOLProtocol::tokenize(message);
+ Log::StreamLogger logger = Log::debug();
+ if (logger.enabled())
{
- // Termination is done via SIGTERM, which breaks the wait.
- if (ready < 0)
+ logger << _socketName << ": recv [";
+ for (const auto& token : tokens)
{
- if (SigUtil::getTerminationFlag())
- {
- LOG_INF("Poll interrupted in " << getName() << " and TerminationFlag is set.");
- }
-
- // Break.
- return false;
+ logger << tokens.getParam(token) << ' ';
}
- // Timeout.
- return true;
+ LOG_END(logger, true);
}
- LOG_INF("ForKit command: [" << message << "].");
- try
+ // Note: Syntax or parsing errors here are unexpected and fatal.
+ if (SigUtil::getTerminationFlag())
{
- StringVector tokens = LOOLProtocol::tokenize(message);
- if (tokens.size() == 2 && tokens.equals(0, "spawn"))
+ LOG_DBG("Termination flag set: skip message processing");
+ }
+ else if (tokens.size() == 2 && tokens.equals(0, "spawn"))
+ {
+ const int count = std::stoi(tokens[1]);
+ if (count > 0)
{
- const int count = std::stoi(tokens[1]);
- if (count > 0)
- {
- LOG_INF("Setting to spawn " << tokens[1] << " child" << (count == 1 ? "" : "ren") << " per request.");
- ForkCounter = count;
- }
- else
- {
- LOG_WRN("Cannot spawn " << tokens[1] << " children as requested.");
- }
+ LOG_INF("Setting to spawn " << tokens[1] << " child" << (count == 1 ? "" : "ren") << " per request.");
+ ForkCounter = count;
}
- else if (tokens.size() == 3 && tokens.equals(0, "setconfig"))
+ else
{
- // Currently only rlimit entries are supported.
- if (!Rlimit::handleSetrlimitCommand(tokens))
- {
- LOG_ERR("Unknown setconfig command: " << message);
- }
+ LOG_WRN("Cannot spawn " << tokens[1] << " children as requested.");
}
- else
+ }
+ else if (tokens.size() == 3 && tokens.equals(0, "setconfig"))
+ {
+ // Currently only rlimit entries are supported.
+ if (!Rlimit::handleSetrlimitCommand(tokens))
{
- LOG_ERR("Unknown command: " << message);
+ LOG_ERR("Unknown setconfig command: " << message);
}
}
- catch (const std::exception& exc)
+ else if (tokens.equals(0, "exit"))
+ {
+ LOG_INF("Setting TerminationFlag due to 'exit' command from parent.");
+ SigUtil::setTerminationFlag();
+ }
+ else
{
- LOG_ERR("Error while processing forkit request [" << message << "]: " << exc.what());
+ LOG_ERR("Bad or unknown token [" << tokens[0] << "]");
}
+ }
- return true;
+ void onDisconnect() override
+ {
+#if !MOBILEAPP
+ LOG_WRN("ForKit connection lost without exit arriving from wsd. Setting TerminationFlag");
+ SigUtil::setTerminationFlag();
+#endif
}
};
@@ -234,7 +248,7 @@ static void cleanupChildren()
LOG_ERR("Unknown child " << exitedChildPid << " has exited");
}
}
-
+
// Now delete the jails.
for (const auto& path : jails)
{
@@ -545,18 +559,22 @@ int main(int argc, char** argv)
Log::logger().setLevel(LogLevel);
}
- CommandDispatcher commandDispatcher(0);
+ SocketPoll mainPoll(Util::getThreadName());
+ mainPoll.runOnClientThread(); // We will do the polling on this thread.
+
+ WSHandler = std::make_shared<ServerWSHandler>("forkit_ws");
+
+#if !MOBILEAPP
+ mainPoll.insertNewUnixSocket(MasterLocation, FORKIT_URI, WSHandler);
+#endif
+
LOG_INF("ForKit process is ready.");
while (!SigUtil::getTerminationFlag())
{
UnitKit::get().invokeForKitTest();
- if (!commandDispatcher.pollAndDispatch())
- {
- LOG_INF("Child dispatcher flagged for termination.");
- break;
- }
+ mainPoll.poll(POLL_TIMEOUT_MS);
#if ENABLE_DEBUG
if (!SingleKit)
diff --git a/net/Socket.hpp b/net/Socket.hpp
index d09c39334..fd0a5278d 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -543,6 +543,11 @@ public:
}
}
+ const std::thread::id &getThreadOwner()
+ {
+ return _owner;
+ }
+
/// Are we running in either shutdown, or the polling thread.
/// Asserts in the debug builds, otherwise just logs.
void assertCorrectThread() const
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 93fe76827..ca72ecd61 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -24,7 +24,6 @@
#include <Common.hpp>
#include "FileServer.hpp"
#include <IoUtil.hpp>
-#include "LOOLWSD.hpp"
#include <Log.hpp>
#include <Protocol.hpp>
#include "Storage.hpp"
@@ -352,7 +351,6 @@ Admin::Admin() :
SocketPoll("admin"),
_model(AdminModel()),
_forKitPid(-1),
- _forKitWritePipe(-1),
_lastTotalMemory(0),
_lastJiffies(0),
_lastSentCount(0),
@@ -594,10 +592,7 @@ void Admin::notifyForkit()
<< "setconfig limit_file_size_mb " << _defDocProcSettings.getLimitFileSizeMb() << '\n'
<< "setconfig limit_num_open_files " << _defDocProcSettings.getLimitNumberOpenFiles() << '\n';
- if (_forKitWritePipe != -1)
- IoUtil::writeToPipe(_forKitWritePipe, oss.str());
- else
- LOG_INF("Forkit write pipe not set (yet).");
+ LOOLWSD::sendMessageToForKit(oss.str());
}
void Admin::triggerMemoryCleanup(const size_t totalMem)
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index 5d3cd4b05..e39d89fe7 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -16,6 +16,7 @@
#include "Log.hpp"
#include "net/WebSocketHandler.hpp"
+#include "LOOLWSD.hpp"
class Admin;
@@ -91,7 +92,6 @@ public:
void rmDoc(const std::string& docKey);
void setForKitPid(const int forKitPid) { _forKitPid = forKitPid; _model.setForKitPid(forKitPid);}
- void setForKitWritePipe(const int forKitWritePipe) { _forKitWritePipe = forKitWritePipe; }
/// Callers must ensure that modelMutex is acquired
AdminModel& getModel();
@@ -156,7 +156,6 @@ private:
/// the Admin Poll thread.
AdminModel _model;
int _forKitPid;
- int _forKitWritePipe;
size_t _lastTotalMemory;
size_t _lastJiffies;
uint64_t _lastSentCount;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index e304cd60c..251e97bba 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -58,9 +58,11 @@ public:
}
};
+#include "LOOLWSD.hpp"
+
/// Represents a new LOK child that is read
/// to host a document.
-class ChildProcess
+class ChildProcess : public WSProcess
{
public:
/// @param pid is the process ID of the child.
@@ -70,12 +72,9 @@ public:
const std::shared_ptr<StreamSocket>& socket,
const Poco::Net::HTTPRequest &request) :
- _pid(pid),
- _jailId(jailId),
- _ws(std::make_shared<WebSocketHandler>(socket, request)),
- _socket(socket)
+ WSProcess("ChildProcess", pid, socket, std::make_shared<WebSocketHandler>(socket, request)),
+ _jailId(jailId)
{
- LOG_INF("ChildProcess ctor [" << _pid << "].");
}
@@ -83,125 +82,12 @@ public:
const ChildProcess& operator=(ChildProcess&& other) = delete;
- ~ChildProcess()
- {
- LOG_DBG("~ChildProcess dtor [" << _pid << "].");
-
- if (_pid <= 0)
- return;
-
- terminate();
-
- // No need for the socket anymore.
- _ws.reset();
- _socket.reset();
- }
-
void setDocumentBroker(const std::shared_ptr<DocumentBroker>& docBroker);
std::shared_ptr<DocumentBroker> getDocumentBroker() const { return _docBroker.lock(); }
-
- /// Let the child close a nice way.
- void close()
- {
- if (_pid < 0)
- return;
-
- try
- {
- LOG_DBG("Closing ChildProcess [" << _pid << "].");
-
- // Request the child to exit
- if (isAlive())
- {
- LOG_DBG("Stopping ChildProcess [" << _pid << "] by sending 'exit' command.");
- sendTextFrame("exit");
- }
-
- // Shutdown the socket.
- if (_ws)
- _ws->shutdown();
- }
- catch (const std::exception& ex)
- {
- LOG_ERR("Error while closing child process: " << ex.what());
- }
-
- _pid = -1; // Detach from child.
- }
-
- /// Kill or abandon the child.
- void terminate()
- {
- if (_pid < 0)
- return;
-
-#if !MOBILEAPP
- if (::kill(_pid, 0) == 0)
- {
- LOG_INF("Killing child [" << _pid << "].");
- if (!SigUtil::killChild(_pid))
- {
- LOG_ERR("Cannot terminate lokit [" << _pid << "]. Abandoning.");
- }
- }
-#else
- // What to do? Throw some unique exception that the outermost call in the thread catches and
- // exits from the thread?
-#endif
- _pid = -1;
- }
-
- Poco::Process::PID getPid() const { return _pid; }
const std::string& getJailId() const { return _jailId; }
- /// Send a text payload to the child-process WS.
- bool sendTextFrame(const std::string& data)
- {
- try
- {
- if (_ws)
- {
- LOG_TRC("Send DocBroker to Child message: [" << LOOLProtocol::getAbbreviatedMessage(data) << "].");
- _ws->sendMessage(data);
- return true;
- }
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Failed to send child [" << _pid << "] data [" <<
- LOOLProtocol::getAbbreviatedMessage(data) << "] due to: " << exc.what());
- throw;
- }
-
- LOG_WRN("No socket between DocBroker and child to send [" << LOOLProtocol::getAbbreviatedMessage(data) << "]");
- return false;
- }
-
- /// Check whether this child is alive and socket not in error.
- /// Note: zombies will show as alive, and sockets have waiting
- /// time after the other end-point closes. So this isn't accurate.
- bool isAlive() const
- {
-#if !MOBILEAPP
- try
- {
- return _pid > 1 && _ws && ::kill(_pid, 0) == 0;
- }
- catch (const std::exception&)
- {
- }
-
- return false;
-#else
- return _pid > 1;
-#endif
- }
-
private:
- Poco::Process::PID _pid;
const std::string _jailId;
- std::shared_ptr<WebSocketHandler> _ws;
- std::shared_ptr<Socket> _socket;
std::weak_ptr<DocumentBroker> _docBroker;
};
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 54ba8e2b9..e9e17b86f 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -91,7 +91,6 @@ using Poco::Net::PartHandler;
#include <Poco/Net/DNS.h>
#include <Poco/Net/HostEntry.h>
#include <Poco/Path.h>
-#include <Poco/Pipe.h>
#include <Poco/Process.h>
#include <Poco/SAX/InputSource.h>
#include <Poco/StreamCopier.h>
@@ -387,16 +386,11 @@ static int forkChildren(const int number)
#else
const std::string aMessage = "spawn " + std::to_string(number) + "\n";
LOG_DBG("MasterToForKit: " << aMessage.substr(0, aMessage.length() - 1));
- if (write(LOOLWSD::ForKitWritePipe, aMessage.c_str(), aMessage.length()) > 0)
+ LOOLWSD::sendMessageToForKit(aMessage);
#endif
- {
- OutstandingForks += number;
- LastForkRequestTime = std::chrono::steady_clock::now();
- return number;
- }
-
- LOG_ERR("No forkit pipe while rebalancing children.");
- return -1; // Fail.
+ OutstandingForks += number;
+ LastForkRequestTime = std::chrono::steady_clock::now();
+ return number;
}
return 0;
@@ -704,8 +698,8 @@ inline std::string getServiceURI(const std::string &sub, bool asAdmin = false)
std::atomic<uint64_t> LOOLWSD::NextConnectionId(1);
#ifndef KIT_IN_PROCESS
-std::atomic<int> LOOLWSD::ForKitWritePipe(-1);
std::atomic<int> LOOLWSD::ForKitProcId(-1);
+std::shared_ptr<ForKitProcess> LOOLWSD::ForKitProc;
#endif
#if !MOBILEAPP
bool LOOLWSD::NoCapsForKit = false;
@@ -780,6 +774,42 @@ public:
/// Check prisoners are still alive and balanced.
void wakeupHook() override;
+
+ // Resets the forkit process object
+ void setForKitProcess(const std::weak_ptr<ForKitProcess>& forKitProc)
+ {
+ assertCorrectThread();
+ _forKitProc = forKitProc;
+ }
+
+ void sendMessageToForKit(const std::string& msg)
+ {
+ if (std::this_thread::get_id() == getThreadOwner())
+ {
+ // Speed up sending the message if the request comes from owner thread
+ std::shared_ptr<ForKitProcess> forKitProc = _forKitProc.lock();
+ if (forKitProc)
+ {
+ forKitProc->sendTextFrame(msg);
+ }
+ }
+ else
+ {
+ // Put the message in the owner's thread queue to be sent later
+ // because WebSocketHandler is not thread safe and otherwise we
+ // would have to synchronize inside WebSocketHandler.
+ addCallback([=]{
+ std::shared_ptr<ForKitProcess> forKitProc = _forKitProc.lock();
+ if (forKitProc)
+ {
+ forKitProc->sendTextFrame(msg);
+ }
+ });
+ }
+ }
+
+private:
+ std::weak_ptr<ForKitProcess> _forKitProc;
};
/// This thread listens for and accepts prisoner kit processes.
@@ -799,6 +829,17 @@ public:
}
};
+void ForKitProcWSHandler::handleMessage(const std::vector<char> &data)
+{
+ LOG_TRC("ForKitProcWSHandler: handling incoming [" << LOOLProtocol::getAbbreviatedMessage(&data[0], data.size()) << "].");
+ const std::string firstLine = LOOLProtocol::getFirstLine(&data[0], data.size());
+ const StringVector tokens = LOOLProtocol::tokenize(firstLine.data(), firstLine.size());
+
+ // Just add here the processing of specific received messages
+
+ LOG_ERR("ForKitProcWSHandler: unknown command: " << tokens[0]);
+}
+
LOOLWSD::LOOLWSD()
{
}
@@ -1708,13 +1749,10 @@ bool LOOLWSD::createForKit()
Admin::instance().setForKitPid(ForKitProcId);
}
- if (ForKitWritePipe != -1)
- {
- close(ForKitWritePipe);
- ForKitWritePipe = -1;
- Admin::instance().setForKitWritePipe(ForKitWritePipe);
- }
-
+ // Below line will be executed by PrisonerPoll thread.
+ ForKitProc = nullptr;
+ PrisonerPoll.setForKitProcess(ForKitProc);
+
// ForKit always spawns one.
++OutstandingForks;
@@ -1722,17 +1760,13 @@ bool LOOLWSD::createForKit()
args.cat(std::string(" "), 0));
LastForkRequestTime = std::chrono::steady_clock::now();
- int childStdin = -1;
- int child = Util::spawnProcess(forKitPath, args, nullptr, &childStdin);
-
- ForKitWritePipe = childStdin;
+ int child = Util::spawnProcess(forKitPath, args);
ForKitProcId = child;
LOG_INF("Forkit process launched: " << ForKitProcId);
// Init the Admin manager
Admin::instance().setForKitPid(ForKitProcId);
- Admin::instance().setForKitWritePipe(ForKitWritePipe);
const int balance = LOOLWSD::NumPreSpawnedChildren - OutstandingForks;
if (balance > 0)
@@ -1742,6 +1776,11 @@ bool LOOLWSD::createForKit()
#endif
}
+void LOOLWSD::sendMessageToForKit(const std::string& message)
+{
+ PrisonerPoll.sendMessageToForKit(message);
+}
+
#endif // !MOBILEAPP
#ifdef FUZZER
@@ -1903,6 +1942,20 @@ private:
LOG_TRC("Child connection with URI [" << LOOLWSD::anonymizeUrl(request.getURI()) << "].");
Poco::URI requestURI(request.getURI());
+#ifndef KIT_IN_PROCESS
+ if (requestURI.getPath() == FORKIT_URI)
+ {
+ if (socket->getPid() != LOOLWSD::ForKitProcId)
+ {
+ LOG_WRN("Connection request received on " << FORKIT_URI << " endpoint from unexpected ForKit process. Skipped.");
+ return;
+ }
+ LOOLWSD::ForKitProc = std::make_shared<ForKitProcess>(LOOLWSD::ForKitProcId, socket, request);
+ socket->getInBuffer().clear();
+ PrisonerPoll.setForKitProcess(LOOLWSD::ForKitProc);
+ return;
+ }
+#endif
if (requestURI.getPath() != NEW_CHILD_URI)
{
LOG_ERR("Invalid incoming URI.");
@@ -3621,7 +3674,8 @@ int LOOLWSD::innerMain()
LOG_INF("Waiting for forkit process to exit");
int status = 0;
waitpid(ForKitProcId, &status, WUNTRACED);
- close(ForKitWritePipe);
+ ForKitProcId = -1;
+ ForKitProc.reset();
#endif
// In case forkit didn't cleanup properly, don't leave jails behind.
diff --git a/wsd/LOOLWSD.hpp b/wsd/LOOLWSD.hpp
index 469d26bec..33056538f 100644
--- a/wsd/LOOLWSD.hpp
+++ b/wsd/LOOLWSD.hpp
@@ -17,6 +17,8 @@
#include <set>
#include <string>
+#include <signal.h>
+
#include <Poco/Path.h>
#include <Poco/Process.h>
#include <Poco/Util/AbstractConfiguration.h>
@@ -25,6 +27,7 @@
#include "Util.hpp"
#include "FileUtil.hpp"
+#include "WebSocketHandler.hpp"
class ChildProcess;
class TraceFileWriter;
@@ -36,6 +39,170 @@ std::shared_ptr<ChildProcess> getNewChild_Blocks(
const std::string& uri
#endif
);
+// This is common code used to set up a socket to both
+// forkit and child document processes via a websocket.
+// In general, a WSProcess instance represents a child
+// process with which we can communicate through websocket.
+class WSProcess
+{
+public:
+ /// @param pid is the process ID.
+ /// @param socket is the underlying Socket to the process.
+ WSProcess(const std::string& name,
+ const Poco::Process::PID pid,
+ const std::shared_ptr<StreamSocket>& socket,
+ std::shared_ptr<WebSocketHandler> handler) :
+
+ _name(name),
+ _pid(pid),
+ _ws(handler),
+ _socket(socket)
+ {
+ LOG_INF(_name << " ctor [" << _pid << "].");
+ }
+
+
+ WSProcess(WSProcess&& other) = delete;
+
+ const WSProcess& operator=(WSProcess&& other) = delete;
+
+ virtual ~WSProcess()
+ {
+ LOG_DBG("~" << _name << " dtor [" << _pid << "].");
+
+ if (_pid <= 0)
+ return;
+
+ terminate();
+
+ // No need for the socket anymore.
+ _ws.reset();
+ _socket.reset();
+ }
+
+ /// Let the child close in a nice way.
+ void close()
+ {
+ if (_pid < 0)
+ return;
+
+ try
+ {
+ LOG_DBG("Closing ChildProcess [" << _pid << "].");
+
+ // Request the child to exit
+ if (isAlive())
+ {
+ LOG_DBG("Stopping ChildProcess [" << _pid << "] by sending 'exit' command.");
+ sendTextFrame("exit");
+ }
+
+ // Shutdown the socket.
+ if (_ws)
+ _ws->shutdown();
+ }
+ catch (const std::exception& ex)
+ {
+ LOG_ERR("Error while closing child process: " << ex.what());
+ }
+
+ _pid = -1; // Detach from child.
+ }
+
+ /// Kill or abandon the child.
+ void terminate()
+ {
+ if (_pid < 0)
+ return;
+
+#if !MOBILEAPP
+ if (::kill(_pid, 0) == 0)
+ {
+ LOG_INF("Killing child [" << _pid << "].");
+ if (!SigUtil::killChild(_pid))
+ {
+ LOG_ERR("Cannot terminate lokit [" << _pid << "]. Abandoning.");
+ }
+ }
+#else
+ // What to do? Throw some unique exception that the outermost call in the thread catches and
+ // exits from the thread?
+#endif
+ _pid = -1;
+ }
+
+ Poco::Process::PID getPid() const { return _pid; }
+
+ /// Send a text payload to the child-process WS.
+ virtual bool sendTextFrame(const std::string& data)
+ {
+ try
+ {
+ if (_ws)
+ {
+ LOG_TRC("Send to " << _name << " message: [" << LOOLProtocol::getAbbreviatedMessage(data) << "].");
+ _ws->sendMessage(data);
+ return true;
+ }
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Failed to send " << _name << " [" << _pid << "] data [" <<
+ LOOLProtocol::getAbbreviatedMessage(data) << "] due to: " << exc.what());
+ throw;
+ }
+
+ LOG_WRN("No socket to " << _name << " to send [" << LOOLProtocol::getAbbreviatedMessage(data) << "]");
+ return false;
+ }
+
+ /// Check whether this child is alive and socket not in error.
+ /// Note: zombies will show as alive, and sockets have waiting
+ /// time after the other end-point closes. So this isn't accurate.
+ virtual bool isAlive() const
+ {
+#if !MOBILEAPP
+ try
+ {
+ return _pid > 1 && _ws && ::kill(_pid, 0) == 0;
+ }
+ catch (const std::exception&)
+ {
+ }
+
+ return false;
+#else
+ return _pid > 1;
+#endif
+ }
+
+ std::string _name;
+ Poco::Process::PID _pid;
+ std::shared_ptr<WebSocketHandler> _ws;
+ std::shared_ptr<Socket> _socket;
+};
+
+class ForKitProcWSHandler: public WebSocketHandler, public std::enable_shared_from_this<ForKitProcWSHandler>
+{
+public:
+
+ ForKitProcWSHandler(const std::weak_ptr<StreamSocket>& socket, const Poco::Net::HTTPRequest& request)
+ : WebSocketHandler(socket, request)
+ {
+ }
+
+ virtual void handleMessage(const std::vector<char> &data) override;
+};
+
+class ForKitProcess : public WSProcess
+{
+public:
+ ForKitProcess(int pid, std::shared_ptr<StreamSocket>& socket, const Poco::Net::HTTPRequest &request)
+ : WSProcess("ForKit", pid, socket, std::make_shared<ForKitProcWSHandler>(socket, request))
+ {
+ socket->setHandler(_ws);
+ }
+};
/// The Server class which is responsible for all
/// external interactions.
@@ -57,7 +224,7 @@ public:
static bool SingleKit;
#endif
#endif
- static std::atomic<int> ForKitWritePipe;
+ static std::shared_ptr<ForKitProcess> ForKitProc;
static std::atomic<int> ForKitProcId;
static bool DummyLOK;
static std::string FuzzFileName;
@@ -188,6 +355,9 @@ public:
/// Return true when successfull.
static bool createForKit();
+ /// Sends a message to ForKit through PrisonerPoll.
+ static void sendMessageToForKit(const std::string& message);
+
/// Checks forkit (and respawns), rebalances
/// child kit processes and cleans up DocBrokers.
static void doHousekeeping();
More information about the Libreoffice-commits
mailing list