[Libreoffice-commits] online.git: Branch 'private/hcvcastro/socket' - loolwsd/Admin.cpp loolwsd/Common.hpp loolwsd/LOOLBroker.cpp loolwsd/LOOLWSD.cpp loolwsd/LOOLWSD.hpp loolwsd/MasterProcessSession.cpp

Henry Castro hcastro at collabora.com
Sun Apr 3 19:54:26 UTC 2016


 loolwsd/Admin.cpp                |    2 -
 loolwsd/Common.hpp               |    1 
 loolwsd/LOOLBroker.cpp           |   72 ++++++++++++++++++++++++-------------
 loolwsd/LOOLWSD.cpp              |   75 ++++++++++++++++++++++-----------------
 loolwsd/LOOLWSD.hpp              |    2 +
 loolwsd/MasterProcessSession.cpp |    6 +--
 6 files changed, 97 insertions(+), 61 deletions(-)

New commits:
commit 4125d2e851260c3bfb669b1f375e8ae5c83787d8
Author: Henry Castro <hcastro at collabora.com>
Date:   Sun Apr 3 15:54:37 2016 -0400

    loolwsd: replace FIFO with socket for WSD -> Broker

diff --git a/loolwsd/Admin.cpp b/loolwsd/Admin.cpp
index 2ad3d3d..800c2bc 100644
--- a/loolwsd/Admin.cpp
+++ b/loolwsd/Admin.cpp
@@ -194,7 +194,7 @@ void AdminRequestHandler::handleWSRequests(HTTPServerRequest& request, HTTPServe
                         {
                             if (std::stoi(tokens[1]))
                             {
-                                IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, firstLine + " \r\n");
+                                LOOLWSD::DlgBroker->sendMessage(firstLine);
                             }
                         }
                         catch(std::exception& e)
diff --git a/loolwsd/Common.hpp b/loolwsd/Common.hpp
index 3fafc01..27279c5 100644
--- a/loolwsd/Common.hpp
+++ b/loolwsd/Common.hpp
@@ -35,7 +35,6 @@ constexpr int READ_BUFFER_SIZE = 2048;
 constexpr int SMALL_MESSAGE_SIZE = READ_BUFFER_SIZE / 2;
 
 constexpr auto CHILD_URI = "/loolws/child?";
-constexpr auto FIFO_LOOLWSD = "loolwsdfifo";
 constexpr auto FIFO_PATH = "pipe";
 constexpr auto JAILED_DOCUMENT_ROOT = "/user/docs/";
 constexpr auto SSL_KEY_FILE = "key.pem";
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index 18ba17f..5013736 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -31,8 +31,6 @@ typedef int (LokHookPreInit)  (const char *install_path, const char *user_profil
 
 using Poco::ProcessHandle;
 
-static int readerBroker = -1;
-
 static std::string loolkitPath;
 static std::atomic<unsigned> forkCounter;
 static std::chrono::steady_clock::time_point lastMaintenanceTime = std::chrono::steady_clock::now();
@@ -152,11 +150,17 @@ namespace
     }
 }
 
-class PipeRunnable: public Runnable
+class CommandRunnable: public Runnable
 {
 public:
-    PipeRunnable()
+    CommandRunnable(const std::shared_ptr<DialogSocket>& dlgWsd) :
+        _dlgWsd(dlgWsd)
+    {
+    }
+
+    ~CommandRunnable()
     {
+        _dlgWsd->shutdown();
     }
 
     bool createSession(const std::shared_ptr<ChildProcess>& child, const std::string& session, const std::string& url)
@@ -249,19 +253,39 @@ public:
 
     void run() override
     {
-        static const std::string thread_name = "brk_pipe_reader";
+        static const std::string thread_name = "brk_cmd_reader";
+        const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
 
         if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
             Log::error("Cannot set thread name to " + thread_name + ".");
 
         Log::debug("Thread [" + thread_name + "] started.");
 
-        IoUtil::PipeReader pipeReader(FIFO_LOOLWSD, readerBroker);
-        pipeReader.process([this](std::string& message) { handleInput(message); return true; },
-                           []() { return TerminationFlag; });
+        try
+        {
+            while (!TerminationFlag)
+            {
+                if (_dlgWsd->poll(waitTime, Socket::SELECT_READ))
+                {
+                    std::string message;
+                    _dlgWsd->receiveMessage(message);
+                    handleInput(message);
+                }
+            }
+            _dlgWsd->shutdown();
+        }
+        catch (const Exception& exc)
+        {
+            Log::error() << "CommandRunnable::run: Exception: " << exc.displayText()
+                         << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
+                         << Log::end;
+        }
 
         Log::debug("Thread [" + thread_name + "] finished.");
     }
+
+private:
+    std::shared_ptr<DialogSocket> _dlgWsd;
 };
 
 /// Initializes LibreOfficeKit for cross-fork re-use.
@@ -476,11 +500,18 @@ int main(int argc, char** argv)
     assert(!childRoot.empty());
     assert(numPreSpawnedChildren >= 1);
 
-    const Path pipePath = Path::forDirectory(childRoot + Path::separator() + FIFO_PATH);
-    const std::string pipeLoolwsd = Path(pipePath, FIFO_LOOLWSD).toString();
-    if ( (readerBroker = open(pipeLoolwsd.c_str(), O_RDONLY) ) < 0 )
+    std::shared_ptr<DialogSocket> dlgWsd = std::make_shared<DialogSocket>();
+    const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
+
+    try
     {
-        Log::error("Error: failed to open pipe [" + pipeLoolwsd + "] read only. Exiting.");
+        dlgWsd->connect(SocketAddress("localhost", COMMAND_PORT_NUMBER), waitTime);
+    }
+    catch (const Exception& exc)
+    {
+        Log::error() << "LOOLBroker::main: Exception: " << exc.displayText()
+                     << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
+                     << Log::end;
         std::exit(Application::EXIT_SOFTWARE);
     }
 
@@ -490,14 +521,8 @@ int main(int argc, char** argv)
     if (!std::getenv("LOK_VIEW_CALLBACK"))
         Log::info("Note: LOK_VIEW_CALLBACK is not set.");
 
-    const std::string pipeBroker = Path(pipePath, FIFO_BROKER).toString();
-    if (mkfifo(pipeBroker.c_str(), 0666) < 0 && errno != EEXIST)
-    {
-        Log::error("Error: Failed to create pipe FIFO [" + FIFO_BROKER + "].");
-        std::exit(Application::EXIT_SOFTWARE);
-    }
-
     // Open notify pipe
+    const Path pipePath = Path::forDirectory(childRoot + Path::separator() + FIFO_PATH);
     const std::string pipeNotify = Path(pipePath, FIFO_NOTIFY).toString();
     if ((writerNotify = open(pipeNotify.c_str(), O_WRONLY) ) < 0)
     {
@@ -533,10 +558,10 @@ int main(int argc, char** argv)
         dropCapability(CAP_FOWNER);
     }
 
-    PipeRunnable pipeHandler;
-    Poco::Thread pipeThread;
+    CommandRunnable commandHandler(dlgWsd);
+    Poco::Thread commandThread;
 
-    pipeThread.start(pipeHandler);
+    commandThread.start(commandHandler);
 
     Log::info("loolbroker is ready.");
 
@@ -710,9 +735,8 @@ int main(int argc, char** argv)
     _childProcesses.clear();
     _newChildProcesses.clear();
 
-    pipeThread.join();
+    commandThread.join();
     close(writerNotify);
-    close(readerBroker);
 
     Log::info("Process [loolbroker] finished.");
     return Application::EXIT_OK;
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 1dd6eef..5fe1bd4 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -94,6 +94,7 @@ DEALINGS IN THE SOFTWARE.
 #include <Poco/Net/SecureServerSocket.h>
 #include <Poco/Net/ServerSocket.h>
 #include <Poco/Net/SocketAddress.h>
+#include <Poco/Net/DialogSocket.h>
 #include <Poco/Net/WebSocket.h>
 #include <Poco/Path.h>
 #include <Poco/Process.h>
@@ -147,6 +148,7 @@ using Poco::Net::SecureServerSocket;
 using Poco::Net::ServerSocket;
 using Poco::Net::Socket;
 using Poco::Net::SocketAddress;
+using Poco::Net::DialogSocket;
 using Poco::Net::WebSocket;
 using Poco::Net::WebSocketException;
 using Poco::Path;
@@ -464,9 +466,9 @@ private:
             session->setEditLock(true);
 
         // Request a kit process for this doc.
-        const std::string aMessage = "request " + id + " " + docKey + "\n";
-        Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1));
-        IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage);
+        const std::string message = "request " + id + " " + docKey;
+        Log::debug("MasterToBroker: " + message);
+        LOOLWSD::DlgBroker->sendMessage(message);
 
         QueueHandler handler(queue, session, "wsd_queue_" + session->getId());
 
@@ -879,7 +881,7 @@ private:
 };
 
 std::atomic<unsigned> LOOLWSD::NextSessionId;
-int LOOLWSD::BrokerWritePipe = -1;
+std::unique_ptr<DialogSocket> LOOLWSD::DlgBroker;
 std::string LOOLWSD::Cache = LOOLWSD_CACHEDIR;
 std::string LOOLWSD::SysTemplate;
 std::string LOOLWSD::LoTemplate;
@@ -1080,23 +1082,47 @@ void LOOLWSD::displayVersion()
 
 Process::PID LOOLWSD::createBroker()
 {
-    Process::Args args;
+    Process::PID brokerPid = -1;
 
-    args.push_back("--losubpath=" + LOOLWSD::LoSubPath);
-    args.push_back("--systemplate=" + SysTemplate);
-    args.push_back("--lotemplate=" + LoTemplate);
-    args.push_back("--childroot=" + ChildRoot);
-    args.push_back("--numprespawns=" + std::to_string(NumPreSpawnedChildren));
-    args.push_back("--clientport=" + std::to_string(ClientPortNumber));
+    try
+    {
+        Process::Args args;
+
+        args.push_back("--losubpath=" + LOOLWSD::LoSubPath);
+        args.push_back("--systemplate=" + SysTemplate);
+        args.push_back("--lotemplate=" + LoTemplate);
+        args.push_back("--childroot=" + ChildRoot);
+        args.push_back("--numprespawns=" + std::to_string(NumPreSpawnedChildren));
+        args.push_back("--clientport=" + std::to_string(ClientPortNumber));
+
+        const std::string brokerPath = Path(Application::instance().commandPath()).parent().toString() + "loolbroker";
+        const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
 
-    const std::string brokerPath = Path(Application::instance().commandPath()).parent().toString() + "loolbroker";
+        Log::info("Launching Broker #1: " + brokerPath + " " +
+                  Poco::cat(std::string(" "), args.begin(), args.end()));
 
-    Log::info("Launching Broker #1: " + brokerPath + " " +
-              Poco::cat(std::string(" "), args.begin(), args.end()));
+        ServerSocket srvCommand(COMMAND_PORT_NUMBER);
+        ProcessHandle child = Process::launch(brokerPath, args);
 
-    ProcessHandle child = Process::launch(brokerPath, args);
+        if (srvCommand.poll(waitTime, Socket::SELECT_READ))
+        {
+            DlgBroker.reset(new DialogSocket(srvCommand.acceptConnection()));
+            brokerPid = child.id();
+        }
+        else
+        {
+            Log::error("Error: failed to socket connection with broker.");
+            Util::requestTermination(child.id());
+        }
+    }
+    catch (const Exception& exc)
+    {
+        Log::error() << "LOOLWSD::createBroker: Exception: " << exc.displayText()
+                     << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
+                     << Log::end;
+    }
 
-    return child.id();
+    return brokerPid;
 }
 
 int LOOLWSD::main(const std::vector<std::string>& /*args*/)
@@ -1162,13 +1188,6 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
         return Application::EXIT_SOFTWARE;
     }
 
-    const std::string pipeLoolwsd = Path(pipePath, FIFO_LOOLWSD).toString();
-    if (mkfifo(pipeLoolwsd.c_str(), 0666) < 0 && errno != EEXIST)
-    {
-        Log::error("Error: Failed to create pipe FIFO [" + pipeLoolwsd + "].");
-        return Application::EXIT_SOFTWARE;
-    }
-
     // Open notify pipe
     int pipeFlags = O_RDONLY | O_NONBLOCK;
     int notifyPipe = -1;
@@ -1237,12 +1256,6 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
 
     srv2.start();
 
-    if ( (BrokerWritePipe = open(pipeLoolwsd.c_str(), O_WRONLY) ) < 0 )
-    {
-        Log::error("Error: failed to open pipe [" + pipeLoolwsd + "] write only.");
-        return Application::EXIT_SOFTWARE;
-    }
-
     threadPool.start(admin);
 
     TestInput input(*this, svs, srv);
@@ -1380,15 +1393,13 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
     threadPool.joinAll();
 
     // Terminate child processes
-    IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\n");
+    DlgBroker->shutdown();
     Log::info("Requesting child process " + std::to_string(brokerPid) + " to terminate");
     Util::requestTermination(brokerPid);
 
     // wait broker process finish
     waitpid(brokerPid, &status, WUNTRACED);
 
-    close(BrokerWritePipe);
-
     Log::info("Cleaning up childroot directory [" + ChildRoot + "].");
     std::vector<std::string> jails;
     File(ChildRoot).list(jails);
diff --git a/loolwsd/LOOLWSD.hpp b/loolwsd/LOOLWSD.hpp
index 59d7111..9d5e5d3 100644
--- a/loolwsd/LOOLWSD.hpp
+++ b/loolwsd/LOOLWSD.hpp
@@ -19,6 +19,7 @@
 #include <Poco/Random.h>
 #include <Poco/Util/OptionSet.h>
 #include <Poco/Util/ServerApplication.h>
+#include <Poco/Net/DialogSocket.h>
 
 #include "Auth.hpp"
 #include "Common.hpp"
@@ -36,6 +37,7 @@ public:
     // An Application is a singleton anyway,
     // so just keep these as statics.
     static std::atomic<unsigned> NextSessionId;
+    static std::unique_ptr<Poco::Net::DialogSocket> DlgBroker;
     static int NumPreSpawnedChildren;
     static int BrokerWritePipe;
     static bool DoTest;
diff --git a/loolwsd/MasterProcessSession.cpp b/loolwsd/MasterProcessSession.cpp
index 432e751..f8a21b6 100644
--- a/loolwsd/MasterProcessSession.cpp
+++ b/loolwsd/MasterProcessSession.cpp
@@ -782,9 +782,9 @@ void MasterProcessSession::dispatchChild()
         {
             Log::info() << "Retrying child permission... " << retries << Log::end;
             // request again new URL session
-            const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + '\n';
-            Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
-            IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, message);
+            const std::string message = "request " + getId() + " " + _docBroker->getDocKey();
+            Log::trace("MasterToBroker: " + message);
+            LOOLWSD::DlgBroker->sendMessage(message);
         }
     }
 


More information about the Libreoffice-commits mailing list