[Libreoffice-commits] online.git: loolwsd/LOOLWSD.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Jan 6 06:23:17 PST 2016


 loolwsd/LOOLWSD.cpp |   61 +++++++++++++++++++++++++++++-----------------------
 1 file changed, 35 insertions(+), 26 deletions(-)

New commits:
commit ea1415de75156f8989e8bdcc6cfe7c53e08b3fbc
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Tue Jan 5 22:32:58 2016 -0500

    loolwsd: improved HTTP Request Handler
    
    Change-Id: I1ad4359732c7b5ee9fc8743ebc60e1c94c304dcc
    Reviewed-on: https://gerrit.libreoffice.org/21158
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 5f1ad43..259b97e 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -244,18 +244,17 @@ public:
 // Handler returns false to end.
 void SocketProcessor(std::shared_ptr<WebSocket> ws,
                      HTTPServerResponse& response,
-                     std::function<bool(const char* data, const int size)> handler)
+                     std::function<bool(const char* data, const int size, const bool singleLine)> handler)
 {
+    Log::info("Starting Socket Processor.");
+
     const Poco::Timespan waitTime(POLL_TIMEOUT);
     try
     {
-        // Loop, receiving WebSocket messages either from the client, or from the child
-        // process (to be forwarded to the client).
-        int flags;
-        int n;
+        int flags = 0;
+        int n = 0;
         bool pollTimeout = true;
         ws->setReceiveTimeout(0);
-
         do
         {
             char buffer[200000]; //FIXME: Dynamic?
@@ -270,24 +269,27 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
                 }
                 else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
                 {
-                    n = 1;
+                    continue;
                 }
                 else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
                 {
                     const std::string firstLine = getFirstLine(buffer, n);
                     if (firstLine == "eof")
+                    {
+                        Log::info("Recieved EOF. Finishing.");
                         break;
-
-                    StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+                    }
 
                     if (firstLine.size() == static_cast<std::string::size_type>(n))
                     {
-                        handler(firstLine.c_str(), firstLine.size());
+                        handler(firstLine.c_str(), firstLine.size(), true);
                     }
                     else
                     {
                         // Check if it is a "nextmessage:" and in that case read the large
                         // follow-up message separately, and handle that only.
+                        StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
                         int size;
                         if (tokens.count() == 2 &&
                             tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
@@ -297,14 +299,20 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
                             n = ws->receiveFrame(largeBuffer, size, flags);
                             if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
                             {
-                                if (!handler(largeBuffer, n))
-                                    n = 0;
+                                if (!handler(largeBuffer, n, false))
+                                {
+                                    Log::info("Socket handler flagged for finishing.");
+                                    break;
+                                }
                             }
                         }
                         else
                         {
-                            if (!handler(buffer, n))
-                                n = 0;
+                            if (!handler(buffer, n, false))
+                            {
+                                Log::info("Socket handler flagged for finishing.");
+                                break;
+                            }
                         }
                     }
                 }
@@ -312,6 +320,8 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
         }
         while (!TerminationFlag &&
                (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)));
+        Log::debug() << "Finishing SocketProcessor. TerminationFlag: " << TerminationFlag
+                     << ", pollTimeout: " << pollTimeout << ", payload size: " << n << Log::end;
     }
     catch (const WebSocketException& exc)
     {
@@ -330,6 +340,8 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
             break;
         }
     }
+
+    Log::info("Finished Socket Processor.");
 }
 
 
@@ -479,14 +491,15 @@ public:
             handler.setSession(session);
             queueHandlerThread.start(handler);
 
-            SocketProcessor(ws, response, [&session, &queue](const char* data, const int size)
+            SocketProcessor(ws, response, [&session, &queue](const char* data, const int size, const bool singleLine)
                 {
-                    const std::string firstLine = getFirstLine(data, size);
-                    if (firstLine == "eof")
-                        return false;
-
-                    if (firstLine.size() == static_cast<std::string::size_type>(size))
+                    // FIXME: There is a race here when a request A gets in the queue and
+                    // is processed _after_ a later request B, because B gets processed
+                    // synchronously and A is waiting in the queue thread.
+                    // Fix is to push everything into the queue.
+                    if (singleLine)
                     {
+                        const std::string firstLine = getFirstLine(data, size);
                         queue.put(firstLine);
                         return true;
                     }
@@ -533,15 +546,11 @@ public:
         {
             auto ws = std::make_shared<WebSocket>(request, response);
 
-            std::string id;
+            const std::string id;
             auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToPrisoner, ws);
 
-            SocketProcessor(ws, response, [&session](const char* data, const int size)
+            SocketProcessor(ws, response, [&session](const char* data, const int size, bool)
                 {
-                    const std::string firstLine = getFirstLine(data, size);
-                    if (firstLine == "eof")
-                        return false;
-
                     return session->handleInput(data, size);
                 });
         }


More information about the Libreoffice-commits mailing list