[Libreoffice-commits] online.git: Branch 'private/hcvcastro/forking' - loolwsd/LOOLKit.cpp

Henry Castro hcastro at collabora.com
Sun Sep 27 10:39:21 PDT 2015


 loolwsd/LOOLKit.cpp |  172 +++++++++++++++++++++++++++++++++-------------------
 1 file changed, 112 insertions(+), 60 deletions(-)

New commits:
commit 02d114191d23e701282c14ff5cd580eaf42564e8
Author: Henry Castro <hcastro at collabora.com>
Date:   Sun Sep 27 13:39:09 2015 -0400

    loolwsd: add Connection thread class

diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index bacb2dc..ac5ebc3 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -46,6 +46,9 @@ using Poco::StringTokenizer;
 using Poco::Exception;
 using Poco::Process;
 
+const int MASTER_PORT_NUMBER = 9981;
+const std::string CHILD_URI = "/loolws/child/";
+
 class QueueHandler: public Runnable
 {
 public:
@@ -80,8 +83,115 @@ private:
     tsqueue<std::string>& _queue;
 };
 
-const int MASTER_PORT_NUMBER = 9981;
-const std::string CHILD_URI = "/loolws/child/";
+class Connection: public Runnable
+{
+public:
+    Connection(LibreOfficeKit *loKit, Poco::UInt64 childId, const std::string& threadId) :
+        _loKit(loKit),
+        _childId(childId),
+        _threadId(threadId)
+    {
+    }
+
+    void start()
+    {
+        _thread.start(*this);
+    }
+
+    bool isRunning()
+    {
+        return _thread.isRunning();
+    }
+
+    void run() override
+    {
+#ifdef __linux
+        if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>("lokit_connection"), 0, 0, 0) != 0)
+            std::cout << Util::logPrefix() << "Cannot set thread name :" << strerror(errno) << std::endl;
+#endif
+        try
+        {
+            // Open websocket connection between the child process and the
+            // parent. The parent forwards us requests that it can't handle.
+
+            HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER);
+            cs.setTimeout(0);
+            HTTPRequest request(HTTPRequest::HTTP_GET, CHILD_URI);
+            HTTPResponse response;
+            std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response));
+
+            std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, _loKit));
+            ws->setReceiveTimeout(0);
+
+            // child Jail TID PID
+            std::string hello("child " + std::to_string(_childId) + " " +
+                _threadId + " " + std::to_string(Process::id()));
+            session->sendTextFrame(hello);
+
+            tsqueue<std::string> queue;
+            Thread queueHandlerThread;
+            QueueHandler handler(queue);
+
+            handler.setSession(session);
+            queueHandlerThread.start(handler);
+
+            int flags;
+            int n;
+            do
+            {
+                char buffer[1024];
+                n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+
+                if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
+                {
+                    std::string firstLine = getFirstLine(buffer, n);
+                    StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
+                    // The only kind of messages a child process receives are the single-line ones (?)
+                    assert(firstLine.size() == static_cast<std::string::size_type>(n));
+
+                    // Check if it is a "canceltiles" and in that case remove outstanding
+                    // "tile" messages from the queue.
+                    if (tokens.count() == 1 && tokens[0] == "canceltiles")
+                    {
+                        queue.remove_if([](std::string& x) {
+                            return (x.find("tile ") == 0 && x.find("id=") == std::string::npos);
+                        });
+                    }
+                    else
+                    {
+                        queue.put(firstLine);
+                    }
+                }
+            }
+            while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE);
+
+            queue.clear();
+            queue.put("eof");
+            queueHandlerThread.join();
+        }
+
+        catch (Exception& exc)
+        {
+            std::cout << Util::logPrefix() + "Exception: " + exc.what() << std::endl;
+        }
+        catch (std::exception& exc)
+        {
+            std::cout << Util::logPrefix() + "Exception: " + exc.what() << std::endl;
+        }
+    }
+
+    ~Connection()
+    {
+        //_thread.stop();
+    }
+
+private:
+    LibreOfficeKit *_loKit;
+    Poco::UInt64 _childId;
+    std::string _threadId;
+    Thread _thread;
+};
 
 void run_lok_main(const std::string &loSubPath, Poco::UInt64 _childId)
 {
@@ -107,64 +217,6 @@ void run_lok_main(const std::string &loSubPath, Poco::UInt64 _childId)
             exit(-1);
         }
 
-        // Open websocket connection between the child process and the
-        // parent. The parent forwards us requests that it can't handle.
-
-        HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER);
-        cs.setTimeout(0);
-        HTTPRequest request(HTTPRequest::HTTP_GET, CHILD_URI);
-        HTTPResponse response;
-        std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response));
-
-        std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, loKit));
-
-        ws->setReceiveTimeout(0);
-
-        std::string hello("child " + std::to_string(_childId) + " " + std::to_string(Process::id()));
-        session->sendTextFrame(hello);
-
-        tsqueue<std::string> queue;
-        Thread queueHandlerThread;
-        QueueHandler handler(queue);
-
-        handler.setSession(session);
-        queueHandlerThread.start(handler);
-
-        int flags;
-        int n;
-        do
-        {
-            char buffer[1024];
-            n = ws->receiveFrame(buffer, sizeof(buffer), flags);
-
-            if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
-            {
-                std::string firstLine = getFirstLine(buffer, n);
-                StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-
-                // The only kind of messages a child process receives are the single-line ones (?)
-                assert(firstLine.size() == static_cast<std::string::size_type>(n));
-
-                // Check if it is a "canceltiles" and in that case remove outstanding
-                // "tile" messages from the queue.
-                if (tokens.count() == 1 && tokens[0] == "canceltiles")
-                {
-                    queue.remove_if([](std::string& x) {
-                        return (x.find("tile ") == 0 && x.find("id=") == std::string::npos);
-                    });
-                }
-                else
-                {
-                    queue.put(firstLine);
-                }
-            }
-        }
-        while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE);
-
-        queue.clear();
-        queue.put("eof");
-        queueHandlerThread.join();
-
         // Destroy LibreOfficeKit
         loKit->pClass->destroy(loKit);
     }


More information about the Libreoffice-commits mailing list