loolwsd/LOOLKit.cpp | 172 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 112 insertions(+), 60 deletions(-)
New commits: commit 02d114191d23e701282c14ff5cd580eaf42564e8 Author: Henry Castro <hcas...@collabora.com> Date: Sun Sep 27 13:39:09 2015 -0400 loolwsd: add Connection thread class diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index bacb2dc..ac5ebc3 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -46,6 +46,9 @@ using Poco::StringTokenizer; using Poco::Exception; using Poco::Process; +const int MASTER_PORT_NUMBER = 9981; +const std::string CHILD_URI = "/loolws/child/"; + class QueueHandler: public Runnable { public: @@ -80,8 +83,115 @@ private: tsqueue<std::string>& _queue; }; -const int MASTER_PORT_NUMBER = 9981; -const std::string CHILD_URI = "/loolws/child/"; +class Connection: public Runnable +{ +public: + Connection(LibreOfficeKit *loKit, Poco::UInt64 childId, const std::string& threadId) : + _loKit(loKit), + _childId(childId), + _threadId(threadId) + { + } + + void start() + { + _thread.start(*this); + } + + bool isRunning() + { + return _thread.isRunning(); + } + + void run() override + { +#ifdef __linux + if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>("lokit_connection"), 0, 0, 0) != 0) + std::cout << Util::logPrefix() << "Cannot set thread name :" << strerror(errno) << std::endl; +#endif + try + { + // Open websocket connection between the child process and the + // parent. The parent forwards us requests that it can't handle. + + HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER); + cs.setTimeout(0); + HTTPRequest request(HTTPRequest::HTTP_GET, CHILD_URI); + HTTPResponse response; + std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response)); + + std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, _loKit)); + ws->setReceiveTimeout(0); + + // child Jail TID PID + std::string hello("child " + std::to_string(_childId) + " " + + _threadId + " " + std::to_string(Process::id())); + session->sendTextFrame(hello); + + tsqueue<std::string> queue; + Thread queueHandlerThread; + QueueHandler handler(queue); + + handler.setSession(session); + queueHandlerThread.start(handler); + + int flags; + int n; + do + { + char buffer[1024]; + n = ws->receiveFrame(buffer, sizeof(buffer), flags); + + if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) + { + std::string firstLine = getFirstLine(buffer, n); + StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + + // The only kind of messages a child process receives are the single-line ones (?) + assert(firstLine.size() == static_cast<std::string::size_type>(n)); + + // Check if it is a "canceltiles" and in that case remove outstanding + // "tile" messages from the queue. + if (tokens.count() == 1 && tokens[0] == "canceltiles") + { + queue.remove_if([](std::string& x) { + return (x.find("tile ") == 0 && x.find("id=") == std::string::npos); + }); + } + else + { + queue.put(firstLine); + } + } + } + while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE); + + queue.clear(); + queue.put("eof"); + queueHandlerThread.join(); + } + + catch (Exception& exc) + { + std::cout << Util::logPrefix() + "Exception: " + exc.what() << std::endl; + } + catch (std::exception& exc) + { + std::cout << Util::logPrefix() + "Exception: " + exc.what() << std::endl; + } + } + + ~Connection() + { + //_thread.stop(); + } + +private: + LibreOfficeKit *_loKit; + Poco::UInt64 _childId; + std::string _threadId; + Thread _thread; +}; void run_lok_main(const std::string &loSubPath, Poco::UInt64 _childId) { @@ -107,64 +217,6 @@ void run_lok_main(const std::string &loSubPath, Poco::UInt64 _childId) exit(-1); } - // Open websocket connection between the child process and the - // parent. The parent forwards us requests that it can't handle. - - HTTPClientSession cs("127.0.0.1", MASTER_PORT_NUMBER); - cs.setTimeout(0); - HTTPRequest request(HTTPRequest::HTTP_GET, CHILD_URI); - HTTPResponse response; - std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response)); - - std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, loKit)); - - ws->setReceiveTimeout(0); - - std::string hello("child " + std::to_string(_childId) + " " + std::to_string(Process::id())); - session->sendTextFrame(hello); - - tsqueue<std::string> queue; - Thread queueHandlerThread; - QueueHandler handler(queue); - - handler.setSession(session); - queueHandlerThread.start(handler); - - int flags; - int n; - do - { - char buffer[1024]; - n = ws->receiveFrame(buffer, sizeof(buffer), flags); - - if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) - { - std::string firstLine = getFirstLine(buffer, n); - StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - - // The only kind of messages a child process receives are the single-line ones (?) - assert(firstLine.size() == static_cast<std::string::size_type>(n)); - - // Check if it is a "canceltiles" and in that case remove outstanding - // "tile" messages from the queue. - if (tokens.count() == 1 && tokens[0] == "canceltiles") - { - queue.remove_if([](std::string& x) { - return (x.find("tile ") == 0 && x.find("id=") == std::string::npos); - }); - } - else - { - queue.put(firstLine); - } - } - } - while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE); - - queue.clear(); - queue.put("eof"); - queueHandlerThread.join(); - // Destroy LibreOfficeKit loKit->pClass->destroy(loKit); } _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/libreoffice-commits