[Libreoffice-commits] online.git: Branch 'distro/collabora/collabora-online-2-0' - common/Protocol.hpp test/TileQueueTests.cpp test/WhiteBoxTests.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp wsd/TileDesc.hpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Dec 19 12:00:44 UTC 2016


 common/Protocol.hpp     |   74 ++++++++++++++++++++--
 test/TileQueueTests.cpp |  155 ++++++++++++++++++++++++++++++++++++++++++++++
 test/WhiteBoxTests.cpp  |  101 ++++++++++++++++++++++++++++++
 wsd/ClientSession.hpp   |    8 --
 wsd/DocumentBroker.cpp  |    4 -
 wsd/SenderQueue.hpp     |  159 ++++++++++++++++++++++++++++++++++++++++++++++--
 wsd/TileCache.cpp       |   25 +++----
 wsd/TileDesc.hpp        |   13 +++
 8 files changed, 503 insertions(+), 36 deletions(-)

New commits:
commit 8b59e78ad0b44a8f98ce18c8529e00227d0b357c
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sat Dec 17 14:28:11 2016 -0500

    wsd: message deduplication in SenderQueue
    
    Squashed multiple commits from master.
    
    Change-Id: I0ad9ef7bf826b3fd0eba9cc17ec5212a3334a2f5
    Reviewed-on: https://gerrit.libreoffice.org/32164
    Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
    Tested-by: Michael Meeks <michael.meeks at collabora.com>

diff --git a/common/Protocol.hpp b/common/Protocol.hpp
index e95090d..67f7ec7 100644
--- a/common/Protocol.hpp
+++ b/common/Protocol.hpp
@@ -77,16 +77,65 @@ namespace LOOLProtocol
     // Functions that parse messages. All return false if parsing fails
     bool parseStatus(const std::string& message, LibreOfficeKitDocumentType& type, int& nParts, int& currentPart, int& width, int& height);
 
+    /// Tokenize space-delimited values until we hit new-line or the end.
     inline
-    std::string getDelimitedInitialSubstring(const char *message, const int length, const char delim)
+    std::vector<std::string> tokenize(const char* data, const size_t size)
     {
-        if (message == nullptr || length <= 0)
+        std::vector<std::string> tokens;
+        if (size == 0 || data == nullptr)
+        {
+            return tokens;
+        }
+
+        const char* start = data;
+        const char* end = data;
+        for (size_t i = 0; i < size && data[i] != '\n'; ++i, ++end)
+        {
+            if (data[i] == ' ')
+            {
+                if (start != end && *start != ' ')
+                {
+                    tokens.emplace_back(start, end);
+                }
+
+                start = end;
+            }
+            else if (*start == ' ')
+            {
+                ++start;
+            }
+        }
+
+        if (start != end && *start != ' ' && *start != '\n')
         {
-            return "";
+            tokens.emplace_back(start, end);
         }
 
-        const char *founddelim = static_cast<const char *>(std::memchr(message, delim, length));
-        const auto size = (founddelim == nullptr ? length : founddelim - message);
+        return tokens;
+    }
+
+    inline
+    std::vector<std::string> tokenize(const std::string& s)
+    {
+        return tokenize(s.data(), s.size());
+    }
+
+    inline size_t getDelimiterPosition(const char* message, const int length, const char delim)
+    {
+        if (message && length > 0)
+        {
+            const char *founddelim = static_cast<const char *>(std::memchr(message, delim, length));
+            const auto size = (founddelim == nullptr ? length : founddelim - message);
+            return size;
+        }
+
+        return 0;
+    }
+
+    inline
+    std::string getDelimitedInitialSubstring(const char *message, const int length, const char delim)
+    {
+        const auto size = getDelimiterPosition(message, length, delim);
         return std::string(message, size);
     }
 
@@ -170,7 +219,7 @@ namespace LOOLProtocol
     {
         if (message == nullptr || length <= 0)
         {
-            return "";
+            return std::string();
         }
 
         const auto firstLine = getFirstLine(message, std::min(length, 120));
@@ -184,6 +233,19 @@ namespace LOOLProtocol
         return firstLine;
     }
 
+    /// Returns the first line of message, capped at 120 bytes, with "..." appended when truncated.
+    inline std::string getAbbreviatedMessage(const std::string& message)
+    {
+        const auto pos = getDelimiterPosition(message.data(), std::min<size_t>(message.size(), 120), '\n');
+        // Add ellipsis only when more than a lone trailing newline is cut off.
+        // Written as pos + 1 < size so that an empty message cannot underflow size - 1.
+        if (pos + 1 < message.size())
+        {
+            return message.substr(0, pos) + "...";
+        }
+        return message;
+    }
+
     template <typename T>
     std::string getAbbreviatedMessage(const T& message)
     {
diff --git a/test/TileQueueTests.cpp b/test/TileQueueTests.cpp
index dea692b..4587cc4 100644
--- a/test/TileQueueTests.cpp
+++ b/test/TileQueueTests.cpp
@@ -14,6 +14,7 @@
 #include "Common.hpp"
 #include "Protocol.hpp"
 #include "MessageQueue.hpp"
+#include "SenderQueue.hpp"
 #include "Util.hpp"
 
 namespace CPPUNIT_NS
@@ -44,6 +45,9 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture
     CPPUNIT_TEST(testTileRecombining);
     CPPUNIT_TEST(testViewOrder);
     CPPUNIT_TEST(testPreviewsDeprioritization);
+    CPPUNIT_TEST(testSenderQueue);
+    CPPUNIT_TEST(testSenderQueueTileDeduplication);
+    CPPUNIT_TEST(testInvalidateViewCursorDeduplication);
 
     CPPUNIT_TEST_SUITE_END();
 
@@ -52,6 +56,9 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture
     void testTileRecombining();
     void testViewOrder();
     void testPreviewsDeprioritization();
+    void testSenderQueue();
+    void testSenderQueueTileDeduplication();
+    void testInvalidateViewCursorDeduplication();
 };
 
 void TileQueueTests::testTileQueuePriority()
@@ -259,6 +266,154 @@ void TileQueueTests::testPreviewsDeprioritization()
     CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(queue._queue.size()));
 }
 
+void TileQueueTests::testSenderQueue()
+{
+    SenderQueue<std::shared_ptr<MessagePayload>> queue;
+
+    std::shared_ptr<MessagePayload> item;
+
+    // Empty queue
+    CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+    const std::vector<std::string> messages =
+    {
+        "message 1",
+        "message 2",
+        "message 3"
+    };
+
+    for (const auto& msg : messages)
+    {
+        queue.enqueue(std::make_shared<MessagePayload>(msg));
+    }
+
+    CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
+
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(messages[0], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(messages[1], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(messages[2], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+}
+
+void TileQueueTests::testSenderQueueTileDeduplication()
+{
+    SenderQueue<std::shared_ptr<MessagePayload>> queue;
+
+    std::shared_ptr<MessagePayload> item;
+
+    // Empty queue
+    CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+    const std::vector<std::string> part_messages =
+    {
+        "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=0",
+        "tile: part=1 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1",
+        "tile: part=2 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1"
+    };
+
+    for (const auto& msg : part_messages)
+    {
+        queue.enqueue(std::make_shared<MessagePayload>(msg));
+    }
+
+    CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+    const std::vector<std::string> dup_messages =
+    {
+        "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1",
+        "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1",
+        "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1"
+    };
+
+    for (const auto& msg : dup_messages)
+    {
+        queue.enqueue(std::make_shared<MessagePayload>(msg));
+    }
+
+    CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+
+    // The last one should persist.
+    CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+}
+
+void TileQueueTests::testInvalidateViewCursorDeduplication()
+{
+    SenderQueue<std::shared_ptr<MessagePayload>> queue;
+
+    std::shared_ptr<MessagePayload> item;
+
+    // Empty queue
+    CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+    const std::vector<std::string> view_messages =
+    {
+        "invalidateviewcursor: {    \"viewId\": \"1\",     \"rectangle\": \"3999, 1418, 0, 298\",     \"part\": \"0\" }",
+        "invalidateviewcursor: {    \"viewId\": \"2\",     \"rectangle\": \"3999, 1418, 0, 298\",     \"part\": \"0\" }",
+        "invalidateviewcursor: {    \"viewId\": \"3\",     \"rectangle\": \"3999, 1418, 0, 298\",     \"part\": \"0\" }",
+    };
+
+    for (const auto& msg : view_messages)
+    {
+        queue.enqueue(std::make_shared<MessagePayload>(msg));
+    }
+
+    CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
+
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(view_messages[0], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(view_messages[1], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(view_messages[2], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+    const std::vector<std::string> dup_messages =
+    {
+        "invalidateviewcursor: {    \"viewId\": \"1\",     \"rectangle\": \"3999, 1418, 0, 298\",     \"part\": \"0\" }",
+        "invalidateviewcursor: {    \"viewId\": \"1\",     \"rectangle\": \"1000, 1418, 0, 298\",     \"part\": \"0\" }",
+        "invalidateviewcursor: {    \"viewId\": \"1\",     \"rectangle\": \"2000, 1418, 0, 298\",     \"part\": \"0\" }",
+    };
+
+    for (const auto& msg : dup_messages)
+    {
+        queue.enqueue(std::make_shared<MessagePayload>(msg));
+    }
+
+    CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+
+    // The last one should persist.
+    CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
+
+    CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+}
+
 CPPUNIT_TEST_SUITE_REGISTRATION(TileQueueTests);
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/test/WhiteBoxTests.cpp b/test/WhiteBoxTests.cpp
index 6ca75bc..75b9ec6 100644
--- a/test/WhiteBoxTests.cpp
+++ b/test/WhiteBoxTests.cpp
@@ -24,6 +24,8 @@ class WhiteBoxTests : public CPPUNIT_NS::TestFixture
     CPPUNIT_TEST_SUITE(WhiteBoxTests);
 
     CPPUNIT_TEST(testLOOLProtocolFunctions);
+    CPPUNIT_TEST(testMessageAbbreviation);
+    CPPUNIT_TEST(testTokenizer);
     CPPUNIT_TEST(testRegexListMatcher);
     CPPUNIT_TEST(testRegexListMatcher_Init);
     CPPUNIT_TEST(testEmptyCellCursor);
@@ -31,6 +33,8 @@ class WhiteBoxTests : public CPPUNIT_NS::TestFixture
     CPPUNIT_TEST_SUITE_END();
 
     void testLOOLProtocolFunctions();
+    void testMessageAbbreviation();
+    void testTokenizer();
     void testRegexListMatcher();
     void testRegexListMatcher_Init();
     void testEmptyCellCursor();
@@ -75,6 +79,103 @@ void WhiteBoxTests::testLOOLProtocolFunctions()
 
 }
 
+void WhiteBoxTests::testMessageAbbreviation()
+{
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring(nullptr, 5, '\n'));
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring(nullptr, -1, '\n'));
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring("abc", 0, '\n'));
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring("abc", -1, '\n'));
+    CPPUNIT_ASSERT_EQUAL(std::string("ab"), LOOLProtocol::getDelimitedInitialSubstring("abc", 2, '\n'));
+
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage(nullptr, 5));
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage(nullptr, -1));
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage("abc", 0));
+    CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage("abc", -1));
+    CPPUNIT_ASSERT_EQUAL(std::string("ab"), LOOLProtocol::getAbbreviatedMessage("abc", 2));
+
+    std::string s;
+    std::string abbr;
+
+    s = "abcdefg";
+    CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s));
+
+    s = "1234567890123\n45678901234567890123456789012345678901234567890123";
+    abbr = "1234567890123...";
+    CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size()));
+    CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s));
+
+    // 120 characters. Change when the abbreviation max-length changes.
+    s = "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890";
+    CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size()));
+    CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s));
+
+    abbr = s + "...";
+    s += "more data";
+    CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size()));
+    CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s));
+}
+
+void WhiteBoxTests::testTokenizer()
+{
+    std::vector<std::string> tokens;
+
+    tokens = LOOLProtocol::tokenize("");
+    CPPUNIT_ASSERT_EQUAL(0UL, tokens.size());
+
+    tokens = LOOLProtocol::tokenize("  ");
+    CPPUNIT_ASSERT_EQUAL(0UL, tokens.size());
+
+    tokens = LOOLProtocol::tokenize("A");
+    CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+    tokens = LOOLProtocol::tokenize("  A");
+    CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+    tokens = LOOLProtocol::tokenize("A  ");
+    CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+    tokens = LOOLProtocol::tokenize(" A ");
+    CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+    tokens = LOOLProtocol::tokenize(" A  Z ");
+    CPPUNIT_ASSERT_EQUAL(2UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+    CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]);
+
+    tokens = LOOLProtocol::tokenize("\n");
+    CPPUNIT_ASSERT_EQUAL(0UL, tokens.size());
+
+    tokens = LOOLProtocol::tokenize(" A  \nZ ");
+    CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+    tokens = LOOLProtocol::tokenize(" A  Z\n ");
+    CPPUNIT_ASSERT_EQUAL(2UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+    CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]);
+
+    tokens = LOOLProtocol::tokenize(" A  Z  \n ");
+    CPPUNIT_ASSERT_EQUAL(2UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+    CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]);
+
+    tokens = LOOLProtocol::tokenize("tile part=0 width=256 height=256 tileposx=0 tileposy=0 tilewidth=3840 tileheight=3840 ver=-1");
+    CPPUNIT_ASSERT_EQUAL(9UL, tokens.size());
+    CPPUNIT_ASSERT_EQUAL(std::string("tile"), tokens[0]);
+    CPPUNIT_ASSERT_EQUAL(std::string("part=0"), tokens[1]);
+    CPPUNIT_ASSERT_EQUAL(std::string("width=256"), tokens[2]);
+    CPPUNIT_ASSERT_EQUAL(std::string("height=256"), tokens[3]);
+    CPPUNIT_ASSERT_EQUAL(std::string("tileposx=0"), tokens[4]);
+    CPPUNIT_ASSERT_EQUAL(std::string("tileposy=0"), tokens[5]);
+    CPPUNIT_ASSERT_EQUAL(std::string("tilewidth=3840"), tokens[6]);
+    CPPUNIT_ASSERT_EQUAL(std::string("tileheight=3840"), tokens[7]);
+    CPPUNIT_ASSERT_EQUAL(std::string("ver=-1"), tokens[8]);
+}
+
 void WhiteBoxTests::testRegexListMatcher()
 {
     Util::RegexListMatcher matcher;
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index bb94bff..ae233e9 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -48,18 +48,14 @@ public:
 
     bool sendBinaryFrame(const char* buffer, int length) override
     {
-        auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary);
-        auto& output = payload->data();
-        std::memcpy(output.data(), buffer, length);
+        auto payload = std::make_shared<MessagePayload>(buffer, length, MessagePayload::Type::Binary);
         enqueueSendMessage(payload);
         return true;
     }
 
     bool sendTextFrame(const char* buffer, const int length) override
     {
-        auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text);
-        auto& output = payload->data();
-        std::memcpy(output.data(), buffer, length);
+        auto payload = std::make_shared<MessagePayload>(buffer, length, MessagePayload::Type::Text);
         enqueueSendMessage(payload);
         return true;
     }
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 513e12b..6b061ea 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -617,9 +617,7 @@ void DocumentBroker::alertAllUsers(const std::string& msg)
 {
     Util::assertIsLocked(_mutex);
 
-    auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
-    auto& output = payload->data();
-    std::memcpy(output.data(), msg.data(), msg.size());
+    auto payload = std::make_shared<MessagePayload>(msg);
 
     LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg);
     for (auto& it : _sessions)
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index b1ded81..44742ab 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -16,31 +16,101 @@
 #include <mutex>
 #include <vector>
 
+#include <Poco/Dynamic/Var.h>
+#include <Poco/JSON/JSON.h>
+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>
+
 #include "common/SigUtil.hpp"
 #include "LOOLWebSocket.hpp"
 #include "Log.hpp"
+#include "TileDesc.hpp"
 
 /// The payload type used to send/receive data.
 class MessagePayload
 {
 public:
 
-    enum class Type { Text, Binary };
+    enum class Type { Text, JSON, Binary };
 
-    MessagePayload(const size_t size, enum Type type) :
-        _data(size),
+    /// Construct a text message.
+    /// message must include the full first-line.
+    MessagePayload(const std::string& message,
+                   const enum Type type = Type::Text) :
+        _data(message.data(), message.data() + message.size()),
+        _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())),
+        _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())),
+        _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())),
         _type(type)
     {
     }
 
-    std::vector<char>& data() { return _data; }
+    /// Construct a message from a string with type, pre-allocating 'reserve' bytes
+    /// (total, including message). message must include the full first-line.
+    /// The parsed members are derived from message: _data is still zero-filled here.
+    MessagePayload(const std::string& message,
+                   const enum Type type,
+                   const size_t reserve) :
+        _data(std::max(reserve, message.size())),
+        _tokens(LOOLProtocol::tokenize(message.data(), message.size())),
+        _firstLine(LOOLProtocol::getFirstLine(message.data(), message.size())),
+        _abbreviation(LOOLProtocol::getAbbreviatedMessage(message.data(), message.size())),
+        _type(type)
+    {
+        _data.resize(message.size());
+        std::memcpy(_data.data(), message.data(), message.size());
+    }
+
+    /// Construct a message from a character array with type.
+    /// data must include the full first-line.
+    MessagePayload(const char* data,
+                   const size_t size,
+                   const enum Type type) :
+        _data(data, data + size),
+        _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())),
+        _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())),
+        _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())),
+        _type(type)
+    {
+    }
+
+    size_t size() const { return _data.size(); }
+    const std::vector<char>& data() const { return _data; }
+
+    const std::vector<std::string>& tokens() const { return _tokens; }
+    const std::string& firstToken() const { static const std::string none; return _tokens.empty() ? none : _tokens[0]; } // empty string when the payload has no tokens
+    const std::string& firstLine() const { return _firstLine; }
+    const std::string& abbreviation() const { return _abbreviation; }
+
+    /// Returns the json part of the message, if any.
+    std::string jsonString() const
+    {
+        if (_tokens.size() > 1 && _tokens[1] == "{")
+        {
+            const auto firstTokenSize = _tokens[0].size();
+            return std::string(_data.data() + firstTokenSize, _data.size() - firstTokenSize);
+        }
+
+        return std::string();
+    }
+
+    /// Append more data to the message.
+    void append(const char* data, const size_t size)
+    {
+        const auto curSize = _data.size();
+        _data.resize(curSize + size);
+        std::memcpy(_data.data() + curSize, data, size);
+    }
 
     /// Returns true if and only if the payload is considered Binary.
     bool isBinary() const { return _type == Type::Binary; }
 
 private:
     std::vector<char> _data;
-    Type _type;
+    const std::vector<std::string> _tokens;
+    const std::string _firstLine;
+    const std::string _abbreviation;
+    const Type _type;
 };
 
 struct SendItem
@@ -74,7 +144,10 @@ public:
         std::unique_lock<std::mutex> lock(_mutex);
         if (!stopping())
         {
-            _queue.push_back(item);
+            if (deduplicate(item))
+            {
+                _queue.push_back(item);
+            }
         }
 
         const size_t queuesize = _queue.size();
@@ -115,9 +188,83 @@ public:
     }
 
 private:
+    /// Deduplicate messages based on the new one.
+    /// Returns true if the new message should be
+    /// enqueued, otherwise false.
+    bool deduplicate(const Item& item)
+    {
+        // Deduplicate messages based on the incoming one.
+        const std::string command = item->firstToken();
+        if (command == "tile:")
+        {
+            // Remove previous identical tile, if any, and use most recent (incoming).
+            const TileDesc newTile = TileDesc::parse(item->firstLine());
+            const auto& pos = std::find_if(_queue.begin(), _queue.end(),
+                [&newTile](const queue_item_t& cur)
+                {
+                    return (cur->firstToken() == "tile:" &&
+                            newTile == TileDesc::parse(cur->firstLine()));
+                });
+
+            if (pos != _queue.end())
+            {
+                _queue.erase(pos);
+            }
+        }
+        else if (command == "statusindicatorsetvalue:" ||
+                 command == "invalidatecursor:")
+        {
+            // Remove previous identical entries of this command,
+            // if any, and use most recent (incoming).
+            const auto& pos = std::find_if(_queue.begin(), _queue.end(),
+                [&command](const queue_item_t& cur)
+                {
+                    return (cur->firstToken() == command);
+                });
+
+            if (pos != _queue.end())
+            {
+                _queue.erase(pos);
+            }
+        }
+        else if (command == "invalidateviewcursor:")
+        {
+            // Remove previous cursor invalidation for same view,
+            // if any, and use most recent (incoming).
+            const std::string newMsg = item->jsonString();
+            Poco::JSON::Parser newParser;
+            const auto newResult = newParser.parse(newMsg);
+            const auto& newJson = newResult.extract<Poco::JSON::Object::Ptr>();
+            const auto viewId = newJson->get("viewId").toString();
+            const auto& pos = std::find_if(_queue.begin(), _queue.end(),
+                [command, viewId](const queue_item_t& cur)
+                {
+                    if (cur->firstToken() == command)
+                    {
+                        const std::string msg = cur->jsonString();
+                        Poco::JSON::Parser parser;
+                        const auto result = parser.parse(msg);
+                        const auto& json = result.extract<Poco::JSON::Object::Ptr>();
+                        return (viewId == json->get("viewId").toString());
+                    }
+
+                    return false;
+                });
+
+            if (pos != _queue.end())
+            {
+                _queue.erase(pos);
+            }
+        }
+
+        return true;
+    }
+
+private:
     mutable std::mutex _mutex;
     std::condition_variable _cv;
     std::deque<Item> _queue;
+    typedef typename std::deque<Item>::value_type queue_item_t;
     std::atomic<bool> _stop;
 };
 
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 7a536e1..9bac2a6 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -173,16 +173,12 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
             std::string response = tile.serialize("tile:");
             LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response);
 
-            std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(response.size() + 1 + size,
-                                                                                       MessagePayload::Type::Binary);
-            {
-                auto& output = payload->data();
-
-                // Send to first subscriber as-is (without cache marker).
-                std::memcpy(output.data(), response.data(), response.size());
-                output[response.size()] = '\n';
-                std::memcpy(output.data() + response.size() + 1, data, size);
-            }
+            // Send to first subscriber as-is (without cache marker).
+            auto payload = std::make_shared<MessagePayload>(response,
+                                                            MessagePayload::Type::Binary,
+                                                            response.size() + 1 + size);
+            payload->append("\n", 1);
+            payload->append(data, size);
 
             auto& firstSubscriber = tileBeingRendered->_subscribers[0];
             auto firstSession = firstSubscriber.lock();
@@ -198,11 +194,10 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
 
                 // Create a new Payload.
                 payload.reset();
-                payload = std::make_shared<MessagePayload>(response.size() + size, MessagePayload::Type::Binary);
-                auto& output = payload->data();
-
-                std::memcpy(output.data(), response.data(), response.size());
-                std::memcpy(output.data() + response.size(), data, size);
+                payload = std::make_shared<MessagePayload>(response,
+                                                           MessagePayload::Type::Binary,
+                                                           response.size() + size);
+                payload->append(data, size);
 
                 for (size_t i = 1; i < subscriberCount; ++i)
                 {
diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp
index 765341d..9721deb 100644
--- a/wsd/TileDesc.hpp
+++ b/wsd/TileDesc.hpp
@@ -65,6 +65,19 @@ public:
     int getId() const { return _id; }
     bool getBroadcast() const { return _broadcast; }
 
+    bool operator==(const TileDesc& other) const
+    {
+        return _part == other._part &&
+               _width == other._width &&
+               _height == other._height &&
+               _tilePosX == other._tilePosX &&
+               _tilePosY == other._tilePosY &&
+               _tileWidth == other._tileWidth &&
+               _tileHeight == other._tileHeight &&
+               _id == other._id &&
+               _broadcast == other._broadcast;
+    }
+
     bool intersectsWithRect(int x, int y, int w, int h) const
     {
         return x + w >= getTilePosX() &&


More information about the Libreoffice-commits mailing list