[Libreoffice-commits] online.git: loleaflet/js net/Socket.hpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp

Michael Meeks (via logerrit) logerrit at kemper.freedesktop.org
Fri Apr 24 11:58:07 UTC 2020


 loleaflet/js/global.js |   94 +++++++++++++++++++++--
 net/Socket.hpp         |   14 +++
 wsd/DocumentBroker.hpp |    2 
 wsd/LOOLWSD.cpp        |    2 
 wsd/ProxyProtocol.cpp  |  193 ++++++++++++++++++++++++++++++++++++++++++++++++-
 wsd/ProxyProtocol.hpp  |   81 +++++++++-----------
 6 files changed, 328 insertions(+), 58 deletions(-)

New commits:
commit fdc062b488afaeed5eaf061e279fff73623bccf3
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Thu Mar 19 15:54:28 2020 +0000
Commit:     Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri Apr 24 13:57:49 2020 +0200

    Proxy protocol bits.
    
    For now very silly framing: [T|B] + 0x-prefixed hex length + \n + content + \n
    
    Change-Id: I256b834a23cca975a705da2c569887665ac6be02
    Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92806
    Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
    Reviewed-by: Jan Holesovsky <kendy at collabora.com>

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 477c66a5d..eba6852b8 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -1,4 +1,5 @@
 /* -*- js-indent-level: 8 -*- */
+/* global Uint8Array */
 (function (global) {
 
 	var ua = navigator.userAgent.toLowerCase(),
@@ -212,19 +213,97 @@
 		};
 		this.onmessage = function() {
 		};
+		this.parseIncomingArray = function(arr) {
+			var decoder = new TextDecoder();
+			for (var i = 0; i < arr.length; ++i)
+			{
+				var left = arr.length - i;
+				if (left < 4)
+				{
+					console.debug('no data left');
+					break;
+				}
+				var type = String.fromCharCode(arr[i+0]);
+				if (type != 'T' && type != 'B')
+				{
+					console.debug('wrong data type: ' + type);
+					break;
+				}
+				if (arr[i+1] !== 48 && arr[i+2] !== 120) // '0x'
+				{
+					console.debug('missing hex preamble');
+					break;
+				}
+				i += 3;
+				var numStr = '';
+				var start = i;
+				while (arr[i] != 10) // '\n'
+					i++;
+				numStr = decoder.decode(arr.slice(start, i)); // FIXME: IE11
+				var size = parseInt(numStr, 16);
+
+				i++; // skip \n
+
+				var data;
+				if (type == 'T') // FIXME: IE11
+					data = decoder.decode(arr.slice(i, i + size));
+				else
+					data = arr.slice(i, i + size);
+
+				this.onmessage({ data: data });
+
+				i += size; // skip trailing '\n' in loop-increment
+			}
+		};
+		this.parseIncoming = function(type, msg) {
+			if (type === 'blob')
+			{
+				var fileReader = new FileReader();
+				var that = this;
+				fileReader.onload = function(event) {
+					that.parseIncomingArray(event.target.result);
+				};
+				fileReader.readAsArrayBuffer(msg);
+			}
+			else if (type === 'arraybuffer')
+			{
+				this.parseIncomingArray(new Uint8Array(msg));
+			}
+			else if (type === 'text' || type === '')
+			{
+				const encoder = new TextEncoder();
+				const arr = encoder.encode(msg);
+				this.parseIncomingArray(arr);
+			}
+			else
+				console.debug('Unknown encoding type: ' + type);
+		};
 		this.send = function(msg) {
 			console.debug('send msg "' + msg + '"');
 			var req = new XMLHttpRequest();
 			req.open('POST', this.getEndPoint('write'));
 			req.setRequestHeader('SessionId', this.sessionId);
 			if (this.sessionId === 'fetchsession')
+			{
+				req.responseType = 'text';
 				req.addEventListener('load', function() {
 					console.debug('got session: ' + this.responseText);
 					that.sessionId = this.responseText;
 					that.readyState = 1;
 					that.onopen();
 				});
-			req.send(msg);
+			}
+			else
+			{
+				req.responseType = 'arraybuffer';
+				req.addEventListener('load', function() {
+					if (this.status == 200)
+						that.parseIncoming(this.responseType, this.response);
+					else
+						console.debug('Error on incoming response');
+				});
+			}
+			req.send('B0x' + msg.length.toString(16) + '\n' + msg + '\n');
 		},
 		this.close = function() {
 			console.debug('close socket');
@@ -237,7 +316,6 @@
 		};
 		console.debug('New proxy socket ' + this.id + ' ' + this.uri);
 
-		// FIXME: perhaps a little risky.
 		this.send('fetchsession');
 		var that = this;
 
@@ -250,20 +328,16 @@
 			var req = new XMLHttpRequest();
 			// fetch session id:
 			req.addEventListener('load', function() {
-				console.debug('read: ' + this.responseText);
 				if (this.status == 200)
-				{
-					that.onmessage({ data: this.response });
-				}
+					that.parseIncoming(this.responseType, this.response);
 				else
-				{
 					console.debug('Handle error ' + this.status);
-				}
 				that.readWaiting = false;
 			});
 			req.open('GET', that.getEndPoint('read'));
-			req.setRequestHeader('SessionId', this.sessionId);
-			req.send(that.sessionId);
+			req.setRequestHeader('SessionId', that.sessionId);
+			req.responseType = 'arraybuffer';
+			req.send('');
 			that.readWaiting = true;
 		}, 250);
 	};
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 7ae042b86..2c94601ee 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -85,6 +85,10 @@ public:
     {
         _disposition = Type::CLOSED;
     }
+    std::shared_ptr<Socket> getSocket() const
+    {
+        return _socket;
+    }
     bool isMove() { return _disposition == Type::MOVE; }
     bool isClosed() { return _disposition == Type::CLOSED; }
 
@@ -923,6 +927,12 @@ public:
         std::vector<std::pair<size_t, size_t>> _spans;
     };
 
+    /// remove all queued input bytes
+    void clearInput()
+    {
+        _inBuffer.clear();
+    }
+
     /// Remove the first @count bytes from input buffer
     void eraseFirstInputBytes(const MessageMap &map)
     {
@@ -1086,6 +1096,8 @@ public:
     /// Does it look like we have some TLS / SSL where we don't expect it ?
     bool sniffSSL() const;
 
+    void dumpState(std::ostream& os) override;
+
 protected:
     /// Override to handle reading of socket data differently.
     virtual int readData(char* buf, int len)
@@ -1109,8 +1121,6 @@ protected:
 #endif
     }
 
-    void dumpState(std::ostream& os) override;
-
     void setShutdownSignalled()
     {
         _shutdownSignalled = true;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 7d7b5e67d..4e854d3fb 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -147,7 +147,7 @@ public:
         const Poco::URI& uriPublic,
         const bool isReadOnly,
         const std::string& hostNoTrust,
-        const std::shared_ptr<Socket> &moveSocket);
+        const std::shared_ptr<StreamSocket> &socket);
 
     /// Thread safe termination of this broker if it has a lingering thread
     void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 7aaa79e0d..c58430509 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2938,7 +2938,7 @@ private:
                             {
                                 docBroker->handleProxyRequest(
                                     sessionId, id, uriPublic, isReadOnly,
-                                    hostNoTrust, moveSocket);
+                                    hostNoTrust, streamSocket);
                                 return;
                             }
                             catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 41043a57a..8aaff0131 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,7 @@ void DocumentBroker::handleProxyRequest(
     const Poco::URI& uriPublic,
     const bool isReadOnly,
     const std::string& hostNoTrust,
-    const std::shared_ptr<Socket> &socket)
+    const std::shared_ptr<StreamSocket> &socket)
 {
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
@@ -37,6 +37,22 @@ void DocumentBroker::handleProxyRequest(
         addSession(clientSession);
         LOOLWSD::checkDiskSpaceAndWarnClients(true);
         LOOLWSD::checkSessionLimitsAndWarnClients();
+
+        LOG_TRC("Returning id " << clientSession->getId());
+
+        std::ostringstream oss;
+        oss << "HTTP/1.1 200 OK\r\n"
+            "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+            "User-Agent: " WOPI_AGENT_STRING "\r\n"
+            "Content-Length: " << clientSession->getId().size() << "\r\n"
+            "Content-Type: application/json\r\n"
+            "X-Content-Type-Options: nosniff\r\n"
+            "\r\n"
+            << clientSession->getId();
+
+        socket->send(oss.str());
+        socket->shutdown();
+        return;
     }
     else
     {
@@ -69,13 +85,186 @@ void DocumentBroker::handleProxyRequest(
     proxy->handleRequest(uriPublic.toString(), socket);
 }
 
+bool ProxyProtocolHandler::parseEmitIncoming(
+    const std::shared_ptr<StreamSocket> &socket)
+{
+    std::vector<char> &in = socket->getInBuffer();
+
+    std::stringstream oss;
+    socket->dumpState(oss);
+    LOG_TRC("Parse message:\n" << oss.str());
+
+    while (in.size() > 0)
+    {
+        if (in[0] != 'T' && in[0] != 'B')
+        {
+            LOG_ERR("Invalid message type " << in[0]);
+            return false;
+        }
+        auto it = in.begin() + 1;
+        for (; it != in.end() && *it != '\n'; ++it);
+        *it = '\0';
+        uint64_t len = strtoll( &in[1], nullptr, 16 );
+        in.erase(in.begin(), it + 1);
+        if (len > in.size())
+        {
+            LOG_ERR("Invalid message length " << len << " vs " << in.size());
+            return false;
+        }
+        // far from efficient:
+        std::vector<char> data;
+        data.insert(data.begin(), in.begin(), in.begin() + len + 1);
+        in.erase(in.begin(), in.begin() + len);
+
+        if (in.size() < 1 || in[0] != '\n')
+        {
+            LOG_ERR("Missing final newline");
+            return false;
+        }
+        in.erase(in.begin(), in.begin() + 1);
+
+        _msgHandler->handleMessage(data);
+    }
+    return true;
+}
+
 void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
                                          const std::shared_ptr<Socket> &socket)
 {
+    auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
+
     bool bRead = uriPublic.find("/write") == std::string::npos;
     LOG_INF("Proxy handle request " << uriPublic << " type: " <<
             (bRead ? "read" : "write"));
-    (void)socket;
+
+    if (bRead)
+    {
+        if (!_msgHandler)
+            LOG_WRN("unusual - incoming message with no-one to handle it");
+        else if (!parseEmitIncoming(streamSocket))
+        {
+            std::stringstream oss;
+            streamSocket->dumpState(oss);
+            LOG_ERR("bad socket structure " << oss.str());
+        }
+    }
+
+    if (!flushQueueTo(streamSocket) && !bRead)
+    {
+        // longer running 'write socket'
+        _writeSockets.push_back(streamSocket);
+    }
+    else
+        socket->shutdown();
+}
+
+void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
+{
+    std::stringstream oss;
+    disposition.getSocket()->dumpState(oss);
+    LOG_ERR("If you got here, it means we failed to parse this properly in handleRequest: " << oss.str());
+}
+
+int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush)
+{
+    _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
+    auto sock = popWriteSocket();
+    if (sock && flush)
+    {
+        flushQueueTo(sock);
+        sock->shutdown();
+    }
+
+    return len;
+}
+
+int ProxyProtocolHandler::sendTextMessage(const char *msg, const size_t len, bool flush) const
+{
+    LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
+    return const_cast<ProxyProtocolHandler *>(this)->sendMessage(msg, len, true, flush);
+}
+
+int ProxyProtocolHandler::sendBinaryMessage(const char *data, const size_t len, bool flush) const
+{
+    LOG_TRC("ProxyHack - send binary msg len " << len);
+    return const_cast<ProxyProtocolHandler *>(this)->sendMessage(data, len, false, flush);
+}
+
+void ProxyProtocolHandler::shutdown(bool goingAway, const std::string &statusMessage)
+{
+    LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
+}
+
+void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
+{
+    sent = recv = 0;
+}
+
+void ProxyProtocolHandler::dumpState(std::ostream& os)
+{
+    os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+    for (auto it : _writeQueue)
+        Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
+}
+
+void ProxyProtocolHandler::performWrites()
+{
+    if (_msgHandler)
+        _msgHandler->writeQueuedMessages();
+    if (_writeQueue.size() <= 0)
+        return;
+
+    auto sock = popWriteSocket();
+    if (sock)
+    {
+        flushQueueTo(sock);
+        sock->shutdown();
+    }
+}
+
+bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket)
+{
+    // slurp from the core to us.
+    if (_msgHandler && _msgHandler->hasQueuedMessages())
+        _msgHandler->writeQueuedMessages();
+
+    size_t totalSize = 0;
+    for (auto it : _writeQueue)
+        totalSize += it->size();
+
+    if (!totalSize)
+        return false;
+
+    std::ostringstream oss;
+    oss << "HTTP/1.1 200 OK\r\n"
+        "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+        "User-Agent: " WOPI_AGENT_STRING "\r\n"
+        "Content-Length: " << totalSize << "\r\n"
+        "Content-Type: application/json\r\n"
+        "X-Content-Type-Options: nosniff\r\n"
+        "\r\n";
+    socket->send(oss.str());
+
+    for (auto it : _writeQueue)
+        socket->send(it->data(), it->size(), false);
+    _writeQueue.clear();
+
+    return true;
+}
+
+// Pop the oldest still-alive write socket (front of the list); expired entries are dropped.
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+{
+    std::weak_ptr<StreamSocket> sock;
+    while (!_writeSockets.empty())
+    {
+        sock = _writeSockets.front();
+        _writeSockets.erase(_writeSockets.begin());
+        auto realSock = sock.lock();
+        if (realSock)
+            return realSock;
+    }
+    return std::shared_ptr<StreamSocket>();
 }
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index dd109c3a4..7548a7acd 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -9,19 +9,21 @@
 
 #pragma once
 
+#include <memory>
 #include <net/Socket.hpp>
 
-/// Interface for building a websocket from this ...
+/**
+ * Implementation that builds a websocket like protocol from many
+ * individual proxied HTTP requests back to back.
+ *
+ * we use a trivial framing: [T|B]0x<hex-length>\n<content>\n
+ */
 class ProxyProtocolHandler : public ProtocolHandlerInterface
 {
 public:
-    ProxyProtocolHandler()
-    {
-    }
+    ProxyProtocolHandler() { }
 
-    virtual ~ProxyProtocolHandler()
-    {
-    }
+    virtual ~ProxyProtocolHandler() { }
 
     /// Will be called exactly once by setHandler
     void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override
@@ -29,10 +31,7 @@ public:
     }
 
     /// Called after successful socket reads.
-    void handleIncomingMessage(SocketDisposition &/* disposition */) override
-    {
-        assert("we get our data a different way" && false);
-    }
+    void handleIncomingMessage(SocketDisposition &/* disposition */) override;
 
     int getPollEvents(std::chrono::steady_clock::time_point /* now */,
                       int64_t &/* timeoutMaxMs */) override
@@ -45,9 +44,7 @@ public:
     {
     }
 
-    void performWrites() override
-    {
-    }
+    void performWrites() override;
 
     void onDisconnect() override
     {
@@ -58,40 +55,40 @@ public:
     /// Clear all external references
     void dispose() override { _msgHandler.reset(); }
 
-    int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override
-    {
-        LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
-        (void) flush;
-        return len;
-    }
+    int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override;
+    int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override;
+    void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
+    void getIOStats(uint64_t &sent, uint64_t &recv) override;
+    void dumpState(std::ostream& os) override;
 
-    int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
-    {
-        (void) data; (void) flush;
-        LOG_TRC("ProxyHack - send binary msg len " << len);
-        return len;
-    }
-
-    void shutdown(bool goingAway = false, const std::string &statusMessage = "") override
-    {
-        LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
-    }
-
-    void getIOStats(uint64_t &sent, uint64_t &recv) override
-    {
-        sent = recv = 0;
-    }
-
-    void dumpState(std::ostream& os) override
-    {
-        os << "proxy protocol\n";
-    }
+    bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
 
     void handleRequest(const std::string &uriPublic,
                        const std::shared_ptr<Socket> &socket);
 
 private:
-    std::vector<std::weak_ptr<StreamSocket>> _sockets;
+    std::shared_ptr<StreamSocket> popWriteSocket();
+    int sendMessage(const char *msg, const size_t len, bool text, bool flush);
+    bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
+
+    struct Message : public std::vector<char>
+    {
+        Message(const char *msg, const size_t len, bool text)
+        {
+            const char *type = text ? "T" : "B";
+            insert(end(), type, type + 1);
+            std::ostringstream os;
+            os << std::hex << "0x" << len << "\n";
+            std::string str = os.str();
+            insert(end(), str.c_str(), str.c_str() + str.size());
+            insert(end(), msg, msg + len);
+            const char *terminator = "\n";
+            insert(end(), terminator, terminator + 1);
+        }
+    };
+    /// queue things when we have no socket to hand.
+    std::vector<std::shared_ptr<Message>> _writeQueue;
+    std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
 };
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */


More information about the Libreoffice-commits mailing list