[Libreoffice-commits] online.git: Makefile.am test/Makefile.am wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/PrisonerSession.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Dec 14 04:21:38 UTC 2016


 Makefile.am             |    1 -
 test/Makefile.am        |    1 -
 wsd/ClientSession.cpp   |   44 +++++++++++++++++++++++++++++++++++++++++++-
 wsd/ClientSession.hpp   |   39 +++++++++++++++++++++++++++++++++++++++
 wsd/DocumentBroker.cpp  |   15 +++++----------
 wsd/PrisonerSession.cpp |    2 +-
 wsd/SenderQueue.hpp     |   48 ++++++++++++++++++++++++------------------------
 wsd/TileCache.cpp       |   12 ++++++++++--
 8 files changed, 122 insertions(+), 40 deletions(-)

New commits:
commit fe38e0e1e67c64bc7d91a5ac20c5bf05d4282928
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Tue Dec 13 19:20:05 2016 -0500

    loolwsd: per-socket dedicated sending thread
    
    To avoid degrading performance for everyone
    because of a single slow/bad connection, we
    send data to clients each in its own thread.
    
    Change-Id: I6f980c25a404c4d05bcdb1979849ea3d2776c7b9
    Reviewed-on: https://gerrit.libreoffice.org/31984
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/Makefile.am b/Makefile.am
index b1f1932..79120bc 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -53,7 +53,6 @@ loolwsd_SOURCES = wsd/Admin.cpp \
                   wsd/ClientSession.cpp \
                   wsd/FileServer.cpp \
                   wsd/PrisonerSession.cpp \
-                  wsd/SenderQueue.cpp \
                   wsd/Storage.cpp \
                   wsd/TileCache.cpp \
                   $(shared_sources)
diff --git a/test/Makefile.am b/test/Makefile.am
index 9a6d715..992830e 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -37,7 +37,6 @@ wsd_sources = \
             ../common/Session.cpp \
             ../common/MessageQueue.cpp \
             ../kit/Kit.cpp \
-            ../wsd/SenderQueue.cpp \
             ../wsd/TileCache.cpp \
             ../common/Unit.cpp \
             ../common/Util.cpp
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index f1188b5..c97ef64 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -44,9 +44,12 @@ ClientSession::ClientSession(const std::string& id,
     _uriPublic(uriPublic),
     _isReadOnly(readOnly),
     _isDocumentOwner(false),
-    _loadPart(-1)
+    _loadPart(-1),
+    _stop(false)
 {
     Log::info("ClientSession ctor [" + getName() + "].");
+
+    _senderThread = std::thread([this]{ senderThread(); });
 }
 
 ClientSession::~ClientSession()
@@ -55,6 +58,13 @@ ClientSession::~ClientSession()
 
     // Release the save-as queue.
     _saveAsQueue.put("");
+
+    stop();
+    if (_senderThread.joinable())
+    {
+        _senderThread.join();
+    }
+
 }
 
 void ClientSession::bridgePrisonerSession()
@@ -453,4 +463,36 @@ void ClientSession::setReadOnly()
     sendTextFrame("perm: readonly");
 }
 
+void ClientSession::senderThread()
+{
+    LOG_DBG(getName() + " SenderThread started");
+
+    while (!stopping())
+    {
+        std::shared_ptr<MessagePayload> item;
+        if (_senderQueue.waitDequeue(item, static_cast<size_t>(POLL_TIMEOUT_MS)))
+        {
+            const std::vector<char>& data = item->data();
+            try
+            {
+                if (item->isBinary())
+                {
+                    Session::sendBinaryFrame(data.data(), data.size());
+                }
+                else
+                {
+                    Session::sendTextFrame(data.data(), data.size());
+                }
+            }
+            catch (const std::exception& ex)
+            {
+                LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
+                        "] to " << getName() << ": " << ex.what());
+            }
+        }
+    }
+
+    LOG_DBG(getName() + " SenderThread finished");
+}
+
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index a96d807..f6bc8a5 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -13,6 +13,7 @@
 #include "Session.hpp"
 #include "Storage.hpp"
 #include "MessageQueue.hpp"
+#include "SenderQueue.hpp"
 
 #include <Poco/URI.h>
 
@@ -44,6 +45,38 @@ public:
     void setDocumentOwner(const bool documentOwner) { _isDocumentOwner = documentOwner; }
     bool isDocumentOwner() const { return _isDocumentOwner; }
 
+    using Session::sendTextFrame;
+
+    bool sendBinaryFrame(const char* buffer, int length) override
+    {
+        auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary);
+        auto& output = payload->data();
+        std::memcpy(output.data(), buffer, length);
+        enqueueSendMessage(payload);
+        return true;
+    }
+
+    bool sendTextFrame(const char* buffer, const int length) override
+    {
+        auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text);
+        auto& output = payload->data();
+        std::memcpy(output.data(), buffer, length);
+        enqueueSendMessage(payload);
+        return true;
+    }
+
+    void enqueueSendMessage(const std::shared_ptr<MessagePayload>& data)
+    {
+        _senderQueue.enqueue(data);
+    }
+
+    bool stopping() const { return _stop || _senderQueue.stopping(); }
+    void stop()
+    {
+        _stop = true;
+        _senderQueue.stop();
+    }
+
     /**
      * Return the URL of the saved-as document when it's ready. If called
      * before it's ready, the call blocks till then.
@@ -95,6 +128,8 @@ private:
     /// Eg. in readonly mode only few messages should be allowed
     bool filterMessage(const std::string& msg) const;
 
+    void senderThread();
+
 private:
     std::weak_ptr<DocumentBroker> _docBroker;
 
@@ -117,6 +152,10 @@ private:
 
     /// Wopi FileInfo object
     std::unique_ptr<WopiStorage::WOPIFileInfo> _wopiFileInfo;
+
+    SenderQueue<std::shared_ptr<MessagePayload>> _senderQueue;
+    std::thread _senderThread;
+    std::atomic<bool> _stop;
 };
 
 #endif
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 0470cbb..21c3b4c 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -596,9 +596,6 @@ size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session)
     // Now we are ready to bridge between the kit and client.
     session->bridgePrisonerSession();
 
-    // Provision for another thread to service this session.
-    SenderThreadPool::instance().incMaxThreadCount();
-
     return count;
 }
 
@@ -610,9 +607,6 @@ size_t DocumentBroker::removeSession(const std::string& id)
     {
         Admin::instance().rmDoc(_docKey, id);
 
-        // Reduce thread provisioning.
-        SenderThreadPool::instance().decMaxThreadCount();
-
         auto it = _sessions.find(id);
         if (it != _sessions.end())
         {
@@ -635,13 +629,14 @@ void DocumentBroker::alertAllUsers(const std::string& msg)
 {
     Util::assertIsLocked(_mutex);
 
+    auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
+    auto& output = payload->data();
+    std::memcpy(output.data(), msg.data(), msg.size());
+
     LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg);
     for (auto& it : _sessions)
     {
-        auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
-        auto& output = payload->data();
-        std::memcpy(output.data(), msg.data(), msg.size());
-        SenderQueue::instance().enqueue(it.second, payload);
+        it.second->enqueueSendMessage(payload);
     }
 }
 
diff --git a/wsd/PrisonerSession.cpp b/wsd/PrisonerSession.cpp
index 69eafad..59b6a43 100644
--- a/wsd/PrisonerSession.cpp
+++ b/wsd/PrisonerSession.cpp
@@ -286,7 +286,7 @@ bool PrisonerSession::forwardToPeer(const std::shared_ptr<ClientSession>& client
                                                                    : MessagePayload::Type::Text);
     auto& output = payload->data();
     std::memcpy(output.data(), buffer, length);
-    SenderQueue::instance().enqueue(clientSession, payload);
+    clientSession->enqueueSendMessage(payload);
 
     return true;
 }
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 3dc2856..647b81a 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -10,12 +10,14 @@
 #ifndef INCLUDED_SENDERQUEUE_HPP
 #define INCLUDED_SENDERQUEUE_HPP
 
+#include <condition_variable>
 #include <deque>
 #include <memory>
+#include <mutex>
 #include <vector>
 
 #include "common/SigUtil.hpp"
-#include "Session.hpp"
+#include "LOOLWebSocket.hpp"
 #include "Log.hpp"
 
 /// The payload type used to send/receive data.
@@ -43,32 +45,38 @@ private:
 
 struct SendItem
 {
-    std::weak_ptr<::Session> Session;
+    std::weak_ptr<LOOLWebSocket> Socket;
     std::shared_ptr<MessagePayload> Data;
+    std::string Meta;
     std::chrono::steady_clock::time_point BirthTime;
 };
 
-/// A queue of data to send to certain Sessions.
+/// A queue of data to send to certain Session's WS.
+template <typename Item>
 class SenderQueue final
 {
 public:
 
-    static SenderQueue& instance() { return TheQueue; }
+    SenderQueue() :
+        _stop(false)
+    {
+    }
 
     bool stopping() const { return _stop || TerminationFlag; }
     void stop()
     {
-         _stop = true;
-         _cv.notify_all();
+        _stop = true;
+        _cv.notify_all();
     }
 
-    size_t enqueue(const std::weak_ptr<Session>& session,
-                   const std::shared_ptr<MessagePayload>& data)
+    size_t enqueue(const Item& item)
     {
-        SendItem item = { session, data, std::chrono::steady_clock::now() };
-
         std::unique_lock<std::mutex> lock(_mutex);
-        _queue.push_back(item);
+        if (!stopping())
+        {
+            _queue.push_back(item);
+        }
+
         const size_t queuesize = _queue.size();
         lock.unlock();
 
@@ -76,7 +84,7 @@ public:
         return queuesize;
     }
 
-    bool waitDequeue(SendItem& item,
+    bool waitDequeue(Item& item,
                      const size_t timeoutMs = std::numeric_limits<size_t>::max())
     {
         const auto timeToWait = std::chrono::milliseconds(timeoutMs);
@@ -93,7 +101,7 @@ public:
                 return true;
             }
 
-            LOG_INF("SenderQueue: stopping");
+            LOG_DBG("SenderQueue: stopping");
             return false;
         }
 
@@ -109,16 +117,13 @@ public:
 private:
     mutable std::mutex _mutex;
     std::condition_variable _cv;
-    std::deque<SendItem> _queue;
+    std::deque<Item> _queue;
     std::atomic<bool> _stop;
-
-    /// The only SenderQueue instance.
-    static SenderQueue TheQueue;
 };
 
 /// Pool of sender threads.
 /// These are dedicated threads that only dequeue from
-/// the SenderQueue and send to the target Session.
+/// the SenderQueue and send to the target Session's WS.
 /// This pool has long-running threads that grow
 /// only on congention and shrink otherwise.
 class SenderThreadPool final
@@ -141,7 +146,7 @@ public:
     {
         // Stop us and the queue.
         stop();
-        SenderQueue::instance().stop();
+        //SenderQueue::instance().stop();
 
         for (const auto& threadData : _threads)
         {
@@ -152,8 +157,6 @@ public:
         }
     }
 
-    static SenderThreadPool& instance() { return ThePool; }
-
     void stop() { _stop = true; }
     bool stopping() const { return _stop || TerminationFlag; }
 
@@ -218,9 +221,6 @@ private:
 
     /// How often to do housekeeping when we idle.
     static constexpr size_t HousekeepIdleIntervalMs = 60000;
-
-    /// The only pool.
-    static SenderThreadPool ThePool;
 };
 
 #endif
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 60123a7..c00cc55 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -183,7 +183,11 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
             }
 
             auto& firstSubscriber = tileBeingRendered->_subscribers[0];
-            SenderQueue::instance().enqueue(firstSubscriber, payload);
+            auto firstSession = firstSubscriber.lock();
+            if (firstSession)
+            {
+                firstSession->enqueueSendMessage(payload);
+            }
 
             if (subscriberCount > 1)
             {
@@ -201,7 +205,11 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
                 for (size_t i = 1; i < subscriberCount; ++i)
                 {
                     auto& subscriber = tileBeingRendered->_subscribers[i];
-                    SenderQueue::instance().enqueue(subscriber, payload);
+                    auto session = subscriber.lock();
+                    if (session)
+                    {
+                        session->enqueueSendMessage(payload);
+                    }
                 }
             }
         }


More information about the Libreoffice-commits mailing list