[Libreoffice-commits] online.git: loolwsd/ChildProcessSession.cpp loolwsd/ChildProcessSession.hpp loolwsd/LOOLKit.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Sun Jan 24 15:58:12 PST 2016


 loolwsd/ChildProcessSession.cpp |  276 +++++++++++++++++++++++++++++++++++++++-
 loolwsd/ChildProcessSession.hpp |   11 +
 loolwsd/LOOLKit.cpp             |  275 ---------------------------------------
 3 files changed, 287 insertions(+), 275 deletions(-)

New commits:
commit 65e06c1db7d494f23e9219295b4c6adf8012dbff
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Sun Jan 24 18:25:02 2016 -0500

    loolwsd: per ChildProcessSession callback queue and thread
    
    By giving a dedicated queue to each session, we eliminate
    the bottleneck that a slow client would otherwise introduce
    on every other session on the same document.
    
    Change-Id: I715b80a8cd7bbef1268dc472d0b32e35f3dd6444
    Reviewed-on: https://gerrit.libreoffice.org/21763
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/loolwsd/ChildProcessSession.cpp b/loolwsd/ChildProcessSession.cpp
index e89eb82..7291d99 100644
--- a/loolwsd/ChildProcessSession.cpp
+++ b/loolwsd/ChildProcessSession.cpp
@@ -7,12 +7,16 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  */
 
+#include <sys/prctl.h>
 #include <iostream>
 
+#include <Poco/Exception.h>
 #include <Poco/File.h>
 #include <Poco/JSON/Object.h>
 #include <Poco/JSON/Parser.h>
 #include <Poco/Net/WebSocket.h>
+#include <Poco/Notification.h>
+#include <Poco/NotificationQueue.h>
 #include <Poco/Path.h>
 #include <Poco/Process.h>
 #include <Poco/String.h>
@@ -28,10 +32,13 @@
 
 using namespace LOOLProtocol;
 
+using Poco::Exception;
 using Poco::File;
 using Poco::IOException;
 using Poco::JSON::Object;
 using Poco::JSON::Parser;
+using Poco::Notification;
+using Poco::NotificationQueue;
 using Poco::Net::WebSocket;
 using Poco::Path;
 using Poco::Process;
@@ -39,6 +46,260 @@ using Poco::ProcessHandle;
 using Poco::StringTokenizer;
 using Poco::URI;
 
+class CallbackNotification: public Poco::Notification
+{
+public:
+    typedef Poco::AutoPtr<CallbackNotification> Ptr;
+
+    CallbackNotification(const int nType, const std::string& rPayload)
+      : _nType(nType),
+        _aPayload(rPayload)
+    {
+    }
+
+    const int _nType;
+    const std::string _aPayload;
+};
+
+// This thread handles callbacks from the
+// lokit instance.
+class CallbackWorker: public Poco::Runnable
+{
+public:
+    CallbackWorker(NotificationQueue& queue, ChildProcessSession& session):
+        _queue(queue),
+        _session(session),
+        _stop(false)
+    {
+    }
+
+    std::string callbackTypeToString (const int nType)
+    {
+        switch (nType)
+        {
+        case LOK_CALLBACK_INVALIDATE_TILES:
+            return std::string("LOK_CALLBACK_INVALIDATE_TILES");
+        case LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR:
+            return std::string("LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR");
+        case LOK_CALLBACK_TEXT_SELECTION:
+            return std::string("LOK_CALLBACK_TEXT_SELECTION");
+        case LOK_CALLBACK_TEXT_SELECTION_START:
+            return std::string("LOK_CALLBACK_TEXT_SELECTION_START");
+        case LOK_CALLBACK_TEXT_SELECTION_END:
+            return std::string("LOK_CALLBACK_TEXT_SELECTION_END");
+        case LOK_CALLBACK_CURSOR_VISIBLE:
+            return std::string("LOK_CALLBACK_CURSOR_VISIBLE");
+        case LOK_CALLBACK_GRAPHIC_SELECTION:
+            return std::string("LOK_CALLBACK_GRAPHIC_SELECTION");
+        case LOK_CALLBACK_CELL_CURSOR:
+            return std::string("LOK_CALLBACK_CELL_CURSOR");
+        case LOK_CALLBACK_CELL_FORMULA:
+            return std::string("LOK_CALLBACK_CELL_FORMULA");
+        case LOK_CALLBACK_MOUSE_POINTER:
+            return std::string("LOK_CALLBACK_MOUSE_POINTER");
+        case LOK_CALLBACK_SEARCH_RESULT_SELECTION:
+            return std::string("LOK_CALLBACK_SEARCH_RESULT_SELECTION");
+        case LOK_CALLBACK_UNO_COMMAND_RESULT:
+            return std::string("LOK_CALLBACK_UNO_COMMAND_RESULT");
+        case LOK_CALLBACK_HYPERLINK_CLICKED:
+            return std::string("LOK_CALLBACK_HYPERLINK_CLICKED");
+        case LOK_CALLBACK_STATE_CHANGED:
+            return std::string("LOK_CALLBACK_STATE_CHANGED");
+        case LOK_CALLBACK_STATUS_INDICATOR_START:
+            return std::string("LOK_CALLBACK_STATUS_INDICATOR_START");
+        case LOK_CALLBACK_STATUS_INDICATOR_SET_VALUE:
+            return std::string("LOK_CALLBACK_STATUS_INDICATOR_SET_VALUE");
+        case LOK_CALLBACK_STATUS_INDICATOR_FINISH:
+            return std::string("LOK_CALLBACK_STATUS_INDICATOR_FINISH");
+        case LOK_CALLBACK_SEARCH_NOT_FOUND:
+            return std::string("LOK_CALLBACK_SEARCH_NOT_FOUND");
+        case LOK_CALLBACK_DOCUMENT_SIZE_CHANGED:
+            return std::string("LOK_CALLBACK_DOCUMENT_SIZE_CHANGED");
+        case LOK_CALLBACK_SET_PART:
+            return std::string("LOK_CALLBACK_SET_PART");
+        }
+        return std::to_string(nType);
+    }
+
+    void callback(const int nType, const std::string& rPayload)
+    {
+        auto lock = _session.getLock();
+
+        Log::trace() << "Callback [" << _session.getViewId() << "] "
+                     << callbackTypeToString(nType)
+                     << " [" << rPayload << "]." << Log::end;
+        if (_session.isDisconnected())
+        {
+            Log::trace("Skipping callback on disconnected session " + _session.getName());
+            return;
+        }
+        else if (_session.isInactive())
+        {
+            Log::trace("Skipping callback on inactive session " + _session.getName());
+            return;
+        }
+
+        switch (static_cast<LibreOfficeKitCallbackType>(nType))
+        {
+        case LOK_CALLBACK_INVALIDATE_TILES:
+            {
+                int curPart = _session.getLoKitDocument()->pClass->getPart(_session.getLoKitDocument());
+                _session.sendTextFrame("curpart: part=" + std::to_string(curPart));
+                if (_session.getDocType() == "text")
+                {
+                    curPart = 0;
+                }
+
+                StringTokenizer tokens(rPayload, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+                if (tokens.count() == 4)
+                {
+                    int x, y, width, height;
+
+                    try
+                    {
+                        x = std::stoi(tokens[0]);
+                        y = std::stoi(tokens[1]);
+                        width = std::stoi(tokens[2]);
+                        height = std::stoi(tokens[3]);
+                    }
+                    catch (const std::out_of_range&)
+                    {
+                        // something went wrong, invalidate everything
+                        Log::warn("Ignoring integer values out of range: " + rPayload);
+                        x = 0;
+                        y = 0;
+                        width = INT_MAX;
+                        height = INT_MAX;
+                    }
+
+                    _session.sendTextFrame("invalidatetiles:"
+                                       " part=" + std::to_string(curPart) +
+                                       " x=" + std::to_string(x) +
+                                       " y=" + std::to_string(y) +
+                                       " width=" + std::to_string(width) +
+                                       " height=" + std::to_string(height));
+                }
+                else
+                {
+                    _session.sendTextFrame("invalidatetiles: " + rPayload);
+                }
+            }
+            break;
+        case LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR:
+            _session.sendTextFrame("invalidatecursor: " + rPayload);
+            break;
+        case LOK_CALLBACK_TEXT_SELECTION:
+            _session.sendTextFrame("textselection: " + rPayload);
+            break;
+        case LOK_CALLBACK_TEXT_SELECTION_START:
+            _session.sendTextFrame("textselectionstart: " + rPayload);
+            break;
+        case LOK_CALLBACK_TEXT_SELECTION_END:
+            _session.sendTextFrame("textselectionend: " + rPayload);
+            break;
+        case LOK_CALLBACK_CURSOR_VISIBLE:
+            _session.sendTextFrame("cursorvisible: " + rPayload);
+            break;
+        case LOK_CALLBACK_GRAPHIC_SELECTION:
+            _session.sendTextFrame("graphicselection: " + rPayload);
+            break;
+        case LOK_CALLBACK_CELL_CURSOR:
+            _session.sendTextFrame("cellcursor: " + rPayload);
+            break;
+        case LOK_CALLBACK_CELL_FORMULA:
+            _session.sendTextFrame("cellformula: " + rPayload);
+            break;
+        case LOK_CALLBACK_MOUSE_POINTER:
+            _session.sendTextFrame("mousepointer: " + rPayload);
+            break;
+        case LOK_CALLBACK_HYPERLINK_CLICKED:
+            _session.sendTextFrame("hyperlinkclicked: " + rPayload);
+            break;
+        case LOK_CALLBACK_STATE_CHANGED:
+            _session.sendTextFrame("statechanged: " + rPayload);
+            break;
+        case LOK_CALLBACK_STATUS_INDICATOR_START:
+            _session.sendTextFrame("statusindicatorstart:");
+            break;
+        case LOK_CALLBACK_STATUS_INDICATOR_SET_VALUE:
+            _session.sendTextFrame("statusindicatorsetvalue: " + rPayload);
+            break;
+        case LOK_CALLBACK_STATUS_INDICATOR_FINISH:
+            _session.sendTextFrame("statusindicatorfinish:");
+            break;
+        case LOK_CALLBACK_SEARCH_NOT_FOUND:
+            _session.sendTextFrame("searchnotfound: " + rPayload);
+            break;
+        case LOK_CALLBACK_SEARCH_RESULT_SELECTION:
+            _session.sendTextFrame("searchresultselection: " + rPayload);
+            break;
+        case LOK_CALLBACK_DOCUMENT_SIZE_CHANGED:
+            _session.getStatus("", 0);
+            _session.getPartPageRectangles("", 0);
+            break;
+        case LOK_CALLBACK_SET_PART:
+            _session.sendTextFrame("setpart: " + rPayload);
+            break;
+        case LOK_CALLBACK_UNO_COMMAND_RESULT:
+            _session.sendTextFrame("unocommandresult: " + rPayload);
+            break;
+        }
+    }
+
+    void run()
+    {
+        static const std::string thread_name = "kit_callback";
+#ifdef __linux
+        if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
+            Log::error("Cannot set thread name to " + thread_name + ".");
+#endif
+        Log::debug("Thread [" + thread_name + "] started.");
+
+        while (!_stop && !TerminationFlag)
+        {
+            Notification::Ptr aNotification(_queue.waitDequeueNotification());
+            if (!_stop && !TerminationFlag && aNotification)
+            {
+                CallbackNotification::Ptr aCallbackNotification = aNotification.cast<CallbackNotification>();
+                assert(aCallbackNotification);
+
+                const auto nType = aCallbackNotification->_nType;
+                try
+                {
+                    callback(nType, aCallbackNotification->_aPayload);
+                }
+                catch (const Exception& exc)
+                {
+                    Log::error() << "Error while handling callback [" << callbackTypeToString(nType) << "]. "
+                                 << exc.displayText()
+                                 << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
+                                 << Log::end;
+                }
+                catch (const std::exception& exc)
+                {
+                    Log::error("Error while handling callback [" + callbackTypeToString(nType) + "]. " +
+                               std::string("Exception: ") + exc.what());
+                }
+            }
+            else
+                break;
+        }
+
+        Log::debug("Thread [" + thread_name + "] finished.");
+    }
+
+    void stop()
+    {
+        _stop = true;
+        _queue.wakeUpAll();
+    }
+
+private:
+    NotificationQueue& _queue;
+    ChildProcessSession& _session;
+    volatile bool _stop;
+};
+
 std::recursive_mutex ChildProcessSession::Mutex;
 
 ChildProcessSession::ChildProcessSession(const std::string& id,
@@ -55,9 +316,12 @@ ChildProcessSession::ChildProcessSession(const std::string& id,
     _viewId(0),
     _clientPart(0),
     _onLoad(onLoad),
-    _onUnload(onUnload)
+    _onUnload(onUnload),
+    _callbackWorker(new CallbackWorker(_callbackQueue, *this))
 {
     Log::info("ChildProcessSession ctor [" + getName() + "].");
+
+    _callbackThread.start(*_callbackWorker);
 }
 
 ChildProcessSession::~ChildProcessSession()
@@ -65,6 +329,10 @@ ChildProcessSession::~ChildProcessSession()
     Log::info("~ChildProcessSession dtor [" + getName() + "].");
 
     disconnect();
+
+    // Wait for the callback worker to finish.
+    _callbackWorker->stop();
+    _callbackThread.join();
 }
 
 void ChildProcessSession::disconnect(const std::string& reason)
@@ -979,4 +1247,10 @@ bool ChildProcessSession::setPage(const char* /*buffer*/, int /*length*/, String
     return true;
 }
 
+void ChildProcessSession::loKitCallback(const int nType, const char *pPayload)
+{
+    auto pNotif = new CallbackNotification(nType, pPayload ? pPayload : "(nil)");
+    _callbackQueue.enqueueNotification(pNotif);
+}
+
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/loolwsd/ChildProcessSession.hpp b/loolwsd/ChildProcessSession.hpp
index f0b47b3..fb622dd 100644
--- a/loolwsd/ChildProcessSession.hpp
+++ b/loolwsd/ChildProcessSession.hpp
@@ -15,12 +15,15 @@
 #define LOK_USE_UNSTABLE_API
 #include <LibreOfficeKit/LibreOfficeKit.h>
 
+#include <Poco/Thread.h>
 #include <Poco/NotificationQueue.h>
 #include "LOOLSession.hpp"
 
 // The client port number, which is changed via loolwsd args.
 static int ClientPortNumber = DEFAULT_CLIENT_PORT_NUMBER;
 
+class CallbackWorker;
+
 class ChildProcessSession final : public LOOLSession
 {
 public:
@@ -79,6 +82,8 @@ public:
 
     LibreOfficeKitDocument *getLoKitDocument() const { return _loKitDocument; }
 
+    void loKitCallback(const int nType, const char* pPayload);
+
     std::unique_lock<std::recursive_mutex> getLock() { return std::unique_lock<std::recursive_mutex>(Mutex); }
 
     const Statistics& getStatistics() const { return _stats; }
@@ -126,8 +131,12 @@ private:
     /// Statistics and activity tracking.
     Statistics _stats;
 
+    std::unique_ptr<CallbackWorker> _callbackWorker;
+    Poco::Thread _callbackThread;
+    Poco::NotificationQueue _callbackQueue;
+
     /// Synchronize _loKitDocument acess.
-    /// This should be inside LoKit.
+    /// This should be owned by Document.
     static std::recursive_mutex Mutex;
 
     static constexpr auto InactivityThresholdMS = 120 * 1000;
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index f542ec2..6764c29 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -61,260 +61,6 @@ using Poco::FastMutex;
 const std::string CHILD_URI = "/loolws/child/";
 const std::string LOKIT_BROKER = "/tmp/loolbroker.fifo";
 
-class CallBackNotification: public Poco::Notification
-{
-public:
-    typedef Poco::AutoPtr<CallBackNotification> Ptr;
-
-    CallBackNotification(const int nType, const std::string& rPayload, std::shared_ptr<ChildProcessSession>& pSession)
-      : m_nType(nType),
-        m_aPayload(rPayload),
-        m_pSession(pSession)
-    {
-    }
-
-    const int m_nType;
-    const std::string m_aPayload;
-    const std::shared_ptr<ChildProcessSession> m_pSession;
-};
-
-// This thread handles callbacks from the
-// lokit instance.
-class CallBackWorker: public Runnable
-{
-public:
-    CallBackWorker(NotificationQueue& queue):
-        _queue(queue),
-        _stop(false)
-    {
-    }
-
-    std::string callbackTypeToString (const int nType)
-    {
-        switch (nType)
-        {
-        case LOK_CALLBACK_INVALIDATE_TILES:
-            return std::string("LOK_CALLBACK_INVALIDATE_TILES");
-        case LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR:
-            return std::string("LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR");
-        case LOK_CALLBACK_TEXT_SELECTION:
-            return std::string("LOK_CALLBACK_TEXT_SELECTION");
-        case LOK_CALLBACK_TEXT_SELECTION_START:
-            return std::string("LOK_CALLBACK_TEXT_SELECTION_START");
-        case LOK_CALLBACK_TEXT_SELECTION_END:
-            return std::string("LOK_CALLBACK_TEXT_SELECTION_END");
-        case LOK_CALLBACK_CURSOR_VISIBLE:
-            return std::string("LOK_CALLBACK_CURSOR_VISIBLE");
-        case LOK_CALLBACK_GRAPHIC_SELECTION:
-            return std::string("LOK_CALLBACK_GRAPHIC_SELECTION");
-        case LOK_CALLBACK_CELL_CURSOR:
-            return std::string("LOK_CALLBACK_CELL_CURSOR");
-        case LOK_CALLBACK_CELL_FORMULA:
-            return std::string("LOK_CALLBACK_CELL_FORMULA");
-        case LOK_CALLBACK_MOUSE_POINTER:
-            return std::string("LOK_CALLBACK_MOUSE_POINTER");
-        case LOK_CALLBACK_SEARCH_RESULT_SELECTION:
-            return std::string("LOK_CALLBACK_SEARCH_RESULT_SELECTION");
-        case LOK_CALLBACK_UNO_COMMAND_RESULT:
-            return std::string("LOK_CALLBACK_UNO_COMMAND_RESULT");
-        case LOK_CALLBACK_HYPERLINK_CLICKED:
-            return std::string("LOK_CALLBACK_HYPERLINK_CLICKED");
-        case LOK_CALLBACK_STATE_CHANGED:
-            return std::string("LOK_CALLBACK_STATE_CHANGED");
-        case LOK_CALLBACK_STATUS_INDICATOR_START:
-            return std::string("LOK_CALLBACK_STATUS_INDICATOR_START");
-        case LOK_CALLBACK_STATUS_INDICATOR_SET_VALUE:
-            return std::string("LOK_CALLBACK_STATUS_INDICATOR_SET_VALUE");
-        case LOK_CALLBACK_STATUS_INDICATOR_FINISH:
-            return std::string("LOK_CALLBACK_STATUS_INDICATOR_FINISH");
-        case LOK_CALLBACK_SEARCH_NOT_FOUND:
-            return std::string("LOK_CALLBACK_SEARCH_NOT_FOUND");
-        case LOK_CALLBACK_DOCUMENT_SIZE_CHANGED:
-            return std::string("LOK_CALLBACK_DOCUMENT_SIZE_CHANGED");
-        case LOK_CALLBACK_SET_PART:
-            return std::string("LOK_CALLBACK_SET_PART");
-        }
-        return std::to_string(nType);
-    }
-
-    void callback(const int nType, const std::string& rPayload, std::shared_ptr<ChildProcessSession> pSession)
-    {
-        auto lock = pSession->getLock();
-
-        Log::trace() << "Callback [" << pSession->getViewId() << "] "
-                     << callbackTypeToString(nType)
-                     << " [" << rPayload << "]." << Log::end;
-        if (pSession->isDisconnected())
-        {
-            Log::trace("Skipping callback on disconnected session " + pSession->getName());
-            return;
-        }
-        else if (pSession->isInactive())
-        {
-            Log::trace("Skipping callback on inactive session " + pSession->getName());
-            return;
-        }
-
-        switch (static_cast<LibreOfficeKitCallbackType>(nType))
-        {
-        case LOK_CALLBACK_INVALIDATE_TILES:
-            {
-                int curPart = pSession->getLoKitDocument()->pClass->getPart(pSession->getLoKitDocument());
-                pSession->sendTextFrame("curpart: part=" + std::to_string(curPart));
-                if (pSession->getDocType() == "text")
-                {
-                    curPart = 0;
-                }
-
-                StringTokenizer tokens(rPayload, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
-                if (tokens.count() == 4)
-                {
-                    int x, y, width, height;
-
-                    try
-                    {
-                        x = std::stoi(tokens[0]);
-                        y = std::stoi(tokens[1]);
-                        width = std::stoi(tokens[2]);
-                        height = std::stoi(tokens[3]);
-                    }
-                    catch (const std::out_of_range&)
-                    {
-                        // something went wrong, invalidate everything
-                        Log::warn("Ignoring integer values out of range: " + rPayload);
-                        x = 0;
-                        y = 0;
-                        width = INT_MAX;
-                        height = INT_MAX;
-                    }
-
-                    pSession->sendTextFrame("invalidatetiles:"
-                                       " part=" + std::to_string(curPart) +
-                                       " x=" + std::to_string(x) +
-                                       " y=" + std::to_string(y) +
-                                       " width=" + std::to_string(width) +
-                                       " height=" + std::to_string(height));
-                }
-                else
-                {
-                    pSession->sendTextFrame("invalidatetiles: " + rPayload);
-                }
-            }
-            break;
-        case LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR:
-            pSession->sendTextFrame("invalidatecursor: " + rPayload);
-            break;
-        case LOK_CALLBACK_TEXT_SELECTION:
-            pSession->sendTextFrame("textselection: " + rPayload);
-            break;
-        case LOK_CALLBACK_TEXT_SELECTION_START:
-            pSession->sendTextFrame("textselectionstart: " + rPayload);
-            break;
-        case LOK_CALLBACK_TEXT_SELECTION_END:
-            pSession->sendTextFrame("textselectionend: " + rPayload);
-            break;
-        case LOK_CALLBACK_CURSOR_VISIBLE:
-            pSession->sendTextFrame("cursorvisible: " + rPayload);
-            break;
-        case LOK_CALLBACK_GRAPHIC_SELECTION:
-            pSession->sendTextFrame("graphicselection: " + rPayload);
-            break;
-        case LOK_CALLBACK_CELL_CURSOR:
-            pSession->sendTextFrame("cellcursor: " + rPayload);
-            break;
-        case LOK_CALLBACK_CELL_FORMULA:
-            pSession->sendTextFrame("cellformula: " + rPayload);
-            break;
-        case LOK_CALLBACK_MOUSE_POINTER:
-            pSession->sendTextFrame("mousepointer: " + rPayload);
-            break;
-        case LOK_CALLBACK_HYPERLINK_CLICKED:
-            pSession->sendTextFrame("hyperlinkclicked: " + rPayload);
-            break;
-        case LOK_CALLBACK_STATE_CHANGED:
-            pSession->sendTextFrame("statechanged: " + rPayload);
-            break;
-        case LOK_CALLBACK_STATUS_INDICATOR_START:
-            pSession->sendTextFrame("statusindicatorstart:");
-            break;
-        case LOK_CALLBACK_STATUS_INDICATOR_SET_VALUE:
-            pSession->sendTextFrame("statusindicatorsetvalue: " + rPayload);
-            break;
-        case LOK_CALLBACK_STATUS_INDICATOR_FINISH:
-            pSession->sendTextFrame("statusindicatorfinish:");
-            break;
-        case LOK_CALLBACK_SEARCH_NOT_FOUND:
-            pSession->sendTextFrame("searchnotfound: " + rPayload);
-            break;
-        case LOK_CALLBACK_SEARCH_RESULT_SELECTION:
-            pSession->sendTextFrame("searchresultselection: " + rPayload);
-            break;
-        case LOK_CALLBACK_DOCUMENT_SIZE_CHANGED:
-            pSession->getStatus("", 0);
-            pSession->getPartPageRectangles("", 0);
-            break;
-        case LOK_CALLBACK_SET_PART:
-            pSession->sendTextFrame("setpart: " + rPayload);
-            break;
-        case LOK_CALLBACK_UNO_COMMAND_RESULT:
-            pSession->sendTextFrame("unocommandresult: " + rPayload);
-            break;
-        }
-    }
-
-    void run()
-    {
-        static const std::string thread_name = "kit_callback";
-#ifdef __linux
-        if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
-            Log::error("Cannot set thread name to " + thread_name + ".");
-#endif
-        Log::debug("Thread [" + thread_name + "] started.");
-
-        while (!_stop && !TerminationFlag)
-        {
-            Notification::Ptr aNotification(_queue.waitDequeueNotification());
-            if (!_stop && !TerminationFlag && aNotification)
-            {
-                CallBackNotification::Ptr aCallBackNotification = aNotification.cast<CallBackNotification>();
-                assert(aCallBackNotification);
-
-                const auto nType = aCallBackNotification->m_nType;
-                try
-                {
-                    callback(nType, aCallBackNotification->m_aPayload, aCallBackNotification->m_pSession);
-                }
-                catch (const Exception& exc)
-                {
-                    Log::error() << "Error while handling callback [" << callbackTypeToString(nType) << "]. "
-                                 << exc.displayText()
-                                 << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
-                                 << Log::end;
-                }
-                catch (const std::exception& exc)
-                {
-                    Log::error("Error while handling callback [" + callbackTypeToString(nType) + "]. " +
-                               std::string("Exception: ") + exc.what());
-                }
-            }
-            else
-                break;
-        }
-
-        Log::debug("Thread [" + thread_name + "] finished.");
-    }
-
-    void stop()
-    {
-        _stop = true;
-        _queue.wakeUpAll();
-    }
-
-private:
-    NotificationQueue& _queue;
-    volatile bool _stop;
-};
-
 class Connection: public Runnable
 {
 public:
@@ -479,13 +225,10 @@ public:
         _jailId(jailId),
         _url(url),
         _loKitDocument(nullptr),
-        _clientViews(0),
-        _callbackWorker(CallbackQueue)
+        _clientViews(0)
     {
         Log::info("Document ctor for url [" + _url + "] on child [" + _jailId +
                   "] LOK_VIEW_CALLBACK=" + std::to_string(_multiView) + ".");
-
-        _callbackThread.start(_callbackWorker);
     }
 
     ~Document()
@@ -495,10 +238,6 @@ public:
         Log::info("~Document dtor for url [" + _url + "] on child [" + _jailId +
                   "]. There are " + std::to_string(_clientViews) + " views.");
 
-        // Wait for the callback worker to finish.
-        _callbackWorker.stop();
-        _callbackThread.join();
-
         // Flag all connections to stop.
         for (auto aIterator : _connections)
         {
@@ -638,9 +377,6 @@ private:
     static void ViewCallback(int , const char* , void* )
     {
         //TODO: Delegate the callback.
-        //const unsigned intSessionId = reinterpret_cast<unsigned>(pData);
-        //auto pNotif = new CallBackNotification(nType, pPayload ? pPayload : "(nil)", pData);
-        //_callbackQueue.enqueueNotification(pNotif);
     }
 
     static void DocumentCallback(int nType, const char* pPayload, void* pData)
@@ -657,8 +393,7 @@ private:
                     auto session = it.second->getSession();
                     if (session)
                     {
-                        auto pNotif = new CallBackNotification(nType, pPayload ? pPayload : "(nil)", session);
-                        CallbackQueue.enqueueNotification(pNotif);
+                        session->loKitCallback(nType, pPayload);
                     }
                 }
             }
@@ -758,14 +493,8 @@ private:
     std::recursive_mutex _mutex;
     std::map<unsigned, std::shared_ptr<Connection>> _connections;
     std::atomic<unsigned> _clientViews;
-
-    CallBackWorker _callbackWorker;
-    Thread _callbackThread;
-    static Poco::NotificationQueue CallbackQueue;
 };
 
-Poco::NotificationQueue Document::CallbackQueue;
-
 void lokit_main(const std::string &loSubPath, const std::string& jailId, const std::string& pipe)
 {
 #ifdef LOOLKIT_NO_MAIN


More information about the Libreoffice-commits mailing list