[Libreoffice-commits] online.git: Branch 'private/mmeeks/admin-nonblock' - 3 commits - net/Socket.hpp wsd/Admin.cpp wsd/Admin.hpp wsd/Exceptions.hpp wsd/LOOLWSD.cpp

Michael Meeks michael.meeks at collabora.com
Wed Mar 15 17:41:52 UTC 2017


 net/Socket.hpp     |    2 
 wsd/Admin.cpp      |  148 ++++++++++++++++++++++++++++-------------------------
 wsd/Admin.hpp      |   71 ++++++-------------------
 wsd/Exceptions.hpp |    9 ---
 wsd/LOOLWSD.cpp    |   12 +++-
 5 files changed, 107 insertions(+), 135 deletions(-)

New commits:
commit d19b6eb3512931bbb2dc45004ca75aef682d9709
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Mar 15 16:39:13 2017 +0000

    Move memstats & cpustats into the main polling thread.
    
    We can calculate the timeout ourselves easily and add it to the
    polling loop simplifying life.
    
    Also ensure we never send messages to a non-authenticated client.

diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index e4b616b..a3b7035 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -32,7 +32,6 @@
 #include "FileServer.hpp"
 #include "IoUtil.hpp"
 #include "Protocol.hpp"
-#include "LOOLWebSocket.hpp" // FIXME: remove.
 #include "LOOLWSD.hpp"
 #include "Log.hpp"
 #include "Storage.hpp"
@@ -87,16 +86,18 @@ void AdminRequestHandler::handleMessage(bool /* fin */, WSOpCode /* code */, std
         }
         else
         {
-            sendTextFrame("InvalidAuthToken");
+            sendFrame("InvalidAuthToken");
             LOG_TRC("Invalid auth token");
+            shutdown();
             return;
         }
     }
 
     if (!_isAuthenticated)
     {
-        sendTextFrame("NotAuthenticated");
+        sendFrame("NotAuthenticated");
         LOG_TRC("Not authenticated");
+        shutdown();
         return;
     }
     else if (tokens[0] == "documents" ||
@@ -229,7 +230,11 @@ AdminRequestHandler::AdminRequestHandler(Admin* adminManager,
 void AdminRequestHandler::sendTextFrame(const std::string& message)
 {
     UnitWSD::get().onAdminQueryMessage(message);
-    sendFrame(message);
+    std::cerr << "Admin: send text frame '" << message << "'\n";
+    if (_isAuthenticated)
+        sendFrame(message);
+    else
+        LOG_TRC("Skip sending message to non-authenticated client: '" << message << "'");
 }
 
 bool AdminRequestHandler::handleInitialRequest(
@@ -273,7 +278,10 @@ bool AdminRequestHandler::handleInitialRequest(
 Admin::Admin() :
     SocketPoll("admin"),
     _model(AdminModel()),
-    _forKitPid(-1)
+    _forKitPid(-1),
+    _lastTotalMemory(0),
+    _memStatsTaskIntervalMs(5000),
+    _cpuStatsTaskIntervalMs(5000)
 {
     LOG_INF("Admin ctor.");
 
@@ -281,20 +289,54 @@ Admin::Admin() :
     const auto totalMem = getTotalMemoryUsage();
     LOG_TRC("Total memory used: " << totalMem);
     _model.addMemStats(totalMem);
-
-    _memStatsTask = new MemoryStatsTask(this);
-    _memStatsTimer.schedule(_memStatsTask, _memStatsTaskInterval, _memStatsTaskInterval);
-
-    _cpuStatsTask = new CpuStats(this);
-    _cpuStatsTimer.schedule(_cpuStatsTask, _cpuStatsTaskInterval, _cpuStatsTaskInterval);
 }
 
 Admin::~Admin()
 {
     LOG_INF("~Admin dtor.");
+}
+
+void Admin::pollingThread()
+{
+    // Admin poll loop: run the periodic CPU/memory stats tasks when due, then poll sockets.
+    std::chrono::steady_clock::time_point lastCPU, lastMem;
+    lastCPU = std::chrono::steady_clock::now();
+    lastMem = lastCPU;
+    // Each stats task is timed from its own last-run timestamp (lastCPU / lastMem).
+    while (!_stop && !TerminationFlag && !ShutdownRequestFlag)
+    {
+        std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
+        int cpuWait = _cpuStatsTaskIntervalMs -
+            std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count();
+        if (cpuWait < 0)
+        {
+            // TODO: collect CPU stats here; for now only the timer is re-armed.
+            lastCPU = now;
+            cpuWait += _cpuStatsTaskIntervalMs;
+        }
+        int memWait = _memStatsTaskIntervalMs -
+            std::chrono::duration_cast<std::chrono::milliseconds>(now - lastMem).count();
+        if (memWait < 0)
+        {
+            std::unique_lock<std::mutex> modelLock(getLock());
+            const auto totalMem = getTotalMemoryUsage();
+            if (totalMem != _lastTotalMemory)
+            {
+                LOG_TRC("Total memory used: " << totalMem);
+                _lastTotalMemory = totalMem;
+            }
+            // A sample is recorded every time the timer fires, even if the total is unchanged.
+            _model.addMemStats(totalMem);
 
-    _memStatsTask->cancel();
-    _cpuStatsTask->cancel();
+            lastMem = now;
+            memWait += _memStatsTaskIntervalMs;
+        }
+
+        // Handle websockets & other work until the next stats task is due.
+        int timeout = std::min(cpuWait, memWait);
+        LOG_TRC("Admin poll for " << timeout << "ms");
+        poll(timeout);
+    }
 }
 
 void Admin::addDoc(const std::string& docKey, Poco::Process::PID pid, const std::string& filename, const std::string& sessionId)
@@ -316,43 +358,18 @@ void Admin::rmDoc(const std::string& docKey)
     _model.removeDocument(docKey);
 }
 
-void MemoryStatsTask::run()
-{
-    std::unique_lock<std::mutex> modelLock(_admin->getLock());
-    const auto totalMem = _admin->getTotalMemoryUsage();
-
-    if (totalMem != _lastTotalMemory)
-    {
-        LOG_TRC("Total memory used: " << totalMem);
-        _lastTotalMemory = totalMem;
-    }
-
-    _admin->getModel().addMemStats(totalMem);
-}
-
-void CpuStats::run()
-{
-    //TODO: Implement me
-    //std::unique_lock<std::mutex> modelLock(_admin->getLock());
-    //model.addCpuStats(totalMem);
-}
-
 void Admin::rescheduleMemTimer(unsigned interval)
 {
-    _memStatsTask->cancel();
-    _memStatsTaskInterval = interval;
-    _memStatsTask = new MemoryStatsTask(this);
-    _memStatsTimer.schedule(_memStatsTask, _memStatsTaskInterval, _memStatsTaskInterval);
+    _memStatsTaskIntervalMs = interval;
     LOG_INF("Memory stats interval changed - New interval: " << interval);
+    wakeup();
 }
 
 void Admin::rescheduleCpuTimer(unsigned interval)
 {
-    _cpuStatsTask->cancel();
-    _cpuStatsTaskInterval = interval;
-    _cpuStatsTask = new CpuStats(this);
-    _cpuStatsTimer.schedule(_cpuStatsTask, _cpuStatsTaskInterval, _cpuStatsTaskInterval);
+    _cpuStatsTaskIntervalMs = interval;
     LOG_INF("CPU stats interval changed - New interval: " << interval);
+    wakeup();
 }
 
 unsigned Admin::getTotalMemoryUsage()
@@ -373,12 +390,12 @@ unsigned Admin::getTotalMemoryUsage()
 
 unsigned Admin::getMemStatsInterval()
 {
-    return _memStatsTaskInterval;
+    return _memStatsTaskIntervalMs;
 }
 
 unsigned Admin::getCpuStatsInterval()
 {
-    return _cpuStatsTaskInterval;
+    return _cpuStatsTaskIntervalMs;
 }
 
 AdminModel& Admin::getModel()
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index 2b8d4f0..da51cdc 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -22,7 +22,6 @@
 
 #include "AdminModel.hpp"
 #include "Log.hpp"
-#include <LOOLWebSocket.hpp>
 
 #include "net/WebSocketHandler.hpp"
 
@@ -76,6 +75,9 @@ public:
         startThread();
     }
 
+    /// Custom poll thread function
+    void pollingThread() override;
+
     unsigned getTotalMemoryUsage();
 
     /// Update the Admin Model.
@@ -114,58 +116,10 @@ private:
     AdminModel _model;
     std::mutex _modelMutex;
     int _forKitPid;
-
-    Poco::Util::Timer _memStatsTimer;
-    Poco::AutoPtr<MemoryStatsTask> _memStatsTask;
-    unsigned _memStatsTaskInterval = 5000;
-
-    Poco::Util::Timer _cpuStatsTimer;
-    Poco::Util::TimerTask::Ptr _cpuStatsTask;
-    unsigned _cpuStatsTaskInterval = 5000;
-};
-
-/// Memory statistics.
-class MemoryStatsTask : public Poco::Util::TimerTask
-{
-public:
-    MemoryStatsTask(Admin* admin)
-        : _admin(admin),
-          _lastTotalMemory(0)
-    {
-        LOG_DBG("Memory stat ctor");
-    }
-
-    ~MemoryStatsTask()
-    {
-        LOG_DBG("Memory stat dtor");
-    }
-
-    long getLastTotalMemory() { return _lastTotalMemory; }
-
-    void run() override;
-
-private:
-    Admin* _admin;
     long _lastTotalMemory;
-};
-
-/// CPU statistics.
-class CpuStats : public Poco::Util::TimerTask
-{
-public:
-    CpuStats(Admin* /*admin*/)
-    {
-        LOG_DBG("Cpu stat ctor");
-    }
 
-    ~CpuStats()
-    {
-        LOG_DBG("Cpu stat dtor");
-    }
-
-    void run() override;
-
-private:
+    std::atomic<int> _memStatsTaskIntervalMs;
+    std::atomic<int> _cpuStatsTaskIntervalMs;
 };
 
 #endif
diff --git a/wsd/Exceptions.hpp b/wsd/Exceptions.hpp
index b05d98d..b97263e 100644
--- a/wsd/Exceptions.hpp
+++ b/wsd/Exceptions.hpp
@@ -58,15 +58,6 @@ public:
     using LoolException::LoolException;
 };
 
-/// An generic error-message exception meant to
-/// propagate via a valid LOOLWebSocket to the client.
-/// The contents of what() will be displayed on screen.
-class WebSocketErrorMessageException : public LoolException
-{
-public:
-    using LoolException::LoolException;
-};
-
 #endif
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
commit 0089723f693a8d9549b9abb7236c2f5bae4d633a
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Mar 15 16:25:29 2017 +0000

    Admin: review error handling on auth. failure.

diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index d3e684b..e4b616b 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -232,10 +232,10 @@ void AdminRequestHandler::sendTextFrame(const std::string& message)
     sendFrame(message);
 }
 
-bool AdminRequestHandler::handleInitialRequest(const std::weak_ptr<StreamSocket> &socketWeak,
-                                               const Poco::Net::HTTPRequest& request)
+bool AdminRequestHandler::handleInitialRequest(
+    const std::weak_ptr<StreamSocket> &socketWeak,
+    const Poco::Net::HTTPRequest& request)
 {
-    HTTPResponse response;
     auto socket = socketWeak.lock();
 
     // Different session id pool for admin sessions (?)
@@ -260,28 +260,12 @@ bool AdminRequestHandler::handleInitialRequest(const std::weak_ptr<StreamSocket>
         return true;
     }
 
-// FIXME: ... should we move ourselves to an Admin poll [!?] ...
-//  Probably [!] ... and use this termination thing ? ...
-//  Perhaps that should be the 'Admin' instance / sub-class etc.
-//  FIXME: have name 'admin' etc.
-//                            []() { return TerminationFlag.load(); });
-#if 0
-    catch(const Poco::Net::NotAuthenticatedException& exc)
-    {
-        LOG_INF("Admin::NotAuthenticated");
-        response.set("WWW-Authenticate", "Basic realm=\"online\"");
-        response.setStatusAndReason(HTTPResponse::HTTP_UNAUTHORIZED);
-        response.setContentLength(0);
-        socket->send(response);
-    }
-    catch (const std::exception& exc)
-    {
-        LOG_INF("Admin::handleRequest: Exception: " << exc.what());
-        response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
-        response.setContentLength(0);
-        socket->send(response);
-    }
-#endif
+    HTTPResponse response;
+    response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
+    response.setContentLength(0);
+    LOG_INF("Admin::handleInitialRequest bad request");
+    socket->send(response);
+
     return false;
 }
 
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index 4332e27..2b8d4f0 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -51,7 +51,6 @@ private:
 
 private:
     Admin* _admin;
-//    std::shared_ptr<LOOLWebSocket> _adminWs; FIXME - this is us now !
     int _sessionId;
     bool _isAuthenticated;
 };
commit 909b5f8ac3aa57c4bc786fa03c5d999805e6d7ca
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Mar 15 16:13:13 2017 +0000

    Admin: should be its own socket-poll goodness.

diff --git a/net/Socket.hpp b/net/Socket.hpp
index 1d67bb6..9460c45 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -412,7 +412,7 @@ public:
         wakeup();
     }
 
-    void dumpState(std::ostream& os);
+    virtual void dumpState(std::ostream& os);
 
     /// Removes a socket from this poller.
     /// NB. this must be called from the socket poll that
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 6f9f7cb..d3e684b 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -232,7 +232,7 @@ void AdminRequestHandler::sendTextFrame(const std::string& message)
     sendFrame(message);
 }
 
-void AdminRequestHandler::handleInitialRequest(const std::weak_ptr<StreamSocket> &socketWeak,
+bool AdminRequestHandler::handleInitialRequest(const std::weak_ptr<StreamSocket> &socketWeak,
                                                const Poco::Net::HTTPRequest& request)
 {
     HTTPResponse response;
@@ -257,6 +257,7 @@ void AdminRequestHandler::handleInitialRequest(const std::weak_ptr<StreamSocket>
             handler->_sessionId = sessionId;
             model.subscribe(sessionId, handler);
         }
+        return true;
     }
 
 // FIXME: ... should we move ourselves to an Admin poll [!?] ...
@@ -281,10 +282,12 @@ void AdminRequestHandler::handleInitialRequest(const std::weak_ptr<StreamSocket>
         socket->send(response);
     }
 #endif
+    return false;
 }
 
 /// An admin command processor.
 Admin::Admin() :
+    SocketPoll("admin"),
     _model(AdminModel()),
     _forKitPid(-1)
 {
@@ -411,4 +414,10 @@ void Admin::updateMemoryDirty(const std::string& docKey, int dirty)
     _model.updateMemoryDirty(docKey, dirty);
 }
 
+void Admin::dumpState(std::ostream& os)
+{
+    // FIXME: be more helpful ...
+    SocketPoll::dumpState(os);
+}
+
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index c15cbbb..4332e27 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -36,7 +36,9 @@ public:
                         const std::weak_ptr<StreamSocket>& socket,
                         const Poco::Net::HTTPRequest& request);
 
-    static void handleInitialRequest(const std::weak_ptr<StreamSocket> &socket,
+    /// Handle the initial Admin WS upgrade request.
+    /// @returns true if we should give this socket to the Admin poll.
+    static bool handleInitialRequest(const std::weak_ptr<StreamSocket> &socket,
                                      const Poco::Net::HTTPRequest& request);
 
 private:
@@ -57,7 +59,7 @@ private:
 class MemoryStatsTask;
 
 /// An admin command processor.
-class Admin
+class Admin : public SocketPoll
 {
     Admin();
 public:
@@ -69,6 +71,12 @@ public:
         return admin;
     }
 
+    void start()
+    {
+        // FIXME: not if admin console is not enabled ?
+        startThread();
+    }
+
     unsigned getTotalMemoryUsage();
 
     /// Update the Admin Model.
@@ -101,6 +109,8 @@ public:
     void updateLastActivityTime(const std::string& docKey);
     void updateMemoryDirty(const std::string& docKey, int dirty);
 
+    void dumpState(std::ostream& os) override;
+
 private:
     AdminModel _model;
     std::mutex _modelMutex;
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 3aee14d..81608b6 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1808,8 +1808,12 @@ private:
             else if (reqPathSegs.size() >= 2 && reqPathSegs[0] == "lool" && reqPathSegs[1] == "adminws")
             {
                 LOG_ERR("Admin request: " << request.getURI());
-                AdminRequestHandler::handleInitialRequest(_socket, request);
-
+                if (AdminRequestHandler::handleInitialRequest(_socket, request))
+                {
+                    // Hand the socket over to the Admin poll.
+                    WebServerPoll.releaseSocket(socket);
+                    Admin::instance().insertNewSocket(socket);
+                }
             }
             // Client post and websocket connections
             else if ((request.getMethod() == HTTPRequest::HTTP_GET ||
@@ -2336,6 +2340,7 @@ public:
         _acceptPoll.insertNewSocket(findServerPort(port));
         _acceptPoll.startThread();
         WebServerPoll.startThread();
+        Admin::instance().start();
     }
 
     void stop()
@@ -2364,6 +2369,9 @@ public:
         os << "Prisoner poll:\n";
         PrisonerPoll.dumpState(os);
 
+        os << "Admin poll:\n";
+        Admin::instance().dumpState(os);
+
         os << "Document Broker polls "
                   << "[ " << DocBrokers.size() << " ]:\n";
         for (auto &i : DocBrokers)


More information about the Libreoffice-commits mailing list