[Libreoffice-commits] online.git: loleaflet/js wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp

Michael Meeks (via logerrit) logerrit at kemper.freedesktop.org
Fri Apr 24 14:45:15 UTC 2020


 loleaflet/js/global.js |   20 +++++++++++++-------
 wsd/LOOLWSD.cpp        |    2 +-
 wsd/ProxyProtocol.cpp  |   48 +++++++++++++++++++++++++++++++-----------------
 wsd/ProxyProtocol.hpp  |    3 ++-
 4 files changed, 47 insertions(+), 26 deletions(-)

New commits:
commit 01519bdc644781cb66a2179dfe40343bf9769b84
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Sat Mar 21 15:07:10 2020 +0000
Commit:     Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri Apr 24 16:44:55 2020 +0200

    Proxy: improve debugging & naming.
    
    Change-Id: Ifba669a33855a67c9a4e968db42ef1a2cb301d63
    Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92813
    Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
    Reviewed-by: Jan Holesovsky <kendy at collabora.com>

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index c711c84b0..aacf59496 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -216,7 +216,7 @@
 		};
 		this.parseIncomingArray = function(arr) {
 			var decoder = new TextDecoder();
-			console.debug('Parse incoming array of length ' + arr.length);
+			console.debug('proxy: parse incoming array of length ' + arr.length);
 			for (var i = 0; i < arr.length; ++i)
 			{
 				var left = arr.length - i;
@@ -274,7 +274,7 @@
 					if (this.status == 200)
 						that.parseIncomingArray(new Uint8Array(this.response));
 					else
-						console.debug('Error on incoming response');
+						console.debug('proxy: error on incoming response');
 				});
 			}
 			req.send(that.sendQueue);
@@ -300,21 +300,24 @@
 				this.sendTimeout = setTimeout(this.doSend, 2 /* ms */);
 		};
 		this.close = function() {
-			console.debug('close socket');
+			console.debug('proxy: close socket');
 			this.readyState = 3;
 			this.onclose();
+			clearInterval(this.waitInterval);
+			this.waitInterval = undefined;
 		};
 		this.getEndPoint = function(type) {
 			var base = this.uri;
 			return base.replace(/^ws/, 'http') + '/' + type;
 		};
-		console.debug('New proxy socket ' + this.id + ' ' + this.uri);
+		console.debug('proxy: new socket ' + this.id + ' ' + this.uri);
 
 		// queue fetch of session id.
 		this.getSessionId();
 
 		// horrors ...
-		this.readInterval = setInterval(function() {
+		this.waitConnect = function() {
+			console.debug('proxy: waiting - ' + that.readWaiting + ' on session ' + that.sessionId);
 			if (that.readWaiting > 4) // max 4 waiting connections concurrently.
 				return;
 			if (that.sessionId == 'fetchsession')
@@ -329,13 +332,16 @@
 			});
 			req.addEventListener('loadend', function() {
 				that.readWaiting--;
+				console.debug('proxy: wait ended, re-issue');
+				that.waitConnect();
 			});
-			req.open('GET', that.getEndPoint('read'));
+			req.open('GET', that.getEndPoint('wait'));
 			req.setRequestHeader('SessionId', that.sessionId);
 			req.responseType = 'arraybuffer';
 			req.send('');
 			that.readWaiting++;
-		}, 250);
+		};
+		this.waitInterval = setInterval(this.waitConnect, 250);
 	};
 
 	if (global.socketProxy)
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index ef7c01c2a..ba1d83c77 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2943,7 +2943,7 @@ private:
             none, url, docKey, _id, uriPublic);
 
         std::string fullURL = request.getURI();
-        std::string ending = "/ws/read";
+        std::string ending = "/ws/wait";
         bool isWaiting = (fullURL.size() > ending.size() &&
                           std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
         if (docBroker)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index c8f578559..89b3d9ba9 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -31,7 +31,7 @@ void DocumentBroker::handleProxyRequest(
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
     {
-        LOG_TRC("Create session for " << _docKey);
+        LOG_TRC("proxy: Create session for " << _docKey);
         clientSession = createNewClientSession(
                 std::make_shared<ProxyProtocolHandler>(),
                 id, uriPublic, isReadOnly, hostNoTrust);
@@ -39,7 +39,7 @@ void DocumentBroker::handleProxyRequest(
         LOOLWSD::checkDiskSpaceAndWarnClients(true);
         LOOLWSD::checkSessionLimitsAndWarnClients();
 
-        LOG_TRC("Returning id " << clientSession->getId());
+        LOG_TRC("proxy: Returning sessionId " << clientSession->getId());
 
         std::ostringstream oss;
         oss << "HTTP/1.1 200 OK\r\n"
@@ -57,7 +57,7 @@ void DocumentBroker::handleProxyRequest(
     }
     else
     {
-        LOG_TRC("Find session for " << _docKey << " with id " << sessionId);
+        LOG_TRC("proxy: find session for " << _docKey << " with id " << sessionId);
         for (const auto &it : _sessions)
         {
             if (it.second->getId() == sessionId)
@@ -133,28 +133,29 @@ void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<S
 {
     auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
 
-    LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
+    LOG_INF("proxy: handle request type: " << (isWaiting ? "wait" : "respond") <<
+            " on socket #" << socket->getFD());
 
     if (!isWaiting)
     {
         if (!_msgHandler)
-            LOG_WRN("unusual - incoming message with no-one to handle it");
+            LOG_WRN("proxy: unusual - incoming message with no-one to handle it");
         else if (!parseEmitIncoming(streamSocket))
         {
             std::stringstream oss;
             streamSocket->dumpState(oss);
-            LOG_ERR("bad socket structure " << oss.str());
+            LOG_ERR("proxy: bad socket structure " << oss.str());
         }
     }
 
     if (!flushQueueTo(streamSocket) && isWaiting)
     {
-        LOG_TRC("Queue a waiting socket");
+        LOG_TRC("proxy: queue a waiting out socket #" << streamSocket->getFD());
         // longer running 'write socket' (marked 'read' by the client)
         _outSockets.push_back(streamSocket);
         if (_outSockets.size() > 16)
         {
-            LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+            LOG_ERR("proxy: Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
             // cleanup older waiting sockets.
             auto sockWeak = _outSockets.front();
             _outSockets.erase(_outSockets.begin());
@@ -180,7 +181,7 @@ void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
 int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush)
 {
     _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
-    auto sock = popWriteSocket();
+    auto sock = popOutSocket();
     if (sock && flush)
     {
         flushQueueTo(sock);
@@ -230,16 +231,24 @@ int ProxyProtocolHandler::getPollEvents(std::chrono::steady_clock::time_point /*
     return events;
 }
 
-void ProxyProtocolHandler::performWrites()
+/// slurp from the core to us, @returns true if there are messages to send
+bool ProxyProtocolHandler::slurpHasMessages()
 {
-    if (_msgHandler)
+    if (_msgHandler && _msgHandler->hasQueuedMessages())
         _msgHandler->writeQueuedMessages();
-    if (_writeQueue.size() <= 0)
+
+    return _writeQueue.size() > 0;
+}
+
+void ProxyProtocolHandler::performWrites()
+{
+    if (!slurpHasMessages())
         return;
 
-    auto sock = popWriteSocket();
+    auto sock = popOutSocket();
     if (sock)
     {
+        LOG_TRC("proxy: performWrites");
         flushQueueTo(sock);
         sock->shutdown();
     }
@@ -247,9 +256,8 @@ void ProxyProtocolHandler::performWrites()
 
 bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket)
 {
-    // slurp from the core to us.
-    if (_msgHandler && _msgHandler->hasQueuedMessages())
-        _msgHandler->writeQueuedMessages();
+    if (!slurpHasMessages())
+        return false;
 
     size_t totalSize = 0;
     for (auto it : _writeQueue)
@@ -258,6 +266,8 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
     if (!totalSize)
         return false;
 
+    LOG_TRC("proxy: flushQueue of size " << totalSize << " to socket #" << socket->getFD() << " & close");
+
     std::ostringstream oss;
     oss << "HTTP/1.1 200 OK\r\n"
         "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
@@ -276,7 +286,7 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
 }
 
 // LRU-ness ...
-std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popOutSocket()
 {
     std::weak_ptr<StreamSocket> sock;
     while (!_outSockets.empty())
@@ -285,8 +295,12 @@ std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
         _outSockets.erase(_outSockets.begin());
         auto realSock = sock.lock();
         if (realSock)
+        {
+            LOG_TRC("proxy: popped an out socket #" << realSock->getFD() << " leaving: " << _outSockets.size());
             return realSock;
+        }
     }
+    LOG_TRC("proxy: no out sockets to pop.");
     return std::shared_ptr<StreamSocket>();
 }
 
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index a2b98c62d..eb15edfdd 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -60,7 +60,8 @@ public:
     void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
 
 private:
-    std::shared_ptr<StreamSocket> popWriteSocket();
+    std::shared_ptr<StreamSocket> popOutSocket();
+    bool slurpHasMessages();
     int sendMessage(const char *msg, const size_t len, bool text, bool flush);
     bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
 


More information about the Libreoffice-commits mailing list