[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - 4 commits - wsd/DocumentBroker.cpp wsd/DocumentBroker.hpp wsd/LOOLWSD.cpp

Jan Holesovsky kendy at collabora.com
Mon Mar 6 18:12:47 UTC 2017


 wsd/DocumentBroker.cpp |   41 +++++++++++++++++++++++++++++++++++++++--
 wsd/DocumentBroker.hpp |   27 +++++++++++++++++++++++++--
 wsd/LOOLWSD.cpp        |    7 ++-----
 3 files changed, 66 insertions(+), 9 deletions(-)

New commits:
commit 93b2fd0b29221cfbb50433ce33f293b931fdd96d
Author: Jan Holesovsky <kendy at collabora.com>
Date:   Mon Mar 6 19:05:49 2017 +0100

    nb: Remember also the messages sent to sessions queued for creation.
    
    Without this, it was impossible to connect to an existing session, because we
    were trying to send messages to sessions that were not connected yet.
    
    Change-Id: If9260a1f0ac8f5387f492541548724b0065df9d9

diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index cbc83ef..fd711a9 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -214,10 +214,17 @@ void DocumentBroker::pollThread(std::shared_ptr<DocumentBroker> docBroker)
             if (docBroker->_newSessions.empty())
                 break;
 
-            std::shared_ptr<ClientSession> session = docBroker->_newSessions.front();
-            docBroker->_newSessions.pop_front();
+            NewSession& newSession = docBroker->_newSessions.front();
+            docBroker->addSession(newSession._session);
+
+            // now send the queued messages
+            for (const auto& message : newSession._messages)
+            {
+                LOG_DBG("Sending a queued message: " + message);
+                docBroker->_childProcess->sendTextFrame(message);
+            }
 
-            docBroker->addSession(session);
+            docBroker->_newSessions.pop_front();
         }
 
         docBroker->_poll.poll(5000);
@@ -681,7 +688,7 @@ size_t DocumentBroker::queueSession(std::shared_ptr<ClientSession>& session)
 {
     Util::assertIsLocked(_mutex);
 
-    _newSessions.push_back(session);
+    _newSessions.push_back(NewSession(session));
 
     return _sessions.size() + _newSessions.size();
 }
@@ -749,7 +756,7 @@ size_t DocumentBroker::removeSession(const std::string& id)
                 "]. Have " << _sessions.size() << " sessions.");
 
         // remove also from the _newSessions
-        _newSessions.erase(std::remove_if(_newSessions.begin(), _newSessions.end(), [&id](std::shared_ptr<ClientSession>& session) { return session->getId() == id; }),
+        _newSessions.erase(std::remove_if(_newSessions.begin(), _newSessions.end(), [&id](NewSession& newSession) { return newSession._session->getId() == id; }),
                            _newSessions.end());
 
         Admin::instance().rmDoc(_docKey, id);
@@ -1089,15 +1096,20 @@ bool DocumentBroker::forwardToChild(const std::string& viewId, const std::string
     LOG_TRC("Forwarding payload to child [" << viewId << "]: " << message);
 
     const auto it = _sessions.find(viewId);
+    const auto msg = "child-" + viewId + ' ' + message;
     if (it != _sessions.end())
     {
-        const auto msg = "child-" + viewId + ' ' + message;
         _childProcess->sendTextFrame(msg);
         return true;
     }
     else
     {
-        LOG_WRN("Client session [" << viewId << "] not found to forward message: " << message);
+        // try the not yet created sessions
+        const auto n = std::find_if(_newSessions.begin(), _newSessions.end(), [&viewId](NewSession& newSession) { return newSession._session->getId() == viewId; });
+        if (n != _newSessions.end())
+            n->_messages.push_back(msg);
+        else
+            LOG_WRN("Child session [" << viewId << "] not found to forward message: " << message);
     }
 
     return false;
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index e391d9b..c22eb91 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -374,7 +374,22 @@ private:
     /// The jailed file last-modified time.
     Poco::Timestamp _lastFileModifiedTime;
     std::map<std::string, std::shared_ptr<ClientSession> > _sessions;
-    std::deque<std::shared_ptr<ClientSession> > _newSessions;
+
+    /// Private class to remember what sessions we have to create, and what
+    /// messages to send to them after they are really created.
+    class NewSession {
+    public:
+        NewSession(const std::shared_ptr<ClientSession>& session) : _session(session)
+        {
+        }
+
+        std::shared_ptr<ClientSession> _session;
+        std::deque<std::string> _messages;
+    };
+
+    /// Sessions that are queued for addition.
+    std::deque<NewSession> _newSessions;
+
     std::unique_ptr<StorageBase> _storage;
     std::unique_ptr<TileCache> _tileCache;
     std::atomic<bool> _markToDestroy;
commit 645f174f0b84ac09cbdb1e43432a3fcce0c443ae
Author: Jan Holesovsky <kendy at collabora.com>
Date:   Mon Mar 6 17:01:17 2017 +0100

    nb: Remove also the queued sessions.
    
    Change-Id: I455fbbabfea9805d70fe909fd9b4078f02d21438

diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index a67152a..cbc83ef 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -748,6 +748,10 @@ size_t DocumentBroker::removeSession(const std::string& id)
         LOG_INF("Removing session [" << id << "] on docKey [" << _docKey <<
                 "]. Have " << _sessions.size() << " sessions.");
 
+        // remove also from the _newSessions
+        _newSessions.erase(std::remove_if(_newSessions.begin(), _newSessions.end(), [&id](std::shared_ptr<ClientSession>& session) { return session->getId() == id; }),
+                           _newSessions.end());
+
         Admin::instance().rmDoc(_docKey, id);
 
         auto it = _sessions.find(id);
commit 5ae8d47cb1b08a73dab9c140cfaa9ae9915b365e
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Mon Mar 6 16:05:11 2017 +0100

    nb: Fix race when loading a document.
    
    Don't block when creating new sessions, instead queue the requests, and handle
    them in the DocumentBroker.
    
    Change-Id: I200bbfc740f004c37178fa316d1eb91afdde5d4a

diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index f1715aa..a67152a 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -208,6 +208,18 @@ void DocumentBroker::pollThread(std::shared_ptr<DocumentBroker> docBroker)
     // Main polling loop goodness.
     while (!docBroker->_stop && !TerminationFlag && !ShutdownRequestFlag)
     {
+        while (true)
+        {
+            std::unique_lock<std::mutex> lock(docBroker->_mutex);
+            if (docBroker->_newSessions.empty())
+                break;
+
+            std::shared_ptr<ClientSession> session = docBroker->_newSessions.front();
+            docBroker->_newSessions.pop_front();
+
+            docBroker->addSession(session);
+        }
+
         docBroker->_poll.poll(5000);
     }
 }
@@ -665,6 +677,15 @@ std::string DocumentBroker::getJailRoot() const
     return Poco::Path(_childRoot, _jailId).toString();
 }
 
+size_t DocumentBroker::queueSession(std::shared_ptr<ClientSession>& session)
+{
+    Util::assertIsLocked(_mutex);
+
+    _newSessions.push_back(session);
+
+    return _sessions.size() + _newSessions.size();
+}
+
 size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session)
 {
     Util::assertIsLocked(_mutex);
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index 6981d4e..e391d9b 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -15,6 +15,7 @@
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
+#include <deque>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -259,8 +260,11 @@ public:
 
     std::string getJailRoot() const;
 
-    /// Add a new session. Returns the new number of sessions.
-    size_t addSession(std::shared_ptr<ClientSession>& session);
+    /// Queue a new session to be added asynchronously.
+    /// @return the number of sessions we will have once all the queued ones
+    /// are created.
+    size_t queueSession(std::shared_ptr<ClientSession>& session);
+
     /// Removes a session by ID. Returns the new number of sessions.
     size_t removeSession(const std::string& id);
 
@@ -342,6 +346,9 @@ private:
     /// Forward a message from child session to its respective client session.
     bool forwardToClient(const std::shared_ptr<Message>& payload);
 
+    /// Add a new session. Returns the new number of sessions.
+    size_t addSession(std::shared_ptr<ClientSession>& session);
+
     /// The thread function that all of the I/O for all sessions
     /// associated with this document.
     static void pollThread(std::shared_ptr<DocumentBroker> docBroker);
@@ -367,6 +374,7 @@ private:
     /// The jailed file last-modified time.
     Poco::Timestamp _lastFileModifiedTime;
     std::map<std::string, std::shared_ptr<ClientSession> > _sessions;
+    std::deque<std::shared_ptr<ClientSession> > _newSessions;
     std::unique_ptr<StorageBase> _storage;
     std::unique_ptr<TileCache> _tileCache;
     std::atomic<bool> _markToDestroy;
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 2fcd2b3..eb6a270 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1606,7 +1606,7 @@ static std::shared_ptr<ClientSession> createNewClientSession(const WebSocketHand
         // (UserCanWrite param).
         auto session = std::make_shared<ClientSession>(id, docBroker, uriPublic, isReadOnly);
 
-        docBroker->addSession(session);
+        docBroker->queueSession(session);
 
         lock.unlock();
 
@@ -1988,7 +1988,7 @@ private:
                     auto session = std::make_shared<ClientSession>(_id, docBroker, uriPublic);
 
                     auto lock = docBroker->getLock();
-                    auto sessionsCount = docBroker->addSession(session);
+                    size_t sessionsCount = docBroker->queueSession(session);
                     lock.unlock();
                     LOG_TRC(docKey << ", ws_sessions++: " << sessionsCount);
 
@@ -2228,9 +2228,7 @@ private:
         {
             _clientSession = createNewClientSession(ws, _id, uriPublic, docBroker, isReadOnly);
             if (_clientSession)
-            {
                 _clientSession->onConnect(_socket);
-            }
         }
         if (!docBroker || !_clientSession)
             LOG_WRN("Failed to connect DocBroker and Client Session.");
commit 5834dbdde720ac88f35021ea918a60212cb69257
Author: Jan Holesovsky <kendy at collabora.com>
Date:   Mon Mar 6 15:31:00 2017 +0100

    Revert "Horror hack to avoid race for now."
    
    This reverts commit e977667754ba34b83ba0259d6568aa88ae5995df.

diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 68bc905..2fcd2b3 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2226,7 +2226,6 @@ private:
         auto docBroker = findOrCreateDocBroker(ws, url, docKey, _id, uriPublic);
         if (docBroker)
         {
-            sleep(1); // FIXME: horror hack - wait for the kit process ...
             _clientSession = createNewClientSession(ws, _id, uriPublic, docBroker, isReadOnly);
             if (_clientSession)
             {


More information about the Libreoffice-commits mailing list