loolwsd/LOOLKit.cpp | 163 ++++++++++++++--------------------------------- loolwsd/MessageQueue.cpp | 87 +++++++++---------------- loolwsd/MessageQueue.hpp | 6 - 3 files changed, 82 insertions(+), 174 deletions(-)
New commits: commit c97ebfc585f213cb897b7c7e4a948cb1332d6169 Author: Jan Holesovsky <[email protected]> Date: Fri Sep 23 10:18:54 2016 +0200 Prioritize the tile requests when extracting from the TileQueue. When putting tiles into the queue, do only the de-duplication, reprioritization is better at the get() time - no need to keep shuffling the priorities according to the cursor moves etc. all the time. The tile combination then does the rest of the work for us :-) Change-Id: I100c433dd3b24228d1ca8e4c89891635db1115c1 Reviewed-on: https://gerrit.libreoffice.org/29283 Reviewed-by: Ashod Nakashian <[email protected]> Tested-by: Ashod Nakashian <[email protected]> diff --git a/loolwsd/MessageQueue.cpp b/loolwsd/MessageQueue.cpp index 459098f..2f46e10 100644 --- a/loolwsd/MessageQueue.cpp +++ b/loolwsd/MessageQueue.cpp @@ -135,8 +135,15 @@ void TileQueue::put_impl(const Payload& value) return; } + // TODO because we are doing tile combining ourselves in the get_impl(), + // we could split the "tilecombine" messages into separate tiles here; + // could lead to some improvements in case we are getting subsequent + // tilecombines with overlapping, but not completely same areas. + if (!_queue.empty()) { + // TODO probably we could do the same with the invalidation callbacks + // (later one wins). if (msg.compare(0, 4, "tile") == 0 || msg.compare(0, 10, "tilecombine") == 0) { const auto newMsg = msg.substr(0, msg.find(" ver")); @@ -146,65 +153,17 @@ void TileQueue::put_impl(const Payload& value) auto& it = _queue[i]; const std::string old(it.data(), it.size()); const auto oldMsg = old.substr(0, old.find(" ver")); - Log::trace() << "TileQueue #" << i << ": " << old << Log::end; if (newMsg == oldMsg) { - Log::debug() << "Replacing duplicate tile: " << old << " -> " << msg << Log::end; - _queue[i] = value; - - if (priority(msg)) - { - // Bump to top. - Log::debug() << "And bumping tile to top: " << msg << Log::end; - _queue.erase(_queue.begin() + i); - _queue.push_front(value); - } - - return; + Log::debug() << "Remove duplicate message: " << old << " -> " << msg << Log::end; + _queue.erase(_queue.begin() + i); + break; } } } } - if (priority(msg)) - { - Log::debug() << "Priority tile [" << msg << "]" << Log::end; - _queue.push_front(value); - } - else - { - BasicTileQueue::put_impl(value); - } -} - -/// Bring the underlying tile (if any) to the top. -/// There should be only one overlapping tile at most. -void TileQueue::reprioritize(const CursorPosition& cursorPosition) -{ - for (size_t i = 0; i < _queue.size(); ++i) - { - auto it = _queue[i]; - const std::string msg(it.data(), it.size()); - if (msg.compare(0, 5, "tile ") != 0) - { - continue; - } - - auto tile = TileDesc::parse(msg); //FIXME: Expensive, avoid. - - if (tile.intersectsWithRect(cursorPosition.X, cursorPosition.Y, cursorPosition.Width, cursorPosition.Height)) - { - if (i != 0) - { - // Bump to top. - Log::trace() << "Bumping tile to top: " << msg << Log::end; - _queue.erase(_queue.begin() + i); - _queue.push_front(it); - } - - return; - } - } + BasicTileQueue::put_impl(value); } bool TileQueue::priority(const std::string& tileMsg) @@ -231,7 +190,6 @@ MessageQueue::Payload TileQueue::get_impl() { std::vector<TileDesc> tiles; const auto front = _queue.front(); - _queue.pop_front(); auto msg = std::string(front.data(), front.size()); Log::trace() << "MessageQueue Get, Size: " << _queue.size() << ", Front: " << msg << Log::end; @@ -240,9 +198,30 @@ MessageQueue::Payload TileQueue::get_impl() { // Don't combine non-tiles or tiles with id. Log::trace() << "MessageQueue res: " << msg << Log::end; + _queue.pop_front(); return front; } + // We are handling a tile; first try to find one that is at the cursor's + // position, otherwise handle the one that is at the front + bool foundPrioritized = false; + for (size_t i = 0; i < _queue.size(); ++i) + { + auto& it = _queue[i]; + const std::string prio(it.data(), it.size()); + if (priority(prio)) + { + Log::debug() << "Handling a priority message: " << prio << Log::end; + _queue.erase(_queue.begin() + i); + msg = prio; + foundPrioritized = true; + break; + } + } + + if (!foundPrioritized) + _queue.pop_front(); + tiles.emplace_back(TileDesc::parse(msg)); // Combine as many tiles as possible with the top one. @@ -263,7 +242,7 @@ MessageQueue::Payload TileQueue::get_impl() } auto tile2 = TileDesc::parse(msg); - Log::trace() << "combining?: " << msg << Log::end; + Log::trace() << "combining candidate: " << msg << Log::end; // Check if adjacent tiles. bool found = false; diff --git a/loolwsd/MessageQueue.hpp b/loolwsd/MessageQueue.hpp index 4d1efba..89d314e 100644 --- a/loolwsd/MessageQueue.hpp +++ b/loolwsd/MessageQueue.hpp @@ -112,8 +112,6 @@ public: { _cursorPositions[viewId] = cursorPosition; } - - reprioritize(cursorPosition); } void removeCursorPosition(int viewId) @@ -128,10 +126,6 @@ protected: private: - /// Bring the underlying tile (if any) to the top. - /// There should be only one overlapping tile at most. - void reprioritize(const CursorPosition& cursorPosition); - /// Check if the given tile msg underlies a cursor. bool priority(const std::string& tileMsg); commit 61b0ad4933550289017a7f59b436299ff01155af Author: Jan Holesovsky <[email protected]> Date: Thu Sep 22 21:59:12 2016 +0200 Fix a race between the tile rendering and invalidation callbacks. Instead of 2 queues and 2 threads, merge those to one - which gives a perfect ordering of the invalidations and rendering. Change-Id: I229dfc08b43e6ce7e4f08ea8059d3298d9bf8f8a Reviewed-on: https://gerrit.libreoffice.org/29282 Reviewed-by: Ashod Nakashian <[email protected]> Tested-by: Ashod Nakashian <[email protected]> diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index 7c9f451..5633536 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -368,26 +368,6 @@ private: std::atomic<bool> _joined; }; -/// Worker callback notification object. -/// Used to pass callback data to the worker -/// thread to invoke sessions with the data. -class CallbackNotification : public Notification -{ -public: - typedef AutoPtr<CallbackNotification> Ptr; - - CallbackNotification(const std::shared_ptr<ChildSession>& session, const int nType, const std::string& rPayload) - : _session(session), - _nType(nType), - _aPayload(rPayload) - { - } - - const std::shared_ptr<ChildSession> _session; - const int _nType; - const std::string _aPayload; -}; - /// A document container. /// Owns LOKitDocument instance and connections. /// Manages the lifetime of a document. @@ -430,7 +410,6 @@ public: _docPasswordType(PasswordType::ToView), _stop(false), _isLoading(0), - _tilesThread(tilesThread, this), _clientViews(0) { Log::info("Document ctor for url [" + _url + "] on child [" + _jailId + "]."); @@ -446,11 +425,9 @@ public: // Wait for the callback worker to finish. _stop = true; - _callbackQueue.wakeUpAll(); - _callbackThread.join(); _tileQueue->put("eof"); - _tilesThread.join(); + _callbackThread.join(); // Flag all connections to stop. for (auto aIterator : _connections) @@ -875,38 +852,7 @@ private: } } - // Forward to the same view only. - // Demultiplexing is done by Core. - // TODO: replace with a map to be faster. - bool isFound = false; - for (auto& it : pDescr->Doc->_connections) - { - auto session = it.second->getSession(); - if (session && session->getViewId() == pDescr->ViewId) - { - if (it.second->isRunning()) - { - isFound = true; - auto pNotif = new CallbackNotification(session, nType, payload); - pDescr->Doc->_callbackQueue.enqueueNotification(pNotif); - } - else - { - Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view " - << pDescr->ViewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(nType) - << "] payload [" << payload << "]." << Log::end; - } - - break; - } - } - - if (!isFound) - { - Log::warn() << "Document::ViewCallback. The message [" << pDescr->ViewId - << "] [" << LOKitHelper::kitCallbackTypeToString(nType) - << "] [" << payload << "] is not sent to Master Session." << Log::end; - } + pDescr->Doc->_tileQueue->put("callback " + std::to_string(pDescr->ViewId) + " " + std::to_string(nType) + " " + payload); } static void DocumentCallback(const int nType, const char* pPayload, void* pData) @@ -924,18 +870,8 @@ private: { std::unique_lock<std::mutex> lock(_mutex); - for (auto& it: _connections) - { - if (it.second->isRunning()) - { - auto session = it.second->getSession(); - if (session) - { - auto pNotif = new CallbackNotification(session, nType, payload); - _callbackQueue.enqueueNotification(pNotif); - } - } - } + // "-1" means broadcast + _tileQueue->put("callback -1 " + std::to_string(nType) + " " + payload); } /// Load a document (or view) and register callbacks. @@ -1230,55 +1166,17 @@ private: void run() override { - Util::setThreadName("kit_callback"); - - Log::debug("Thread started."); - - while (!_stop && !TerminationFlag) - { - Notification::Ptr aNotification(_callbackQueue.waitDequeueNotification()); - if (!_stop && !TerminationFlag && aNotification) - { - CallbackNotification::Ptr aCallbackNotification = aNotification.cast<CallbackNotification>(); - assert(aCallbackNotification); - - const auto nType = aCallbackNotification->_nType; - try - { - aCallbackNotification->_session->loKitCallback(nType, aCallbackNotification->_aPayload); - } - catch (const Exception& exc) - { - Log::error() << "CallbackWorker::run: Exception while handling callback [" << LOKitHelper::kitCallbackTypeToString(nType) << "]: " - << exc.displayText() - << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "") - << Log::end; - } - catch (const std::exception& exc) - { - Log::error("CallbackWorker::run: Exception while handling callback [" + LOKitHelper::kitCallbackTypeToString(nType) + "]: " + exc.what()); - } - } - else - break; - } - - Log::debug("Thread finished."); - } - - static void tilesThread(Document* pThis) - { - Util::setThreadName("tile_renderer"); + Util::setThreadName("lok_handler"); Log::debug("Thread started."); try { - while (!pThis->_stop) + while (!_stop && !TerminationFlag) { - const auto input = pThis->_tileQueue->get(); + const auto input = _tileQueue->get(); const std::string message(input.data(), input.size()); - StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + StringTokenizer tokens(message, " "); if (tokens[0] == "eof") { @@ -1288,11 +1186,50 @@ private: if (tokens[0] == "tile") { - pThis->renderTile(tokens, pThis->_ws); + renderTile(tokens, _ws); } else if (tokens[0] == "tilecombine") { - pThis->renderCombinedTiles(tokens, pThis->_ws); + renderCombinedTiles(tokens, _ws); + } + else if (tokens[0] == "callback") + { + int viewId = std::stoi(tokens[1]); // -1 means broadcast + int type = std::stoi(tokens[2]); + + // payload is the rest of the message + std::string payload(message.substr(tokens[0].length() + tokens[1].length() + tokens[2].length() + 3)); + + // Forward the callback to the same view, demultiplexing is done by the LibreOffice core. + // TODO: replace with a map to be faster. + bool isFound = false; + for (auto& it : _connections) + { + auto session = it.second->getSession(); + if (session && ((session->getViewId() == viewId) || (viewId == -1))) + { + if (it.second->isRunning()) + { + isFound = true; + session->loKitCallback(type, payload); + } + else + { + Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view " + << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type) + << "] payload [" << payload << "]." << Log::end; + } + + break; + } + } + + if (!isFound) + { + Log::warn() << "Document::ViewCallback. The message [" << viewId + << "] [" << LOKitHelper::kitCallbackTypeToString(type) + << "] [" << payload << "] is not sent to Master Session." << Log::end; + } } else { @@ -1336,8 +1273,6 @@ private: std::map<int, std::unique_ptr<CallbackDescriptor>> _viewIdToCallbackDescr; std::map<unsigned, std::shared_ptr<Connection>> _connections; Poco::Thread _callbackThread; - std::thread _tilesThread; - Poco::NotificationQueue _callbackQueue; std::atomic_size_t _clientViews; }; _______________________________________________ Libreoffice-commits mailing list [email protected] https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits
