common/MessageQueue.hpp | 7 ++ kit/Kit.cpp | 129 +++++++++++++++++++++++++++++------------------- 2 files changed, 86 insertions(+), 50 deletions(-)
New commits: commit 73dc711e0ade81c1238281a290a59a7c28cabab4 Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Sat May 4 14:51:14 2019 +0100 Commit: Michael Meeks <michael.me...@collabora.com> CommitDate: Fri May 10 15:13:40 2019 +0100 Unipoll: terminate repeated polling loop on wakeup. Wakeup wakes up the nested SocketPoll::poll nicely, but that's no use if we immediately ignore that and re-poll, so shorten the timeout in this case. Change-Id: I927d2375b92c9ce6c6ebe3f0ab33e2863894e2ef diff --git a/common/MessageQueue.hpp b/common/MessageQueue.hpp index ac8d4e51f..f07b4be2a 100644 --- a/common/MessageQueue.hpp +++ b/common/MessageQueue.hpp @@ -88,6 +88,13 @@ public: return get_impl(); } + /// Anything in the queue ? + bool isEmpty() + { + std::unique_lock<std::mutex> lock(_mutex); + return _queue.size() == 0; + } + /// Thread safe removal of all the pending messages. void clear() { diff --git a/kit/Kit.cpp b/kit/Kit.cpp index c75047a8e..216c8d757 100644 --- a/kit/Kit.cpp +++ b/kit/Kit.cpp @@ -1983,15 +1983,18 @@ private: } public: + bool hasQueued() + { + return !_tileQueue->isEmpty(); + } + void drainQueue(const std::chrono::steady_clock::time_point &now) { try { - while (true) + while (hasQueued()) { const TileQueue::Payload input = _tileQueue->pop(); - if (input.size() <= 0) - break; LOG_TRC("Kit Recv " << LOOLProtocol::getAbbreviatedMessage(input)); @@ -2327,71 +2330,99 @@ void documentViewCallback(const int type, const char* payload, void* data) Document::ViewCallback(type, payload, data); } -/// Called by LOK main-loop the central location for data processing. -int pollCallback(void* pData, int timeoutUs) +class KitSocketPoll : public SocketPoll { - if (!pData) - return 0; - - // The maximum number of extra events to process beyond the first. - int maxExtraEvents = 15; - int eventsSignalled = 0; + std::chrono::steady_clock::time_point _pollEnd; +public: + KitSocketPoll() : + SocketPoll("kit") + { + } - int timeoutMs = timeoutUs / 1000; + // process pending message-queue events. + void drainQueue(const std::chrono::steady_clock::time_point &now) + { + if (document) + document->drainQueue(now); + } - SocketPoll* pSocketPoll = reinterpret_cast<SocketPoll*>(pData); - if (timeoutMs < 0) + // called from inside poll, inside a wakeup + void wakeupHook() { - // Flush at most 1 + maxExtraEvents, or return when nothing left. - while (pSocketPoll->poll(0) > 0 && maxExtraEvents-- > 0) - ++eventsSignalled; + _pollEnd = std::chrono::steady_clock::now(); } - else + + // a LOK compatible poll function merging the functions. + // returns the number of events signalled + int kitPoll(int timeoutUs) { - const auto startTime = std::chrono::steady_clock::now(); - do + if (TerminationFlag) + { + LOG_TRC("Termination of unipoll mainloop flagged"); + return -1; + } + + // The maximum number of extra events to process beyond the first. + int maxExtraEvents = 15; + int eventsSignalled = 0; + + int timeoutMs = timeoutUs / 1000; + + if (timeoutMs < 0) + { + // Flush at most 1 + maxExtraEvents, or return when nothing left. + while (poll(0) > 0 && maxExtraEvents-- > 0) + ++eventsSignalled; + } + else { // Flush at most maxEvents+1, or return when nothing left. - if (pSocketPoll->poll(timeoutMs) <= 0) - break; + _pollEnd = std::chrono::steady_clock::now() + std::chrono::microseconds(timeoutUs); + do + { + if (poll(timeoutMs) <= 0) + break; - const auto now = std::chrono::steady_clock::now(); - const auto elapsedTimeMs - = std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime) - .count(); - if (elapsedTimeMs >= timeoutMs) - break; + const auto now = std::chrono::steady_clock::now(); + drainQueue(now); - timeoutMs -= elapsedTimeMs; - ++eventsSignalled; + timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(_pollEnd - now).count(); + ++eventsSignalled; + } + while (timeoutMs > 0 && !TerminationFlag && maxExtraEvents-- > 0); } - while (!TerminationFlag && maxExtraEvents-- > 0); - } - if (document) - document->drainQueue(std::chrono::steady_clock::now()); + drainQueue(std::chrono::steady_clock::now()); #if !MOBILEAPP - if (document && document->purgeSessions() == 0) - { - LOG_INF("Last session discarded. Setting TerminationFlag"); - TerminationFlag = true; - return -1; - } + if (document && document->purgeSessions() == 0) + { + LOG_INF("Last session discarded. Setting TerminationFlag"); + TerminationFlag = true; + return -1; + } #endif + // Report the number of events we processsed. + return eventsSignalled; + } +}; - // Report the number of events we processsed. - return eventsSignalled; +/// Called by LOK main-loop the central location for data processing. +int pollCallback(void* pData, int timeoutUs) +{ + if (!pData) + return 0; + else + return reinterpret_cast<KitSocketPoll*>(pData)->kitPoll(timeoutUs); } /// Called by LOK main-loop void wakeCallback(void* pData) { - if (pData) - { - SocketPoll* pSocketPoll = reinterpret_cast<SocketPoll*>(pData); - pSocketPoll->wakeup(); - } + if (!pData) + return; + else + return reinterpret_cast<KitSocketPoll*>(pData)->wakeup(); } #ifndef BUILDING_TESTS @@ -2687,7 +2718,7 @@ void lokit_main( #endif // MOBILEAPP - SocketPoll mainKit("kit"); + KitSocketPoll mainKit; mainKit.runOnClientThread(); // We will do the polling on this thread. std::shared_ptr<SocketHandlerInterface> websocketHandler = commit 8360f2c95140beeda41927e77070a1114a448fe3 Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Fri May 3 18:05:48 2019 +0100 Commit: Michael Meeks <michael.me...@collabora.com> CommitDate: Fri May 10 15:13:40 2019 +0100 unipoll: process lots of events at once. Change-Id: I8b0a37d114a55e5d64d7e5dd7df6c494971087ca diff --git a/kit/Kit.cpp b/kit/Kit.cpp index 6ac59e84d..c75047a8e 100644 --- a/kit/Kit.cpp +++ b/kit/Kit.cpp @@ -2334,9 +2334,7 @@ int pollCallback(void* pData, int timeoutUs) return 0; // The maximum number of extra events to process beyond the first. - //FIXME: When processing more than one event, full-document - //FIXME: invalidations happen (for some reason), so disable for now. - int maxExtraEvents = 0; + int maxExtraEvents = 15; int eventsSignalled = 0; int timeoutMs = timeoutUs / 1000; @@ -2367,7 +2365,7 @@ int pollCallback(void* pData, int timeoutUs) timeoutMs -= elapsedTimeMs; ++eventsSignalled; } - while (maxExtraEvents-- > 0); + while (!TerminationFlag && maxExtraEvents-- > 0); } if (document) _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits