common/Common.hpp | 2 common/IoUtil.cpp | 4 - common/LOOLWebSocket.hpp | 2 configure.ac | 2 kit/ForKit.cpp | 2 kit/Kit.cpp | 15 +--- net/DelaySocket.cpp | 14 +-- net/ServerSocket.hpp | 4 - net/Socket.cpp | 140 ++++++++++++++++++++++++++++++++++++- net/Socket.hpp | 142 +++----------------------------------- net/SslSocket.hpp | 6 - net/WebSocketHandler.hpp | 24 +++--- test/UnitWOPIDocumentConflict.cpp | 2 test/helpers.hpp | 4 - tools/WebSocketDump.cpp | 6 - wsd/Admin.cpp | 38 +++------- wsd/Admin.hpp | 9 +- wsd/DocumentBroker.cpp | 12 +-- wsd/LOOLWSD.cpp | 16 ++-- 19 files changed, 224 insertions(+), 220 deletions(-)
New commits: commit 5710c8632383e92372e1d81b6e26acc975e25ec4 Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Thu Apr 9 14:43:51 2020 +0100 Commit: Michael Meeks <michael.me...@collabora.com> CommitDate: Fri Apr 10 10:06:23 2020 +0200 Poll - switch to ppoll for closer to microsecond accuracy. Change-Id: Ib8a2bb6f60302df8631edadbbb8db626894c457c Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92000 Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoff...@gmail.com> Reviewed-by: Michael Meeks <michael.me...@collabora.com> diff --git a/common/Common.hpp b/common/Common.hpp index 82f848579..ab9989e73 100644 --- a/common/Common.hpp +++ b/common/Common.hpp @@ -18,7 +18,7 @@ constexpr int DEFAULT_CLIENT_PORT_NUMBER = 9980; constexpr int COMMAND_TIMEOUT_MS = 5000; constexpr int CHILD_TIMEOUT_MS = COMMAND_TIMEOUT_MS; constexpr int CHILD_REBALANCE_INTERVAL_MS = CHILD_TIMEOUT_MS / 10; -constexpr int POLL_TIMEOUT_MS = COMMAND_TIMEOUT_MS / 5; +constexpr int POLL_TIMEOUT_MICRO_S = (COMMAND_TIMEOUT_MS / 5) * 1000; constexpr int WS_SEND_TIMEOUT_MS = 1000; constexpr int TILE_ROUNDTRIP_TIMEOUT_MS = 5000; diff --git a/common/IoUtil.cpp b/common/IoUtil.cpp index 209d0f5d3..f7ea25225 100644 --- a/common/IoUtil.cpp +++ b/common/IoUtil.cpp @@ -49,7 +49,7 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws, LOG_INF("SocketProcessor [" << name << "] starting."); // Timeout given is in microseconds. - static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000); + static const Poco::Timespan waitTime(POLL_TIMEOUT_MICRO_S); int flags = 0; int n = -1; bool stop = false; @@ -244,7 +244,7 @@ int PipeReader::readLine(std::string& line, // Poll in short intervals to check for stop condition. const int pollTimeoutMs = 500; - int maxPollCount = std::max<int>(POLL_TIMEOUT_MS / pollTimeoutMs, 1); + int maxPollCount = std::max<int>((POLL_TIMEOUT_MICRO_S / 1000) / pollTimeoutMs, 1); while (maxPollCount-- > 0) { if (stopPredicate()) diff --git a/common/LOOLWebSocket.hpp b/common/LOOLWebSocket.hpp index 6d3bd42cf..2c0559593 100644 --- a/common/LOOLWebSocket.hpp +++ b/common/LOOLWebSocket.hpp @@ -53,7 +53,7 @@ public: int receiveFrame(char* buffer, const int length, int& flags) { // Timeout is in microseconds. We don't need this, except to yield the cpu. - static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000 / 10); + static const Poco::Timespan waitTime(POLL_TIMEOUT_MICRO_S / 10); static const Poco::Timespan waitZero(0); while (poll(waitTime, Poco::Net::Socket::SELECT_READ)) diff --git a/configure.ac b/configure.ac index 1edd5408f..7d795bce8 100644 --- a/configure.ac +++ b/configure.ac @@ -1031,6 +1031,8 @@ AS_IF([test "$ENABLE_IOSAPP" = "true"], ]) AC_SUBST(IOSAPP_FONTS) +AC_CHECK_FUNCS(ppoll) + ENABLE_CYPRESS=false if test "$enable_cypress" = "yes"; then cypress_msg="cypress is enabled" diff --git a/kit/ForKit.cpp b/kit/ForKit.cpp index bea72f4d5..5506fa339 100644 --- a/kit/ForKit.cpp +++ b/kit/ForKit.cpp @@ -606,7 +606,7 @@ int main(int argc, char** argv) { UnitKit::get().invokeForKitTest(); - mainPoll.poll(POLL_TIMEOUT_MS); + mainPoll.ppoll(POLL_TIMEOUT_MICRO_S); #if ENABLE_DEBUG if (!SingleKit) diff --git a/kit/Kit.cpp b/kit/Kit.cpp index e99ec8e61..acec37bb9 100644 --- a/kit/Kit.cpp +++ b/kit/Kit.cpp @@ -2245,7 +2245,7 @@ public: // a LOK compatible poll function merging the functions. // returns the number of events signalled - int kitPoll(int timeoutUs) + int kitPoll(int timeoutMicroS) { if (SigUtil::getTerminationFlag()) { @@ -2257,30 +2257,29 @@ public: int maxExtraEvents = 15; int eventsSignalled = 0; - int timeoutMs = timeoutUs / 1000; - if (timeoutMs < 0) + if (timeoutMicroS < 0) { // Flush at most 1 + maxExtraEvents, or return when nothing left. - while (poll(0) > 0 && maxExtraEvents-- > 0) + while (ppoll(0) > 0 && maxExtraEvents-- > 0) ++eventsSignalled; } else { // Flush at most maxEvents+1, or return when nothing left. - _pollEnd = std::chrono::steady_clock::now() + std::chrono::microseconds(timeoutUs); + _pollEnd = std::chrono::steady_clock::now() + std::chrono::microseconds(timeoutMicroS); do { - if (poll(timeoutMs) <= 0) + if (ppoll(timeoutMicroS) <= 0) break; const auto now = std::chrono::steady_clock::now(); drainQueue(now); - timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(_pollEnd - now).count(); + timeoutMicroS = std::chrono::duration_cast<std::chrono::microseconds>(_pollEnd - now).count(); ++eventsSignalled; } - while (timeoutMs > 0 && !SigUtil::getTerminationFlag() && maxExtraEvents-- > 0); + while (timeoutMicroS > 0 && !SigUtil::getTerminationFlag() && maxExtraEvents-- > 0); } drainQueue(std::chrono::steady_clock::now()); diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp index cfd8b7e54..b07b46cc4 100644 --- a/net/DelaySocket.cpp +++ b/net/DelaySocket.cpp @@ -77,17 +77,17 @@ public: // FIXME - really need to propagate 'noDelay' etc. // have a debug only lookup of delayed sockets for this case ? - int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) override { if (_chunks.size() > 0) { - int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>( + int64_t remainingMicroS = std::chrono::duration_cast<std::chrono::microseconds>( (*_chunks.begin())->getSendTime() - now).count(); - if (remainingMs < timeoutMaxMs) - DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs - << "ms from " << timeoutMaxMs << "ms\n"); - timeoutMaxMs = std::min(timeoutMaxMs, remainingMs); + if (remainingMicroS < timeoutMaxMicroS) + DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMicroS + << "us from " << timeoutMaxMicroS << "us\n"); + timeoutMaxMicroS = std::min(timeoutMaxMicroS, remainingMicroS); } if (_chunks.size() > 0 && diff --git a/net/ServerSocket.hpp b/net/ServerSocket.hpp index 1017aeb5e..65f826913 100644 --- a/net/ServerSocket.hpp +++ b/net/ServerSocket.hpp @@ -65,8 +65,8 @@ public: /// Returns a valid Socket shared_ptr on success only. virtual std::shared_ptr<Socket> accept(); - int getPollEvents(std::chrono::steady_clock::time_point /* now */, - int & /* timeoutMaxMs */) override + int pgetPollEvents(std::chrono::steady_clock::time_point /* now */, + int64_t & /* timeoutMaxMicroS */) override { return POLLIN; } diff --git a/net/Socket.cpp b/net/Socket.cpp index 5bb1fa250..47ae1ad41 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -35,7 +35,7 @@ #endif #include "WebSocketHandler.hpp" -int SocketPoll::DefaultPollTimeoutMs = 5000; +int SocketPoll::DefaultPollTimeoutMicroS = 5000 * 1000; std::atomic<bool> SocketPoll::InhibitThreadChecks(false); std::atomic<bool> Socket::InhibitThreadChecks(false); @@ -194,6 +194,136 @@ void SocketPoll::pollingThreadEntry() LOG_INF("Finished polling thread [" << _name << "]."); } +int SocketPoll::ppoll(int64_t timeoutMaxMicroS) +{ + if (_runOnClientThread) + checkAndReThread(); + else + assertCorrectThread(); + + std::chrono::steady_clock::time_point now = + std::chrono::steady_clock::now(); + + // The events to poll on change each spin of the loop. + psetupPollFds(now, timeoutMaxMicroS); + const size_t size = _pollSockets.size(); + + int rc; + do + { +#if !MOBILEAPP +# if HAVE_PPOLL + LOG_TRC("ppoll start, timeoutMicroS: " << timeoutMaxMicroS << " size " << size); + timeoutMaxMicroS = std::max(timeoutMaxMicroS, (int64_t)0); + struct timespec timeout; + timeout.tv_sec = timeoutMaxMicroS / (1000 * 1000); + timeout.tv_nsec = (timeoutMaxMicroS % (1000 * 1000)) * 1000; + rc = ::ppoll(&_pollFds[0], size + 1, &timeout, nullptr); + LOG_TRC("ppoll result " << rc << " errno " << strerror(errno)); +# else + int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000; + LOG_TRC("Legacy Poll start, timeoutMs: " << timeoutMaxMs); + rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); +# endif +#else + LOG_TRC("SocketPoll Poll"); + int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000; + rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); +#endif + } + while (rc < 0 && errno == EINTR); + LOG_TRC("Poll completed with " << rc << " live polls max (" << + timeoutMaxMicroS << "us)" << ((rc==0) ? "(timedout)" : "")); + + // First process the wakeup pipe (always the last entry). + if (_pollFds[size].revents) + { + std::vector<CallbackFn> invoke; + { + std::lock_guard<std::mutex> lock(_mutex); + + // Clear the data. +#if !MOBILEAPP + int dump = ::read(_wakeup[0], &dump, sizeof(dump)); +#else + LOG_TRC("Wakeup pipe read"); + int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump)); +#endif + // Copy the new sockets over and clear. + _pollSockets.insert(_pollSockets.end(), + _newSockets.begin(), _newSockets.end()); + + // Update thread ownership. + for (auto &i : _newSockets) + i->setThreadOwner(std::this_thread::get_id()); + + _newSockets.clear(); + + // Extract list of callbacks to process + std::swap(_newCallbacks, invoke); + } + + for (const auto& callback : invoke) + { + try + { + callback(); + } + catch (const std::exception& exc) + { + LOG_ERR("Exception while invoking poll [" << _name << + "] callback: " << exc.what()); + } + } + + try + { + wakeupHook(); + } + catch (const std::exception& exc) + { + LOG_ERR("Exception while invoking poll [" << _name << + "] wakeup hook: " << exc.what()); + } + } + + // This should only happen when we're stopping. + if (_pollSockets.size() != size) + return rc; + + // Fire the poll callbacks and remove dead fds. + std::chrono::steady_clock::time_point newNow = + std::chrono::steady_clock::now(); + + for (int i = static_cast<int>(size) - 1; i >= 0; --i) + { + SocketDisposition disposition(_pollSockets[i]); + try + { + _pollSockets[i]->handlePoll(disposition, newNow, + _pollFds[i].revents); + } + catch (const std::exception& exc) + { + LOG_ERR("Error while handling poll for socket #" << + _pollFds[i].fd << " in " << _name << ": " << exc.what()); + disposition.setClosed(); + rc = -1; + } + + if (disposition.isMove() || disposition.isClosed()) + { + LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " << + _pollSockets.size() << ") from " << _name); + _pollSockets.erase(_pollSockets.begin() + i); + } + + disposition.execute(); + } + + return rc; +} + void SocketPoll::wakeupWorld() { for (const auto& fd : getWakeupsArray()) @@ -384,8 +514,8 @@ void SocketDisposition::execute() _socketMove = nullptr; } -const int WebSocketHandler::InitialPingDelayMs = 25; -const int WebSocketHandler::PingFrequencyMs = 18 * 1000; +const int WebSocketHandler::InitialPingDelayMicroS = 25 * 1000; +const int WebSocketHandler::PingFrequencyMicroS = 18 * 1000 * 1000; void WebSocketHandler::dumpState(std::ostream& os) { @@ -398,8 +528,8 @@ void WebSocketHandler::dumpState(std::ostream& os) void StreamSocket::dumpState(std::ostream& os) { - int timeoutMaxMs = SocketPoll::DefaultPollTimeoutMs; - int events = getPollEvents(std::chrono::steady_clock::now(), timeoutMaxMs); + int64_t timeoutMaxMicroS = SocketPoll::DefaultPollTimeoutMicroS; + int events = pgetPollEvents(std::chrono::steady_clock::now(), timeoutMaxMicroS); os << "\t" << getFD() << "\t" << events << "\t" << _inBuffer.size() << "\t" << _outBuffer.size() << "\t" << " r: " << _bytesRecvd << "\t w: " << _bytesSent << "\t" diff --git a/net/Socket.hpp b/net/Socket.hpp index ab56e5d10..852012424 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -154,8 +154,8 @@ public: /// Prepare our poll record; adjust @timeoutMaxMs downwards /// for timeouts, based on current time @now. /// @returns POLLIN and POLLOUT if output is expected. - virtual int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) = 0; + virtual int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) = 0; /// Handle results of events returned from poll virtual void handlePoll(SocketDisposition &disposition, @@ -370,8 +370,8 @@ public: /// Prepare our poll record; adjust @timeoutMaxMs downwards /// for timeouts, based on current time @now. /// @returns POLLIN and POLLOUT if output is expected. - virtual int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) = 0; + virtual int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) = 0; /// Do we need to handle a timeout ? virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {} @@ -476,7 +476,7 @@ public: ~SocketPoll(); /// Default poll time - useful to increase for debugging. - static int DefaultPollTimeoutMs; + static int DefaultPollTimeoutMicroS; static std::atomic<bool> InhibitThreadChecks; /// Stop the polling thread. @@ -533,7 +533,7 @@ public: { while (continuePolling()) { - poll(DefaultPollTimeoutMs); + ppoll(DefaultPollTimeoutMicroS); } } @@ -576,123 +576,7 @@ public: /// Poll the sockets for available data to read or buffer to write. /// Returns the return-value of poll(2): 0 on timeout, /// -1 for error, and otherwise the number of events signalled. - int poll(int timeoutMaxMs) - { - if (_runOnClientThread) - checkAndReThread(); - else - assertCorrectThread(); - - std::chrono::steady_clock::time_point now = - std::chrono::steady_clock::now(); - - // The events to poll on change each spin of the loop. - setupPollFds(now, timeoutMaxMs); - const size_t size = _pollSockets.size(); - - int rc; - do - { -#if !MOBILEAPP - LOG_TRC("Poll start, timeoutMs: " << timeoutMaxMs); - rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); -#else - LOG_TRC("SocketPoll Poll"); - rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); -#endif - } - while (rc < 0 && errno == EINTR); - LOG_TRC("Poll completed with " << rc << " live polls max (" << - timeoutMaxMs << "ms)" << ((rc==0) ? "(timedout)" : "")); - - // First process the wakeup pipe (always the last entry). - if (_pollFds[size].revents) - { - std::vector<CallbackFn> invoke; - { - std::lock_guard<std::mutex> lock(_mutex); - - // Clear the data. -#if !MOBILEAPP - int dump = ::read(_wakeup[0], &dump, sizeof(dump)); -#else - LOG_TRC("Wakeup pipe read"); - int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump)); -#endif - // Copy the new sockets over and clear. - _pollSockets.insert(_pollSockets.end(), - _newSockets.begin(), _newSockets.end()); - - // Update thread ownership. - for (auto &i : _newSockets) - i->setThreadOwner(std::this_thread::get_id()); - - _newSockets.clear(); - - // Extract list of callbacks to process - std::swap(_newCallbacks, invoke); - } - - for (const auto& callback : invoke) - { - try - { - callback(); - } - catch (const std::exception& exc) - { - LOG_ERR("Exception while invoking poll [" << _name << - "] callback: " << exc.what()); - } - } - - try - { - wakeupHook(); - } - catch (const std::exception& exc) - { - LOG_ERR("Exception while invoking poll [" << _name << - "] wakeup hook: " << exc.what()); - } - } - - // This should only happen when we're stopping. - if (_pollSockets.size() != size) - return rc; - - // Fire the poll callbacks and remove dead fds. - std::chrono::steady_clock::time_point newNow = - std::chrono::steady_clock::now(); - - for (int i = static_cast<int>(size) - 1; i >= 0; --i) - { - SocketDisposition disposition(_pollSockets[i]); - try - { - _pollSockets[i]->handlePoll(disposition, newNow, - _pollFds[i].revents); - } - catch (const std::exception& exc) - { - LOG_ERR("Error while handling poll for socket #" << - _pollFds[i].fd << " in " << _name << ": " << exc.what()); - disposition.setClosed(); - rc = -1; - } - - if (disposition.isMove() || disposition.isClosed()) - { - LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " << - _pollSockets.size() << ") from " << _name); - _pollSockets.erase(_pollSockets.begin() + i); - } - - disposition.execute(); - } - - return rc; - } + int ppoll(int64_t timeoutMaxMicroS); /// Write to a wakeup descriptor static void wakeup (int fd) @@ -811,8 +695,8 @@ private: const std::string &pathAndQuery); /// Initialize the poll fds array with the right events - void setupPollFds(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) + void psetupPollFds(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) { const size_t size = _pollSockets.size(); @@ -820,7 +704,7 @@ private: for (size_t i = 0; i < size; ++i) { - int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs); + int events = _pollSockets[i]->pgetPollEvents(now, timeoutMaxMicroS); assert(events >= 0); // Or > 0 even? _pollFds[i].fd = _pollSockets[i]->getFD(); _pollFds[i].events = events; @@ -920,12 +804,12 @@ public: Socket::shutdown(); } - int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) override { // cf. SslSocket::getPollEvents assertCorrectThread(); - int events = _socketHandler->getPollEvents(now, timeoutMaxMs); + int events = _socketHandler->pgetPollEvents(now, timeoutMaxMicroS); if (!_outBuffer.empty() || _shutdownSignalled) events |= POLLOUT; return events; diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp index 27e075328..e6b7f908f 100644 --- a/net/SslSocket.hpp +++ b/net/SslSocket.hpp @@ -126,11 +126,11 @@ public: return handleSslState(SSL_write(_ssl, buf, len)); } - int getPollEvents(std::chrono::steady_clock::time_point now, - int & timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t & timeoutMaxMicroS) override { assertCorrectThread(); - int events = getSocketHandler()->getPollEvents(now, timeoutMaxMs); + int events = getSocketHandler()->pgetPollEvents(now, timeoutMaxMicroS); if (_sslWantsTo == SslWantsTo::Read) { diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index b23c3951f..24c3a839a 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -48,8 +48,8 @@ protected: static const unsigned char Mask = 0x80; }; - static const int InitialPingDelayMs; - static const int PingFrequencyMs; + static const int InitialPingDelayMicroS; + static const int PingFrequencyMicroS; public: /// Perform upgrade ourselves, or select a client web socket. @@ -81,8 +81,8 @@ public: const Poco::Net::HTTPRequest& request) : _socket(socket) , _lastPingSentTime(std::chrono::steady_clock::now() - - std::chrono::milliseconds(PingFrequencyMs) - - std::chrono::milliseconds(InitialPingDelayMs)) + std::chrono::microseconds(PingFrequencyMicroS) - + std::chrono::microseconds(InitialPingDelayMicroS)) , _pingTimeUs(0) , _shuttingDown(false) , _isClient(false) @@ -430,14 +430,14 @@ public: } } - int getPollEvents(std::chrono::steady_clock::time_point now, - int & timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t & timeoutMaxMicroS) override { if (!_isClient) { - const int timeSincePingMs = - std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count(); - timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs); + const int64_t timeSincePingMicroS = + std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count(); + timeoutMaxMicroS = std::min(timeoutMaxMicroS, PingFrequencyMicroS - timeSincePingMicroS); } int events = POLLIN; if (_msgHandler && _msgHandler->hasQueuedMessages()) @@ -493,9 +493,9 @@ private: if (_isClient) return; - const int timeSincePingMs = - std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count(); - if (timeSincePingMs >= PingFrequencyMs) + const int64_t timeSincePingMicroS = + std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count(); + if (timeSincePingMicroS >= PingFrequencyMicroS) { const std::shared_ptr<StreamSocket> socket = _socket.lock(); if (socket) diff --git a/test/UnitWOPIDocumentConflict.cpp b/test/UnitWOPIDocumentConflict.cpp index b1aed94bc..de6b2d97f 100644 --- a/test/UnitWOPIDocumentConflict.cpp +++ b/test/UnitWOPIDocumentConflict.cpp @@ -98,7 +98,7 @@ public: // ModifiedStatus=true is a bit slow; let's sleep and hope that // it is received before we wake up - std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS)); + std::this_thread::sleep_for(std::chrono::microseconds(POLL_TIMEOUT_MICRO_S)); // change the document underneath, in storage setFileContent("Modified content in storage"); diff --git a/test/helpers.hpp b/test/helpers.hpp index 1b831a289..748e5572d 100644 --- a/test/helpers.hpp +++ b/test/helpers.hpp @@ -275,7 +275,7 @@ int getErrorCode(LOOLWebSocket& ws, std::string& message, const std::string& tes { bytes = ws.receiveFrame(buffer.begin(), READ_BUFFER_SIZE, flags); TST_LOG("Got " << LOOLProtocol::getAbbreviatedFrameDump(buffer.begin(), bytes, flags)); - std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS)); + std::this_thread::sleep_for(std::chrono::microseconds(POLL_TIMEOUT_MICRO_S)); } while (bytes > 0 && (flags & Poco::Net::WebSocket::FRAME_OP_BITMASK) != Poco::Net::WebSocket::FRAME_OP_CLOSE); @@ -463,7 +463,7 @@ connectLOKit(const Poco::URI& uri, TST_LOG("Connection problem: " << ex.what()); } - std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS)); + std::this_thread::sleep_for(std::chrono::microseconds(POLL_TIMEOUT_MICRO_S)); } while (retries--); diff --git a/tools/WebSocketDump.cpp b/tools/WebSocketDump.cpp index c699a8fed..822a256e8 100644 --- a/tools/WebSocketDump.cpp +++ b/tools/WebSocketDump.cpp @@ -175,8 +175,8 @@ private: in.erase(in.begin(), itBody); } - int getPollEvents(std::chrono::steady_clock::time_point /* now */, - int & /* timeoutMaxMs */) override + int pgetPollEvents(std::chrono::steady_clock::time_point /* now */, + int64_t & /* timeoutMaxMicroS */) override { return POLLIN; } @@ -290,7 +290,7 @@ int main (int argc, char **argv) while (true) { - DumpSocketPoll.poll(100 * 1000); + DumpSocketPoll.ppoll(100 * 1000 * 1000); } } diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp index 3fc0b6955..ea905827c 100644 --- a/wsd/Admin.cpp +++ b/wsd/Admin.cpp @@ -24,6 +24,7 @@ #include <Common.hpp> #include "FileServer.hpp" #include <IoUtil.hpp> +#include "LOOLWSD.hpp" #include <Log.hpp> #include <Protocol.hpp> #include "Storage.hpp" @@ -157,24 +158,11 @@ void AdminSocketHandler::handleMessage(const std::vector<char> &payload) { const int pid = std::stoi(tokens[1]); LOG_INF("Admin request to kill PID: " << pid); - - std::set<pid_t> pids = model.getDocumentPids(); - if (pids.find(pid) != pids.end()) - { - SigUtil::killChild(pid); - } - else - { - LOG_WRN("Invalid PID to kill (not a document pid)"); - } + SigUtil::killChild(pid); } catch (std::invalid_argument& exc) { - LOG_WRN("Invalid PID to kill (invalid argument): " << tokens[1]); - } - catch (std::out_of_range& exc) - { - LOG_WRN("Invalid PID to kill (out of range): " << tokens[1]); + LOG_WRN("Invalid PID to kill: " << tokens[1]); } } else if (tokens.equals(0, "settings")) @@ -295,11 +283,7 @@ AdminSocketHandler::AdminSocketHandler(Admin* adminManager) void AdminSocketHandler::sendTextFrame(const std::string& message) { - if (!Util::isFuzzing()) - { - UnitWSD::get().onAdminQueryMessage(message); - } - + UnitWSD::get().onAdminQueryMessage(message); if (_isAuthenticated) { LOG_TRC("send admin text frame '" << message << "'"); @@ -360,6 +344,7 @@ Admin::Admin() : SocketPoll("admin"), _model(AdminModel()), _forKitPid(-1), + _forKitWritePipe(-1), _lastTotalMemory(0), _lastJiffies(0), _lastSentCount(0), @@ -470,7 +455,7 @@ void Admin::pollingThread() // Handle websockets & other work. const int timeout = capAndRoundInterval(std::min(std::min(cpuWait, memWait), netWait)); LOG_TRC("Admin poll for " << timeout << "ms."); - poll(timeout); + ppoll(timeout * 1000); // continue with ms for admin, settings etc. } } @@ -606,7 +591,10 @@ void Admin::notifyForkit() << "setconfig limit_file_size_mb " << _defDocProcSettings.getLimitFileSizeMb() << '\n' << "setconfig limit_num_open_files " << _defDocProcSettings.getLimitNumberOpenFiles() << '\n'; - LOOLWSD::sendMessageToForKit(oss.str()); + if (_forKitWritePipe != -1) + IoUtil::writeToPipe(_forKitWritePipe, oss.str()); + else + LOG_INF("Forkit write pipe not set (yet)."); } void Admin::triggerMemoryCleanup(const size_t totalMem) @@ -675,8 +663,8 @@ public: _uri(uri) { } - int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) override { if (_connecting) { @@ -684,7 +672,7 @@ public: return POLLOUT; } else - return AdminSocketHandler::getPollEvents(now, timeoutMaxMs); + return AdminSocketHandler::pgetPollEvents(now, timeoutMaxMicroS); } void performWrites() override diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp index 6287d38bc..a418040c6 100644 --- a/wsd/Admin.hpp +++ b/wsd/Admin.hpp @@ -16,7 +16,6 @@ #include "Log.hpp" #include "net/WebSocketHandler.hpp" -#include "LOOLWSD.hpp" class Admin; @@ -39,13 +38,13 @@ public: static void subscribeAsync(const std::shared_ptr<AdminSocketHandler>& handler); - /// Process incoming websocket messages - void handleMessage(const std::vector<char> &data) override; - private: /// Sends text frames simply to authenticated clients. void sendTextFrame(const std::string& message); + /// Process incoming websocket messages + void handleMessage(const std::vector<char> &data) override; + private: Admin* _admin; int _sessionId; @@ -92,6 +91,7 @@ public: void rmDoc(const std::string& docKey); void setForKitPid(const int forKitPid) { _forKitPid = forKitPid; _model.setForKitPid(forKitPid);} + void setForKitWritePipe(const int forKitWritePipe) { _forKitWritePipe = forKitWritePipe; } /// Callers must ensure that modelMutex is acquired AdminModel& getModel(); @@ -157,6 +157,7 @@ private: /// the Admin Poll thread. AdminModel _model; int _forKitPid; + int _forKitWritePipe; size_t _lastTotalMemory; size_t _lastJiffies; uint64_t _lastSentCount; diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 02f6d2a2b..1d63e813f 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -298,7 +298,7 @@ void DocumentBroker::pollThread() // Main polling loop goodness. while (!_stop && _poll->continuePolling() && !SigUtil::getTerminationFlag()) { - _poll->poll(SocketPoll::DefaultPollTimeoutMs); + _poll->ppoll(SocketPoll::DefaultPollTimeoutMicroS); const auto now = std::chrono::steady_clock::now(); @@ -440,9 +440,9 @@ void DocumentBroker::pollThread() } // Flush socket data first. - constexpr int flushTimeoutMs = POLL_TIMEOUT_MS * 2; // ~1000ms + constexpr int64_t flushTimeoutMicroS = POLL_TIMEOUT_MICRO_S * 2; // ~1000ms LOG_INF("Flushing socket for doc [" - << _docKey << "] for " << flushTimeoutMs << " ms. stop: " << _stop + << _docKey << "] for " << flushTimeoutMicroS << " us. stop: " << _stop << ", continuePolling: " << _poll->continuePolling() << ", ShutdownRequestFlag: " << SigUtil::getShutdownRequestFlag() << ", TerminationFlag: " << SigUtil::getTerminationFlag() @@ -451,11 +451,11 @@ void DocumentBroker::pollThread() while (_poll->getSocketCount()) { const auto now = std::chrono::steady_clock::now(); - const int elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - flushStartTime).count(); - if (elapsedMs > flushTimeoutMs) + const int64_t elapsedMicroS = std::chrono::duration_cast<std::chrono::microseconds>(now - flushStartTime).count(); + if (elapsedMicroS > flushTimeoutMicroS) break; - _poll->poll(std::min(flushTimeoutMs - elapsedMs, POLL_TIMEOUT_MS / 5)); + _poll->ppoll(std::min(flushTimeoutMicroS - elapsedMicroS, (int64_t)POLL_TIMEOUT_MICRO_S / 5)); } LOG_INF("Finished flushing socket for doc [" << _docKey << "]. stop: " << _stop << ", continuePolling: " << diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index c732169c7..1d014927a 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -2061,8 +2061,8 @@ private: " has no DocumentBroker to handle message: [" << abbr << "]."); } - int getPollEvents(std::chrono::steady_clock::time_point /* now */, - int & /* timeoutMaxMs */) override + int pgetPollEvents(std::chrono::steady_clock::time_point /* now */, + int64_t & /* timeoutMaxMs */) override { return POLLIN; } @@ -2393,8 +2393,8 @@ private: #endif } - int getPollEvents(std::chrono::steady_clock::time_point /* now */, - int & /* timeoutMaxMs */) override + int pgetPollEvents(std::chrono::steady_clock::time_point /* now */, + int64_t & /* timeoutMaxMs */) override { return POLLIN; } @@ -3577,10 +3577,10 @@ int LOOLWSD::innerMain() UnitWSD::get().invokeTest(); // This timeout affects the recovery time of prespawned children. - const int msWait = UnitWSD::isUnitTesting() ? - UnitWSD::get().getTimeoutMilliSeconds() / 4 : - SocketPoll::DefaultPollTimeoutMs * 4; - mainWait.poll(msWait); + const int waitMicroS = UnitWSD::isUnitTesting() ? + UnitWSD::get().getTimeoutMilliSeconds() * 1000 / 4 : + SocketPoll::DefaultPollTimeoutMicroS * 4; + mainWait.ppoll(waitMicroS); // Wake the prisoner poll to spawn some children, if necessary. PrisonerPoll.wakeup(); _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits