[Libreoffice-commits] online.git: 2 commits - loolwsd/LOOLKit.cpp loolwsd/MessageQueue.cpp loolwsd/MessageQueue.hpp

Jan Holesovsky kendy at collabora.com
Mon Sep 26 02:47:16 UTC 2016


 loolwsd/LOOLKit.cpp      |  163 ++++++++++++++---------------------------------
 loolwsd/MessageQueue.cpp |   87 +++++++++----------------
 loolwsd/MessageQueue.hpp |    6 -
 3 files changed, 82 insertions(+), 174 deletions(-)

New commits:
commit c97ebfc585f213cb897b7c7e4a948cb1332d6169
Author: Jan Holesovsky <kendy at collabora.com>
Date:   Fri Sep 23 10:18:54 2016 +0200

    Prioritize the tile requests when extracting from the TileQueue.
    
    When putting tiles into the queue, do only the de-duplication,
    reprioritization is better at the get() time - no need to keep shuffling
    the priorities according to the cursor moves etc. all the time.
    
    The tile combination then does the rest of the work for us :-)
    
    Change-Id: I100c433dd3b24228d1ca8e4c89891635db1115c1
    Reviewed-on: https://gerrit.libreoffice.org/29283
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/MessageQueue.cpp b/loolwsd/MessageQueue.cpp
index 459098f..2f46e10 100644
--- a/loolwsd/MessageQueue.cpp
+++ b/loolwsd/MessageQueue.cpp
@@ -135,8 +135,15 @@ void TileQueue::put_impl(const Payload& value)
         return;
     }
 
+    // TODO because we are doing tile combining ourselves in the get_impl(),
+    // we could split the "tilecombine" messages into separate tiles here;
+    // could lead to some improvements in case we are getting subsequent
+    // tilecombines with overlapping, but not completely same areas.
+
     if (!_queue.empty())
     {
+        // TODO probably we could do the same with the invalidation callbacks
+        // (later one wins).
         if (msg.compare(0, 4, "tile") == 0 || msg.compare(0, 10, "tilecombine") == 0)
         {
             const auto newMsg = msg.substr(0, msg.find(" ver"));
@@ -146,65 +153,17 @@ void TileQueue::put_impl(const Payload& value)
                 auto& it = _queue[i];
                 const std::string old(it.data(), it.size());
                 const auto oldMsg = old.substr(0, old.find(" ver"));
-                Log::trace() << "TileQueue #" << i << ": " << old << Log::end;
                 if (newMsg == oldMsg)
                 {
-                    Log::debug() << "Replacing duplicate tile: " << old << " -> " << msg << Log::end;
-                    _queue[i] = value;
-
-                    if (priority(msg))
-                    {
-                        // Bump to top.
-                        Log::debug() << "And bumping tile to top: " << msg << Log::end;
-                        _queue.erase(_queue.begin() + i);
-                        _queue.push_front(value);
-                    }
-
-                    return;
+                    Log::debug() << "Remove duplicate message: " << old << " -> " << msg << Log::end;
+                    _queue.erase(_queue.begin() + i);
+                    break;
                 }
             }
         }
     }
 
-    if (priority(msg))
-    {
-        Log::debug() << "Priority tile [" << msg << "]" << Log::end;
-        _queue.push_front(value);
-    }
-    else
-    {
-        BasicTileQueue::put_impl(value);
-    }
-}
-
-/// Bring the underlying tile (if any) to the top.
-/// There should be only one overlapping tile at most.
-void TileQueue::reprioritize(const CursorPosition& cursorPosition)
-{
-    for (size_t i = 0; i < _queue.size(); ++i)
-    {
-        auto it = _queue[i];
-        const std::string msg(it.data(), it.size());
-        if (msg.compare(0, 5, "tile ") != 0)
-        {
-            continue;
-        }
-
-        auto tile = TileDesc::parse(msg); //FIXME: Expensive, avoid.
-
-        if (tile.intersectsWithRect(cursorPosition.X, cursorPosition.Y, cursorPosition.Width, cursorPosition.Height))
-        {
-            if (i != 0)
-            {
-                // Bump to top.
-                Log::trace() << "Bumping tile to top: " << msg << Log::end;
-                _queue.erase(_queue.begin() + i);
-                _queue.push_front(it);
-            }
-
-            return;
-        }
-    }
+    BasicTileQueue::put_impl(value);
 }
 
 bool TileQueue::priority(const std::string& tileMsg)
@@ -231,7 +190,6 @@ MessageQueue::Payload TileQueue::get_impl()
 {
     std::vector<TileDesc> tiles;
     const auto front = _queue.front();
-    _queue.pop_front();
 
     auto msg = std::string(front.data(), front.size());
     Log::trace() << "MessageQueue Get, Size: " << _queue.size() << ", Front: " << msg << Log::end;
@@ -240,9 +198,30 @@ MessageQueue::Payload TileQueue::get_impl()
     {
         // Don't combine non-tiles or tiles with id.
         Log::trace() << "MessageQueue res: " << msg << Log::end;
+        _queue.pop_front();
         return front;
     }
 
+    // We are handling a tile; first try to find one that is at the cursor's
+    // position, otherwise handle the one that is at the front
+    bool foundPrioritized = false;
+    for (size_t i = 0; i < _queue.size(); ++i)
+    {
+        auto& it = _queue[i];
+        const std::string prio(it.data(), it.size());
+        if (priority(prio))
+        {
+            Log::debug() << "Handling a priority message: " << prio << Log::end;
+            _queue.erase(_queue.begin() + i);
+            msg = prio;
+            foundPrioritized = true;
+            break;
+        }
+    }
+
+    if (!foundPrioritized)
+        _queue.pop_front();
+
     tiles.emplace_back(TileDesc::parse(msg));
 
     // Combine as many tiles as possible with the top one.
@@ -263,7 +242,7 @@ MessageQueue::Payload TileQueue::get_impl()
             }
 
             auto tile2 = TileDesc::parse(msg);
-            Log::trace() << "combining?: " << msg << Log::end;
+            Log::trace() << "combining candidate: " << msg << Log::end;
 
             // Check if adjacent tiles.
             bool found = false;
diff --git a/loolwsd/MessageQueue.hpp b/loolwsd/MessageQueue.hpp
index 4d1efba..89d314e 100644
--- a/loolwsd/MessageQueue.hpp
+++ b/loolwsd/MessageQueue.hpp
@@ -112,8 +112,6 @@ public:
         {
             _cursorPositions[viewId] = cursorPosition;
         }
-
-        reprioritize(cursorPosition);
     }
 
     void removeCursorPosition(int viewId)
@@ -128,10 +126,6 @@ protected:
 
 private:
 
-    /// Bring the underlying tile (if any) to the top.
-    /// There should be only one overlapping tile at most.
-    void reprioritize(const CursorPosition& cursorPosition);
-
     /// Check if the given tile msg underlies a cursor.
     bool priority(const std::string& tileMsg);
 
commit 61b0ad4933550289017a7f59b436299ff01155af
Author: Jan Holesovsky <kendy at collabora.com>
Date:   Thu Sep 22 21:59:12 2016 +0200

    Fix a race between the tile rendering and invalidation callbacks.
    
    Instead of 2 queues and 2 threads, merge those to one - which gives a perfect
    ordering of the invalidations and rendering.
    
    Change-Id: I229dfc08b43e6ce7e4f08ea8059d3298d9bf8f8a
    Reviewed-on: https://gerrit.libreoffice.org/29282
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 7c9f451..5633536 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -368,26 +368,6 @@ private:
     std::atomic<bool> _joined;
 };
 
-/// Worker callback notification object.
-/// Used to pass callback data to the worker
-/// thread to invoke sessions with the data.
-class CallbackNotification : public Notification
-{
-public:
-    typedef AutoPtr<CallbackNotification> Ptr;
-
-    CallbackNotification(const std::shared_ptr<ChildSession>& session, const int nType, const std::string& rPayload)
-      : _session(session),
-        _nType(nType),
-        _aPayload(rPayload)
-    {
-    }
-
-    const std::shared_ptr<ChildSession> _session;
-    const int _nType;
-    const std::string _aPayload;
-};
-
 /// A document container.
 /// Owns LOKitDocument instance and connections.
 /// Manages the lifetime of a document.
@@ -430,7 +410,6 @@ public:
         _docPasswordType(PasswordType::ToView),
         _stop(false),
         _isLoading(0),
-        _tilesThread(tilesThread, this),
         _clientViews(0)
     {
         Log::info("Document ctor for url [" + _url + "] on child [" + _jailId + "].");
@@ -446,11 +425,9 @@ public:
 
         // Wait for the callback worker to finish.
         _stop = true;
-        _callbackQueue.wakeUpAll();
-        _callbackThread.join();
 
         _tileQueue->put("eof");
-        _tilesThread.join();
+        _callbackThread.join();
 
         // Flag all connections to stop.
         for (auto aIterator : _connections)
@@ -875,38 +852,7 @@ private:
             }
         }
 
-        // Forward to the same view only.
-        // Demultiplexing is done by Core.
-        // TODO: replace with a map to be faster.
-        bool isFound = false;
-        for (auto& it : pDescr->Doc->_connections)
-        {
-            auto session = it.second->getSession();
-            if (session && session->getViewId() == pDescr->ViewId)
-            {
-                if (it.second->isRunning())
-                {
-                    isFound = true;
-                    auto pNotif = new CallbackNotification(session, nType, payload);
-                    pDescr->Doc->_callbackQueue.enqueueNotification(pNotif);
-                }
-                else
-                {
-                    Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view "
-                                 << pDescr->ViewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(nType)
-                                 << "] payload [" << payload << "]." << Log::end;
-                }
-
-                break;
-            }
-        }
-
-        if (!isFound)
-        {
-            Log::warn() << "Document::ViewCallback. The message [" << pDescr->ViewId
-                        << "] [" << LOKitHelper::kitCallbackTypeToString(nType)
-                        << "] [" << payload << "] is not sent to Master Session." << Log::end;
-        }
+        pDescr->Doc->_tileQueue->put("callback " + std::to_string(pDescr->ViewId) + " " + std::to_string(nType) + " " + payload);
     }
 
     static void DocumentCallback(const int nType, const char* pPayload, void* pData)
@@ -924,18 +870,8 @@ private:
     {
         std::unique_lock<std::mutex> lock(_mutex);
 
-        for (auto& it: _connections)
-        {
-            if (it.second->isRunning())
-            {
-                auto session = it.second->getSession();
-                if (session)
-                {
-                    auto pNotif = new CallbackNotification(session, nType, payload);
-                    _callbackQueue.enqueueNotification(pNotif);
-                }
-            }
-        }
+        // "-1" means broadcast
+        _tileQueue->put("callback -1 " + std::to_string(nType) + " " + payload);
     }
 
     /// Load a document (or view) and register callbacks.
@@ -1230,55 +1166,17 @@ private:
 
     void run() override
     {
-        Util::setThreadName("kit_callback");
-
-        Log::debug("Thread started.");
-
-        while (!_stop && !TerminationFlag)
-        {
-            Notification::Ptr aNotification(_callbackQueue.waitDequeueNotification());
-            if (!_stop && !TerminationFlag && aNotification)
-            {
-                CallbackNotification::Ptr aCallbackNotification = aNotification.cast<CallbackNotification>();
-                assert(aCallbackNotification);
-
-                const auto nType = aCallbackNotification->_nType;
-                try
-                {
-                    aCallbackNotification->_session->loKitCallback(nType, aCallbackNotification->_aPayload);
-                }
-                catch (const Exception& exc)
-                {
-                    Log::error() << "CallbackWorker::run: Exception while handling callback [" << LOKitHelper::kitCallbackTypeToString(nType) << "]: "
-                                 << exc.displayText()
-                                 << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
-                                 << Log::end;
-                }
-                catch (const std::exception& exc)
-                {
-                    Log::error("CallbackWorker::run: Exception while handling callback [" + LOKitHelper::kitCallbackTypeToString(nType) + "]: " + exc.what());
-                }
-            }
-            else
-                break;
-        }
-
-        Log::debug("Thread finished.");
-    }
-
-    static void tilesThread(Document* pThis)
-    {
-        Util::setThreadName("tile_renderer");
+        Util::setThreadName("lok_handler");
 
         Log::debug("Thread started.");
 
         try
         {
-            while (!pThis->_stop)
+            while (!_stop && !TerminationFlag)
             {
-                const auto input = pThis->_tileQueue->get();
+                const auto input = _tileQueue->get();
                 const std::string message(input.data(), input.size());
-                StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+                StringTokenizer tokens(message, " ");
 
                 if (tokens[0] == "eof")
                 {
@@ -1288,11 +1186,50 @@ private:
 
                 if (tokens[0] == "tile")
                 {
-                    pThis->renderTile(tokens, pThis->_ws);
+                    renderTile(tokens, _ws);
                 }
                 else if (tokens[0] == "tilecombine")
                 {
-                    pThis->renderCombinedTiles(tokens, pThis->_ws);
+                    renderCombinedTiles(tokens, _ws);
+                }
+                else if (tokens[0] == "callback")
+                {
+                    int viewId = std::stoi(tokens[1]); // -1 means broadcast
+                    int type = std::stoi(tokens[2]);
+
+                    // payload is the rest of the message
+                    std::string payload(message.substr(tokens[0].length() + tokens[1].length() + tokens[2].length() + 3));
+
+                    // Forward the callback to the same view, demultiplexing is done by the LibreOffice core.
+                    // TODO: replace with a map to be faster.
+                    bool isFound = false;
+                    for (auto& it : _connections)
+                    {
+                        auto session = it.second->getSession();
+                        if (session && ((session->getViewId() == viewId) || (viewId == -1)))
+                        {
+                            if (it.second->isRunning())
+                            {
+                                isFound = true;
+                                session->loKitCallback(type, payload);
+                            }
+                            else
+                            {
+                                Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view "
+                                             << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type)
+                                             << "] payload [" << payload << "]." << Log::end;
+                            }
+
+                            break;
+                        }
+                    }
+
+                    if (!isFound)
+                    {
+                        Log::warn() << "Document::ViewCallback. The message [" << viewId
+                                    << "] [" << LOKitHelper::kitCallbackTypeToString(type)
+                                    << "] [" << payload << "] is not sent to Master Session." << Log::end;
+                    }
                 }
                 else
                 {
@@ -1336,8 +1273,6 @@ private:
     std::map<int, std::unique_ptr<CallbackDescriptor>> _viewIdToCallbackDescr;
     std::map<unsigned, std::shared_ptr<Connection>> _connections;
     Poco::Thread _callbackThread;
-    std::thread _tilesThread;
-    Poco::NotificationQueue _callbackQueue;
     std::atomic_size_t _clientViews;
 };
 


More information about the Libreoffice-commits mailing list