wsd/ClientSession.cpp | 44 ++++++++++++++++ wsd/ClientSession.hpp | 39 ++++++++++++++ wsd/DocumentBroker.cpp | 20 +++---- wsd/PrisonerSession.cpp | 1 wsd/SenderQueue.hpp | 126 ++++++++++++++++++++++++++++++++++++++++++++++++ wsd/TileCache.cpp | 73 ++++++++++++++------------- 6 files changed, 255 insertions(+), 48 deletions(-)
New commits: commit 2de26e9ba46b0f3971223b8f9e530b3e51bdd644 Author: Ashod Nakashian <[email protected]> Date: Tue Dec 13 21:59:01 2016 -0500 loolwsd: per-socket dedicated sending thread To avoid degrading performance for everyone because of a single slow/bad connection, we send data to clients each in its own thread. Change-Id: I6f980c25a404c4d05bcdb1979849ea3d2776c7b9 Reviewed-on: https://gerrit.libreoffice.org/31981 Reviewed-by: Michael Meeks <[email protected]> Tested-by: Michael Meeks <[email protected]> diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp index 0bd20fc..202114f 100644 --- a/wsd/ClientSession.cpp +++ b/wsd/ClientSession.cpp @@ -44,9 +44,12 @@ ClientSession::ClientSession(const std::string& id, _uriPublic(uriPublic), _isReadOnly(readOnly), _isDocumentOwner(false), - _loadPart(-1) + _loadPart(-1), + _stop(false) { Log::info("ClientSession ctor [" + getName() + "]."); + + _senderThread = std::thread([this]{ senderThread(); }); } ClientSession::~ClientSession() @@ -55,6 +58,13 @@ ClientSession::~ClientSession() // Release the save-as queue. _saveAsQueue.put(""); + + stop(); + if (_senderThread.joinable()) + { + _senderThread.join(); + } + } void ClientSession::bridgePrisonerSession() @@ -418,4 +428,36 @@ void ClientSession::setReadOnly() sendTextFrame("perm: readonly"); } +void ClientSession::senderThread() +{ + LOG_DBG(getName() + " SenderThread started"); + + while (!stopping()) + { + std::shared_ptr<MessagePayload> item; + if (_senderQueue.waitDequeue(item, static_cast<size_t>(COMMAND_TIMEOUT_MS))) + { + const std::vector<char>& data = item->data(); + try + { + if (item->isBinary()) + { + LOOLSession::sendBinaryFrame(data.data(), data.size()); + } + else + { + LOOLSession::sendTextFrame(data.data(), data.size()); + } + } + catch (const std::exception& ex) + { + LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) << + "] to " << getName() << ": " << ex.what()); + } + } + } + + LOG_DBG(getName() + " SenderThread finished"); +} + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index 3703593..bb94bff 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -12,6 +12,7 @@ #include "Session.hpp" #include "MessageQueue.hpp" +#include "SenderQueue.hpp" #include <Poco/URI.h> @@ -43,6 +44,38 @@ public: void setDocumentOwner(const bool documentOwner) { _isDocumentOwner = documentOwner; } bool isDocumentOwner() const { return _isDocumentOwner; } + using LOOLSession::sendTextFrame; + + bool sendBinaryFrame(const char* buffer, int length) override + { + auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary); + auto& output = payload->data(); + std::memcpy(output.data(), buffer, length); + enqueueSendMessage(payload); + return true; + } + + bool sendTextFrame(const char* buffer, const int length) override + { + auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text); + auto& output = payload->data(); + std::memcpy(output.data(), buffer, length); + enqueueSendMessage(payload); + return true; + } + + void enqueueSendMessage(const std::shared_ptr<MessagePayload>& data) + { + _senderQueue.enqueue(data); + } + + bool stopping() const { return _stop || _senderQueue.stopping(); } + void stop() + { + _stop = true; + _senderQueue.stop(); + } + /** * Return the URL of the saved-as document when it's ready. If called * before it's ready, the call blocks till then. @@ -91,6 +124,8 @@ private: /// Eg. in readonly mode only few messages should be allowed bool filterMessage(const std::string& msg) const; + void senderThread(); + private: std::weak_ptr<DocumentBroker> _docBroker; @@ -110,6 +145,10 @@ private: MessageQueue _saveAsQueue; int _loadPart; + + SenderQueue<std::shared_ptr<MessagePayload>> _senderQueue; + std::thread _senderThread; + std::atomic<bool> _stop; }; #endif diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 2251cd0..513e12b 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -28,6 +28,7 @@ #include "PrisonerSession.hpp" #include "Storage.hpp" #include "TileCache.hpp" +#include "SenderQueue.hpp" #include "Unit.hpp" using namespace LOOLProtocol; @@ -243,7 +244,7 @@ bool DocumentBroker::load(std::shared_ptr<ClientSession>& session, const std::st assert(_storage != nullptr); - // Call the storage specific file info functions + // Call the storage specific fileinfo functions std::string userid, username; std::chrono::duration<double> getInfoCallDuration(0); if (dynamic_cast<WopiStorage*>(_storage.get()) != nullptr) @@ -616,17 +617,14 @@ void DocumentBroker::alertAllUsers(const std::string& msg) { Util::assertIsLocked(_mutex); + auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text); + auto& output = payload->data(); + std::memcpy(output.data(), msg.data(), msg.size()); + LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg); for (auto& it : _sessions) { - try - { - it.second->sendTextFrame(msg); - } - catch (const std::exception& ex) - { - LOG_ERR("Error while alerting all users [" << msg << "]: " << ex.what()); - } + it.second->enqueueSendMessage(payload); } } @@ -808,7 +806,7 @@ void DocumentBroker::cancelTileRequests(const std::shared_ptr<ClientSession>& se void DocumentBroker::handleTileResponse(const std::vector<char>& payload) { const std::string firstLine = getFirstLine(payload); - LOG_DBG("Handling tile combined: " << firstLine); + LOG_DBG("Handling tile: " << firstLine); try { @@ -959,7 +957,7 @@ bool DocumentBroker::forwardToClient(const std::string& prefix, const std::vecto } else { - LOG_ERR("Failed to parse prefix of forward-to-client message: " << prefix); + LOG_ERR("Unexpected prefix of forward-to-client message: " << prefix); } return false; diff --git a/wsd/PrisonerSession.cpp b/wsd/PrisonerSession.cpp index bbfd902..ae9341e 100644 --- a/wsd/PrisonerSession.cpp +++ b/wsd/PrisonerSession.cpp @@ -23,6 +23,7 @@ #include "Log.hpp" #include "ClientSession.hpp" #include "Rectangle.hpp" +#include "SenderQueue.hpp" #include "Storage.hpp" #include "TileCache.hpp" #include "IoUtil.hpp" diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp new file mode 100644 index 0000000..b1ded81 --- /dev/null +++ b/wsd/SenderQueue.hpp @@ -0,0 +1,126 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef INCLUDED_SENDERQUEUE_HPP +#define INCLUDED_SENDERQUEUE_HPP + +#include <condition_variable> +#include <deque> +#include <memory> +#include <mutex> +#include <vector> + +#include "common/SigUtil.hpp" +#include "LOOLWebSocket.hpp" +#include "Log.hpp" + +/// The payload type used to send/receive data. +class MessagePayload +{ +public: + + enum class Type { Text, Binary }; + + MessagePayload(const size_t size, enum Type type) : + _data(size), + _type(type) + { + } + + std::vector<char>& data() { return _data; } + + /// Returns true if and only if the payload is considered Binary. + bool isBinary() const { return _type == Type::Binary; } + +private: + std::vector<char> _data; + Type _type; +}; + +struct SendItem +{ + std::weak_ptr<LOOLWebSocket> Socket; + std::shared_ptr<MessagePayload> Data; + std::string Meta; + std::chrono::steady_clock::time_point BirthTime; +}; + +/// A queue of data to send to certain Session's WS. +template <typename Item> +class SenderQueue final +{ +public: + + SenderQueue() : + _stop(false) + { + } + + bool stopping() const { return _stop || TerminationFlag; } + void stop() + { + _stop = true; + _cv.notify_all(); + } + + size_t enqueue(const Item& item) + { + std::unique_lock<std::mutex> lock(_mutex); + if (!stopping()) + { + _queue.push_back(item); + } + + const size_t queuesize = _queue.size(); + lock.unlock(); + + _cv.notify_one(); + return queuesize; + } + + bool waitDequeue(Item& item, + const size_t timeoutMs = std::numeric_limits<size_t>::max()) + { + const auto timeToWait = std::chrono::milliseconds(timeoutMs); + + std::unique_lock<std::mutex> lock(_mutex); + + if (!_queue.empty() || + _cv.wait_for(lock, timeToWait, [this](){ return !_queue.empty() || stopping(); })) + { + if (!stopping()) + { + item = _queue.front(); + _queue.pop_front(); + return true; + } + + LOG_DBG("SenderQueue: stopping"); + return false; + } + + return false; + } + + size_t size() const + { + std::lock_guard<std::mutex> lock(_mutex); + return _queue.size(); + } + +private: + mutable std::mutex _mutex; + std::condition_variable _cv; + std::deque<Item> _queue; + std::atomic<bool> _stop; +}; + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp index a6f0b7f..7a536e1 100644 --- a/wsd/TileCache.cpp +++ b/wsd/TileCache.cpp @@ -34,6 +34,7 @@ #include "Common.hpp" #include "common/FileUtil.hpp" #include "Protocol.hpp" +#include "SenderQueue.hpp" #include "Unit.hpp" #include "Util.hpp" @@ -155,8 +156,8 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const const auto cachedName = (tileBeingRendered ? tileBeingRendered->getCacheName() : cacheFileName(tile)); - // Ignore if we can't save the tile, things will work anyway, but slower. An error indication - // has been supposed to be sent to all users in that case. + // Ignore if we can't save the tile, things will work anyway, but slower. + // An error indication is supposed to be sent to all users in that case. const auto fileName = _cacheDir + "/" + cachedName; if (FileUtil::saveDataToFileSafely(fileName, data, size)) { @@ -166,50 +167,50 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const // Notify subscribers, if any. if (tileBeingRendered) { - if (!tileBeingRendered->_subscribers.empty()) + const auto subscriberCount = tileBeingRendered->_subscribers.size(); + if (subscriberCount > 0) { std::string response = tile.serialize("tile:"); - Log::debug("Sending tile message to subscribers: " + response); + LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response); - std::vector<char> output(256 + size); - output.resize(response.size() + 1 + size); + std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(response.size() + 1 + size, + MessagePayload::Type::Binary); + { + auto& output = payload->data(); - std::memcpy(output.data(), response.data(), response.size()); - output[response.size()] = '\n'; - std::memcpy(output.data() + response.size() + 1, data, size); + // Send to first subscriber as-is (without cache marker). + std::memcpy(output.data(), response.data(), response.size()); + output[response.size()] = '\n'; + std::memcpy(output.data() + response.size() + 1, data, size); + } - // Send to first subscriber as-is (without cache marker). - auto firstSubscriber = tileBeingRendered->_subscribers[0].lock(); - if (firstSubscriber) + auto& firstSubscriber = tileBeingRendered->_subscribers[0]; + auto firstSession = firstSubscriber.lock(); + if (firstSession) { - try - { - firstSubscriber->sendBinaryFrame(output.data(), output.size()); - } - catch (const std::exception& ex) - { - Log::warn("Failed to send tile to " + firstSubscriber->getName() + ": " + ex.what()); - } + firstSession->enqueueSendMessage(payload); } - // All others must get served from the cache. - response += " renderid=cached\n"; - output.resize(response.size() + size); - std::memcpy(output.data(), response.data(), response.size()); - std::memcpy(output.data() + response.size(), data, size); - - for (size_t i = 1; i < tileBeingRendered->_subscribers.size(); ++i) + if (subscriberCount > 1) { - auto subscriber = tileBeingRendered->_subscribers[i].lock(); - if (subscriber) + // All others must get served from the cache. + response += " renderid=cached\n"; + + // Create a new Payload. + payload.reset(); + payload = std::make_shared<MessagePayload>(response.size() + size, MessagePayload::Type::Binary); + auto& output = payload->data(); + + std::memcpy(output.data(), response.data(), response.size()); + std::memcpy(output.data() + response.size(), data, size); + + for (size_t i = 1; i < subscriberCount; ++i) { - try - { - subscriber->sendBinaryFrame(output.data(), output.size()); - } - catch (const std::exception& ex) + auto& subscriber = tileBeingRendered->_subscribers[i]; + auto session = subscriber.lock(); + if (session) { - Log::warn("Failed to send tile to " + subscriber->getName() + ": " + ex.what()); + session->enqueueSendMessage(payload); } } } @@ -443,7 +444,7 @@ void TileCache::saveLastModified(const Timestamp& timestamp) } // FIXME: to be further simplified when we centralize tile messages. -void TileCache::subscribeToTileRendering(const TileDesc& tile, const std::shared_ptr<ClientSession> &subscriber) +void TileCache::subscribeToTileRendering(const TileDesc& tile, const std::shared_ptr<ClientSession>& subscriber) { assert(subscriber->getKind() == LOOLSession::Kind::ToClient); _______________________________________________ Libreoffice-commits mailing list [email protected] https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits
