[Libreoffice-commits] online.git: 2 commits - common/MessageQueue.hpp kit/Kit.cpp

Libreoffice Gerrit user logerrit at kemper.freedesktop.org
Fri May 10 14:14:00 UTC 2019


 common/MessageQueue.hpp |    7 ++
 kit/Kit.cpp             |  129 +++++++++++++++++++++++++++++-------------------
 2 files changed, 86 insertions(+), 50 deletions(-)

New commits:
commit 73dc711e0ade81c1238281a290a59a7c28cabab4
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Sat May 4 14:51:14 2019 +0100
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri May 10 15:13:40 2019 +0100

    Unipoll: terminate repeated polling loop on wakeup.
    
    Wakeup wakes up the nested SocketPoll::poll nicely, but that's no
    use if we immediately ignore that and re-poll, so shorten the
    timeout in this case.
    
    Change-Id: I927d2375b92c9ce6c6ebe3f0ab33e2863894e2ef

diff --git a/common/MessageQueue.hpp b/common/MessageQueue.hpp
index ac8d4e51f..f07b4be2a 100644
--- a/common/MessageQueue.hpp
+++ b/common/MessageQueue.hpp
@@ -88,6 +88,13 @@ public:
         return get_impl();
     }
 
+    /// Anything in the queue ?
+    bool isEmpty()
+    {
+        std::unique_lock<std::mutex> lock(_mutex);
+        return _queue.size() == 0;
+    }
+
     /// Thread safe removal of all the pending messages.
     void clear()
     {
diff --git a/kit/Kit.cpp b/kit/Kit.cpp
index c75047a8e..216c8d757 100644
--- a/kit/Kit.cpp
+++ b/kit/Kit.cpp
@@ -1983,15 +1983,18 @@ private:
     }
 
 public:
+    bool hasQueued()
+    {
+        return !_tileQueue->isEmpty();
+    }
+
     void drainQueue(const std::chrono::steady_clock::time_point &now)
     {
         try
         {
-            while (true)
+            while (hasQueued())
             {
                 const TileQueue::Payload input = _tileQueue->pop();
-                if (input.size() <= 0)
-                    break;
 
                 LOG_TRC("Kit Recv " << LOOLProtocol::getAbbreviatedMessage(input));
 
@@ -2327,71 +2330,99 @@ void documentViewCallback(const int type, const char* payload, void* data)
     Document::ViewCallback(type, payload, data);
 }
 
-/// Called by LOK main-loop the central location for data processing.
-int pollCallback(void* pData, int timeoutUs)
+class KitSocketPoll : public SocketPoll
 {
-    if (!pData)
-        return 0;
-
-    // The maximum number of extra events to process beyond the first.
-    int maxExtraEvents = 15;
-    int eventsSignalled = 0;
+    std::chrono::steady_clock::time_point _pollEnd;
+public:
+    KitSocketPoll() :
+        SocketPoll("kit")
+    {
+    }
 
-    int timeoutMs = timeoutUs / 1000;
+    // process pending message-queue events.
+    void drainQueue(const std::chrono::steady_clock::time_point &now)
+    {
+        if (document)
+            document->drainQueue(now);
+    }
 
-    SocketPoll* pSocketPoll = reinterpret_cast<SocketPoll*>(pData);
-    if (timeoutMs < 0)
+    // called from inside poll, inside a wakeup
+    void wakeupHook()
     {
-        // Flush at most 1 + maxExtraEvents, or return when nothing left.
-        while (pSocketPoll->poll(0) > 0 && maxExtraEvents-- > 0)
-            ++eventsSignalled;
+        _pollEnd = std::chrono::steady_clock::now();
     }
-    else
+
+    // a LOK compatible poll function merging the functions.
+    // returns the number of events signalled
+    int kitPoll(int timeoutUs)
     {
-        const auto startTime = std::chrono::steady_clock::now();
-        do
+        if (TerminationFlag)
+        {
+            LOG_TRC("Termination of unipoll mainloop flagged");
+            return -1;
+        }
+
+        // The maximum number of extra events to process beyond the first.
+        int maxExtraEvents = 15;
+        int eventsSignalled = 0;
+
+        int timeoutMs = timeoutUs / 1000;
+
+        if (timeoutMs < 0)
+        {
+            // Flush at most 1 + maxExtraEvents, or return when nothing left.
+            while (poll(0) > 0 && maxExtraEvents-- > 0)
+                ++eventsSignalled;
+        }
+        else
         {
             // Flush at most maxEvents+1, or return when nothing left.
-            if (pSocketPoll->poll(timeoutMs) <= 0)
-                break;
+            _pollEnd = std::chrono::steady_clock::now() + std::chrono::microseconds(timeoutUs);
+            do
+            {
+                if (poll(timeoutMs) <= 0)
+                    break;
 
-            const auto now = std::chrono::steady_clock::now();
-            const auto elapsedTimeMs
-                = std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime)
-                .count();
-            if (elapsedTimeMs >= timeoutMs)
-                break;
+                const auto now = std::chrono::steady_clock::now();
+                drainQueue(now);
 
-            timeoutMs -= elapsedTimeMs;
-            ++eventsSignalled;
+                timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(_pollEnd - now).count();
+                ++eventsSignalled;
+            }
+            while (timeoutMs > 0 && !TerminationFlag && maxExtraEvents-- > 0);
         }
-        while (!TerminationFlag && maxExtraEvents-- > 0);
-    }
 
-    if (document)
-        document->drainQueue(std::chrono::steady_clock::now());
+        drainQueue(std::chrono::steady_clock::now());
 
 #if !MOBILEAPP
-    if (document && document->purgeSessions() == 0)
-    {
-        LOG_INF("Last session discarded. Setting TerminationFlag");
-        TerminationFlag = true;
-        return -1;
-    }
+        if (document && document->purgeSessions() == 0)
+        {
+            LOG_INF("Last session discarded. Setting TerminationFlag");
+            TerminationFlag = true;
+            return -1;
+        }
 #endif
+        // Report the number of events we processsed.
+        return eventsSignalled;
+    }
+};
 
-    // Report the number of events we processsed.
-    return eventsSignalled;
+/// Called by LOK main-loop the central location for data processing.
+int pollCallback(void* pData, int timeoutUs)
+{
+    if (!pData)
+        return 0;
+    else
+        return reinterpret_cast<KitSocketPoll*>(pData)->kitPoll(timeoutUs);
 }
 
 /// Called by LOK main-loop
 void wakeCallback(void* pData)
 {
-    if (pData)
-    {
-        SocketPoll* pSocketPoll = reinterpret_cast<SocketPoll*>(pData);
-        pSocketPoll->wakeup();
-    }
+    if (!pData)
+        return;
+    else
+        return reinterpret_cast<KitSocketPoll*>(pData)->wakeup();
 }
 
 #ifndef BUILDING_TESTS
@@ -2687,7 +2718,7 @@ void lokit_main(
 
 #endif // MOBILEAPP
 
-        SocketPoll mainKit("kit");
+        KitSocketPoll mainKit;
         mainKit.runOnClientThread(); // We will do the polling on this thread.
 
         std::shared_ptr<SocketHandlerInterface> websocketHandler =
commit 8360f2c95140beeda41927e77070a1114a448fe3
Author:     Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Fri May 3 18:05:48 2019 +0100
Commit:     Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri May 10 15:13:40 2019 +0100

    unipoll: process lots of events at once.
    
    Change-Id: I8b0a37d114a55e5d64d7e5dd7df6c494971087ca

diff --git a/kit/Kit.cpp b/kit/Kit.cpp
index 6ac59e84d..c75047a8e 100644
--- a/kit/Kit.cpp
+++ b/kit/Kit.cpp
@@ -2334,9 +2334,7 @@ int pollCallback(void* pData, int timeoutUs)
         return 0;
 
     // The maximum number of extra events to process beyond the first.
-    //FIXME: When processing more than one event, full-document
-    //FIXME: invalidations happen (for some reason), so disable for now.
-    int maxExtraEvents = 0;
+    int maxExtraEvents = 15;
     int eventsSignalled = 0;
 
     int timeoutMs = timeoutUs / 1000;
@@ -2367,7 +2365,7 @@ int pollCallback(void* pData, int timeoutUs)
             timeoutMs -= elapsedTimeMs;
             ++eventsSignalled;
         }
-        while (maxExtraEvents-- > 0);
+        while (!TerminationFlag && maxExtraEvents-- > 0);
     }
 
     if (document)


More information about the Libreoffice-commits mailing list