[Libreoffice-commits] online.git: loleaflet/js wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp

Michael Meeks (via logerrit) logerrit at kemper.freedesktop.org
Fri Apr 24 13:59:01 UTC 2020


 loleaflet/js/global.js |   12 +++++++-----
 wsd/DocumentBroker.hpp |    3 ++-
 wsd/LOOLWSD.cpp        |   12 +++++++++---
 wsd/ProxyProtocol.cpp  |   42 +++++++++++++++++++++++++++---------------
 wsd/ProxyProtocol.hpp  |    7 ++-----
 5 files changed, 47 insertions(+), 29 deletions(-)

New commits:
commit c28fff4cfa3fb37a887db2656dd535760e5a0a86
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri Mar 20 20:15:08 2020 +0000
Commit:     Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri Apr 24 15:58:42 2020 +0200

    Proxy: open four wait sockets concurrently.
    
    Change-Id: I08b85677be528b7aa77272a8527c9bacf3f7c336
    Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92809
    Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
    Reviewed-by: Jan Holesovsky <kendy at collabora.com>

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 023a8bf7c..0739681a2 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -207,7 +207,7 @@
 		this.sessionId = 'fetchsession';
 		this.id = window.proxySocketCounter++;
 		this.sendCounter = 0;
-		this.readWaiting = false;
+		this.readWaiting = 0;
 		this.onclose = function() {
 		};
 		this.onerror = function() {
@@ -314,9 +314,9 @@
 
 		// horrors ...
 		this.readInterval = setInterval(function() {
-			if (this.readWaiting) // one at a time for now
+			if (that.readWaiting > 4) // max 4 waiting connections concurrently.
 				return;
-			if (this.sessionId == 'fetchsession')
+			if (that.sessionId == 'fetchsession')
 				return; // waiting for our session id.
 			var req = new XMLHttpRequest();
 			// fetch session id:
@@ -325,13 +325,15 @@
 					that.parseIncomingArray(new Uint8Array(this.response));
 				else
 					console.debug('Handle error ' + this.status);
-				that.readWaiting = false;
+			});
+			req.addEventListener('loadend', function() {
+				that.readWaiting--;
 			});
 			req.open('GET', that.getEndPoint('read'));
 			req.setRequestHeader('SessionId', that.sessionId);
 			req.responseType = 'arraybuffer';
 			req.send('');
-			that.readWaiting = true;
+			that.readWaiting++;
 		}, 250);
 	};
 
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 4e854d3fb..6e265c3dc 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -147,7 +147,8 @@ public:
         const Poco::URI& uriPublic,
         const bool isReadOnly,
         const std::string& hostNoTrust,
-        const std::shared_ptr<StreamSocket> &socket);
+        const std::shared_ptr<StreamSocket> &socket,
+        bool isWaiting);
 
     /// Thread safe termination of this broker if it has a lingering thread
     void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 7607facef..ef7c01c2a 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2941,11 +2941,16 @@ private:
         // Request a kit process for this doc.
         std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
             none, url, docKey, _id, uriPublic);
+
+        std::string fullURL = request.getURI();
+        std::string ending = "/ws/read";
+        bool isWaiting = (fullURL.size() > ending.size() &&
+                          std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
         if (docBroker)
         {
             // need to move into the DocumentBroker context before doing session lookup / creation etc.
             std::string id = _id;
-            disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId]
+            disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, isWaiting]
                                 (const std::shared_ptr<Socket> &moveSocket)
                 {
                     LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey());
@@ -2955,7 +2960,8 @@ private:
                     // We no longer own this socket.
                     moveSocket->setThreadOwner(std::thread::id());
 
-                    docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]()
+                    docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust,
+                                            sessionId, moveSocket, isWaiting]()
                         {
                             // Now inside the document broker thread ...
                             LOG_TRC("In the docbroker thread for " << docBroker->getDocKey());
@@ -2965,7 +2971,7 @@ private:
                             {
                                 docBroker->handleProxyRequest(
                                     sessionId, id, uriPublic, isReadOnly,
-                                    hostNoTrust, streamSocket);
+                                    hostNoTrust, streamSocket, isWaiting);
                                 return;
                             }
                             catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 8aaff0131..25602f146 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,8 @@ void DocumentBroker::handleProxyRequest(
     const Poco::URI& uriPublic,
     const bool isReadOnly,
     const std::string& hostNoTrust,
-    const std::shared_ptr<StreamSocket> &socket)
+    const std::shared_ptr<StreamSocket> &socket,
+    bool isWaiting)
 {
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
@@ -82,7 +83,7 @@ void DocumentBroker::handleProxyRequest(
     auto proxy = std::static_pointer_cast<ProxyProtocolHandler>(
         protocol);
 
-    proxy->handleRequest(uriPublic.toString(), socket);
+    proxy->handleRequest(isWaiting, socket);
 }
 
 bool ProxyProtocolHandler::parseEmitIncoming(
@@ -128,16 +129,13 @@ bool ProxyProtocolHandler::parseEmitIncoming(
     return true;
 }
 
-void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
-                                         const std::shared_ptr<Socket> &socket)
+void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket)
 {
     auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
 
-    bool bRead = uriPublic.find("/write") == std::string::npos;
-    LOG_INF("Proxy handle request " << uriPublic << " type: " <<
-            (bRead ? "read" : "write"));
+    LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
 
-    if (bRead)
+    if (!isWaiting)
     {
         if (!_msgHandler)
             LOG_WRN("unusual - incoming message with no-one to handle it");
@@ -149,13 +147,27 @@ void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
         }
     }
 
-    if (!flushQueueTo(streamSocket) && !bRead)
+    if (!flushQueueTo(streamSocket) && isWaiting)
     {
-        // longer running 'write socket'
-        _writeSockets.push_back(streamSocket);
+        LOG_TRC("Queue a waiting socket");
+        // longer running 'write socket' (marked 'read' by the client)
+        _outSockets.push_back(streamSocket);
+        if (_outSockets.size() > 16)
+        {
+            LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+            // cleanup older waiting sockets.
+            auto sockWeak = _outSockets.front();
+            _outSockets.erase(_outSockets.begin());
+            auto sock = sockWeak.lock();
+            if (sock)
+                sock->shutdown();
+        }
     }
     else
+    {
+        LOG_TRC("Return a reply immediately");
         socket->shutdown();
+    }
 }
 
 void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
@@ -202,7 +214,7 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
 
 void ProxyProtocolHandler::dumpState(std::ostream& os)
 {
-    os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+    os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
     for (auto it : _writeQueue)
         Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
 }
@@ -256,10 +268,10 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
 std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
 {
     std::weak_ptr<StreamSocket> sock;
-    while (!_writeSockets.empty())
+    while (!_outSockets.empty())
     {
-        sock = _writeSockets.front();
-        _writeSockets.erase(_writeSockets.begin());
+        sock = _outSockets.front();
+        _outSockets.erase(_outSockets.begin());
         auto realSock = sock.lock();
         if (realSock)
             return realSock;
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 7548a7acd..af1394164 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -60,11 +60,8 @@ public:
     void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
     void getIOStats(uint64_t &sent, uint64_t &recv) override;
     void dumpState(std::ostream& os) override;
-
     bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
-
-    void handleRequest(const std::string &uriPublic,
-                       const std::shared_ptr<Socket> &socket);
+    void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
 
 private:
     std::shared_ptr<StreamSocket> popWriteSocket();
@@ -88,7 +85,7 @@ private:
     };
     /// queue things when we have no socket to hand.
     std::vector<std::shared_ptr<Message>> _writeQueue;
-    std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
+    std::vector<std::weak_ptr<StreamSocket>> _outSockets;
 };
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */


More information about the Libreoffice-commits mailing list