[Libreoffice-commits] online.git: Branch 'distro/collabora/collabora-online-2-0' - common/Protocol.hpp test/TileQueueTests.cpp test/WhiteBoxTests.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp wsd/TileDesc.hpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Dec 19 12:00:44 UTC 2016
common/Protocol.hpp | 74 ++++++++++++++++++++--
test/TileQueueTests.cpp | 155 ++++++++++++++++++++++++++++++++++++++++++++++
test/WhiteBoxTests.cpp | 101 ++++++++++++++++++++++++++++++
wsd/ClientSession.hpp | 8 --
wsd/DocumentBroker.cpp | 4 -
wsd/SenderQueue.hpp | 159 ++++++++++++++++++++++++++++++++++++++++++++++--
wsd/TileCache.cpp | 25 +++----
wsd/TileDesc.hpp | 13 +++
8 files changed, 503 insertions(+), 36 deletions(-)
New commits:
commit 8b59e78ad0b44a8f98ce18c8529e00227d0b357c
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sat Dec 17 14:28:11 2016 -0500
wsd: message deduplication in SenderQueue
Squashed multiple commits from master.
Change-Id: I0ad9ef7bf826b3fd0eba9cc17ec5212a3334a2f5
Reviewed-on: https://gerrit.libreoffice.org/32164
Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
Tested-by: Michael Meeks <michael.meeks at collabora.com>
diff --git a/common/Protocol.hpp b/common/Protocol.hpp
index e95090d..67f7ec7 100644
--- a/common/Protocol.hpp
+++ b/common/Protocol.hpp
@@ -77,16 +77,65 @@ namespace LOOLProtocol
// Functions that parse messages. All return false if parsing fails
bool parseStatus(const std::string& message, LibreOfficeKitDocumentType& type, int& nParts, int& currentPart, int& width, int& height);
+ /// Tokenize space-delimited values until we hit new-line or the end.
inline
- std::string getDelimitedInitialSubstring(const char *message, const int length, const char delim)
+ std::vector<std::string> tokenize(const char* data, const size_t size)
{
- if (message == nullptr || length <= 0)
+ std::vector<std::string> tokens;
+ if (size == 0 || data == nullptr)
+ {
+ return tokens;
+ }
+
+ const char* start = data;
+ const char* end = data;
+ for (size_t i = 0; i < size && data[i] != '\n'; ++i, ++end)
+ {
+ if (data[i] == ' ')
+ {
+ if (start != end && *start != ' ')
+ {
+ tokens.emplace_back(start, end);
+ }
+
+ start = end;
+ }
+ else if (*start == ' ')
+ {
+ ++start;
+ }
+ }
+
+ if (start != end && *start != ' ' && *start != '\n')
{
- return "";
+ tokens.emplace_back(start, end);
}
- const char *founddelim = static_cast<const char *>(std::memchr(message, delim, length));
- const auto size = (founddelim == nullptr ? length : founddelim - message);
+ return tokens;
+ }
+
+ inline
+ std::vector<std::string> tokenize(const std::string& s)
+ {
+ return tokenize(s.data(), s.size());
+ }
+
+ inline size_t getDelimiterPosition(const char* message, const int length, const char delim)
+ {
+ if (message && length > 0)
+ {
+ const char *founddelim = static_cast<const char *>(std::memchr(message, delim, length));
+ const auto size = (founddelim == nullptr ? length : founddelim - message);
+ return size;
+ }
+
+ return 0;
+ }
+
+ inline
+ std::string getDelimitedInitialSubstring(const char *message, const int length, const char delim)
+ {
+ const auto size = getDelimiterPosition(message, length, delim);
return std::string(message, size);
}
@@ -170,7 +219,7 @@ namespace LOOLProtocol
{
if (message == nullptr || length <= 0)
{
- return "";
+ return std::string();
}
const auto firstLine = getFirstLine(message, std::min(length, 120));
@@ -184,6 +233,19 @@ namespace LOOLProtocol
return firstLine;
}
+ inline std::string getAbbreviatedMessage(const std::string& message)
+ {
+ const auto pos = getDelimiterPosition(message.data(), std::min(message.size(), 120UL), '\n');
+
+ // If the first line is shorter than the whole message (minus newline), add ellipsis.
+ if (pos < static_cast<std::string::size_type>(message.size()) - 1)
+ {
+ return message.substr(0, pos) + "...";
+ }
+
+ return message;
+ }
+
template <typename T>
std::string getAbbreviatedMessage(const T& message)
{
diff --git a/test/TileQueueTests.cpp b/test/TileQueueTests.cpp
index dea692b..4587cc4 100644
--- a/test/TileQueueTests.cpp
+++ b/test/TileQueueTests.cpp
@@ -14,6 +14,7 @@
#include "Common.hpp"
#include "Protocol.hpp"
#include "MessageQueue.hpp"
+#include "SenderQueue.hpp"
#include "Util.hpp"
namespace CPPUNIT_NS
@@ -44,6 +45,9 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture
CPPUNIT_TEST(testTileRecombining);
CPPUNIT_TEST(testViewOrder);
CPPUNIT_TEST(testPreviewsDeprioritization);
+ CPPUNIT_TEST(testSenderQueue);
+ CPPUNIT_TEST(testSenderQueueTileDeduplication);
+ CPPUNIT_TEST(testInvalidateViewCursorDeduplication);
CPPUNIT_TEST_SUITE_END();
@@ -52,6 +56,9 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture
void testTileRecombining();
void testViewOrder();
void testPreviewsDeprioritization();
+ void testSenderQueue();
+ void testSenderQueueTileDeduplication();
+ void testInvalidateViewCursorDeduplication();
};
void TileQueueTests::testTileQueuePriority()
@@ -259,6 +266,154 @@ void TileQueueTests::testPreviewsDeprioritization()
CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(queue._queue.size()));
}
+void TileQueueTests::testSenderQueue()
+{
+ SenderQueue<std::shared_ptr<MessagePayload>> queue;
+
+ std::shared_ptr<MessagePayload> item;
+
+ // Empty queue
+ CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+ const std::vector<std::string> messages =
+ {
+ "message 1",
+ "message 2",
+ "message 3"
+ };
+
+ for (const auto& msg : messages)
+ {
+ queue.enqueue(std::make_shared<MessagePayload>(msg));
+ }
+
+ CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
+
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(messages[0], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(messages[1], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(messages[2], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+}
+
+void TileQueueTests::testSenderQueueTileDeduplication()
+{
+ SenderQueue<std::shared_ptr<MessagePayload>> queue;
+
+ std::shared_ptr<MessagePayload> item;
+
+ // Empty queue
+ CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+ const std::vector<std::string> part_messages =
+ {
+ "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=0",
+ "tile: part=1 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1",
+ "tile: part=2 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1"
+ };
+
+ for (const auto& msg : part_messages)
+ {
+ queue.enqueue(std::make_shared<MessagePayload>(msg));
+ }
+
+ CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+ const std::vector<std::string> dup_messages =
+ {
+ "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1",
+ "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1",
+ "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1"
+ };
+
+ for (const auto& msg : dup_messages)
+ {
+ queue.enqueue(std::make_shared<MessagePayload>(msg));
+ }
+
+ CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+
+ // The last one should persist.
+ CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+}
+
+void TileQueueTests::testInvalidateViewCursorDeduplication()
+{
+ SenderQueue<std::shared_ptr<MessagePayload>> queue;
+
+ std::shared_ptr<MessagePayload> item;
+
+ // Empty queue
+ CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+ const std::vector<std::string> view_messages =
+ {
+ "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }",
+ "invalidateviewcursor: { \"viewId\": \"2\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }",
+ "invalidateviewcursor: { \"viewId\": \"3\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }",
+ };
+
+ for (const auto& msg : view_messages)
+ {
+ queue.enqueue(std::make_shared<MessagePayload>(msg));
+ }
+
+ CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
+
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(view_messages[0], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(view_messages[1], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(view_messages[2], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+
+ const std::vector<std::string> dup_messages =
+ {
+ "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }",
+ "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"1000, 1418, 0, 298\", \"part\": \"0\" }",
+ "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"2000, 1418, 0, 298\", \"part\": \"0\" }",
+ };
+
+ for (const auto& msg : dup_messages)
+ {
+ queue.enqueue(std::make_shared<MessagePayload>(msg));
+ }
+
+ CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
+ CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+
+ // The last one should persist.
+ CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
+
+ CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
+}
+
CPPUNIT_TEST_SUITE_REGISTRATION(TileQueueTests);
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/test/WhiteBoxTests.cpp b/test/WhiteBoxTests.cpp
index 6ca75bc..75b9ec6 100644
--- a/test/WhiteBoxTests.cpp
+++ b/test/WhiteBoxTests.cpp
@@ -24,6 +24,8 @@ class WhiteBoxTests : public CPPUNIT_NS::TestFixture
CPPUNIT_TEST_SUITE(WhiteBoxTests);
CPPUNIT_TEST(testLOOLProtocolFunctions);
+ CPPUNIT_TEST(testMessageAbbreviation);
+ CPPUNIT_TEST(testTokenizer);
CPPUNIT_TEST(testRegexListMatcher);
CPPUNIT_TEST(testRegexListMatcher_Init);
CPPUNIT_TEST(testEmptyCellCursor);
@@ -31,6 +33,8 @@ class WhiteBoxTests : public CPPUNIT_NS::TestFixture
CPPUNIT_TEST_SUITE_END();
void testLOOLProtocolFunctions();
+ void testMessageAbbreviation();
+ void testTokenizer();
void testRegexListMatcher();
void testRegexListMatcher_Init();
void testEmptyCellCursor();
@@ -75,6 +79,103 @@ void WhiteBoxTests::testLOOLProtocolFunctions()
}
+void WhiteBoxTests::testMessageAbbreviation()
+{
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring(nullptr, 5, '\n'));
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring(nullptr, -1, '\n'));
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring("abc", 0, '\n'));
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring("abc", -1, '\n'));
+ CPPUNIT_ASSERT_EQUAL(std::string("ab"), LOOLProtocol::getDelimitedInitialSubstring("abc", 2, '\n'));
+
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage(nullptr, 5));
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage(nullptr, -1));
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage("abc", 0));
+ CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage("abc", -1));
+ CPPUNIT_ASSERT_EQUAL(std::string("ab"), LOOLProtocol::getAbbreviatedMessage("abc", 2));
+
+ std::string s;
+ std::string abbr;
+
+ s = "abcdefg";
+ CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s));
+
+ s = "1234567890123\n45678901234567890123456789012345678901234567890123";
+ abbr = "1234567890123...";
+ CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size()));
+ CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s));
+
+ // 120 characters. Change when the abbreviation max-length changes.
+ s = "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890";
+ CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size()));
+ CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s));
+
+ abbr = s + "...";
+ s += "more data";
+ CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size()));
+ CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s));
+}
+
+void WhiteBoxTests::testTokenizer()
+{
+ std::vector<std::string> tokens;
+
+ tokens = LOOLProtocol::tokenize("");
+ CPPUNIT_ASSERT_EQUAL(0UL, tokens.size());
+
+ tokens = LOOLProtocol::tokenize(" ");
+ CPPUNIT_ASSERT_EQUAL(0UL, tokens.size());
+
+ tokens = LOOLProtocol::tokenize("A");
+ CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+ tokens = LOOLProtocol::tokenize(" A");
+ CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+ tokens = LOOLProtocol::tokenize("A ");
+ CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+ tokens = LOOLProtocol::tokenize(" A ");
+ CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+ tokens = LOOLProtocol::tokenize(" A Z ");
+ CPPUNIT_ASSERT_EQUAL(2UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+ CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]);
+
+ tokens = LOOLProtocol::tokenize("\n");
+ CPPUNIT_ASSERT_EQUAL(0UL, tokens.size());
+
+ tokens = LOOLProtocol::tokenize(" A \nZ ");
+ CPPUNIT_ASSERT_EQUAL(1UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+
+ tokens = LOOLProtocol::tokenize(" A Z\n ");
+ CPPUNIT_ASSERT_EQUAL(2UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+ CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]);
+
+ tokens = LOOLProtocol::tokenize(" A Z \n ");
+ CPPUNIT_ASSERT_EQUAL(2UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]);
+ CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]);
+
+ tokens = LOOLProtocol::tokenize("tile part=0 width=256 height=256 tileposx=0 tileposy=0 tilewidth=3840 tileheight=3840 ver=-1");
+ CPPUNIT_ASSERT_EQUAL(9UL, tokens.size());
+ CPPUNIT_ASSERT_EQUAL(std::string("tile"), tokens[0]);
+ CPPUNIT_ASSERT_EQUAL(std::string("part=0"), tokens[1]);
+ CPPUNIT_ASSERT_EQUAL(std::string("width=256"), tokens[2]);
+ CPPUNIT_ASSERT_EQUAL(std::string("height=256"), tokens[3]);
+ CPPUNIT_ASSERT_EQUAL(std::string("tileposx=0"), tokens[4]);
+ CPPUNIT_ASSERT_EQUAL(std::string("tileposy=0"), tokens[5]);
+ CPPUNIT_ASSERT_EQUAL(std::string("tilewidth=3840"), tokens[6]);
+ CPPUNIT_ASSERT_EQUAL(std::string("tileheight=3840"), tokens[7]);
+ CPPUNIT_ASSERT_EQUAL(std::string("ver=-1"), tokens[8]);
+}
+
void WhiteBoxTests::testRegexListMatcher()
{
Util::RegexListMatcher matcher;
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index bb94bff..ae233e9 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -48,18 +48,14 @@ public:
bool sendBinaryFrame(const char* buffer, int length) override
{
- auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary);
- auto& output = payload->data();
- std::memcpy(output.data(), buffer, length);
+ auto payload = std::make_shared<MessagePayload>(buffer, length, MessagePayload::Type::Binary);
enqueueSendMessage(payload);
return true;
}
bool sendTextFrame(const char* buffer, const int length) override
{
- auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text);
- auto& output = payload->data();
- std::memcpy(output.data(), buffer, length);
+ auto payload = std::make_shared<MessagePayload>(buffer, length, MessagePayload::Type::Text);
enqueueSendMessage(payload);
return true;
}
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 513e12b..6b061ea 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -617,9 +617,7 @@ void DocumentBroker::alertAllUsers(const std::string& msg)
{
Util::assertIsLocked(_mutex);
- auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
- auto& output = payload->data();
- std::memcpy(output.data(), msg.data(), msg.size());
+ auto payload = std::make_shared<MessagePayload>(msg);
LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg);
for (auto& it : _sessions)
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index b1ded81..44742ab 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -16,31 +16,101 @@
#include <mutex>
#include <vector>
+#include <Poco/Dynamic/Var.h>
+#include <Poco/JSON/JSON.h>
+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>
+
#include "common/SigUtil.hpp"
#include "LOOLWebSocket.hpp"
#include "Log.hpp"
+#include "TileDesc.hpp"
/// The payload type used to send/receive data.
class MessagePayload
{
public:
- enum class Type { Text, Binary };
+ enum class Type { Text, JSON, Binary };
- MessagePayload(const size_t size, enum Type type) :
- _data(size),
+ /// Construct a text message.
+ /// message must include the full first-line.
+ MessagePayload(const std::string& message,
+ const enum Type type = Type::Text) :
+ _data(message.data(), message.data() + message.size()),
+ _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())),
+ _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())),
+ _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())),
_type(type)
{
}
- std::vector<char>& data() { return _data; }
+ /// Construct a message from a string with type and
+ /// reserve extra space (total, including message).
+ /// message must include the full first-line.
+ MessagePayload(const std::string& message,
+ const enum Type type,
+ const size_t reserve) :
+ _data(std::max(reserve, message.size())),
+ _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())),
+ _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())),
+ _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())),
+ _type(type)
+ {
+ _data.resize(message.size());
+ std::memcpy(_data.data(), message.data(), message.size());
+ }
+
+ /// Construct a message from a character array with type.
+ /// data must include the full first-line.
+ MessagePayload(const char* data,
+ const size_t size,
+ const enum Type type) :
+ _data(data, data + size),
+ _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())),
+ _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())),
+ _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())),
+ _type(type)
+ {
+ }
+
+ size_t size() const { return _data.size(); }
+ const std::vector<char>& data() const { return _data; }
+
+ const std::vector<std::string>& tokens() const { return _tokens; }
+ const std::string& firstToken() const { return _tokens[0]; }
+ const std::string& firstLine() const { return _firstLine; }
+ const std::string& abbreviation() const { return _abbreviation; }
+
+ /// Returns the json part of the message, if any.
+ std::string jsonString() const
+ {
+ if (_tokens.size() > 1 && _tokens[1] == "{")
+ {
+ const auto firstTokenSize = _tokens[0].size();
+ return std::string(_data.data() + firstTokenSize, _data.size() - firstTokenSize);
+ }
+
+ return std::string();
+ }
+
+ /// Append more data to the message.
+ void append(const char* data, const size_t size)
+ {
+ const auto curSize = _data.size();
+ _data.resize(curSize + size);
+ std::memcpy(_data.data() + curSize, data, size);
+ }
/// Returns true if and only if the payload is considered Binary.
bool isBinary() const { return _type == Type::Binary; }
private:
std::vector<char> _data;
- Type _type;
+ const std::vector<std::string> _tokens;
+ const std::string _firstLine;
+ const std::string _abbreviation;
+ const Type _type;
};
struct SendItem
@@ -74,7 +144,10 @@ public:
std::unique_lock<std::mutex> lock(_mutex);
if (!stopping())
{
- _queue.push_back(item);
+ if (deduplicate(item))
+ {
+ _queue.push_back(item);
+ }
}
const size_t queuesize = _queue.size();
@@ -115,9 +188,83 @@ public:
}
private:
+ /// Deduplicate messages based on the new one.
+ /// Returns true if the new message should be
+ /// enqueued, otherwise false.
+ bool deduplicate(const Item& item)
+ {
+ // Deduplicate messages based on the incoming one.
+ const std::string command = item->firstToken();
+ if (command == "tile:")
+ {
+ // Remove previous identical tile, if any, and use most recent (incoming).
+ const TileDesc newTile = TileDesc::parse(item->firstLine());
+ const auto& pos = std::find_if(_queue.begin(), _queue.end(),
+ [&newTile](const queue_item_t& cur)
+ {
+ return (cur->firstToken() == "tile:" &&
+ newTile == TileDesc::parse(cur->firstLine()));
+ });
+
+ if (pos != _queue.end())
+ {
+ _queue.erase(pos);
+ }
+ }
+ else if (command == "statusindicatorsetvalue:" ||
+ command == "invalidatecursor:")
+ {
+ // Remove previous identical entries of this command,
+ // if any, and use most recent (incoming).
+ const auto& pos = std::find_if(_queue.begin(), _queue.end(),
+ [&command](const queue_item_t& cur)
+ {
+ return (cur->firstToken() == command);
+ });
+
+ if (pos != _queue.end())
+ {
+ _queue.erase(pos);
+ }
+ }
+ else if (command == "invalidateviewcursor:")
+ {
+ // Remove previous cursor invalidation for same view,
+ // if any, and use most recent (incoming).
+ const std::string newMsg = item->jsonString();
+ Poco::JSON::Parser newParser;
+ const auto newResult = newParser.parse(newMsg);
+ const auto& newJson = newResult.extract<Poco::JSON::Object::Ptr>();
+ const auto viewId = newJson->get("viewId").toString();
+ const auto& pos = std::find_if(_queue.begin(), _queue.end(),
+ [command, viewId](const queue_item_t& cur)
+ {
+ if (cur->firstToken() == command)
+ {
+ const std::string msg = cur->jsonString();
+ Poco::JSON::Parser parser;
+ const auto result = parser.parse(msg);
+ const auto& json = result.extract<Poco::JSON::Object::Ptr>();
+ return (viewId == json->get("viewId").toString());
+ }
+
+ return false;
+ });
+
+ if (pos != _queue.end())
+ {
+ _queue.erase(pos);
+ }
+ }
+
+ return true;
+ }
+
+private:
mutable std::mutex _mutex;
std::condition_variable _cv;
std::deque<Item> _queue;
+ typedef typename std::deque<Item>::value_type queue_item_t;
std::atomic<bool> _stop;
};
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 7a536e1..9bac2a6 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -173,16 +173,12 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
std::string response = tile.serialize("tile:");
LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response);
- std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(response.size() + 1 + size,
- MessagePayload::Type::Binary);
- {
- auto& output = payload->data();
-
- // Send to first subscriber as-is (without cache marker).
- std::memcpy(output.data(), response.data(), response.size());
- output[response.size()] = '\n';
- std::memcpy(output.data() + response.size() + 1, data, size);
- }
+ // Send to first subscriber as-is (without cache marker).
+ auto payload = std::make_shared<MessagePayload>(response,
+ MessagePayload::Type::Binary,
+ response.size() + 1 + size);
+ payload->append("\n", 1);
+ payload->append(data, size);
auto& firstSubscriber = tileBeingRendered->_subscribers[0];
auto firstSession = firstSubscriber.lock();
@@ -198,11 +194,10 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
// Create a new Payload.
payload.reset();
- payload = std::make_shared<MessagePayload>(response.size() + size, MessagePayload::Type::Binary);
- auto& output = payload->data();
-
- std::memcpy(output.data(), response.data(), response.size());
- std::memcpy(output.data() + response.size(), data, size);
+ payload = std::make_shared<MessagePayload>(response,
+ MessagePayload::Type::Binary,
+ response.size() + size);
+ payload->append(data, size);
for (size_t i = 1; i < subscriberCount; ++i)
{
diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp
index 765341d..9721deb 100644
--- a/wsd/TileDesc.hpp
+++ b/wsd/TileDesc.hpp
@@ -65,6 +65,19 @@ public:
int getId() const { return _id; }
bool getBroadcast() const { return _broadcast; }
+ bool operator==(const TileDesc& other) const
+ {
+ return _part == other._part &&
+ _width == other._width &&
+ _height == other._height &&
+ _tilePosX == other._tilePosX &&
+ _tilePosY == other._tilePosY &&
+ _tileWidth == other._tileWidth &&
+ _tileHeight == other._tileHeight &&
+ _id == other._id &&
+ _broadcast == other._broadcast;
+ }
+
bool intersectsWithRect(int x, int y, int w, int h) const
{
return x + w >= getTilePosX() &&
More information about the Libreoffice-commits
mailing list