kit/Kit.cpp | 275 +++++++++++++++++++++++++++++++++++++++++++++---------- wsd/LOOLWSD.cpp | 1 wsd/TileDesc.hpp | 18 ++- 3 files changed, 239 insertions(+), 55 deletions(-)
New commits: commit 8172885f74bef3e4d90e41a54a596add5b292b6a Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Sat Apr 20 01:53:12 2019 +0100 Commit: Michael Meeks <michael.me...@collabora.com> CommitDate: Thu May 2 22:36:52 2019 +0100 PNG compression a bottleneck: thread it to accelerate things. Also have a separate hash <-> wid cache to avoid re-rendering older tiles as/when we see them. Change-Id: I238fe6701a1d1cb486473c67faba8c56e9c98dcb diff --git a/kit/Kit.cpp b/kit/Kit.cpp index 25050e04b..c6ab8fd4a 100644 --- a/kit/Kit.cpp +++ b/kit/Kit.cpp @@ -435,13 +435,15 @@ private: size_t _cacheSize; static const size_t CacheSizeSoftLimit = (1024 * 4 * 32); // 128k of cache static const size_t CacheSizeHardLimit = CacheSizeSoftLimit * 2; + static const size_t CacheWidHardLimit = 4096; size_t _cacheHits; size_t _cacheTests; TileWireId _nextId; DeltaGenerator _deltaGen; - std::map< TileBinaryHash, CacheEntry > _cache; - std::map< TileWireId, TileBinaryHash > _wireToHash; + std::unordered_map< TileBinaryHash, CacheEntry > _cache; + // This uses little storage so can be much larger + std::unordered_map< TileBinaryHash, TileWireId > _hashToWireId; void clearCache(bool logStats = false) { @@ -449,6 +451,7 @@ private: LOG_DBG("cache clear " << _cache.size() << " items total size " << _cacheSize << " current hits " << _cacheHits); _cache.clear(); + _hashToWireId.clear(); _cacheSize = 0; _cacheHits = 0; _cacheTests = 0; @@ -465,6 +468,8 @@ private: return id; } +public: + // Performed only after a complete combinetiles void balanceCache() { // A normalish PNG image size for text in a writer document is @@ -489,10 +494,6 @@ private: // the chance of hitting these entries in the future. _cacheSize -= it->second.getData()->size(); - auto wIt = _wireToHash.find(it->second.getWireId()); - assert(wIt != _wireToHash.end()); - _wireToHash.erase(wIt); - it = _cache.erase(it); } else @@ -506,9 +507,22 @@ private: LOG_DBG("PNG cache has " << _cache.size() << " items, total size " << _cacheSize << " after balance."); } + + if (_hashToWireId.size() > CacheWidHardLimit) + { + LOG_DBG("Clear half of wid cache of size " << _hashToWireId.size()); + TileWireId max = _nextId - CacheWidHardLimit/2; + for (auto it = _hashToWireId.begin(); it != _hashToWireId.end();) + { + if (it->second < max) + it = _hashToWireId.erase(it); + else + ++it; + } + LOG_DBG("Wid cache is now size " << _hashToWireId.size()); + } } -public: /// Lookup an entry in the cache and store the data in output. /// Returns true on success, otherwise false. bool copyFromCache(const TileBinaryHash hash, std::vector<char>& output, size_t &imgSize) @@ -540,10 +554,13 @@ public: if (hash) { + // Adding duplicates causes grim wid mixups + assert(hashToWireId(hash) == wid); + assert(_cache.find(hash) == _cache.end()); + data->shrink_to_fit(); _cache.emplace(hash, newEntry); _cacheSize += data->size(); - balanceCache(); } } @@ -552,18 +569,18 @@ public: clearCache(); } - TileWireId hashToWireId(TileBinaryHash id) + TileWireId hashToWireId(TileBinaryHash hash) { TileWireId wid; - if (id == 0) + if (hash == 0) return 0; - auto it = _cache.find(id); - if (it != _cache.end()) - wid = it->second.getWireId(); + auto it = _hashToWireId.find(hash); + if (it != _hashToWireId.end()) + wid = it->second; else { wid = createNewWireId(); - _wireToHash.emplace(wid, id); + _hashToWireId.emplace(hash, wid); } return wid; } @@ -737,6 +754,103 @@ private: static FILE* ProcSMapsFile = nullptr; #endif +class ThreadPool { + std::mutex _mutex; + std::condition_variable _cond; + std::condition_variable _complete; + typedef std::function<void()> ThreadFn; + std::queue<ThreadFn> _work; + std::vector<std::thread> _threads; + size_t _working; + bool _shutdown; +public: + ThreadPool() + : _working(0), + _shutdown(false) + { + int maxConcurrency = 2; +#if MOBILEAPP +# warning "Good defaults ? - 2 for iOS, 4 for Android ?" +#else + const char *max = getenv("MAX_CONCURRENCY"); + if (max) + maxConcurrency = atoi(max); +#endif + LOG_TRC("PNG compression thread pool size " << maxConcurrency); + for (int i = 1; i < maxConcurrency; ++i) + _threads.push_back(std::thread(&ThreadPool::work, this)); + } + ~ThreadPool() + { + { + std::unique_lock< std::mutex > lock(_mutex); + assert(_working == 0); + _shutdown = true; + } + _cond.notify_all(); + for (auto &it : _threads) + it.join(); + } + + size_t count() const + { + return _work.size(); + } + + void pushWorkUnlocked(const ThreadFn &fn) + { + _work.push(fn); + } + + void runOne(std::unique_lock< std::mutex >& lock) + { + assert(!_work.empty()); + + ThreadFn fn = _work.front(); + _work.pop(); + _working++; + lock.unlock(); + + fn(); + + lock.lock(); + _working--; + if (_work.empty() && _working == 0) + _complete.notify_all(); + } + + void run() + { + std::unique_lock< std::mutex > lock(_mutex); + assert(_working == 0); + + // Avoid notifying threads if we don't need to. + bool useThreads = _threads.size() > 1 && _work.size() > 1; + if (useThreads) + _cond.notify_all(); + + while(!_work.empty()) + runOne(lock); + + if (useThreads && (_working > 0 || !_work.empty())) + _complete.wait(lock, [this]() { return _working == 0 && _work.empty(); } ); + + assert(_working==0); + assert(_work.empty()); + } + + void work() + { + std::unique_lock< std::mutex > lock(_mutex); + while (!_shutdown) + { + _cond.wait(lock); + if (!_shutdown && !_work.empty()) + runOne(lock); + } + } +}; + /// A document container. /// Owns LOKitDocument instance and connections. /// Manages the lifetime of a document. @@ -948,6 +1062,14 @@ public: renderTiles(tileCombined, true); } + static void pushRendered(std::vector<TileDesc> &renderedTiles, + const TileDesc &desc, TileWireId wireId, size_t imgSize) + { + renderedTiles.push_back(desc); + renderedTiles.back().setWireId(wireId); + renderedTiles.back().setImgSize(imgSize); + } + void renderTiles(TileCombined &tileCombined, bool combined) { auto& tiles = tileCombined.getTiles(); @@ -1016,6 +1138,11 @@ public: const int pixelWidth = tileCombined.getWidth(); const int pixelHeight = tileCombined.getHeight(); + std::vector<TileDesc> renderedTiles; + std::vector<TileDesc> duplicateTiles; + std::vector<TileBinaryHash> duplicateHashes; + std::vector<TileWireId> renderingIds; + size_t tileIndex = 0; for (Util::Rectangle& tileRect : tileRecs) { @@ -1035,61 +1162,109 @@ public: // The tile content is identical to what the client already has, so skip it LOG_TRC("Match for tile #" << tileIndex << " at (" << positionX << "," << positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping"); - tiles.erase(tiles.begin() + tileIndex); + tileIndex++; continue; } - size_t imgSize; - - if (!_pngCache.copyFromCache(hash, output, imgSize)) + bool skipCompress = false; + size_t imgSize = -1; + if (_pngCache.copyFromCache(hash, output, imgSize)) + { + pushRendered(renderedTiles, tiles[tileIndex], wireId, imgSize); + skipCompress = true; + } + else { LOG_DBG("PNG cache with hash " << hash << " missed."); + // Don't re-compress the same thing multiple times. + for (auto id : renderingIds) + { + if (wireId == id) + { + pushRendered(duplicateTiles, tiles[tileIndex], wireId, 0); + duplicateHashes.push_back(hash); + skipCompress = true; + LOG_TRC("Rendering duplicate tile #" << tileIndex << " at (" << positionX << "," << + positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping"); + break; + } + } + } + + if (!skipCompress) + { + renderingIds.push_back(wireId); if (_docWatermark) _docWatermark->blending(pixmap.data(), offsetX, offsetY, pixmapWidth, pixmapHeight, pixelWidth, pixelHeight, mode); - PngCache::CacheData data(new std::vector< char >() ); - data->reserve(pixmapWidth * pixmapHeight * 1); - -/* - *Disable for now - pushed in error. - * - if (_deltaGen.createDelta(pixmap, startX, startY, width, height, - bufferWidth, bufferHeight, - output, wid, oldWid)) - else ... -*/ - - LOG_DBG("Encode a new png for this tile."); - if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight, - pixmapWidth, pixmapHeight, *data, mode)) - { - // FIXME: Return error. - // sendTextFrame("error: cmd=tile kind=failure"); - LOG_ERR("Failed to encode tile into PNG."); - return; - } + // Queue to be executed later in parallel inside 'run' + _pngPool.pushWorkUnlocked([=,&output,&pixmap,&tiles,&renderedTiles](){ + PngCache::CacheData data(new std::vector< char >() ); + data->reserve(pixmapWidth * pixmapHeight * 1); + + /* + * Disable for now - pushed in error. + * + if (_deltaGen.createDelta(pixmap, startX, startY, width, height, + bufferWidth, bufferHeight, + output, wid, oldWid)) + else ... + */ + + LOG_DBG("Encode a new png for tile #" << tileIndex); + if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight, + pixmapWidth, pixmapHeight, *data, mode)) + { + // FIXME: Return error. + // sendTextFrame("error: cmd=tile kind=failure"); + LOG_ERR("Failed to encode tile into PNG."); + return; + } - output.insert(output.end(), data->begin(), data->end()); - imgSize = data->size(); - _pngCache.addToCache(data, wireId, hash); + LOG_DBG("Tile " << tileIndex << " is " << data->size() << " bytes."); + std::unique_lock<std::mutex> pngLock(_pngMutex); + output.insert(output.end(), data->begin(), data->end()); + _pngCache.addToCache(data, wireId, hash); + pushRendered(renderedTiles, tiles[tileIndex], wireId, data->size()); + }); } - LOG_TRC("Encoded tile #" << tileIndex << " at (" << positionX << "," << positionY << ") with oldWireId=" << tiles[tileIndex].getOldWireId() << ", hash=" << hash << " wireId: " << wireId << " in " << imgSize << " bytes."); - if (imgSize == 0) + tileIndex++; + } + + _pngPool.run(); + + for (auto &i : renderedTiles) + { + if (i.getImgSize() == 0) { LOG_ERR("Encoded 0-sized tile!"); assert(!"0-sized tile enocded!"); } - tiles[tileIndex].setWireId(wireId); - tiles[tileIndex].setImgSize(imgSize); - tileIndex++; } + // FIXME: append duplicates - tragically for now as real duplicates + // we should append these as + { + size_t imgSize = -1; + assert(duplicateTiles.size() == duplicateHashes.size()); + for (size_t i = 0; i < duplicateTiles.size(); ++i) + { + if (_pngCache.copyFromCache(duplicateHashes[i], output, imgSize)) + pushRendered(renderedTiles, duplicateTiles[i], + duplicateTiles[i].getWireId(), imgSize); + else + LOG_ERR("Horror - tile disappeared while rendering! " << duplicateHashes[i]); + } + } + + _pngCache.balanceCache(); + elapsed = timestamp.elapsed(); LOG_DBG("renderCombinedTiles at (" << renderArea.getLeft() << ", " << renderArea.getTop() << "), (" << renderArea.getWidth() << ", " << renderArea.getHeight() << ") " << @@ -1103,7 +1278,7 @@ public: std::string tileMsg; if (combined) - tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID); + tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID, renderedTiles); else tileMsg = tiles[0].serialize("tile:", ADD_DEBUG_RENDERID); @@ -1996,6 +2171,8 @@ private: std::shared_ptr<TileQueue> _tileQueue; SocketPoll& _socketPoll; std::shared_ptr<WebSocketHandler> _websocketHandler; + + std::mutex _pngMutex; PngCache _pngCache; // Document password provided @@ -2017,6 +2194,8 @@ private: /// like setting a view followed by a tile render, etc. std::mutex _documentMutex; + ThreadPool _pngPool; + std::condition_variable _cvLoading; std::atomic_size_t _isLoading; int _editorId; diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 57ddfd705..116e7c560 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -89,7 +89,6 @@ using Poco::Net::PartHandler; #include <Poco/StreamCopier.h> #include <Poco/StringTokenizer.h> #include <Poco/TemporaryFile.h> -#include <Poco/ThreadPool.h> #include <Poco/URI.h> #include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/HelpFormatter.h> diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp index 77fb46b26..d30b718da 100644 --- a/wsd/TileDesc.hpp +++ b/wsd/TileDesc.hpp @@ -357,27 +357,33 @@ public: std::string serialize(const std::string& prefix = std::string(), const std::string& suffix = std::string()) const { + return serialize(prefix, suffix, _tiles); + } + + std::string serialize(const std::string& prefix, const std::string &suffix, + const std::vector<TileDesc> &tiles) const + { std::ostringstream oss; oss << prefix << " part=" << _part << " width=" << _width << " height=" << _height << " tileposx="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getTilePosX() << ','; } oss.seekp(-1, std::ios_base::cur); // Seek back over last comma, overwritten below. oss << " tileposy="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getTilePosY() << ','; } oss.seekp(-1, std::ios_base::cur); // Ditto. oss << " imgsize="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getImgSize() << ','; // Ditto. } @@ -387,14 +393,14 @@ public: << " tileheight=" << _tileHeight; oss << " ver="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getVersion() << ','; } oss.seekp(-1, std::ios_base::cur); // Ditto. oss << " oldwid="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getOldWireId() << ','; } @@ -403,7 +409,7 @@ public: oss << " wid="; bool comma = false; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { if (comma) oss << ','; _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits