[Libreoffice-commits] core.git: sc/Library_scfilt.mk sc/source

Michael Meeks michael.meeks at collabora.com
Wed Nov 27 10:41:17 PST 2013


 sc/Library_scfilt.mk                      |    1 
 sc/source/filter/inc/sheetdatacontext.hxx |   11 +
 sc/source/filter/oox/sheetdatacontext.cxx |    6 
 sc/source/filter/oox/threadpool.cxx       |  162 ++++++++++++++++++++++
 sc/source/filter/oox/threadpool.hxx       |   53 +++++++
 sc/source/filter/oox/workbookfragment.cxx |  212 +++++++-----------------------
 6 files changed, 285 insertions(+), 160 deletions(-)

New commits:
commit 0710299634a2276749c36ed86a5a60a20d63073f
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Nov 27 18:11:34 2013 +0000

    sc: threaded parsing of the core data inside large XLSX files
    
    Enabled only in experimental mode or via SC_IMPORT_THREADS=<N>, this
    allows significant parallelisation of sheet reading. I also implement
    a simple thread pool to manage that.
    
    Change-Id: I66c72211f2699490230e993a374c26b1892eac12

diff --git a/sc/Library_scfilt.mk b/sc/Library_scfilt.mk
index 499f873..eb0d5d2 100644
--- a/sc/Library_scfilt.mk
+++ b/sc/Library_scfilt.mk
@@ -211,6 +211,7 @@ $(eval $(call gb_Library_add_exception_objects,scfilt,\
 	sc/source/filter/oox/tablebuffer \
 	sc/source/filter/oox/tablefragment \
 	sc/source/filter/oox/themebuffer \
+	sc/source/filter/oox/threadpool \
 	sc/source/filter/oox/unitconverter \
 	sc/source/filter/oox/viewsettings \
 	sc/source/filter/oox/workbookfragment \
diff --git a/sc/source/filter/inc/sheetdatacontext.hxx b/sc/source/filter/inc/sheetdatacontext.hxx
index b492d2a..3f3e377 100644
--- a/sc/source/filter/inc/sheetdatacontext.hxx
+++ b/sc/source/filter/inc/sheetdatacontext.hxx
@@ -23,6 +23,9 @@
 #include "excelhandlers.hxx"
 #include "richstring.hxx"
 #include "sheetdatabuffer.hxx"
+#include <vcl/svapp.hxx>
+
+#define MULTI_THREAD_SHEET_PARSING 1
 
 namespace oox {
 namespace xls {
@@ -54,8 +57,16 @@ struct SheetDataContextBase
  */
 class SheetDataContext : public WorksheetContextBase, private SheetDataContextBase
 {
+    // If we are doing threaded parsing, this SheetDataContext
+    // forms the inner loop for bulk data parsing, and for the
+    // duration of this we can drop the solar mutex.
+#if MULTI_THREAD_SHEET_PARSING
+    SolarMutexReleaser aReleaser;
+#endif
+
 public:
     explicit            SheetDataContext( WorksheetFragmentBase& rFragment );
+    virtual            ~SheetDataContext();
 
 protected:
     virtual ::oox::core::ContextHandlerRef onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs );
diff --git a/sc/source/filter/oox/sheetdatacontext.cxx b/sc/source/filter/oox/sheetdatacontext.cxx
index 5170234..9a0f7df 100644
--- a/sc/source/filter/oox/sheetdatacontext.cxx
+++ b/sc/source/filter/oox/sheetdatacontext.cxx
@@ -90,6 +90,12 @@ SheetDataContext::SheetDataContext( WorksheetFragmentBase& rFragment ) :
     mnRow( -1 ),
     mnCol( -1 )
 {
+    SAL_INFO( "sc.filter",  "start safe sheet data context - unlock\n" );
+}
+
+SheetDataContext::~SheetDataContext()
+{ // only logs; NOTE(review): the aReleaser member presumably re-locks the solar mutex afterwards - confirm
+    SAL_INFO( "sc.filter",  "end safe sheet data context - relock\n" );
+}
 
 ContextHandlerRef SheetDataContext::onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs )
diff --git a/sc/source/filter/oox/threadpool.cxx b/sc/source/filter/oox/threadpool.cxx
new file mode 100644
index 0000000..9de1a14
--- /dev/null
+++ b/sc/source/filter/oox/threadpool.cxx
@@ -0,0 +1,162 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "threadpool.hxx"
+
+/// One pool thread: runs queued ThreadTasks until the pool terminates.
+class ThreadPool::ThreadWorker : public salhelper::Thread
+{
+    ThreadPool    *mpPool; // the pool whose queue this worker drains
+    osl::Condition maNewWork; // set by signalNewWork(), waited on below
+public:
+    ThreadWorker( ThreadPool *pPool ) :
+        salhelper::Thread("sheet-import-thread-pool"),
+        mpPool( pPool ) {}
+
+    /// Thread body: run and delete tasks until waitForWork() returns NULL.
+    virtual void execute()
+    {
+        ThreadTask *pTask;
+        while ( ( pTask = waitForWork() ) )
+        {
+            pTask->doWork();
+            delete pTask;
+        }
+    }
+
+    /// Block until a task is available; NULL means queue empty and mbTerminate set.
+    ThreadTask *waitForWork()
+    {
+        ThreadTask *pRet = NULL;
+
+        osl::ResettableMutexGuard aGuard( mpPool->maGuard );
+
+        pRet = mpPool->popWork();
+
+        while( !pRet )
+        {
+            // reset while locked: signallers (pushTask) hold maGuard, so no set() is lost
+            maNewWork.reset();
+
+            if( mpPool->mbTerminate )
+                break;
+
+            aGuard.clear(); // unlock
+            maNewWork.wait();
+            aGuard.reset(); // lock
+
+            pRet = mpPool->popWork();
+        }
+
+        return pRet;
+    }
+
+    // Why a condition per worker thread - you may ask.
+    //
+    // Unfortunately the Windows synchronisation API that we wrap
+    // is horribly inadequate cf.
+    //    http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
+    // The existing osl::Condition API should only ever be used
+    // between one producer and one consumer thread to avoid the
+    // lost wakeup problem.
+    void signalNewWork()
+    {
+        maNewWork.set();
+    }
+};
+
+/// Create nWorkers worker threads and start them all.
+ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
+    mbTerminate( false )
+{
+    for( sal_Int32 nLeft = nWorkers; nLeft > 0; --nLeft )
+        maWorkers.push_back( new ThreadWorker( this ) );
+    maTasksEmpty.reset();
+    // launch with maGuard held: workers lock it first thing in waitForWork()
+    osl::MutexGuard aGuard( maGuard );
+    for( size_t nIdx = 0, nCount = maWorkers.size(); nIdx != nCount; ++nIdx )
+        maWorkers[ nIdx ]->launch();
+}
+
+ThreadPool::~ThreadPool()
+{
+    waitUntilWorkersDone(); // drain the queue, then stop and join every worker
+}
+
+/// Drain the task queue, then stop and join every worker thread.
+/// Each worker is woken after mbTerminate is set so that it can exit.
+void ThreadPool::waitUntilWorkersDone()
+{
+    waitUntilEmpty();
+
+    osl::ResettableMutexGuard aGuard( maGuard );
+    mbTerminate = true;
+
+    while( !maWorkers.empty() )
+    {
+        rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
+        maWorkers.pop_back();
+        // std::vector has no find(); check the popped worker is not stored twice in a row
+        assert( maWorkers.empty() || maWorkers.back().get() != xWorker.get() );
+        xWorker->signalNewWork();
+        aGuard.clear();
+        // join unlocked: the exiting worker re-locks maGuard in waitForWork()
+        xWorker->join();
+        xWorker.clear();
+        aGuard.reset();
+    }
+}
+
+/// Queue pTask (the pool takes ownership) and wake every worker to fetch it.
+void ThreadPool::pushTask( ThreadTask *pTask )
+{
+    osl::MutexGuard aGuard( maGuard );
+    maTasks.insert( maTasks.begin(), pTask ); // popWork() takes from the back
+    for( size_t i = 0; i < maWorkers.size(); i++ ) // one condition per worker
+        maWorkers[ i ]->signalNewWork();
+    maTasksEmpty.reset(); // queue is no longer empty
+}
+
+/// Pop the task at the back of the queue; if none, set maTasksEmpty and return NULL.
+ThreadTask *ThreadPool::popWork()
+{
+    if( maTasks.empty() )
+    {   // nothing queued: release anyone blocked in waitUntilEmpty()
+        maTasksEmpty.set();
+        return NULL;
+    }
+    ThreadTask *pNext = maTasks.back();
+    maTasks.pop_back();
+    return pNext;
+}
+
+/// Block until the task queue is empty; with no workers, run the tasks inline.
+void ThreadPool::waitUntilEmpty()
+{
+    osl::ResettableMutexGuard aGuard( maGuard );
+    if( maWorkers.empty() )
+    { // no threads at all -> execute the work in-line
+        ThreadTask *pTask;
+        while ( ( pTask = popWork() ) )
+        {
+            pTask->doWork();
+            delete pTask;
+        }
+        mbTerminate = true;
+    }
+    else
+    {
+        aGuard.clear(); // unlock so workers can pop tasks while we wait
+        maTasksEmpty.wait(); // set by popWork() when it finds the queue empty
+        aGuard.reset();
+    }
+    assert( maTasks.empty() ); // queue drained; already-popped tasks may still be running
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/filter/oox/threadpool.hxx b/sc/source/filter/oox/threadpool.hxx
new file mode 100644
index 0000000..036534f
--- /dev/null
+++ b/sc/source/filter/oox/threadpool.hxx
@@ -0,0 +1,53 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef SC_THREADPOOL_HXX
+#define SC_THREADPOOL_HXX
+
+#include <sal/config.h>
+#include <salhelper/thread.hxx>
+#include <osl/mutex.hxx>
+#include <osl/conditn.hxx>
+#include <rtl/ref.hxx>
+#include <vector>
+
+class ThreadTask // unit of work handed to ThreadPool::pushTask()
+{
+public:
+    virtual      ~ThreadTask() {} // the pool deletes tasks through this base class
+    virtual void doWork() = 0; // run by the pool, which then deletes the task
+};
+
+/// A very basic thread pool: a fixed set of workers draining one task queue.
+class ThreadPool
+{
+public:
+                ThreadPool( sal_Int32 nWorkers ); // creates and launches the workers
+    virtual    ~ThreadPool();                     // calls waitUntilWorkersDone()
+    void        pushTask( ThreadTask *pTask /* takes ownership */ );
+    void        waitUntilEmpty();       // returns once no task is left queued
+    void        waitUntilWorkersDone(); // waitUntilEmpty(), then joins all workers
+
+private:
+    class ThreadWorker;
+    friend class ThreadWorker; // workers use maGuard, mbTerminate and popWork()
+
+    // callers in threadpool.cxx hold maGuard around this call
+    ThreadTask *popWork();
+
+    osl::Mutex maGuard;          // held around accesses to maTasks and mbTerminate
+    osl::Condition maTasksEmpty; // set by popWork() when the queue is empty
+    bool mbTerminate;            // tells idle workers to exit
+    std::vector< rtl::Reference< ThreadWorker > > maWorkers;
+    std::vector< ThreadTask * >   maTasks; // new tasks at the front, popped at the back
+};
+
+#endif // SC_THREADPOOL_HXX
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/filter/oox/workbookfragment.cxx b/sc/source/filter/oox/workbookfragment.cxx
index 7acb16a..d3c06ac 100644
--- a/sc/source/filter/oox/workbookfragment.cxx
+++ b/sc/source/filter/oox/workbookfragment.cxx
@@ -42,11 +42,16 @@
 #include "workbooksettings.hxx"
 #include "worksheetbuffer.hxx"
 #include "worksheetfragment.hxx"
+#include "sheetdatacontext.hxx"
+#include "threadpool.hxx"
+#include "officecfg/Office/Common.hxx"
 
 #include "document.hxx"
 #include "docsh.hxx"
 #include "calcconfig.hxx"
 
+#include <vcl/svapp.hxx>
+
 #include <oox/core/fastparser.hxx>
 #include <salhelper/thread.hxx>
 #include <osl/conditn.hxx>
@@ -54,8 +59,6 @@
 #include <queue>
 #include <boost/scoped_ptr.hpp>
 
-#define MULTI_THREAD_SHEET_PARSING 0
-
 #include "oox/ole/vbaproject.hxx"
 
 namespace oox {
@@ -204,188 +207,77 @@ namespace {
 typedef std::pair<WorksheetGlobalsRef, FragmentHandlerRef> SheetFragmentHandler;
 typedef std::vector<SheetFragmentHandler> SheetFragmentVector;
 
-#if MULTI_THREAD_SHEET_PARSING
-
-class WorkerThread;
-typedef rtl::Reference<WorkerThread> WorkerThreadRef;
-
-struct WorkerThreadData
-{
-    osl::Mutex maMtx;
-    std::vector<WorkerThreadRef> maThreads;
-};
-
-struct IdleWorkerThreadData
-{
-    osl::Mutex maMtx;
-    osl::Condition maCondAdded;
-    std::queue<WorkerThread*> maThreads;
-};
-
-struct
-{
-    boost::scoped_ptr<WorkerThreadData> mpWorkerThreads;
-    boost::scoped_ptr<IdleWorkerThreadData> mpIdleThreads;
-
-} aThreadGlobals;
-
-enum WorkerAction
-{
-    None = 0,
-    TerminateThread,
-    Work
-};
-
-class WorkerThread : public salhelper::Thread
+class WorkerThread : public ThreadTask
 {
     WorkbookFragment& mrWorkbookHandler;
-    size_t mnID;
-    FragmentHandlerRef mxHandler;
-    boost::scoped_ptr<oox::core::FastParser> mxParser;
-    osl::Mutex maMtxAction;
-    osl::Condition maCondActionChanged;
-    WorkerAction meAction;
-public:
-    WorkerThread( WorkbookFragment& rWorkbookHandler, size_t nID ) :
-        salhelper::Thread("sheet-import-worker-thread"),
-        mrWorkbookHandler(rWorkbookHandler),
-        mnID(nID),
-        mxParser(rWorkbookHandler.getOoxFilter().createParser()),
-        meAction(None) {}
-
-    virtual void execute()
-    {
-        announceIdle();
-
-        // Keep looping until the terminate request is set.
-        for (maCondActionChanged.wait(); true; maCondActionChanged.wait())
-        {
-            osl::MutexGuard aGuard(maMtxAction);
-            if (!maCondActionChanged.check())
-                // Wait again.
-                continue;
-
-            maCondActionChanged.reset();
-
-            if (meAction == TerminateThread)
-                // End the thread.
-                return;
-
-            if (meAction != Work)
-                continue;
-
-#if 0
-            // TODO : This still deadlocks in the fast parser code.
-            mrWorkbookHandler.importOoxFragment(mxHandler, *mxParser);
-#else
-            double val = rand() / static_cast<double>(RAND_MAX);
-            val *= 1000000; // normalize to 1 second.
-            val *= 1.5; // inflate it a bit.
-            usleep(val); // pretend to be working while asleep.
-#endif
-            announceIdle();
-        }
-    }
-
-    void announceIdle()
-    {
-        // Set itself idle to receive a new task from the main thread.
-        osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
-        aThreadGlobals.mpIdleThreads->maThreads.push(this);
-        aThreadGlobals.mpIdleThreads->maCondAdded.set();
-    }
+    rtl::Reference<FragmentHandler> mxHandler;
 
-    void terminate()
+public:
+    WorkerThread( WorkbookFragment& rWorkbookHandler,
+                  const rtl::Reference<FragmentHandler>& xHandler ) :
+        mrWorkbookHandler( rWorkbookHandler ),
+        mxHandler( xHandler )
     {
-        osl::MutexGuard aGuard(maMtxAction);
-        meAction = TerminateThread;
-        maCondActionChanged.set();
     }
 
-    void assign( const FragmentHandlerRef& rHandler )
+    virtual void doWork()
     {
-        osl::MutexGuard aGuard(maMtxAction);
-        mxHandler = rHandler;
-        meAction = Work;
-        maCondActionChanged.set();
+        // We hold the solar mutex in all threads except for
+        // the small safe section of the inner loop in
+        // sheetdatacontext.cxx
+        SAL_INFO( "sc.filter",  "start wait on solar\n" );
+        SolarMutexGuard maGuard;
+        SAL_INFO( "sc.filter",  "got solar\n" );
+
+        boost::scoped_ptr<oox::core::FastParser> xParser(
+                mrWorkbookHandler.getOoxFilter().createParser() );
+
+        SAL_INFO( "sc.filter",  "start import\n" );
+        mrWorkbookHandler.importOoxFragment( mxHandler, *xParser );
+        SAL_INFO( "sc.filter",  "end import, release solar\n" );
     }
 };
 
-#endif
-
 void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVector& rSheets )
 {
-#if MULTI_THREAD_SHEET_PARSING // threaded version
-    size_t nThreadCount = 3;
-    if (nThreadCount > rSheets.size())
-        nThreadCount = rSheets.size();
+    sal_Int32 nThreads = std::min( rSheets.size(), (size_t) 4 /* FIXME: ncpus/2 */ );
 
-    // Create new thread globals.
-    aThreadGlobals.mpWorkerThreads.reset(new WorkerThreadData);
-    aThreadGlobals.mpIdleThreads.reset(new IdleWorkerThreadData);
+    Reference< XComponentContext > xContext = comphelper::getProcessComponentContext();
 
-    SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
+    // Force threading off unless experimental mode or env. var is set.
+    if( !officecfg::Office::Common::Misc::ExperimentalMode::get( xContext ) )
+        nThreads = 0;
 
-    {
-        // Initialize worker threads.
-        osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
-        for (size_t i = 0; i < nThreadCount; ++i)
-        {
-            WorkerThreadRef pThread(new WorkerThread(rWorkbookHandler, i));
-            aThreadGlobals.mpWorkerThreads->maThreads.push_back(pThread);
-            pThread->launch();
-        }
-    }
+    const char *pEnv;
+    if( ( pEnv = getenv( "SC_IMPORT_THREADS" ) ) )
+        nThreads = rtl_str_toInt32( pEnv, 10 );
 
-    for (aThreadGlobals.mpIdleThreads->maCondAdded.wait(); true; aThreadGlobals.mpIdleThreads->maCondAdded.wait())
+    if( nThreads != 0 )
     {
-        osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
-        if (!aThreadGlobals.mpIdleThreads->maCondAdded.check())
-            // Wait again.
-            continue;
-
-        aThreadGlobals.mpIdleThreads->maCondAdded.reset();
-
-        // Assign work to all idle threads.
-        while (!aThreadGlobals.mpIdleThreads->maThreads.empty())
-        {
-            if (it == itEnd)
-                break;
-
-            WorkerThread* p = aThreadGlobals.mpIdleThreads->maThreads.front();
-            aThreadGlobals.mpIdleThreads->maThreads.pop();
-            p->assign(it->second);
-            ++it;
-        }
+        // test sequential read in this mode
+        if( nThreads < 0)
+            nThreads = 0;
+        ThreadPool aPool( nThreads );
 
-        if (it == itEnd)
-            // Finished!  Exit the loop.
-            break;
-    }
+        SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
+        for( ; it != itEnd; ++it )
+            aPool.pushTask( new WorkerThread( rWorkbookHandler, it->second ) )
+                ;
 
-    {
-        // Terminate all worker threads.
-        osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
-        for (size_t i = 0, n = aThreadGlobals.mpWorkerThreads->maThreads.size(); i < n; ++i)
         {
-            WorkerThreadRef pWorker = aThreadGlobals.mpWorkerThreads->maThreads[i];
-            pWorker->terminate();
-            if (pWorker.is())
-                pWorker->join();
+            // Ideally no-one else but our worker threads can re-acquire that.
+            // Potentially, if that causes a problem, we might want to extend
+            // the SolarMutex functionality to allow passing it around.
+            SolarMutexReleaser aReleaser;
+            aPool.waitUntilWorkersDone();
         }
     }
-
-    // Delete all thread globals.
-    aThreadGlobals.mpWorkerThreads.reset();
-    aThreadGlobals.mpIdleThreads.reset();
-
-#else // non-threaded version
-    for( SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); it != itEnd; ++it)
+    else
     {
-        // import the sheet fragment
-        rWorkbookHandler.importOoxFragment(it->second);
+        SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
+        for( ; it != itEnd; ++it )
+            rWorkbookHandler.importOoxFragment( it->second );
     }
-#endif
 }
 
 }


More information about the Libreoffice-commits mailing list