[Libreoffice-commits] online.git: Branch 'feature/proxyhack' - 3 commits - loleaflet/js wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp
Michael Meeks (via logerrit)
logerrit at kemper.freedesktop.org
Fri Mar 20 20:46:53 UTC 2020
Rebased ref, commits from common ancestor:
commit 0775320f6f59b2904b56a69ab3c109672455bc1d
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 20:45:38 2020 +0000
Commit: Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Mar 20 20:46:22 2020 +0000
Proxy: poll for output space if we need waking.
Change-Id: I18a5e71bd3342eea7992672d9be1f5518ea008e3
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 74d72af98..98c1d1afe 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -196,6 +196,7 @@
};
this.parseIncomingArray = function(arr) {
var decoder = new TextDecoder();
+ console.debug('Parse incoming array of length ' + arr.length);
for (var i = 0; i < arr.length; ++i)
{
var left = arr.length - i;
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 25602f146..7db033c99 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -219,6 +219,15 @@ void ProxyProtocolHandler::dumpState(std::ostream& os)
Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
}
+int ProxyProtocolHandler::getPollEvents(std::chrono::steady_clock::time_point /* now */,
+ int &/* timeoutMaxMs */)
+{
+ int events = POLLIN;
+ if (_msgHandler && _msgHandler->hasQueuedMessages())
+ events |= POLLOUT;
+ return events;
+}
+
void ProxyProtocolHandler::performWrites()
{
if (_msgHandler)
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index ca7070b27..61f0f32be 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -35,11 +35,7 @@ public:
void handleIncomingMessage(SocketDisposition &/* disposition */) override;
int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int &/* timeoutMaxMs */) override
- {
- // underlying buffer based polling is fine.
- return POLLIN;
- }
+ int &/* timeoutMaxMs */) override;
void checkTimeout(std::chrono::steady_clock::time_point /* now */) override
{
commit 712ab179d58e1ffe7142e7bb92f6eaf196924392
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 20:15:08 2020 +0000
Commit: Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Mar 20 20:46:22 2020 +0000
Proxy: open four wait sockets concurrently.
Change-Id: I08b85677be528b7aa77272a8527c9bacf3f7c336
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 4f68e841b..74d72af98 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -187,7 +187,7 @@
this.sessionId = 'fetchsession';
this.id = window.proxySocketCounter++;
this.sendCounter = 0;
- this.readWaiting = false;
+ this.readWaiting = 0;
this.onclose = function() {
};
this.onerror = function() {
@@ -296,9 +296,9 @@
// horrors ...
this.readInterval = setInterval(function() {
- if (this.readWaiting) // one at a time for now
+ if (that.readWaiting > 4) // max 4 waiting connections concurrently.
return;
- if (this.sessionId == 'fetchsession')
+ if (that.sessionId == 'fetchsession')
return; // waiting for our session id.
var req = new XMLHttpRequest();
// fetch session id:
@@ -307,13 +307,15 @@
that.parseIncomingArray(new Uint8Array(this.response));
else
console.debug('Handle error ' + this.status);
- that.readWaiting = false;
+ });
+ req.addEventListener('loadend', function() {
+ that.readWaiting--;
});
req.open('GET', that.getEndPoint('read'));
req.setRequestHeader('SessionId', that.sessionId);
req.responseType = 'arraybuffer';
req.send('');
- that.readWaiting = true;
+ that.readWaiting++;
}, 250);
};
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index cb5bf54e3..6bcb606f3 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -258,7 +258,8 @@ public:
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<StreamSocket> &socket);
+ const std::shared_ptr<StreamSocket> &socket,
+ bool isWaiting);
/// Thread safe termination of this broker if it has a lingering thread
void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 69bf3357e..116b5d432 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2831,11 +2831,16 @@ private:
// Request a kit process for this doc.
std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
none, url, docKey, _id, uriPublic);
+
+ std::string fullURL = request.getURI();
+ std::string ending = "/ws/read";
+ bool isWaiting = (fullURL.size() > ending.size() &&
+ std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
if (docBroker)
{
// need to move into the DocumentBroker context before doing session lookup / creation etc.
std::string id = _id;
- disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId]
+ disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, isWaiting]
(const std::shared_ptr<Socket> &moveSocket)
{
LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey());
@@ -2845,7 +2850,8 @@ private:
// We no longer own this socket.
moveSocket->setThreadOwner(std::thread::id());
- docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]()
+ docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust,
+ sessionId, moveSocket, isWaiting]()
{
// Now inside the document broker thread ...
LOG_TRC("In the docbroker thread for " << docBroker->getDocKey());
@@ -2855,7 +2861,7 @@ private:
{
docBroker->handleProxyRequest(
sessionId, id, uriPublic, isReadOnly,
- hostNoTrust, streamSocket);
+ hostNoTrust, streamSocket, isWaiting);
return;
}
catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 8aaff0131..25602f146 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,8 @@ void DocumentBroker::handleProxyRequest(
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<StreamSocket> &socket)
+ const std::shared_ptr<StreamSocket> &socket,
+ bool isWaiting)
{
std::shared_ptr<ClientSession> clientSession;
if (sessionId == "fetchsession")
@@ -82,7 +83,7 @@ void DocumentBroker::handleProxyRequest(
auto proxy = std::static_pointer_cast<ProxyProtocolHandler>(
protocol);
- proxy->handleRequest(uriPublic.toString(), socket);
+ proxy->handleRequest(isWaiting, socket);
}
bool ProxyProtocolHandler::parseEmitIncoming(
@@ -128,16 +129,13 @@ bool ProxyProtocolHandler::parseEmitIncoming(
return true;
}
-void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
- const std::shared_ptr<Socket> &socket)
+void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket)
{
auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
- bool bRead = uriPublic.find("/write") == std::string::npos;
- LOG_INF("Proxy handle request " << uriPublic << " type: " <<
- (bRead ? "read" : "write"));
+ LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
- if (bRead)
+ if (!isWaiting)
{
if (!_msgHandler)
LOG_WRN("unusual - incoming message with no-one to handle it");
@@ -149,13 +147,27 @@ void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
}
}
- if (!flushQueueTo(streamSocket) && !bRead)
+ if (!flushQueueTo(streamSocket) && isWaiting)
{
- // longer running 'write socket'
- _writeSockets.push_back(streamSocket);
+ LOG_TRC("Queue a waiting socket");
+ // longer running 'write socket' (marked 'read' by the client)
+ _outSockets.push_back(streamSocket);
+ if (_outSockets.size() > 16)
+ {
+ LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+ // cleanup older waiting sockets.
+ auto sockWeak = _outSockets.front();
+ _outSockets.erase(_outSockets.begin());
+ auto sock = sockWeak.lock();
+ if (sock)
+ sock->shutdown();
+ }
}
else
+ {
+ LOG_TRC("Return a reply immediately");
socket->shutdown();
+ }
}
void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
@@ -202,7 +214,7 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
void ProxyProtocolHandler::dumpState(std::ostream& os)
{
- os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+ os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
for (auto it : _writeQueue)
Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
}
@@ -256,10 +268,10 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
{
std::weak_ptr<StreamSocket> sock;
- while (!_writeSockets.empty())
+ while (!_outSockets.empty())
{
- sock = _writeSockets.front();
- _writeSockets.erase(_writeSockets.begin());
+ sock = _outSockets.front();
+ _outSockets.erase(_outSockets.begin());
auto realSock = sock.lock();
if (realSock)
return realSock;
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 091ac3295..ca7070b27 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -61,11 +61,8 @@ public:
void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
void getIOStats(uint64_t &sent, uint64_t &recv) override;
void dumpState(std::ostream& os);
-
bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
-
- void handleRequest(const std::string &uriPublic,
- const std::shared_ptr<Socket> &socket);
+ void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
private:
std::shared_ptr<StreamSocket> popWriteSocket();
@@ -89,7 +86,7 @@ private:
};
/// queue things when we have no socket to hand.
std::vector<std::shared_ptr<Message>> _writeQueue;
- std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
+ std::vector<std::weak_ptr<StreamSocket>> _outSockets;
};
#endif
commit 277fc4dbf0354f7e75827671b2f75995343c7faa
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 19:05:48 2020 +0000
Commit: Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Mar 20 20:46:22 2020 +0000
Proxy: re-write css image URLs to handle the proxy.
Change-Id: I09f3dea2f5e3a51869d5b0aa3f473d8f3ba75f44
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 78bceaa9c..4f68e841b 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -317,6 +317,33 @@
}, 250);
};
+ if (global.socketProxy)
+ {
+ // re-write relative URLs in CSS - somewhat grim.
+ window.addEventListener('load', function() {
+ var sheets = document.styleSheets;
+ for (var i = 0; i < sheets.length; ++i) {
+ var relBases = sheets[i].href.split('/');
+ relBases.pop(); // bin last - css name.
+ var replaceBase = 'url("' + relBases.join('/') + '/images/';
+
+ var rules = sheets[i].cssRules || sheets[i].rules;
+ for (var r = 0; r < rules.length; ++r) {
+ if (!rules[r] || !rules[r].style)
+ continue;
+ var img = rules[r].style.backgroundImage;
+ if (img === '' || img === undefined)
+ continue;
+ if (img.startsWith('url("images/'))
+ {
+ rules[r].style.backgroundImage =
+ img.replace('url("images/', replaceBase);
+ }
+ }
+ }
+ }, false);
+ }
+
global.createWebSocket = function(uri) {
if (global.socketProxy) {
return new global.ProxySocket(uri);
More information about the Libreoffice-commits
mailing list