[Libreoffice-commits] online.git: loolwsd/Admin.cpp loolwsd/Admin.hpp loolwsd/AdminModel.hpp loolwsd/LOOLBroker.cpp loolwsd/LOOLKit.cpp

Pranav Kant pranavk at collabora.com
Thu Mar 3 18:30:51 UTC 2016


 loolwsd/Admin.cpp      |   26 +---
 loolwsd/Admin.hpp      |    4 
 loolwsd/AdminModel.hpp |  302 ++++++++++++++++++++++++++++++++++++++++++++++++-
 loolwsd/LOOLBroker.cpp |   10 -
 loolwsd/LOOLKit.cpp    |   31 +++++
 5 files changed, 347 insertions(+), 26 deletions(-)

New commits:
commit ada6a74dc019ba30f00e761452da4185ef91306a
Author: Pranav Kant <pranavk at collabora.com>
Date:   Thu Mar 3 23:31:37 2016 +0530

    loolwsd: Basic layout and interaction with AdminModel
    
    Admin web sessions are added as subscribers to AdminModel. Live
    notifications fill up the AdminModel, which in turn notifies its
    subscribers, if any are present. AdminModel can also be queried to
    fetch any previous data since the start of the server, including
    expired documents/views with timestamps for analysis.
    
    There is a lot of stuff that can be added in the future. This commit
    just lays the foundation of the appropriate classes.
    
    Change-Id: Ifcf6c2896ef46b33935802e79cd28240fd4f980e
    Reviewed-on: https://gerrit.libreoffice.org/22869
    Reviewed-by: Tor Lillqvist <tml at collabora.com>
    Tested-by: Tor Lillqvist <tml at collabora.com>

diff --git a/loolwsd/Admin.cpp b/loolwsd/Admin.cpp
index e30d55e..ba1d154 100644
--- a/loolwsd/Admin.cpp
+++ b/loolwsd/Admin.cpp
@@ -61,6 +61,11 @@ public:
             Log::debug("Thread [" + thread_name + "] started.");
 
             auto ws = std::make_shared<WebSocket>(request, response);
+
+            // Subscribe the websocket of any AdminModel updates
+            AdminModel& model = _admin->getModel();
+            model.subscribe(ws);
+
             const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
             int flags = 0;
             int n = 0;
@@ -138,7 +143,7 @@ public:
                         else if (tokens.count() == 1 && tokens[0] == "documents")
                         {
 
-                            std::string response = "documents " + _admin->getDocuments();
+                            std::string response = "documents " + model.query("documents");
                             ws->sendFrame(response.data(), response.size());
                         }
                     }
@@ -222,21 +227,9 @@ Admin::~Admin()
     _srv.stop();
 }
 
-std::string Admin::getDocuments()
-{
-    return _model.getDocuments();
-}
-
 void Admin::handleInput(std::string& message)
 {
-    StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-    if (tokens.count() > 2 && tokens[0] == "document")
-    {
-        std::string pid = tokens[1];
-        std::string url = tokens[2];
-
-        _model.addDocument(std::stoi(pid), url);
-    }
+    _model.update(message);
 }
 
 void Admin::run()
@@ -265,6 +258,11 @@ void Admin::run()
     Log::debug("Thread [" + thread_name + "] finished.");
 }
 
+AdminModel& Admin::getModel()
+{
+    return _model;
+}
+
 //TODO: Clean up with something more elegant.
 int Admin::BrokerPipe;
 int Admin::NotifyPipe;
diff --git a/loolwsd/Admin.hpp b/loolwsd/Admin.hpp
index 0356213..1f1146e 100644
--- a/loolwsd/Admin.hpp
+++ b/loolwsd/Admin.hpp
@@ -28,10 +28,10 @@ public:
 
     static int getBrokerPid() { return Admin::BrokerPipe; }
 
-    std::string getDocuments();
-
     void run() override;
 
+    AdminModel& getModel();
+
 private:
     void handleInput(std::string& message);
 
diff --git a/loolwsd/AdminModel.hpp b/loolwsd/AdminModel.hpp
index dd7cea3..c9abe0a 100644
--- a/loolwsd/AdminModel.hpp
+++ b/loolwsd/AdminModel.hpp
@@ -12,8 +12,157 @@
 
 #include "config.h"
 
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <Poco/Net/WebSocket.h>
+#include <Poco/Process.h>
+#include <Poco/StringTokenizer.h>
+
 #include "Util.hpp"
 
+class View
+{
+public:
+    View(int nSessionId)
+        : _nSessionId(nSessionId),
+          _start(std::time(nullptr))
+    {    }
+
+    void expire()
+    {
+        _end = std::time(nullptr);
+    }
+
+    bool isExpired()
+    {
+        return _end != 0 && std::time(nullptr) > _end;
+    }
+
+private:
+    int _nSessionId;
+
+    std::time_t _start;
+    std::time_t _end = 0;
+};
+
+class Document
+{
+public:
+    Document(Poco::Process::PID nPid, std::string sUrl)
+        : _nPid(nPid),
+          _sUrl(sUrl),
+          _start(std::time(nullptr))
+    {
+        Log::info("Document " + std::to_string(_nPid) + " ctor.");
+    }
+
+    ~Document()
+    {
+        Log::info("Document " + std::to_string(_nPid) + " dtor.");
+    }
+
+    Poco::Process::PID getPid() const
+    {
+        return _nPid;
+    }
+
+    std::string getUrl() const
+    {
+        return _sUrl;
+    }
+
+    void expire()
+    {
+        _end = std::time(nullptr);
+    }
+
+    bool isExpired() const
+    {
+        return _end != 0 && std::time(nullptr) > _end;
+    }
+
+    void addView(int nSessionId)
+    {
+        std::pair<std::map<int, View>::iterator, bool > ret;
+        ret = _views.insert(std::pair<int, View>(nSessionId, View(nSessionId)));
+        if (!ret.second)
+        {
+            Log::warn() << "View with SessionID [" + std::to_string(nSessionId) + "] already exists." << Log::end;
+        }
+        else
+        {
+            _nViews++;
+        }
+    }
+
+    void removeView(int nSessionId)
+    {
+        auto it = _views.find(nSessionId);
+        if (it != _views.end())
+        {
+            it->second.expire();
+            _nViews--;
+        }
+    }
+
+    unsigned getTotalViews() const
+    {
+        return _nViews;
+    }
+
+private:
+    Poco::Process::PID _nPid;
+    /// SessionId mapping to View object
+    std::map<int, View> _views;
+    /// Total number of active views
+    unsigned _nViews = 0;
+    /// Hosted URL
+    std::string _sUrl;
+
+    std::time_t _start;
+    std::time_t _end = 0;
+};
+
+class Subscriber
+{
+public:
+    Subscriber(std::shared_ptr<Poco::Net::WebSocket>& ws)
+        : _ws(ws)
+    {
+        Log::info("Subscriber ctor.");
+    }
+
+    ~Subscriber()
+    {
+        Log::info("Subscriber dtor.");
+    }
+
+    bool notify(const std::string& message)
+    {
+        auto webSocket = _ws.lock();
+        if (webSocket)
+        {
+            webSocket->sendFrame(message.data(), message.length());
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+private:
+    /// WebSocket to use to send messages to session
+    std::weak_ptr<Poco::Net::WebSocket> _ws;
+
+    /// In case of huge number of documents,
+    /// client can tell us the specific page it is
+    /// interested in getting live notifications
+    unsigned _currentPage;
+};
+
 class AdminModel
 {
 public:
@@ -27,24 +176,165 @@ public:
         Log::info("AdminModel dtor.");
     }
 
+    void update(const std::string& data)
+    {
+        Poco::StringTokenizer tokens(data, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM);
+
+        Log::info("AdminModel Recv: " + data);
+
+        if (tokens[0] == "document")
+        {
+            addDocument(std::stoi(tokens[1]), tokens[2]);
+            unsigned mem = getMemoryUsage(std::stoi(tokens[1]));
+            std::string response = data + std::to_string(mem);
+            notify(response);
+            return;
+        }
+        else if (tokens[0] == "addview")
+        {
+            auto it = _documents.find(std::stoi(tokens[1]));
+            if (it != _documents.end())
+            {
+                const unsigned nSessionId = Util::decodeId(tokens[2]);
+                it->second.addView(nSessionId);
+            }
+        }
+        else if (tokens[0] == "rmview")
+        {
+            auto it = _documents.find(std::stoi(tokens[1]));
+            if (it != _documents.end())
+            {
+                const unsigned nSessionId = Util::decodeId(tokens[2]);
+                it->second.removeView(nSessionId);
+            }
+        }
+        else if (tokens[0] == "rmdoc")
+        {
+            removeDocument(std::stoi(tokens[1]));
+        }
+
+        notify(data);
+    }
+
+    std::string query(const std::string command)
+    {
+        Poco::StringTokenizer tokens(command, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM);
+
+        if (tokens[0] == "documents")
+        {
+            return getDocuments();
+        }
+
+        return std::string("");
+    }
+
+    unsigned getMemoryUsage(Poco::Process::PID nPid)
+    {
+        //TODO: Instead of RSS, show PSS
+        std::string sResponse;
+        const auto cmd = "ps o rss= -p " + std::to_string(nPid);
+        FILE* fp = popen(cmd.c_str(), "r");
+        if (fp == nullptr)
+        {
+            return 0;
+        }
+
+        char cmdBuffer[1024];
+        while (fgets(cmdBuffer, sizeof(cmdBuffer) - 1, fp) != nullptr)
+        {
+            sResponse += cmdBuffer;
+        }
+        pclose(fp);
+
+        unsigned nMem;
+        try
+        {
+            nMem = std::stoi(sResponse);
+        }
+        catch(std::exception& e)
+        {
+            Log::warn() << "Trying to find memory of invalid/dead PID" << Log::end;
+        }
+
+        return nMem;
+    }
+
+    void subscribe(std::shared_ptr<Poco::Net::WebSocket>& ws)
+    {
+        _subscribers.push_back(ws);
+    }
+
+private:
+    // FIXME: we have a problem if new document to be added has PID = expired document in the map
+    // Prolly, *move* expired documents to another container (?)
     void addDocument(Poco::Process::PID pid, std::string url)
     {
-        _documents[pid] = url;
+        std::pair<std::map<Poco::Process::PID, Document>::iterator, bool > ret;
+        ret = _documents.insert(std::pair<Poco::Process::PID, Document>(pid, Document(pid, url)));
+        if (!ret.second)
+        {
+            Log::warn() << "Document with PID [" + std::to_string(pid) + "] already exists." << Log::end;
+        }
+        else
+        {
+            _nDocuments++;
+        }
+    }
+
+    void removeDocument(Poco::Process::PID pid)
+    {
+        auto it = _documents.find(pid);
+        if (it != _documents.end())
+        {
+            it->second.expire();
+            _nDocuments--;
+        }
+    }
+
+    void notify(const std::string& message)
+    {
+        auto it = std::begin(_subscribers);
+        while (it != std::end(_subscribers))
+        {
+            if (!it->notify(message))
+            {
+                it = _subscribers.erase(it);
+            }
+            else
+            {
+                it++;
+            }
+        }
     }
 
     std::string getDocuments()
     {
-        std::string response;
-        for (const auto& it: _documents)
+        std::ostringstream oss;
+        for (auto& it: _documents)
         {
-            response += std::to_string(it.first)  + " " + it.second + " <BR/>";
+            if (it.second.isExpired())
+                continue;
+
+            std::string sPid = std::to_string(it.second.getPid());
+            std::string sUrl = it.second.getUrl();
+            std::string sViews = std::to_string(it.second.getTotalViews());
+            std::string sMem = std::to_string(getMemoryUsage(it.second.getPid()));
+
+            oss << sPid << " "
+                << sUrl << " "
+                << sViews << " "
+                << sMem << " \n ";
         }
 
-        return response;
+        return oss.str();
     }
 
 private:
-    std::map<Poco::Process::PID, std::string> _documents;
+    std::vector<Subscriber> _subscribers;
+    std::map<Poco::Process::PID, Document> _documents;
+
+    /// Number of active documents
+    unsigned _nDocuments;
 };
 
 #endif
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp
index fcf1333..fe35be3 100644
--- a/loolwsd/LOOLBroker.cpp
+++ b/loolwsd/LOOLBroker.cpp
@@ -97,6 +97,12 @@ namespace
             {
                 if (kill(_pid, SIGINT) != 0 && kill(_pid, 0) != 0)
                     Log::warn("Cannot terminate lokit [" + std::to_string(_pid) + "]. Abandoning.");
+
+                std::ostringstream message;
+                message << "rmdoc" << " "
+                        << _pid << " "
+                        << "\r\n";
+                Util::writeFIFO(writerNotify, message.str());
                _pid = -1;
             }
 
@@ -247,10 +253,6 @@ public:
         }
 
         StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-        if (tokens.count() > 1 && tokens[1] == "ok")
-        {
-            Util::writeFIFO(writerNotify, "document " + std::to_string(pid) + " "  + url + " \r\n");
-        }
         return (tokens.count() == 2 && tokens[0] == std::to_string(pid) && tokens[1] == "ok");
     }
 
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index e9a6fbb..b2d0eda 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -658,6 +658,14 @@ private:
             // Retake the lock.
             lock.lock();
 
+            // Notify the Admin thread
+            std::ostringstream message;
+            message << "document" << " "
+                    << Process::id() << " "
+                    << _url << " "
+                    << "\r\n";
+            Util::writeFIFO(writerNotify, message.str());
+
             if (_multiView)
             {
                 Log::info("Loading view to document from URI: [" + uri + "] for session [" + sessionId + "].");
@@ -698,6 +706,14 @@ private:
         }
 
         ++_clientViews;
+
+        std::ostringstream message;
+        message << "addview" << " "
+                << Process::id() << " "
+                << sessionId << " "
+                << "\r\n";
+        Util::writeFIFO(writerNotify, message.str());
+
         return _loKitDocument;
     }
 
@@ -714,6 +730,14 @@ private:
         }
 
         --_clientViews;
+
+        std::ostringstream message;
+        message << "rmview" << " "
+                << Process::id() << " "
+                << sessionId << " "
+                << "\r\n";
+        Util::writeFIFO(writerNotify, message.str());
+
         Log::info("Session " + sessionId + " is unloading. " + std::to_string(_clientViews) + " views will remain.");
 
         if (_multiView && _loKitDocument)
@@ -1055,6 +1079,13 @@ void lokit_main(const std::string& childRoot,
     if (loKit)
         loKit->pClass->destroy(loKit);
 
+    std::ostringstream message;
+    message << "rmdoc" << " "
+            << Process::id() << " "
+            << "\r\n";
+    Util::writeFIFO(writerNotify, message.str());
+    close(writerNotify);
+
     Log::info("Process [" + process_name + "] finished.");
 }
 


More information about the Libreoffice-commits mailing list