[Libreoffice-commits] online.git: loolwsd/LOOLStress.cpp loolwsd/LOOLWSD.cpp loolwsd/TraceFile.hpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Mon Aug 8 03:52:53 UTC 2016
loolwsd/LOOLStress.cpp | 135 +++++++++++++++++++++++++++++++++++++++++++------
loolwsd/LOOLWSD.cpp | 5 -
loolwsd/TraceFile.hpp | 52 ++++++++++--------
3 files changed, 152 insertions(+), 40 deletions(-)
New commits:
commit 7af51f51c4f4d89c837c96bd3c59e66288025a77
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Thu Aug 4 10:31:11 2016 -0400
loolstress: new Connection manager and event handler
Change-Id: Ifc921f7fcf298457a848da444c2d3830b9755603
Reviewed-on: https://gerrit.libreoffice.org/27967
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLStress.cpp b/loolwsd/LOOLStress.cpp
index 6a7c38d..f60aa55 100644
--- a/loolwsd/LOOLStress.cpp
+++ b/loolwsd/LOOLStress.cpp
@@ -76,37 +76,63 @@ using Poco::Util::HelpFormatter;
using Poco::Util::Option;
using Poco::Util::OptionSet;
-class Worker: public Runnable
+class Connection
{
public:
+ static
+ std::unique_ptr<Connection> create(const std::string& serverURI, const std::string& documentURL, const std::string& sessionId)
+ {
+ Poco::URI uri(serverURI);
- Worker(Stress& app, const std::string& traceFilePath) :
- _app(app), _traceFile(traceFilePath)
+ // Load a document and get its status.
+ std::cerr << "NewSession [" << sessionId << "]: " << uri.toString() << "... ";
+ Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, "/lool/ws/" + documentURL);
+ Poco::Net::HTTPResponse response;
+ auto ws = helpers::connectLOKit(uri, request, response, "loolStress ");
+ std::cerr << "Connected.\n";
+ return std::unique_ptr<Connection>(new Connection(documentURL, sessionId, ws));
+ }
+
+ void send(const std::string& data) const
{
+ helpers::sendTextFrame(_ws, data, "loolstress ");
}
- void run() override
+private:
+ Connection(const std::string& documentURL, const std::string& sessionId, std::shared_ptr<Poco::Net::WebSocket>& ws) :
+ _documentURL(documentURL),
+ _sessionId(sessionId),
+ _ws(ws)
{
- std::cerr << "Connecting to server: " << _app._serverURI << "\n";
+ }
- Poco::URI uri(_app._serverURI);
+private:
+ const std::string _documentURL;
+ const std::string _sessionId;
+ std::shared_ptr<Poco::Net::WebSocket> _ws;
+};
- const auto documentURL = _traceFile.getDocURI();
- std::cerr << "Loading: " << documentURL << "\n";
+class Worker: public Runnable
+{
+public:
- // Load a document and get its status.
- Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, documentURL);
- Poco::Net::HTTPResponse response;
- auto socket = helpers::connectLOKit(uri, request, response, "loolStress ");
+ Worker(Stress& app, const std::string& traceFilePath) :
+ _app(app),
+ _traceFile(traceFilePath)
+ {
+ }
+ void run() override
+ {
const auto epochStart(std::chrono::steady_clock::now());
try
{
for (;;)
{
- const auto rec = _traceFile.getNextRecord(TraceFileRecord::Direction::Incoming);
+ const auto rec = _traceFile.getNextRecord();
if (rec.Dir == TraceFileRecord::Direction::Invalid)
{
+ // End of trace file.
break;
}
@@ -117,7 +143,82 @@ public:
std::this_thread::sleep_for(std::chrono::microseconds(delay));
}
- helpers::sendTextFrame(socket, rec.Payload);
+ if (rec.Dir == TraceFileRecord::Direction::Event)
+ {
+ // Meta info about an event.
+ static const std::string NewSession("NewSession: ");
+ static const std::string EndSession("EndSession: ");
+
+ if (rec.Payload.find(NewSession) == 0)
+ {
+ const auto& uri = rec.Payload.substr(NewSession.size());
+ auto it = Sessions.find(uri);
+ if (it != Sessions.end())
+ {
+ // Add a new session.
+ if (it->second.find(rec.SessionId) != it->second.end())
+ {
+ std::cerr << "ERROR: session [" << rec.SessionId << "] already exists on doc [" << uri << "]\n";
+ }
+ else
+ {
+ it->second.emplace(rec.SessionId, Connection::create(_app._serverURI, uri, rec.SessionId));
+ }
+ }
+ else
+ {
+ std::cerr << "New Document: " << uri << "\n";
+ ChildToDoc.emplace(rec.Pid, uri);
+ Sessions[uri].emplace(rec.SessionId, Connection::create(_app._serverURI, uri, rec.SessionId));
+ }
+ }
+ else if (rec.Payload.find(EndSession) == 0)
+ {
+ const auto& uri = rec.Payload.substr(EndSession.size());
+ auto it = Sessions.find(uri);
+ if (it != Sessions.end())
+ {
+ std::cerr << "EndSession [" << rec.SessionId << "]: " << uri << "\n";
+
+ it->second.erase(rec.SessionId);
+ if (it->second.empty())
+ {
+ std::cerr << "End Doc [" << uri << "].\n";
+ Sessions.erase(it);
+ ChildToDoc.erase(rec.Pid);
+ }
+ }
+ else
+ {
+ std::cerr << "ERROR: Doc [" << uri << "] does not exist.\n";
+ }
+ }
+ }
+ else if (rec.Dir == TraceFileRecord::Direction::Incoming)
+ {
+ auto docIt = ChildToDoc.find(rec.Pid);
+ if (docIt != ChildToDoc.end())
+ {
+ const auto& uri = docIt->second;
+ auto it = Sessions.find(uri);
+ if (it != Sessions.end())
+ {
+ const auto sessionIt = it->second.find(rec.SessionId);
+ if (sessionIt != it->second.end())
+ {
+ sessionIt->second->send(rec.Payload);
+ }
+ }
+ else
+ {
+ std::cerr << "ERROR: Doc [" << uri << "] does not exist.\n";
+ }
+ }
+ else
+ {
+ std::cerr << "ERROR: Unknown PID [" << rec.Pid << "] maps to no active document.\n";
+ }
+ }
}
}
catch (const Poco::Exception &e)
@@ -131,6 +232,12 @@ public:
private:
Stress& _app;
TraceFileReader _traceFile;
+
+ /// LOK child process PID to Doc URI map.
+ std::map<unsigned, std::string> ChildToDoc;
+
+ /// Doc URI to Sessions map. Sessions are maps of SessionID to Connection.
+ std::map<std::string, std::map<std::string, std::unique_ptr<Connection>>> Sessions;
};
Stress::Stress() :
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 187c68f..c86ceb7 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -551,7 +551,6 @@ private:
if (uri.size() > 0 && uri.compare(0, 8, "lool/ws/") == 0)
uri.erase(0, 8);
-
const auto uriPublic = DocumentBroker::sanitizeURI(uri);
const auto docKey = DocumentBroker::getDocKey(uriPublic);
std::shared_ptr<DocumentBroker> docBroker;
@@ -683,7 +682,7 @@ private:
// Wait until the client has connected with a prison socket.
waitBridgeCompleted(session);
- LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "NewSession");
+ LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "NewSession: " + uri);
// Now the bridge between the client and kit process is connected
status = "statusindicator: ready";
@@ -760,7 +759,7 @@ private:
Admin::instance().rmDoc(docKey);
}
- LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "EndSession");
+ LOOLWSD::dumpEventTrace(docBroker->getJailId(), id, "EndSession: " + uri);
Log::info("Finishing GET request handler for session [" + id + "]. Joining the queue.");
queue->put("eof");
queueHandlerThread.join();
diff --git a/loolwsd/TraceFile.hpp b/loolwsd/TraceFile.hpp
index cd429a6..0015cbc 100644
--- a/loolwsd/TraceFile.hpp
+++ b/loolwsd/TraceFile.hpp
@@ -33,7 +33,7 @@ public:
Direction Dir;
unsigned TimestampNs;
unsigned Pid;
- unsigned SessionId;
+ std::string SessionId;
std::string Payload;
};
@@ -94,13 +94,23 @@ public:
TraceFileReader(const std::string& path) :
_epochStart(Poco::Timestamp().epochMicroseconds()),
_stream(path),
+ _index(0),
_indexIn(-1),
_indexOut(-1)
{
readFile();
}
- const std::string& getDocURI() const { return _docURI; }
+ TraceFileRecord getNextRecord()
+ {
+ if (_index < _records.size())
+ {
+ return _records[_index++];
+ }
+
+ // Invalid.
+ return TraceFileRecord();
+ }
TraceFileRecord getNextRecord(const TraceFileRecord::Direction dir)
{
@@ -136,38 +146,34 @@ private:
while (std::getline(_stream, line) && !line.empty())
{
const auto v = split(line, line[0]);
- if (v.size() == 3)
+ if (v.size() == 4)
{
TraceFileRecord rec;
rec.Dir = static_cast<TraceFileRecord::Direction>(line[0]);
- rec.Pid = std::atoi(v[0].c_str());
- rec.TimestampNs = std::atoi(v[1].c_str());
- rec.Payload = v[2];
+ unsigned index = 0;
+ rec.TimestampNs = std::atoi(v[index++].c_str());
+ rec.Pid = std::atoi(v[index++].c_str());
+ rec.SessionId = v[index++];
+ rec.Payload = v[index++];
_records.push_back(rec);
}
+ else
+ {
+ fprintf(stderr, "Invalid trace file record, expected 4 tokens. [%s]\n", line.c_str());
+ }
}
_indexIn = advance(-1, TraceFileRecord::Direction::Incoming);
_indexOut = advance(-1, TraceFileRecord::Direction::Outgoing);
- if (_records.size() > 1)
+ if (_records.empty() ||
+ _records[0].Dir != TraceFileRecord::Direction::Event ||
+ _records[0].Payload.find("NewSession") != 0)
{
- if (_records[0].Payload.find("loolclient") == 0 &&
- _records[1].Payload.find("load url=") == 0)
- {
- _docURI = _records[1].Payload.substr(9);
- return;
- }
- else if (_records[0].Payload.find("load url=") == 0)
- {
- _docURI = _records[0].Payload.substr(9);
- return;
- }
+ fprintf(stderr, "Invalid trace file with %ld records. First record: %s\n", _records.size(),
+ _records.empty() ? "<empty>" : _records[0].Payload.c_str());
+ throw std::runtime_error("Invalid trace file.");
}
-
- fprintf(stderr, "Invalid trace file with %ld records. First record: %s\n", _records.size(),
- _records.empty() ? "<empty>" : _records[0].Payload.c_str());
- throw std::runtime_error("Invalid trace file.");
}
std::vector<std::string> split(const std::string& s, const char delim) const
@@ -203,7 +209,7 @@ private:
const Poco::Int64 _epochStart;
std::ifstream _stream;
std::vector<TraceFileRecord> _records;
- std::string _docURI;
+ unsigned _index;
unsigned _indexIn;
unsigned _indexOut;
};
More information about the Libreoffice-commits
mailing list