loolwsd/LOOLStress.cpp | 135 +++++++++++++++++++++++++++++++++++++++++++------ loolwsd/LOOLWSD.cpp | 5 - loolwsd/TraceFile.hpp | 52 ++++++++++-------- 3 files changed, 152 insertions(+), 40 deletions(-)
New commits: commit 7af51f51c4f4d89c837c96bd3c59e66288025a77 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Thu Aug 4 10:31:11 2016 -0400 loolstress: new Connection manager and event handler Change-Id: Ifc921f7fcf298457a848da444c2d3830b9755603 Reviewed-on: https://gerrit.libreoffice.org/27967 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/LOOLStress.cpp b/loolwsd/LOOLStress.cpp index 6a7c38d..f60aa55 100644 --- a/loolwsd/LOOLStress.cpp +++ b/loolwsd/LOOLStress.cpp @@ -76,37 +76,63 @@ using Poco::Util::HelpFormatter; using Poco::Util::Option; using Poco::Util::OptionSet; -class Worker: public Runnable +class Connection { public: + static + std::unique_ptr<Connection> create(const std::string& serverURI, const std::string& documentURL, const std::string& sessionId) + { + Poco::URI uri(serverURI); - Worker(Stress& app, const std::string& traceFilePath) : - _app(app), _traceFile(traceFilePath) + // Load a document and get its status. + std::cerr << "NewSession [" << sessionId << "]: " << uri.toString() << "... "; + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, "/lool/ws/" + documentURL); + Poco::Net::HTTPResponse response; + auto ws = helpers::connectLOKit(uri, request, response, "loolStress "); + std::cerr << "Connected.\n"; + return std::unique_ptr<Connection>(new Connection(documentURL, sessionId, ws)); + } + + void send(const std::string& data) const { + helpers::sendTextFrame(_ws, data, "loolstress "); } - void run() override +private: + Connection(const std::string& documentURL, const std::string& sessionId, std::shared_ptr<Poco::Net::WebSocket>& ws) : + _documentURL(documentURL), + _sessionId(sessionId), + _ws(ws) { - std::cerr << "Connecting to server: " << _app._serverURI << "\n"; + } - Poco::URI uri(_app._serverURI); +private: + const std::string _documentURL; + const std::string _sessionId; + std::shared_ptr<Poco::Net::WebSocket> _ws; +}; - const auto documentURL = _traceFile.getDocURI(); - std::cerr << "Loading: " << documentURL << "\n"; +class Worker: public Runnable +{ +public: - // Load a document and get its status. - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, documentURL); - Poco::Net::HTTPResponse response; - auto socket = helpers::connectLOKit(uri, request, response, "loolStress "); + Worker(Stress& app, const std::string& traceFilePath) : + _app(app), + _traceFile(traceFilePath) + { + } + void run() override + { const auto epochStart(std::chrono::steady_clock::now()); try { for (;;) { - const auto rec = _traceFile.getNextRecord(TraceFileRecord::Direction::Incoming); + const auto rec = _traceFile.getNextRecord(); if (rec.Dir == TraceFileRecord::Direction::Invalid) { + // End of trace file. break; } @@ -117,7 +143,82 @@ public: std::this_thread::sleep_for(std::chrono::microseconds(delay)); } - helpers::sendTextFrame(socket, rec.Payload); + if (rec.Dir == TraceFileRecord::Direction::Event) + { + // Meta info about about an event. + static const std::string NewSession("NewSession: "); + static const std::string EndSession("EndSession: "); + + if (rec.Payload.find(NewSession) == 0) + { + const auto& uri = rec.Payload.substr(NewSession.size()); + auto it = Sessions.find(uri); + if (it != Sessions.end()) + { + // Add a new session. + if (it->second.find(rec.SessionId) != it->second.end()) + { + std::cerr << "ERROR: session [" << rec.SessionId << "] already exists on doc [" << uri << "]\n"; + } + else + { + it->second.emplace(rec.SessionId, Connection::create(_app._serverURI, uri, rec.SessionId)); + } + } + else + { + std::cerr << "New Document: " << uri << "\n"; + ChildToDoc.emplace(rec.Pid, uri); + Sessions[uri].emplace(rec.SessionId, Connection::create(_app._serverURI, uri, rec.SessionId)); + } + } + else if (rec.Payload.find(EndSession) == 0) + { + const auto& uri = rec.Payload.substr(EndSession.size()); + auto it = Sessions.find(uri); + if (it != Sessions.end()) + { + std::cerr << "EndSession [" << rec.SessionId << "]: " << uri << "\n"; + + it->second.erase(rec.SessionId); + if (it->second.empty()) + { + std::cerr << "End Doc [" << uri << "].\n"; + Sessions.erase(it); + ChildToDoc.erase(rec.Pid); + } + } + else + { + std::cerr << "ERROR: Doc [" << uri << "] does not exist.\n"; + } + } + } + else if (rec.Dir == TraceFileRecord::Direction::Incoming) + { + auto docIt = ChildToDoc.find(rec.Pid); + if (docIt != ChildToDoc.end()) + { + const auto& uri = docIt->second; + auto it = Sessions.find(uri); + if (it != Sessions.end()) + { + const auto sessionIt = it->second.find(rec.SessionId); + if (sessionIt != it->second.end()) + { + sessionIt->second->send(rec.Payload); + } + } + else + { + std::cerr << "ERROR: Doc [" << uri << "] does not exist.\n"; + } + } + else + { + std::cerr << "ERROR: Unknown PID [" << rec.Pid << "] maps to no active document.\n"; + } + } } } catch (const Poco::Exception &e) @@ -131,6 +232,12 @@ public: private: Stress& _app; TraceFileReader _traceFile; + + /// LOK child process PID to Doc URI map. + std::map<unsigned, std::string> ChildToDoc; + + /// Doc URI to Sessions map. Sessions are maps of SessionID to Connection. + std::map<std::string, std::map<std::string, std::unique_ptr<Connection>>> Sessions; }; Stress::Stress() : diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 187c68f..c86ceb7 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -551,7 +551,6 @@ private: if (uri.size() > 0 && uri.compare(0, 8, "lool/ws/") == 0) uri.erase(0, 8); - const auto uriPublic = DocumentBroker::sanitizeURI(uri); const auto docKey = DocumentBroker::getDocKey(uriPublic); std::shared_ptr<DocumentBroker> docBroker; @@ -683,7 +682,7 @@ private: // Wait until the client has connected with a prison socket. waitBridgeCompleted(session); - LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "NewSession"); + LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "NewSession: " + uri); // Now the bridge beetween the client and kit process is connected status = "statusindicator: ready"; @@ -760,7 +759,7 @@ private: Admin::instance().rmDoc(docKey); } - LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "EndSession"); + LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "EndSession: " + uri); Log::info("Finishing GET request handler for session [" + id + "]. Joining the queue."); queue->put("eof"); queueHandlerThread.join(); diff --git a/loolwsd/TraceFile.hpp b/loolwsd/TraceFile.hpp index cd429a6..0015cbc 100644 --- a/loolwsd/TraceFile.hpp +++ b/loolwsd/TraceFile.hpp @@ -33,7 +33,7 @@ public: Direction Dir; unsigned TimestampNs; unsigned Pid; - unsigned SessionId; + std::string SessionId; std::string Payload; }; @@ -94,13 +94,23 @@ public: TraceFileReader(const std::string& path) : _epochStart(Poco::Timestamp().epochMicroseconds()), _stream(path), + _index(0), _indexIn(-1), _indexOut(-1) { readFile(); } - const std::string& getDocURI() const { return _docURI; } + TraceFileRecord getNextRecord() + { + if (_index < _records.size()) + { + return _records[_index++]; + } + + // Invalid. + return TraceFileRecord(); + } TraceFileRecord getNextRecord(const TraceFileRecord::Direction dir) { @@ -136,38 +146,34 @@ private: while (std::getline(_stream, line) && !line.empty()) { const auto v = split(line, line[0]); - if (v.size() == 3) + if (v.size() == 4) { TraceFileRecord rec; rec.Dir = static_cast<TraceFileRecord::Direction>(line[0]); - rec.Pid = std::atoi(v[0].c_str()); - rec.TimestampNs = std::atoi(v[1].c_str()); - rec.Payload = v[2]; + unsigned index = 0; + rec.TimestampNs = std::atoi(v[index++].c_str()); + rec.Pid = std::atoi(v[index++].c_str()); + rec.SessionId = v[index++]; + rec.Payload = v[index++]; _records.push_back(rec); } + else + { + fprintf(stderr, "Invalid trace file record, expected 4 tokens. [%s]\n", line.c_str()); + } } _indexIn = advance(-1, TraceFileRecord::Direction::Incoming); _indexOut = advance(-1, TraceFileRecord::Direction::Outgoing); - if (_records.size() > 1) + if (_records.empty() || + _records[0].Dir != TraceFileRecord::Direction::Event || + _records[0].Payload.find("NewSession") != 0) { - if (_records[0].Payload.find("loolclient") == 0 && - _records[1].Payload.find("load url=") == 0) - { - _docURI = _records[1].Payload.substr(9); - return; - } - else if (_records[0].Payload.find("load url=") == 0) - { - _docURI = _records[0].Payload.substr(9); - return; - } + fprintf(stderr, "Invalid trace file with %ld records. First record: %s\n", _records.size(), + _records.empty() ? "<empty>" : _records[0].Payload.c_str()); + throw std::runtime_error("Invalid trace file."); } - - fprintf(stderr, "Invalid trace file with %ld records. First record: %s\n", _records.size(), - _records.empty() ? "<empty>" : _records[0].Payload.c_str()); - throw std::runtime_error("Invalid trace file."); } std::vector<std::string> split(const std::string& s, const char delim) const @@ -203,7 +209,7 @@ private: const Poco::Int64 _epochStart; std::ifstream _stream; std::vector<TraceFileRecord> _records; - std::string _docURI; + unsigned _index; unsigned _indexIn; unsigned _indexOut; }; _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits