[Libreoffice-commits] online.git: 2 commits - test/Makefile.am test/TileQueueTests.cpp wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/SenderQueue.hpp wsd/TestStubs.cpp wsd/TileCache.cpp
Michael Meeks
michael.meeks at collabora.com
Wed Mar 15 14:23:02 UTC 2017
test/Makefile.am | 1
test/TileQueueTests.cpp | 28 +++++++++++++-------------
wsd/ClientSession.cpp | 2 -
wsd/ClientSession.hpp | 6 ++++-
wsd/DocumentBroker.cpp | 5 ++++
wsd/DocumentBroker.hpp | 4 +++
wsd/LOOLWSD.cpp | 2 -
wsd/SenderQueue.hpp | 51 +++++++++++++-----------------------------------
wsd/TestStubs.cpp | 20 ++++++++++++++++++
wsd/TileCache.cpp | 2 -
10 files changed, 65 insertions(+), 56 deletions(-)
New commits:
commit 6d6dc458500c17d4ee30d125289dd3352cfada75
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Wed Mar 15 12:07:17 2017 +0000
Remove unnecessary and inefficient wakeupWorld.
While message emplacement happens in the DocumentBroker poll, we
can be sure that the next iteration of the poll will call
hasQueuedWrites before polling.
diff --git a/test/Makefile.am b/test/Makefile.am
index 4bc4e02..9683bee 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -38,6 +38,7 @@ wsd_sources = \
../common/MessageQueue.cpp \
../kit/Kit.cpp \
../wsd/TileCache.cpp \
+ ../wsd/TestStubs.cpp \
../common/Unit.cpp \
../common/Util.cpp \
../net/Socket.cpp
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index b59c566..8b88a59 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -14,7 +14,7 @@
#include "Storage.hpp"
#include "MessageQueue.hpp"
#include "SenderQueue.hpp"
-
+#include "DocumentBroker.hpp"
#include <Poco/URI.h>
class DocumentBroker;
@@ -63,6 +63,10 @@ public:
void enqueueSendMessage(const std::shared_ptr<Message>& data)
{
+ const auto docBroker = _docBroker.lock();
+ // If in the correct thread - no need for wakeups.
+ assert (!docBroker || docBroker->isCorrectThread());
+
if (isHeadless())
{
// Fail silently and return as there is no actual websocket
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 04fe5ac..a4153ff 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -167,6 +167,11 @@ void DocumentBroker::startThread()
_poll->startThread();
}
+bool DocumentBroker::isCorrectThread()
+{
+ return _poll->isCorrectThread();
+}
+
// The inner heart of the DocumentBroker - our poll loop.
void DocumentBroker::pollThread()
{
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 8d36cb2..6762ad8 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -251,6 +251,10 @@ public:
return _sessions.size();
}
+ /// Are we running in either shutdown, or the polling thread.
+ bool isCorrectThread();
+
+ /// Pretty print internal state to a stream.
void dumpState(std::ostream& os);
std::string getJailRoot() const;
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 9ff7511..2b48d70 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -55,24 +55,11 @@ public:
size_t enqueue(const Item& item)
{
std::unique_lock<std::mutex> lock(_mutex);
- bool wasEmpty = _queue.empty();
- if (!stopping())
- {
- if (deduplicate(item))
- {
- _queue.push_back(item);
- }
- }
- const size_t queuesize = _queue.size();
- lock.unlock();
+ if (!stopping() && deduplicate(item))
+ _queue.push_back(item);
- if (wasEmpty)
- {
- // FIXME: Horrible hack - we need to wakeup just our own poll ...
- SocketPoll::wakeupWorld();
- }
- return queuesize;
+ return _queue.size();
}
/// Dequeue an item if we have one - @returns true if we do, else false.
diff --git a/wsd/TestStubs.cpp b/wsd/TestStubs.cpp
new file mode 100644
index 0000000..7972aec
--- /dev/null
+++ b/wsd/TestStubs.cpp
@@ -0,0 +1,20 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+/*
+ * Stub missing symbols required for unit tests ...
+ */
+
+#include "config.h"
+
+#include "DocumentBroker.hpp"
+
+bool DocumentBroker::isCorrectThread() { return true; }
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 0607d1e..54ff8e7 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -178,9 +178,7 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
auto& firstSubscriber = tileBeingRendered->_subscribers[0];
auto firstSession = firstSubscriber.lock();
if (firstSession)
- {
firstSession->enqueueSendMessage(payload);
- }
if (subscriberCount > 1)
{
commit 44ec90343ec1edfe09ee254993674523a5d5cfcd
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Wed Mar 15 10:20:43 2017 +0000
SenderQueue - remove condition, and waiting.
diff --git a/test/TileQueueTests.cpp b/test/TileQueueTests.cpp
index 8c4a5ad..4128df3 100644
--- a/test/TileQueueTests.cpp
+++ b/test/TileQueueTests.cpp
@@ -276,7 +276,7 @@ void TileQueueTests::testSenderQueue()
std::shared_ptr<Message> item;
// Empty queue
- CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(false, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
const std::vector<std::string> messages =
@@ -293,15 +293,15 @@ void TileQueueTests::testSenderQueue()
CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
CPPUNIT_ASSERT_EQUAL(messages[0], std::string(item->data().data(), item->data().size()));
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
CPPUNIT_ASSERT_EQUAL(messages[1], std::string(item->data().data(), item->data().size()));
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
CPPUNIT_ASSERT_EQUAL(messages[2], std::string(item->data().data(), item->data().size()));
@@ -315,7 +315,7 @@ void TileQueueTests::testSenderQueueTileDeduplication()
std::shared_ptr<Message> item;
// Empty queue
- CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(false, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
const std::vector<std::string> part_messages =
@@ -331,9 +331,9 @@ void TileQueueTests::testSenderQueueTileDeduplication()
}
CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
@@ -350,7 +350,7 @@ void TileQueueTests::testSenderQueueTileDeduplication()
}
CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
// The last one should persist.
CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
@@ -365,7 +365,7 @@ void TileQueueTests::testInvalidateViewCursorDeduplication()
std::shared_ptr<Message> item;
// Empty queue
- CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+ CPPUNIT_ASSERT_EQUAL(false, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
const std::vector<std::string> view_messages =
@@ -382,15 +382,15 @@ void TileQueueTests::testInvalidateViewCursorDeduplication()
CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
CPPUNIT_ASSERT_EQUAL(view_messages[0], std::string(item->data().data(), item->data().size()));
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
CPPUNIT_ASSERT_EQUAL(view_messages[1], std::string(item->data().data(), item->data().size()));
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
CPPUNIT_ASSERT_EQUAL(view_messages[2], std::string(item->data().data(), item->data().size()));
@@ -409,7 +409,7 @@ void TileQueueTests::testInvalidateViewCursorDeduplication()
}
CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
- CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+ CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
// The last one should persist.
CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 75e16f5..5ca126d 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -439,7 +439,7 @@ void ClientSession::performWrites()
LOG_DBG(getName() << " ClientSession: performing writes");
std::shared_ptr<Message> item;
- if (_senderQueue.waitDequeue(item, 0 /* ms - don't block */))
+ if (_senderQueue.dequeue(item))
{
const std::vector<char>& data = item->data();
try
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 056b2e2..e141bb9 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -770,7 +770,7 @@ void LOOLWSD::initialize(Application& self)
Log::initialize("wsd", logLevel, withColor, logToFile, logProperties);
-#if ENABLE_SSL
+#if 0 // ENABLE_SSL SAL_DEBUG
LOOLWSD::SSLEnabled.set(getConfigValue<bool>(conf, "ssl.enable", true));
#else
LOOLWSD::SSLEnabled.set(false);
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 6debf59..9ff7511 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -50,7 +50,6 @@ public:
void stop()
{
_stop = true;
- _cv.notify_all();
}
size_t enqueue(const Item& item)
@@ -68,7 +67,6 @@ public:
const size_t queuesize = _queue.size();
lock.unlock();
- _cv.notify_one();
if (wasEmpty)
{
// FIXME: Horrible hack - we need to wakeup just our own poll ...
@@ -77,28 +75,23 @@ public:
return queuesize;
}
- bool waitDequeue(Item& item,
- const size_t timeoutMs = std::numeric_limits<size_t>::max())
+ /// Dequeue an item if we have one - @returns true if we do, else false.
+ bool dequeue(Item& item)
{
- const auto timeToWait = std::chrono::milliseconds(timeoutMs);
-
std::unique_lock<std::mutex> lock(_mutex);
- if (!_queue.empty() ||
- _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); }))
+ if (!_queue.empty() && !stopping())
{
- if (!stopping())
- {
- item = _queue.front();
- _queue.pop_front();
- return true;
- }
-
- LOG_DBG("SenderQueue: stopping");
+ item = _queue.front();
+ _queue.pop_front();
+ return true;
+ }
+ else
+ {
+ if (stopping())
+ LOG_DBG("SenderQueue: stopping");
return false;
}
-
- return false;
}
size_t size() const
@@ -172,9 +165,7 @@ private:
});
if (pos != _queue.end())
- {
_queue.erase(pos);
- }
}
return true;
@@ -182,7 +173,6 @@ private:
private:
mutable std::mutex _mutex;
- std::condition_variable _cv;
std::deque<Item> _queue;
typedef typename std::deque<Item>::value_type queue_item_t;
std::atomic<bool> _stop;
More information about the Libreoffice-commits
mailing list