[Libreoffice-commits] online.git: Branch 'distro/collabora/collabora-online-4' - kit/Kit.cpp wsd/LOOLWSD.cpp wsd/TileDesc.hpp
Libreoffice Gerrit user
logerrit at kemper.freedesktop.org
Fri May 3 13:44:29 UTC 2019
kit/Kit.cpp | 276 +++++++++++++++++++++++++++++++++++++++++++++----------
wsd/LOOLWSD.cpp | 1
wsd/TileDesc.hpp | 18 ++-
3 files changed, 242 insertions(+), 53 deletions(-)
New commits:
commit 88130a9233089ab01fa52f34a13e2825c7498023
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Sat Apr 20 01:53:12 2019 +0100
Commit: Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri May 3 15:44:11 2019 +0200
PNG compression is a bottleneck: thread it to accelerate things.
Also keep a separate hash <-> wire-id (wid) cache to avoid re-rendering
older tiles as/when we see them again.
Change-Id: I238fe6701a1d1cb486473c67faba8c56e9c98dcb
Reviewed-on: https://gerrit.libreoffice.org/71638
Reviewed-by: Jan Holesovsky <kendy at collabora.com>
Tested-by: Jan Holesovsky <kendy at collabora.com>
diff --git a/kit/Kit.cpp b/kit/Kit.cpp
index 2c56687c0..c61c99251 100644
--- a/kit/Kit.cpp
+++ b/kit/Kit.cpp
@@ -435,13 +435,15 @@ private:
size_t _cacheSize;
static const size_t CacheSizeSoftLimit = (1024 * 4 * 32); // 128k of cache
static const size_t CacheSizeHardLimit = CacheSizeSoftLimit * 2;
+ static const size_t CacheWidHardLimit = 4096;
size_t _cacheHits;
size_t _cacheTests;
TileWireId _nextId;
DeltaGenerator _deltaGen;
- std::map< TileBinaryHash, CacheEntry > _cache;
- std::map< TileWireId, TileBinaryHash > _wireToHash;
+ std::unordered_map< TileBinaryHash, CacheEntry > _cache;
+ // This uses little storage so can be much larger
+ std::unordered_map< TileBinaryHash, TileWireId > _hashToWireId;
void clearCache(bool logStats = false)
{
@@ -449,6 +451,7 @@ private:
LOG_DBG("cache clear " << _cache.size() << " items total size " <<
_cacheSize << " current hits " << _cacheHits);
_cache.clear();
+ _hashToWireId.clear();
_cacheSize = 0;
_cacheHits = 0;
_cacheTests = 0;
@@ -465,6 +468,8 @@ private:
return id;
}
+public:
+ // Performed only after a complete combinetiles
void balanceCache()
{
// A normalish PNG image size for text in a writer document is
@@ -489,10 +494,6 @@ private:
// the chance of hitting these entries in the future.
_cacheSize -= it->second.getData()->size();
- auto wIt = _wireToHash.find(it->second.getWireId());
- assert(wIt != _wireToHash.end());
- _wireToHash.erase(wIt);
-
it = _cache.erase(it);
}
else
@@ -506,9 +507,22 @@ private:
LOG_DBG("PNG cache has " << _cache.size() << " items, total size " <<
_cacheSize << " after balance.");
}
+
+ if (_hashToWireId.size() > CacheWidHardLimit)
+ {
+ LOG_DBG("Clear half of wid cache of size " << _hashToWireId.size());
+ TileWireId max = _nextId - CacheWidHardLimit/2;
+ for (auto it = _hashToWireId.begin(); it != _hashToWireId.end();)
+ {
+ if (it->second < max)
+ it = _hashToWireId.erase(it);
+ else
+ ++it;
+ }
+ LOG_DBG("Wid cache is now size " << _hashToWireId.size());
+ }
}
-public:
/// Lookup an entry in the cache and store the data in output.
/// Returns true on success, otherwise false.
bool copyFromCache(const TileBinaryHash hash, std::vector<char>& output, size_t &imgSize)
@@ -540,10 +554,13 @@ public:
if (hash)
{
+ // Adding duplicates causes grim wid mixups
+ assert(hashToWireId(hash) == wid);
+ assert(_cache.find(hash) == _cache.end());
+
data->shrink_to_fit();
_cache.emplace(hash, newEntry);
_cacheSize += data->size();
- balanceCache();
}
}
@@ -552,18 +569,18 @@ public:
clearCache();
}
- TileWireId hashToWireId(TileBinaryHash id)
+ TileWireId hashToWireId(TileBinaryHash hash)
{
TileWireId wid;
- if (id == 0)
+ if (hash == 0)
return 0;
- auto it = _cache.find(id);
- if (it != _cache.end())
- wid = it->second.getWireId();
+ auto it = _hashToWireId.find(hash);
+ if (it != _hashToWireId.end())
+ wid = it->second;
else
{
wid = createNewWireId();
- _wireToHash.emplace(wid, id);
+ _hashToWireId.emplace(hash, wid);
}
return wid;
}
@@ -737,6 +754,103 @@ private:
static FILE* ProcSMapsFile = nullptr;
#endif
+class ThreadPool {
+ std::mutex _mutex;
+ std::condition_variable _cond;
+ std::condition_variable _complete;
+ typedef std::function<void()> ThreadFn;
+ std::queue<ThreadFn> _work;
+ std::vector<std::thread> _threads;
+ size_t _working;
+ bool _shutdown;
+public:
+ ThreadPool()
+ : _working(0),
+ _shutdown(false)
+ {
+ int maxConcurrency = 2;
+#if MOBILEAPP
+# warning "Good defaults ? - 2 for iOS, 4 for Android ?"
+#else
+ const char *max = getenv("MAX_CONCURRENCY");
+ if (max)
+ maxConcurrency = atoi(max);
+#endif
+ LOG_TRC("PNG compression thread pool size " << maxConcurrency);
+ for (int i = 1; i < maxConcurrency; ++i)
+ _threads.push_back(std::thread(&ThreadPool::work, this));
+ }
+ ~ThreadPool()
+ {
+ {
+ std::unique_lock< std::mutex > lock(_mutex);
+ assert(_working == 0);
+ _shutdown = true;
+ }
+ _cond.notify_all();
+ for (auto &it : _threads)
+ it.join();
+ }
+
+ size_t count() const
+ {
+ return _work.size();
+ }
+
+ void pushWorkUnlocked(const ThreadFn &fn)
+ {
+ _work.push(fn);
+ }
+
+ void runOne(std::unique_lock< std::mutex >& lock)
+ {
+ assert(!_work.empty());
+
+ ThreadFn fn = _work.front();
+ _work.pop();
+ _working++;
+ lock.unlock();
+
+ fn();
+
+ lock.lock();
+ _working--;
+ if (_work.empty() && _working == 0)
+ _complete.notify_all();
+ }
+
+ void run()
+ {
+ std::unique_lock< std::mutex > lock(_mutex);
+ assert(_working == 0);
+
+ // Avoid notifying threads if we don't need to.
+ bool useThreads = _threads.size() > 1 && _work.size() > 1;
+ if (useThreads)
+ _cond.notify_all();
+
+ while(!_work.empty())
+ runOne(lock);
+
+ if (useThreads && (_working > 0 || !_work.empty()))
+ _complete.wait(lock, [this]() { return _working == 0 && _work.empty(); } );
+
+ assert(_working==0);
+ assert(_work.empty());
+ }
+
+ void work()
+ {
+ std::unique_lock< std::mutex > lock(_mutex);
+ while (!_shutdown)
+ {
+ _cond.wait(lock);
+ if (!_shutdown && !_work.empty())
+ runOne(lock);
+ }
+ }
+};
+
/// A document container.
/// Owns LOKitDocument instance and connections.
/// Manages the lifetime of a document.
@@ -946,6 +1060,14 @@ public:
renderTiles(tileCombined, true);
}
+ static void pushRendered(std::vector<TileDesc> &renderedTiles,
+ const TileDesc &desc, TileWireId wireId, size_t imgSize)
+ {
+ renderedTiles.push_back(desc);
+ renderedTiles.back().setWireId(wireId);
+ renderedTiles.back().setImgSize(imgSize);
+ }
+
void renderTiles(TileCombined &tileCombined, bool combined)
{
auto& tiles = tileCombined.getTiles();
@@ -1015,6 +1137,11 @@ public:
const int pixelWidth = tileCombined.getWidth();
const int pixelHeight = tileCombined.getHeight();
+ std::vector<TileDesc> renderedTiles;
+ std::vector<TileDesc> duplicateTiles;
+ std::vector<TileBinaryHash> duplicateHashes;
+ std::vector<TileWireId> renderingIds;
+
size_t tileIndex = 0;
for (Util::Rectangle& tileRect : tileRecs)
{
@@ -1034,56 +1161,109 @@ public:
// The tile content is identical to what the client already has, so skip it
LOG_TRC("Match for tile #" << tileIndex << " at (" << positionX << "," <<
positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping");
- tiles.erase(tiles.begin() + tileIndex);
+ tileIndex++;
continue;
}
- size_t imgSize;
-
- if (!_pngCache.copyFromCache(hash, output, imgSize))
+ bool skipCompress = false;
+ size_t imgSize = -1;
+ if (_pngCache.copyFromCache(hash, output, imgSize))
+ {
+ pushRendered(renderedTiles, tiles[tileIndex], wireId, imgSize);
+ skipCompress = true;
+ }
+ else
{
LOG_DBG("PNG cache with hash " << hash << " missed.");
+ // Don't re-compress the same thing multiple times.
+ for (auto id : renderingIds)
+ {
+ if (wireId == id)
+ {
+ pushRendered(duplicateTiles, tiles[tileIndex], wireId, 0);
+ duplicateHashes.push_back(hash);
+ skipCompress = true;
+ LOG_TRC("Rendering duplicate tile #" << tileIndex << " at (" << positionX << "," <<
+ positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping");
+ break;
+ }
+ }
+ }
+
+ if (!skipCompress)
+ {
+ renderingIds.push_back(wireId);
if (_docWatermark)
_docWatermark->blending(pixmap.data(), offsetX, offsetY,
pixmapWidth, pixmapHeight,
pixelWidth, pixelHeight,
mode);
- PngCache::CacheData data(new std::vector< char >() );
- data->reserve(pixmapWidth * pixmapHeight * 1);
-
-/*
- *Disable for now - pushed in error.
- *
- if (_deltaGen.createDelta(pixmap, startX, startY, width, height,
- bufferWidth, bufferHeight,
- output, wid, oldWid))
- else ...
-*/
-
- LOG_DBG("Encode a new png for this tile.");
- if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight,
- pixmapWidth, pixmapHeight, *data, mode))
- {
- // FIXME: Return error.
- // sendTextFrame("error: cmd=tile kind=failure");
- LOG_ERR("Failed to encode tile into PNG.");
- return;
- }
+ // Queue to be executed later in parallel inside 'run'
+ _pngPool.pushWorkUnlocked([=,&output,&pixmap,&tiles,&renderedTiles](){
+ PngCache::CacheData data(new std::vector< char >() );
+ data->reserve(pixmapWidth * pixmapHeight * 1);
+
+ /*
+ * Disable for now - pushed in error.
+ *
+ if (_deltaGen.createDelta(pixmap, startX, startY, width, height,
+ bufferWidth, bufferHeight,
+ output, wid, oldWid))
+ else ...
+ */
+
+ LOG_DBG("Encode a new png for tile #" << tileIndex);
+ if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight,
+ pixmapWidth, pixmapHeight, *data, mode))
+ {
+ // FIXME: Return error.
+ // sendTextFrame("error: cmd=tile kind=failure");
+ LOG_ERR("Failed to encode tile into PNG.");
+ return;
+ }
- output.insert(output.end(), data->begin(), data->end());
- imgSize = data->size();
- _pngCache.addToCache(data, wireId, hash);
+ LOG_DBG("Tile " << tileIndex << " is " << data->size() << " bytes.");
+ std::unique_lock<std::mutex> pngLock(_pngMutex);
+ output.insert(output.end(), data->begin(), data->end());
+ _pngCache.addToCache(data, wireId, hash);
+ pushRendered(renderedTiles, tiles[tileIndex], wireId, data->size());
+ });
}
-
LOG_TRC("Encoded tile #" << tileIndex << " at (" << positionX << "," << positionY << ") with oldWireId=" <<
tiles[tileIndex].getOldWireId() << ", hash=" << hash << " wireId: " << wireId << " in " << imgSize << " bytes.");
- tiles[tileIndex].setWireId(wireId);
- tiles[tileIndex].setImgSize(imgSize);
tileIndex++;
}
+ _pngPool.run();
+
+ for (auto &i : renderedTiles)
+ {
+ if (i.getImgSize() == 0)
+ {
+ LOG_ERR("Encoded 0-sized tile!");
+ assert(!"0-sized tile enocded!");
+ }
+ }
+
+ // FIXME: append duplicates - tragically for now as real duplicates
+ // we should append these as
+ {
+ size_t imgSize = -1;
+ assert(duplicateTiles.size() == duplicateHashes.size());
+ for (size_t i = 0; i < duplicateTiles.size(); ++i)
+ {
+ if (_pngCache.copyFromCache(duplicateHashes[i], output, imgSize))
+ pushRendered(renderedTiles, duplicateTiles[i],
+ duplicateTiles[i].getWireId(), imgSize);
+ else
+ LOG_ERR("Horror - tile disappeared while rendering! " << duplicateHashes[i]);
+ }
+ }
+
+ _pngCache.balanceCache();
+
elapsed = timestamp.elapsed();
LOG_DBG("renderCombinedTiles at (" << renderArea.getLeft() << ", " << renderArea.getTop() << "), (" <<
renderArea.getWidth() << ", " << renderArea.getHeight() << ") " <<
@@ -1097,7 +1277,7 @@ public:
std::string tileMsg;
if (combined)
- tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID);
+ tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID, renderedTiles);
else
tileMsg = tiles[0].serialize("tile:", ADD_DEBUG_RENDERID);
@@ -1998,6 +2178,8 @@ private:
std::shared_ptr<TileQueue> _tileQueue;
SocketPoll& _socketPoll;
std::shared_ptr<WebSocketHandler> _websocketHandler;
+
+ std::mutex _pngMutex;
PngCache _pngCache;
// Document password provided
@@ -2019,6 +2201,8 @@ private:
/// like setting a view followed by a tile render, etc.
std::mutex _documentMutex;
+ ThreadPool _pngPool;
+
std::condition_variable _cvLoading;
std::atomic_size_t _isLoading;
int _editorId;
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 3486723a8..87272afdd 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -89,7 +89,6 @@ using Poco::Net::PartHandler;
#include <Poco/StreamCopier.h>
#include <Poco/StringTokenizer.h>
#include <Poco/TemporaryFile.h>
-#include <Poco/ThreadPool.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/HelpFormatter.h>
diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp
index 20f61d5c4..c566a9161 100644
--- a/wsd/TileDesc.hpp
+++ b/wsd/TileDesc.hpp
@@ -349,27 +349,33 @@ public:
std::string serialize(const std::string& prefix = std::string(),
const std::string& suffix = std::string()) const
{
+ return serialize(prefix, suffix, _tiles);
+ }
+
+ std::string serialize(const std::string& prefix, const std::string &suffix,
+ const std::vector<TileDesc> &tiles) const
+ {
std::ostringstream oss;
oss << prefix
<< " part=" << _part
<< " width=" << _width
<< " height=" << _height
<< " tileposx=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getTilePosX() << ',';
}
oss.seekp(-1, std::ios_base::cur); // Seek back over last comma, overwritten below.
oss << " tileposy=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getTilePosY() << ',';
}
oss.seekp(-1, std::ios_base::cur); // Ditto.
oss << " imgsize=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getImgSize() << ','; // Ditto.
}
@@ -379,14 +385,14 @@ public:
<< " tileheight=" << _tileHeight;
oss << " ver=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getVersion() << ',';
}
oss.seekp(-1, std::ios_base::cur); // Ditto.
oss << " oldwid=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getOldWireId() << ',';
}
@@ -395,7 +401,7 @@ public:
oss << " wid=";
bool comma = false;
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
if (comma)
oss << ',';
More information about the Libreoffice-commits mailing list