[Libreoffice-commits] online.git: net/Socket.hpp net/WebSocketHandler.hpp wsd/Admin.cpp
Michael Meeks
michael.meeks at collabora.com
Fri Mar 17 22:59:51 UTC 2017
net/Socket.hpp | 9 +++++++--
net/WebSocketHandler.hpp | 45 +++++++++++++++++++++++++++++++++++++++------
wsd/Admin.cpp | 4 ++--
3 files changed, 48 insertions(+), 10 deletions(-)
New commits:
commit a6a4094e52c3a7fafac4a617ca3956dc6228eb29
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Fri Mar 17 22:59:03 2017 +0000
Send ping message, handle pong & store ping-time on the Websocket.
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 268994a7..37b9478c 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -304,7 +304,7 @@ public:
int rc;
do
{
- rc = ::poll(&_pollFds[0], size + 1, timeoutMaxMs);
+ rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
}
while (rc < 0 && errno == EINTR);
LOG_TRC("Poll completed with " << rc << " live polls max (" << timeoutMaxMs << "ms)"
@@ -524,6 +524,9 @@ public:
virtual int getPollEvents(std::chrono::steady_clock::time_point now,
int &timeoutMaxMs) = 0;
+ /// Do we need to handle a timeout ?
+ virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {}
+
/// Do some of the queued writing.
virtual void performWrites() = 0;
@@ -673,11 +676,13 @@ protected:
/// Called when a polling event is received.
/// @events is the mask of events that triggered the wake.
- HandleResult handlePoll(std::chrono::steady_clock::time_point /* now */,
+ HandleResult handlePoll(std::chrono::steady_clock::time_point now,
const int events) override
{
assert(isCorrectThread());
+ _socketHandler->checkTimeout(now);
+
if (!events)
return Socket::HandleResult::CONTINUE;
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 50c5d0e9..bb6af23b 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -23,6 +23,10 @@ protected:
// The socket that owns us (we can't own it).
std::weak_ptr<StreamSocket> _socket;
+ const int PingFrequencyMs = 18 * 1000;
+ std::chrono::steady_clock::time_point _pingSent;
+ int _pingTimeUs;
+
std::vector<char> _wsPayload;
bool _shuttingDown;
enum class WSState { HTTP, WS } _wsState;
@@ -35,6 +39,8 @@ protected:
public:
WebSocketHandler() :
+ _pingSent(std::chrono::steady_clock::now()),
+ _pingTimeUs(0),
_shuttingDown(false),
_wsState(WSState::HTTP)
{
@@ -44,6 +50,8 @@ public:
WebSocketHandler(const std::weak_ptr<StreamSocket>& socket,
const Poco::Net::HTTPRequest& request) :
_socket(socket),
+ _pingSent(std::chrono::steady_clock::now()),
+ _pingTimeUs(0),
_shuttingDown(false),
_wsState(WSState::HTTP)
{
@@ -187,8 +195,16 @@ public:
// FIXME: fin, aggregating payloads into _wsPayload etc.
LOG_TRC("#" << socket->getFD() << ": Incoming WebSocket message code " << code << " fin? " << fin << " payload length " << _wsPayload.size());
- if (code & WSOpCode::Close)
+ switch (code)
{
+ case WSOpCode::Pong:
+ _pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _pingSent).count();
+ LOG_TRC("Pong received: " << _pingTimeUs << " microseconds");
+ break;
+ case WSOpCode::Ping:
+ LOG_ERR("Clients should not send pings, only servers");
+ // drop through
+ case WSOpCode::Close:
if (!_shuttingDown)
{
// Peer-initiated shutdown must be echoed.
@@ -207,10 +223,10 @@ public:
// TCP Close.
socket->shutdown();
- }
- else
- {
+ break;
+ default:
handleMessage(fin, code, _wsPayload);
+ break;
}
_wsPayload.clear();
@@ -225,12 +241,29 @@ public:
; // can have multiple msgs in one recv'd packet.
}
- int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int & /* timeoutMaxMs */) override
+ int getPollEvents(std::chrono::steady_clock::time_point now,
+ int & timeoutMaxMs) override
{
+ int timeSincePingMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(now - _pingSent).count();
+ timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs);
return POLLIN;
}
+ /// Do we need to handle a timeout ?
+ void checkTimeout(std::chrono::steady_clock::time_point now) override
+ {
+ int timeSincePingMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(now - _pingSent).count();
+ if (timeSincePingMs >= PingFrequencyMs)
+ {
+ LOG_TRC("Send ping message");
+ // FIXME: allow an empty payload.
+ sendMessage("", 1, WSOpCode::Ping, false);
+ _pingSent = now;
+ }
+ }
+
/// By default rely on the socket buffer.
void performWrites() override {}
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index dfa97bc4..19819e26 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -316,7 +316,7 @@ void Admin::pollingThread()
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
int cpuWait = _cpuStatsTaskIntervalMs -
std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
- if (cpuWait < 0)
+ if (cpuWait <= 0)
{
// TODO: implement me ...
lastCPU = now;
@@ -324,7 +324,7 @@ void Admin::pollingThread()
}
int memWait = _memStatsTaskIntervalMs -
std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
- if (memWait < 0)
+ if (memWait <= 0)
{
std::unique_lock<std::mutex> modelLock(getLock());
const auto totalMem = getTotalMemoryUsage();
More information about the Libreoffice-commits
mailing list