[Libreoffice-commits] online.git: loolwsd/LOOLWSD.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Wed Jan 6 06:22:13 PST 2016
loolwsd/LOOLWSD.cpp | 275 ++++++++++++++++++++++++++++++----------------------
1 file changed, 161 insertions(+), 114 deletions(-)
New commits:
commit c2c41ceb63fee92b9d958434c843a96ea976c7e1
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Tue Jan 5 21:29:12 2016 -0500
loolwsd: refactored HttpRequestHandler
Change-Id: Ie785d814aff1d28634c8933511c4a5a4a4f5cebc
Reviewed-on: https://gerrit.libreoffice.org/21156
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index cf4658e..5f1ad43 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -240,21 +240,107 @@ public:
}
};
-/// Handle a WebSocket connection or a simple HTTP request.
-class RequestHandler: public HTTPRequestHandler
+// Synchronously process WebSocket requests and dispatch to handler.
+// Handler returns false to end.
+void SocketProcessor(std::shared_ptr<WebSocket> ws,
+ HTTPServerResponse& response,
+ std::function<bool(const char* data, const int size)> handler)
{
-public:
- RequestHandler()
+ const Poco::Timespan waitTime(POLL_TIMEOUT);
+ try
+ {
+ // Loop, receiving WebSocket messages either from the client, or from the child
+ // process (to be forwarded to the client).
+ int flags;
+ int n;
+ bool pollTimeout = true;
+ ws->setReceiveTimeout(0);
+
+ do
+ {
+ char buffer[200000]; //FIXME: Dynamic?
+
+ if ((pollTimeout = ws->poll(waitTime, Socket::SELECT_READ)))
+ {
+ n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+
+ if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
+ {
+ ws->sendFrame("", 0, WebSocket::FRAME_OP_PONG);
+ }
+ else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
+ {
+ n = 1;
+ }
+ else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
+ {
+ const std::string firstLine = getFirstLine(buffer, n);
+ if (firstLine == "eof")
+ break;
+
+ StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
+ if (firstLine.size() == static_cast<std::string::size_type>(n))
+ {
+ handler(firstLine.c_str(), firstLine.size());
+ }
+ else
+ {
+ // Check if it is a "nextmessage:" and in that case read the large
+ // follow-up message separately, and handle that only.
+ int size;
+ if (tokens.count() == 2 &&
+ tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
+ {
+ char largeBuffer[size]; //FIXME: Security risk! Flooding may segfault us.
+
+ n = ws->receiveFrame(largeBuffer, size, flags);
+ if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
+ {
+ if (!handler(largeBuffer, n))
+ n = 0;
+ }
+ }
+ else
+ {
+ if (!handler(buffer, n))
+ n = 0;
+ }
+ }
+ }
+ }
+ }
+ while (!TerminationFlag &&
+ (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)));
+ }
+ catch (const WebSocketException& exc)
{
+ Log::error("RequestHandler::handleRequest(), WebSocketException: " + exc.message());
+ switch (exc.code())
+ {
+ case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
+ response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
+ // fallthrough
+ case WebSocket::WS_ERR_NO_HANDSHAKE:
+ case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
+ case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
+ response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
+ response.setContentLength(0);
+ response.send();
+ break;
+ }
}
+}
+
+
+/// Handle a public connection from a client.
+class ClientRequestHandler: public HTTPRequestHandler
+{
+public:
void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
{
- std::string thread_name;
- if (request.serverAddress().port() == MASTER_PORT_NUMBER)
- thread_name = "prison_socket";
- else
- thread_name = "client_socket";
+ const std::string thread_name = "client_socket";
#ifdef __linux
if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
@@ -382,118 +468,82 @@ public:
try
{
- try
- {
- auto ws = std::make_shared<WebSocket>(request, response);
+ auto ws = std::make_shared<WebSocket>(request, response);
- LOOLSession::Kind kind;
- std::string id;
+ const std::string id = LOOLWSD::GenSessionId();
+ auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws);
- if (request.getURI() == LOOLWSD::CHILD_URI && request.serverAddress().port() == MASTER_PORT_NUMBER)
- kind = LOOLSession::Kind::ToPrisoner;
- else
+ // For ToClient sessions, we store incoming messages in a queue and have a separate
+ // thread that handles them. This is so that we can empty the queue when we get a
+ // "canceltiles" message.
+ handler.setSession(session);
+ queueHandlerThread.start(handler);
+
+ SocketProcessor(ws, response, [&session, &queue](const char* data, const int size)
{
- kind = LOOLSession::Kind::ToClient;
- id = LOOLWSD::GenSessionId();
- }
+ const std::string firstLine = getFirstLine(data, size);
+ if (firstLine == "eof")
+ return false;
- auto session = std::make_shared<MasterProcessSession>(id, kind, ws);
+ if (firstLine.size() == static_cast<std::string::size_type>(size))
+ {
+ queue.put(firstLine);
+ return true;
+ }
+ else
+ {
+ return session->handleInput(data, size);
+ }
+ });
- // For ToClient sessions, we store incoming messages in a queue and have a separate
- // thread that handles them. This is so that we can empty the queue when we get a
- // "canceltiles" message.
- if (kind == LOOLSession::Kind::ToClient)
- {
- handler.setSession(session);
- queueHandlerThread.start(handler);
- }
+ queue.clear();
+ queue.put("eof");
+ queueHandlerThread.join();
+ }
+ catch (const IOException& exc)
+ {
+ Log::error("IOException: " + exc.message());
+ }
- // Loop, receiving WebSocket messages either from the client, or from the child
- // process (to be forwarded to the client).
- int flags;
- int n;
- bool pollTimeout = true;
- ws->setReceiveTimeout(0);
+ Log::debug("Thread [" + thread_name + "] finished.");
+ }
+};
- do
- {
- char buffer[200000]; //FIXME: Dynamic?
+/// Handle requests from prisoners (internal).
+class PrisonerRequestHandler: public HTTPRequestHandler
+{
+public:
- if ((pollTimeout = ws->poll(waitTime, Socket::SELECT_READ)))
- {
- n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+ void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
+ {
+ assert(request.serverAddress().port() == MASTER_PORT_NUMBER);
+ assert(request.getURI() == LOOLWSD::CHILD_URI);
- if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
- {
- ws->sendFrame("", 0, WebSocket::FRAME_OP_PONG);
- }
- else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
- {
- n = 1;
- }
- else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
- {
- const std::string firstLine = getFirstLine(buffer, n);
- if (firstLine == "eof")
- break;
+ const std::string thread_name = "prison_socket";
+
+#ifdef __linux
+ if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
+ Log::error("Cannot set thread name to " + thread_name + ".");
+#endif
+ Log::debug("Thread [" + thread_name + "] started.");
- StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+ Poco::Timespan waitTime(POLL_TIMEOUT);
- if (kind == LOOLSession::Kind::ToClient &&
- firstLine.size() == static_cast<std::string::size_type>(n))
- {
- queue.put(firstLine);
- }
- else
- {
- // Check if it is a "nextmessage:" and in that case read the large
- // follow-up message separately, and handle that only.
- int size;
- if (tokens.count() == 2 &&
- tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
- {
- char largeBuffer[size]; //FIXME: Security risk! Flooding may segfault us.
-
- n = ws->receiveFrame(largeBuffer, size, flags);
- if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
- {
- if (!session->handleInput(largeBuffer, n))
- n = 0;
- }
- }
- else
- {
- if (!session->handleInput(buffer, n))
- n = 0;
- }
- }
- }
- }
- }
- while (!TerminationFlag &&
- (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)));
+ try
+ {
+ auto ws = std::make_shared<WebSocket>(request, response);
- queue.clear();
- queue.put("eof");
- queueHandlerThread.join();
- }
- catch (const WebSocketException& exc)
- {
- Log::error("RequestHandler::handleRequest(), WebSocketException: " + exc.message());
- switch (exc.code())
+ std::string id;
+ auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToPrisoner, ws);
+
+ SocketProcessor(ws, response, [&session](const char* data, const int size)
{
- case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
- response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
- // fallthrough
- case WebSocket::WS_ERR_NO_HANDSHAKE:
- case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
- case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
- response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
- response.setContentLength(0);
- response.send();
- break;
- }
- }
+ const std::string firstLine = getFirstLine(data, size);
+ if (firstLine == "eof")
+ return false;
+
+ return session->handleInput(data, size);
+ });
}
catch (const IOException& exc)
{
@@ -504,13 +554,10 @@ public:
}
};
+template <class RequestHandler>
class RequestHandlerFactory: public HTTPRequestHandlerFactory
{
public:
- RequestHandlerFactory()
- {
- }
-
HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override
{
auto logger = Log::info();
@@ -868,7 +915,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
// Start a server listening on the port for clients
ServerSocket svs(ClientPortNumber, NumPreSpawnedChildren*10);
ThreadPool threadPool(NumPreSpawnedChildren*2, NumPreSpawnedChildren*5);
- HTTPServer srv(new RequestHandlerFactory(), threadPool, svs, new HTTPServerParams);
+ HTTPServer srv(new RequestHandlerFactory<ClientRequestHandler>(), threadPool, svs, new HTTPServerParams);
srv.start();
@@ -876,7 +923,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
SocketAddress addr2("127.0.0.1", MASTER_PORT_NUMBER);
ServerSocket svs2(addr2, NumPreSpawnedChildren);
ThreadPool threadPool2(NumPreSpawnedChildren*2, NumPreSpawnedChildren*5);
- HTTPServer srv2(new RequestHandlerFactory(), threadPool2, svs2, new HTTPServerParams);
+ HTTPServer srv2(new RequestHandlerFactory<PrisonerRequestHandler>(), threadPool2, svs2, new HTTPServerParams);
srv2.start();
More information about the Libreoffice-commits
mailing list