[Libreoffice-commits] online.git: Branch 'feature/proxyhack' - loleaflet/js net/Socket.hpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp wsd/ProxyProtocol.cpp wsd/ProxyProtocol.hpp
Michael Meeks (via logerrit)
logerrit at kemper.freedesktop.org
Thu Mar 19 20:16:26 UTC 2020
Rebased ref, commits from common ancestor:
commit 1b3a81e48a020754a8ee840bf0c96058b45ef276
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Thu Mar 19 15:54:28 2020 +0000
Commit: Michael Meeks <michael.meeks at collabora.com>
CommitDate: Thu Mar 19 20:16:05 2020 +0000
Proxy protocol bits.
For now very silly: [T|B] + hex length + \n + content + \n
Change-Id: I256b834a23cca975a705da2c569887665ac6be02
diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 74e982873..97d1ca344 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -193,6 +193,25 @@
};
this.onmessage = function() {
};
+ // Split a proxied response body into its messages and hand each one to
+ // onmessage. The body is treated as alternating lines: a header line
+ // ([TB]0x<hex length>) followed by one line of data.
+ this.parseIncoming = function(msg) {
+     var parts = msg.split('\n');
+     for (var i = 0; i < parts.length; i += 2) {
+         var header = parts[i];
+         // FIXME: in theory use the length - and/or re-assemble if wrong.
+         var len = parseInt(header.substring(3 /* [TB]0x */), 16);
+         var data = parts[i + 1];
+         if (data !== undefined) {
+             this.onmessage({ data: data });
+             continue;
+         }
+         // A trailing empty header is just the tail of the final newline.
+         if (header !== '')
+             console.debug('Undefined data for header ' + header);
+     }
+ };
this.send = function(msg) {
console.debug('send msg "' + msg + '"');
var req = new XMLHttpRequest();
@@ -205,7 +224,16 @@
that.readyState = 1;
that.onopen();
});
- req.send(msg);
+ else
+ {
+ req.addEventListener('load', function() {
+ if (this.status == 200)
+ that.parseIncoming(this.response);
+ else
+ console.debug("Error on incoming response");
+ });
+ }
+ req.send('B0x' + msg.length.toString(16) + '\n' + msg + '\n');
},
this.close = function() {
console.debug('close socket');
@@ -218,7 +246,6 @@
};
console.debug('New proxy socket ' + this.id + ' ' + this.uri);
- // FIXME: perhaps a little risky.
this.send('fetchsession');
var that = this;
@@ -233,13 +260,9 @@
req.addEventListener('load', function() {
console.debug('read: ' + this.responseText);
if (this.status == 200)
- {
- that.onmessage({ data: this.response });
- }
+ that.parseIncoming(this.response);
else
- {
console.debug('Handle error ' + this.status);
- }
that.readWaiting = false;
});
req.open('GET', that.getEndPoint('read'));
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 2290dadd9..be05a5559 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -86,6 +86,10 @@ public:
{
_disposition = Type::CLOSED;
}
+ /// Returns the stored _socket pointer.
+ std::shared_ptr<Socket> getSocket() const { return _socket; }
bool isMove() { return _disposition == Type::MOVE; }
bool isClosed() { return _disposition == Type::CLOSED; }
@@ -1035,6 +1039,12 @@ public:
std::vector<std::pair<size_t, size_t>> _spans;
};
+ /// Discard every byte currently held in the input buffer.
+ void clearInput() { _inBuffer.clear(); }
+
/// Remove the first @count bytes from input buffer
void eraseFirstInputBytes(const MessageMap &map)
{
@@ -1198,6 +1208,8 @@ public:
/// Does it look like we have some TLS / SSL where we don't expect it ?
bool sniffSSL() const;
+ void dumpState(std::ostream& os) override;
+
protected:
/// Override to handle reading of socket data differently.
virtual int readData(char* buf, int len)
@@ -1221,8 +1233,6 @@ protected:
#endif
}
- void dumpState(std::ostream& os) override;
-
void setShutdownSignalled(bool shutdownSignalled)
{
_shutdownSignalled = shutdownSignalled;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index dd84960d1..cb5bf54e3 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -258,7 +258,7 @@ public:
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<Socket> &moveSocket);
+ const std::shared_ptr<StreamSocket> &socket);
/// Thread safe termination of this broker if it has a lingering thread
void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 683197879..69bf3357e 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2855,7 +2855,7 @@ private:
{
docBroker->handleProxyRequest(
sessionId, id, uriPublic, isReadOnly,
- hostNoTrust, moveSocket);
+ hostNoTrust, streamSocket);
return;
}
catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 41043a57a..985615699 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,7 @@ void DocumentBroker::handleProxyRequest(
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
- const std::shared_ptr<Socket> &socket)
+ const std::shared_ptr<StreamSocket> &socket)
{
std::shared_ptr<ClientSession> clientSession;
if (sessionId == "fetchsession")
@@ -37,6 +37,22 @@ void DocumentBroker::handleProxyRequest(
addSession(clientSession);
LOOLWSD::checkDiskSpaceAndWarnClients(true);
LOOLWSD::checkSessionLimitsAndWarnClients();
+
+ LOG_TRC("Returning id " << clientSession->getId());
+
+ std::ostringstream oss;
+ oss << "HTTP/1.1 200 OK\r\n"
+ "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+ "User-Agent: " WOPI_AGENT_STRING "\r\n"
+ "Content-Length: " << clientSession->getId().size() << "\r\n"
+ "Content-Type: application/json\r\n"
+ "X-Content-Type-Options: nosniff\r\n"
+ "\r\n"
+ << clientSession->getId();
+
+ socket->send(oss.str());
+ socket->shutdown();
+ return;
}
else
{
@@ -69,13 +85,168 @@ void DocumentBroker::handleProxyRequest(
proxy->handleRequest(uriPublic.toString(), socket);
}
+/// Parse every framed message in the socket's input buffer and pass each
+/// payload to _msgHandler->handleMessage().
+///
+/// A frame is: 'T' or 'B', a hex length (strtoll base 16, so an optional
+/// "0x" prefix is accepted), '\n', exactly <length> payload bytes, '\n'.
+///
+/// @param socket  socket whose input buffer is consumed in place.
+/// @return true when the whole buffer parsed cleanly; false on the first
+///         malformed or truncated frame (an error is logged and the rest
+///         of the buffer is left unparsed).
+bool ProxyProtocolHandler::parseEmitIncoming(
+    const std::shared_ptr<StreamSocket> &socket)
+{
+    std::vector<char> &in = socket->getInBuffer();
+
+    std::stringstream oss;
+    socket->dumpState(oss);
+    LOG_TRC("Parse message:\n" << oss.str());
+
+    while (!in.empty())
+    {
+        if (in[0] != 'T' && in[0] != 'B')
+        {
+            LOG_ERR("Invalid message type " << in[0]);
+            return false;
+        }
+
+        // Find the newline that ends the header; never touch in.end().
+        auto it = in.begin() + 1;
+        while (it != in.end() && *it != '\n')
+            ++it;
+        if (it == in.end())
+        {
+            LOG_ERR("Missing newline after message header");
+            return false;
+        }
+
+        // NUL-terminate the header in place so strtoll stops there.
+        *it = '\0';
+        const uint64_t len = strtoll(&in[1], nullptr, 16);
+        in.erase(in.begin(), it + 1);
+
+        if (len > in.size())
+        {
+            LOG_ERR("Invalid message length " << len << " vs " << in.size());
+            return false;
+        }
+        // The payload must be followed by its terminating newline; check
+        // before copying so we never read beyond the buffer.
+        if (len == in.size() || in[len] != '\n')
+        {
+            LOG_ERR("Missing final newline");
+            return false;
+        }
+
+        // far from efficient:
+        // Copy exactly len payload bytes (the terminator is not content),
+        // then drop payload and terminator from the buffer.
+        const std::vector<char> data(in.begin(), in.begin() + len);
+        in.erase(in.begin(), in.begin() + len + 1);
+
+        _msgHandler->handleMessage(data);
+    }
+    return true;
+}
+
+/**
+ * Handle one proxied HTTP request for this session.
+ *
+ * Classifies the request from its URI, optionally parses framed messages
+ * from the socket's input buffer, then tries to answer with whatever is
+ * queued. If nothing is queued the socket is either parked in
+ * _writeSockets for a later reply or shut down.
+ *
+ * @param uriPublic  request URI; only checked for the substring "/write".
+ * @param socket     socket the request arrived on.
+ */
void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
const std::shared_ptr<Socket> &socket)
{
+ // Unchecked downcast - presumably callers always pass a StreamSocket; TODO confirm.
+ auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
+
+ // bRead is true when the URI does NOT contain "/write".
+ // NOTE(review): the bRead branch below is the one that parses the request
+ // body, and only !bRead sockets get parked - confirm this read/write sense
+ // matches the client's 'read' and 'write' endpoints.
bool bRead = uriPublic.find("/write") == std::string::npos;
LOG_INF("Proxy handle request " << uriPublic << " type: " <<
(bRead ? "read" : "write"));
- (void)socket;
+
+ if (bRead)
+ {
+ if (!_msgHandler)
+ LOG_WRN("unusual - incoming message with no-one to handle it");
+ else if (!parseEmitIncoming(streamSocket))
+ {
+ // Parsing failed: log the socket state; processing continues below.
+ std::stringstream oss;
+ streamSocket->dumpState(oss);
+ LOG_ERR("bad socket structure " << oss.str());
+ }
+ }
+
+ // flushQueueTo() returns false when there was nothing queued to send.
+ if (!flushQueueTo(streamSocket) && !bRead)
+ {
+ // longer running 'write socket'
+ _writeSockets.push_back(streamSocket);
+ }
+ else
+ socket->shutdown();
+}
+
+/// Not expected to be reached: request data is consumed in handleRequest.
+/// If it is reached, log the state of the socket behind the disposition.
+void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
+{
+    const auto sock = disposition.getSocket();
+    std::stringstream state;
+    sock->dumpState(state);
+    LOG_ERR("If you got here, it means we failed to parse this properly in handleRequest: " << state.str());
+}
+
+/// Queue one outgoing message and, if asked, send the queue straight away.
+///
+/// @param msg    payload bytes.
+/// @param len    number of payload bytes.
+/// @param text   true for a text ('T') frame, false for binary ('B').
+/// @param flush  when true, take a parked socket (if any), write the whole
+///               queue to it and shut it down.
+/// @return len, unconditionally.
+int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush)
+{
+    _writeQueue.push_back(std::make_shared<Message>(msg, len, text));
+
+    // Only claim a parked socket when we will actually answer on it:
+    // popping one without flushing would remove it from _writeSockets
+    // and leave that request unanswered.
+    if (flush)
+    {
+        auto sock = popWriteSocket();
+        if (sock)
+        {
+            flushQueueTo(sock);
+            sock->shutdown();
+        }
+    }
+
+    return len;
+}
+
+/// Log and queue a text message; forwards to sendMessage() with text = true.
+int ProxyProtocolHandler::sendTextMessage(const char *msg, const size_t len, bool flush) const
+{
+    LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
+
+    // This method is const but queueing mutates the handler, so constness
+    // is cast away to reach sendMessage().
+    auto *self = const_cast<ProxyProtocolHandler *>(this);
+    return self->sendMessage(msg, len, true, flush);
+}
+
+/// Log and queue a binary message; forwards to sendMessage() with text = false.
+int ProxyProtocolHandler::sendBinaryMessage(const char *data, const size_t len, bool flush) const
+{
+    LOG_TRC("ProxyHack - send binary msg len " << len);
+
+    // This method is const but queueing mutates the handler, so constness
+    // is cast away to reach sendMessage().
+    auto *self = const_cast<ProxyProtocolHandler *>(this);
+    return self->sendMessage(data, len, false, flush);
+}
+
+/// Only traces the request; no socket is touched here.
+void ProxyProtocolHandler::shutdown(bool goingAway,
+                                    const std::string &statusMessage)
+{
+    LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
+}
+
+/// No traffic accounting is done: both counters are reported as zero.
+void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
+{
+    recv = 0;
+    sent = 0;
+}
+
+/// Write a one-line tag for this handler to @p os.
+void ProxyProtocolHandler::dumpState(std::ostream& os)
+{
+    static const char label[] = "proxy protocol\n";
+    os << label;
+}
+
+/// Send everything in _writeQueue to @p socket as one HTTP 200 response.
+///
+/// First lets _msgHandler write out any messages it has queued, then sums
+/// the queued frame sizes for Content-Length, sends the header followed by
+/// each frame, and empties the queue so no frame is delivered twice.
+///
+/// @param socket  socket to write the response to.
+/// @return false when there was nothing to send (nothing is written),
+///         true once the header and all queued frames have been sent.
+bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket)
+{
+    // slurp from the core to us.
+    if (_msgHandler && _msgHandler->hasQueuedMessages())
+        _msgHandler->writeQueuedMessages();
+
+    size_t totalSize = 0;
+    for (const auto &queued : _writeQueue)
+        totalSize += queued->size();
+
+    if (!totalSize)
+        return false;
+
+    std::ostringstream oss;
+    oss << "HTTP/1.1 200 OK\r\n"
+        "Last-Modified: " << Util::getHttpTimeNow() << "\r\n"
+        "User-Agent: " WOPI_AGENT_STRING "\r\n"
+        "Content-Length: " << totalSize << "\r\n"
+        "Content-Type: application/json\r\n"
+        "X-Content-Type-Options: nosniff\r\n"
+        "\r\n";
+    socket->send(oss.str());
+
+    for (const auto &queued : _writeQueue)
+        socket->send(queued->data(), queued->size(), false);
+
+    // Everything queued has been sent; without this each later flush
+    // would re-send every earlier message.
+    _writeQueue.clear();
+
+    return true;
+}
+
+/// Remove and return the first parked socket that is still alive, taking
+/// entries from the front of _writeSockets and discarding expired ones.
+/// Returns an empty pointer when none is left.
+std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
+{
+    while (!_writeSockets.empty())
+    {
+        const std::shared_ptr<StreamSocket> candidate = _writeSockets.front().lock();
+        _writeSockets.erase(_writeSockets.begin());
+        if (candidate)
+            return candidate;
+    }
+    return nullptr;
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 1f88e1fa7..aade8b416 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -10,19 +10,21 @@
#ifndef INCLUDED_PROXY_PROTOCOL_HPP
#define INCLUDED_PROXY_PROTOCOL_HPP
+#include <memory>
#include <net/Socket.hpp>
-/// Interface for building a websocket from this ...
+/**
+ * Implementation that builds a websocket-like protocol from many
+ * individual proxied HTTP requests back to back.
+ *
+ * we use a trivial framing: [T|B]0x<hex-length>\n<content>\n
+ */
class ProxyProtocolHandler : public ProtocolHandlerInterface
{
public:
- ProxyProtocolHandler()
- {
- }
+ ProxyProtocolHandler() { }
- virtual ~ProxyProtocolHandler()
- {
- }
+ virtual ~ProxyProtocolHandler() { }
/// Will be called exactly once by setHandler
void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override
@@ -30,10 +32,7 @@ public:
}
/// Called after successful socket reads.
- void handleIncomingMessage(SocketDisposition &/* disposition */) override
- {
- assert("we get our data a different way" && false);
- }
+ void handleIncomingMessage(SocketDisposition &/* disposition */) override;
int getPollEvents(std::chrono::steady_clock::time_point /* now */,
int &/* timeoutMaxMs */) override
@@ -59,40 +58,40 @@ public:
/// Clear all external references
virtual void dispose() { _msgHandler.reset(); }
- int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override
- {
- LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
- (void) flush;
- return len;
- }
+ int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override;
+ int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override;
+ void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
+ void getIOStats(uint64_t &sent, uint64_t &recv) override;
+ void dumpState(std::ostream& os);
- int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
- {
- (void) data; (void) flush;
- LOG_TRC("ProxyHack - send binary msg len " << len);
- return len;
- }
-
- void shutdown(bool goingAway = false, const std::string &statusMessage = "") override
- {
- LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
- }
-
- void getIOStats(uint64_t &sent, uint64_t &recv) override
- {
- sent = recv = 0;
- }
-
- void dumpState(std::ostream& os)
- {
- os << "proxy protocol\n";
- }
+ bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
void handleRequest(const std::string &uriPublic,
const std::shared_ptr<Socket> &socket);
private:
- std::vector<std::weak_ptr<StreamSocket>> _sockets;
+ std::shared_ptr<StreamSocket> popWriteSocket();
+ int sendMessage(const char *msg, const size_t len, bool text, bool flush);
+ bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket);
+
+ /// One framed message held as raw bytes:
+ /// 'T' (text) or 'B' (binary), "0x", the payload length in hex, '\n',
+ /// the payload itself, and a closing '\n'.
+ struct Message : public std::vector<char>
+ {
+     Message(const char *msg, const size_t len, bool text)
+     {
+         std::ostringstream header;
+         header << (text ? 'T' : 'B') << "0x" << std::hex << len << '\n';
+         const std::string prefix = header.str();
+
+         insert(end(), prefix.begin(), prefix.end());
+         insert(end(), msg, msg + len);
+         push_back('\n');
+     }
+ };
+ /// queue things when we have no socket to hand.
+ std::vector<std::shared_ptr<Message>> _writeQueue;
+ std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
};
#endif
More information about the Libreoffice-commits
mailing list