loolwsd/LOOLWSD.cpp | 275 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 161 insertions(+), 114 deletions(-)
New commits: commit c2c41ceb63fee92b9d958434c843a96ea976c7e1 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Tue Jan 5 21:29:12 2016 -0500 loolwsd: refactored HttpRequestHandler Change-Id: Ie785d814aff1d28634c8933511c4a5a4a4f5cebc Reviewed-on: https://gerrit.libreoffice.org/21156 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index cf4658e..5f1ad43 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -240,21 +240,107 @@ public: } }; -/// Handle a WebSocket connection or a simple HTTP request. -class RequestHandler: public HTTPRequestHandler +// Synchronously process WebSocket requests and dispatch to handler. +// Handler returns false to end. +void SocketProcessor(std::shared_ptr<WebSocket> ws, + HTTPServerResponse& response, + std::function<bool(const char* data, const int size)> handler) { -public: - RequestHandler() + const Poco::Timespan waitTime(POLL_TIMEOUT); + try + { + // Loop, receiving WebSocket messages either from the client, or from the child + // process (to be forwarded to the client). + int flags; + int n; + bool pollTimeout = true; + ws->setReceiveTimeout(0); + + do + { + char buffer[200000]; //FIXME: Dynamic? + + if ((pollTimeout = ws->poll(waitTime, Socket::SELECT_READ))) + { + n = ws->receiveFrame(buffer, sizeof(buffer), flags); + + if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) + { + ws->sendFrame("", 0, WebSocket::FRAME_OP_PONG); + } + else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) + { + n = 1; + } + else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) + { + const std::string firstLine = getFirstLine(buffer, n); + if (firstLine == "eof") + break; + + StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + + if (firstLine.size() == static_cast<std::string::size_type>(n)) + { + handler(firstLine.c_str(), firstLine.size()); + } + else + { + // Check if it is a "nextmessage:" and in that case read the large + // follow-up message separately, and handle that only. + int size; + if (tokens.count() == 2 && + tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0) + { + char largeBuffer[size]; //FIXME: Security risk! Flooding may segfault us. + + n = ws->receiveFrame(largeBuffer, size, flags); + if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) + { + if (!handler(largeBuffer, n)) + n = 0; + } + } + else + { + if (!handler(buffer, n)) + n = 0; + } + } + } + } + } + while (!TerminationFlag && + (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE))); + } + catch (const WebSocketException& exc) { + Log::error("RequestHandler::handleRequest(), WebSocketException: " + exc.message()); + switch (exc.code()) + { + case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: + response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION); + // fallthrough + case WebSocket::WS_ERR_NO_HANDSHAKE: + case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION: + case WebSocket::WS_ERR_HANDSHAKE_NO_KEY: + response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST); + response.setContentLength(0); + response.send(); + break; + } } +} + + +/// Handle a public connection from a client. +class ClientRequestHandler: public HTTPRequestHandler +{ +public: void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override { - std::string thread_name; - if (request.serverAddress().port() == MASTER_PORT_NUMBER) - thread_name = "prison_socket"; - else - thread_name = "client_socket"; + const std::string thread_name = "client_socket"; #ifdef __linux if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) @@ -382,118 +468,82 @@ public: try { - try - { - auto ws = std::make_shared<WebSocket>(request, response); + auto ws = std::make_shared<WebSocket>(request, response); - LOOLSession::Kind kind; - std::string id; + const std::string id = LOOLWSD::GenSessionId(); + auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws); - if (request.getURI() == LOOLWSD::CHILD_URI && request.serverAddress().port() == MASTER_PORT_NUMBER) - kind = LOOLSession::Kind::ToPrisoner; - else + // For ToClient sessions, we store incoming messages in a queue and have a separate + // thread that handles them. This is so that we can empty the queue when we get a + // "canceltiles" message. + handler.setSession(session); + queueHandlerThread.start(handler); + + SocketProcessor(ws, response, [&session, &queue](const char* data, const int size) { - kind = LOOLSession::Kind::ToClient; - id = LOOLWSD::GenSessionId(); - } + const std::string firstLine = getFirstLine(data, size); + if (firstLine == "eof") + return false; - auto session = std::make_shared<MasterProcessSession>(id, kind, ws); + if (firstLine.size() == static_cast<std::string::size_type>(size)) + { + queue.put(firstLine); + return true; + } + else + { + return session->handleInput(data, size); + } + }); - // For ToClient sessions, we store incoming messages in a queue and have a separate - // thread that handles them. This is so that we can empty the queue when we get a - // "canceltiles" message. - if (kind == LOOLSession::Kind::ToClient) - { - handler.setSession(session); - queueHandlerThread.start(handler); - } + queue.clear(); + queue.put("eof"); + queueHandlerThread.join(); + } + catch (const IOException& exc) + { + Log::error("IOException: " + exc.message()); + } - // Loop, receiving WebSocket messages either from the client, or from the child - // process (to be forwarded to the client). - int flags; - int n; - bool pollTimeout = true; - ws->setReceiveTimeout(0); + Log::debug("Thread [" + thread_name + "] finished."); + } +}; - do - { - char buffer[200000]; //FIXME: Dynamic? +/// Handle requests from prisoners (internal). +class PrisonerRequestHandler: public HTTPRequestHandler +{ +public: - if ((pollTimeout = ws->poll(waitTime, Socket::SELECT_READ))) - { - n = ws->receiveFrame(buffer, sizeof(buffer), flags); + void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override + { + assert(request.serverAddress().port() == MASTER_PORT_NUMBER); + assert(request.getURI() == LOOLWSD::CHILD_URI); - if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) - { - ws->sendFrame("", 0, WebSocket::FRAME_OP_PONG); - } - else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) - { - n = 1; - } - else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) - { - const std::string firstLine = getFirstLine(buffer, n); - if (firstLine == "eof") - break; + const std::string thread_name = "prison_socket"; + +#ifdef __linux + if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) + Log::error("Cannot set thread name to " + thread_name + "."); +#endif + Log::debug("Thread [" + thread_name + "] started."); - StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + Poco::Timespan waitTime(POLL_TIMEOUT); - if (kind == LOOLSession::Kind::ToClient && - firstLine.size() == static_cast<std::string::size_type>(n)) - { - queue.put(firstLine); - } - else - { - // Check if it is a "nextmessage:" and in that case read the large - // follow-up message separately, and handle that only. - int size; - if (tokens.count() == 2 && - tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0) - { - char largeBuffer[size]; //FIXME: Security risk! Flooding may segfault us. - - n = ws->receiveFrame(largeBuffer, size, flags); - if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) - { - if (!session->handleInput(largeBuffer, n)) - n = 0; - } - } - else - { - if (!session->handleInput(buffer, n)) - n = 0; - } - } - } - } - } - while (!TerminationFlag && - (!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE))); + try + { + auto ws = std::make_shared<WebSocket>(request, response); - queue.clear(); - queue.put("eof"); - queueHandlerThread.join(); - } - catch (const WebSocketException& exc) - { - Log::error("RequestHandler::handleRequest(), WebSocketException: " + exc.message()); - switch (exc.code()) + std::string id; + auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToPrisoner, ws); + + SocketProcessor(ws, response, [&session](const char* data, const int size) { - case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: - response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION); - // fallthrough - case WebSocket::WS_ERR_NO_HANDSHAKE: - case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION: - case WebSocket::WS_ERR_HANDSHAKE_NO_KEY: - response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST); - response.setContentLength(0); - response.send(); - break; - } - } + const std::string firstLine = getFirstLine(data, size); + if (firstLine == "eof") + return false; + + return session->handleInput(data, size); + }); } catch (const IOException& exc) { @@ -504,13 +554,10 @@ public: } }; +template <class RequestHandler> class RequestHandlerFactory: public HTTPRequestHandlerFactory { public: - RequestHandlerFactory() - { - } - HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override { auto logger = Log::info(); @@ -868,7 +915,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/) // Start a server listening on the port for clients ServerSocket svs(ClientPortNumber, NumPreSpawnedChildren*10); ThreadPool threadPool(NumPreSpawnedChildren*2, NumPreSpawnedChildren*5); - HTTPServer srv(new RequestHandlerFactory(), threadPool, svs, new HTTPServerParams); + HTTPServer srv(new RequestHandlerFactory<ClientRequestHandler>(), threadPool, svs, new HTTPServerParams); srv.start(); @@ -876,7 +923,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/) SocketAddress addr2("127.0.0.1", MASTER_PORT_NUMBER); ServerSocket svs2(addr2, NumPreSpawnedChildren); ThreadPool threadPool2(NumPreSpawnedChildren*2, NumPreSpawnedChildren*5); - HTTPServer srv2(new RequestHandlerFactory(), threadPool2, svs2, new HTTPServerParams); + HTTPServer srv2(new RequestHandlerFactory<PrisonerRequestHandler>(), threadPool2, svs2, new HTTPServerParams); srv2.start(); _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/libreoffice-commits