[Libreoffice-commits] online.git: Branch 'distro/collabora/collabora-online-2-0' - wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/PrisonerSession.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Wed Dec 14 10:27:10 UTC 2016


 wsd/ClientSession.cpp   |   44 ++++++++++++++++
 wsd/ClientSession.hpp   |   39 ++++++++++++++
 wsd/DocumentBroker.cpp  |   20 +++----
 wsd/PrisonerSession.cpp |    1 
 wsd/SenderQueue.hpp     |  126 ++++++++++++++++++++++++++++++++++++++++++++++++
 wsd/TileCache.cpp       |   73 ++++++++++++++-------------
 6 files changed, 255 insertions(+), 48 deletions(-)

New commits:
commit 2de26e9ba46b0f3971223b8f9e530b3e51bdd644
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Tue Dec 13 21:59:01 2016 -0500

    loolwsd: per-socket dedicated sending thread
    
    To avoid degrading performance for everyone
    because of a single slow/bad connection, we
    send data to clients each in its own thread.
    
    Change-Id: I6f980c25a404c4d05bcdb1979849ea3d2776c7b9
    Reviewed-on: https://gerrit.libreoffice.org/31981
    Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
    Tested-by: Michael Meeks <michael.meeks at collabora.com>

diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 0bd20fc..202114f 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -44,9 +44,12 @@ ClientSession::ClientSession(const std::string& id,
     _uriPublic(uriPublic),
     _isReadOnly(readOnly),
     _isDocumentOwner(false),
-    _loadPart(-1)
+    _loadPart(-1),
+    _stop(false)
 {
     Log::info("ClientSession ctor [" + getName() + "].");
+
+    _senderThread = std::thread([this]{ senderThread(); });
 }
 
 ClientSession::~ClientSession()
@@ -55,6 +58,13 @@ ClientSession::~ClientSession()
 
     // Release the save-as queue.
     _saveAsQueue.put("");
+
+    stop();
+    if (_senderThread.joinable())
+    {
+        _senderThread.join();
+    }
+
 }
 
 void ClientSession::bridgePrisonerSession()
@@ -418,4 +428,36 @@ void ClientSession::setReadOnly()
     sendTextFrame("perm: readonly");
 }
 
+void ClientSession::senderThread()
+{
+    LOG_DBG(getName() + " SenderThread started");
+
+    while (!stopping())
+    {
+        std::shared_ptr<MessagePayload> item;
+        if (_senderQueue.waitDequeue(item, static_cast<size_t>(COMMAND_TIMEOUT_MS)))
+        {
+            const std::vector<char>& data = item->data();
+            try
+            {
+                if (item->isBinary())
+                {
+                    LOOLSession::sendBinaryFrame(data.data(), data.size());
+                }
+                else
+                {
+                    LOOLSession::sendTextFrame(data.data(), data.size());
+                }
+            }
+            catch (const std::exception& ex)
+            {
+                LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
+                        "] to " << getName() << ": " << ex.what());
+            }
+        }
+    }
+
+    LOG_DBG(getName() + " SenderThread finished");
+}
+
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index 3703593..bb94bff 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -12,6 +12,7 @@
 
 #include "Session.hpp"
 #include "MessageQueue.hpp"
+#include "SenderQueue.hpp"
 
 #include <Poco/URI.h>
 
@@ -43,6 +44,38 @@ public:
     void setDocumentOwner(const bool documentOwner) { _isDocumentOwner = documentOwner; }
     bool isDocumentOwner() const { return _isDocumentOwner; }
 
+    using LOOLSession::sendTextFrame;
+
+    bool sendBinaryFrame(const char* buffer, int length) override
+    {
+        auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary);
+        auto& output = payload->data();
+        std::memcpy(output.data(), buffer, length);
+        enqueueSendMessage(payload);
+        return true;
+    }
+
+    bool sendTextFrame(const char* buffer, const int length) override
+    {
+        auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text);
+        auto& output = payload->data();
+        std::memcpy(output.data(), buffer, length);
+        enqueueSendMessage(payload);
+        return true;
+    }
+
+    void enqueueSendMessage(const std::shared_ptr<MessagePayload>& data)
+    {
+        _senderQueue.enqueue(data);
+    }
+
+    bool stopping() const { return _stop || _senderQueue.stopping(); }
+    void stop()
+    {
+        _stop = true;
+        _senderQueue.stop();
+    }
+
     /**
      * Return the URL of the saved-as document when it's ready. If called
      * before it's ready, the call blocks till then.
@@ -91,6 +124,8 @@ private:
     /// Eg. in readonly mode only few messages should be allowed
     bool filterMessage(const std::string& msg) const;
 
+    void senderThread();
+
 private:
     std::weak_ptr<DocumentBroker> _docBroker;
 
@@ -110,6 +145,10 @@ private:
     MessageQueue _saveAsQueue;
 
     int _loadPart;
+
+    SenderQueue<std::shared_ptr<MessagePayload>> _senderQueue;
+    std::thread _senderThread;
+    std::atomic<bool> _stop;
 };
 
 #endif
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 2251cd0..513e12b 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -28,6 +28,7 @@
 #include "PrisonerSession.hpp"
 #include "Storage.hpp"
 #include "TileCache.hpp"
+#include "SenderQueue.hpp"
 #include "Unit.hpp"
 
 using namespace LOOLProtocol;
@@ -243,7 +244,7 @@ bool DocumentBroker::load(std::shared_ptr<ClientSession>& session, const std::st
 
     assert(_storage != nullptr);
 
-    // Call the storage specific file info functions
+    // Call the storage specific fileinfo functions
     std::string userid, username;
     std::chrono::duration<double> getInfoCallDuration(0);
     if (dynamic_cast<WopiStorage*>(_storage.get()) != nullptr)
@@ -616,17 +617,14 @@ void DocumentBroker::alertAllUsers(const std::string& msg)
 {
     Util::assertIsLocked(_mutex);
 
+    auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
+    auto& output = payload->data();
+    std::memcpy(output.data(), msg.data(), msg.size());
+
     LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg);
     for (auto& it : _sessions)
     {
-        try
-        {
-            it.second->sendTextFrame(msg);
-        }
-        catch (const std::exception& ex)
-        {
-            LOG_ERR("Error while alerting all users [" << msg << "]: " << ex.what());
-        }
+        it.second->enqueueSendMessage(payload);
     }
 }
 
@@ -808,7 +806,7 @@ void DocumentBroker::cancelTileRequests(const std::shared_ptr<ClientSession>& se
 void DocumentBroker::handleTileResponse(const std::vector<char>& payload)
 {
     const std::string firstLine = getFirstLine(payload);
-    LOG_DBG("Handling tile combined: " << firstLine);
+    LOG_DBG("Handling tile: " << firstLine);
 
     try
     {
@@ -959,7 +957,7 @@ bool DocumentBroker::forwardToClient(const std::string& prefix, const std::vecto
     }
     else
     {
-        LOG_ERR("Failed to parse prefix of forward-to-client message: " << prefix);
+        LOG_ERR("Unexpected prefix of forward-to-client message: " << prefix);
     }
 
     return false;
diff --git a/wsd/PrisonerSession.cpp b/wsd/PrisonerSession.cpp
index bbfd902..ae9341e 100644
--- a/wsd/PrisonerSession.cpp
+++ b/wsd/PrisonerSession.cpp
@@ -23,6 +23,7 @@
 #include "Log.hpp"
 #include "ClientSession.hpp"
 #include "Rectangle.hpp"
+#include "SenderQueue.hpp"
 #include "Storage.hpp"
 #include "TileCache.hpp"
 #include "IoUtil.hpp"
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
new file mode 100644
index 0000000..b1ded81
--- /dev/null
+++ b/wsd/SenderQueue.hpp
@@ -0,0 +1,126 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_SENDERQUEUE_HPP
+#define INCLUDED_SENDERQUEUE_HPP
+
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "common/SigUtil.hpp"
+#include "LOOLWebSocket.hpp"
+#include "Log.hpp"
+
+/// The payload type used to send/receive data.
+class MessagePayload
+{
+public:
+
+    enum class Type { Text, Binary };
+
+    MessagePayload(const size_t size, enum Type type) :
+        _data(size),
+        _type(type)
+    {
+    }
+
+    std::vector<char>& data() { return _data; }
+
+    /// Returns true if and only if the payload is considered Binary.
+    bool isBinary() const { return _type == Type::Binary; }
+
+private:
+    std::vector<char> _data;
+    Type _type;
+};
+
+struct SendItem
+{
+    std::weak_ptr<LOOLWebSocket> Socket;
+    std::shared_ptr<MessagePayload> Data;
+    std::string Meta;
+    std::chrono::steady_clock::time_point BirthTime;
+};
+
+/// A queue of data to send to certain Session's WS.
+template <typename Item>
+class SenderQueue final
+{
+public:
+
+    SenderQueue() :
+        _stop(false)
+    {
+    }
+
+    bool stopping() const { return _stop || TerminationFlag; }
+    void stop()
+    {
+        _stop = true;
+        _cv.notify_all();
+    }
+
+    size_t enqueue(const Item& item)
+    {
+        std::unique_lock<std::mutex> lock(_mutex);
+        if (!stopping())
+        {
+            _queue.push_back(item);
+        }
+
+        const size_t queuesize = _queue.size();
+        lock.unlock();
+
+        _cv.notify_one();
+        return queuesize;
+    }
+
+    bool waitDequeue(Item& item,
+                     const size_t timeoutMs = std::numeric_limits<size_t>::max())
+    {
+        const auto timeToWait = std::chrono::milliseconds(timeoutMs);
+
+        std::unique_lock<std::mutex> lock(_mutex);
+
+        if (!_queue.empty() ||
+            _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); }))
+        {
+            if (!stopping())
+            {
+                item = _queue.front();
+                _queue.pop_front();
+                return true;
+            }
+
+            LOG_DBG("SenderQueue: stopping");
+            return false;
+        }
+
+        return false;
+    }
+
+    size_t size() const
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _queue.size();
+    }
+
+private:
+    mutable std::mutex _mutex;
+    std::condition_variable _cv;
+    std::deque<Item> _queue;
+    std::atomic<bool> _stop;
+};
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index a6f0b7f..7a536e1 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -34,6 +34,7 @@
 #include "Common.hpp"
 #include "common/FileUtil.hpp"
 #include "Protocol.hpp"
+#include "SenderQueue.hpp"
 #include "Unit.hpp"
 #include "Util.hpp"
 
@@ -155,8 +156,8 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
     const auto cachedName = (tileBeingRendered ? tileBeingRendered->getCacheName()
                                                : cacheFileName(tile));
 
-    // Ignore if we can't save the tile, things will work anyway, but slower. An error indication
-    // has been supposed to be sent to all users in that case.
+    // Ignore if we can't save the tile, things will work anyway, but slower.
+    // An error indication is supposed to be sent to all users in that case.
     const auto fileName = _cacheDir + "/" + cachedName;
     if (FileUtil::saveDataToFileSafely(fileName, data, size))
     {
@@ -166,50 +167,50 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
     // Notify subscribers, if any.
     if (tileBeingRendered)
     {
-        if (!tileBeingRendered->_subscribers.empty())
+        const auto subscriberCount = tileBeingRendered->_subscribers.size();
+        if (subscriberCount > 0)
         {
             std::string response = tile.serialize("tile:");
-            Log::debug("Sending tile message to subscribers: " + response);
+            LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response);
 
-            std::vector<char> output(256 + size);
-            output.resize(response.size() + 1 + size);
+            std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(response.size() + 1 + size,
+                                                                                       MessagePayload::Type::Binary);
+            {
+                auto& output = payload->data();
 
-            std::memcpy(output.data(), response.data(), response.size());
-            output[response.size()] = '\n';
-            std::memcpy(output.data() + response.size() + 1, data, size);
+                // Send to first subscriber as-is (without cache marker).
+                std::memcpy(output.data(), response.data(), response.size());
+                output[response.size()] = '\n';
+                std::memcpy(output.data() + response.size() + 1, data, size);
+            }
 
-            // Send to first subscriber as-is (without cache marker).
-            auto firstSubscriber = tileBeingRendered->_subscribers[0].lock();
-            if (firstSubscriber)
+            auto& firstSubscriber = tileBeingRendered->_subscribers[0];
+            auto firstSession = firstSubscriber.lock();
+            if (firstSession)
             {
-                try
-                {
-                    firstSubscriber->sendBinaryFrame(output.data(), output.size());
-                }
-                catch (const std::exception& ex)
-                {
-                    Log::warn("Failed to send tile to " + firstSubscriber->getName() + ": " + ex.what());
-                }
+                firstSession->enqueueSendMessage(payload);
             }
 
-            // All others must get served from the cache.
-            response += " renderid=cached\n";
-            output.resize(response.size() + size);
-            std::memcpy(output.data(), response.data(), response.size());
-            std::memcpy(output.data() + response.size(), data, size);
-
-            for (size_t i = 1; i < tileBeingRendered->_subscribers.size(); ++i)
+            if (subscriberCount > 1)
             {
-                auto subscriber = tileBeingRendered->_subscribers[i].lock();
-                if (subscriber)
+                // All others must get served from the cache.
+                response += " renderid=cached\n";
+
+                // Create a new Payload.
+                payload.reset();
+                payload = std::make_shared<MessagePayload>(response.size() + size, MessagePayload::Type::Binary);
+                auto& output = payload->data();
+
+                std::memcpy(output.data(), response.data(), response.size());
+                std::memcpy(output.data() + response.size(), data, size);
+
+                for (size_t i = 1; i < subscriberCount; ++i)
                 {
-                    try
-                    {
-                        subscriber->sendBinaryFrame(output.data(), output.size());
-                    }
-                    catch (const std::exception& ex)
+                    auto& subscriber = tileBeingRendered->_subscribers[i];
+                    auto session = subscriber.lock();
+                    if (session)
                     {
-                        Log::warn("Failed to send tile to " + subscriber->getName() + ": " + ex.what());
+                        session->enqueueSendMessage(payload);
                     }
                 }
             }
@@ -443,7 +444,7 @@ void TileCache::saveLastModified(const Timestamp& timestamp)
 }
 
 // FIXME: to be further simplified when we centralize tile messages.
-void TileCache::subscribeToTileRendering(const TileDesc& tile, const std::shared_ptr<ClientSession> &subscriber)
+void TileCache::subscribeToTileRendering(const TileDesc& tile, const std::shared_ptr<ClientSession>& subscriber)
 {
     assert(subscriber->getKind() == LOOLSession::Kind::ToClient);
 


More information about the Libreoffice-commits mailing list