loolwsd/LOOLStress.cpp |  135 +++++++++++++++++++++++++++++++++++++++++++------
 loolwsd/LOOLWSD.cpp    |    5 -
 loolwsd/TraceFile.hpp  |   52 ++++++++++--------
 3 files changed, 152 insertions(+), 40 deletions(-)

New commits:
commit 7af51f51c4f4d89c837c96bd3c59e66288025a77
Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk>
Date:   Thu Aug 4 10:31:11 2016 -0400

    loolstress: new Connection manager and event handler
    
    Change-Id: Ifc921f7fcf298457a848da444c2d3830b9755603
    Reviewed-on: https://gerrit.libreoffice.org/27967
    Reviewed-by: Ashod Nakashian <ashnak...@gmail.com>
    Tested-by: Ashod Nakashian <ashnak...@gmail.com>

diff --git a/loolwsd/LOOLStress.cpp b/loolwsd/LOOLStress.cpp
index 6a7c38d..f60aa55 100644
--- a/loolwsd/LOOLStress.cpp
+++ b/loolwsd/LOOLStress.cpp
@@ -76,37 +76,63 @@ using Poco::Util::HelpFormatter;
 using Poco::Util::Option;
 using Poco::Util::OptionSet;
 
-class Worker: public Runnable
+class Connection
 {
 public:
+    static
+    std::unique_ptr<Connection> create(const std::string& serverURI, const 
std::string& documentURL, const std::string& sessionId)
+    {
+        Poco::URI uri(serverURI);
 
-    Worker(Stress& app, const std::string& traceFilePath) :
-        _app(app), _traceFile(traceFilePath)
+        // Load a document and get its status.
+        std::cerr << "NewSession [" << sessionId << "]: " << uri.toString() << 
"... ";
+        Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, 
"/lool/ws/" + documentURL);
+        Poco::Net::HTTPResponse response;
+        auto ws = helpers::connectLOKit(uri, request, response, "loolStress ");
+        std::cerr << "Connected.\n";
+        return std::unique_ptr<Connection>(new Connection(documentURL, 
sessionId, ws));
+    }
+
+    void send(const std::string& data) const
     {
+        helpers::sendTextFrame(_ws, data, "loolstress ");
     }
 
-    void run() override
+private:
+    Connection(const std::string& documentURL, const std::string& sessionId, 
std::shared_ptr<Poco::Net::WebSocket>& ws) :
+        _documentURL(documentURL),
+        _sessionId(sessionId),
+        _ws(ws)
     {
-        std::cerr << "Connecting to server: " << _app._serverURI << "\n";
+    }
 
-        Poco::URI uri(_app._serverURI);
+private:
+    const std::string _documentURL;
+    const std::string _sessionId;
+    std::shared_ptr<Poco::Net::WebSocket> _ws;
+};
 
-        const auto documentURL = _traceFile.getDocURI();
-        std::cerr << "Loading: " << documentURL << "\n";
+class Worker: public Runnable
+{
+public:
 
-        // Load a document and get its status.
-        Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, 
documentURL);
-        Poco::Net::HTTPResponse response;
-        auto socket = helpers::connectLOKit(uri, request, response, 
"loolStress ");
+    Worker(Stress& app, const std::string& traceFilePath) :
+        _app(app),
+        _traceFile(traceFilePath)
+    {
+    }
 
+    void run() override
+    {
         const auto epochStart(std::chrono::steady_clock::now());
         try
         {
             for (;;)
             {
-                const auto rec = 
_traceFile.getNextRecord(TraceFileRecord::Direction::Incoming);
+                const auto rec = _traceFile.getNextRecord();
                 if (rec.Dir == TraceFileRecord::Direction::Invalid)
                 {
+                    // End of trace file.
                     break;
                 }
 
@@ -117,7 +143,82 @@ public:
                     
std::this_thread::sleep_for(std::chrono::microseconds(delay));
                 }
 
-                helpers::sendTextFrame(socket, rec.Payload);
+                if (rec.Dir == TraceFileRecord::Direction::Event)
+                {
+                    // Meta info about an event.
+                    static const std::string NewSession("NewSession: ");
+                    static const std::string EndSession("EndSession: ");
+
+                    if (rec.Payload.find(NewSession) == 0)
+                    {
+                        const auto& uri = 
rec.Payload.substr(NewSession.size());
+                        auto it = Sessions.find(uri);
+                        if (it != Sessions.end())
+                        {
+                            // Add a new session.
+                            if (it->second.find(rec.SessionId) != 
it->second.end())
+                            {
+                                std::cerr << "ERROR: session [" << 
rec.SessionId << "] already exists on doc [" << uri << "]\n";
+                            }
+                            else
+                            {
+                                it->second.emplace(rec.SessionId, 
Connection::create(_app._serverURI, uri, rec.SessionId));
+                            }
+                        }
+                        else
+                        {
+                            std::cerr << "New Document: " << uri << "\n";
+                            ChildToDoc.emplace(rec.Pid, uri);
+                            Sessions[uri].emplace(rec.SessionId, 
Connection::create(_app._serverURI, uri, rec.SessionId));
+                        }
+                    }
+                    else if (rec.Payload.find(EndSession) == 0)
+                    {
+                        const auto& uri = 
rec.Payload.substr(EndSession.size());
+                        auto it = Sessions.find(uri);
+                        if (it != Sessions.end())
+                        {
+                            std::cerr << "EndSession [" << rec.SessionId << 
"]: " << uri << "\n";
+
+                            it->second.erase(rec.SessionId);
+                            if (it->second.empty())
+                            {
+                                std::cerr << "End Doc [" << uri << "].\n";
+                                Sessions.erase(it);
+                                ChildToDoc.erase(rec.Pid);
+                            }
+                        }
+                        else
+                        {
+                            std::cerr << "ERROR: Doc [" << uri << "] does not 
exist.\n";
+                        }
+                    }
+                }
+                else if (rec.Dir == TraceFileRecord::Direction::Incoming)
+                {
+                    auto docIt = ChildToDoc.find(rec.Pid);
+                    if (docIt != ChildToDoc.end())
+                    {
+                        const auto& uri = docIt->second;
+                        auto it = Sessions.find(uri);
+                        if (it != Sessions.end())
+                        {
+                            const auto sessionIt = 
it->second.find(rec.SessionId);
+                            if (sessionIt != it->second.end())
+                            {
+                                sessionIt->second->send(rec.Payload);
+                            }
+                        }
+                        else
+                        {
+                            std::cerr << "ERROR: Doc [" << uri << "] does not 
exist.\n";
+                        }
+                    }
+                    else
+                    {
+                        std::cerr << "ERROR: Unknown PID [" << rec.Pid << "] 
maps to no active document.\n";
+                    }
+                }
             }
         }
         catch (const Poco::Exception &e)
@@ -131,6 +232,12 @@ public:
 private:
     Stress& _app;
     TraceFileReader _traceFile;
+
+    /// LOK child process PID to Doc URI map.
+    std::map<unsigned, std::string> ChildToDoc;
+
+    /// Doc URI to Sessions map. Sessions are maps of SessionID to Connection.
+    std::map<std::string, std::map<std::string, std::unique_ptr<Connection>>> 
Sessions;
 };
 
 Stress::Stress() :
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 187c68f..c86ceb7 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -551,7 +551,6 @@ private:
         if (uri.size() > 0 && uri.compare(0, 8, "lool/ws/") == 0)
             uri.erase(0, 8);
 
-
         const auto uriPublic = DocumentBroker::sanitizeURI(uri);
         const auto docKey = DocumentBroker::getDocKey(uriPublic);
         std::shared_ptr<DocumentBroker> docBroker;
@@ -683,7 +682,7 @@ private:
             // Wait until the client has connected with a prison socket.
             waitBridgeCompleted(session);
 
-            LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "NewSession");
+            LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "NewSession: " 
+ uri);
 
             // Now the bridge beetween the client and kit process is connected
             status = "statusindicator: ready";
@@ -760,7 +759,7 @@ private:
                 Admin::instance().rmDoc(docKey);
             }
 
-            LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "EndSession");
+            LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "EndSession: " 
+ uri);
             Log::info("Finishing GET request handler for session [" + id + "]. 
Joining the queue.");
             queue->put("eof");
             queueHandlerThread.join();
diff --git a/loolwsd/TraceFile.hpp b/loolwsd/TraceFile.hpp
index cd429a6..0015cbc 100644
--- a/loolwsd/TraceFile.hpp
+++ b/loolwsd/TraceFile.hpp
@@ -33,7 +33,7 @@ public:
     Direction Dir;
     unsigned TimestampNs;
     unsigned Pid;
-    unsigned SessionId;
+    std::string SessionId;
     std::string Payload;
 };
 
@@ -94,13 +94,23 @@ public:
     TraceFileReader(const std::string& path) :
         _epochStart(Poco::Timestamp().epochMicroseconds()),
         _stream(path),
+        _index(0),
         _indexIn(-1),
         _indexOut(-1)
     {
         readFile();
     }
 
-    const std::string& getDocURI() const { return _docURI; }
+    TraceFileRecord getNextRecord()
+    {
+        if (_index < _records.size())
+        {
+            return _records[_index++];
+        }
+
+        // Invalid.
+        return TraceFileRecord();
+    }
 
     TraceFileRecord getNextRecord(const TraceFileRecord::Direction dir)
     {
@@ -136,38 +146,34 @@ private:
         while (std::getline(_stream, line) && !line.empty())
         {
             const auto v = split(line, line[0]);
-            if (v.size() == 3)
+            if (v.size() == 4)
             {
                 TraceFileRecord rec;
                 rec.Dir = static_cast<TraceFileRecord::Direction>(line[0]);
-                rec.Pid = std::atoi(v[0].c_str());
-                rec.TimestampNs = std::atoi(v[1].c_str());
-                rec.Payload = v[2];
+                unsigned index = 0;
+                rec.TimestampNs = std::atoi(v[index++].c_str());
+                rec.Pid = std::atoi(v[index++].c_str());
+                rec.SessionId = v[index++];
+                rec.Payload = v[index++];
                 _records.push_back(rec);
             }
+            else
+            {
+                fprintf(stderr, "Invalid trace file record, expected 4 tokens. 
[%s]\n", line.c_str());
+            }
         }
 
         _indexIn = advance(-1, TraceFileRecord::Direction::Incoming);
         _indexOut = advance(-1, TraceFileRecord::Direction::Outgoing);
 
-        if (_records.size() > 1)
+        if (_records.empty() ||
+            _records[0].Dir != TraceFileRecord::Direction::Event ||
+            _records[0].Payload.find("NewSession") != 0)
         {
-            if (_records[0].Payload.find("loolclient") == 0 &&
-                _records[1].Payload.find("load url=") == 0)
-            {
-                _docURI = _records[1].Payload.substr(9);
-                return;
-            }
-            else if (_records[0].Payload.find("load url=") == 0)
-            {
-                _docURI = _records[0].Payload.substr(9);
-                return;
-            }
+            fprintf(stderr, "Invalid trace file with %ld records. First 
record: %s\n", _records.size(),
+                    _records.empty() ? "<empty>" : 
_records[0].Payload.c_str());
+            throw std::runtime_error("Invalid trace file.");
         }
-
-        fprintf(stderr, "Invalid trace file with %ld records. First record: 
%s\n", _records.size(),
-                _records.empty() ? "<empty>" : _records[0].Payload.c_str());
-        throw std::runtime_error("Invalid trace file.");
     }
 
     std::vector<std::string> split(const std::string& s, const char delim) 
const
@@ -203,7 +209,7 @@ private:
     const Poco::Int64 _epochStart;
     std::ifstream _stream;
     std::vector<TraceFileRecord> _records;
-    std::string _docURI;
+    unsigned _index;
     unsigned _indexIn;
     unsigned _indexOut;
 };
_______________________________________________
Libreoffice-commits mailing list
libreoffice-comm...@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits

Reply via email to