[Libreoffice-commits] online.git: net/Socket.hpp net/WebSocketHandler.hpp wsd/Admin.cpp

Michael Meeks michael.meeks at collabora.com
Fri Mar 17 22:59:51 UTC 2017


 net/Socket.hpp           |    9 +++++++--
 net/WebSocketHandler.hpp |   45 +++++++++++++++++++++++++++++++++++++++------
 wsd/Admin.cpp            |    4 ++--
 3 files changed, 48 insertions(+), 10 deletions(-)

New commits:
commit a6a4094e52c3a7fafac4a617ca3956dc6228eb29
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Fri Mar 17 22:59:03 2017 +0000

    Send ping message, handle pong & store ping-time on the Websocket.

diff --git a/net/Socket.hpp b/net/Socket.hpp
index 268994a7..37b9478c 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -304,7 +304,7 @@ public:
         int rc;
         do
         {
-            rc = ::poll(&_pollFds[0], size + 1, timeoutMaxMs);
+            rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
         }
         while (rc < 0 && errno == EINTR);
         LOG_TRC("Poll completed with " << rc << " live polls max (" << timeoutMaxMs << "ms)"
@@ -524,6 +524,9 @@ public:
     virtual int getPollEvents(std::chrono::steady_clock::time_point now,
                               int &timeoutMaxMs) = 0;
 
+    /// Do we need to handle a timeout ?
+    virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {}
+
     /// Do some of the queued writing.
     virtual void performWrites() = 0;
 
@@ -673,11 +676,13 @@ protected:
 
     /// Called when a polling event is received.
     /// @events is the mask of events that triggered the wake.
-    HandleResult handlePoll(std::chrono::steady_clock::time_point /* now */,
+    HandleResult handlePoll(std::chrono::steady_clock::time_point now,
                             const int events) override
     {
         assert(isCorrectThread());
 
+        _socketHandler->checkTimeout(now);
+
         if (!events)
             return Socket::HandleResult::CONTINUE;
 
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 50c5d0e9..bb6af23b 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -23,6 +23,10 @@ protected:
     // The socket that owns us (we can't own it).
     std::weak_ptr<StreamSocket> _socket;
 
+    const int PingFrequencyMs = 18 * 1000;
+    std::chrono::steady_clock::time_point _pingSent;
+    int _pingTimeUs;
+
     std::vector<char> _wsPayload;
     bool _shuttingDown;
     enum class WSState { HTTP, WS } _wsState;
@@ -35,6 +39,8 @@ protected:
 
 public:
     WebSocketHandler() :
+        _pingSent(std::chrono::steady_clock::now()),
+        _pingTimeUs(0),
         _shuttingDown(false),
         _wsState(WSState::HTTP)
     {
@@ -44,6 +50,8 @@ public:
     WebSocketHandler(const std::weak_ptr<StreamSocket>& socket,
                      const Poco::Net::HTTPRequest& request) :
         _socket(socket),
+        _pingSent(std::chrono::steady_clock::now()),
+        _pingTimeUs(0),
         _shuttingDown(false),
         _wsState(WSState::HTTP)
     {
@@ -187,8 +195,16 @@ public:
         // FIXME: fin, aggregating payloads into _wsPayload etc.
         LOG_TRC("#" << socket->getFD() << ": Incoming WebSocket message code " << code << " fin? " << fin << " payload length " << _wsPayload.size());
 
-        if (code & WSOpCode::Close)
+        switch (code)
         {
+        case WSOpCode::Pong:
+            _pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _pingSent).count();
+            LOG_TRC("Pong received: " << _pingTimeUs << " microseconds");
+            break;
+        case WSOpCode::Ping:
+            LOG_ERR("Clients should not send pings, only servers");
+            // drop through
+        case WSOpCode::Close:
             if (!_shuttingDown)
             {
                 // Peer-initiated shutdown must be echoed.
@@ -207,10 +223,10 @@ public:
 
             // TCP Close.
             socket->shutdown();
-        }
-        else
-        {
+            break;
+        default:
             handleMessage(fin, code, _wsPayload);
+            break;
         }
 
         _wsPayload.clear();
@@ -225,12 +241,29 @@ public:
             ; // can have multiple msgs in one recv'd packet.
     }
 
-    int getPollEvents(std::chrono::steady_clock::time_point /* now */,
-                      int & /* timeoutMaxMs */) override
+    int getPollEvents(std::chrono::steady_clock::time_point now,
+                      int & timeoutMaxMs) override
     {
+        int timeSincePingMs =
+            std::chrono::duration_cast<std::chrono::milliseconds>(now - _pingSent).count();
+        timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs);
         return POLLIN;
     }
 
+    /// Do we need to handle a timeout ?
+    void checkTimeout(std::chrono::steady_clock::time_point now) override
+    {
+        int timeSincePingMs =
+            std::chrono::duration_cast<std::chrono::milliseconds>(now - _pingSent).count();
+        if (timeSincePingMs >= PingFrequencyMs)
+        {
+            LOG_TRC("Send ping message");
+            // FIXME: allow an empty payload.
+            sendMessage("", 1, WSOpCode::Ping, false);
+            _pingSent = now;
+        }
+    }
+
     /// By default rely on the socket buffer.
     void performWrites() override {}
 
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index dfa97bc4..19819e26 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -316,7 +316,7 @@ void Admin::pollingThread()
         std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
         int cpuWait = _cpuStatsTaskIntervalMs -
             std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
-        if (cpuWait < 0)
+        if (cpuWait <= 0)
         {
             // TODO: implement me ...
             lastCPU = now;
@@ -324,7 +324,7 @@ void Admin::pollingThread()
         }
         int memWait = _memStatsTaskIntervalMs -
             std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
-        if (memWait < 0)
+        if (memWait <= 0)
         {
             std::unique_lock<std::mutex> modelLock(getLock());
             const auto totalMem = getTotalMemoryUsage();


More information about the Libreoffice-commits mailing list