 loolwsd/DocumentBroker.cpp |    9 +++
 loolwsd/DocumentBroker.hpp |   81 ++++++++++++++++++++++++++++++++++-
 loolwsd/LOOLKit.cpp        |   68 +++++++++++++++++++++++++++++
 loolwsd/LOOLWSD.cpp        |  103 +++++++++++---------------------------------
 4 files changed, 182 insertions(+), 79 deletions(-)
New commits: commit d5e2f647901bf6b764afdae4c6b2922527718b7f Author: Ashod Nakashian <[email protected]> Date: Sun Apr 3 21:40:14 2016 -0400 loolwsd: WSD <-> Child direct communication WSD now communicates on a WebSocket directly with kit processes. ChildProcess encapsulates kit processes and the control WS, which itself is owned by DocumentBroker. Change-Id: Ica209aaa07974739b8e51a14e11325d084e193f6 Reviewed-on: https://gerrit.libreoffice.org/23789 Reviewed-by: Ashod Nakashian <[email protected]> Tested-by: Ashod Nakashian <[email protected]> diff --git a/loolwsd/DocumentBroker.cpp b/loolwsd/DocumentBroker.cpp index c36067e..393ba9a 100644 --- a/loolwsd/DocumentBroker.cpp +++ b/loolwsd/DocumentBroker.cpp @@ -64,11 +64,13 @@ std::string DocumentBroker::getDocKey(const Poco::URI& uri) DocumentBroker::DocumentBroker(const Poco::URI& uriPublic, const std::string& docKey, - const std::string& childRoot) : + const std::string& childRoot, + std::shared_ptr<ChildProcess> childProcess) : _uriPublic(uriPublic), _docKey(docKey), _childRoot(childRoot), _cacheRoot(getCachePath(uriPublic.toString())), + _childProcess(childProcess), _sessionsCount(0) { assert(!_docKey.empty()); @@ -163,6 +165,11 @@ void DocumentBroker::addWSSession(const std::string id, std::shared_ptr<MasterPr { Log::warn("DocumentBroker: Trying to add already existed session."); } + + // Request a new session from the child kit. 
+ const std::string aMessage = "session " + id + " " + _docKey + "\n"; + Log::debug("DocBroker to Child: " + aMessage.substr(0, aMessage.length() - 1)); + _childProcess->getWebSocket()->sendFrame(aMessage.data(), aMessage.size()); } void DocumentBroker::removeWSSession(const std::string id) diff --git a/loolwsd/DocumentBroker.hpp b/loolwsd/DocumentBroker.hpp index 3d848de..458090f 100644 --- a/loolwsd/DocumentBroker.hpp +++ b/loolwsd/DocumentBroker.hpp @@ -10,6 +10,8 @@ #ifndef INCLUDED_DOCUMENTBROKER_HPP #define INCLUDED_DOCUMENTBROKER_HPP +#include <signal.h> + #include <atomic> #include <memory> #include <mutex> @@ -25,6 +27,81 @@ class StorageBase; class TileCache; +/// Represents a new LOK child that is read +/// to host a document. +class ChildProcess +{ +public: + ChildProcess() : + _pid(-1) + { + } + + /// pid is the process ID of the child. + /// ws is the control WebSocket to the child. + ChildProcess(const Poco::Process::PID pid, const std::shared_ptr<Poco::Net::WebSocket>& ws) : + _pid(pid), + _ws(ws) + { + Log::info("ChildProcess ctor [" + std::to_string(_pid) + "]."); + } + + ChildProcess(ChildProcess&& other) : + _pid(other._pid), + _ws(other._ws) + { + Log::info("ChildProcess move ctor [" + std::to_string(_pid) + "]."); + other._pid = -1; + other._ws.reset(); + } + + const ChildProcess& operator=(ChildProcess&& other) + { + Log::info("ChildProcess assign [" + std::to_string(_pid) + "]."); + _pid = other._pid; + other._pid = -1; + _ws = other._ws; + other._ws.reset(); + + return *this; + } + + ~ChildProcess() + { + Log::info("~ChildProcess dtor [" + std::to_string(_pid) + "]."); + close(true); + } + + void close(const bool rude) + { + Log::info("Closing child [" + std::to_string(_pid) + "]."); + if (_pid != -1) + { + if (kill(_pid, SIGINT) != 0 && rude && kill(_pid, 0) != 0) + { + Log::error("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning."); + } + + //TODO: Notify Admin. 
+ std::ostringstream message; + message << "rmdoc" << " " + << _pid << " " + << "\n"; + //IoUtil::writeFIFO(WriterNotify, message.str()); + _pid = -1; + } + + _ws.reset(); + } + + Poco::Process::PID getPid() const { return _pid; } + std::shared_ptr<Poco::Net::WebSocket> getWebSocket() const { return _ws; } + +private: + Poco::Process::PID _pid; + std::shared_ptr<Poco::Net::WebSocket> _ws; +}; + /// DocumentBroker is responsible for setting up a document /// in jail and brokering loading it from Storage /// and saving it back. @@ -43,7 +120,8 @@ public: DocumentBroker(const Poco::URI& uriPublic, const std::string& docKey, - const std::string& childRoot); + const std::string& childRoot, + std::shared_ptr<ChildProcess> childProcess); ~DocumentBroker() { @@ -94,6 +172,7 @@ private: std::string _filename; std::unique_ptr<StorageBase> _storage; std::unique_ptr<TileCache> _tileCache; + std::shared_ptr<ChildProcess> _childProcess; std::mutex _mutex; std::atomic<unsigned> _sessionsCount; }; diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index cdf2ca0..149ddff 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -1016,7 +1016,75 @@ void lokit_main(const std::string& childRoot, Log::info("loolkit [" + std::to_string(Process::id()) + "] is ready."); if (doBenchmark) + { IoUtil::writeFIFO(writerBroker, "started\n"); + } + + // Open websocket connection between the child process and WSD. 
+ Poco::Net::HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER); + cs.setTimeout(0); + HTTPRequest request(HTTPRequest::HTTP_GET, std::string(NEW_CHILD_URI) + "pid=" + pid); + Poco::Net::HTTPResponse response; + auto ws = std::make_shared<WebSocket>(cs, request, response); + ws->setReceiveTimeout(0); + + const std::string socketName = "ChildControllerWS"; + IoUtil::SocketProcessor(ws, response, [&socketName, &ws, &document, &loKit](const std::vector<char>& data) + { + const std::string message(data.data(), data.size()); + Log::debug(socketName + ": recv [" + message + "]."); + StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + auto response = std::to_string(Process::id()) + " "; + + if (TerminationFlag) + { + // Too late, we're going down. + response += "down\n"; + } + else if (tokens[0] == "session") + { + const std::string& sessionId = tokens[1]; + const unsigned intSessionId = Util::decodeId(sessionId); + const std::string& docKey = tokens[2]; + + std::string url; + Poco::URI::decode(docKey, url); + Log::info("New session [" + sessionId + "] request on url [" + url + "]."); + + if (!document) + { + document = std::make_shared<Document>(loKit, jailId, docKey, url); + } + + // Validate and create session. + if (url == document->getUrl() && + document->createSession(sessionId, intSessionId)) + { + response += "ok\n"; + } + else + { + response += "bad\n"; + } + } + else if (document && document->canDiscard()) + { + TerminationFlag = true; + response += "down\n"; + } + else + { + response += "bad unknown token [" + tokens[0] + "]\n"; + } + + //FIXME: Do we really need to respond here? 
+ Log::trace("KitToDocBroker: " + response.substr(0, response.length()-2)); + ws->sendFrame(response.data(), response.size()); + + return true; + }, + [](){ return TerminationFlag; }, + socketName); char buffer[READ_BUFFER_SIZE]; std::string message; diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index ded9089..0332e48 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -173,76 +173,6 @@ using Poco::XML::InputSource; using Poco::XML::Node; using Poco::XML::NodeList; -/// Represents a new LOK child that is read -/// to host a document. -class ChildProcess -{ -public: - ChildProcess() : - _pid(-1) - { - } - - /// pid is the process ID of the child. - /// ws is the control WebSocket to the child. - ChildProcess(const Poco::Process::PID pid, const std::shared_ptr<Poco::Net::WebSocket>& ws) : - _pid(pid), - _ws(ws) - { - } - - ChildProcess(ChildProcess&& other) : - _pid(other._pid), - _ws(other._ws) - { - other._pid = -1; - other._ws.reset(); - } - - const ChildProcess& operator=(ChildProcess&& other) - { - _pid = other._pid; - other._pid = -1; - _ws = other._ws; - other._ws.reset(); - - return *this; - } - - ~ChildProcess() - { - close(true); - } - - void close(const bool rude) - { - if (_pid != -1) - { - if (kill(_pid, SIGINT) != 0 && rude && kill(_pid, 0) != 0) - { - Log::error("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning."); - } - - //TODO: Notify Admin. - std::ostringstream message; - message << "rmdoc" << " " - << _pid << " " - << "\n"; - //IoUtil::writeFIFO(WriterNotify, message.str()); - _pid = -1; - } - - _ws.reset(); - } - - Poco::Process::PID getPid() const { return _pid; } - std::shared_ptr<Poco::Net::WebSocket> getWebSocket() const { return _ws; } - -private: - Poco::Process::PID _pid; - std::shared_ptr<Poco::Net::WebSocket> _ws; -}; - /// New LOK child processes ready to host documents. 
static std::vector<std::shared_ptr<ChildProcess>> newChilds; static std::mutex newChildsMutex; @@ -337,9 +267,21 @@ private: if (!format.empty()) { Log::info("Conversion request for URI [" + fromPath + "]."); + + // Request a kit process for this doc. + auto child = getNewChild(); + if (!child) + { + // Let the client know we can't serve now. + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE); + response.setContentLength(0); + response.send(); + return; + } + auto uriPublic = DocumentBroker::sanitizeURI(fromPath); const auto docKey = DocumentBroker::getDocKey(uriPublic); - auto docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot); + auto docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child); // This lock could become a bottleneck. // In that case, we can use a pool and index by publicPath. @@ -517,9 +459,20 @@ private: } else { + // Request a kit process for this doc. + auto child = getNewChild(); + if (!child) + { + // Let the client know we can't serve now. + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE); + response.setContentLength(0); + response.send(); + return; + } + // Set one we just created. Log::debug("New DocumentBroker for docKey [" + docKey + "]."); - docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot); + docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child); docBrokers.emplace(docKey, docBroker); } @@ -543,11 +496,6 @@ private: if (wsSessionsCount == 1) session->setEditLock(true); - // Request a kit process for this doc. 
- const std::string aMessage = "request " + id + " " + docKey + "\n"; - Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1)); - IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage); - QueueHandler handler(queue, session, "wsd_queue_" + session->getId()); Thread queueHandlerThread; @@ -722,6 +670,7 @@ public: auto ws = std::make_shared<WebSocket>(request, response); std::unique_lock<std::mutex> lock(newChildsMutex); newChilds.emplace_back(std::make_shared<ChildProcess>(pid, ws)); + Log::info("Have " + std::to_string(newChilds.size()) + " childs."); return; } _______________________________________________ Libreoffice-commits mailing list [email protected] https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits
