[Libreoffice-commits] online.git: common/SenderQueue.cpp common/SenderQueue.hpp Makefile.am test/Makefile.am wsd/DocumentBroker.cpp wsd/TileCache.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Dec 12 05:19:22 UTC 2016


 Makefile.am            |    2 
 common/SenderQueue.cpp |   49 ++++++++++++++++++++
 common/SenderQueue.hpp |  116 +++++++++++++++++++++++++++++++++++++++++++++++++
 test/Makefile.am       |    3 -
 wsd/DocumentBroker.cpp |    1 
 wsd/TileCache.cpp      |   87 +++++++++++++-----------------------
 6 files changed, 203 insertions(+), 55 deletions(-)

New commits:
commit 233cb94eff8cdd3430d8626651494bdcd6f7d76f
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sat Dec 10 22:33:45 2016 -0500

    loolwsd: SenderQueue to hold messages to send to clients
    
    This adds SenderQueue and a wrapper of messages to
    send back to clients.
    
    Currently no threading takes place, but the messages
    are pumped through the queue nonetheless.
    
    Change-Id: Id9997539c0a2a351cbf406f649c268dd3643e88e
    Reviewed-on: https://gerrit.libreoffice.org/31883
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/Makefile.am b/Makefile.am
index dd8640a..de5feb6 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -39,6 +39,7 @@ shared_sources = common/FileUtil.cpp \
                  common/Protocol.cpp \
                  common/Session.cpp \
                  common/MessageQueue.cpp \
+                 common/SenderQueue.cpp \
                  common/SigUtil.cpp \
                  common/SpookyV2.cpp \
                  common/Unit.cpp \
@@ -120,6 +121,7 @@ shared_headers = common/Common.hpp \
                  common/MessageQueue.hpp \
                  common/Png.hpp \
                  common/Rectangle.hpp \
+                 common/SenderQueue.hpp \
                  common/SigUtil.hpp \
                  common/security.h \
                  common/SpookyV2.h
diff --git a/common/SenderQueue.cpp b/common/SenderQueue.cpp
new file mode 100644
index 0000000..6a81758
--- /dev/null
+++ b/common/SenderQueue.cpp
@@ -0,0 +1,49 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "SenderQueue.hpp"
+
+#include <algorithm>
+
+#include <Protocol.hpp>
+#include <Log.hpp>
+
+SenderQueue SenderQueue::TheQueue;
+
+/// Dequeues one SendItem, waiting up to timeoutMs, and sends its payload to its session.
+/// Returns the send result; false on timeout, expired session, null payload or exception.
+bool DispatchSendItem(const size_t timeoutMs)
+{
+    SendItem item;
+    if (!SenderQueue::instance().waitDequeue(item, timeoutMs))
+    {
+        return false;
+    }
+
+    // The session may be gone by now, and enqueue() accepts a null payload pointer.
+    const auto session = item.Session.lock();
+    if (!session || !item.Data)
+    {
+        return false;
+    }
+
+    try
+    {
+        const std::vector<char>& data = item.Data->data();
+        return item.Data->isBinary() ? session->sendBinaryFrame(data.data(), data.size())
+                                     : session->sendTextFrame(data.data(), data.size());
+    }
+    catch (const std::exception& ex)
+    {
+        LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
+        return false;
+    }
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/common/SenderQueue.hpp b/common/SenderQueue.hpp
new file mode 100644
index 0000000..4d80a0c
--- /dev/null
+++ b/common/SenderQueue.hpp
@@ -0,0 +1,116 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_SENDERQUEUE_HPP
+#define INCLUDED_SENDERQUEUE_HPP
+
+#include <functional>
+
+#include <Poco/NotificationQueue.h>
+#include <Poco/Runnable.h>
+#include <Poco/ThreadPool.h>
+
+#include "Session.hpp"
+#include "Log.hpp"
+
+/// The payload type used to send/receive data.
+class MessagePayload
+{
+public:
+    enum class Type { Text, Binary };
+
+    MessagePayload(const size_t size, enum Type type) :
+        _data(size), // The buffer starts out as @size zero bytes.
+        _type(type)
+    {
+    }
+
+    std::vector<char>& data() { return _data; }
+    const std::vector<char>& data() const { return _data; } // Read-only view.
+
+    Type getType() const { return _type; }
+    void setType(const Type type) { _type = type; }
+
+    /// Returns true if and only if the payload is considered Binary.
+    bool isBinary() const { return _type == Type::Binary; }
+
+private:
+    std::vector<char> _data;
+    Type _type;
+};
+
+struct SendItem
+{
+    std::weak_ptr<LOOLSession> Session; ///< Target; locked when the item is dispatched.
+    std::shared_ptr<MessagePayload> Data; ///< Payload to send; may be shared by several items.
+    std::chrono::steady_clock::time_point BirthTime; ///< Time of enqueue.
+};
+
+/// A queue of data to send to certain Sessions.
+class SenderQueue
+{
+public:
+    static SenderQueue& instance() { return TheQueue; }
+
+    /// Queues @data for @session and wakes one waiter. Returns the resulting queue size.
+    size_t enqueue(const std::weak_ptr<LOOLSession>& session,
+                   const std::shared_ptr<MessagePayload>& data)
+    {
+        SendItem item = { session, data, std::chrono::steady_clock::now() };
+        std::unique_lock<std::mutex> lock(_mutex);
+        _queue.push_back(item);
+        const size_t size = _queue.size();
+        lock.unlock(); // Notify without holding the lock.
+        _cv.notify_one();
+        return size;
+    }
+
+    /// Pops the oldest item into @item, waiting up to @timeoutMs; false on timeout.
+    /// Timeouts above INT_MAX ms (such as the default) wait with no deadline.
+    bool waitDequeue(SendItem& item,
+                     const size_t timeoutMs = std::numeric_limits<size_t>::max())
+    {
+        // Huge values wrap negative as signed milliseconds or overflow wait_for's deadline.
+        const bool timed = timeoutMs <= static_cast<size_t>(std::numeric_limits<int>::max());
+        const auto hasItems = [this]() { return !_queue.empty(); };
+        std::unique_lock<std::mutex> lock(_mutex);
+        LOG_TRC("SenderQueue size: " << _queue.size());
+        if (timed && !_cv.wait_for(lock, std::chrono::milliseconds(timeoutMs), hasItems))
+        {
+            LOG_WRN("SenderQueue: timeout");
+            return false;
+        }
+        _cv.wait(lock, hasItems); // Untimed case; a no-op once an item is available.
+        item = _queue.front();
+        _queue.pop_front();
+        return true;
+    }
+
+    /// Returns the number of items currently queued.
+    size_t size() const
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _queue.size();
+    }
+
+private:
+    mutable std::mutex _mutex;
+    std::condition_variable _cv;
+    std::deque<SendItem> _queue;
+
+    /// The only SenderQueue instance.
+    static SenderQueue TheQueue;
+};
+
+/// Dequeue a SendItem and send it.
+bool DispatchSendItem(const size_t timeoutMs = std::numeric_limits<size_t>::max());
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/test/Makefile.am b/test/Makefile.am
index 98a8ce9..e00e172 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -35,7 +35,8 @@ wsd_sources = \
             ../common/Log.cpp \
             ../common/Protocol.cpp \
             ../common/Session.cpp \
-	    ../common/MessageQueue.cpp \
+            ../common/MessageQueue.cpp \
+            ../common/SenderQueue.cpp \
             ../kit/Kit.cpp \
             ../wsd/TileCache.cpp \
             ../common/Unit.cpp \
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 48852cc..01c4a8c 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -28,6 +28,7 @@
 #include "PrisonerSession.hpp"
 #include "Storage.hpp"
 #include "TileCache.hpp"
+#include "SenderQueue.hpp"
 #include "Unit.hpp"
 
 using namespace LOOLProtocol;
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 8eb2c0e..5137073 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -33,6 +33,7 @@
 #include "ClientSession.hpp"
 #include "Common.hpp"
 #include "common/FileUtil.hpp"
+#include "common/SenderQueue.hpp"
 #include "Protocol.hpp"
 #include "Unit.hpp"
 #include "Util.hpp"
@@ -145,11 +146,6 @@ std::unique_ptr<std::fstream> TileCache::lookupTile(const TileDesc& tile)
     return nullptr;
 }
 
-static void enqueueTask(const std::function<void()>& func)
-{
-    func();
-}
-
 void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const size_t size)
 {
     std::unique_lock<std::mutex> lock(_tilesBeingRenderedMutex);
@@ -171,63 +167,46 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
     // Notify subscribers, if any.
     if (tileBeingRendered)
     {
-        if (!tileBeingRendered->_subscribers.empty())
+        const auto subscriberCount = tileBeingRendered->_subscribers.size();
+        if (subscriberCount > 0)
         {
             std::string response = tile.serialize("tile:");
-            Log::debug("Sending tile message to subscribers: " + response);
+            LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response);
 
-            std::shared_ptr<std::vector<char>> payload = std::make_shared<std::vector<char>>(256 + size);
-            auto& output = *payload;
-            output.resize(response.size() + 1 + size);
+            std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(256 + size, MessagePayload::Type::Binary);
+            {
+                auto& output = payload->data();
+                output.resize(response.size() + 1 + size);
 
-            // Send to first subscriber as-is (without cache marker).
-            std::memcpy(output.data(), response.data(), response.size());
-            output[response.size()] = '\n';
-            std::memcpy(output.data() + response.size() + 1, data, size);
+                // Send to first subscriber as-is (without cache marker).
+                std::memcpy(output.data(), response.data(), response.size());
+                output[response.size()] = '\n';
+                std::memcpy(output.data() + response.size() + 1, data, size);
+            }
 
             auto& firstSubscriber = tileBeingRendered->_subscribers[0];
-            enqueueTask([firstSubscriber, payload]()
-                {
-                    auto session = firstSubscriber.lock();
-                    if (session)
-                    {
-                        try
-                        {
-                            session->sendBinaryFrame(payload->data(), payload->size());
-                        }
-                        catch (const std::exception& ex)
-                        {
-                            LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
-                        }
-                    }
-                }
-            );
-
-            // All others must get served from the cache.
-            response += " renderid=cached\n";
-            output.resize(response.size() + size);
-            std::memcpy(output.data(), response.data(), response.size());
-            std::memcpy(output.data() + response.size(), data, size);
+            SenderQueue::instance().enqueue(firstSubscriber, payload);
+            DispatchSendItem();
 
-            for (size_t i = 1; i < tileBeingRendered->_subscribers.size(); ++i)
+            if (subscriberCount > 1)
             {
-                auto& subscriber = tileBeingRendered->_subscribers[i];
-                enqueueTask([subscriber, payload]()
-                    {
-                        auto session = subscriber.lock();
-                        if (session)
-                        {
-                            try
-                            {
-                                session->sendBinaryFrame(payload->data(), payload->size());
-                            }
-                            catch (const std::exception& ex)
-                            {
-                                LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
-                            }
-                        }
-                    }
-                );
+                // Create a new Payload.
+                payload.reset();
+                payload = std::make_shared<MessagePayload>(256 + size, MessagePayload::Type::Binary);
+                auto& output = payload->data();
+
+                // All others must get served from the cache.
+                response += " renderid=cached\n";
+                output.resize(response.size() + size);
+                std::memcpy(output.data(), response.data(), response.size());
+                std::memcpy(output.data() + response.size(), data, size);
+
+                for (size_t i = 1; i < subscriberCount; ++i)
+                {
+                    auto& subscriber = tileBeingRendered->_subscribers[i];
+                    SenderQueue::instance().enqueue(subscriber, payload);
+                    DispatchSendItem();
+                }
             }
         }
         else


More information about the Libreoffice-commits mailing list