[Libreoffice-commits] online.git: loolwsd/IoUtil.cpp loolwsd/IoUtil.hpp loolwsd/LOOLWSD.cpp loolwsd/Makefile.am

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Mar 30 01:45:38 UTC 2016


 loolwsd/IoUtil.cpp  |  165 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 loolwsd/IoUtil.hpp  |    9 ++
 loolwsd/LOOLWSD.cpp |  158 +++----------------------------------------------
 loolwsd/Makefile.am |    2 
 4 files changed, 186 insertions(+), 148 deletions(-)

New commits:
commit b25fe9d88a86bc247f1f3aedbbf7a51c5193d259
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Mar 27 16:06:22 2016 -0400

    loolwsd: moved SocketProcessor to IoUtil and generalized more
    
    Change-Id: I527e57d2430e21249cf8cd4867f22fdbbd092b09
    Reviewed-on: https://gerrit.libreoffice.org/23637
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp
index 7284eeb..cf46e7e 100644
--- a/loolwsd/IoUtil.cpp
+++ b/loolwsd/IoUtil.cpp
@@ -18,16 +18,179 @@
 #include <sstream>
 #include <string>
 
-#include <Poco/Exception.h>
+#include <Poco/StringTokenizer.h>
+#include <Poco/Net/HTTPServerResponse.h>
 #include <Poco/Net/WebSocket.h>
+#include <Poco/Net/NetException.h>
 #include <Poco/Thread.h>
 
 #include "Common.hpp"
+#include "LOOLProtocol.hpp"
 #include "IoUtil.hpp"
 #include "Util.hpp"
 
 namespace IoUtil
 {
+using Poco::Net::WebSocket;
+using Poco::Net::WebSocketException;
+
+// Synchronously process WebSocket requests and dispatch to handler.
+// Handler returns false to end.
+void SocketProcessor(std::shared_ptr<WebSocket> ws,
+                     Poco::Net::HTTPServerResponse& response,
+                     std::function<bool(const std::vector<char>&)> handler,
+                     std::function<bool()> stopPredicate,
+                     std::string name,
+                     const size_t pollTimeoutMs)
+{
+    if (!name.empty())
+    {
+        name = "[" + name + "] ";
+    }
+
+    Log::info(name + "Starting Socket Processor.");
+
+    // Poco::Timespan takes microseconds; convert the millisecond timeout.
+    const Poco::Timespan waitTime(pollTimeoutMs * 1000);
+    try
+    {
+        ws->setReceiveTimeout(0);
+
+        int flags = 0;
+        int n = 0;
+        bool stop = false;
+        std::vector<char> payload(READ_BUFFER_SIZE * 100);
+
+        for (;;)
+        {
+            stop = stopPredicate();
+            if (stop)
+            {
+                Log::info(name + "Termination flagged. Finishing.");
+                break;
+            }
+
+            if (!ws->poll(waitTime, Poco::Net::Socket::SELECT_READ))
+            {
+                // Wait some more.
+                continue;
+            }
+
+            payload.resize(payload.capacity());
+            n = ws->receiveFrame(payload.data(), payload.capacity(), flags);
+            if (n >= 0)
+            {
+                payload.resize(n);
+            }
+
+            if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
+            {
+                // Echo back the ping payload as pong.
+                // Technically, we should send back a PONG control frame.
+                // However Firefox (probably) or Node.js (possibly) doesn't
+                // like that and closes the socket when we do.
+                // Echoing the payload as a normal frame works with Firefox.
+                ws->sendFrame(payload.data(), n /*, WebSocket::FRAME_OP_PONG*/);
+                continue;
+            }
+            else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
+            {
+                // In case we do send pings in the future.
+                continue;
+            }
+            else if (n <= 0 || ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE))
+            {
+                Log::warn(name + "Connection closed.");
+                break;
+            }
+
+            assert(n > 0);
+
+            const std::string firstLine = LOOLProtocol::getFirstLine(payload);
+            if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN)
+            {
+                // One WS message split into multiple frames.
+                while (true)
+                {
+                    char buffer[READ_BUFFER_SIZE * 10];
+                    n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+                    if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
+                    {
+                        break;
+                    }
+
+                    payload.insert(payload.end(), buffer, buffer + n);
+                    if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN)
+                    {
+                        // No more frames.
+                        break;
+                    }
+                }
+            }
+            else
+            {
+                int size = 0;
+                Poco::StringTokenizer tokens(firstLine, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM);
+                if (tokens.count() == 2 &&
+                    tokens[0] == "nextmessage:" && LOOLProtocol::getTokenInteger(tokens[1], "size", size) && size > 0)
+                {
+                    // Check if it is a "nextmessage:" and in that case read the large
+                    // follow-up message separately, and handle that only.
+                    payload.resize(size);
+
+                    n = ws->receiveFrame(payload.data(), size, flags);
+                }
+            }
+
+            if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
+            {
+                Log::warn(name + "Connection closed.");
+                break;
+            }
+
+            if (firstLine == "eof")
+            {
+                Log::info(name + "Received EOF. Finishing.");
+                break;
+            }
+
+            // Call the handler.
+            if (!handler(payload))
+            {
+                Log::info(name + "Socket handler flagged to finish.");
+                break;
+            }
+        }
+
+        Log::debug() << name << "Finishing SocketProcessor. TerminationFlag: " << stop
+                     << ", payload size: " << payload.size()
+                     << ", flags: " << std::hex << flags << Log::end;
+        if (!payload.empty())
+        {
+            Log::warn(name + "Last message will not be processed: [" +
+                      LOOLProtocol::getAbbreviatedMessage(payload.data(), payload.size()) + "].");
+        }
+    }
+    catch (const WebSocketException& exc)
+    {
+        Log::error("SocketProcessor: WebSocketException: " + exc.message());
+        switch (exc.code())
+        {
+        case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
+            response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
+            // fallthrough
+        case WebSocket::WS_ERR_NO_HANDSHAKE:
+        case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
+        case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
+            response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST);
+            response.setContentLength(0);
+            response.send();
+            break;
+        }
+    }
+
+    Log::info(name + "Finished Socket Processor.");
+}
 
 void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws)
 {
diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp
index da3db34..1647e29 100644
--- a/loolwsd/IoUtil.hpp
+++ b/loolwsd/IoUtil.hpp
@@ -21,6 +21,15 @@
 
 namespace IoUtil
 {
+    /// Synchronously process WebSocket requests and dispatch to handler.
+    /// Handler returns false to end.
+    void SocketProcessor(std::shared_ptr<Poco::Net::WebSocket> ws,
+                         Poco::Net::HTTPServerResponse& response,
+                         std::function<bool(const std::vector<char>&)> handler,
+                         std::function<bool()> stopPredicate,
+                         std::string name = std::string(),
+                         const size_t pollTimeoutMs = POLL_TIMEOUT_MS);
+
     /// Call WebSocket::shutdown() ignoring Poco::IOException.
     void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws);
 
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 5003a65..650b1e1 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -158,7 +158,6 @@ using Poco::TemporaryFile;
 using Poco::Thread;
 using Poco::ThreadLocal;
 using Poco::ThreadPool;
-using Poco::Timespan;
 using Poco::URI;
 using Poco::Util::Application;
 using Poco::Util::HelpFormatter;
@@ -218,147 +217,6 @@ public:
     }
 };
 
-// Synchronously process WebSocket requests and dispatch to handler.
-// Handler returns false to end.
-void SocketProcessor(std::shared_ptr<WebSocket> ws,
-                     HTTPServerResponse& response,
-                     std::function<bool(const std::vector<char>&)> handler)
-{
-    Log::info("Starting Socket Processor.");
-
-    const Timespan waitTime(POLL_TIMEOUT_MS * 1000);
-    try
-    {
-        ws->setReceiveTimeout(0);
-
-        int flags = 0;
-        int n = 0;
-        std::vector<char> payload(READ_BUFFER_SIZE * 100);
-
-        while (!TerminationFlag &&
-               (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
-        {
-            if (!ws->poll(waitTime, Socket::SELECT_READ))
-            {
-                // Wait some more.
-                continue;
-            }
-
-            payload.resize(payload.capacity());
-            n = ws->receiveFrame(payload.data(), payload.capacity(), flags);
-            if (n >= 0)
-            {
-                payload.resize(n);
-            }
-
-            if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
-            {
-                // Echo back the ping payload as pong.
-                // Technically, we should send back a PONG control frame.
-                // However Firefox (probably) or Node.js (possibly) doesn't
-                // like that and closes the socket when we do.
-                // Echoing the payload as a normal frame works with Firefox.
-                ws->sendFrame(payload.data(), n /*, WebSocket::FRAME_OP_PONG*/);
-            }
-            else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
-            {
-                // In case we do send pings in the future.
-            }
-            else if (n <= 0 || ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE))
-            {
-                // Connection closed.
-                Log::warn() << "Received " << n
-                            << " bytes. Connection closed. Flags: "
-                            << std::hex << flags << Log::end;
-                break;
-            }
-
-            assert(n > 0);
-
-            const std::string firstLine = LOOLProtocol::getFirstLine(payload);
-            if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN)
-            {
-                // One WS message split into multiple frames.
-                while (true)
-                {
-                    char buffer[READ_BUFFER_SIZE * 10];
-                    n = ws->receiveFrame(buffer, sizeof(buffer), flags);
-                    if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
-                    {
-                        break;
-                    }
-
-                    payload.insert(payload.end(), buffer, buffer + n);
-                    if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN)
-                    {
-                        // No more frames.
-                        break;
-                    }
-                }
-            }
-            else
-            {
-                int size = 0;
-                StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-                if (tokens.count() == 2 &&
-                    tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
-                {
-                    // Check if it is a "nextmessage:" and in that case read the large
-                    // follow-up message separately, and handle that only.
-                    payload.resize(size);
-
-                    n = ws->receiveFrame(payload.data(), size, flags);
-                }
-            }
-
-            if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
-            {
-                break;
-            }
-
-            if (firstLine == "eof")
-            {
-                Log::info("Received EOF. Finishing.");
-                break;
-            }
-
-            // Call the handler.
-            if (!handler(payload))
-            {
-                Log::info("Socket handler flagged for finishing.");
-            }
-        }
-
-        Log::debug() << "Finishing SocketProcessor. TerminationFlag: " << TerminationFlag
-                     << ", payload size: " << payload.size()
-                     << ", flags: " << std::hex << flags << Log::end;
-        if (!payload.empty())
-        {
-            Log::warn("Last message will not be processed: [" + getAbbreviatedMessage(payload.data(), payload.size()) + "].");
-        }
-    }
-    catch (const WebSocketException& exc)
-    {
-        Log::error("SocketProcessor: WebSocketException: " + exc.message());
-        switch (exc.code())
-        {
-        case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
-            response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
-            // fallthrough
-        case WebSocket::WS_ERR_NO_HANDSHAKE:
-        case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
-        case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
-            response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
-            response.setContentLength(0);
-            response.send();
-            break;
-        }
-    }
-
-    Log::info("Finished Socket Processor.");
-}
-
-
 /// Handle a public connection from a client.
 class ClientRequestHandler: public HTTPRequestHandler
 {
@@ -615,7 +473,8 @@ private:
         queueHandlerThread.start(handler);
         bool normalShutdown = false;
 
-        SocketProcessor(ws, response, [&session, &queue, &normalShutdown](const std::vector<char>& payload)
+        IoUtil::SocketProcessor(ws, response,
+                [&session, &queue, &normalShutdown](const std::vector<char>& payload)
             {
                 time(&session->_lastMessageTime);
                 const auto token = LOOLProtocol::getFirstToken(payload);
@@ -629,7 +488,10 @@ private:
                 }
 
                 return true;
-            });
+            },
+            []() { return TerminationFlag; },
+            "Client_ws_" + id
+            );
 
         if (docBroker->getSessionsCount() == 1 && !normalShutdown)
         {
@@ -826,10 +688,14 @@ public:
             lock.unlock();
             MasterProcessSession::AvailableChildSessionCV.notify_one();
 
-            SocketProcessor(ws, response, [&session](const std::vector<char>& payload)
+            IoUtil::SocketProcessor(ws, response,
+                    [&session](const std::vector<char>& payload)
                 {
                     return session->handleInput(payload.data(), payload.size());
-                });
+                },
+                []() { return TerminationFlag; },
+                "Child_ws_" + sessionId
+                );
         }
         catch (const Exception& exc)
         {
diff --git a/loolwsd/Makefile.am b/loolwsd/Makefile.am
index fe54269..458ea85 100644
--- a/loolwsd/Makefile.am
+++ b/loolwsd/Makefile.am
@@ -20,7 +20,7 @@ loadtest_SOURCES = LoadTest.cpp Util.cpp LOOLProtocol.cpp
 
 connect_SOURCES = Connect.cpp Util.cpp LOOLProtocol.cpp
 
-lokitclient_SOURCES = LOKitClient.cpp IoUtil.cpp Util.cpp
+lokitclient_SOURCES = LOKitClient.cpp LOOLProtocol.cpp IoUtil.cpp Util.cpp
 
 broker_shared_sources = ChildProcessSession.cpp $(shared_sources)
 


More information about the Libreoffice-commits mailing list