[Libreoffice-commits] online.git: loleaflet/js wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp
Michael Meeks (via logerrit)
logerrit at kemper.freedesktop.org
Fri Apr 24 14:45:15 UTC 2020
loleaflet/js/global.js | 20 +++++++++++++-------
wsd/LOOLWSD.cpp | 2 +-
wsd/ProxyProtocol.cpp | 48 +++++++++++++++++++++++++++++++-----------------
wsd/ProxyProtocol.hpp | 3 ++-
4 files changed, 47 insertions(+), 26 deletions(-)
New commits:
commit 01519bdc644781cb66a2179dfe40343bf9769b84
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Sat Mar 21 15:07:10 2020 +0000
Commit: Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri Apr 24 16:44:55 2020 +0200
Proxy: improve debugging & naming.
Change-Id: Ifba669a33855a67c9a4e968db42ef1a2cb301d63
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92813
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
Reviewed-by: Jan Holesovsky <kendy at collabora.com>
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index c711c84b0..aacf59496 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -216,7 +216,7 @@
};
this.parseIncomingArray = function(arr) {
var decoder = new TextDecoder();
- console.debug('Parse incoming array of length ' + arr.length);
+ console.debug('proxy: parse incoming array of length ' + arr.length);
for (var i = 0; i < arr.length; ++i)
{
var left = arr.length - i;
@@ -274,7 +274,7 @@
if (this.status == 200)
that.parseIncomingArray(new Uint8Array(this.response));
else
- console.debug('Error on incoming response');
+ console.debug('proxy: error on incoming response');
});
}
req.send(that.sendQueue);
@@ -300,21 +300,24 @@
this.sendTimeout = setTimeout(this.doSend, 2 /* ms */);
};
this.close = function() {
- console.debug('close socket');
+ console.debug('proxy: close socket');
this.readyState = 3;
this.onclose();
+ clearInterval(this.waitInterval);
+ this.waitInterval = undefined;
};
this.getEndPoint = function(type) {
var base = this.uri;
return base.replace(/^ws/, 'http') + '/' + type;
};
- console.debug('New proxy socket ' + this.id + ' ' + this.uri);
+ console.debug('proxy: new socket ' + this.id + ' ' + this.uri);
// queue fetch of session id.
this.getSessionId();
// horrors ...
- this.readInterval = setInterval(function() {
+ this.waitConnect = function() {
+ console.debug('proxy: waiting - ' + that.readWaiting + ' on session ' + that.sessionId);
if (that.readWaiting > 4) // max 4 waiting connections concurrently.
return;
if (that.sessionId == 'fetchsession')
@@ -329,13 +332,16 @@
});
req.addEventListener('loadend', function() {
that.readWaiting--;
+ console.debug('proxy: wait ended, re-issue');
+ that.waitConnect();
});
- req.open('GET', that.getEndPoint('read'));
+ req.open('GET', that.getEndPoint('wait'));
req.setRequestHeader('SessionId', that.sessionId);
req.responseType = 'arraybuffer';
req.send('');
that.readWaiting++;
- }, 250);
+ };
+ this.waitInterval = setInterval(this.waitConnect, 250);
};
if (global.socketProxy)
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index ef7c01c2a..ba1d83c77 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2943,7 +2943,7 @@ private:
none, url, docKey, _id, uriPublic);
std::string fullURL = request.getURI();
- std::string ending = "/ws/read";
+ std::string ending = "/ws/wait";
bool isWaiting = (fullURL.size() > ending.size() &&
std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
if (docBroker)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index c8f578559..89b3d9ba9 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -31,7 +31,7 @@ void DocumentBroker::handleProxyRequest(
std::shared_ptr<ClientSession> clientSession;
if (sessionId == "fetchsession")
{
- LOG_TRC("Create session for " << _docKey);
+ LOG_TRC("proxy: Create session for " << _docKey);
clientSession = createNewClientSession(
std::make_shared<ProxyProtocolHandler>(),
id, uriPublic, isReadOnly, hostNoTrust);
@@ -39,7 +39,7 @@ void DocumentBroker::handleProxyRequest(
LOOLWSD::checkDiskSpaceAndWarnClients(true);
LOOLWSD::checkSessionLimitsAndWarnClients();
- LOG_TRC("Returning id " << clientSession->getId());
+ LOG_TRC("proxy: Returning sessionId " << clientSession->getId());
std::ostringstream oss;
oss << "HTTP/1.1 200 OK\r\n"
@@ -57,7 +57,7 @@ void DocumentBroker::handleProxyRequest(
}
else
{
- LOG_TRC("Find session for " << _docKey << " with id " << sessionId);
+ LOG_TRC("proxy: find session for " << _docKey << " with id " << sessionId);
for (const auto &it : _sessions)
{
if (it.second->getId() == sessionId)
@@ -133,28 +133,29 @@ void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<S
{
auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
- LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
+ LOG_INF("proxy: handle request type: " << (isWaiting ? "wait" : "respond") <<
+ " on socket #" << socket->getFD());
if (!isWaiting)
{
if (!_msgHandler)
- LOG_WRN("unusual - incoming message with no-one to handle it");
+ LOG_WRN("proxy: unusual - incoming message with no-one to handle it");
else if (!parseEmitIncoming(streamSocket))
{
std::stringstream oss;
streamSocket->dumpState(oss);
- LOG_ERR("bad socket structure " << oss.str());
+ LOG_ERR("proxy: bad socket structure " << oss.str());
}
}
if (!flushQueueTo(streamSocket) && isWaiting)
{
- LOG_TRC("Queue a waiting socket");
+ LOG_TRC("proxy: queue a waiting out socket #" << streamSocket->getFD());
// longer running 'write socket' (marked 'read' by the client)
_outSockets.push_back(streamSocket);
if (_outSockets.size() > 16)
{
- LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+ LOG_ERR("proxy: Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
// cleanup older waiting sockets.
auto sockWeak = _outSockets.front();
_outSockets.erase(_outSockets.begin());
@@ -180,7 +181,7 @@ void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush)
{
_writeQueue.push_back(std::make_shared<Message>(msg, len, text));
- auto sock = popWriteSocket();
+ auto sock = popOutSocket();
if (sock && flush)
{
flushQueueTo(sock);
@@ -230,16 +231,24 @@ int ProxyProtocolHandler::getPollEvents(std::chrono::steady_clock::time_point /*
return events;
}
-void ProxyProtocolHandler::performWrites()
+/// slurp from the core to us, @returns true if there are messages to send
+bool ProxyProtocolHandler::slurpHasMessages()
{
- if (_msgHandler)
+ if (_msgHandler && _msgHandler->hasQueuedMessages())
_msgHandler->writeQueuedMessages();
- if (_writeQueue.size() <= 0)
+
+ return _writeQueue.size() > 0;
+}
+
+void ProxyProtocolHandler::performWrites()
+{
+ if (!slurpHasMessages())
return;
- auto sock = popWriteSocket();
+ auto sock = popOutSocket();
if (sock)
{
+ LOG_TRC("proxy: performWrites");
flushQueueTo(sock);
sock->shutdown();
}
@@ -247,9 +256,8 @@ void ProxyProtocolHandler::performWrites()
bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket)
{
- // slurp from the core to us.
- if (_msgHandler && _msgHandler->hasQueuedMessages())
- _msgHandler->writeQueuedMessages();
+ if (!slurpHasMessages())
+ return false;
size_t totalSize = 0;
for (auto it : _writeQueue)
@@ -258,6 +266,8 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
if (!totalSize)
return false;
+ LOG_TRC("proxy: flushQueue of size " << totalSize << " to socket #" << socket->getFD() << " & close");
+
std::ostringstream oss;
oss << "HTTP/1.1 200 OK\r\n"
"Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
@@ -276,7 +286,7 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
}
// LRU-ness ...
-std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popOutSocket()
{
std::weak_ptr<StreamSocket> sock;
while (!_outSockets.empty())
@@ -285,8 +295,12 @@ std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
_outSockets.erase(_outSockets.begin());
auto realSock = sock.lock();
if (realSock)
+ {
+ LOG_TRC("proxy: popped an out socket #" << realSock->getFD() << " leaving: " << _outSockets.size());
return realSock;
+ }
}
+ LOG_TRC("proxy: no out sockets to pop.");
return std::shared_ptr<StreamSocket>();
}
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index a2b98c62d..eb15edfdd 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -60,7 +60,8 @@ public:
void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
private:
- std::shared_ptr<StreamSocket> popWriteSocket();
+ std::shared_ptr<StreamSocket> popOutSocket();
+ bool slurpHasMessages();
int sendMessage(const char *msg, const size_t len, bool text, bool flush);
bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
More information about the Libreoffice-commits
mailing list