[Libreoffice-commits] online.git: 2 commits - test/Makefile.am test/TileQueueTests.cpp wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/SenderQueue.hpp wsd/TestStubs.cpp wsd/TileCache.cpp

Michael Meeks michael.meeks at collabora.com
Wed Mar 15 14:23:02 UTC 2017


 test/Makefile.am        |    1 
 test/TileQueueTests.cpp |   28 +++++++++++++-------------
 wsd/ClientSession.cpp   |    2 -
 wsd/ClientSession.hpp   |    6 ++++-
 wsd/DocumentBroker.cpp  |    5 ++++
 wsd/DocumentBroker.hpp  |    4 +++
 wsd/LOOLWSD.cpp         |    2 -
 wsd/SenderQueue.hpp     |   51 +++++++++++++-----------------------------------
 wsd/TestStubs.cpp       |   20 ++++++++++++++++++
 wsd/TileCache.cpp       |    2 -
 10 files changed, 65 insertions(+), 56 deletions(-)

New commits:
commit 6d6dc458500c17d4ee30d125289dd3352cfada75
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Mar 15 12:07:17 2017 +0000

    Remove un-necesssary and inefficient wakeupWorld.
    
    While message emplacement happens in the DocumentBroker poll, we
    can be sure that the next iteration of the poll will call
    hasQueuedWrites before polling.

diff --git a/test/Makefile.am b/test/Makefile.am
index 4bc4e02..9683bee 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -38,6 +38,7 @@ wsd_sources = \
             ../common/MessageQueue.cpp \
             ../kit/Kit.cpp \
             ../wsd/TileCache.cpp \
+            ../wsd/TestStubs.cpp \
             ../common/Unit.cpp \
             ../common/Util.cpp \
             ../net/Socket.cpp
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index b59c566..8b88a59 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -14,7 +14,7 @@
 #include "Storage.hpp"
 #include "MessageQueue.hpp"
 #include "SenderQueue.hpp"
-
+#include "DocumentBroker.hpp"
 #include <Poco/URI.h>
 
 class DocumentBroker;
@@ -63,6 +63,10 @@ public:
 
     void enqueueSendMessage(const std::shared_ptr<Message>& data)
     {
+        const auto docBroker = _docBroker.lock();
+        // If in the correct thread - no need for wakeups.
+        assert (!docBroker || docBroker->isCorrectThread());
+
         if (isHeadless())
         {
             // Fail silently and return as there is no actual websocket
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 04fe5ac..a4153ff 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -167,6 +167,11 @@ void DocumentBroker::startThread()
     _poll->startThread();
 }
 
+bool DocumentBroker::isCorrectThread()
+{
+    return _poll->isCorrectThread();
+}
+
 // The inner heart of the DocumentBroker - our poll loop.
 void DocumentBroker::pollThread()
 {
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 8d36cb2..6762ad8 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -251,6 +251,10 @@ public:
         return _sessions.size();
     }
 
+    /// Are we running in either shutdown, or the polling thread.
+    bool isCorrectThread();
+
+    /// Pretty print internal state to a stream.
     void dumpState(std::ostream& os);
 
     std::string getJailRoot() const;
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 9ff7511..2b48d70 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -55,24 +55,11 @@ public:
     size_t enqueue(const Item& item)
     {
         std::unique_lock<std::mutex> lock(_mutex);
-        bool wasEmpty = _queue.empty();
-        if (!stopping())
-        {
-            if (deduplicate(item))
-            {
-                _queue.push_back(item);
-            }
-        }
 
-        const size_t queuesize = _queue.size();
-        lock.unlock();
+        if (!stopping() && deduplicate(item))
+            _queue.push_back(item);
 
-        if (wasEmpty)
-        {
-            // FIXME: Horrible hack - we need to wakeup just our own poll ...
-            SocketPoll::wakeupWorld();
-        }
-        return queuesize;
+        return _queue.size();
     }
 
     /// Dequeue an item if we have one - @returns true if we do, else false.
diff --git a/wsd/TestStubs.cpp b/wsd/TestStubs.cpp
new file mode 100644
index 0000000..7972aec
--- /dev/null
+++ b/wsd/TestStubs.cpp
@@ -0,0 +1,20 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+/*
+ * Stub missing symbols required for unit tests ...
+ */
+
+#include "config.h"
+
+#include "DocumentBroker.hpp"
+
+bool DocumentBroker::isCorrectThread() { return true; }
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 0607d1e..54ff8e7 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -178,9 +178,7 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
             auto& firstSubscriber = tileBeingRendered->_subscribers[0];
             auto firstSession = firstSubscriber.lock();
             if (firstSession)
-            {
                 firstSession->enqueueSendMessage(payload);
-            }
 
             if (subscriberCount > 1)
             {
commit 44ec90343ec1edfe09ee254993674523a5d5cfcd
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Mar 15 10:20:43 2017 +0000

    SenderQueue - remove condition, and waiting.

diff --git a/test/TileQueueTests.cpp b/test/TileQueueTests.cpp
index 8c4a5ad..4128df3 100644
--- a/test/TileQueueTests.cpp
+++ b/test/TileQueueTests.cpp
@@ -276,7 +276,7 @@ void TileQueueTests::testSenderQueue()
     std::shared_ptr<Message> item;
 
     // Empty queue
-    CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(false, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
 
     const std::vector<std::string> messages =
@@ -293,15 +293,15 @@ void TileQueueTests::testSenderQueue()
 
     CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
 
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
     CPPUNIT_ASSERT_EQUAL(messages[0], std::string(item->data().data(), item->data().size()));
 
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
     CPPUNIT_ASSERT_EQUAL(messages[1], std::string(item->data().data(), item->data().size()));
 
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
     CPPUNIT_ASSERT_EQUAL(messages[2], std::string(item->data().data(), item->data().size()));
 
@@ -315,7 +315,7 @@ void TileQueueTests::testSenderQueueTileDeduplication()
     std::shared_ptr<Message> item;
 
     // Empty queue
-    CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(false, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
 
     const std::vector<std::string> part_messages =
@@ -331,9 +331,9 @@ void TileQueueTests::testSenderQueueTileDeduplication()
     }
 
     CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
 
     CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
 
@@ -350,7 +350,7 @@ void TileQueueTests::testSenderQueueTileDeduplication()
     }
 
     CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
 
     // The last one should persist.
     CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
@@ -365,7 +365,7 @@ void TileQueueTests::testInvalidateViewCursorDeduplication()
     std::shared_ptr<Message> item;
 
     // Empty queue
-    CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10));
+    CPPUNIT_ASSERT_EQUAL(false, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
 
     const std::vector<std::string> view_messages =
@@ -382,15 +382,15 @@ void TileQueueTests::testInvalidateViewCursorDeduplication()
 
     CPPUNIT_ASSERT_EQUAL(3UL, queue.size());
 
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(2UL, queue.size());
     CPPUNIT_ASSERT_EQUAL(view_messages[0], std::string(item->data().data(), item->data().size()));
 
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
     CPPUNIT_ASSERT_EQUAL(view_messages[1], std::string(item->data().data(), item->data().size()));
 
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
     CPPUNIT_ASSERT_EQUAL(0UL, queue.size());
     CPPUNIT_ASSERT_EQUAL(view_messages[2], std::string(item->data().data(), item->data().size()));
 
@@ -409,7 +409,7 @@ void TileQueueTests::testInvalidateViewCursorDeduplication()
     }
 
     CPPUNIT_ASSERT_EQUAL(1UL, queue.size());
-    CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0));
+    CPPUNIT_ASSERT_EQUAL(true, queue.dequeue(item));
 
     // The last one should persist.
     CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size()));
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 75e16f5..5ca126d 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -439,7 +439,7 @@ void ClientSession::performWrites()
     LOG_DBG(getName() << " ClientSession: performing writes");
 
     std::shared_ptr<Message> item;
-    if (_senderQueue.waitDequeue(item, 0 /* ms - don't block */))
+    if (_senderQueue.dequeue(item))
     {
         const std::vector<char>& data = item->data();
         try
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 056b2e2..e141bb9 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -770,7 +770,7 @@ void LOOLWSD::initialize(Application& self)
 
     Log::initialize("wsd", logLevel, withColor, logToFile, logProperties);
 
-#if ENABLE_SSL
+#if 0 // ENABLE_SSL SAL_DEBUG
     LOOLWSD::SSLEnabled.set(getConfigValue<bool>(conf, "ssl.enable", true));
 #else
     LOOLWSD::SSLEnabled.set(false);
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 6debf59..9ff7511 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -50,7 +50,6 @@ public:
     void stop()
     {
         _stop = true;
-        _cv.notify_all();
     }
 
     size_t enqueue(const Item& item)
@@ -68,7 +67,6 @@ public:
         const size_t queuesize = _queue.size();
         lock.unlock();
 
-        _cv.notify_one();
         if (wasEmpty)
         {
             // FIXME: Horrible hack - we need to wakeup just our own poll ...
@@ -77,28 +75,23 @@ public:
         return queuesize;
     }
 
-    bool waitDequeue(Item& item,
-                     const size_t timeoutMs = std::numeric_limits<size_t>::max())
+    /// Dequeue an item if we have one - @returns true if we do, else false.
+    bool dequeue(Item& item)
     {
-        const auto timeToWait = std::chrono::milliseconds(timeoutMs);
-
         std::unique_lock<std::mutex> lock(_mutex);
 
-        if (!_queue.empty() ||
-            _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); }))
+        if (!_queue.empty() && !stopping())
         {
-            if (!stopping())
-            {
-                item = _queue.front();
-                _queue.pop_front();
-                return true;
-            }
-
-            LOG_DBG("SenderQueue: stopping");
+            item = _queue.front();
+            _queue.pop_front();
+            return true;
+        }
+        else
+        {
+            if (stopping())
+                LOG_DBG("SenderQueue: stopping");
             return false;
         }
-
-        return false;
     }
 
     size_t size() const
@@ -172,9 +165,7 @@ private:
                 });
 
             if (pos != _queue.end())
-            {
                 _queue.erase(pos);
-            }
         }
 
         return true;
@@ -182,7 +173,6 @@ private:
 
 private:
     mutable std::mutex _mutex;
-    std::condition_variable _cv;
     std::deque<Item> _queue;
     typedef typename std::deque<Item>::value_type queue_item_t;
     std::atomic<bool> _stop;


More information about the Libreoffice-commits mailing list