[Libreoffice-commits] online.git: loleaflet/js net/Socket.hpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp
Michael Meeks (via logerrit)
logerrit at kemper.freedesktop.org
Fri Apr 24 11:58:07 UTC 2020
loleaflet/js/global.js | 94 +++++++++++++++++++++--
net/Socket.hpp | 14 +++
wsd/DocumentBroker.hpp | 2
wsd/LOOLWSD.cpp | 2
wsd/ProxyProtocol.cpp | 193 ++++++++++++++++++++++++++++++++++++++++++++++++-
wsd/ProxyProtocol.hpp | 81 +++++++++-----------
6 files changed, 328 insertions(+), 58 deletions(-)
New commits:
commit fdc062b488afaeed5eaf061e279fff73623bccf3
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Thu Mar 19 15:54:28 2020 +0000
Commit: Jan Holesovsky <kendy at collabora.com>
CommitDate: Fri Apr 24 13:57:49 2020 +0200
Proxy protocol bits.
For now very silly: [T|B] + hex length + \n + content + \n
Change-Id: I256b834a23cca975a705da2c569887665ac6be02
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92806
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
Reviewed-by: Jan Holesovsky <kendy at collabora.com>
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 477c66a5d..eba6852b8 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -1,4 +1,5 @@
/* -*- js-indent-level: 8 -*- */
+/* global Uint8Array */
(function (global) {
var ua = navigator.userAgent.toLowerCase(),
@@ -212,19 +213,97 @@
};
this.onmessage = function() {
};
+ this.parseIncomingArray = function(arr) {
+ var decoder = new TextDecoder();
+ for (var i = 0; i < arr.length; ++i)
+ {
+ var left = arr.length - i;
+ if (left < 4)
+ {
+ console.debug('no data left');
+ break;
+ }
+ var type = String.fromCharCode(arr[i+0]);
+ if (type != 'T' && type != 'B')
+ {
+ console.debug('wrong data type: ' + type);
+ break;
+ }
+ if (arr[i+1] !== 48 && arr[i+2] !== 120) // '0x'
+ {
+ console.debug('missing hex preamble');
+ break;
+ }
+ i += 3;
+ var numStr = '';
+ var start = i;
+ while (arr[i] != 10) // '\n'
+ i++;
+ numStr = decoder.decode(arr.slice(start, i)); // FIXME: IE11
+ var size = parseInt(numStr, 16);
+
+ i++; // skip \n
+
+ var data;
+ if (type == 'T') // FIXME: IE11
+ data = decoder.decode(arr.slice(i, i + size));
+ else
+ data = arr.slice(i, i + size);
+
+ this.onmessage({ data: data });
+
+ i += size; // skip trailing '\n' in loop-increment
+ }
+ };
+ this.parseIncoming = function(type, msg) {
+ if (type === 'blob')
+ {
+ var fileReader = new FileReader();
+ var that = this;
+ fileReader.onload = function(event) {
+ that.parseIncomingArray(event.target.result);
+ };
+ fileReader.readAsArrayBuffer(msg);
+ }
+ else if (type === 'arraybuffer')
+ {
+ this.parseIncomingArray(new Uint8Array(msg));
+ }
+ else if (type === 'text' || type === '')
+ {
+ const encoder = new TextEncoder();
+ const arr = encoder.encode(msg);
+ this.parseIncomingArray(arr);
+ }
+ else
+ console.debug('Unknown encoding type: ' + type);
+ };
this.send = function(msg) {
console.debug('send msg "' + msg + '"');
var req = new XMLHttpRequest();
req.open('POST', this.getEndPoint('write'));
req.setRequestHeader('SessionId', this.sessionId);
if (this.sessionId === 'fetchsession')
+ {
+ req.responseType = 'text';
req.addEventListener('load', function() {
console.debug('got session: ' + this.responseText);
that.sessionId = this.responseText;
that.readyState = 1;
that.onopen();
});
- req.send(msg);
+ }
+ else
+ {
+ req.responseType = 'arraybuffer';
+ req.addEventListener('load', function() {
+ if (this.status == 200)
+ that.parseIncoming(this.responseType, this.response);
+ else
+ console.debug('Error on incoming response');
+ });
+ }
+ req.send('B0x' + msg.length.toString(16) + '\n' + msg + '\n');
},
this.close = function() {
console.debug('close socket');
@@ -237,7 +316,6 @@
};
console.debug('New proxy socket ' + this.id + ' ' + this.uri);
- // FIXME: perhaps a little risky.
this.send('fetchsession');
var that = this;
@@ -250,20 +328,16 @@
var req = new XMLHttpRequest();
// fetch session id:
req.addEventListener('load', function() {
- console.debug('read: ' + this.responseText);
if (this.status == 200)
- {
- that.onmessage({ data: this.response });
- }
+ that.parseIncoming(this.responseType, this.response);
else
- {
console.debug('Handle error ' + this.status);
- }
that.readWaiting = false;
});
req.open('GET', that.getEndPoint('read'));
- req.setRequestHeader('SessionId', this.sessionId);
- req.send(that.sessionId);
+ req.setRequestHeader('SessionId', that.sessionId);
+ req.responseType = 'arraybuffer';
+ req.send('');
that.readWaiting = true;
}, 250);
};
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 7ae042b86..2c94601ee 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -85,6 +85,10 @@ public:
{
_disposition = Type::CLOSED;
}
+ std::shared_ptr<Socket> getSocket() const
+ {
+ return _socket;
+ }
bool isMove() { return _disposition == Type::MOVE; }
bool isClosed() { return _disposition == Type::CLOSED; }
@@ -923,6 +927,12 @@ public:
std::vector<std::pair<size_t, size_t>> _spans;
};
+ /// remove all queued input bytes
+ void clearInput()
+ {
+ _inBuffer.clear();
+ }
+
/// Remove the first @count bytes from input buffer
void eraseFirstInputBytes(const MessageMap &map)
{
@@ -1086,6 +1096,8 @@ public:
/// Does it look like we have some TLS / SSL where we don't expect it ?
bool sniffSSL() const;
+ void dumpState(std::ostream& os) override;
+
protected:
/// Override to handle reading of socket data differently.
virtual int readData(char* buf, int len)
@@ -1109,8 +1121,6 @@ protected:
#endif
}
- void dumpState(std::ostream& os) override;
-
void setShutdownSignalled()
{
_shutdownSignalled = true;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 7d7b5e67d..4e854d3fb 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -147,7 +147,7 @@ public:
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<Socket> &moveSocket);
+ const std::shared_ptr<StreamSocket> &socket);
/// Thread safe termination of this broker if it has a lingering thread
void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 7aaa79e0d..c58430509 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2938,7 +2938,7 @@ private:
{
docBroker->handleProxyRequest(
sessionId, id, uriPublic, isReadOnly,
- hostNoTrust, moveSocket);
+ hostNoTrust, streamSocket);
return;
}
catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 41043a57a..8aaff0131 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,7 @@ void DocumentBroker::handleProxyRequest(
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<Socket> &socket)
+ const std::shared_ptr<StreamSocket> &socket)
{
std::shared_ptr<ClientSession> clientSession;
if (sessionId == "fetchsession")
@@ -37,6 +37,22 @@ void DocumentBroker::handleProxyRequest(
addSession(clientSession);
LOOLWSD::checkDiskSpaceAndWarnClients(true);
LOOLWSD::checkSessionLimitsAndWarnClients();
+
+ LOG_TRC("Returning id " << clientSession->getId());
+
+ std::ostringstream oss;
+ oss << "HTTP/1.1 200 OK\r\n"
+ "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+ "User-Agent: " WOPI_AGENT_STRING "\r\n"
+ "Content-Length: " << clientSession->getId().size() << "\r\n"
+ "Content-Type: application/json\r\n"
+ "X-Content-Type-Options: nosniff\r\n"
+ "\r\n"
+ << clientSession->getId();
+
+ socket->send(oss.str());
+ socket->shutdown();
+ return;
}
else
{
@@ -69,13 +85,186 @@ void DocumentBroker::handleProxyRequest(
proxy->handleRequest(uriPublic.toString(), socket);
}
+bool ProxyProtocolHandler::parseEmitIncoming(
+ const std::shared_ptr<StreamSocket> &socket)
+{
+ std::vector<char> &in = socket->getInBuffer();
+
+ std::stringstream oss;
+ socket->dumpState(oss);
+ LOG_TRC("Parse message:\n" << oss.str());
+
+ while (in.size() > 0)
+ {
+ if (in[0] != 'T' && in[0] != 'B')
+ {
+ LOG_ERR("Invalid message type " << in[0]);
+ return false;
+ }
+ auto it = in.begin() + 1;
+ for (; it != in.end() && *it != '\n'; ++it);
+ *it = '\0';
+ uint64_t len = strtoll( &in[1], nullptr, 16 );
+ in.erase(in.begin(), it + 1);
+ if (len > in.size())
+ {
+ LOG_ERR("Invalid message length " << len << " vs " << in.size());
+ return false;
+ }
+ // far from efficient:
+ std::vector<char> data;
+ data.insert(data.begin(), in.begin(), in.begin() + len + 1);
+ in.erase(in.begin(), in.begin() + len);
+
+ if (in.size() < 1 || in[0] != '\n')
+ {
+ LOG_ERR("Missing final newline");
+ return false;
+ }
+ in.erase(in.begin(), in.begin() + 1);
+
+ _msgHandler->handleMessage(data);
+ }
+ return true;
+}
+
void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
const std::shared_ptr<Socket> &socket)
{
+ auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
+
bool bRead = uriPublic.find("/write") == std::string::npos;
LOG_INF("Proxy handle request " << uriPublic << " type: " <<
(bRead ? "read" : "write"));
- (void)socket;
+
+ if (bRead)
+ {
+ if (!_msgHandler)
+ LOG_WRN("unusual - incoming message with no-one to handle it");
+ else if (!parseEmitIncoming(streamSocket))
+ {
+ std::stringstream oss;
+ streamSocket->dumpState(oss);
+ LOG_ERR("bad socket structure " << oss.str());
+ }
+ }
+
+ if (!flushQueueTo(streamSocket) && !bRead)
+ {
+ // longer running 'write socket'
+ _writeSockets.push_back(streamSocket);
+ }
+ else
+ socket->shutdown();
+}
+
+void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
+{
+ std::stringstream oss;
+ disposition.getSocket()->dumpState(oss);
+ LOG_ERR("If you got here, it means we failed to parse this properly in handleRequest: " << oss.str());
+}
+
+int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush)
+{
+ _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
+ auto sock = popWriteSocket();
+ if (sock && flush)
+ {
+ flushQueueTo(sock);
+ sock->shutdown();
+ }
+
+ return len;
+}
+
+int ProxyProtocolHandler::sendTextMessage(const char *msg, const size_t len, bool flush) const
+{
+ LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
+ return const_cast<ProxyProtocolHandler *>(this)->sendMessage(msg, len, true, flush);
+}
+
+int ProxyProtocolHandler::sendBinaryMessage(const char *data, const size_t len, bool flush) const
+{
+ LOG_TRC("ProxyHack - send binary msg len " << len);
+ return const_cast<ProxyProtocolHandler *>(this)->sendMessage(data, len, false, flush);
+}
+
+void ProxyProtocolHandler::shutdown(bool goingAway, const std::string &statusMessage)
+{
+ LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
+}
+
+void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
+{
+ sent = recv = 0;
+}
+
+void ProxyProtocolHandler::dumpState(std::ostream& os)
+{
+ os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+ for (auto it : _writeQueue)
+ Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
+}
+
+void ProxyProtocolHandler::performWrites()
+{
+ if (_msgHandler)
+ _msgHandler->writeQueuedMessages();
+ if (_writeQueue.size() <= 0)
+ return;
+
+ auto sock = popWriteSocket();
+ if (sock)
+ {
+ flushQueueTo(sock);
+ sock->shutdown();
+ }
+}
+
+bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket)
+{
+ // slurp from the core to us.
+ if (_msgHandler && _msgHandler->hasQueuedMessages())
+ _msgHandler->writeQueuedMessages();
+
+ size_t totalSize = 0;
+ for (auto it : _writeQueue)
+ totalSize += it->size();
+
+ if (!totalSize)
+ return false;
+
+ std::ostringstream oss;
+ oss << "HTTP/1.1 200 OK\r\n"
+ "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+ "User-Agent: " WOPI_AGENT_STRING "\r\n"
+ "Content-Length: " << totalSize << "\r\n"
+ "Content-Type: application/json\r\n"
+ "X-Content-Type-Options: nosniff\r\n"
+ "\r\n";
+ socket->send(oss.str());
+
+ for (auto it : _writeQueue)
+ socket->send(it->data(), it->size(), false);
+ _writeQueue.clear();
+
+ return true;
+}
+
+// Pop the oldest still-alive write socket (front of the list), discarding expired entries.
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+{
+ std::weak_ptr<StreamSocket> sock;
+ while (!_writeSockets.empty())
+ {
+ sock = _writeSockets.front();
+ _writeSockets.erase(_writeSockets.begin());
+ auto realSock = sock.lock();
+ if (realSock)
+ return realSock;
+ }
+ return std::shared_ptr<StreamSocket>();
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index dd109c3a4..7548a7acd 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -9,19 +9,21 @@
#pragma once
+#include <memory>
#include <net/Socket.hpp>
-/// Interface for building a websocket from this ...
+/**
+ * Implementation that builds a websocket like protocol from many
+ * individual proxied HTTP requests back to back.
+ *
+ * we use a trivial framing: <T|B>0x<hex-length>\n<content>\n (T = text, B = binary)
+ */
class ProxyProtocolHandler : public ProtocolHandlerInterface
{
public:
- ProxyProtocolHandler()
- {
- }
+ ProxyProtocolHandler() { }
- virtual ~ProxyProtocolHandler()
- {
- }
+ virtual ~ProxyProtocolHandler() { }
/// Will be called exactly once by setHandler
void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override
@@ -29,10 +31,7 @@ public:
}
/// Called after successful socket reads.
- void handleIncomingMessage(SocketDisposition &/* disposition */) override
- {
- assert("we get our data a different way" && false);
- }
+ void handleIncomingMessage(SocketDisposition &/* disposition */) override;
int getPollEvents(std::chrono::steady_clock::time_point /* now */,
int64_t &/* timeoutMaxMs */) override
@@ -45,9 +44,7 @@ public:
{
}
- void performWrites() override
- {
- }
+ void performWrites() override;
void onDisconnect() override
{
@@ -58,40 +55,40 @@ public:
/// Clear all external references
void dispose() override { _msgHandler.reset(); }
- int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override
- {
- LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
- (void) flush;
- return len;
- }
+ int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override;
+ int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override;
+ void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
+ void getIOStats(uint64_t &sent, uint64_t &recv) override;
+ void dumpState(std::ostream& os) override;
- int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
- {
- (void) data; (void) flush;
- LOG_TRC("ProxyHack - send binary msg len " << len);
- return len;
- }
-
- void shutdown(bool goingAway = false, const std::string &statusMessage = "") override
- {
- LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
- }
-
- void getIOStats(uint64_t &sent, uint64_t &recv) override
- {
- sent = recv = 0;
- }
-
- void dumpState(std::ostream& os) override
- {
- os << "proxy protocol\n";
- }
+ bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
void handleRequest(const std::string &uriPublic,
const std::shared_ptr<Socket> &socket);
private:
- std::vector<std::weak_ptr<StreamSocket>> _sockets;
+ std::shared_ptr<StreamSocket> popWriteSocket();
+ int sendMessage(const char *msg, const size_t len, bool text, bool flush);
+ bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
+
+ struct Message : public std::vector<char>
+ {
+ Message(const char *msg, const size_t len, bool text)
+ {
+ const char *type = text ? "T" : "B";
+ insert(end(), type, type + 1);
+ std::ostringstream os;
+ os << std::hex << "0x" << len << "\n";
+ std::string str = os.str();
+ insert(end(), str.c_str(), str.c_str() + str.size());
+ insert(end(), msg, msg + len);
+ const char *terminator = "\n";
+ insert(end(), terminator, terminator + 1);
+ }
+ };
+ /// queue things when we have no socket to hand.
+ std::vector<std::shared_ptr<Message>> _writeQueue;
+ std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
};
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
More information about the Libreoffice-commits
mailing list