[Libreoffice-commits] online.git: loleaflet/js wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp
Michael Meeks (via logerrit)
logerrit at kemper.freedesktop.org
Fri Apr 24 13:59:01 UTC 2020
loleaflet/js/global.js | 12 +++++++-----
wsd/DocumentBroker.hpp | 3 ++-
wsd/LOOLWSD.cpp | 12 +++++++++---
wsd/ProxyProtocol.cpp | 42 +++++++++++++++++++++++++++---------------
wsd/ProxyProtocol.hpp | 7 ++-----
5 files changed, 47 insertions(+), 29 deletions(-)
New commits:
commit c28fff4cfa3fb37a887db2656dd535760e5a0a86
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 20:15:08 2020 +0000
Commit: Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri Apr 24 15:58:42 2020 +0200
Proxy: open four wait sockets concurrently.
Change-Id: I08b85677be528b7aa77272a8527c9bacf3f7c336
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92809
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
Reviewed-by: Jan Holesovsky <kendy at collabora.com>
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 023a8bf7c..0739681a2 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -207,7 +207,7 @@
this.sessionId = 'fetchsession';
this.id = window.proxySocketCounter++;
this.sendCounter = 0;
- this.readWaiting = false;
+ this.readWaiting = 0;
this.onclose = function() {
};
this.onerror = function() {
@@ -314,9 +314,9 @@
// horrors ...
this.readInterval = setInterval(function() {
- if (this.readWaiting) // one at a time for now
+ if (that.readWaiting > 4) // max 4 waiting connections concurrently.
return;
- if (this.sessionId == 'fetchsession')
+ if (that.sessionId == 'fetchsession')
return; // waiting for our session id.
var req = new XMLHttpRequest();
// fetch session id:
@@ -325,13 +325,15 @@
that.parseIncomingArray(new Uint8Array(this.response));
else
console.debug('Handle error ' + this.status);
- that.readWaiting = false;
+ });
+ req.addEventListener('loadend', function() {
+ that.readWaiting--;
});
req.open('GET', that.getEndPoint('read'));
req.setRequestHeader('SessionId', that.sessionId);
req.responseType = 'arraybuffer';
req.send('');
- that.readWaiting = true;
+ that.readWaiting++;
}, 250);
};
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 4e854d3fb..6e265c3dc 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -147,7 +147,8 @@ public:
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<StreamSocket> &socket);
+ const std::shared_ptr<StreamSocket> &socket,
+ bool isWaiting);
/// Thread safe termination of this broker if it has a lingering thread
void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 7607facef..ef7c01c2a 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2941,11 +2941,16 @@ private:
// Request a kit process for this doc.
std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
none, url, docKey, _id, uriPublic);
+
+ std::string fullURL = request.getURI();
+ std::string ending = "/ws/read";
+ bool isWaiting = (fullURL.size() > ending.size() &&
+ std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
if (docBroker)
{
// need to move into the DocumentBroker context before doing session lookup / creation etc.
std::string id = _id;
- disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId]
+ disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, isWaiting]
(const std::shared_ptr<Socket> &moveSocket)
{
LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey());
@@ -2955,7 +2960,8 @@ private:
// We no longer own this socket.
moveSocket->setThreadOwner(std::thread::id());
- docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]()
+ docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust,
+ sessionId, moveSocket, isWaiting]()
{
// Now inside the document broker thread ...
LOG_TRC("In the docbroker thread for " << docBroker->getDocKey());
@@ -2965,7 +2971,7 @@ private:
{
docBroker->handleProxyRequest(
sessionId, id, uriPublic, isReadOnly,
- hostNoTrust, streamSocket);
+ hostNoTrust, streamSocket, isWaiting);
return;
}
catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 8aaff0131..25602f146 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,8 @@ void DocumentBroker::handleProxyRequest(
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<StreamSocket> &socket)
+ const std::shared_ptr<StreamSocket> &socket,
+ bool isWaiting)
{
std::shared_ptr<ClientSession> clientSession;
if (sessionId == "fetchsession")
@@ -82,7 +83,7 @@ void DocumentBroker::handleProxyRequest(
auto proxy = std::static_pointer_cast<ProxyProtocolHandler>(
protocol);
- proxy->handleRequest(uriPublic.toString(), socket);
+ proxy->handleRequest(isWaiting, socket);
}
bool ProxyProtocolHandler::parseEmitIncoming(
@@ -128,16 +129,13 @@ bool ProxyProtocolHandler::parseEmitIncoming(
return true;
}
-void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
- const std::shared_ptr<Socket> &socket)
+void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket)
{
auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
- bool bRead = uriPublic.find("/write") == std::string::npos;
- LOG_INF("Proxy handle request " << uriPublic << " type: " <<
- (bRead ? "read" : "write"));
+ LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
- if (bRead)
+ if (!isWaiting)
{
if (!_msgHandler)
LOG_WRN("unusual - incoming message with no-one to handle it");
@@ -149,13 +147,27 @@ void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
}
}
- if (!flushQueueTo(streamSocket) && !bRead)
+ if (!flushQueueTo(streamSocket) && isWaiting)
{
- // longer running 'write socket'
- _writeSockets.push_back(streamSocket);
+ LOG_TRC("Queue a waiting socket");
+ // longer running 'write socket' (marked 'read' by the client)
+ _outSockets.push_back(streamSocket);
+ if (_outSockets.size() > 16)
+ {
+ LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+ // cleanup older waiting sockets.
+ auto sockWeak = _outSockets.front();
+ _outSockets.erase(_outSockets.begin());
+ auto sock = sockWeak.lock();
+ if (sock)
+ sock->shutdown();
+ }
}
else
+ {
+ LOG_TRC("Return a reply immediately");
socket->shutdown();
+ }
}
void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
@@ -202,7 +214,7 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
void ProxyProtocolHandler::dumpState(std::ostream& os)
{
- os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+ os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
for (auto it : _writeQueue)
Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
}
@@ -256,10 +268,10 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
{
std::weak_ptr<StreamSocket> sock;
- while (!_writeSockets.empty())
+ while (!_outSockets.empty())
{
- sock = _writeSockets.front();
- _writeSockets.erase(_writeSockets.begin());
+ sock = _outSockets.front();
+ _outSockets.erase(_outSockets.begin());
auto realSock = sock.lock();
if (realSock)
return realSock;
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 7548a7acd..af1394164 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -60,11 +60,8 @@ public:
void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
void getIOStats(uint64_t &sent, uint64_t &recv) override;
void dumpState(std::ostream& os) override;
-
bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
-
- void handleRequest(const std::string &uriPublic,
- const std::shared_ptr<Socket> &socket);
+ void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
private:
std::shared_ptr<StreamSocket> popWriteSocket();
@@ -88,7 +85,7 @@ private:
};
/// queue things when we have no socket to hand.
std::vector<std::shared_ptr<Message>> _writeQueue;
- std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
+ std::vector<std::weak_ptr<StreamSocket>> _outSockets;
};
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
More information about the Libreoffice-commits
mailing list