[Libreoffice-commits] online.git: common/Common.hpp kit/ForKit.cpp net/Socket.hpp wsd/Admin.cpp wsd/Admin.hpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/LOOLWSD.hpp

Gabriel Masei (via logerrit) logerrit at kemper.freedesktop.org
Tue Apr 7 13:05:06 UTC 2020


 common/Common.hpp      |    1 
 kit/ForKit.cpp         |  124 ++++++++++++++++++++---------------
 net/Socket.hpp         |    5 +
 wsd/Admin.cpp          |    7 -
 wsd/Admin.hpp          |    3 
 wsd/DocumentBroker.hpp |  124 +----------------------------------
 wsd/LOOLWSD.cpp        |  102 ++++++++++++++++++++++-------
 wsd/LOOLWSD.hpp        |  172 ++++++++++++++++++++++++++++++++++++++++++++++++-
 8 files changed, 333 insertions(+), 205 deletions(-)

New commits:
commit 70af76e28cbca4a45869fcecfe221d21eb7a3790
Author:     Gabriel Masei <gabriel.masei at 1and1.ro>
AuthorDate: Thu Apr 2 18:11:36 2020 +0300
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Tue Apr 7 15:04:47 2020 +0200

    Replaced pipe with websocket based on Unix socket in communication with ForKit
    
    Change-Id: I80f1a4e84ca6820503966a8ee5d9958a150eac14
    Reviewed-on: https://gerrit.libreoffice.org/c/online/+/91585
    Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
    Reviewed-by: Michael Meeks <michael.meeks at collabora.com>

diff --git a/common/Common.hpp b/common/Common.hpp
index 7a02dfaae..82f848579 100644
--- a/common/Common.hpp
+++ b/common/Common.hpp
@@ -36,6 +36,7 @@ constexpr const char JAILED_DOCUMENT_ROOT[] = "/user/docs/";
 constexpr const char CHILD_URI[] = "/loolws/child?";
 constexpr const char NEW_CHILD_URI[] = "/loolws/newchild";
 constexpr const char LO_JAIL_SUBPATH[] = "lo";
+constexpr const char FORKIT_URI[] = "/loolws/forkit";
 
 constexpr const char CAPABILITIES_END_POINT[] = "/hosting/capabilities";
 
diff --git a/kit/ForKit.cpp b/kit/ForKit.cpp
index 4ebd57907..8b18ef34e 100644
--- a/kit/ForKit.cpp
+++ b/kit/ForKit.cpp
@@ -35,6 +35,7 @@
 #include <Log.hpp>
 #include <Unit.hpp>
 #include <Util.hpp>
+#include <WebSocketHandler.hpp>
 
 #include <common/FileUtil.hpp>
 #include <common/Seccomp.hpp>
@@ -64,75 +65,88 @@ int ClientPortNumber = DEFAULT_CLIENT_PORT_NUMBER;
 std::string MasterLocation;
 #endif
 
-/// Dispatcher class to demultiplex requests from
-/// WSD and handles them.
-class CommandDispatcher : public IoUtil::PipeReader
+class ServerWSHandler;
+
+// We have a single thread and a single connection so we won't bother with
+// access synchronization
+std::shared_ptr<ServerWSHandler> WSHandler;
+
+class ServerWSHandler final : public WebSocketHandler
 {
+    std::string _socketName;
+
 public:
-    CommandDispatcher(const int pipe) :
-        PipeReader("wsd_pipe_rd", pipe)
+    ServerWSHandler(const std::string& socketName) :
+        WebSocketHandler(/* isClient = */ true, /* isMasking */ false),
+        _socketName(socketName)
     {
     }
 
-    /// Polls WSD commands and handles them.
-    bool pollAndDispatch()
+protected:
+    void handleMessage(const std::vector<char>& data) override
     {
-        std::string message;
-        const int ready = readLine(message, [](){ return SigUtil::getTerminationFlag(); });
-        if (ready <= 0)
+        std::string message(data.data(), data.size());
+
+#if !MOBILEAPP
+        if (UnitKit::get().filterKitMessage(this, message))
+            return;
+#endif
+        StringVector tokens = LOOLProtocol::tokenize(message);
+        Log::StreamLogger logger = Log::debug();
+        if (logger.enabled())
         {
-            // Termination is done via SIGTERM, which breaks the wait.
-            if (ready < 0)
+            logger << _socketName << ": recv [";
+            for (const auto& token : tokens)
             {
-                if (SigUtil::getTerminationFlag())
-                {
-                    LOG_INF("Poll interrupted in " << getName() << " and TerminationFlag is set.");
-                }
-
-                // Break.
-                return false;
+                logger << tokens.getParam(token) << ' ';
             }
 
-            // Timeout.
-            return true;
+            LOG_END(logger, true);
         }
 
-        LOG_INF("ForKit command: [" << message << "].");
-        try
+        // Note: Syntax or parsing errors here are unexpected and fatal.
+        if (SigUtil::getTerminationFlag())
         {
-            StringVector tokens = LOOLProtocol::tokenize(message);
-            if (tokens.size() == 2 && tokens.equals(0, "spawn"))
+            LOG_DBG("Termination flag set: skip message processing");
+        }
+        else if (tokens.size() == 2 && tokens.equals(0, "spawn"))
+        {
+            const int count = std::stoi(tokens[1]);
+            if (count > 0)
             {
-                const int count = std::stoi(tokens[1]);
-                if (count > 0)
-                {
-                    LOG_INF("Setting to spawn " << tokens[1] << " child" << (count == 1 ? "" : "ren") << " per request.");
-                    ForkCounter = count;
-                }
-                else
-                {
-                    LOG_WRN("Cannot spawn " << tokens[1] << " children as requested.");
-                }
+                LOG_INF("Setting to spawn " << tokens[1] << " child" << (count == 1 ? "" : "ren") << " per request.");
+                ForkCounter = count;
             }
-            else if (tokens.size() == 3 && tokens.equals(0, "setconfig"))
+            else
             {
-                // Currently only rlimit entries are supported.
-                if (!Rlimit::handleSetrlimitCommand(tokens))
-                {
-                    LOG_ERR("Unknown setconfig command: " << message);
-                }
+                LOG_WRN("Cannot spawn " << tokens[1] << " children as requested.");
             }
-            else
+        }
+        else if (tokens.size() == 3 && tokens.equals(0, "setconfig"))
+        {
+            // Currently only rlimit entries are supported.
+            if (!Rlimit::handleSetrlimitCommand(tokens))
             {
-                LOG_ERR("Unknown command: " << message);
+                LOG_ERR("Unknown setconfig command: " << message);
             }
         }
-        catch (const std::exception& exc)
+        else if (tokens.equals(0, "exit"))
+        {
+            LOG_INF("Setting TerminationFlag due to 'exit' command from parent.");
+            SigUtil::setTerminationFlag();
+        }
+        else
         {
-            LOG_ERR("Error while processing forkit request [" << message << "]: " << exc.what());
+            LOG_ERR("Bad or unknown token [" << tokens[0] << "]");
         }
+    }
 
-        return true;
+    void onDisconnect() override
+    {
+#if !MOBILEAPP
+        LOG_WRN("ForKit connection lost without exit arriving from wsd. Setting TerminationFlag");
+        SigUtil::setTerminationFlag();
+#endif
     }
 };
 
@@ -234,7 +248,7 @@ static void cleanupChildren()
             LOG_ERR("Unknown child " << exitedChildPid << " has exited");
         }
     }
-
+    
     // Now delete the jails.
     for (const auto& path : jails)
     {
@@ -545,18 +559,22 @@ int main(int argc, char** argv)
         Log::logger().setLevel(LogLevel);
     }
 
-    CommandDispatcher commandDispatcher(0);
+    SocketPoll mainPoll(Util::getThreadName());
+    mainPoll.runOnClientThread(); // We will do the polling on this thread.
+
+    WSHandler = std::make_shared<ServerWSHandler>("forkit_ws");
+
+#if !MOBILEAPP
+    mainPoll.insertNewUnixSocket(MasterLocation, FORKIT_URI, WSHandler);
+#endif
+
     LOG_INF("ForKit process is ready.");
 
     while (!SigUtil::getTerminationFlag())
     {
         UnitKit::get().invokeForKitTest();
 
-        if (!commandDispatcher.pollAndDispatch())
-        {
-            LOG_INF("Child dispatcher flagged for termination.");
-            break;
-        }
+        mainPoll.poll(POLL_TIMEOUT_MS);
 
 #if ENABLE_DEBUG
         if (!SingleKit)
diff --git a/net/Socket.hpp b/net/Socket.hpp
index d09c39334..fd0a5278d 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -543,6 +543,11 @@ public:
         }
     }
 
+    const std::thread::id &getThreadOwner()
+    {
+        return _owner;
+    }
+
     /// Are we running in either shutdown, or the polling thread.
     /// Asserts in the debug builds, otherwise just logs.
     void assertCorrectThread() const
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 93fe76827..ca72ecd61 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -24,7 +24,6 @@
 #include <Common.hpp>
 #include "FileServer.hpp"
 #include <IoUtil.hpp>
-#include "LOOLWSD.hpp"
 #include <Log.hpp>
 #include <Protocol.hpp>
 #include "Storage.hpp"
@@ -352,7 +351,6 @@ Admin::Admin() :
     SocketPoll("admin"),
     _model(AdminModel()),
     _forKitPid(-1),
-    _forKitWritePipe(-1),
     _lastTotalMemory(0),
     _lastJiffies(0),
     _lastSentCount(0),
@@ -594,10 +592,7 @@ void Admin::notifyForkit()
         << "setconfig limit_file_size_mb " << _defDocProcSettings.getLimitFileSizeMb() << '\n'
         << "setconfig limit_num_open_files " << _defDocProcSettings.getLimitNumberOpenFiles() << '\n';
 
-    if (_forKitWritePipe != -1)
-        IoUtil::writeToPipe(_forKitWritePipe, oss.str());
-    else
-        LOG_INF("Forkit write pipe not set (yet).");
+    LOOLWSD::sendMessageToForKit(oss.str());
 }
 
 void Admin::triggerMemoryCleanup(const size_t totalMem)
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index 5d3cd4b05..e39d89fe7 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -16,6 +16,7 @@
 #include "Log.hpp"
 
 #include "net/WebSocketHandler.hpp"
+#include "LOOLWSD.hpp"
 
 class Admin;
 
@@ -91,7 +92,6 @@ public:
     void rmDoc(const std::string& docKey);
 
     void setForKitPid(const int forKitPid) { _forKitPid = forKitPid; _model.setForKitPid(forKitPid);}
-    void setForKitWritePipe(const int forKitWritePipe) { _forKitWritePipe = forKitWritePipe; }
 
     /// Callers must ensure that modelMutex is acquired
     AdminModel& getModel();
@@ -156,7 +156,6 @@ private:
     /// the Admin Poll thread.
     AdminModel _model;
     int _forKitPid;
-    int _forKitWritePipe;
     size_t _lastTotalMemory;
     size_t _lastJiffies;
     uint64_t _lastSentCount;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index e304cd60c..251e97bba 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -58,9 +58,11 @@ public:
     }
 };
 
+#include "LOOLWSD.hpp"
+
 /// Represents a new LOK child that is read
 /// to host a document.
-class ChildProcess
+class ChildProcess : public WSProcess
 {
 public:
     /// @param pid is the process ID of the child.
@@ -70,12 +72,9 @@ public:
                  const std::shared_ptr<StreamSocket>& socket,
                  const Poco::Net::HTTPRequest &request) :
 
-        _pid(pid),
-        _jailId(jailId),
-        _ws(std::make_shared<WebSocketHandler>(socket, request)),
-        _socket(socket)
+        WSProcess("ChildProcess", pid, socket, std::make_shared<WebSocketHandler>(socket, request)),
+        _jailId(jailId)
     {
-        LOG_INF("ChildProcess ctor [" << _pid << "].");
     }
 
 
@@ -83,125 +82,12 @@ public:
 
     const ChildProcess& operator=(ChildProcess&& other) = delete;
 
-    ~ChildProcess()
-    {
-        LOG_DBG("~ChildProcess dtor [" << _pid << "].");
-
-        if (_pid <= 0)
-            return;
-
-        terminate();
-
-        // No need for the socket anymore.
-        _ws.reset();
-        _socket.reset();
-    }
-
     void setDocumentBroker(const std::shared_ptr<DocumentBroker>& docBroker);
     std::shared_ptr<DocumentBroker> getDocumentBroker() const { return _docBroker.lock(); }
-
-    /// Let the child close a nice way.
-    void close()
-    {
-        if (_pid < 0)
-            return;
-
-        try
-        {
-            LOG_DBG("Closing ChildProcess [" << _pid << "].");
-
-            // Request the child to exit
-            if (isAlive())
-            {
-                LOG_DBG("Stopping ChildProcess [" << _pid << "] by sending 'exit' command.");
-                sendTextFrame("exit");
-            }
-
-            // Shutdown the socket.
-            if (_ws)
-                _ws->shutdown();
-        }
-        catch (const std::exception& ex)
-        {
-            LOG_ERR("Error while closing child process: " << ex.what());
-        }
-
-        _pid = -1; // Detach from child.
-    }
-
-    /// Kill or abandon the child.
-    void terminate()
-    {
-        if (_pid < 0)
-            return;
-
-#if !MOBILEAPP
-        if (::kill(_pid, 0) == 0)
-        {
-            LOG_INF("Killing child [" << _pid << "].");
-            if (!SigUtil::killChild(_pid))
-            {
-                LOG_ERR("Cannot terminate lokit [" << _pid << "]. Abandoning.");
-            }
-        }
-#else
-        // What to do? Throw some unique exception that the outermost call in the thread catches and
-        // exits from the thread?
-#endif
-        _pid = -1;
-    }
-
-    Poco::Process::PID getPid() const { return _pid; }
     const std::string& getJailId() const { return _jailId; }
 
-    /// Send a text payload to the child-process WS.
-    bool sendTextFrame(const std::string& data)
-    {
-        try
-        {
-            if (_ws)
-            {
-                LOG_TRC("Send DocBroker to Child message: [" << LOOLProtocol::getAbbreviatedMessage(data) << "].");
-                _ws->sendMessage(data);
-                return true;
-            }
-        }
-        catch (const std::exception& exc)
-        {
-            LOG_ERR("Failed to send child [" << _pid << "] data [" <<
-                    LOOLProtocol::getAbbreviatedMessage(data) << "] due to: " << exc.what());
-            throw;
-        }
-
-        LOG_WRN("No socket between DocBroker and child to send [" << LOOLProtocol::getAbbreviatedMessage(data) << "]");
-        return false;
-    }
-
-    /// Check whether this child is alive and socket not in error.
-    /// Note: zombies will show as alive, and sockets have waiting
-    /// time after the other end-point closes. So this isn't accurate.
-    bool isAlive() const
-    {
-#if !MOBILEAPP
-        try
-        {
-            return _pid > 1 && _ws && ::kill(_pid, 0) == 0;
-        }
-        catch (const std::exception&)
-        {
-        }
-
-        return false;
-#else
-        return _pid > 1;
-#endif
-    }
-
 private:
-    Poco::Process::PID _pid;
     const std::string _jailId;
-    std::shared_ptr<WebSocketHandler> _ws;
-    std::shared_ptr<Socket> _socket;
     std::weak_ptr<DocumentBroker> _docBroker;
 };
 
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 54ba8e2b9..e9e17b86f 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -91,7 +91,6 @@ using Poco::Net::PartHandler;
 #include <Poco/Net/DNS.h>
 #include <Poco/Net/HostEntry.h>
 #include <Poco/Path.h>
-#include <Poco/Pipe.h>
 #include <Poco/Process.h>
 #include <Poco/SAX/InputSource.h>
 #include <Poco/StreamCopier.h>
@@ -387,16 +386,11 @@ static int forkChildren(const int number)
 #else
         const std::string aMessage = "spawn " + std::to_string(number) + "\n";
         LOG_DBG("MasterToForKit: " << aMessage.substr(0, aMessage.length() - 1));
-        if (write(LOOLWSD::ForKitWritePipe, aMessage.c_str(), aMessage.length()) > 0)
+        LOOLWSD::sendMessageToForKit(aMessage);
 #endif
-        {
-            OutstandingForks += number;
-            LastForkRequestTime = std::chrono::steady_clock::now();
-            return number;
-        }
-
-        LOG_ERR("No forkit pipe while rebalancing children.");
-        return -1; // Fail.
+        OutstandingForks += number;
+        LastForkRequestTime = std::chrono::steady_clock::now();
+        return number;
     }
 
     return 0;
@@ -704,8 +698,8 @@ inline std::string getServiceURI(const std::string &sub, bool asAdmin = false)
 std::atomic<uint64_t> LOOLWSD::NextConnectionId(1);
 
 #ifndef KIT_IN_PROCESS
-std::atomic<int> LOOLWSD::ForKitWritePipe(-1);
 std::atomic<int> LOOLWSD::ForKitProcId(-1);
+std::shared_ptr<ForKitProcess> LOOLWSD::ForKitProc;
 #endif
 #if !MOBILEAPP
 bool LOOLWSD::NoCapsForKit = false;
@@ -780,6 +774,42 @@ public:
 
     /// Check prisoners are still alive and balanced.
     void wakeupHook() override;
+
+    // Resets the forkit porcess object
+    void setForKitProcess(const std::weak_ptr<ForKitProcess>& forKitProc)
+    {
+        assertCorrectThread();
+        _forKitProc = forKitProc;
+    }
+
+    void sendMessageToForKit(const std::string& msg)
+    {
+        if (std::this_thread::get_id() == getThreadOwner())
+        {
+            // Speed up sending the message if the request comes from owner thread
+            std::shared_ptr<ForKitProcess> forKitProc = _forKitProc.lock();
+            if (forKitProc)
+            {
+                forKitProc->sendTextFrame(msg);
+            }
+        }
+        else
+        {
+            // Put the message in the owner's thread queue to be send later
+            // because WebSocketHandler is not thread safe and otherwise we
+            // should synchronize inside WebSocketHandler.
+            addCallback([=]{
+                std::shared_ptr<ForKitProcess> forKitProc = _forKitProc.lock();
+                if (forKitProc)
+                {
+                    forKitProc->sendTextFrame(msg);
+                }
+            });
+        }
+    }
+
+private:
+    std::weak_ptr<ForKitProcess> _forKitProc;
 };
 
 /// This thread listens for and accepts prisoner kit processes.
@@ -799,6 +829,17 @@ public:
     }
 };
 
+void ForKitProcWSHandler::handleMessage(const std::vector<char> &data)
+{
+    LOG_TRC("ForKitProcWSHandler: handling incoming [" << LOOLProtocol::getAbbreviatedMessage(&data[0], data.size()) << "].");
+    const std::string firstLine = LOOLProtocol::getFirstLine(&data[0], data.size());
+    const StringVector tokens = LOOLProtocol::tokenize(firstLine.data(), firstLine.size());
+
+    // Just add here the processing of specific received messages 
+
+    LOG_ERR("ForKitProcWSHandler: unknown command: " << tokens[0]);
+}
+
 LOOLWSD::LOOLWSD()
 {
 }
@@ -1708,13 +1749,10 @@ bool LOOLWSD::createForKit()
         Admin::instance().setForKitPid(ForKitProcId);
     }
 
-    if (ForKitWritePipe != -1)
-    {
-        close(ForKitWritePipe);
-        ForKitWritePipe = -1;
-        Admin::instance().setForKitWritePipe(ForKitWritePipe);
-    }
-
+    // Below line will be executed by PrisonerPoll thread.
+    ForKitProc = nullptr;
+    PrisonerPoll.setForKitProcess(ForKitProc);
+    
     // ForKit always spawns one.
     ++OutstandingForks;
 
@@ -1722,17 +1760,13 @@ bool LOOLWSD::createForKit()
             args.cat(std::string(" "), 0));
 
     LastForkRequestTime = std::chrono::steady_clock::now();
-    int childStdin = -1;
-    int child = Util::spawnProcess(forKitPath, args, nullptr, &childStdin);
-
-    ForKitWritePipe = childStdin;
+    int child = Util::spawnProcess(forKitPath, args);
     ForKitProcId = child;
 
     LOG_INF("Forkit process launched: " << ForKitProcId);
 
     // Init the Admin manager
     Admin::instance().setForKitPid(ForKitProcId);
-    Admin::instance().setForKitWritePipe(ForKitWritePipe);
 
     const int balance = LOOLWSD::NumPreSpawnedChildren - OutstandingForks;
     if (balance > 0)
@@ -1742,6 +1776,11 @@ bool LOOLWSD::createForKit()
 #endif
 }
 
+void LOOLWSD::sendMessageToForKit(const std::string& message)
+{
+    PrisonerPoll.sendMessageToForKit(message);
+}
+
 #endif // !MOBILEAPP
 
 #ifdef FUZZER
@@ -1903,6 +1942,20 @@ private:
 
             LOG_TRC("Child connection with URI [" << LOOLWSD::anonymizeUrl(request.getURI()) << "].");
             Poco::URI requestURI(request.getURI());
+#ifndef KIT_IN_PROCESS
+            if (requestURI.getPath() == FORKIT_URI)
+            {
+                if (socket->getPid() != LOOLWSD::ForKitProcId)
+                {
+                    LOG_WRN("Connection request received on " << FORKIT_URI << " endpoint from unexpected ForKit process. Skipped.");
+                    return;
+                }
+                LOOLWSD::ForKitProc = std::make_shared<ForKitProcess>(LOOLWSD::ForKitProcId, socket, request);
+                socket->getInBuffer().clear();
+                PrisonerPoll.setForKitProcess(LOOLWSD::ForKitProc);
+                return;
+            }
+#endif
             if (requestURI.getPath() != NEW_CHILD_URI)
             {
                 LOG_ERR("Invalid incoming URI.");
@@ -3621,7 +3674,8 @@ int LOOLWSD::innerMain()
     LOG_INF("Waiting for forkit process to exit");
     int status = 0;
     waitpid(ForKitProcId, &status, WUNTRACED);
-    close(ForKitWritePipe);
+    ForKitProcId = -1;
+    ForKitProc.reset();
 #endif
 
     // In case forkit didn't cleanup properly, don't leave jails behind.
diff --git a/wsd/LOOLWSD.hpp b/wsd/LOOLWSD.hpp
index 469d26bec..33056538f 100644
--- a/wsd/LOOLWSD.hpp
+++ b/wsd/LOOLWSD.hpp
@@ -17,6 +17,8 @@
 #include <set>
 #include <string>
 
+#include <signal.h>
+
 #include <Poco/Path.h>
 #include <Poco/Process.h>
 #include <Poco/Util/AbstractConfiguration.h>
@@ -25,6 +27,7 @@
 
 #include "Util.hpp"
 #include "FileUtil.hpp"
+#include "WebSocketHandler.hpp"
 
 class ChildProcess;
 class TraceFileWriter;
@@ -36,6 +39,170 @@ std::shared_ptr<ChildProcess> getNewChild_Blocks(
                                                  const std::string& uri
 #endif
                                                  );
+// This is common code used to setup as socket to both
+// forkit and child document processes via a websocket.
+// In general, a WSProcess instance represents a child
+// process with which we can communicate through websocket.
+class WSProcess
+{
+public:
+    /// @param pid is the process ID.
+    /// @param socket is the underlying Sockeet to the process.
+    WSProcess(const std::string& name, 
+              const Poco::Process::PID pid,
+              const std::shared_ptr<StreamSocket>& socket,
+              std::shared_ptr<WebSocketHandler> handler) :
+
+        _name(name),
+        _pid(pid),
+        _ws(handler),
+        _socket(socket)
+    {
+        LOG_INF(_name << " ctor [" << _pid << "].");
+    }
+
+
+    WSProcess(WSProcess&& other) = delete;
+
+    const WSProcess& operator=(WSProcess&& other) = delete;
+
+    virtual ~WSProcess()
+    {
+        LOG_DBG("~" << _name << " dtor [" << _pid << "].");
+
+        if (_pid <= 0)
+            return;
+
+        terminate();
+
+        // No need for the socket anymore.
+        _ws.reset();
+        _socket.reset();
+    }
+
+    /// Let the child close a nice way.
+    void close()
+    {
+        if (_pid < 0)
+            return;
+
+        try
+        {
+            LOG_DBG("Closing ChildProcess [" << _pid << "].");
+
+            // Request the child to exit
+            if (isAlive())
+            {
+                LOG_DBG("Stopping ChildProcess [" << _pid << "] by sending 'exit' command.");
+                sendTextFrame("exit");
+            }
+
+            // Shutdown the socket.
+            if (_ws)
+                _ws->shutdown();
+        }
+        catch (const std::exception& ex)
+        {
+            LOG_ERR("Error while closing child process: " << ex.what());
+        }
+
+        _pid = -1; // Detach from child.
+    }
+
+    /// Kill or abandon the child.
+    void terminate()
+    {
+        if (_pid < 0)
+            return;
+
+#if !MOBILEAPP
+        if (::kill(_pid, 0) == 0)
+        {
+            LOG_INF("Killing child [" << _pid << "].");
+            if (!SigUtil::killChild(_pid))
+            {
+                LOG_ERR("Cannot terminate lokit [" << _pid << "]. Abandoning.");
+            }
+        }
+#else
+        // What to do? Throw some unique exception that the outermost call in the thread catches and
+        // exits from the thread?
+#endif
+        _pid = -1;
+    }
+
+    Poco::Process::PID getPid() const { return _pid; }
+
+    /// Send a text payload to the child-process WS.
+    virtual bool sendTextFrame(const std::string& data)
+    {
+        try
+        {
+            if (_ws)
+            {
+                LOG_TRC("Send to " << _name << " message: [" << LOOLProtocol::getAbbreviatedMessage(data) << "].");
+                _ws->sendMessage(data);
+                return true;
+            }
+        }
+        catch (const std::exception& exc)
+        {
+            LOG_ERR("Failed to send " << _name << " [" << _pid << "] data [" <<
+                    LOOLProtocol::getAbbreviatedMessage(data) << "] due to: " << exc.what());
+            throw;
+        }
+
+        LOG_WRN("No socket to " << _name << " to send [" << LOOLProtocol::getAbbreviatedMessage(data) << "]");
+        return false;
+    }
+
+    /// Check whether this child is alive and socket not in error.
+    /// Note: zombies will show as alive, and sockets have waiting
+    /// time after the other end-point closes. So this isn't accurate.
+    virtual bool isAlive() const
+    {
+#if !MOBILEAPP
+        try
+        {
+            return _pid > 1 && _ws && ::kill(_pid, 0) == 0;
+        }
+        catch (const std::exception&)
+        {
+        }
+
+        return false;
+#else
+        return _pid > 1;
+#endif
+    }
+
+    std::string _name;
+    Poco::Process::PID _pid;
+    std::shared_ptr<WebSocketHandler> _ws;
+    std::shared_ptr<Socket> _socket;
+};
+
+class ForKitProcWSHandler: public WebSocketHandler, public std::enable_shared_from_this<ForKitProcWSHandler>
+{
+public:
+
+    ForKitProcWSHandler(const std::weak_ptr<StreamSocket>& socket, const Poco::Net::HTTPRequest& request)
+    : WebSocketHandler(socket, request)
+    {
+    }
+
+    virtual void handleMessage(const std::vector<char> &data) override;
+};
+
+class ForKitProcess : public WSProcess
+{
+public:
+    ForKitProcess(int pid, std::shared_ptr<StreamSocket>& socket, const Poco::Net::HTTPRequest &request)
+        : WSProcess("ForKit", pid, socket, std::make_shared<ForKitProcWSHandler>(socket, request))
+    {
+        socket->setHandler(_ws);
+    }
+};
 
 /// The Server class which is responsible for all
 /// external interactions.
@@ -57,7 +224,7 @@ public:
     static bool SingleKit;
 #endif
 #endif
-    static std::atomic<int> ForKitWritePipe;
+    static std::shared_ptr<ForKitProcess> ForKitProc;
     static std::atomic<int> ForKitProcId;
     static bool DummyLOK;
     static std::string FuzzFileName;
@@ -188,6 +355,9 @@ public:
     /// Return true when successfull.
     static bool createForKit();
 
+    /// Sends a message to ForKit through PrisonerPoll.
+    static void sendMessageToForKit(const std::string& message);
+
     /// Checks forkit (and respawns), rebalances
     /// child kit processes and cleans up DocBrokers.
     static void doHousekeeping();


More information about the Libreoffice-commits mailing list