[Libreoffice-commits] online.git: Branch 'feature/proxyhack' - 3 commits - loleaflet/js wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp

Michael Meeks (via logerrit) logerrit at kemper.freedesktop.org
Fri Mar 20 20:46:53 UTC 2020


Rebased ref, commits from common ancestor:
commit 0775320f6f59b2904b56a69ab3c109672455bc1d
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 20:45:38 2020 +0000
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Mar 20 20:46:22 2020 +0000

    Proxy: poll for output space if we need waking.
    
    Change-Id: I18a5e71bd3342eea7992672d9be1f5518ea008e3

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 74d72af98..98c1d1afe 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -196,6 +196,7 @@
 		};
 		this.parseIncomingArray = function(arr) {
 			var decoder = new TextDecoder();
+			console.debug('Parse incoming array of length ' + arr.length);
 			for (var i = 0; i < arr.length; ++i)
 			{
 				var left = arr.length - i;
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 25602f146..7db033c99 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -219,6 +219,15 @@ void ProxyProtocolHandler::dumpState(std::ostream& os)
         Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
 }
 
+int ProxyProtocolHandler::getPollEvents(std::chrono::steady_clock::time_point /* now */,
+                                        int &/* timeoutMaxMs */)
+{
+    int events = POLLIN;
+    if (_msgHandler && _msgHandler->hasQueuedMessages())
+        events |= POLLOUT;
+    return events;
+}
+
 void ProxyProtocolHandler::performWrites()
 {
     if (_msgHandler)
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index ca7070b27..61f0f32be 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -35,11 +35,7 @@ public:
     void handleIncomingMessage(SocketDisposition &/* disposition */) override;
 
     int getPollEvents(std::chrono::steady_clock::time_point /* now */,
-                      int &/* timeoutMaxMs */) override
-    {
-        // underlying buffer based polling is fine.
-        return POLLIN;
-    }
+                      int &/* timeoutMaxMs */) override;
 
     void checkTimeout(std::chrono::steady_clock::time_point /* now */) override
     {
commit 712ab179d58e1ffe7142e7bb92f6eaf196924392
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 20:15:08 2020 +0000
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Mar 20 20:46:22 2020 +0000

    Proxy: open four wait sockets concurrently.
    
    Change-Id: I08b85677be528b7aa77272a8527c9bacf3f7c336

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 4f68e841b..74d72af98 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -187,7 +187,7 @@
 		this.sessionId = 'fetchsession';
 		this.id = window.proxySocketCounter++;
 		this.sendCounter = 0;
-		this.readWaiting = false;
+		this.readWaiting = 0;
 		this.onclose = function() {
 		};
 		this.onerror = function() {
@@ -296,9 +296,9 @@
 
 		// horrors ...
 		this.readInterval = setInterval(function() {
-			if (this.readWaiting) // one at a time for now
+			if (that.readWaiting > 4) // max 4 waiting connections concurrently.
 				return;
-			if (this.sessionId == 'fetchsession')
+			if (that.sessionId == 'fetchsession')
 				return; // waiting for our session id.
 			var req = new XMLHttpRequest();
 			// fetch session id:
@@ -307,13 +307,15 @@
 					that.parseIncomingArray(new Uint8Array(this.response));
 				else
 					console.debug('Handle error ' + this.status);
-				that.readWaiting = false;
+			});
+			req.addEventListener('loadend', function() {
+				that.readWaiting--;
 			});
 			req.open('GET', that.getEndPoint('read'));
 			req.setRequestHeader('SessionId', that.sessionId);
 			req.responseType = 'arraybuffer';
 			req.send('');
-			that.readWaiting = true;
+			that.readWaiting++;
 		}, 250);
 	};
 
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index cb5bf54e3..6bcb606f3 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -258,7 +258,8 @@ public:
         const Poco::URI& uriPublic,
         const bool isReadOnly,
         const std::string& hostNoTrust,
-        const std::shared_ptr<StreamSocket> &socket);
+        const std::shared_ptr<StreamSocket> &socket,
+        bool isWaiting);
 
     /// Thread safe termination of this broker if it has a lingering thread
     void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 69bf3357e..116b5d432 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2831,11 +2831,16 @@ private:
         // Request a kit process for this doc.
         std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
             none, url, docKey, _id, uriPublic);
+
+        std::string fullURL = request.getURI();
+        std::string ending = "/ws/read";
+        bool isWaiting = (fullURL.size() > ending.size() &&
+                          std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
         if (docBroker)
         {
             // need to move into the DocumentBroker context before doing session lookup / creation etc.
             std::string id = _id;
-            disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId]
+            disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, isWaiting]
                                 (const std::shared_ptr<Socket> &moveSocket)
                 {
                     LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey());
@@ -2845,7 +2850,8 @@ private:
                     // We no longer own this socket.
                     moveSocket->setThreadOwner(std::thread::id());
 
-                    docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]()
+                    docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust,
+                                            sessionId, moveSocket, isWaiting]()
                         {
                             // Now inside the document broker thread ...
                             LOG_TRC("In the docbroker thread for " << docBroker->getDocKey());
@@ -2855,7 +2861,7 @@ private:
                             {
                                 docBroker->handleProxyRequest(
                                     sessionId, id, uriPublic, isReadOnly,
-                                    hostNoTrust, streamSocket);
+                                    hostNoTrust, streamSocket, isWaiting);
                                 return;
                             }
                             catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 8aaff0131..25602f146 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,8 @@ void DocumentBroker::handleProxyRequest(
     const Poco::URI& uriPublic,
     const bool isReadOnly,
     const std::string& hostNoTrust,
-    const std::shared_ptr<StreamSocket> &socket)
+    const std::shared_ptr<StreamSocket> &socket,
+    bool isWaiting)
 {
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
@@ -82,7 +83,7 @@ void DocumentBroker::handleProxyRequest(
     auto proxy = std::static_pointer_cast<ProxyProtocolHandler>(
         protocol);
 
-    proxy->handleRequest(uriPublic.toString(), socket);
+    proxy->handleRequest(isWaiting, socket);
 }
 
 bool ProxyProtocolHandler::parseEmitIncoming(
@@ -128,16 +129,13 @@ bool ProxyProtocolHandler::parseEmitIncoming(
     return true;
 }
 
-void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
-                                         const std::shared_ptr<Socket> &socket)
+void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket)
 {
     auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
 
-    bool bRead = uriPublic.find("/write") == std::string::npos;
-    LOG_INF("Proxy handle request " << uriPublic << " type: " <<
-            (bRead ? "read" : "write"));
+    LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
 
-    if (bRead)
+    if (!isWaiting)
     {
         if (!_msgHandler)
             LOG_WRN("unusual - incoming message with no-one to handle it");
@@ -149,13 +147,27 @@ void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
         }
     }
 
-    if (!flushQueueTo(streamSocket) && !bRead)
+    if (!flushQueueTo(streamSocket) && isWaiting)
     {
-        // longer running 'write socket'
-        _writeSockets.push_back(streamSocket);
+        LOG_TRC("Queue a waiting socket");
+        // longer running 'write socket' (marked 'read' by the client)
+        _outSockets.push_back(streamSocket);
+        if (_outSockets.size() > 16)
+        {
+            LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+            // cleanup older waiting sockets.
+            auto sockWeak = _outSockets.front();
+            _outSockets.erase(_outSockets.begin());
+            auto sock = sockWeak.lock();
+            if (sock)
+                sock->shutdown();
+        }
     }
     else
+    {
+        LOG_TRC("Return a reply immediately");
         socket->shutdown();
+    }
 }
 
 void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
@@ -202,7 +214,7 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
 
 void ProxyProtocolHandler::dumpState(std::ostream& os)
 {
-    os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+    os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
     for (auto it : _writeQueue)
         Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
 }
@@ -256,10 +268,10 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
 std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
 {
     std::weak_ptr<StreamSocket> sock;
-    while (!_writeSockets.empty())
+    while (!_outSockets.empty())
     {
-        sock = _writeSockets.front();
-        _writeSockets.erase(_writeSockets.begin());
+        sock = _outSockets.front();
+        _outSockets.erase(_outSockets.begin());
         auto realSock = sock.lock();
         if (realSock)
             return realSock;
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 091ac3295..ca7070b27 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -61,11 +61,8 @@ public:
     void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
     void getIOStats(uint64_t &sent, uint64_t &recv) override;
     void dumpState(std::ostream& os);
-
     bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
-
-    void handleRequest(const std::string &uriPublic,
-                       const std::shared_ptr<Socket> &socket);
+    void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
 
 private:
     std::shared_ptr<StreamSocket> popWriteSocket();
@@ -89,7 +86,7 @@ private:
     };
     /// queue things when we have no socket to hand.
     std::vector<std::shared_ptr<Message>> _writeQueue;
-    std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
+    std::vector<std::weak_ptr<StreamSocket>> _outSockets;
 };
 
 #endif
commit 277fc4dbf0354f7e75827671b2f75995343c7faa
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 19:05:48 2020 +0000
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Mar 20 20:46:22 2020 +0000

    Proxy: re-write css image URLs to handle the proxy.
    
    Change-Id: I09f3dea2f5e3a51869d5b0aa3f473d8f3ba75f44

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 78bceaa9c..4f68e841b 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -317,6 +317,33 @@
 		}, 250);
 	};
 
+	if (global.socketProxy)
+	{
+		// re-write relative URLs in CSS - somewhat grim.
+		window.addEventListener('load', function() {
+			var sheets = document.styleSheets;
+			for (var i = 0; i < sheets.length; ++i) {
+				var relBases = sheets[i].href.split('/');
+				relBases.pop(); // bin last - css name.
+				var replaceBase = 'url("' + relBases.join('/') + '/images/';
+
+				var rules = sheets[i].cssRules || sheets[i].rules;
+				for (var r = 0; r < rules.length; ++r) {
+					if (!rules[r] || !rules[r].style)
+						continue;
+					var img = rules[r].style.backgroundImage;
+					if (img === '' || img === undefined)
+						continue;
+					if (img.startsWith('url("images/'))
+					{
+						rules[r].style.backgroundImage =
+							img.replace('url("images/', replaceBase);
+					}
+				}
+			}
+		}, false);
+	}
+
 	global.createWebSocket = function(uri) {
 		if (global.socketProxy) {
 			return new global.ProxySocket(uri);


More information about the Libreoffice-commits mailing list