[Libreoffice-commits] online.git: Branch 'private/hcvcastro/forking' - loolwsd/LOOLKit.cpp
Henry Castro
hcastro at collabora.com
Sun Sep 27 10:39:21 PDT 2015
loolwsd/LOOLKit.cpp | 172 +++++++++++++++++++++++++++++++++-------------------
1 file changed, 112 insertions(+), 60 deletions(-)
New commits:
commit 02d114191d23e701282c14ff5cd580eaf42564e8
Author: Henry Castro <hcastro at collabora.com>
Date: Sun Sep 27 13:39:09 2015 -0400
loolwsd: add Connection thread class
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index bacb2dc..ac5ebc3 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -46,6 +46,9 @@ using Poco::StringTokenizer;
using Poco::Exception;
using Poco::Process;
+const int MASTER_PORT_NUMBER = 9981;
+const std::string CHILD_URI = "/loolws/child/";
+
class QueueHandler: public Runnable
{
public:
@@ -80,8 +83,115 @@ private:
tsqueue<std::string>& _queue;
};
-const int MASTER_PORT_NUMBER = 9981;
-const std::string CHILD_URI = "/loolws/child/";
+class Connection: public Runnable
+{
+public:
+ Connection(LibreOfficeKit *loKit, Poco::UInt64 childId, const std::string& threadId) :
+ _loKit(loKit),
+ _childId(childId),
+ _threadId(threadId)
+ {
+ }
+
+ void start()
+ {
+ _thread.start(*this);
+ }
+
+ bool isRunning()
+ {
+ return _thread.isRunning();
+ }
+
+ void run() override
+ {
+#ifdef __linux
+ if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>("lokit_connection"), 0, 0, 0) != 0)
+ std::cout << Util::logPrefix() << "Cannot set thread name :" << strerror(errno) << std::endl;
+#endif
+ try
+ {
+ // Open websocket connection between the child process and the
+ // parent. The parent forwards us requests that it can't handle.
+
+ HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER);
+ cs.setTimeout(0);
+ HTTPRequest request(HTTPRequest::HTTP_GET, CHILD_URI);
+ HTTPResponse response;
+ std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response));
+
+ std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, _loKit));
+ ws->setReceiveTimeout(0);
+
+ // child Jail TID PID
+ std::string hello("child " + std::to_string(_childId) + " " +
+ _threadId + " " + std::to_string(Process::id()));
+ session->sendTextFrame(hello);
+
+ tsqueue<std::string> queue;
+ Thread queueHandlerThread;
+ QueueHandler handler(queue);
+
+ handler.setSession(session);
+ queueHandlerThread.start(handler);
+
+ int flags;
+ int n;
+ do
+ {
+ char buffer[1024];
+ n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+
+ if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
+ {
+ std::string firstLine = getFirstLine(buffer, n);
+ StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
+ // The only kind of messages a child process receives are the single-line ones (?)
+ assert(firstLine.size() == static_cast<std::string::size_type>(n));
+
+ // Check if it is a "canceltiles" and in that case remove outstanding
+ // "tile" messages from the queue.
+ if (tokens.count() == 1 && tokens[0] == "canceltiles")
+ {
+ queue.remove_if([](std::string& x) {
+ return (x.find("tile ") == 0 && x.find("id=") == std::string::npos);
+ });
+ }
+ else
+ {
+ queue.put(firstLine);
+ }
+ }
+ }
+ while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE);
+
+ queue.clear();
+ queue.put("eof");
+ queueHandlerThread.join();
+ }
+
+ catch (Exception& exc)
+ {
+ std::cout << Util::logPrefix() + "Exception: " + exc.what() << std::endl;
+ }
+ catch (std::exception& exc)
+ {
+ std::cout << Util::logPrefix() + "Exception: " + exc.what() << std::endl;
+ }
+ }
+
+ ~Connection()
+ {
+ //_thread.stop();
+ }
+
+private:
+ LibreOfficeKit *_loKit;
+ Poco::UInt64 _childId;
+ std::string _threadId;
+ Thread _thread;
+};
void run_lok_main(const std::string &loSubPath, Poco::UInt64 _childId)
{
@@ -107,64 +217,6 @@ void run_lok_main(const std::string &loSubPath, Poco::UInt64 _childId)
exit(-1);
}
- // Open websocket connection between the child process and the
- // parent. The parent forwards us requests that it can't handle.
-
- HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER);
- cs.setTimeout(0);
- HTTPRequest request(HTTPRequest::HTTP_GET, CHILD_URI);
- HTTPResponse response;
- std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response));
-
- std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, loKit));
-
- ws->setReceiveTimeout(0);
-
- std::string hello("child " + std::to_string(_childId) + " " + std::to_string(Process::id()));
- session->sendTextFrame(hello);
-
- tsqueue<std::string> queue;
- Thread queueHandlerThread;
- QueueHandler handler(queue);
-
- handler.setSession(session);
- queueHandlerThread.start(handler);
-
- int flags;
- int n;
- do
- {
- char buffer[1024];
- n = ws->receiveFrame(buffer, sizeof(buffer), flags);
-
- if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
- {
- std::string firstLine = getFirstLine(buffer, n);
- StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-
- // The only kind of messages a child process receives are the single-line ones (?)
- assert(firstLine.size() == static_cast<std::string::size_type>(n));
-
- // Check if it is a "canceltiles" and in that case remove outstanding
- // "tile" messages from the queue.
- if (tokens.count() == 1 && tokens[0] == "canceltiles")
- {
- queue.remove_if([](std::string& x) {
- return (x.find("tile ") == 0 && x.find("id=") == std::string::npos);
- });
- }
- else
- {
- queue.put(firstLine);
- }
- }
- }
- while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE);
-
- queue.clear();
- queue.put("eof");
- queueHandlerThread.join();
-
// Destroy LibreOfficeKit
loKit->pClass->destroy(loKit);
}
More information about the Libreoffice-commits
mailing list