loolwsd/DocumentBroker.cpp | 17 +++++++ loolwsd/DocumentBroker.hpp | 2 loolwsd/LOOLWSD.cpp | 106 ++++++++++++--------------------------------- 3 files changed, 48 insertions(+), 77 deletions(-)
New commits: commit 4f7b911066bd29d5901b2724b85aae77258b73eb Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Apr 24 22:09:13 2016 -0400 loolwsd: simplified the bridging between client and prisoner sessions Change-Id: I1335060963eda3356312f42060da229f43d239d8 Reviewed-on: https://gerrit.libreoffice.org/24358 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/DocumentBroker.cpp b/loolwsd/DocumentBroker.cpp index 31de15e..fc7d887 100644 --- a/loolwsd/DocumentBroker.cpp +++ b/loolwsd/DocumentBroker.cpp @@ -292,6 +292,23 @@ size_t DocumentBroker::addSession(std::shared_ptr<MasterProcessSession>& session return _sessions.size(); } +bool DocumentBroker::connectPeers(std::shared_ptr<MasterProcessSession>& session) +{ + const auto id = session->getId(); + + std::lock_guard<std::mutex> lock(_mutex); + + auto it = _sessions.find(id); + if (it != _sessions.end()) + { + it->second->setPeer(session); + session->setPeer(it->second); + return true; + } + + return false; +} + size_t DocumentBroker::removeSession(const std::string& id) { std::lock_guard<std::mutex> lock(_mutex); diff --git a/loolwsd/DocumentBroker.hpp b/loolwsd/DocumentBroker.hpp index dcf2d2a..5a5298f 100644 --- a/loolwsd/DocumentBroker.hpp +++ b/loolwsd/DocumentBroker.hpp @@ -194,6 +194,8 @@ public: /// Add a new session. Returns the new number of sessions. size_t addSession(std::shared_ptr<MasterProcessSession>& session); + /// Connect a prison session to its client peer. + bool connectPeers(std::shared_ptr<MasterProcessSession>& session); /// Removes a session by ID. Returns the new number of sessions. size_t removeSession(const std::string& id); diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 675444e..068f25c 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -271,51 +271,28 @@ public: class ClientRequestHandler: public HTTPRequestHandler { private: - - static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& clientSession, - const std::shared_ptr<DocumentBroker>& docBroker) + static void waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& session) { - int retries = 5; bool isFound = false; - - // Wait until the client has connected with a prison socket. - std::shared_ptr<MasterProcessSession> prisonSession; std::unique_lock<std::mutex> lock(AvailableChildSessionMutex); + Log::debug() << "Waiting for client session [" << session->getId() << "] to connect." << Log::end; + AvailableChildSessionCV.wait_for( + lock, + std::chrono::milliseconds(COMMAND_TIMEOUT_MS), + [&isFound, &session] + { + return (isFound = AvailableChildSessions.find(session->getId()) != AvailableChildSessions.end()); + }); - Log::debug() << "Waiting for client session [" << clientSession->getId() << "] to connect." << Log::end; - while (!TerminationFlag && retries-- && !isFound) - { - AvailableChildSessionCV.wait_for( - lock, - std::chrono::milliseconds(COMMAND_TIMEOUT_MS), - [&isFound, &clientSession] - { - return (isFound = AvailableChildSessions.find(clientSession->getId()) != AvailableChildSessions.end()); - }); - - if (!isFound) - { - //FIXME: outdated! - Log::info() << "Retrying client permission... " << retries << Log::end; - // request again new URL session - const std::string message = "request " + clientSession->getId() + " " + docBroker->getDocKey() + '\n'; - Log::trace("MasterToBroker: " + message.substr(0, message.length()-1)); - IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message); - } - } - - if (isFound) + if (!isFound) { - Log::debug("Waiting child session permission, done!"); - prisonSession = AvailableChildSessions[clientSession->getId()]; - AvailableChildSessions.erase(clientSession->getId()); - - clientSession->setPeer(prisonSession); - prisonSession->setPeer(clientSession); - Log::debug("Connected " + clientSession->getName() + " - " + prisonSession->getName() + "."); + // Let the client know we can't serve now. + Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now."); + throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); } - return isFound; + Log::debug("Waiting child session permission, done!"); + AvailableChildSessions.erase(session->getId()); } /// Handle POST requests. @@ -362,15 +339,16 @@ private: // Load the document. std::shared_ptr<WebSocket> ws; auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, nullptr); + + // Request the child to connect to us and add this session. auto sessionsCount = docBroker->addSession(session); + Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount)); + lock.unlock(); Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount)); - if (!waitBridgeCompleted(session, docBroker)) - { - // Let the client know we can't serve now. - throw std::runtime_error("Failed to connect to lokit child."); - } + // Wait until the client has connected with a prison socket. + waitBridgeCompleted(session); // Now the bridge between the client and kit processes is connected // Let messages flow @@ -588,6 +566,8 @@ private: // thread to pump them. This is to empty the queue when we get a "canceltiles" message. auto queue = std::make_shared<BasicTileQueue>(); session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, queue); + + // Request the child to connect to us and add this session. const auto sessionsCount = docBroker->addSession(session); Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount)); @@ -595,15 +575,11 @@ private: status = "statusindicator: connect"; ws->sendFrame(status.data(), (int) status.size()); - if (!waitBridgeCompleted(session, docBroker)) - { - // Let the client know we can't serve now. - Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now."); - throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); - } - + // Wait until the client has connected with a prison socket. + waitBridgeCompleted(session); // Now the bridge beetween the client and kit process is connected // Let messages flow + status = "statusindicator: ready"; ws->sendFrame(status.data(), (int) status.size()); @@ -826,24 +802,6 @@ class PrisonerRequestHandler: public HTTPRequestHandler { public: - static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& prisonSession) - { - // time to live, if the kit process cannot connect to a client session. - int ttl = 180; - bool isFound = true; - // Wait until the prison has connected with a client socket. - Log::debug() << "Waiting for prison session [" << prisonSession->getId() << "] to connect." << Log::end; - while (!TerminationFlag && - (isFound = AvailableChildSessions.find(prisonSession->getId()) != AvailableChildSessions.end()) && - ttl--) - { - std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS)); - Log::debug() << "Sleeping prison session [" << prisonSession->getId() << "] to connect." << Log::end; - } - - return isFound; - } - void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override { if (UnitWSD::get().filterHandleRequest( @@ -959,6 +917,9 @@ public: auto ws = std::make_shared<WebSocket>(request, response); auto session = std::make_shared<MasterProcessSession>(sessionId, LOOLSession::Kind::ToPrisoner, ws, docBroker, nullptr); + // Connect the prison session to the client. + docBroker->connectPeers(session); + std::unique_lock<std::mutex> lock(AvailableChildSessionMutex); AvailableChildSessions.emplace(sessionId, session); @@ -971,15 +932,6 @@ public: Log::info("Adding doc " + docKey + " to Admin"); Admin::instance().addDoc(docKey, pid, docBroker->getFilename(), sessionId); - if (waitBridgeCompleted(session)) - { - ws->shutdown(); - throw WebSocketException("Failed to connect to client session", WebSocket::WS_ENDPOINT_GOING_AWAY); - } - Log::debug("Connected " + session->getName() + "."); - // Now the bridge beetween the prison and the client is connected - // Let messages flow - UnitWSD::get().onChildConnected(pid, sessionId); IoUtil::SocketProcessor(ws, _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits