[Libreoffice-commits] online.git: 2 commits - loolwsd/LOOLKit.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Mon Oct 10 06:32:37 UTC 2016


 loolwsd/LOOLKit.cpp |  254 ++++++++--------------------------------------------
 1 file changed, 42 insertions(+), 212 deletions(-)

New commits:
commit e0ff6eef6b636789b0f7e849df77a6047fec5bae
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Oct 9 17:18:29 2016 -0400

    loolwsd: kill Connection object in child
    
    Change-Id: Ic4d0d3e7286272a0765b299824dfa3556fe56f4b
    Reviewed-on: https://gerrit.libreoffice.org/29652
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 95d3f8c..852be7a 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -258,123 +258,6 @@ namespace
 #endif
 }
 
-/// Connection thread with a client (via WSD).
-class Connection: public Runnable
-{
-public:
-    Connection(std::shared_ptr<ChildSession> session,
-               std::shared_ptr<WebSocket> ws) :
-        _sessionId(session->getId()),
-        _session(std::move(session)),
-        _ws(std::move(ws)),
-        _threadMutex(),
-        _joined(false)
-    {
-        Log::info("Connection ctor in child for " + _sessionId);
-    }
-
-    ~Connection()
-    {
-        Log::info("~Connection dtor in child for " + _sessionId);
-        stop();
-        join();
-    }
-
-    const std::string& getSessionId() const { return _sessionId; };
-    std::shared_ptr<WebSocket> getWebSocket() const { return _ws; }
-    std::shared_ptr<ChildSession> getSession() { return _session; }
-
-    void start()
-    {
-        _thread.start(*this);
-
-        // Busy-wait until we run.
-        // This is important to make sure we can process
-        // callbacks, which if we're late to start will
-        // be dropped. No need for async notification here.
-        constexpr auto delay = COMMAND_TIMEOUT_MS / 20;
-        for (auto i = 0; i < 20 && !isRunning(); ++i)
-        {
-            std::this_thread::sleep_for(std::chrono::milliseconds(delay));
-        }
-    }
-
-    bool isRunning()
-    {
-        return _thread.isRunning();
-    }
-
-    void stop()
-    {
-        // What should we do here?
-    }
-
-    void join()
-    {
-        // The thread is joinable only once.
-        std::unique_lock<std::mutex> lock(_threadMutex);
-        if (!_joined)
-        {
-            _thread.join();
-            _joined = true;
-        }
-    }
-
-    void run() override
-    {
-        Util::setThreadName("kit_ws_" + _sessionId);
-
-        Log::debug("Thread started.");
-        try
-        {
-            IoUtil::SocketProcessor(_ws,
-                [this](const std::vector<char>& payload)
-                {
-                    if (!_session->handleInput(payload.data(), payload.size()))
-                    {
-                        Log::info("Socket handler flagged for finishing.");
-                        return false;
-                    }
-
-                    return true;
-                },
-                [this]() { _session->closeFrame(); },
-                []() { return !!TerminationFlag; });
-
-            if (_session->isCloseFrame())
-            {
-                Log::trace("Normal close handshake.");
-                _ws->shutdown();
-            }
-            else
-            {
-                Log::trace("Abnormal close handshake.");
-               _ws->shutdown(WebSocket::WS_ENDPOINT_GOING_AWAY, SERVICE_UNAVALABLE_INTERNAL_ERROR);
-            }
-        }
-        catch (const Exception& exc)
-        {
-            Log::error() << "Connection::run: Exception: " << exc.displayText()
-                         << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
-                         << Log::end;
-        }
-        catch (const std::exception& exc)
-        {
-            Log::error(std::string("Connection::run: Exception: ") + exc.what());
-        }
-
-        Log::debug("Thread finished.");
-    }
-
-private:
-    const std::string _sessionId;
-    Thread _thread;
-    std::shared_ptr<ChildSession> _session;
-    std::shared_ptr<WebSocket> _ws;
-    std::mutex _threadMutex;
-    std::atomic<bool> _joined;
-};
-
 /// A document container.
 /// Owns LOKitDocument instance and connections.
 /// Manages the lifetime of a document.
@@ -464,10 +347,10 @@ public:
 
             if (!_sessions.emplace(sessionId, session).second)
             {
-                Log::error("Connection already exists for child: " + _jailId + ", session: " + sessionId);
+                Log::error("Session already exists for child: " + _jailId + ", session: " + sessionId);
             }
 
-            Log::debug("Connections: " + std::to_string(_sessions.size()));
+            Log::debug("Sessions: " + std::to_string(_sessions.size()));
             return true;
         }
         catch (const std::exception& ex)
@@ -539,7 +422,7 @@ public:
 
     /// Returns true if at least one *live* connection exists.
     /// Does not consider user activity, just socket status.
-    bool hasConnections()
+    bool hasSessions()
     {
         // -ve values for failure.
         return purgeSessions() != 0;
@@ -550,7 +433,7 @@ public:
     bool canDiscard()
     {
         //TODO: Implement proper time-out on inactivity.
-        return !hasConnections();
+        return !hasSessions();
     }
 
     /// Set Document password for given URL
@@ -1296,7 +1179,7 @@ private:
                             }
                             else
                             {
-                                Log::error() << "Connection thread for session " << session->getId() << " for view "
+                                Log::error() << "Session thread for session " << session->getId() << " for view "
                                              << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type)
                                              << "] payload [" << payload << "]." << Log::end;
                             }
commit e33eff4abdc4a3145b5bf7a9b6def2fce63d64a6
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Oct 9 17:05:24 2016 -0400

    loolwsd: cleanup of Connection in ChildSession
    
    Change-Id: I07636163df7b2973dada55b9704abf7105ad285f
    Reviewed-on: https://gerrit.libreoffice.org/29651
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 17f7f3f..95d3f8c 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -428,62 +428,21 @@ public:
 
         _tileQueue->put("eof");
         _callbackThread.join();
-
-        // Flag all connections to stop.
-        for (auto aIterator : _connections)
-        {
-            aIterator.second->stop();
-        }
-
-        // Destroy all connections and views.
-        for (auto aIterator : _connections)
-        {
-            try
-            {
-                // stop all websockets
-                if (aIterator.second->isRunning())
-                {
-                    std::shared_ptr<WebSocket> ws = aIterator.second->getWebSocket();
-                    if (ws)
-                    {
-                        ws->shutdownReceive();
-                        aIterator.second->join();
-                    }
-                }
-            }
-            catch(NetException& exc)
-            {
-                Log::error() << "Document::~Document: " << exc.displayText()
-                             << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
-                             << Log::end;
-            }
-        }
-
-        // Destroy all connections and views.
-        _connections.clear();
     }
 
     const std::string& getUrl() const { return _url; }
 
-    bool createSession(const std::string& sessionId, const unsigned intSessionId)
+    bool createSession(const std::string& sessionId)
     {
         std::unique_lock<std::mutex> lock(_mutex);
 
         try
         {
-            const auto& it = _connections.find(intSessionId);
-            if (it != _connections.end())
+            const auto& it = _sessions.find(sessionId);
+            if (it != _sessions.end())
             {
-                // found item, check if still running
-                if (it->second->isRunning())
-                {
-                    Log::warn("Session [" + sessionId + "] is already running.");
-                    return true;
-                }
-
-                // Restore thread. TODO: Review this logic.
-                Log::warn("Session [" + sessionId + "] is not running. Restoring.");
-                _connections.erase(intSessionId);
+                Log::warn("Session [" + sessionId + "] is already running.");
+                return true;
             }
 
             Log::info() << "Creating " << (_clientViews ? "new" : "first")
@@ -503,18 +462,12 @@ public:
 
             auto session = std::make_shared<ChildSession>(sessionId, ws, _jailId, *this);
 
-            auto thread = std::make_shared<Connection>(session, ws);
-            const auto aInserted = _connections.emplace(intSessionId, thread);
-            if (aInserted.second)
-            {
-                thread->start();
-            }
-            else
+            if (!_sessions.emplace(sessionId, session).second)
             {
                 Log::error("Connection already exists for child: " + _jailId + ", session: " + sessionId);
             }
 
-            Log::debug("Connections: " + std::to_string(_connections.size()));
+            Log::debug("Connections: " + std::to_string(_sessions.size()));
             return true;
         }
         catch (const std::exception& ex)
@@ -531,7 +484,7 @@ public:
     {
         std::vector<std::shared_ptr<ChildSession>> deadSessions;
         size_t numRunning = 0;
-        size_t num_connections = 0;
+        size_t num_sessions = 0;
         {
             std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
             if (!lock.try_lock())
@@ -544,20 +497,20 @@ public:
             // bluntly exit, no need to clean up our own data structures. Also, there is a bug that
             // causes the deadSessions.clear() call below to crash in some situations when the last
             // session is being removed.
-            for (auto it = _connections.cbegin(); it != _connections.cend(); ++it)
+            for (auto it = _sessions.cbegin(); it != _sessions.cend(); ++it)
             {
-                if (it->second->isRunning())
+                if (!it->second->isCloseFrame())
                     numRunning++;
             }
 
             if (numRunning > 0)
             {
-                for (auto it = _connections.cbegin(); it != _connections.cend(); )
+                for (auto it = _sessions.cbegin(); it != _sessions.cend(); )
                 {
-                    if (!it->second->isRunning())
+                    if (it->second->isCloseFrame())
                     {
-                        deadSessions.push_back(it->second->getSession());
-                        it = _connections.erase(it);
+                        deadSessions.push_back(it->second);
+                        it = _sessions.erase(it);
                     }
                     else
                     {
@@ -566,7 +519,7 @@ public:
                 }
             }
 
-            num_connections = _connections.size();
+            num_sessions = _sessions.size();
         }
 
         if (numRunning == 0)
@@ -581,7 +534,7 @@ public:
         // and the dtor tries to take its lock (which is taken).
         deadSessions.clear();
 
-        return num_connections;
+        return num_sessions;
     }
 
     /// Returns true if at least one *live* connection exists.
@@ -1027,11 +980,11 @@ private:
         std::unique_lock<std::mutex> lock(_mutex);
         std::map<int, std::string> viewInfo;
 
-        for (auto& connection : _connections)
+        for (auto& pair : _sessions)
         {
-            if (connection.second->isRunning())
+            const auto session = pair.second;
+            if (!session->isCloseFrame())
             {
-                const auto session = connection.second->getSession();
                 const auto viewId = session->getViewId();
                 viewInfo[viewId] = session->getViewUserName();
             }
@@ -1083,15 +1036,12 @@ private:
         viewInfoArray->stringify(ossViewInfo);
 
         // Broadcast updated viewinfo to all _active_ connections
-        for (auto& connectionIt: _connections)
+        for (auto& pair : _sessions)
         {
-            if (connectionIt.second->isRunning())
+            const auto session = pair.second;
+            if (!session->isCloseFrame() && session->isActive())
             {
-                auto session = connectionIt.second->getSession();
-                if (session->isActive())
-                {
-                    session->sendTextFrame("viewinfo: " + ossViewInfo.str());
-                }
+                session->sendTextFrame("viewinfo: " + ossViewInfo.str());
             }
         }
     }
@@ -1105,15 +1055,14 @@ private:
                                         const std::string& renderOpts,
                                         const bool haveDocPassword)
     {
-        const unsigned intSessionId = Util::decodeId(sessionId);
-        const auto it = _connections.find(intSessionId);
-        if (it == _connections.end() || !it->second)
+        const auto it = _sessions.find(sessionId);
+        if (it == _sessions.end() || !it->second)
         {
             Log::error("Cannot find session [" + sessionId + "].");
             return nullptr;
         }
 
-        auto session = it->second->getSession();
+        auto session = it->second;
         int viewId = 0;
         std::unique_lock<std::mutex> lockLokDoc;
 
@@ -1257,21 +1206,20 @@ private:
         Log::trace("Forwarding payload to " + prefix + ' ' + message);
 
         std::string name;
-        std::string value;
-        if (LOOLProtocol::parseNameValuePair(prefix, name, value, '-') && name == "child")
+        std::string viewId;
+        if (LOOLProtocol::parseNameValuePair(prefix, name, viewId, '-') && name == "child")
         {
-            const unsigned viewId = Util::decodeId(value);
-            const auto it = _connections.find(viewId);
-            if (it != _connections.end())
+            const auto it = _sessions.find(viewId);
+            if (it != _sessions.end())
             {
                 if (message == "disconnect")
                 {
-                    Log::debug("Removing ChildSession " + value);
-                    _connections.erase(it);
+                    Log::debug("Removing ChildSession " + viewId);
+                    _sessions.erase(it);
                     return true;
                 }
 
-                auto session = it->second->getSession();
+                auto session = it->second;
                 if (session)
                 {
                     return session->handleInput(message.data(), message.size());
@@ -1336,19 +1284,19 @@ private:
                     // Forward the callback to the same view, demultiplexing is done by the LibreOffice core.
                     // TODO: replace with a map to be faster.
                     bool isFound = false;
-                    for (auto& it : _connections)
+                    for (auto& it : _sessions)
                     {
-                        auto session = it.second->getSession();
+                        auto session = it.second;
                         if (session && ((session->getViewId() == viewId) || (viewId == -1)))
                         {
-                            if (it.second->isRunning())
+                            if (!it.second->isCloseFrame())
                             {
                                 isFound = true;
                                 session->loKitCallback(type, payload);
                             }
                             else
                             {
-                                Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view "
+                                Log::error() << "Connection thread for session " << session->getId() << " for view "
                                              << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type)
                                              << "] payload [" << payload << "]." << Log::end;
                             }
@@ -1404,7 +1352,7 @@ private:
     std::condition_variable _cvLoading;
     std::atomic_size_t _isLoading;
     std::map<int, std::unique_ptr<CallbackDescriptor>> _viewIdToCallbackDescr;
-    std::map<unsigned, std::shared_ptr<Connection>> _connections;
+    std::map<std::string, std::shared_ptr<ChildSession>> _sessions;
     Poco::Thread _callbackThread;
     std::atomic_size_t _clientViews;
 };
@@ -1628,7 +1576,6 @@ void lokit_main(const std::string& childRoot,
                     else if (tokens[0] == "session")
                     {
                         const std::string& sessionId = tokens[1];
-                        const unsigned intSessionId = Util::decodeId(sessionId);
                         const std::string& docKey = tokens[2];
 
                         std::string url;
@@ -1642,7 +1589,7 @@ void lokit_main(const std::string& childRoot,
 
                         // Validate and create session.
                         if (!(url == document->getUrl() &&
-                            document->createSession(sessionId, intSessionId)))
+                            document->createSession(sessionId)))
                         {
                             Log::debug("CreateSession failed.");
                         }


More information about the Libreoffice-commits mailing list