[Libreoffice-commits] online.git: Branch 'distro/collabora/collabora-online-2-0' - wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/DocumentBroker.cpp wsd/PrisonerSession.cpp wsd/SenderQueue.hpp wsd/TileCache.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Wed Dec 14 10:27:10 UTC 2016
wsd/ClientSession.cpp | 44 ++++++++++++++++
wsd/ClientSession.hpp | 39 ++++++++++++++
wsd/DocumentBroker.cpp | 20 +++----
wsd/PrisonerSession.cpp | 1
wsd/SenderQueue.hpp | 126 ++++++++++++++++++++++++++++++++++++++++++++++++
wsd/TileCache.cpp | 73 ++++++++++++++-------------
6 files changed, 255 insertions(+), 48 deletions(-)
New commits:
commit 2de26e9ba46b0f3971223b8f9e530b3e51bdd644
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Tue Dec 13 21:59:01 2016 -0500
loolwsd: per-socket dedicated sending thread
To avoid degrading performance for everyone
because of a single slow/bad connection, we
send data to clients each in its own thread.
Change-Id: I6f980c25a404c4d05bcdb1979849ea3d2776c7b9
Reviewed-on: https://gerrit.libreoffice.org/31981
Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
Tested-by: Michael Meeks <michael.meeks at collabora.com>
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 0bd20fc..202114f 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -44,9 +44,12 @@ ClientSession::ClientSession(const std::string& id,
_uriPublic(uriPublic),
_isReadOnly(readOnly),
_isDocumentOwner(false),
- _loadPart(-1)
+ _loadPart(-1),
+ _stop(false)
{
Log::info("ClientSession ctor [" + getName() + "].");
+
+ _senderThread = std::thread([this]{ senderThread(); });
}
ClientSession::~ClientSession()
@@ -55,6 +58,13 @@ ClientSession::~ClientSession()
// Release the save-as queue.
_saveAsQueue.put("");
+
+ stop();
+ if (_senderThread.joinable())
+ {
+ _senderThread.join();
+ }
+
}
void ClientSession::bridgePrisonerSession()
@@ -418,4 +428,36 @@ void ClientSession::setReadOnly()
sendTextFrame("perm: readonly");
}
+void ClientSession::senderThread()
+{
+ LOG_DBG(getName() + " SenderThread started");
+
+ while (!stopping())
+ {
+ std::shared_ptr<MessagePayload> item;
+ if (_senderQueue.waitDequeue(item, static_cast<size_t>(COMMAND_TIMEOUT_MS)))
+ {
+ const std::vector<char>& data = item->data();
+ try
+ {
+ if (item->isBinary())
+ {
+ LOOLSession::sendBinaryFrame(data.data(), data.size());
+ }
+ else
+ {
+ LOOLSession::sendTextFrame(data.data(), data.size());
+ }
+ }
+ catch (const std::exception& ex)
+ {
+ LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
+ "] to " << getName() << ": " << ex.what());
+ }
+ }
+ }
+
+ LOG_DBG(getName() + " SenderThread finished");
+}
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index 3703593..bb94bff 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -12,6 +12,7 @@
#include "Session.hpp"
#include "MessageQueue.hpp"
+#include "SenderQueue.hpp"
#include <Poco/URI.h>
@@ -43,6 +44,38 @@ public:
void setDocumentOwner(const bool documentOwner) { _isDocumentOwner = documentOwner; }
bool isDocumentOwner() const { return _isDocumentOwner; }
+ using LOOLSession::sendTextFrame;
+
+ bool sendBinaryFrame(const char* buffer, int length) override
+ {
+ auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary);
+ auto& output = payload->data();
+ std::memcpy(output.data(), buffer, length);
+ enqueueSendMessage(payload);
+ return true;
+ }
+
+ bool sendTextFrame(const char* buffer, const int length) override
+ {
+ auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text);
+ auto& output = payload->data();
+ std::memcpy(output.data(), buffer, length);
+ enqueueSendMessage(payload);
+ return true;
+ }
+
+ void enqueueSendMessage(const std::shared_ptr<MessagePayload>& data)
+ {
+ _senderQueue.enqueue(data);
+ }
+
+ bool stopping() const { return _stop || _senderQueue.stopping(); }
+ void stop()
+ {
+ _stop = true;
+ _senderQueue.stop();
+ }
+
/**
* Return the URL of the saved-as document when it's ready. If called
* before it's ready, the call blocks till then.
@@ -91,6 +124,8 @@ private:
/// Eg. in readonly mode only few messages should be allowed
bool filterMessage(const std::string& msg) const;
+ void senderThread();
+
private:
std::weak_ptr<DocumentBroker> _docBroker;
@@ -110,6 +145,10 @@ private:
MessageQueue _saveAsQueue;
int _loadPart;
+
+ SenderQueue<std::shared_ptr<MessagePayload>> _senderQueue;
+ std::thread _senderThread;
+ std::atomic<bool> _stop;
};
#endif
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 2251cd0..513e12b 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -28,6 +28,7 @@
#include "PrisonerSession.hpp"
#include "Storage.hpp"
#include "TileCache.hpp"
+#include "SenderQueue.hpp"
#include "Unit.hpp"
using namespace LOOLProtocol;
@@ -243,7 +244,7 @@ bool DocumentBroker::load(std::shared_ptr<ClientSession>& session, const std::st
assert(_storage != nullptr);
- // Call the storage specific file info functions
+ // Call the storage specific fileinfo functions
std::string userid, username;
std::chrono::duration<double> getInfoCallDuration(0);
if (dynamic_cast<WopiStorage*>(_storage.get()) != nullptr)
@@ -616,17 +617,14 @@ void DocumentBroker::alertAllUsers(const std::string& msg)
{
Util::assertIsLocked(_mutex);
+ auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
+ auto& output = payload->data();
+ std::memcpy(output.data(), msg.data(), msg.size());
+
LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg);
for (auto& it : _sessions)
{
- try
- {
- it.second->sendTextFrame(msg);
- }
- catch (const std::exception& ex)
- {
- LOG_ERR("Error while alerting all users [" << msg << "]: " << ex.what());
- }
+ it.second->enqueueSendMessage(payload);
}
}
@@ -808,7 +806,7 @@ void DocumentBroker::cancelTileRequests(const std::shared_ptr<ClientSession>& se
void DocumentBroker::handleTileResponse(const std::vector<char>& payload)
{
const std::string firstLine = getFirstLine(payload);
- LOG_DBG("Handling tile combined: " << firstLine);
+ LOG_DBG("Handling tile: " << firstLine);
try
{
@@ -959,7 +957,7 @@ bool DocumentBroker::forwardToClient(const std::string& prefix, const std::vecto
}
else
{
- LOG_ERR("Failed to parse prefix of forward-to-client message: " << prefix);
+ LOG_ERR("Unexpected prefix of forward-to-client message: " << prefix);
}
return false;
diff --git a/wsd/PrisonerSession.cpp b/wsd/PrisonerSession.cpp
index bbfd902..ae9341e 100644
--- a/wsd/PrisonerSession.cpp
+++ b/wsd/PrisonerSession.cpp
@@ -23,6 +23,7 @@
#include "Log.hpp"
#include "ClientSession.hpp"
#include "Rectangle.hpp"
+#include "SenderQueue.hpp"
#include "Storage.hpp"
#include "TileCache.hpp"
#include "IoUtil.hpp"
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
new file mode 100644
index 0000000..b1ded81
--- /dev/null
+++ b/wsd/SenderQueue.hpp
@@ -0,0 +1,126 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_SENDERQUEUE_HPP
+#define INCLUDED_SENDERQUEUE_HPP
+
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "common/SigUtil.hpp"
+#include "LOOLWebSocket.hpp"
+#include "Log.hpp"
+
+/// The payload type used to send/receive data.
+class MessagePayload
+{
+public:
+
+ enum class Type { Text, Binary };
+
+ MessagePayload(const size_t size, enum Type type) :
+ _data(size),
+ _type(type)
+ {
+ }
+
+ std::vector<char>& data() { return _data; }
+
+ /// Returns true if and only if the payload is considered Binary.
+ bool isBinary() const { return _type == Type::Binary; }
+
+private:
+ std::vector<char> _data;
+ Type _type;
+};
+
+struct SendItem
+{
+ std::weak_ptr<LOOLWebSocket> Socket;
+ std::shared_ptr<MessagePayload> Data;
+ std::string Meta;
+ std::chrono::steady_clock::time_point BirthTime;
+};
+
+/// A queue of data to send to certain Session's WS.
+template <typename Item>
+class SenderQueue final
+{
+public:
+
+ SenderQueue() :
+ _stop(false)
+ {
+ }
+
+ bool stopping() const { return _stop || TerminationFlag; }
+ void stop()
+ {
+ _stop = true;
+ _cv.notify_all();
+ }
+
+ size_t enqueue(const Item& item)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+ if (!stopping())
+ {
+ _queue.push_back(item);
+ }
+
+ const size_t queuesize = _queue.size();
+ lock.unlock();
+
+ _cv.notify_one();
+ return queuesize;
+ }
+
+ bool waitDequeue(Item& item,
+ const size_t timeoutMs = std::numeric_limits<size_t>::max())
+ {
+ const auto timeToWait = std::chrono::milliseconds(timeoutMs);
+
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ if (!_queue.empty() ||
+ _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); }))
+ {
+ if (!stopping())
+ {
+ item = _queue.front();
+ _queue.pop_front();
+ return true;
+ }
+
+ LOG_DBG("SenderQueue: stopping");
+ return false;
+ }
+
+ return false;
+ }
+
+ size_t size() const
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _queue.size();
+ }
+
+private:
+ mutable std::mutex _mutex;
+ std::condition_variable _cv;
+ std::deque<Item> _queue;
+ std::atomic<bool> _stop;
+};
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index a6f0b7f..7a536e1 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -34,6 +34,7 @@
#include "Common.hpp"
#include "common/FileUtil.hpp"
#include "Protocol.hpp"
+#include "SenderQueue.hpp"
#include "Unit.hpp"
#include "Util.hpp"
@@ -155,8 +156,8 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
const auto cachedName = (tileBeingRendered ? tileBeingRendered->getCacheName()
: cacheFileName(tile));
- // Ignore if we can't save the tile, things will work anyway, but slower. An error indication
- // has been supposed to be sent to all users in that case.
+ // Ignore if we can't save the tile, things will work anyway, but slower.
+ // An error indication is supposed to be sent to all users in that case.
const auto fileName = _cacheDir + "/" + cachedName;
if (FileUtil::saveDataToFileSafely(fileName, data, size))
{
@@ -166,50 +167,50 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
// Notify subscribers, if any.
if (tileBeingRendered)
{
- if (!tileBeingRendered->_subscribers.empty())
+ const auto subscriberCount = tileBeingRendered->_subscribers.size();
+ if (subscriberCount > 0)
{
std::string response = tile.serialize("tile:");
- Log::debug("Sending tile message to subscribers: " + response);
+ LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response);
- std::vector<char> output(256 + size);
- output.resize(response.size() + 1 + size);
+ std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(response.size() + 1 + size,
+ MessagePayload::Type::Binary);
+ {
+ auto& output = payload->data();
- std::memcpy(output.data(), response.data(), response.size());
- output[response.size()] = '\n';
- std::memcpy(output.data() + response.size() + 1, data, size);
+ // Send to first subscriber as-is (without cache marker).
+ std::memcpy(output.data(), response.data(), response.size());
+ output[response.size()] = '\n';
+ std::memcpy(output.data() + response.size() + 1, data, size);
+ }
- // Send to first subscriber as-is (without cache marker).
- auto firstSubscriber = tileBeingRendered->_subscribers[0].lock();
- if (firstSubscriber)
+ auto& firstSubscriber = tileBeingRendered->_subscribers[0];
+ auto firstSession = firstSubscriber.lock();
+ if (firstSession)
{
- try
- {
- firstSubscriber->sendBinaryFrame(output.data(), output.size());
- }
- catch (const std::exception& ex)
- {
- Log::warn("Failed to send tile to " + firstSubscriber->getName() + ": " + ex.what());
- }
+ firstSession->enqueueSendMessage(payload);
}
- // All others must get served from the cache.
- response += " renderid=cached\n";
- output.resize(response.size() + size);
- std::memcpy(output.data(), response.data(), response.size());
- std::memcpy(output.data() + response.size(), data, size);
-
- for (size_t i = 1; i < tileBeingRendered->_subscribers.size(); ++i)
+ if (subscriberCount > 1)
{
- auto subscriber = tileBeingRendered->_subscribers[i].lock();
- if (subscriber)
+ // All others must get served from the cache.
+ response += " renderid=cached\n";
+
+ // Create a new Payload.
+ payload.reset();
+ payload = std::make_shared<MessagePayload>(response.size() + size, MessagePayload::Type::Binary);
+ auto& output = payload->data();
+
+ std::memcpy(output.data(), response.data(), response.size());
+ std::memcpy(output.data() + response.size(), data, size);
+
+ for (size_t i = 1; i < subscriberCount; ++i)
{
- try
- {
- subscriber->sendBinaryFrame(output.data(), output.size());
- }
- catch (const std::exception& ex)
+ auto& subscriber = tileBeingRendered->_subscribers[i];
+ auto session = subscriber.lock();
+ if (session)
{
- Log::warn("Failed to send tile to " + subscriber->getName() + ": " + ex.what());
+ session->enqueueSendMessage(payload);
}
}
}
@@ -443,7 +444,7 @@ void TileCache::saveLastModified(const Timestamp& timestamp)
}
// FIXME: to be further simplified when we centralize tile messages.
-void TileCache::subscribeToTileRendering(const TileDesc& tile, const std::shared_ptr<ClientSession> &subscriber)
+void TileCache::subscribeToTileRendering(const TileDesc& tile, const std::shared_ptr<ClientSession>& subscriber)
{
assert(subscriber->getKind() == LOOLSession::Kind::ToClient);
More information about the Libreoffice-commits
mailing list