[Libreoffice-commits] online.git: loolwsd/LOOLWSD.cpp loolwsd/MasterProcessSession.cpp loolwsd/MessageQueue.hpp loolwsd/QueueHandler.hpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Tue Mar 29 02:47:45 UTC 2016


 loolwsd/LOOLWSD.cpp              |   32 +++++++++++---------------------
 loolwsd/MasterProcessSession.cpp |    3 ++-
 loolwsd/MessageQueue.hpp         |    6 +++++-
 loolwsd/QueueHandler.hpp         |    8 +++++---
 4 files changed, 23 insertions(+), 26 deletions(-)

New commits:
commit 3217b4592a6e196486440c2149132233b79d5982
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sat Mar 26 22:56:10 2016 -0400

    loolwsd: queue payload changed to vector<char>
    
    All messages now pass through the queue.
    This resolves a race between single-line
    messages and multi-line ones.
    
    Previously, single-line messages were
    processed on the queue (on a background
    thread) while multi-line ones were handled
    immediately. This resulted in order-inversion
    due to a race between the queue thread and the
    next multi-line message, which caused stability
    issues every so often.
    
    Change-Id: Ia220791d1d75c4f3e3e0965dd0c6f81bae63a296
    Reviewed-on: https://gerrit.libreoffice.org/23583
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 749b2e2..b4cc910 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -221,7 +221,7 @@ public:
 // Handler returns false to end.
 void SocketProcessor(std::shared_ptr<WebSocket> ws,
                      HTTPServerResponse& response,
-                     std::function<bool(const char* data, const int size)> handler)
+                     std::function<bool(const std::vector<char>&)> handler)
 {
     Log::info("Starting Socket Processor.");
 
@@ -322,7 +322,7 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
             }
 
             // Call the handler.
-            if (!handler(payload.data(), payload.size()))
+            if (!handler(payload))
             {
                 Log::info("Socket handler flagged for finishing.");
             }
@@ -614,30 +614,20 @@ private:
         queueHandlerThread.start(handler);
         bool normalShutdown = false;
 
-        SocketProcessor(ws, response, [&session, &queue, &normalShutdown](const char* data, const int size, const bool singleLine)
+        SocketProcessor(ws, response, [&session, &queue, &normalShutdown](const std::vector<char>& payload)
             {
-                // FIXME: There is a race here when a request A gets in the queue and
-                // is processed _after_ a later request B, because B gets processed
-                // synchronously and A is waiting in the queue thread.
-                // The fix is to push everything into the queue
-                // (i.e. change MessageQueue to vector<char>).
-                const std::string firstLine = getFirstLine(data, size);
+                const auto firstLine = getFirstLine(payload.data(), payload.size());
                 time(&session->_lastMessageTime);
-                if (singleLine || firstLine.find("paste") == 0)
+                if (firstLine.compare(0, 10, "disconnect") == 0) // starts with "disconnect"
                 {
-                    if (firstLine.compare(0, 10, "disconnect") == 0) // starts with "disconnect"
-                    {
-                        normalShutdown = true;
-                        return true;
-                    }
-
-                    queue.put(std::string(data, size));
-                    return true;
+                    normalShutdown = true;
                 }
                 else
                 {
-                    return session->handleInput(data, size);
+                    queue.put(payload);
                 }
+
+                return true;
             });
 
         if (docBroker->getSessionsCount() == 1 && !normalShutdown)
@@ -831,9 +821,9 @@ public:
             lock.unlock();
             MasterProcessSession::AvailableChildSessionCV.notify_one();
 
-            SocketProcessor(ws, response, [&session](const char* data, const int size, bool)
+            SocketProcessor(ws, response, [&session](const std::vector<char>& payload)
                 {
-                    return session->handleInput(data, size);
+                    return session->handleInput(payload.data(), payload.size());
                 });
         }
         catch (const Exception& exc)
diff --git a/loolwsd/MasterProcessSession.cpp b/loolwsd/MasterProcessSession.cpp
index 62e4d6f..0fbf70c 100644
--- a/loolwsd/MasterProcessSession.cpp
+++ b/loolwsd/MasterProcessSession.cpp
@@ -500,7 +500,8 @@ bool MasterProcessSession::getPartPageRectangles(const char *buffer, int length)
 
 std::string MasterProcessSession::getSaveAs()
 {
-    return _saveAsQueue.get();
+    const auto payload = _saveAsQueue.get();
+    return std::string(payload.data(), payload.size());
 }
 
 void MasterProcessSession::sendFontRendering(const char *buffer, int length, StringTokenizer& tokens)
diff --git a/loolwsd/MessageQueue.hpp b/loolwsd/MessageQueue.hpp
index 2d62173..5a0c439 100644
--- a/loolwsd/MessageQueue.hpp
+++ b/loolwsd/MessageQueue.hpp
@@ -21,7 +21,7 @@ class MessageQueue
 {
 public:
 
-    typedef std::string Payload;
+    typedef std::vector<char> Payload;
 
     MessageQueue()
     {
@@ -34,6 +34,10 @@ public:
 
     /// Thread safe insert the message.
     void put(const Payload& value);
+    void put(const std::string& value)
+    {
+        put(Payload(value.data(), value.data() + value.size()));
+    }
 
     /// Thread safe obtaining of the message.
     Payload get();
diff --git a/loolwsd/QueueHandler.hpp b/loolwsd/QueueHandler.hpp
index 4f3d612..833c331 100644
--- a/loolwsd/QueueHandler.hpp
+++ b/loolwsd/QueueHandler.hpp
@@ -11,6 +11,7 @@
 
 #include "MessageQueue.hpp"
 #include "LOOLSession.hpp"
+#include "LOOLProtocol.hpp"
 #include "Util.hpp"
 
 /// This thread handles incoming messages on a given kit instance.
@@ -36,14 +37,15 @@ public:
         {
             while (true)
             {
-                const std::string input = _queue.get();
-                if (input == "eof")
+                const auto input = _queue.get();
+                const auto firstLine = LOOLProtocol::getFirstLine(input.data(), input.size());
+                if (firstLine == "eof")
                 {
                     Log::info("Received EOF. Finishing.");
                     break;
                 }
 
-                if (!_session->handleInput(input.c_str(), input.size()))
+                if (!_session->handleInput(input.data(), input.size()))
                 {
                     Log::info("Socket handler flagged for finishing.");
                     break;


More information about the Libreoffice-commits mailing list