[Libreoffice-commits] online.git: kit/Kit.cpp wsd/LOOLWSD.cpp wsd/TileDesc.hpp

Libreoffice Gerrit user logerrit at kemper.freedesktop.org
Thu May 2 21:45:25 UTC 2019


 kit/Kit.cpp      |  275 +++++++++++++++++++++++++++++++++++++++++++++----------
 wsd/LOOLWSD.cpp  |    1 
 wsd/TileDesc.hpp |   18 ++-
 3 files changed, 239 insertions(+), 55 deletions(-)

New commits:
commit 8172885f74bef3e4d90e41a54a596add5b292b6a
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Sat Apr 20 01:53:12 2019 +0100
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Thu May 2 22:36:52 2019 +0100

    PNG compression a bottleneck: thread it to accelerate things.
    
    Also have a separate hash <-> wid cache to avoid re-rendering
    older tiles as/when we see them.
    
    Change-Id: I238fe6701a1d1cb486473c67faba8c56e9c98dcb

diff --git a/kit/Kit.cpp b/kit/Kit.cpp
index 25050e04b..c6ab8fd4a 100644
--- a/kit/Kit.cpp
+++ b/kit/Kit.cpp
@@ -435,13 +435,15 @@ private:
     size_t _cacheSize;
     static const size_t CacheSizeSoftLimit = (1024 * 4 * 32); // 128k of cache
     static const size_t CacheSizeHardLimit = CacheSizeSoftLimit * 2;
+    static const size_t CacheWidHardLimit = 4096;
     size_t _cacheHits;
     size_t _cacheTests;
     TileWireId _nextId;
     DeltaGenerator _deltaGen;
 
-    std::map< TileBinaryHash, CacheEntry > _cache;
-    std::map< TileWireId, TileBinaryHash > _wireToHash;
+    std::unordered_map< TileBinaryHash, CacheEntry > _cache;
+    // This uses little storage so can be much larger
+    std::unordered_map< TileBinaryHash, TileWireId > _hashToWireId;
 
     void clearCache(bool logStats = false)
     {
@@ -449,6 +451,7 @@ private:
             LOG_DBG("cache clear " << _cache.size() << " items total size " <<
                     _cacheSize << " current hits " << _cacheHits);
         _cache.clear();
+        _hashToWireId.clear();
         _cacheSize = 0;
         _cacheHits = 0;
         _cacheTests = 0;
@@ -465,6 +468,8 @@ private:
         return id;
     }
 
+public:
+    // Performed only after a complete combinetiles
     void balanceCache()
     {
         // A normalish PNG image size for text in a writer document is
@@ -489,10 +494,6 @@ private:
                     // the chance of hitting these entries in the future.
                     _cacheSize -= it->second.getData()->size();
 
-                    auto wIt = _wireToHash.find(it->second.getWireId());
-                    assert(wIt != _wireToHash.end());
-                    _wireToHash.erase(wIt);
-
                     it = _cache.erase(it);
                 }
                 else
@@ -506,9 +507,22 @@ private:
             LOG_DBG("PNG cache has " << _cache.size() << " items, total size " <<
                     _cacheSize << " after balance.");
         }
+
+        if (_hashToWireId.size() > CacheWidHardLimit)
+        {
+            LOG_DBG("Clear half of wid cache of size " << _hashToWireId.size());
+            TileWireId max = _nextId - CacheWidHardLimit/2;
+            for (auto it = _hashToWireId.begin(); it != _hashToWireId.end();)
+            {
+                if (it->second < max)
+                    it = _hashToWireId.erase(it);
+                else
+                    ++it;
+            }
+            LOG_DBG("Wid cache is now size " << _hashToWireId.size());
+        }
     }
 
-public:
     /// Lookup an entry in the cache and store the data in output.
     /// Returns true on success, otherwise false.
     bool copyFromCache(const TileBinaryHash hash, std::vector<char>& output, size_t &imgSize)
@@ -540,10 +554,13 @@ public:
 
         if (hash)
         {
+            // Adding duplicates causes grim wid mixups
+            assert(hashToWireId(hash) == wid);
+            assert(_cache.find(hash) == _cache.end());
+
             data->shrink_to_fit();
             _cache.emplace(hash, newEntry);
             _cacheSize += data->size();
-            balanceCache();
         }
     }
 
@@ -552,18 +569,18 @@ public:
         clearCache();
     }
 
-    TileWireId hashToWireId(TileBinaryHash id)
+    TileWireId hashToWireId(TileBinaryHash hash)
     {
         TileWireId wid;
-        if (id == 0)
+        if (hash == 0)
             return 0;
-        auto it = _cache.find(id);
-        if (it != _cache.end())
-            wid = it->second.getWireId();
+        auto it = _hashToWireId.find(hash);
+        if (it != _hashToWireId.end())
+            wid = it->second;
         else
         {
             wid = createNewWireId();
-            _wireToHash.emplace(wid, id);
+            _hashToWireId.emplace(hash, wid);
         }
         return wid;
     }
@@ -737,6 +754,103 @@ private:
 static FILE* ProcSMapsFile = nullptr;
 #endif
 
+class ThreadPool {
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::condition_variable _complete;
+    typedef std::function<void()> ThreadFn;
+    std::queue<ThreadFn> _work;
+    std::vector<std::thread> _threads;
+    size_t _working;
+    bool   _shutdown;
+public:
+    ThreadPool()
+        : _working(0),
+          _shutdown(false)
+    {
+        int maxConcurrency = 2;
+#if MOBILEAPP
+#  warning "Good defaults ? - 2 for iOS, 4 for Android ?"
+#else
+        const char *max = getenv("MAX_CONCURRENCY");
+        if (max)
+            maxConcurrency = atoi(max);
+#endif
+        LOG_TRC("PNG compression thread pool size " << maxConcurrency);
+        for (int i = 1; i < maxConcurrency; ++i)
+            _threads.push_back(std::thread(&ThreadPool::work, this));
+    }
+    ~ThreadPool()
+    {
+        {
+            std::unique_lock< std::mutex > lock(_mutex);
+            assert(_working == 0);
+            _shutdown = true;
+        }
+        _cond.notify_all();
+        for (auto &it : _threads)
+            it.join();
+    }
+
+    size_t count() const
+    {
+        return _work.size();
+    }
+
+    void pushWorkUnlocked(const ThreadFn &fn)
+    {
+        _work.push(fn);
+    }
+
+    void runOne(std::unique_lock< std::mutex >& lock)
+    {
+        assert(!_work.empty());
+
+        ThreadFn fn = _work.front();
+        _work.pop();
+        _working++;
+        lock.unlock();
+
+        fn();
+
+        lock.lock();
+        _working--;
+        if (_work.empty() && _working == 0)
+            _complete.notify_all();
+    }
+
+    void run()
+    {
+        std::unique_lock< std::mutex > lock(_mutex);
+        assert(_working == 0);
+
+        // Avoid notifying threads if we don't need to.
+        bool useThreads = _threads.size() > 1 && _work.size() > 1;
+        if (useThreads)
+            _cond.notify_all();
+
+        while(!_work.empty())
+            runOne(lock);
+
+        if (useThreads && (_working > 0 || !_work.empty()))
+            _complete.wait(lock, [this]() { return _working == 0 && _work.empty(); } );
+
+        assert(_working==0);
+        assert(_work.empty());
+    }
+
+    void work()
+    {
+        std::unique_lock< std::mutex > lock(_mutex);
+        while (!_shutdown)
+        {
+            _cond.wait(lock);
+            if (!_shutdown && !_work.empty())
+                runOne(lock);
+        }
+    }
+};
+
 /// A document container.
 /// Owns LOKitDocument instance and connections.
 /// Manages the lifetime of a document.
@@ -948,6 +1062,14 @@ public:
         renderTiles(tileCombined, true);
     }
 
+    static void pushRendered(std::vector<TileDesc> &renderedTiles,
+                             const TileDesc &desc, TileWireId wireId, size_t imgSize)
+    {
+        renderedTiles.push_back(desc);
+        renderedTiles.back().setWireId(wireId);
+        renderedTiles.back().setImgSize(imgSize);
+    }
+
     void renderTiles(TileCombined &tileCombined, bool combined)
     {
         auto& tiles = tileCombined.getTiles();
@@ -1016,6 +1138,11 @@ public:
         const int pixelWidth = tileCombined.getWidth();
         const int pixelHeight = tileCombined.getHeight();
 
+        std::vector<TileDesc> renderedTiles;
+        std::vector<TileDesc> duplicateTiles;
+        std::vector<TileBinaryHash> duplicateHashes;
+        std::vector<TileWireId> renderingIds;
+
         size_t tileIndex = 0;
         for (Util::Rectangle& tileRect : tileRecs)
         {
@@ -1035,61 +1162,109 @@ public:
                 // The tile content is identical to what the client already has, so skip it
                 LOG_TRC("Match for tile #" << tileIndex << " at (" << positionX << "," <<
                         positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping");
-                tiles.erase(tiles.begin() + tileIndex);
+                tileIndex++;
                 continue;
             }
 
-            size_t imgSize;
-
-            if (!_pngCache.copyFromCache(hash, output, imgSize))
+            bool skipCompress = false;
+            size_t imgSize = -1;
+            if (_pngCache.copyFromCache(hash, output, imgSize))
+            {
+                pushRendered(renderedTiles, tiles[tileIndex], wireId, imgSize);
+                skipCompress = true;
+            }
+            else
             {
                 LOG_DBG("PNG cache with hash " << hash << " missed.");
 
+                // Don't re-compress the same thing multiple times.
+                for (auto id : renderingIds)
+                {
+                    if (wireId == id)
+                    {
+                        pushRendered(duplicateTiles, tiles[tileIndex], wireId, 0);
+                        duplicateHashes.push_back(hash);
+                        skipCompress = true;
+                        LOG_TRC("Rendering duplicate tile #" << tileIndex << " at (" << positionX << "," <<
+                                positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping");
+                        break;
+                    }
+                }
+            }
+
+            if (!skipCompress)
+            {
+                renderingIds.push_back(wireId);
                 if (_docWatermark)
                     _docWatermark->blending(pixmap.data(), offsetX, offsetY,
                                             pixmapWidth, pixmapHeight,
                                             pixelWidth, pixelHeight,
                                             mode);
 
-                PngCache::CacheData data(new std::vector< char >() );
-                data->reserve(pixmapWidth * pixmapHeight * 1);
-
-/*
- *Disable for now - pushed in error.
- *
-                if (_deltaGen.createDelta(pixmap, startX, startY, width, height,
-                                          bufferWidth, bufferHeight,
-                                          output, wid, oldWid))
-                else ...
-*/
-
-                LOG_DBG("Encode a new png for this tile.");
-                if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight,
-                                               pixmapWidth, pixmapHeight, *data, mode))
-                {
-                    // FIXME: Return error.
-                    // sendTextFrame("error: cmd=tile kind=failure");
-                    LOG_ERR("Failed to encode tile into PNG.");
-                    return;
-                }
+                // Queue to be executed later in parallel inside 'run'
+                _pngPool.pushWorkUnlocked([=,&output,&pixmap,&tiles,&renderedTiles](){
+                        PngCache::CacheData data(new std::vector< char >() );
+                        data->reserve(pixmapWidth * pixmapHeight * 1);
+
+                        /*
+                         * Disable for now - pushed in error.
+                         *
+                         if (_deltaGen.createDelta(pixmap, startX, startY, width, height,
+                                                   bufferWidth, bufferHeight,
+                                                   output, wid, oldWid))
+                         else ...
+                        */
+
+                        LOG_DBG("Encode a new png for tile #" << tileIndex);
+                        if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight,
+                                                       pixmapWidth, pixmapHeight, *data, mode))
+                        {
+                            // FIXME: Return error.
+                            // sendTextFrame("error: cmd=tile kind=failure");
+                            LOG_ERR("Failed to encode tile into PNG.");
+                            return;
+                        }
 
-                output.insert(output.end(), data->begin(), data->end());
-                imgSize = data->size();
-                _pngCache.addToCache(data, wireId, hash);
+                        LOG_DBG("Tile " << tileIndex << " is " << data->size() << " bytes.");
+                        std::unique_lock<std::mutex> pngLock(_pngMutex);
+                        output.insert(output.end(), data->begin(), data->end());
+                        _pngCache.addToCache(data, wireId, hash);
+                        pushRendered(renderedTiles, tiles[tileIndex], wireId, data->size());
+                    });
             }
-
             LOG_TRC("Encoded tile #" << tileIndex << " at (" << positionX << "," << positionY << ") with oldWireId=" <<
                     tiles[tileIndex].getOldWireId() << ", hash=" << hash << " wireId: " << wireId << " in " << imgSize << " bytes.");
-            if (imgSize == 0)
+            tileIndex++;
+        }
+
+        _pngPool.run();
+
+        for (auto &i : renderedTiles)
+        {
+            if (i.getImgSize() == 0)
             {
                 LOG_ERR("Encoded 0-sized tile!");
                 assert(!"0-sized tile enocded!");
             }
-            tiles[tileIndex].setWireId(wireId);
-            tiles[tileIndex].setImgSize(imgSize);
-            tileIndex++;
         }
 
+        // FIXME: append duplicates - tragically for now as real duplicates
+        // we should append these as
+        {
+            size_t imgSize = -1;
+            assert(duplicateTiles.size() == duplicateHashes.size());
+            for (size_t i = 0; i < duplicateTiles.size(); ++i)
+            {
+                if (_pngCache.copyFromCache(duplicateHashes[i], output, imgSize))
+                    pushRendered(renderedTiles, duplicateTiles[i],
+                                 duplicateTiles[i].getWireId(), imgSize);
+                else
+                    LOG_ERR("Horror - tile disappeared while rendering! " << duplicateHashes[i]);
+            }
+        }
+
+        _pngCache.balanceCache();
+
         elapsed = timestamp.elapsed();
         LOG_DBG("renderCombinedTiles at (" << renderArea.getLeft() << ", " << renderArea.getTop() << "), (" <<
                 renderArea.getWidth() << ", " << renderArea.getHeight() << ") " <<
@@ -1103,7 +1278,7 @@ public:
 
         std::string tileMsg;
         if (combined)
-            tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID);
+            tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID, renderedTiles);
         else
             tileMsg = tiles[0].serialize("tile:", ADD_DEBUG_RENDERID);
 
@@ -1996,6 +2171,8 @@ private:
     std::shared_ptr<TileQueue> _tileQueue;
     SocketPoll& _socketPoll;
     std::shared_ptr<WebSocketHandler> _websocketHandler;
+
+    std::mutex _pngMutex;
     PngCache _pngCache;
 
     // Document password provided
@@ -2017,6 +2194,8 @@ private:
     /// like setting a view followed by a tile render, etc.
     std::mutex _documentMutex;
 
+    ThreadPool _pngPool;
+
     std::condition_variable _cvLoading;
     std::atomic_size_t _isLoading;
     int _editorId;
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 57ddfd705..116e7c560 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -89,7 +89,6 @@ using Poco::Net::PartHandler;
 #include <Poco/StreamCopier.h>
 #include <Poco/StringTokenizer.h>
 #include <Poco/TemporaryFile.h>
-#include <Poco/ThreadPool.h>
 #include <Poco/URI.h>
 #include <Poco/Util/AbstractConfiguration.h>
 #include <Poco/Util/HelpFormatter.h>
diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp
index 77fb46b26..d30b718da 100644
--- a/wsd/TileDesc.hpp
+++ b/wsd/TileDesc.hpp
@@ -357,27 +357,33 @@ public:
     std::string serialize(const std::string& prefix = std::string(),
                           const std::string& suffix = std::string()) const
     {
+        return serialize(prefix, suffix, _tiles);
+    }
+
+    std::string serialize(const std::string& prefix, const std::string &suffix,
+                          const std::vector<TileDesc> &tiles) const
+    {
         std::ostringstream oss;
         oss << prefix
             << " part=" << _part
             << " width=" << _width
             << " height=" << _height
             << " tileposx=";
-        for (const auto& tile : _tiles)
+        for (const auto& tile : tiles)
         {
             oss << tile.getTilePosX() << ',';
         }
         oss.seekp(-1, std::ios_base::cur); // Seek back over last comma, overwritten below.
 
         oss << " tileposy=";
-        for (const auto& tile : _tiles)
+        for (const auto& tile : tiles)
         {
             oss << tile.getTilePosY() << ',';
         }
         oss.seekp(-1, std::ios_base::cur); // Ditto.
 
         oss << " imgsize=";
-        for (const auto& tile : _tiles)
+        for (const auto& tile : tiles)
         {
             oss << tile.getImgSize() << ','; // Ditto.
         }
@@ -387,14 +393,14 @@ public:
             << " tileheight=" << _tileHeight;
 
         oss << " ver=";
-        for (const auto& tile : _tiles)
+        for (const auto& tile : tiles)
         {
             oss << tile.getVersion() << ',';
         }
         oss.seekp(-1, std::ios_base::cur); // Ditto.
 
         oss << " oldwid=";
-        for (const auto& tile : _tiles)
+        for (const auto& tile : tiles)
         {
             oss << tile.getOldWireId() << ',';
         }
@@ -403,7 +409,7 @@ public:
         oss << " wid=";
 
         bool comma = false;
-        for (const auto& tile : _tiles)
+        for (const auto& tile : tiles)
         {
             if (comma)
                 oss << ',';


More information about the Libreoffice-commits mailing list