[Libreoffice-commits] online.git: loolwsd/LOOLWSD.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Jan 6 06:22:13 PST 2016


 loolwsd/LOOLWSD.cpp |  275 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 161 insertions(+), 114 deletions(-)

New commits:
commit c2c41ceb63fee92b9d958434c843a96ea976c7e1
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Tue Jan 5 21:29:12 2016 -0500

    loolwsd: refactored HttpRequestHandler
    
    Change-Id: Ie785d814aff1d28634c8933511c4a5a4a4f5cebc
    Reviewed-on: https://gerrit.libreoffice.org/21156
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index cf4658e..5f1ad43 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -240,21 +240,107 @@ public:
     }
 };
 
-/// Handle a WebSocket connection or a simple HTTP request.
-class RequestHandler: public HTTPRequestHandler
+// Synchronously process WebSocket requests and dispatch to handler.
+// Handler returns false to end.
+void SocketProcessor(std::shared_ptr<WebSocket> ws,
+                     HTTPServerResponse& response,
+                     std::function<bool(const char* data, const int size)> handler)
 {
-public:
-    RequestHandler()
+    const Poco::Timespan waitTime(POLL_TIMEOUT);
+    try
+    {
+        // Loop, receiving WebSocket messages either from the client, or from the child
+        // process (to be forwarded to the client).
+        int flags;
+        int n;
+        bool pollTimeout = true;
+        ws->setReceiveTimeout(0);
+
+        do
+        {
+            char buffer[200000]; //FIXME: Dynamic?
+
+            if ((pollTimeout = ws->poll(waitTime, Socket::SELECT_READ)))
+            {
+                n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+
+                if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
+                {
+                    ws->sendFrame("", 0, WebSocket::FRAME_OP_PONG);
+                }
+                else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
+                {
+                    n = 1;
+                }
+                else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
+                {
+                    const std::string firstLine = getFirstLine(buffer, n);
+                    if (firstLine == "eof")
+                        break;
+
+                    StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
+                    if (firstLine.size() == static_cast<std::string::size_type>(n))
+                    {
+                        handler(firstLine.c_str(), firstLine.size());
+                    }
+                    else
+                    {
+                        // Check if it is a "nextmessage:" and in that case read the large
+                        // follow-up message separately, and handle that only.
+                        int size;
+                        if (tokens.count() == 2 &&
+                            tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
+                        {
+                            char largeBuffer[size];     //FIXME: Security risk! Flooding may segfault us.
+
+                            n = ws->receiveFrame(largeBuffer, size, flags);
+                            if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
+                            {
+                                if (!handler(largeBuffer, n))
+                                    n = 0;
+                            }
+                        }
+                        else
+                        {
+                            if (!handler(buffer, n))
+                                n = 0;
+                        }
+                    }
+                }
+            }
+        }
+        while (!TerminationFlag &&
+               (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)));
+    }
+    catch (const WebSocketException& exc)
     {
+        Log::error("RequestHandler::handleRequest(), WebSocketException: " + exc.message());
+        switch (exc.code())
+        {
+        case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
+            response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
+            // fallthrough
+        case WebSocket::WS_ERR_NO_HANDSHAKE:
+        case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
+        case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
+            response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
+            response.setContentLength(0);
+            response.send();
+            break;
+        }
     }
+}
+
+
+/// Handle a public connection from a client.
+class ClientRequestHandler: public HTTPRequestHandler
+{
+public:
 
     void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
     {
-        std::string thread_name;
-        if (request.serverAddress().port() == MASTER_PORT_NUMBER)
-            thread_name = "prison_socket";
-        else
-            thread_name = "client_socket";
+        const std::string thread_name = "client_socket";
 
 #ifdef __linux
         if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
@@ -382,118 +468,82 @@ public:
 
         try
         {
-            try
-            {
-                auto ws = std::make_shared<WebSocket>(request, response);
+            auto ws = std::make_shared<WebSocket>(request, response);
 
-                LOOLSession::Kind kind;
-                std::string id;
+            const std::string id = LOOLWSD::GenSessionId();
+            auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws);
 
-                if (request.getURI() == LOOLWSD::CHILD_URI && request.serverAddress().port() == MASTER_PORT_NUMBER)
-                    kind = LOOLSession::Kind::ToPrisoner;
-                else
+            // For ToClient sessions, we store incoming messages in a queue and have a separate
+            // thread that handles them. This is so that we can empty the queue when we get a
+            // "canceltiles" message.
+            handler.setSession(session);
+            queueHandlerThread.start(handler);
+
+            SocketProcessor(ws, response, [&session, &queue](const char* data, const int size)
                 {
-                    kind = LOOLSession::Kind::ToClient;
-                    id = LOOLWSD::GenSessionId();
-                }
+                    const std::string firstLine = getFirstLine(data, size);
+                    if (firstLine == "eof")
+                        return false;
 
-                auto session = std::make_shared<MasterProcessSession>(id, kind, ws);
+                    if (firstLine.size() == static_cast<std::string::size_type>(size))
+                    {
+                        queue.put(firstLine);
+                        return true;
+                    }
+                    else
+                    {
+                        return session->handleInput(data, size);
+                    }
+                });
 
-                // For ToClient sessions, we store incoming messages in a queue and have a separate
-                // thread that handles them. This is so that we can empty the queue when we get a
-                // "canceltiles" message.
-                if (kind == LOOLSession::Kind::ToClient)
-                {
-                    handler.setSession(session);
-                    queueHandlerThread.start(handler);
-                }
+            queue.clear();
+            queue.put("eof");
+            queueHandlerThread.join();
+        }
+        catch (const IOException& exc)
+        {
+            Log::error("IOException: " + exc.message());
+        }
 
-                // Loop, receiving WebSocket messages either from the client, or from the child
-                // process (to be forwarded to the client).
-                int flags;
-                int n;
-                bool pollTimeout = true;
-                ws->setReceiveTimeout(0);
+        Log::debug("Thread [" + thread_name + "] finished.");
+    }
+};
 
-                do
-                {
-                    char buffer[200000]; //FIXME: Dynamic?
+/// Handle requests from prisoners (internal).
+class PrisonerRequestHandler: public HTTPRequestHandler
+{
+public:
 
-                    if ((pollTimeout = ws->poll(waitTime, Socket::SELECT_READ)))
-                    {
-                        n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+    void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
+    {
+        assert(request.serverAddress().port() == MASTER_PORT_NUMBER);
+        assert(request.getURI() == LOOLWSD::CHILD_URI);
 
-                        if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
-                        {
-                            ws->sendFrame("", 0, WebSocket::FRAME_OP_PONG);
-                        }
-                        else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
-                        {
-                            n = 1;
-                        }
-                        else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
-                        {
-                            const std::string firstLine = getFirstLine(buffer, n);
-                            if (firstLine == "eof")
-                                break;
+        const std::string thread_name = "prison_socket";
+
+#ifdef __linux
+        if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
+            Log::error("Cannot set thread name to " + thread_name + ".");
+#endif
+        Log::debug("Thread [" + thread_name + "] started.");
 
-                            StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+        Poco::Timespan waitTime(POLL_TIMEOUT);
 
-                            if (kind == LOOLSession::Kind::ToClient &&
-                                firstLine.size() == static_cast<std::string::size_type>(n))
-                            {
-                                queue.put(firstLine);
-                            }
-                            else
-                            {
-                                // Check if it is a "nextmessage:" and in that case read the large
-                                // follow-up message separately, and handle that only.
-                                int size;
-                                if (tokens.count() == 2 &&
-                                    tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
-                                {
-                                    char largeBuffer[size];     //FIXME: Security risk! Flooding may segfault us.
-
-                                    n = ws->receiveFrame(largeBuffer, size, flags);
-                                    if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
-                                    {
-                                        if (!session->handleInput(largeBuffer, n))
-                                            n = 0;
-                                    }
-                                }
-                                else
-                                {
-                                    if (!session->handleInput(buffer, n))
-                                        n = 0;
-                                }
-                            }
-                        }
-                    }
-                }
-                while (!TerminationFlag &&
-                       (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)));
+        try
+        {
+            auto ws = std::make_shared<WebSocket>(request, response);
 
-                queue.clear();
-                queue.put("eof");
-                queueHandlerThread.join();
-            }
-            catch (const WebSocketException& exc)
-            {
-                Log::error("RequestHandler::handleRequest(), WebSocketException: " + exc.message());
-                switch (exc.code())
+            std::string id;
+            auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToPrisoner, ws);
+
+            SocketProcessor(ws, response, [&session](const char* data, const int size)
                 {
-                case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
-                    response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
-                    // fallthrough
-                case WebSocket::WS_ERR_NO_HANDSHAKE:
-                case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
-                case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
-                    response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
-                    response.setContentLength(0);
-                    response.send();
-                    break;
-                }
-            }
+                    const std::string firstLine = getFirstLine(data, size);
+                    if (firstLine == "eof")
+                        return false;
+
+                    return session->handleInput(data, size);
+                });
         }
         catch (const IOException& exc)
         {
@@ -504,13 +554,10 @@ public:
     }
 };
 
+template <class RequestHandler>
 class RequestHandlerFactory: public HTTPRequestHandlerFactory
 {
 public:
-    RequestHandlerFactory()
-    {
-    }
-
     HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override
     {
         auto logger = Log::info();
@@ -868,7 +915,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
     // Start a server listening on the port for clients
     ServerSocket svs(ClientPortNumber, NumPreSpawnedChildren*10);
     ThreadPool threadPool(NumPreSpawnedChildren*2, NumPreSpawnedChildren*5);
-    HTTPServer srv(new RequestHandlerFactory(), threadPool, svs, new HTTPServerParams);
+    HTTPServer srv(new RequestHandlerFactory<ClientRequestHandler>(), threadPool, svs, new HTTPServerParams);
 
     srv.start();
 
@@ -876,7 +923,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
     SocketAddress addr2("127.0.0.1", MASTER_PORT_NUMBER);
     ServerSocket svs2(addr2, NumPreSpawnedChildren);
     ThreadPool threadPool2(NumPreSpawnedChildren*2, NumPreSpawnedChildren*5);
-    HTTPServer srv2(new RequestHandlerFactory(), threadPool2, svs2, new HTTPServerParams);
+    HTTPServer srv2(new RequestHandlerFactory<PrisonerRequestHandler>(), threadPool2, svs2, new HTTPServerParams);
 
     srv2.start();
 


More information about the Libreoffice-commits mailing list