[Libreoffice-commits] online.git: Branch 'feature/proxyhack' - loleaflet/js net/Socket.hpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp

Michael Meeks (via logerrit) logerrit at kemper.freedesktop.org
Thu Mar 19 20:16:26 UTC 2020


Rebased ref, commits from common ancestor:
commit 1b3a81e48a020754a8ee840bf0c96058b45ef276
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Thu Mar 19 15:54:28 2020 +0000
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Thu Mar 19 20:16:05 2020 +0000

    Proxy protocol bits.
    
    For now very silly: [T|B] + hex length + \n + content + \n
    
    Change-Id: I256b834a23cca975a705da2c569887665ac6be02

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 74e982873..97d1ca344 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -193,6 +193,25 @@
 		};
 		this.onmessage = function() {
 		};
+		this.parseIncoming = function(msg) {
+			var lines = msg.split('\n');
+			while (lines.length > 0) {
+				var header = lines.shift();
+				var lenStr = header.substring(3 /* [TB]0x */);
+				var len = parseInt(lenStr, 16);
+				var data = lines.shift();
+				if (data === undefined)
+				{
+					if (header !== '')
+						console.debug('Undefined data for header ' + header);
+				}
+				else
+				{
+					// FIXME: in theory use the length - and/or re-assemble if wrong.
+					this.onmessage({ data: data });
+				}
+			}
+		};
 		this.send = function(msg) {
 			console.debug('send msg "' + msg + '"');
 			var req = new XMLHttpRequest();
@@ -205,7 +224,16 @@
 					that.readyState = 1;
 					that.onopen();
 				});
-			req.send(msg);
+			else
+			{
+				req.addEventListener('load', function() {
+					if (this.status == 200)
+						that.parseIncoming(this.response);
+					else
+						console.debug("Error on incoming response");
+				});
+			}
+			req.send('B0x' + msg.length.toString(16) + '\n' + msg + '\n');
 		},
 		this.close = function() {
 			console.debug('close socket');
@@ -218,7 +246,6 @@
 		};
 		console.debug('New proxy socket ' + this.id + ' ' + this.uri);
 
-		// FIXME: perhaps a little risky.
 		this.send('fetchsession');
 		var that = this;
 
@@ -233,13 +260,9 @@
 			req.addEventListener('load', function() {
 				console.debug('read: ' + this.responseText);
 				if (this.status == 200)
-				{
-					that.onmessage({ data: this.response });
-				}
+					that.parseIncoming(this.response);
 				else
-				{
 					console.debug('Handle error ' + this.status);
-				}
 				that.readWaiting = false;
 			});
 			req.open('GET', that.getEndPoint('read'));
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 2290dadd9..be05a5559 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -86,6 +86,10 @@ public:
     {
         _disposition = Type::CLOSED;
     }
+    std::shared_ptr<Socket> getSocket() const
+    {
+        return _socket;
+    }
     bool isMove() { return _disposition == Type::MOVE; }
     bool isClosed() { return _disposition == Type::CLOSED; }
 
@@ -1035,6 +1039,12 @@ public:
         std::vector<std::pair<size_t, size_t>> _spans;
     };
 
+    /// remove all queued input bytes
+    void clearInput()
+    {
+        _inBuffer.clear();
+    }
+
     /// Remove the first @count bytes from input buffer
     void eraseFirstInputBytes(const MessageMap &map)
     {
@@ -1198,6 +1208,8 @@ public:
     /// Does it look like we have some TLS / SSL where we don't expect it ?
     bool sniffSSL() const;
 
+    void dumpState(std::ostream& os) override;
+
 protected:
     /// Override to handle reading of socket data differently.
     virtual int readData(char* buf, int len)
@@ -1221,8 +1233,6 @@ protected:
 #endif
     }
 
-    void dumpState(std::ostream& os) override;
-
     void setShutdownSignalled(bool shutdownSignalled)
     {
         _shutdownSignalled = shutdownSignalled;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index dd84960d1..cb5bf54e3 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -258,7 +258,7 @@ public:
         const Poco::URI& uriPublic,
         const bool isReadOnly,
         const std::string& hostNoTrust,
-        const std::shared_ptr<Socket> &moveSocket);
+        const std::shared_ptr<StreamSocket> &socket);
 
     /// Thread safe termination of this broker if it has a lingering thread
     void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 683197879..69bf3357e 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2855,7 +2855,7 @@ private:
                             {
                                 docBroker->handleProxyRequest(
                                     sessionId, id, uriPublic, isReadOnly,
-                                    hostNoTrust, moveSocket);
+                                    hostNoTrust, streamSocket);
                                 return;
                             }
                             catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 41043a57a..985615699 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,7 @@ void DocumentBroker::handleProxyRequest(
     const Poco::URI& uriPublic,
     const bool isReadOnly,
     const std::string& hostNoTrust,
-    const std::shared_ptr<Socket> &socket)
+    const std::shared_ptr<StreamSocket> &socket)
 {
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
@@ -37,6 +37,22 @@ void DocumentBroker::handleProxyRequest(
         addSession(clientSession);
         LOOLWSD::checkDiskSpaceAndWarnClients(true);
         LOOLWSD::checkSessionLimitsAndWarnClients();
+
+        LOG_TRC("Returning id " << clientSession->getId());
+
+        std::ostringstream oss;
+        oss << "HTTP/1.1 200 OK\r\n"
+            "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+            "User-Agent: " WOPI_AGENT_STRING "\r\n"
+            "Content-Length: " << clientSession->getId().size() << "\r\n"
+            "Content-Type: application/json\r\n"
+            "X-Content-Type-Options: nosniff\r\n"
+            "\r\n"
+            << clientSession->getId();
+
+        socket->send(oss.str());
+        socket->shutdown();
+        return;
     }
     else
     {
@@ -69,13 +85,168 @@ void DocumentBroker::handleProxyRequest(
     proxy->handleRequest(uriPublic.toString(), socket);
 }
 
+bool ProxyProtocolHandler::parseEmitIncoming(
+    const std::shared_ptr<StreamSocket> &socket)
+{
+    std::vector<char> &in = socket->getInBuffer();
+
+    std::stringstream oss;
+    socket->dumpState(oss);
+    LOG_TRC("Parse message:\n" << oss.str());
+
+    while (in.size() > 0)
+    {
+        if (in[0] != 'T' && in[0] != 'B')
+        {
+            LOG_ERR("Invalid message type " << in[0]);
+            return false;
+        }
+        auto it = in.begin() + 1;
+        for (; it != in.end() && *it != '\n'; ++it);
+        *it = '\0';
+        uint64_t len = strtoll( &in[1], nullptr, 16 );
+        in.erase(in.begin(), it + 1);
+        if (len > in.size())
+        {
+            LOG_ERR("Invalid message length " << len << " vs " << in.size());
+            return false;
+        }
+        // far from efficient:
+        std::vector<char> data;
+        data.insert(data.begin(), in.begin(), in.begin() + len + 1);
+        in.erase(in.begin(), in.begin() + len);
+
+        if (in.size() < 1 || in[0] != '\n')
+        {
+            LOG_ERR("Missing final newline");
+            return false;
+        }
+        in.erase(in.begin(), in.begin() + 1);
+
+        _msgHandler->handleMessage(data);
+    }
+    return true;
+}
+
 void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
                                          const std::shared_ptr<Socket> &socket)
 {
+    auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
+
     bool bRead = uriPublic.find("/write") == std::string::npos;
     LOG_INF("Proxy handle request " << uriPublic << " type: " <<
             (bRead ? "read" : "write"));
-    (void)socket;
+
+    if (bRead)
+    {
+        if (!_msgHandler)
+            LOG_WRN("unusual - incoming message with no-one to handle it");
+        else if (!parseEmitIncoming(streamSocket))
+        {
+            std::stringstream oss;
+            streamSocket->dumpState(oss);
+            LOG_ERR("bad socket structure " << oss.str());
+        }
+    }
+
+    if (!flushQueueTo(streamSocket) && !bRead)
+    {
+        // longer running 'write socket'
+        _writeSockets.push_back(streamSocket);
+    }
+    else
+        socket->shutdown();
+}
+
+void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
+{
+    std::stringstream oss;
+    disposition.getSocket()->dumpState(oss);
+    LOG_ERR("If you got here, it means we failed to parse this properly in handleRequest: " << oss.str());
+}
+
+int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush)
+{
+    _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
+    auto sock = popWriteSocket();
+    if (sock && flush)
+    {
+        flushQueueTo(sock);
+        sock->shutdown();
+    }
+
+    return len;
+}
+
+int ProxyProtocolHandler::sendTextMessage(const char *msg, const size_t len, bool flush) const
+{
+    LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
+    return const_cast<ProxyProtocolHandler *>(this)->sendMessage(msg, len, true, flush);
+}
+
+int ProxyProtocolHandler::sendBinaryMessage(const char *data, const size_t len, bool flush) const
+{
+    LOG_TRC("ProxyHack - send binary msg len " << len);
+    return const_cast<ProxyProtocolHandler *>(this)->sendMessage(data, len, false, flush);
+}
+
+void ProxyProtocolHandler::shutdown(bool goingAway, const std::string &statusMessage)
+{
+    LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
+}
+
+void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
+{
+    sent = recv = 0;
+}
+
+void ProxyProtocolHandler::dumpState(std::ostream& os)
+{
+    os << "proxy protocol\n";
+}
+
+bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket)
+{
+    // slurp from the core to us.
+    if (_msgHandler && _msgHandler->hasQueuedMessages())
+        _msgHandler->writeQueuedMessages();
+
+    size_t totalSize = 0;
+    for (auto it : _writeQueue)
+        totalSize += it->size();
+
+    if (!totalSize)
+        return false;
+
+    std::ostringstream oss;
+    oss << "HTTP/1.1 200 OK\r\n"
+        "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+        "User-Agent: " WOPI_AGENT_STRING "\r\n"
+        "Content-Length: " << totalSize << "\r\n"
+        "Content-Type: application/json\r\n"
+        "X-Content-Type-Options: nosniff\r\n"
+        "\r\n";
+    socket->send(oss.str());
+
+    for (auto it : _writeQueue)
+        socket->send(it->data(), it->size(), false);
+
+    return true;
+}
+
+// LRU-ness ...
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+{
+    std::weak_ptr<StreamSocket> sock;
+    while (!_writeSockets.empty())
+    {
+        sock = _writeSockets.front();
+        _writeSockets.erase(_writeSockets.begin());
+        auto realSock = sock.lock();
+        if (realSock)
+            return realSock;
+    }
+    return std::shared_ptr<StreamSocket>();
 }
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 1f88e1fa7..aade8b416 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -10,19 +10,21 @@
 #ifndef INCLUDED_PROXY_PROTOCOL_HPP
 #define INCLUDED_PROXY_PROTOCOL_HPP
 
+#include <memory>
 #include <net/Socket.hpp>
 
-/// Interface for building a websocket from this ...
+/**
+ * Implementation that builds a websocket like protocol from many
+ * individual proxied HTTP requests back to back.
+ *
+ * we use a trivial framing: <hex-length>\r\n<content>\r\n
+ */
 class ProxyProtocolHandler : public ProtocolHandlerInterface
 {
 public:
-    ProxyProtocolHandler()
-    {
-    }
+    ProxyProtocolHandler() { }
 
-    virtual ~ProxyProtocolHandler()
-    {
-    }
+    virtual ~ProxyProtocolHandler() { }
 
     /// Will be called exactly once by setHandler
     void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override
@@ -30,10 +32,7 @@ public:
     }
 
     /// Called after successful socket reads.
-    void handleIncomingMessage(SocketDisposition &/* disposition */) override
-    {
-        assert("we get our data a different way" && false);
-    }
+    void handleIncomingMessage(SocketDisposition &/* disposition */) override;
 
     int getPollEvents(std::chrono::steady_clock::time_point /* now */,
                       int &/* timeoutMaxMs */) override
@@ -59,40 +58,40 @@ public:
     /// Clear all external references
     virtual void dispose() { _msgHandler.reset(); }
 
-    int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override
-    {
-        LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
-        (void) flush;
-        return len;
-    }
+    int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override;
+    int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override;
+    void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
+    void getIOStats(uint64_t &sent, uint64_t &recv) override;
+    void dumpState(std::ostream& os);
 
-    int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
-    {
-        (void) data; (void) flush;
-        LOG_TRC("ProxyHack - send binary msg len " << len);
-        return len;
-    }
-
-    void shutdown(bool goingAway = false, const std::string &statusMessage = "") override
-    {
-        LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
-    }
-
-    void getIOStats(uint64_t &sent, uint64_t &recv) override
-    {
-        sent = recv = 0;
-    }
-
-    void dumpState(std::ostream& os)
-    {
-        os << "proxy protocol\n";
-    }
+    bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
 
     void handleRequest(const std::string &uriPublic,
                        const std::shared_ptr<Socket> &socket);
 
 private:
-    std::vector<std::weak_ptr<StreamSocket>> _sockets;
+    std::shared_ptr<StreamSocket> popWriteSocket();
+    int sendMessage(const char *msg, const size_t len, bool text, bool flush);
+    bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
+
+    struct Message : public std::vector<char>
+    {
+        Message(const char *msg, const size_t len, bool text)
+        {
+            const char *type = text ? "T" : "B";
+            insert(end(), type, type + 1);
+            std::ostringstream os;
+            os << std::hex << "0x" << len << "\n";
+            std::string str = os.str();
+            insert(end(), str.c_str(), str.c_str() + str.size());
+            insert(end(), msg, msg + len);
+            const char *terminator = "\n";
+            insert(end(), terminator, terminator + 1);
+        }
+    };
+    /// queue things when we have no socket to hand.
+    std::vector<std::shared_ptr<Message>> _writeQueue;
+    std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
 };
 
 #endif


More information about the Libreoffice-commits mailing list