[Libreoffice-commits] online.git: common/SenderQueue.cpp common/SenderQueue.hpp Makefile.am test/Makefile.am wsd/DocumentBroker.cpp wsd/TileCache.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Dec 12 05:19:22 UTC 2016
Makefile.am | 2
common/SenderQueue.cpp | 49 ++++++++++++++++++++
common/SenderQueue.hpp | 116 +++++++++++++++++++++++++++++++++++++++++++++++++
test/Makefile.am | 3 -
wsd/DocumentBroker.cpp | 1
wsd/TileCache.cpp | 87 +++++++++++++-----------------------
6 files changed, 203 insertions(+), 55 deletions(-)
New commits:
commit 233cb94eff8cdd3430d8626651494bdcd6f7d76f
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sat Dec 10 22:33:45 2016 -0500
loolwsd: SenderQueue to hold messages to send to clients
This adds SenderQueue and a wrapper of messages to
send back to clients.
Currently no threading takes place, but the messages
are pumped through the queue nonetheless.
Change-Id: Id9997539c0a2a351cbf406f649c268dd3643e88e
Reviewed-on: https://gerrit.libreoffice.org/31883
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/Makefile.am b/Makefile.am
index dd8640a..de5feb6 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -39,6 +39,7 @@ shared_sources = common/FileUtil.cpp \
common/Protocol.cpp \
common/Session.cpp \
common/MessageQueue.cpp \
+ common/SenderQueue.cpp \
common/SigUtil.cpp \
common/SpookyV2.cpp \
common/Unit.cpp \
@@ -120,6 +121,7 @@ shared_headers = common/Common.hpp \
common/MessageQueue.hpp \
common/Png.hpp \
common/Rectangle.hpp \
+ common/SenderQueue.hpp \
common/SigUtil.hpp \
common/security.h \
common/SpookyV2.h
diff --git a/common/SenderQueue.cpp b/common/SenderQueue.cpp
new file mode 100644
index 0000000..6a81758
--- /dev/null
+++ b/common/SenderQueue.cpp
@@ -0,0 +1,49 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "SenderQueue.hpp"
+
+#include <algorithm>
+
+#include <Protocol.hpp>
+#include <Log.hpp>
+
+SenderQueue SenderQueue::TheQueue;
+
+bool DispatchSendItem(const size_t timeoutMs)
+{
+ SendItem item;
+ if (SenderQueue::instance().waitDequeue(item, timeoutMs))
+ {
+ auto session = item.Session.lock();
+ if (session)
+ {
+ try
+ {
+ const std::vector<char>& data = item.Data->data();
+ if (item.Data->isBinary())
+ {
+ return session->sendBinaryFrame(data.data(), data.size());
+ }
+ else
+ {
+ return session->sendTextFrame(data.data(), data.size());
+ }
+ }
+ catch (const std::exception& ex)
+ {
+ LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
+ }
+ }
+ }
+
+ return false;
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/common/SenderQueue.hpp b/common/SenderQueue.hpp
new file mode 100644
index 0000000..4d80a0c
--- /dev/null
+++ b/common/SenderQueue.hpp
@@ -0,0 +1,116 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_SENDERQUEUE_HPP
+#define INCLUDED_SENDERQUEUE_HPP
+
+#include <functional>
+
+#include <Poco/NotificationQueue.h>
+#include <Poco/Runnable.h>
+#include <Poco/ThreadPool.h>
+
+#include "Session.hpp"
+#include "Log.hpp"
+
+/// The payload type used to send/receive data.
+class MessagePayload
+{
+public:
+
+ enum class Type { Text, Binary };
+
+ MessagePayload(const size_t size, enum Type type) :
+ _data(size),
+ _type(type)
+ {
+ }
+
+ std::vector<char>& data() { return _data; }
+
+ Type getType() const { return _type; }
+ void setType(const Type type) { _type = type; }
+
+ /// Returns true if and only if the payload is considered Binary.
+ bool isBinary() const { return _type == Type::Binary; }
+
+private:
+ std::vector<char> _data;
+ Type _type;
+};
+
+struct SendItem
+{
+ std::weak_ptr<LOOLSession> Session;
+ std::shared_ptr<MessagePayload> Data;
+ std::chrono::steady_clock::time_point BirthTime;
+};
+
+/// A queue of data to send to certain Sessions.
+class SenderQueue
+{
+public:
+
+ static SenderQueue& instance() { return TheQueue; }
+
+ size_t enqueue(const std::weak_ptr<LOOLSession>& session,
+ const std::shared_ptr<MessagePayload>& data)
+ {
+ SendItem item = { session, data, std::chrono::steady_clock::now() };
+
+ std::unique_lock<std::mutex> lock(_mutex);
+ _queue.push_back(item);
+ const size_t size = _queue.size();
+ lock.unlock();
+
+ _cv.notify_one();
+ return size;
+ }
+
+ bool waitDequeue(SendItem& item,
+ const size_t timeoutMs = std::numeric_limits<size_t>::max())
+ {
+ const auto timeToWait = std::chrono::milliseconds(timeoutMs);
+
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ LOG_TRC("SenderQueue size: " << _queue.size());
+ if (!_queue.empty() ||
+ _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty(); }))
+ {
+ item = _queue.front();
+ _queue.pop_front();
+ return true;
+ }
+
+ LOG_WRN("SenderQueue: timeout");
+ return false;
+ }
+
+ size_t size() const
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _queue.size();
+ }
+
+private:
+ mutable std::mutex _mutex;
+ std::condition_variable _cv;
+ std::deque<SendItem> _queue;
+
+ /// The only SenderQueue instance.
+ static SenderQueue TheQueue;
+};
+
+/// Dequeue a SendItem and send it.
+bool DispatchSendItem(const size_t timeoutMs = std::numeric_limits<size_t>::max());
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/test/Makefile.am b/test/Makefile.am
index 98a8ce9..e00e172 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -35,7 +35,8 @@ wsd_sources = \
../common/Log.cpp \
../common/Protocol.cpp \
../common/Session.cpp \
- ../common/MessageQueue.cpp \
+ ../common/MessageQueue.cpp \
+ ../common/SenderQueue.cpp \
../kit/Kit.cpp \
../wsd/TileCache.cpp \
../common/Unit.cpp \
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 48852cc..01c4a8c 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -28,6 +28,7 @@
#include "PrisonerSession.hpp"
#include "Storage.hpp"
#include "TileCache.hpp"
+#include "SenderQueue.hpp"
#include "Unit.hpp"
using namespace LOOLProtocol;
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 8eb2c0e..5137073 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -33,6 +33,7 @@
#include "ClientSession.hpp"
#include "Common.hpp"
#include "common/FileUtil.hpp"
+#include "common/SenderQueue.hpp"
#include "Protocol.hpp"
#include "Unit.hpp"
#include "Util.hpp"
@@ -145,11 +146,6 @@ std::unique_ptr<std::fstream> TileCache::lookupTile(const TileDesc& tile)
return nullptr;
}
-static void enqueueTask(const std::function<void()>& func)
-{
- func();
-}
-
void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const size_t size)
{
std::unique_lock<std::mutex> lock(_tilesBeingRenderedMutex);
@@ -171,63 +167,46 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
// Notify subscribers, if any.
if (tileBeingRendered)
{
- if (!tileBeingRendered->_subscribers.empty())
+ const auto subscriberCount = tileBeingRendered->_subscribers.size();
+ if (subscriberCount > 0)
{
std::string response = tile.serialize("tile:");
- Log::debug("Sending tile message to subscribers: " + response);
+ LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response);
- std::shared_ptr<std::vector<char>> payload = std::make_shared<std::vector<char>>(256 + size);
- auto& output = *payload;
- output.resize(response.size() + 1 + size);
+ std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(256 + size, MessagePayload::Type::Binary);
+ {
+ auto& output = payload->data();
+ output.resize(response.size() + 1 + size);
- // Send to first subscriber as-is (without cache marker).
- std::memcpy(output.data(), response.data(), response.size());
- output[response.size()] = '\n';
- std::memcpy(output.data() + response.size() + 1, data, size);
+ // Send to first subscriber as-is (without cache marker).
+ std::memcpy(output.data(), response.data(), response.size());
+ output[response.size()] = '\n';
+ std::memcpy(output.data() + response.size() + 1, data, size);
+ }
auto& firstSubscriber = tileBeingRendered->_subscribers[0];
- enqueueTask([firstSubscriber, payload]()
- {
- auto session = firstSubscriber.lock();
- if (session)
- {
- try
- {
- session->sendBinaryFrame(payload->data(), payload->size());
- }
- catch (const std::exception& ex)
- {
- LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
- }
- }
- }
- );
-
- // All others must get served from the cache.
- response += " renderid=cached\n";
- output.resize(response.size() + size);
- std::memcpy(output.data(), response.data(), response.size());
- std::memcpy(output.data() + response.size(), data, size);
+ SenderQueue::instance().enqueue(firstSubscriber, payload);
+ DispatchSendItem();
- for (size_t i = 1; i < tileBeingRendered->_subscribers.size(); ++i)
+ if (subscriberCount > 1)
{
- auto& subscriber = tileBeingRendered->_subscribers[i];
- enqueueTask([subscriber, payload]()
- {
- auto session = subscriber.lock();
- if (session)
- {
- try
- {
- session->sendBinaryFrame(payload->data(), payload->size());
- }
- catch (const std::exception& ex)
- {
- LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
- }
- }
- }
- );
+ // Create a new Payload.
+ payload.reset();
+ payload = std::make_shared<MessagePayload>(256 + size, MessagePayload::Type::Binary);
+ auto& output = payload->data();
+
+ // All others must get served from the cache.
+ response += " renderid=cached\n";
+ output.resize(response.size() + size);
+ std::memcpy(output.data(), response.data(), response.size());
+ std::memcpy(output.data() + response.size(), data, size);
+
+ for (size_t i = 1; i < subscriberCount; ++i)
+ {
+ auto& subscriber = tileBeingRendered->_subscribers[i];
+ SenderQueue::instance().enqueue(subscriber, payload);
+ DispatchSendItem();
+ }
}
}
else
More information about the Libreoffice-commits
mailing list