[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - wsd/ClientSession.cpp wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/LOOLWSD.hpp
Michael Meeks
michael.meeks at collabora.com
Sat Mar 4 23:08:28 UTC 2017
wsd/ClientSession.cpp | 1
wsd/DocumentBroker.cpp | 58 ++
wsd/DocumentBroker.hpp | 21
wsd/LOOLWSD.cpp | 1213 +++----------------------------------------------
wsd/LOOLWSD.hpp | 3
5 files changed, 176 insertions(+), 1120 deletions(-)
New commits:
commit c53ea05f7ee8a03fa6752797d4b2d1e651a09300
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Sat Mar 4 23:07:17 2017 +0000
Setup a poll per DocumentBroker with thread to go with that.
Also dung out a chunk of older code.
FIXME: websocket / ClientSession needs to associate itself with the
DocumentBroker poll loop in place of the original loop.
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index 140ea77..edcf34c 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -40,6 +40,7 @@ ClientSession::ClientSession(const std::string& id,
{
LOG_INF("ClientSession ctor [" << getName() << "].");
+ // FIXME: one thread per client session [!?].
_senderThread = std::thread([this]{ senderThread(); });
}
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 5cd7fb3..8f72e21 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -147,14 +147,12 @@ std::string DocumentBroker::getDocKey(const Poco::URI& uri)
DocumentBroker::DocumentBroker(const std::string& uri,
const Poco::URI& uriPublic,
const std::string& docKey,
- const std::string& childRoot,
- const std::shared_ptr<ChildProcess>& childProcess) :
+ const std::string& childRoot) :
_uriOrig(uri),
_uriPublic(uriPublic),
_docKey(docKey),
_childRoot(childRoot),
_cacheRoot(getCachePath(uriPublic.toString())),
- _childProcess(childProcess),
_lastSaveTime(std::chrono::steady_clock::now()),
_markToDestroy(false),
_lastEditableSession(false),
@@ -171,6 +169,57 @@ DocumentBroker::DocumentBroker(const std::string& uri,
assert(!_childRoot.empty());
LOG_INF("DocumentBroker [" << _uriPublic.toString() << "] created. DocKey: [" << _docKey << "]");
+
+ _stop = false;
+}
+
+/// Factory for DocumentBroker: builds the broker and starts its poll thread.
+///
+/// Use this in place of calling the constructor directly, because the poll
+/// thread needs a shared_ptr to the broker and that only exists once
+/// make_shared has returned.
+///
+/// @param uri        original URI, forwarded unchanged to the constructor.
+/// @param uriPublic  public URI, forwarded unchanged to the constructor.
+/// @param docKey     document key, forwarded unchanged to the constructor.
+/// @param childRoot  child root path, forwarded unchanged to the constructor.
+/// @return the new broker, with pollThread already running on its _thread.
+///
+/// NOTE(review): the thread receives its own copy of the shared_ptr while
+/// ~DocumentBroker joins _thread - confirm the final reference can never be
+/// dropped from the poll thread itself.
+std::shared_ptr<DocumentBroker> DocumentBroker::create(
+    const std::string& uri,
+    const Poco::URI& uriPublic,
+    const std::string& docKey,
+    const std::string& childRoot)
+{
+    auto broker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, childRoot);
+
+    // Hand the thread a copy of the owning pointer; pollThread takes it by value.
+    broker->_thread = std::thread(pollThread, broker);
+
+    return broker;
+}
+
+/// Thread entry point: acquires a kit child for the document, then runs the
+/// broker's poll loop until asked to stop.
+///
+/// @param docBroker the broker this thread serves. Taken by value, so the
+///                  thread keeps its own reference for as long as it runs.
+///
+/// If no child process can be obtained, the broker is flagged as stopped and
+/// the thread exits without entering the poll loop.
+void DocumentBroker::pollThread(std::shared_ptr<DocumentBroker> docBroker)
+{
+    // Request a kit process for this doc.
+    docBroker->_childProcess = getNewChild_Blocks();
+    if (!docBroker->_childProcess)
+    {
+        // Let the client know we can't serve now.
+        LOG_ERR("Failed to get new child.");
+
+        // FIXME: need to notify all clients and shut this down ...
+#if 0
+        const std::string msg = SERVICE_UNAVAILABLE_INTERNAL_ERROR;
+        ws.sendFrame(msg);
+        // abnormal close frame handshake
+        ws.shutdown(WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY);
+#endif
+        // FIXME: return something good down the websocket ...
+        docBroker->_stop = true;
+
+        // There is no child to attach to: falling through would dereference
+        // the null _childProcess below, so leave the thread here.
+        return;
+    }
+
+    // Let the child know which broker it belongs to.
+    docBroker->_childProcess->setDocumentBroker(docBroker);
+
+    // Main polling loop goodness.
+    // Each poll() call is bounded (5000, presumably milliseconds - confirm
+    // against SocketPoll) so the stop/termination flags are re-checked regularly.
+    while (!docBroker->_stop && !TerminationFlag && !ShutdownRequestFlag)
+    {
+        docBroker->_poll.poll(5000);
+    }
+}
+
+/// Reports whether this broker is still usable.
+///
+/// @return false once _stop has been set; true while no child process has
+///         been assigned yet (the poll thread is still waiting for one);
+///         otherwise whatever the child process reports.
+///
+/// The stop flag is tested first on purpose: pollThread sets _stop when it
+/// fails to obtain a child, leaving _childProcess null. Testing for a null
+/// child first would report such a broker as alive indefinitely.
+bool DocumentBroker::isAlive() const
+{
+    if (_stop) // we're dead.
+        return false;
+    if (!_childProcess)
+        return true; // waiting to get a child.
+
+    return _childProcess->isAlive();
}
DocumentBroker::~DocumentBroker()
@@ -188,6 +237,9 @@ DocumentBroker::~DocumentBroker()
// Need to first make sure the child exited, socket closed,
// and thread finished before we are destroyed.
_childProcess.reset();
+
+ if (_thread.joinable())
+ _thread.join();
}
bool DocumentBroker::load(std::shared_ptr<ClientSession>& session, const std::string& jailId)
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 2c0122d..a2d23a6 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -28,6 +28,7 @@
#include "LOOLWebSocket.hpp"
#include "TileDesc.hpp"
#include "Util.hpp"
+#include "net/Socket.hpp"
#include "common/SigUtil.hpp"
@@ -207,11 +208,18 @@ public:
/// Dummy document broker that is marked to destroy.
DocumentBroker();
+ /// Use create - not this constructor ...
+ /// FIXME: friend with make_shared etc.
DocumentBroker(const std::string& uri,
const Poco::URI& uriPublic,
const std::string& docKey,
- const std::string& childRoot,
- const std::shared_ptr<ChildProcess>& childProcess);
+ const std::string& childRoot);
+public:
+ static std::shared_ptr<DocumentBroker> create(
+ const std::string& uri,
+ const Poco::URI& uriPublic,
+ const std::string& docKey,
+ const std::string& childRoot);
~DocumentBroker();
@@ -240,7 +248,7 @@ public:
const std::string& getDocKey() const { return _docKey; }
const std::string& getFilename() const { return _filename; };
TileCache& tileCache() { return *_tileCache; }
- bool isAlive() const { return _childProcess && _childProcess->isAlive(); }
+ bool isAlive() const;
size_t getSessionsCount() const
{
Util::assertIsLocked(_mutex);
@@ -332,6 +340,10 @@ private:
/// Forward a message from child session to its respective client session.
bool forwardToClient(const std::shared_ptr<Message>& payload);
+ /// The thread function that performs all of the I/O for all sessions
+ /// associated with this document.
+ static void pollThread(std::shared_ptr<DocumentBroker> docBroker);
+
private:
const std::string _uriOrig;
const Poco::URI _uriPublic;
@@ -366,6 +378,9 @@ private:
mutable std::mutex _mutex;
std::condition_variable _saveCV;
std::mutex _saveMutex;
+ SocketPoll _poll;
+ std::thread _thread;
+ std::atomic<bool> _stop;
/// Versioning is used to prevent races between
/// painting and invalidation.
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index ce39f25..755991f 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -463,966 +463,102 @@ static size_t addNewChild(const std::shared_ptr<ChildProcess>& child)
return count;
}
-static std::shared_ptr<ChildProcess> getNewChild_Blocks()
+std::shared_ptr<ChildProcess> getNewChild_Blocks()
{
- Util::assertIsLocked(DocBrokersMutex);
- std::unique_lock<std::mutex> lock(NewChildrenMutex);
-
- namespace chrono = std::chrono;
- const auto startTime = chrono::steady_clock::now();
- do
- {
- LOG_DBG("getNewChild: Rebalancing children.");
- int numPreSpawn = LOOLWSD::NumPreSpawnedChildren;
- ++numPreSpawn; // Replace the one we'll dispatch just now.
- if (rebalanceChildren(numPreSpawn) < 0)
- {
- // Fatal. Let's fail and retry at a higher level.
- LOG_DBG("getNewChild: rebalancing of children failed.");
- return nullptr;
- }
-
- // With valgrind we need extended time to spawn kits.
-#ifdef KIT_IN_PROCESS
- const auto timeoutMs = CHILD_TIMEOUT_MS;
-#else
- const auto timeoutMs = CHILD_TIMEOUT_MS * (LOOLWSD::NoCapsForKit ? 100 : 1);
-#endif
- LOG_TRC("Waiting for a new child for a max of " << timeoutMs << " ms.");
- const auto timeout = chrono::milliseconds(timeoutMs);
- // FIXME: blocks ...
- if (NewChildrenCV.wait_for(lock, timeout, []() { return !NewChildren.empty(); }))
- {
- auto child = NewChildren.back();
- NewChildren.pop_back();
- const auto available = NewChildren.size();
-
- // Validate before returning.
- if (child && child->isAlive())
- {
- LOG_DBG("getNewChild: Have " << available << " spare " <<
- (available == 1 ? "child" : "children") <<
- " after poping [" << child->getPid() << "] to return.");
- return child;
- }
-
- LOG_WRN("getNewChild: popped dead child, need to find another.");
- }
- else
- {
- LOG_WRN("getNewChild: No available child. Sending spawn request to forkit and failing.");
- }
- }
- while (chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - startTime).count() <
- CHILD_TIMEOUT_MS * 4);
-
- LOG_DBG("getNewChild: Timed out while waiting for new child.");
- return nullptr;
-}
-
-/// Handles the filename part of the convert-to POST request payload.
-class ConvertToPartHandler : public PartHandler
-{
- std::string& _filename;
-public:
- ConvertToPartHandler(std::string& filename)
- : _filename(filename)
- {
- }
-
- virtual void handlePart(const MessageHeader& header, std::istream& stream) override
- {
- // Extract filename and put it to a temporary directory.
- std::string disp;
- NameValueCollection params;
- if (header.has("Content-Disposition"))
- {
- std::string cd = header.get("Content-Disposition");
- MessageHeader::splitParameters(cd, disp, params);
- }
-
- if (!params.has("filename"))
- return;
-
- Path tempPath = Path::forDirectory(Poco::TemporaryFile::tempName() + "/");
- File(tempPath).createDirectories();
- // Prevent user inputting anything funny here.
- // A "filename" should always be a filename, not a path
- const Path filenameParam(params.get("filename"));
- tempPath.setFileName(filenameParam.getFileName());
- _filename = tempPath.toString();
-
- // Copy the stream to _filename.
- std::ofstream fileStream;
- fileStream.open(_filename);
- StreamCopier::copyStream(stream, fileStream);
- fileStream.close();
- }
-};
-
-#if 0 // loolnb
-/// Handle a public connection from a client.
-class ClientRequestHandler : public HTTPRequestHandler
-{
-private:
- static std::string getContentType(const std::string& fileName)
- {
- const std::string nodePath = Poco::format("//[@ext='%s']", Poco::Path(fileName).getExtension());
- std::string discPath = Path(Application::instance().commandPath()).parent().toString() + "discovery.xml";
- if (!File(discPath).exists())
- {
- discPath = LOOLWSD::FileServerRoot + "/discovery.xml";
- }
-
- InputSource input(discPath);
- DOMParser domParser;
- AutoPtr<Poco::XML::Document> doc = domParser.parse(&input);
- // TODO. discovery.xml missing application/pdf
- Node* node = doc->getNodeByPath(nodePath);
- if (node && (node = node->parentNode()) && node->hasAttributes())
- {
- return dynamic_cast<Element*>(node)->getAttribute("name");
- }
-
- return "application/octet-stream";
- }
-
- /// Handle POST requests.
- /// Always throw on error, do not set response status here.
- /// Returns true if a response has been sent.
- static bool handlePostRequest_Blocks(HTTPServerRequest& request, HTTPServerResponse& response, const std::string& id)
- {
- LOG_INF("Post request: [" << request.getURI() << "]");
- StringTokenizer tokens(request.getURI(), "/?");
- if (tokens.count() >= 3 && tokens[2] == "convert-to")
- {
- std::string fromPath;
- ConvertToPartHandler handler(fromPath);
- HTMLForm form(request, request.stream(), handler);
- const std::string format = (form.has("format") ? form.get("format") : "");
-
- bool sent = false;
- if (!fromPath.empty())
- {
- if (!format.empty())
- {
- LOG_INF("Conversion request for URI [" << fromPath << "].");
-
- auto uriPublic = DocumentBroker::sanitizeURI(fromPath);
- const auto docKey = DocumentBroker::getDocKey(uriPublic);
-
- // This lock could become a bottleneck.
- // In that case, we can use a pool and index by publicPath.
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
-
- // Request a kit process for this doc.
- auto child = getNewChild_Blocks();
- if (!child)
- {
- // Let the client know we can't serve now.
- throw std::runtime_error("Failed to spawn lokit child.");
- }
-
- LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
- auto docBroker = std::make_shared<DocumentBroker>(fromPath, uriPublic, docKey, LOOLWSD::ChildRoot, child);
- child->setDocumentBroker(docBroker);
-
- cleanupDocBrokers();
-
- // FIXME: What if the same document is already open? Need a fake dockey here?
- LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
- DocBrokers.emplace(docKey, docBroker);
- LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "].");
-
- // Load the document.
- std::shared_ptr<LOOLWebSocket> ws;
- auto session = std::make_shared<ClientSession>(id, ws, docBroker, uriPublic);
-
- auto lock = docBroker->getLock();
- auto sessionsCount = docBroker->addSession(session);
- lock.unlock();
- LOG_TRC(docKey << ", ws_sessions++: " << sessionsCount);
-
- docBrokersLock.unlock();
-
- std::string encodedFrom;
- URI::encode(docBroker->getPublicUri().getPath(), "", encodedFrom);
- const std::string load = "load url=" + encodedFrom;
- session->handleInput(load.data(), load.size());
-
- // FIXME: Check for security violations.
- Path toPath(docBroker->getPublicUri().getPath());
- toPath.setExtension(format);
- const std::string toJailURL = "file://" + std::string(JAILED_DOCUMENT_ROOT) + toPath.getFileName();
- std::string encodedTo;
- URI::encode(toJailURL, "", encodedTo);
-
- // Convert it to the requested format.
- const auto saveas = "saveas url=" + encodedTo + " format=" + format + " options=";
- session->handleInput(saveas.data(), saveas.size());
-
- // Send it back to the client.
- try
- {
- Poco::URI resultURL(session->getSaveAsUrl(COMMAND_TIMEOUT_MS));
- LOG_TRC("Save-as URL: " << resultURL.toString());
-
- if (!resultURL.getPath().empty())
- {
- const std::string mimeType = "application/octet-stream";
- std::string encodedFilePath;
- URI::encode(resultURL.getPath(), "", encodedFilePath);
- LOG_TRC("Sending file: " << encodedFilePath);
- response.sendFile(encodedFilePath, mimeType);
- sent = true;
- }
- }
- catch (const std::exception& ex)
- {
- LOG_ERR("Failed to get save-as url: " << ex.what());
- }
-
- docBrokersLock.lock();
- auto docLock = docBroker->getLock();
- sessionsCount = docBroker->removeSession(id);
- if (sessionsCount == 0)
- {
- // At this point we're done.
- LOG_DBG("Removing DocumentBroker for docKey [" << docKey << "].");
- DocBrokers.erase(docKey);
- docBroker->terminateChild(docLock, "");
- LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after removing [" << docKey << "].");
- }
- else
- {
- LOG_ERR("Multiple sessions during conversion. " << sessionsCount << " sessions remain.");
- }
- }
-
- // Clean up the temporary directory the HTMLForm ctor created.
- Path tempDirectory(fromPath);
- tempDirectory.setFileName("");
- FileUtil::removeFile(tempDirectory, /*recursive=*/true);
- }
-
- if (!sent)
- {
- // TODO: We should differentiate between bad request and failed conversion.
- throw BadRequestException("Failed to convert and send file.");
- }
-
- return true;
- }
- else if (tokens.count() >= 4 && tokens[3] == "insertfile")
- {
- LOG_INF("Insert file request.");
- response.set("Access-Control-Allow-Origin", "*");
- response.set("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
- response.set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
-
- std::string tmpPath;
- ConvertToPartHandler handler(tmpPath);
- HTMLForm form(request, request.stream(), handler);
-
- if (form.has("childid") && form.has("name"))
- {
- const std::string formChildid(form.get("childid"));
- const std::string formName(form.get("name"));
-
- // Validate the docKey
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
- std::string decodedUri;
- URI::decode(tokens[2], decodedUri);
- const auto docKey = DocumentBroker::getDocKey(DocumentBroker::sanitizeURI(decodedUri));
- auto docBrokerIt = DocBrokers.find(docKey);
-
- // Maybe just free the client from sending childid in form ?
- if (docBrokerIt == DocBrokers.end() || docBrokerIt->second->getJailId() != formChildid)
- {
- throw BadRequestException("DocKey [" + docKey + "] or childid [" + formChildid + "] is invalid.");
- }
- docBrokersLock.unlock();
-
- // protect against attempts to inject something funny here
- if (formChildid.find('/') == std::string::npos && formName.find('/') == std::string::npos)
- {
- LOG_INF("Perform insertfile: " << formChildid << ", " << formName);
- const std::string dirPath = LOOLWSD::ChildRoot + formChildid
- + JAILED_DOCUMENT_ROOT + "insertfile";
- File(dirPath).createDirectories();
- std::string fileName = dirPath + "/" + form.get("name");
- File(tmpPath).moveTo(fileName);
- return false;
- }
- }
- }
- else if (tokens.count() >= 6)
- {
- LOG_INF("File download request.");
- // TODO: Check that the user in question has access to this file!
-
- // 1. Validate the dockey
- std::string decodedUri;
- URI::decode(tokens[2], decodedUri);
- const auto docKey = DocumentBroker::getDocKey(DocumentBroker::sanitizeURI(decodedUri));
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
- auto docBrokerIt = DocBrokers.find(docKey);
- if (docBrokerIt == DocBrokers.end())
- {
- throw BadRequestException("DocKey [" + docKey + "] is invalid.");
- }
-
- // 2. Cross-check if received child id is correct
- if (docBrokerIt->second->getJailId() != tokens[3])
- {
- throw BadRequestException("ChildId does not correspond to docKey");
- }
-
- // 3. Don't let user download the file in main doc directory containing
- // the document being edited otherwise we will end up deleting main directory
- // after download finishes
- if (docBrokerIt->second->getJailId() == tokens[4])
- {
- throw BadRequestException("RandomDir cannot be equal to ChildId");
- }
- docBrokersLock.unlock();
-
- std::string fileName;
- bool responded = false;
- URI::decode(tokens[5], fileName);
- const Path filePath(LOOLWSD::ChildRoot + tokens[3]
- + JAILED_DOCUMENT_ROOT + tokens[4] + "/" + fileName);
- LOG_INF("HTTP request for: " << filePath.toString());
- if (filePath.isAbsolute() && File(filePath).exists())
- {
- std::string contentType = getContentType(fileName);
- response.set("Access-Control-Allow-Origin", "*");
- if (Poco::Path(fileName).getExtension() == "pdf")
- {
- contentType = "application/pdf";
- response.set("Content-Disposition", "attachment; filename=\"" + fileName + "\"");
- }
-
- try
- {
- response.sendFile(filePath.toString(), contentType);
- responded = true;
- }
- catch (const Exception& exc)
- {
- LOG_ERR("Error sending file to client: " << exc.displayText() <<
- (exc.nested() ? " (" + exc.nested()->displayText() + ")" : ""));
- }
-
- FileUtil::removeFile(File(filePath.parent()).path(), true);
- }
- else
- {
- LOG_ERR("Download file [" << filePath.toString() << "] not found.");
- }
-
- return responded;
- }
-
- throw BadRequestException("Invalid or unknown request.");
- }
-
- /// Handle GET requests.
- static void handleGetRequest_Blocks(const std::string& uri, std::shared_ptr<LOOLWebSocket>& ws, const std::string& id)
- {
- LOG_INF("Starting GET request handler for session [" << id << "] on url [" << uri << "].");
- try
- {
- // First, setup WS options.
- // We need blocking here, because the POCO's
- // non-blocking implementation of websockes is
- // broken; essentially it leads to sending
- // incomplete frames.
- ws->setBlocking(true);
- ws->setSendTimeout(WS_SEND_TIMEOUT_MS * 1000);
-
- // Indicate to the client that document broker is searching.
- const std::string status("statusindicator: find");
- LOG_TRC("Sending to Client [" << status << "].");
- ws->sendFrame(status.data(), status.size());
-
- const auto uriPublic = DocumentBroker::sanitizeURI(uri);
- const auto docKey = DocumentBroker::getDocKey(uriPublic);
- LOG_INF("Sanitized URI [" << uri << "] to [" << uriPublic.toString() <<
- "] and mapped to docKey [" << docKey << "] for session [" << id << "].");
-
- // Check if readonly session is required
- bool isReadOnly = false;
- for (const auto& param : uriPublic.getQueryParameters())
- {
- LOG_DBG("Query param: " << param.first << ", value: " << param.second);
- if (param.first == "permission" && param.second == "readonly")
- {
- isReadOnly = true;
- }
- }
-
- int retry = 3;
- while (retry-- > 0)
- {
- auto docBroker = findOrCreateDocBroker_Blocks(uri, docKey, ws, id, uriPublic);
- if (docBroker)
- {
- auto session = createNewClientSession(ws, id, uriPublic, docBroker, isReadOnly);
- if (session)
- {
- // Process the request in an exception-safe way.
- processGetRequest(ws, id, docBroker, session);
- break;
- }
- }
-
- if (retry > 0)
- {
- LOG_WRN("Failed to connect DocBroker and Client Session, retrying.");
- LOOLWSD::checkAndRestoreForKit();
- }
- else
- {
- const std::string msg = SERVICE_UNAVAILABLE_INTERNAL_ERROR;
- LOG_ERR("handleGetRequest: Giving up trying to connect client: " << msg);
- try
- {
- ws->sendFrame(msg.data(), msg.size());
- // abnormal close frame handshake
- ws->shutdown(WebSocket::WS_ENDPOINT_GOING_AWAY);
- }
- catch (const std::exception& exc2)
- {
- LOG_ERR("handleGetRequest: exception while sending WS error message [" << msg << "]: " << exc2.what());
- }
-
- break;
- }
- }
- }
- catch (const std::exception& exc)
- {
- LOG_INF("Finished GET request handler for session [" << id << "] on uri [" << uri << "] with exception: " << exc.what());
- throw;
- }
-
- LOG_INF("Finished GET request handler for session [" << id << "] on uri [" << uri << "].");
- }
-
- /// Find the DocumentBroker for the given docKey, if one exists.
- /// Otherwise, creates and adds a new one to DocBrokers.
- /// May return null if terminating or MaxDocuments limit is reached.
- /// After returning a valid instance DocBrokers must be cleaned up after exceptions.
- static std::shared_ptr<DocumentBroker> findOrCreateDocBroker_Blocks(const std::string& uri,
- const std::string& docKey,
- std::shared_ptr<LOOLWebSocket>& ws,
- const std::string& id,
- const Poco::URI& uriPublic)
- {
- LOG_INF("Find or create DocBroker for docKey [" << docKey <<
- "] for session [" << id << "] on url [" << uriPublic.toString() << "].");
-
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
-
- cleanupDocBrokers();
-
- if (TerminationFlag)
- {
- LOG_ERR("Termination flag set. No loading new session [" << id << "]");
- return nullptr;
- }
-
- std::shared_ptr<DocumentBroker> docBroker;
-
- // Lookup this document.
- auto it = DocBrokers.find(docKey);
- if (it != DocBrokers.end() && it->second)
- {
- // Get the DocumentBroker from the Cache.
- LOG_DBG("Found DocumentBroker with docKey [" << docKey << "].");
- docBroker = it->second;
- if (docBroker->isMarkedToDestroy())
- {
- // Let the waiting happen in parallel to new requests.
- docBrokersLock.unlock();
-
- // If this document is going out, wait.
- LOG_DBG("Document [" << docKey << "] is marked to destroy, waiting to reload.");
-
- bool timedOut = true;
- for (size_t i = 0; i < COMMAND_TIMEOUT_MS / POLL_TIMEOUT_MS; ++i)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS));
-
- docBrokersLock.lock();
- it = DocBrokers.find(docKey);
- if (it == DocBrokers.end())
- {
- // went away successfully
- docBroker.reset();
- docBrokersLock.unlock();
- timedOut = false;
- break;
- }
- else if (it->second && !it->second->isMarkedToDestroy())
- {
- // was actually replaced by a real document
- docBroker = it->second;
- docBrokersLock.unlock();
- timedOut = false;
- break;
- }
-
- docBrokersLock.unlock();
- if (TerminationFlag)
- {
- LOG_ERR("Termination flag set. Not loading new session [" << id << "]");
- return nullptr;
- }
- }
-
- if (timedOut)
- {
- // Still here, but marked to destroy. Proceed and hope to recover.
- LOG_ERR("Timed out while waiting for document to unload before loading.");
- }
-
- // Retake the lock and recheck if another thread created the DocBroker.
- docBrokersLock.lock();
- it = DocBrokers.find(docKey);
- if (it != DocBrokers.end())
- {
- // Get the DocumentBroker from the Cache.
- LOG_DBG("Found DocumentBroker for docKey [" << docKey << "].");
- docBroker = it->second;
- assert(docBroker);
- }
- }
- }
- else
- {
- LOG_DBG("No DocumentBroker with docKey [" << docKey << "] found. New Child and Document.");
- }
-
- Util::assertIsLocked(docBrokersLock);
-
- if (TerminationFlag)
- {
- LOG_ERR("Termination flag set. No loading new session [" << id << "]");
- return nullptr;
- }
-
- // Indicate to the client that we're connecting to the docbroker.
- const std::string statusConnect = "statusindicator: connect";
- LOG_TRC("Sending to Client [" << statusConnect << "].");
- ws->sendFrame(statusConnect.data(), statusConnect.size());
-
- if (!docBroker)
- {
- docBroker = createNewDocBroker(uri, docKey, ws, uriPublic);
- }
-
- return docBroker;
- }
-
- static std::shared_ptr<DocumentBroker> createNewDocBroker_Blocks(const std::string& uri,
- const std::string& docKey,
- std::shared_ptr<LOOLWebSocket>& ws,
- const Poco::URI& uriPublic)
- {
- Util::assertIsLocked(DocBrokersMutex);
-
- static_assert(MAX_DOCUMENTS > 0, "MAX_DOCUMENTS must be positive");
- if (DocBrokers.size() + 1 > MAX_DOCUMENTS)
- {
- LOG_ERR("Maximum number of open documents reached.");
- shutdownLimitReached(*ws);
- return nullptr;
- }
-
- // Request a kit process for this doc.
- // FIXME: Blocks !
- auto child = getNewChild_Blocks();
- if (!child)
- {
- // Let the client know we can't serve now.
- LOG_ERR("Failed to get new child.");
- return nullptr;
- }
-
- // Set the one we just created.
- LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
- auto docBroker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, LOOLWSD::ChildRoot, child);
- child->setDocumentBroker(docBroker);
- DocBrokers.emplace(docKey, docBroker);
- LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "].");
-
- return docBroker;
- }
-
- static std::shared_ptr<ClientSession> createNewClientSession(std::shared_ptr<LOOLWebSocket>& ws,
- const std::string& id,
- const Poco::URI& uriPublic,
- const std::shared_ptr<DocumentBroker>& docBroker,
- const bool isReadOnly)
- {
- LOG_CHECK_RET(docBroker && "Null docBroker instance", nullptr);
- try
- {
- auto lock = docBroker->getLock();
-
- // Validate the broker.
- if (!docBroker->isAlive())
- {
- LOG_ERR("DocBroker is invalid or premature termination of child process.");
- lock.unlock();
- removeDocBrokerSession(docBroker);
- return nullptr;
- }
-
- if (docBroker->isMarkedToDestroy())
- {
- LOG_ERR("DocBroker is marked to destroy, can't add session.");
- lock.unlock();
- removeDocBrokerSession(docBroker);
- return nullptr;
- }
-
- // Now we have a DocumentBroker and we're ready to process client commands.
- const std::string statusReady = "statusindicator: ready";
- LOG_TRC("Sending to Client [" << statusReady << "].");
- ws->sendFrame(statusReady.data(), statusReady.size());
-
- // In case of WOPI, if this session is not set as readonly, it might be set so
- // later after making a call to WOPI host which tells us the permission on files
- // (UserCanWrite param).
- auto session = std::make_shared<ClientSession>(id, ws, docBroker, uriPublic, isReadOnly);
-
- docBroker->addSession(session);
-
- lock.unlock();
-
- const std::string fs = FileUtil::checkDiskSpaceOnRegisteredFileSystems();
- if (!fs.empty())
- {
- LOG_WRN("File system of [" << fs << "] is dangerously low on disk space.");
- const std::string diskfullMsg = "error: cmd=internal kind=diskfull";
- // Alert all other existing sessions also
- Util::alertAllUsers(diskfullMsg);
- }
-
- return session;
- }
- catch (const std::exception& exc)
- {
- LOG_WRN("Exception while preparing session [" << id << "]: " << exc.what());
- removeDocBrokerSession(docBroker, id);
- }
-
- return nullptr;
- }
-
- /// Remove DocumentBroker session and instance from DocBrokers.
- static void removeDocBrokerSession(const std::shared_ptr<DocumentBroker>& docBroker, const std::string& id = "")
- {
- LOG_CHECK_RET(docBroker && "Null docBroker instance", );
-
- const auto docKey = docBroker->getDocKey();
- LOG_DBG("Removing docBroker [" << docKey << "]" << (id.empty() ? "" : (" and session [" + id + "].")));
-
- std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
- auto lock = docBroker->getLock();
-
- if (!id.empty())
- {
- docBroker->removeSession(id);
- }
-
- if (docBroker->getSessionsCount() == 0 || !docBroker->isAlive())
- {
- LOG_INF("Removing unloaded DocumentBroker for docKey [" << docKey << "].");
- DocBrokers.erase(docKey);
- docBroker->terminateChild(lock, "");
- }
- }
-
- /// Process GET requests.
- static void processGetRequest(std::shared_ptr<LOOLWebSocket>& ws,
- const std::string& id,
- const std::shared_ptr<DocumentBroker>& docBroker,
- const std::shared_ptr<ClientSession>& session)
- {
- LOG_CHECK_RET(docBroker && "Null docBroker instance", );
- const auto docKey = docBroker->getDocKey();
- LOG_CHECK_RET(session && "Null ClientSession instance", );
- try
- {
- // Let messages flow.
- IoUtil::SocketProcessor(ws, "client_ws_" + id,
- [&session](const std::vector<char>& payload)
- {
- return session->handleInput(payload.data(), payload.size());
- },
- [&session]() { session->closeFrame(); },
- []() { return TerminationFlag || isShuttingDown(); });
-
- // Connection terminated. Destroy session.
- LOG_DBG("Client session [" << id << "] on docKey [" << docKey << "] terminated. Cleaning up.");
-
- auto docLock = docBroker->getLock();
-
- // We issue a force-save when last editable (non-readonly) session is going away
- const bool forceSave = docBroker->startDestroy(id);
- if (forceSave)
- {
- LOG_INF("Shutdown of the last editable (non-readonly) session, saving the document before tearing down.");
- }
-
- // We need to wait until the save notification reaches us
- // and Storage persists the document.
- if (!docBroker->autoSave(forceSave, COMMAND_TIMEOUT_MS, docLock))
- {
- LOG_ERR("Auto-save before closing failed.");
- }
-
- const auto sessionsCount = docBroker->removeSession(id);
- docLock.unlock();
-
- if (sessionsCount == 0)
- {
- // We've supposedly destroyed the last session, now cleanup.
- removeDocBrokerSession(docBroker);
- }
-
- LOG_INF("Finishing GET request handler for session [" << id << "].");
- }
- catch (const UnauthorizedRequestException& exc)
- {
- LOG_ERR("Error in client request handler: " << exc.toString());
- const std::string status = "error: cmd=internal kind=unauthorized";
- LOG_TRC("Sending to Client [" << status << "].");
- ws->sendFrame(status.data(), status.size());
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Error in client request handler: " << exc.what());
- }
-
- try
- {
- if (session->isCloseFrame())
- {
- LOG_TRC("Normal close handshake.");
- // Client initiated close handshake
- // respond close frame
- ws->shutdown();
- }
- else if (!isShuttingDown())
- {
- // something wrong, with internal exceptions
- LOG_TRC("Abnormal close handshake.");
- session->closeFrame();
- ws->shutdown(WebSocket::WS_ENDPOINT_GOING_AWAY);
- }
- else
- {
- std::lock_guard<std::mutex> lock(ClientWebSocketsMutex);
- LOG_TRC("Capturing Client WS for [" << id << "]");
- ClientWebSockets.push_back(ws);
- }
- }
- catch (const std::exception& exc)
- {
- LOG_WRN("Exception while closing socket for session [" << id <<
- "] of docKey [" << docKey << "]: " << exc.what());
- }
- }
+ std::unique_lock<std::mutex> locka(DocBrokersMutex);
+ std::unique_lock<std::mutex> lockb(NewChildrenMutex);
- /// Sends back the WOPI Discovery XML.
- /// The XML needs to be preprocessed to stamp the correct URL etc.
- /// Returns true if a response has been sent.
- static bool handleGetWOPIDiscovery(HTTPServerRequest& request, HTTPServerResponse& response)
+ namespace chrono = std::chrono;
+ const auto startTime = chrono::steady_clock::now();
+ do
{
- std::string discoveryPath = Path(Application::instance().commandPath()).parent().toString() + "discovery.xml";
- if (!File(discoveryPath).exists())
+ LOG_DBG("getNewChild: Rebalancing children.");
+ int numPreSpawn = LOOLWSD::NumPreSpawnedChildren;
+ ++numPreSpawn; // Replace the one we'll dispatch just now.
+ if (rebalanceChildren(numPreSpawn) < 0)
{
- discoveryPath = LOOLWSD::FileServerRoot + "/discovery.xml";
+ // Fatal. Let's fail and retry at a higher level.
+ LOG_DBG("getNewChild: rebalancing of children failed.");
+ return nullptr;
}
- const std::string mediaType = "text/xml";
- const std::string action = "action";
- const std::string urlsrc = "urlsrc";
- const auto& config = Application::instance().config();
- const std::string loleafletHtml = config.getString("loleaflet_html", "loleaflet.html");
- const std::string uriValue = ((LOOLWSD::isSSLEnabled() || LOOLWSD::isSSLTermination()) ? "https://" : "http://")
- + (LOOLWSD::ServerName.empty() ? request.getHost() : LOOLWSD::ServerName)
- + "/loleaflet/" LOOLWSD_VERSION_HASH "/" + loleafletHtml + '?';
+ // With valgrind we need extended time to spawn kits.
+#ifdef KIT_IN_PROCESS
+ const auto timeoutMs = CHILD_TIMEOUT_MS;
+#else
+ const auto timeoutMs = CHILD_TIMEOUT_MS * (LOOLWSD::NoCapsForKit ? 100 : 1);
+#endif
+ LOG_TRC("Waiting for a new child for a max of " << timeoutMs << " ms.");
+ const auto timeout = chrono::milliseconds(timeoutMs);
+ // FIXME: blocks ...
+ if (NewChildrenCV.wait_for(lockb, timeout, []() { return !NewChildren.empty(); }))
+ {
+ auto child = NewChildren.back();
+ NewChildren.pop_back();
+ const auto available = NewChildren.size();
- InputSource inputSrc(discoveryPath);
- DOMParser parser;
- AutoPtr<Poco::XML::Document> docXML = parser.parse(&inputSrc);
- AutoPtr<NodeList> listNodes = docXML->getElementsByTagName(action);
+ // Validate before returning.
+ if (child && child->isAlive())
+ {
+ LOG_DBG("getNewChild: Have " << available << " spare " <<
+ (available == 1 ? "child" : "children") <<
+ " after poping [" << child->getPid() << "] to return.");
+ return child;
+ }
- for (unsigned long it = 0; it < listNodes->length(); ++it)
+ LOG_WRN("getNewChild: popped dead child, need to find another.");
+ }
+ else
{
- static_cast<Element*>(listNodes->item(it))->setAttribute(urlsrc, uriValue);
+ LOG_WRN("getNewChild: No available child. Sending spawn request to forkit and failing.");
}
-
- std::ostringstream ostrXML;
- DOMWriter writer;
- writer.writeNode(ostrXML, docXML);
-
- response.set("User-Agent", "LOOLWSD WOPI Agent");
- response.setContentLength(ostrXML.str().length());
- response.setContentType(mediaType);
- response.setChunkedTransferEncoding(false);
-
- std::ostream& ostr = response.send();
- ostr << ostrXML.str();
- LOG_INF("Sent discovery.xml successfully.");
- return true;
}
+ while (chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - startTime).count() <
+ CHILD_TIMEOUT_MS * 4);
-public:
+ LOG_DBG("getNewChild: Timed out while waiting for new child.");
+ return nullptr;
+}
- void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override
+/// Handles the filename part of the convert-to POST request payload.
+class ConvertToPartHandler : public PartHandler
+{
+ std::string& _filename;
+public:
+ ConvertToPartHandler(std::string& filename)
+ : _filename(filename)
{
- if (UnitWSD::get().filterHandleRequest(
- UnitWSD::TestRequest::Client,
- request, response))
- return;
-
- const auto connectionNum = ++LOOLWSD::NumConnections;
- if (connectionNum > MAX_CONNECTIONS)
- {
- --LOOLWSD::NumConnections;
- LOG_ERR("Limit on maximum number of connections of " << MAX_CONNECTIONS << " reached.");
- // accept handshake
- LOOLWebSocket ws(request, response);
- shutdownLimitReached(ws);
- return;
- }
-
- try
- {
- const auto id = LOOLWSD::GenSessionId();
- LOG_TRC("Accepted connection #" << connectionNum << " of " <<
- MAX_CONNECTIONS << " as session [" << id << "].");
- handleClientRequest(request, response, id);
- }
- catch (const std::exception& exc)
- {
- // Nothing to do.
- }
-
- --LOOLWSD::NumConnections;
}
- static void handleClientRequest(HTTPServerRequest& request, HTTPServerResponse& response, const std::string& id)
+ virtual void handlePart(const MessageHeader& header, std::istream& stream) override
{
- Util::setThreadName("client_ws_" + id);
-
- LOG_DBG("Thread started.");
-
- Poco::URI requestUri(request.getURI());
- LOG_DBG("Handling: " << request.getURI());
-
- StringTokenizer reqPathTokens(request.getURI(), "/?", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-
- bool responded = false;
- try
- {
- if ((request.getMethod() == HTTPRequest::HTTP_GET ||
- request.getMethod() == HTTPRequest::HTTP_HEAD) &&
- request.getURI() == "/")
- {
- std::string mimeType = "text/plain";
- std::string responseString = "OK";
- response.setContentLength(responseString.length());
- response.setContentType(mimeType);
- response.setChunkedTransferEncoding(false);
- std::ostream& ostr = response.send();
- if (request.getMethod() == HTTPRequest::HTTP_GET)
- {
- ostr << responseString;
- }
- responded = true;
- }
- else if (request.getMethod() == HTTPRequest::HTTP_GET && request.getURI() == "/favicon.ico")
- {
- std::string mimeType = "image/vnd.microsoft.icon";
- std::string faviconPath = Path(Application::instance().commandPath()).parent().toString() + "favicon.ico";
- if (!File(faviconPath).exists())
- {
- faviconPath = LOOLWSD::FileServerRoot + "/favicon.ico";
- }
- response.setContentType(mimeType);
- response.sendFile(faviconPath, mimeType);
- responded = true;
- }
- else if (request.getMethod() == HTTPRequest::HTTP_GET && request.getURI() == "/hosting/discovery")
- {
- // http://server/hosting/discovery
- responded = handleGetWOPIDiscovery(request, response);
- }
- else if (!(request.find("Upgrade") != request.end() && Poco::icompare(request["Upgrade"], "websocket") == 0) &&
- reqPathTokens.count() > 0 && reqPathTokens[0] == "lool")
- {
- // All post requests have url prefix 'lool'.
- responded = handlePostRequest_Blocks(request, response, id);
- }
- else if (reqPathTokens.count() > 2 && reqPathTokens[0] == "lool" && reqPathTokens[2] == "ws")
- {
- auto ws = std::make_shared<LOOLWebSocket>(request, response);
- responded = true; // After upgrading to WS we should not set HTTP response.
- handleGetRequest_Blocks(reqPathTokens[1], ws, id);
- }
- else
- {
- LOG_ERR("Unknown resource: " << request.getURI());
- response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
- }
- }
- catch (const Exception& exc)
- {
- LOG_ERR("ClientRequestHandler::handleClientRequest: " << exc.displayText() <<
- (exc.nested() ? " (" + exc.nested()->displayText() + ")" : ""));
- response.setStatusAndReason(HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
- }
- catch (const UnauthorizedRequestException& exc)
- {
- LOG_ERR("ClientRequestHandler::handleClientRequest: UnauthorizedException: " << exc.toString());
- response.setStatusAndReason(HTTPResponse::HTTP_UNAUTHORIZED);
- }
- catch (const BadRequestException& exc)
- {
- LOG_ERR("ClientRequestHandler::handleClientRequest: BadRequestException: " << exc.toString());
- response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
- }
- catch (const std::exception& exc)
+ // Extract filename and put it to a temporary directory.
+ std::string disp;
+ NameValueCollection params;
+ if (header.has("Content-Disposition"))
{
- LOG_ERR("ClientRequestHandler::handleClientRequest: Exception: " << exc.what());
- response.setStatusAndReason(HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
+ std::string cd = header.get("Content-Disposition");
+ MessageHeader::splitParameters(cd, disp, params);
}
- if (responded)
- {
- LOG_DBG("Already sent response!?");
- }
- else
- {
- // I wonder if this code path has ever been exercised
- LOG_DBG("Attempting to send response");
- response.setContentLength(0);
- std::ostream& os = response.send();
- LOG_DBG("Response stream " << (os.good() ? "*is*" : "is not") << " good after send.");
- }
+ if (!params.has("filename"))
+ return;
+
+ Path tempPath = Path::forDirectory(Poco::TemporaryFile::tempName() + "/");
+ File(tempPath).createDirectories();
+ // Prevent user inputting anything funny here.
+ // A "filename" should always be a filename, not a path
+ const Path filenameParam(params.get("filename"));
+ tempPath.setFileName(filenameParam.getFileName());
+ _filename = tempPath.toString();
- LOG_DBG("Thread finished.");
+ // Copy the stream to _filename.
+ std::ofstream fileStream;
+ fileStream.open(_filename);
+ StreamCopier::copyStream(stream, fileStream);
+ fileStream.close();
}
};
-#endif
/// Handler of announcements that a new loolkit process was created.
///
@@ -1483,59 +619,6 @@ public:
}
};
-#if 0 // loolnb
-/// External (client) connection handler factory.
-/// Creates handler objects.
-class ClientRequestHandlerFactory : public HTTPRequestHandlerFactory
-{
-public:
- ClientRequestHandlerFactory()
- {
- }
-
- HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override
- {
- Util::setThreadName("client_req_hdl");
-
- auto logger = Log::info();
- logger << "Request from " << request.clientAddress().toString() << ": "
- << request.getMethod() << " " << request.getURI() << " "
- << request.getVersion();
-
- for (const auto& it : request)
- {
- logger << " / " << it.first << ": " << it.second;
- }
-
- logger << Log::end;
-
- // Routing
- Poco::URI requestUri(request.getURI());
- std::vector<std::string> reqPathSegs;
- requestUri.getPathSegments(reqPathSegs);
- HTTPRequestHandler* requestHandler;
-
- // File server
- if (reqPathSegs.size() >= 1 && reqPathSegs[0] == "loleaflet")
- {
- requestHandler = FileServer::createRequestHandler();
- }
- // Admin LOOLWebSocket Connections
- else if (reqPathSegs.size() >= 2 && reqPathSegs[0] == "lool" && reqPathSegs[1] == "adminws")
- {
- requestHandler = Admin::createRequestHandler();
- }
- // Client post and websocket connections
- else
- {
- requestHandler = new ClientRequestHandler();
- }
-
- return requestHandler;
- }
-};
-#endif
-
/// Internal (prisoner) connection handler factory.
/// Creates handler objects.
class PrisonerRequestHandlerFactory : public HTTPRequestHandlerFactory
@@ -1626,23 +709,6 @@ inline Poco::Net::ServerSocket* getServerSocket(int portNumber, bool reuseDetail
}
}
-#if 0
-inline Poco::Net::ServerSocket* findFreeServerPort(int& portNumber)
-{
- Poco::Net::ServerSocket* socket = nullptr;
- while (!socket)
- {
- socket = getServerSocket(portNumber, false);
- if (!socket)
- {
- portNumber++;
- LOG_INF("client port busy - trying " << portNumber);
- }
- }
- return socket;
-}
-#endif
-
inline Poco::Net::ServerSocket* getMasterSocket(int portNumber)
{
try
@@ -2344,10 +1410,10 @@ bool LOOLWSD::createForKit()
std::mutex Connection::Mutex;
#endif
-static std::shared_ptr<DocumentBroker> createDocBroker_Blocks(WebSocketHandler& ws,
- const std::string& uri,
- const std::string& docKey,
- const Poco::URI& uriPublic)
+static std::shared_ptr<DocumentBroker> createDocBroker(WebSocketHandler& ws,
+ const std::string& uri,
+ const std::string& docKey,
+ const Poco::URI& uriPublic)
{
Util::assertIsLocked(DocBrokersMutex);
@@ -2359,21 +1425,9 @@ static std::shared_ptr<DocumentBroker> createDocBroker_Blocks(WebSocketHandler&
return nullptr;
}
- // Request a kit process for this doc.
-
- // FIXME: blocks !
- auto child = getNewChild_Blocks();
- if (!child)
- {
- // Let the client know we can't serve now.
- LOG_ERR("Failed to get new child.");
- return nullptr;
- }
-
// Set the one we just created.
LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
- auto docBroker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, LOOLWSD::ChildRoot, child);
- child->setDocumentBroker(docBroker);
+ auto docBroker = DocumentBroker::create(uri, uriPublic, docKey, LOOLWSD::ChildRoot);
DocBrokers.emplace(docKey, docBroker);
LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "].");
@@ -2384,11 +1438,11 @@ static std::shared_ptr<DocumentBroker> createDocBroker_Blocks(WebSocketHandler&
/// Otherwise, creates and adds a new one to DocBrokers.
/// May return null if terminating or MaxDocuments limit is reached.
/// After returning a valid instance DocBrokers must be cleaned up after exceptions.
-static std::shared_ptr<DocumentBroker> findOrCreateDocBroker_Blocks(WebSocketHandler& ws,
- const std::string& uri,
- const std::string& docKey,
- const std::string& id,
- const Poco::URI& uriPublic)
+static std::shared_ptr<DocumentBroker> findOrCreateDocBroker(WebSocketHandler& ws,
+ const std::string& uri,
+ const std::string& docKey,
+ const std::string& id,
+ const Poco::URI& uriPublic)
{
LOG_INF("Find or create DocBroker for docKey [" << docKey <<
"] for session [" << id << "] on url [" << uriPublic.toString() << "].");
@@ -2420,6 +1474,10 @@ static std::shared_ptr<DocumentBroker> findOrCreateDocBroker_Blocks(WebSocketHan
// If this document is going out, wait.
LOG_DBG("Document [" << docKey << "] is marked to destroy, waiting to reload.");
+ // FIXME: - easiest to send a fast message to the
+ // client to wait & retry in a bit ...
+
+#if 0 // loolnb
bool timedOut = true;
for (size_t i = 0; i < COMMAND_TIMEOUT_MS / POLL_TIMEOUT_MS; ++i)
{
@@ -2459,17 +1517,7 @@ static std::shared_ptr<DocumentBroker> findOrCreateDocBroker_Blocks(WebSocketHan
// Still here, but marked to destroy. Proceed and hope to recover.
LOG_ERR("Timed out while waiting for document to unload before loading.");
}
-
- // Retake the lock and recheck if another thread created the DocBroker.
- docBrokersLock.lock();
- it = DocBrokers.find(docKey);
- if (it != DocBrokers.end())
- {
- // Get the DocumentBroker from the Cache.
- LOG_DBG("Found DocumentBroker for docKey [" << docKey << "].");
- docBroker = it->second;
- assert(docBroker);
- }
+#endif
}
}
else
@@ -2491,9 +1539,7 @@ static std::shared_ptr<DocumentBroker> findOrCreateDocBroker_Blocks(WebSocketHan
ws.sendFrame(statusConnect);
if (!docBroker)
- {
- docBroker = createDocBroker_Blocks(ws, uri, docKey, uriPublic);
- }
+ docBroker = createDocBroker(ws, uri, docKey, uriPublic);
return docBroker;
}
@@ -2725,7 +1771,7 @@ private:
}
else if (reqPathTokens.count() > 2 && reqPathTokens[0] == "lool" && reqPathTokens[2] == "ws")
{
- handleClientWsRequest_Blocks(request, reqPathTokens[1]);
+ handleClientWsRequest(request, reqPathTokens[1]);
}
else
{
@@ -2909,17 +1955,8 @@ private:
// In that case, we can use a pool and index by publicPath.
std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
- // Request a kit process for this doc.
- auto child = getNewChild_Blocks();
- if (!child)
- {
- // Let the client know we can't serve now.
- throw std::runtime_error("Failed to spawn lokit child.");
- }
-
LOG_DBG("New DocumentBroker for docKey [" << docKey << "].");
- auto docBroker = std::make_shared<DocumentBroker>(fromPath, uriPublic, docKey, LOOLWSD::ChildRoot, child);
- child->setDocumentBroker(docBroker);
+ auto docBroker = DocumentBroker::create(fromPath, uriPublic, docKey, LOOLWSD::ChildRoot);
cleanupDocBrokers();
@@ -3126,7 +2163,7 @@ private:
throw BadRequestException("Invalid or unknown request.");
}
- void handleClientWsRequest_Blocks(const Poco::Net::HTTPRequest& request, const std::string& url)
+ void handleClientWsRequest(const Poco::Net::HTTPRequest& request, const std::string& url)
{
// requestHandler = new ClientRequestHandler();
LOG_INF("Client WS request" << request.getURI() << ", url: " << url);
@@ -3166,46 +2203,16 @@ private:
LOG_INF("URL [" << url << "] is " << (isReadOnly ? "readonly" : "writable") << ".");
- // FIXME: we need to push this all out into its own thread - to not block.
-
// Request a kit process for this doc.
- int retry = 3;
- while (retry-- > 0)
+ auto docBroker = findOrCreateDocBroker(ws, url, docKey, _id, uriPublic);
+ if (docBroker)
{
- auto docBroker = findOrCreateDocBroker_Blocks(ws, url, docKey, _id, uriPublic);
- if (docBroker)
- {
- _clientSession = createNewClientSession(ws, _id, uriPublic, docBroker, isReadOnly);
- if (_clientSession)
- {
- _clientSession->onConnect(_socket);
- break;
- }
- }
-
- if (retry > 0)
- {
- LOG_WRN("Failed to connect DocBroker and Client Session, retrying.");
- LOOLWSD::checkAndRestoreForKit();
- }
- else
- {
- const std::string msg = SERVICE_UNAVAILABLE_INTERNAL_ERROR;
- LOG_ERR("handleGetRequest: Giving up trying to connect client: " << msg);
- try
- {
- ws.sendFrame(msg);
- // abnormal close frame handshake
- ws.shutdown(WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY);
- }
- catch (const std::exception& exc2)
- {
- LOG_ERR("handleGetRequest: exception while sending WS error message [" << msg << "]: " << exc2.what());
- }
-
- break;
- }
+ _clientSession = createNewClientSession(ws, _id, uriPublic, docBroker, isReadOnly);
+ if (_clientSession)
+ _clientSession->onConnect(_socket);
}
+ if (!docBroker || !_clientSession)
+ LOG_WRN("Failed to connect DocBroker and Client Session.");
}
void saveDocument()
@@ -3536,10 +2543,6 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
static_assert(MAX_CONNECTIONS >= 3, "MAX_CONNECTIONS must be at least 3");
const auto maxThreadCount = MAX_CONNECTIONS * 5;
-#if 0 // loolnb
- auto params1 = new HTTPServerParams();
- params1->setMaxThreads(maxThreadCount);
-#endif
auto params2 = new HTTPServerParams();
params2->setMaxThreads(maxThreadCount);
@@ -3577,24 +2580,6 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
return Application::EXIT_SOFTWARE;
}
-#if 0 // loolnb
- // Now we can serve clients; Start listening on the public port.
- std::unique_ptr<Poco::Net::ServerSocket> psvs(
- UnitWSD::isUnitTesting() ?
- findFreeServerPort(ClientPortNumber) :
- getServerSocket(ClientPortNumber, true));
- if (!psvs)
- {
- LOG_FTL("Failed to listen on client port (" <<
- ClientPortNumber << ") or find a free port. Exiting.");
- return Application::EXIT_SOFTWARE;
- }
-
- HTTPServer srv(new ClientRequestHandlerFactory(), threadPool, *psvs, params1);
- LOG_INF("Starting master server listening on " << ClientPortNumber);
- srv.start();
-#endif
-
// TODO loolnb
srv.start(ClientPortNumber);
diff --git a/wsd/LOOLWSD.hpp b/wsd/LOOLWSD.hpp
index fee0091..94ff147 100644
--- a/wsd/LOOLWSD.hpp
+++ b/wsd/LOOLWSD.hpp
@@ -21,8 +21,11 @@
#include "Util.hpp"
+class ChildProcess;
class TraceFileWriter;
+std::shared_ptr<ChildProcess> getNewChild_Blocks();
+
/// The Server class which is responsible for all
/// external interactions.
class LOOLWSD : public Poco::Util::ServerApplication
More information about the Libreoffice-commits
mailing list