[Libreoffice-commits] core.git: comphelper/source desktop/source include/comphelper package/source

Michael Meeks michael.meeks at collabora.com
Thu Dec 1 18:44:34 UTC 2016


 comphelper/source/misc/threadpool.cxx     |  266 +++++++++++++-----------------
 desktop/source/app/app.cxx                |    4 
 include/comphelper/threadpool.hxx         |   47 +++--
 package/source/zipapi/ZipOutputStream.cxx |    1 
 4 files changed, 152 insertions(+), 166 deletions(-)

New commits:
commit aa68c99d88fd7abe08c4aee5206c859a0cdba38e
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Thu Dec 1 11:14:24 2016 +0000

    tdf#104126 - comphelper thread-pool, use reliable std::condition_variable.
    
    The existing osl::Condition is an API and reliability disaster area.
    
    Change-Id: I3be84e1c6a83e58c43c40c9c8720790d923a6694
    Reviewed-on: https://gerrit.libreoffice.org/31163
    Tested-by: Jenkins <ci at libreoffice.org>
    Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
    Tested-by: Michael Meeks <michael.meeks at collabora.com>

diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 0fda264..6329143 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -10,11 +10,14 @@
 #include <comphelper/threadpool.hxx>
 
 #include <com/sun/star/uno/Exception.hpp>
+#include <sal/config.h>
 #include <rtl/instance.hxx>
 #include <rtl/string.hxx>
+#include <salhelper/thread.hxx>
 #include <algorithm>
 #include <memory>
 #include <thread>
+#include <chrono>
 
 namespace comphelper {
 
@@ -26,30 +29,27 @@ static thread_local bool gbIsWorkerThread;
 // used to group thread-tasks for waiting in waitTillDone()
 class COMPHELPER_DLLPUBLIC ThreadTaskTag
 {
-    osl::Mutex mMutex;
-    std::size_t mnTasksWorking;
-    osl::Condition maTasksComplete;
+    std::mutex maMutex;
+    sal_Int32 mnTasksWorking;
+    std::condition_variable maTasksComplete;
 
 public:
     ThreadTaskTag();
-    bool           isDone();
-    void           waitUntilDone();
-    void           onTaskWorkerDone();
-    void           onTaskPushed();
+    bool isDone();
+    void waitUntilDone();
+    void onTaskWorkerDone();
+    void onTaskPushed();
 };
 
 
 class ThreadPool::ThreadWorker : public salhelper::Thread
 {
-    ThreadPool    *mpPool;
-    osl::Condition maNewWork;
-    bool           mbWorking;
+    ThreadPool *mpPool;
 public:
 
     explicit ThreadWorker( ThreadPool *pPool ) :
         salhelper::Thread("thread-pool"),
-        mpPool( pPool ),
-        mbWorking( false )
+        mpPool( pPool )
     {
     }
 
@@ -58,74 +58,20 @@ public:
 #if defined DBG_UTIL && defined LINUX
         gbIsWorkerThread = true;
 #endif
-        while ( ThreadTask * pTask = waitForWork() )
-        {
-            std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
-            try {
-                pTask->doWork();
-            }
-            catch (const std::exception &e)
-            {
-                SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
-            }
-            catch (const css::uno::Exception &e)
-            {
-                SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
-            }
-            delete pTask;
-            pTag->onTaskWorkerDone();
-        }
-    }
-
-    ThreadTask *waitForWork()
-    {
-        ThreadTask *pRet = nullptr;
+        std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
 
-        osl::ResettableMutexGuard aGuard( mpPool->maGuard );
-
-        pRet = mpPool->popWork();
-
-        while( !pRet )
+        while( !mpPool->mbTerminate )
         {
-            if (mbWorking)
-                mpPool->stopWork();
-            mbWorking = false;
-            maNewWork.reset();
-
-            if( mpPool->mbTerminate )
-                break;
-
-            aGuard.clear(); // unlock
-
-            maNewWork.wait();
-
-            aGuard.reset(); // lock
+            ThreadTask *pTask = mpPool->popWorkLocked( aGuard, true );
+            if( pTask )
+            {
+                aGuard.unlock();
 
-            pRet = mpPool->popWork();
-        }
+                pTask->execAndDelete();
 
-        if (pRet)
-        {
-            if (!mbWorking)
-                mpPool->startWork();
-            mbWorking = true;
+                aGuard.lock();
+            }
         }
-
-        return pRet;
-    }
-
-    // Why a condition per worker thread - you may ask.
-    //
-    // Unfortunately the Windows synchronisation API that we wrap
-    // is horribly inadequate cf.
-    //    http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
-    // The existing osl::Condition API should only ever be used
-    // between one producer and one consumer thread to avoid the
-    // lost wakeup problem.
-
-    void signalNewWork()
-    {
-        maNewWork.set();
     }
 };
 
@@ -133,19 +79,18 @@ ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
     mnThreadsWorking( 0 ),
     mbTerminate( false )
 {
+    std::unique_lock< std::mutex > aGuard( maMutex );
+
     for( sal_Int32 i = 0; i < nWorkers; i++ )
         maWorkers.push_back( new ThreadWorker( this ) );
 
-    maTasksComplete.set();
-
-    osl::MutexGuard aGuard( maGuard );
     for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers)
         rpWorker->launch();
 }
 
 ThreadPool::~ThreadPool()
 {
-    waitAndCleanupWorkers();
+    shutdown();
 }
 
 struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
@@ -183,100 +128,108 @@ sal_Int32 ThreadPool::getPreferredConcurrency()
     return ThreadCount;
 }
 
-void ThreadPool::waitAndCleanupWorkers()
+// FIXME: there should be no need for this as/when our baseline
+// is >VS2015 and drop WinXP; the sorry details are here:
+// https://connect.microsoft.com/VisualStudio/feedback/details/1282596
+void ThreadPool::shutdown()
 {
-    osl::ResettableMutexGuard aGuard( maGuard );
+    if (mbTerminate)
+        return;
+
+    std::unique_lock< std::mutex > aGuard( maMutex );
 
     if( maWorkers.empty() )
     { // no threads at all -> execute the work in-line
-        while ( ThreadTask * pTask = popWork() )
-        {
-            std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
-            pTask->doWork();
-            delete pTask;
-            pTag->onTaskWorkerDone();
-        }
+        ThreadTask *pTask;
+        while ( ( pTask = popWorkLocked(aGuard, false) ) )
+            pTask->execAndDelete();
     }
     else
     {
-        aGuard.clear();
-        maTasksComplete.wait();
-        aGuard.reset();
+        while( !maTasks.empty() )
+            maTasksChanged.wait( aGuard );
     }
     assert( maTasks.empty() );
 
     mbTerminate = true;
 
+    maTasksChanged.notify_all();
+
     while( !maWorkers.empty() )
     {
         rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
         maWorkers.pop_back();
         assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
                 == maWorkers.end());
-        xWorker->signalNewWork();
-        aGuard.clear();
-        { // unlocked
+        aGuard.unlock();
+        {
             xWorker->join();
             xWorker.clear();
         }
-        aGuard.reset();
+        aGuard.lock();
     }
 }
 
 void ThreadPool::pushTask( ThreadTask *pTask )
 {
-    osl::MutexGuard aGuard( maGuard );
+    std::unique_lock< std::mutex > aGuard( maMutex );
+
     pTask->mpTag->onTaskPushed();
     maTasks.insert( maTasks.begin(), pTask );
 
-    // horrible beyond belief:
-    for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers)
-        rpWorker->signalNewWork();
-    maTasksComplete.reset();
+    maTasksChanged.notify_one();
 }
 
-ThreadTask *ThreadPool::popWork()
+ThreadTask *ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
 {
-    if( !maTasks.empty() )
+    do
     {
-        ThreadTask *pTask = maTasks.back();
-        maTasks.pop_back();
-        return pTask;
-    }
-    else
-        return nullptr;
+        if( !maTasks.empty() )
+        {
+            ThreadTask *pTask = maTasks.back();
+            maTasks.pop_back();
+            return pTask;
+        }
+        else if (!bWait || mbTerminate)
+            return nullptr;
+
+        maTasksChanged.wait( rGuard );
+
+    } while (!mbTerminate);
+
+    return nullptr;
 }
 
-void ThreadPool::startWork()
+void ThreadPool::startWorkLocked()
 {
     mnThreadsWorking++;
 }
 
-void ThreadPool::stopWork()
+void ThreadPool::stopWorkLocked()
 {
     assert( mnThreadsWorking > 0 );
     if ( --mnThreadsWorking == 0 )
-        maTasksComplete.set();
+        maTasksChanged.notify_all();
 }
 
+
 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag)
 {
 #if defined DBG_UTIL && defined LINUX
     assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
 #endif
-    osl::ResettableMutexGuard aGuard( maGuard );
-
-    if( maWorkers.empty() )
-    { // no threads at all -> execute the work in-line
-        while ( ThreadTask * pTask = popWork() )
-        {
-            std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
-            pTask->doWork();
-            delete pTask;
-            pTag->onTaskWorkerDone();
+    {
+        std::unique_lock< std::mutex > aGuard( maMutex );
+
+        if( maWorkers.empty() )
+        { // no threads at all -> execute the work in-line
+            ThreadTask *pTask;
+            while (!rTag->isDone() &&
+                   ( pTask = popWorkLocked(aGuard, false) ) )
+                pTask->execAndDelete();
         }
     }
-    aGuard.clear();
+
     rTag->waitUntilDone();
 }
 
@@ -290,54 +243,73 @@ bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
     return pTag->isDone();
 }
 
-
 ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
     : mpTag(pTag)
 {
 }
 
+void ThreadTask::execAndDelete()
+{
+    std::shared_ptr<ThreadTaskTag> pTag(mpTag);
+    try {
+        doWork();
+    }
+    catch (const std::exception &e)
+    {
+        SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
+    }
+    catch (const css::uno::Exception &e)
+    {
+        SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
+    }
+
+    delete this;
+    pTag->onTaskWorkerDone();
+}
+
 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
 {
-    maTasksComplete.set();
 }
 
 void ThreadTaskTag::onTaskPushed()
 {
-    osl::MutexGuard g(mMutex);
-    assert( mnTasksWorking < 65535 ); // sanity checking
-    ++mnTasksWorking;
-    maTasksComplete.reset();
+    std::unique_lock< std::mutex > aGuard( maMutex );
+    mnTasksWorking++;
+    assert( mnTasksWorking < 65536 ); // sanity checking
 }
 
 void ThreadTaskTag::onTaskWorkerDone()
 {
-    osl::MutexGuard g(mMutex);
-    assert(mnTasksWorking > 0);
-    --mnTasksWorking;
+    std::unique_lock< std::mutex > aGuard( maMutex );
+    mnTasksWorking--;
+    assert(mnTasksWorking >= 0);
     if (mnTasksWorking == 0)
-        maTasksComplete.set();
+        maTasksComplete.notify_all();
 }
 
-void ThreadTaskTag::waitUntilDone()
+bool ThreadTaskTag::isDone()
 {
-#if defined DBG_UTIL && defined LINUX
-    assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
-#endif
+    std::unique_lock< std::mutex > aGuard( maMutex );
+    return mnTasksWorking == 0;
+}
 
+void ThreadTaskTag::waitUntilDone()
+{
+    std::unique_lock< std::mutex > aGuard( maMutex );
+    while( mnTasksWorking > 0 )
+    {
 #ifdef DBG_UTIL
-    // 3 minute timeout in debug mode so our tests fail sooner rather than later
-    osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 3*60, 0 });
-    assert(rv != osl::Condition::result_timeout);
+        // 3 minute timeout in debug mode so our tests fail sooner rather than later
+        std::cv_status result = maTasksComplete.wait_for(
+            aGuard, std::chrono::seconds( 3 * 60 ));
+        assert(result != std::cv_status::timeout);
 #else
-    // 10 minute timeout in production so the app eventually throws some kind of error
-    if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl::Condition::Result::result_timeout)
-        throw std::runtime_error("timeout waiting for threadpool tasks");
+        // 10 minute timeout in production so the app eventually throws some kind of error
+        if (maTasksComplete.wait_for(
+                aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
+            throw std::runtime_error("timeout waiting for threadpool tasks");
 #endif
-}
-
-bool ThreadTaskTag::isDone()
-{
-    return mnTasksWorking == 0;
+    }
 }
 
 } // namespace comphelper
diff --git a/desktop/source/app/app.cxx b/desktop/source/app/app.cxx
index 501ebe1..9a68158 100644
--- a/desktop/source/app/app.cxx
+++ b/desktop/source/app/app.cxx
@@ -81,6 +81,7 @@
 #include <toolkit/helper/vclunohelper.hxx>
 #include <comphelper/configuration.hxx>
 #include <comphelper/fileurl.hxx>
+#include <comphelper/threadpool.hxx>
 #include <comphelper/processfactory.hxx>
 #include <comphelper/backupfilehelper.hxx>
 #include <unotools/bootstrap.hxx>
@@ -1791,11 +1792,14 @@ int Desktop::doShutdown()
         StarBASIC::DetachAllDocBasicItems();
 #endif
     }
+
     // be sure that path/language options gets destroyed before
     // UCB is deinitialized
     pExecGlobals->pLanguageOptions.reset( nullptr );
     pExecGlobals->pPathOptions.reset( nullptr );
 
+    comphelper::ThreadPool::getSharedOptimalPool().shutdown();
+
     bool bRR = pExecGlobals->bRestartRequested;
     delete pExecGlobals;
     pExecGlobals = nullptr;
diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx
index 7910a83..9f76922 100644
--- a/include/comphelper/threadpool.hxx
+++ b/include/comphelper/threadpool.hxx
@@ -11,11 +11,11 @@
 #define INCLUDED_COMPHELPER_THREADPOOL_HXX
 
 #include <sal/config.h>
-#include <salhelper/thread.hxx>
-#include <osl/mutex.hxx>
-#include <osl/conditn.hxx>
 #include <rtl/ref.hxx>
 #include <comphelper/comphelperdllapi.h>
+#include <mutex>
+#include <thread>
+#include <condition_variable>
 #include <vector>
 #include <memory>
 
@@ -28,14 +28,19 @@ class COMPHELPER_DLLPUBLIC ThreadTask
 {
 friend class ThreadPool;
     std::shared_ptr<ThreadTaskTag>  mpTag;
+
+    /// execute and delete this task
+    void      execAndDelete();
+protected:
+    /// override to get your task performed by the pool
+    virtual void doWork() = 0;
+    /// once pushed ThreadTasks are destroyed by the pool
+    virtual   ~ThreadTask() {}
 public:
     ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag);
-    virtual      ~ThreadTask() {}
-    virtual void doWork() = 0;
-    const std::shared_ptr<ThreadTaskTag>& getTag() { return mpTag; }
 };
 
-/// A very basic thread pool implementation
+/// A very basic thread-safe thread pool implementation
 class COMPHELPER_DLLPUBLIC ThreadPool final
 {
 public:
@@ -50,7 +55,7 @@ public:
     /// returns a configurable max-concurrency
     /// limit to avoid spawning an unnecessarily
     /// large number of threads on high-core boxes.
-    /// MAX_CONCURRENCY envar controls the cap.
+    /// MAX_CONCURRENCY env. var. controls the cap.
     static      sal_Int32 getPreferredConcurrency();
 
     ThreadPool( sal_Int32 nWorkers );
@@ -65,6 +70,9 @@ public:
     /// return the number of live worker threads
     sal_Int32   getWorkerCount() const { return maWorkers.size(); }
 
+    /// wait until all work is completed, then join all threads
+    void        shutdown();
+
 private:
     ThreadPool(const ThreadPool&) = delete;
     ThreadPool& operator=(const ThreadPool&) = delete;
@@ -72,20 +80,21 @@ private:
     class ThreadWorker;
     friend class ThreadWorker;
 
-    /// wait until all work is completed, then join all threads
-    void        waitAndCleanupWorkers();
-
-    ThreadTask *popWork();
-    void        startWork();
-    void        stopWork();
+    /** Pop a work task
+        @param  bWait - if set wait until task present or termination
+        @return a new task to perform, or NULL if list empty or terminated
+    */
+    ThreadTask *popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait );
+    void        startWorkLocked();
+    void        stopWorkLocked();
 
-    osl::Mutex     maGuard;
-    sal_Int32      mnThreadsWorking;
     /// signalled when all in-progress tasks are complete
-    osl::Condition maTasksComplete;
-    bool           mbTerminate;
-    std::vector< rtl::Reference< ThreadWorker > > maWorkers;
+    std::mutex              maMutex;
+    std::condition_variable maTasksChanged;
+    sal_Int32               mnThreadsWorking;
+    bool                    mbTerminate;
     std::vector< ThreadTask * >   maTasks;
+    std::vector< rtl::Reference< ThreadWorker > > maWorkers;
 };
 
 } // namespace comphelper
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index 603a614..d0fce89 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -27,6 +27,7 @@
 #include <osl/diagnose.h>
 
 #include <osl/time.h>
+#include <osl/thread.hxx>
 
 #include <PackageConstants.hxx>
 #include <ZipEntry.hxx>


More information about the Libreoffice-commits mailing list