[Libreoffice-commits] core.git: sc/source

Matúš Kukan matus.kukan at collabora.com
Wed Nov 20 08:24:01 PST 2013


 sc/source/ui/inc/datastreams.hxx         |   15 ++-
 sc/source/ui/miscdlgs/datastreams.cxx    |  129 ++++++++++++++++++++++++++-----
 sc/source/ui/miscdlgs/datastreamsdlg.cxx |    7 +
 3 files changed, 129 insertions(+), 22 deletions(-)

New commits:
commit ed89a069f462ae106802e0d1376c38723c2c12cb
Author: Matúš Kukan <matus.kukan at collabora.com>
Date:   Wed Nov 20 17:09:41 2013 +0100

    datastreams: read data in another thread
    
    Change-Id: Iedd4075eadce9ca8fc41b279ea03c2679b01ec71

diff --git a/sc/source/ui/inc/datastreams.hxx b/sc/source/ui/inc/datastreams.hxx
index 80f9cd6..93d1574 100644
--- a/sc/source/ui/inc/datastreams.hxx
+++ b/sc/source/ui/inc/datastreams.hxx
@@ -14,23 +14,30 @@
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <vector>
 
-namespace datastreams { class CallerThread; }
+namespace datastreams {
+    class CallerThread;
+    class ReaderThread;
+}
 class ScDocShell;
 class ScDocument;
 class ScRange;
 class SvStream;
 class Window;
 
+typedef std::vector<OString> LinesList;
+
 class DataStreams : boost::noncopyable
 {
 public:
     enum MoveEnum { NO_MOVE, RANGE_DOWN, MOVE_DOWN, MOVE_UP };
     DataStreams(ScDocShell *pScDocShell);
     ~DataStreams();
+    OString ConsumeLine();
     bool ImportData();
     void MoveData();
-    void Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine,
+    void Set(SvStream *pStream, bool bValuesInLine,
             const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove);
     void ShowDialog(Window *pParent);
     void Start();
@@ -43,11 +50,13 @@ private:
     bool mbRunning;
     bool mbIsUndoEnabled;
     bool mbValuesInLine;
+    LinesList *mpLines;
+    size_t mnLinesCount;
     boost::scoped_ptr<ScRange> mpRange;
     boost::scoped_ptr<ScRange> mpStartRange;
     boost::scoped_ptr<ScRange> mpEndRange;
-    boost::scoped_ptr<SvStream> mpStream;
     rtl::Reference<datastreams::CallerThread> mxThread;
+    rtl::Reference<datastreams::ReaderThread> mxReaderThread;
 };
 
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/ui/miscdlgs/datastreams.cxx b/sc/source/ui/miscdlgs/datastreams.cxx
index b938513..9d66ee3 100644
--- a/sc/source/ui/miscdlgs/datastreams.cxx
+++ b/sc/source/ui/miscdlgs/datastreams.cxx
@@ -24,6 +24,8 @@
 #include <tabvwsh.hxx>
 #include <viewdata.hxx>
 
+#include <queue>
+
 namespace datastreams {
 
 class CallerThread : public salhelper::Thread
@@ -57,6 +59,82 @@ private:
     }
 };
 
+class ReaderThread : public salhelper::Thread // Producer thread: reads lines from mpStream in batches and queues them in maPendingLines.
+{
+    SvStream *mpStream; // deleted in ~ReaderThread(), so this object takes over the stream passed to the constructor
+public:
+    bool mbTerminateReading; // loop-exit flag for execute(); set by terminate() and by execute() itself once the stream is not good()
+    osl::Condition maProduceResume; // execute() waits on this while maPendingLines holds 8 or more batches
+    osl::Condition maConsumeResume; // set by execute() every time a batch is pushed to maPendingLines
+    osl::Mutex maLinesProtector; // held by execute() around its accesses to the two queues below
+    std::queue<LinesList* > maPendingLines; // filled batches, pushed by execute()
+    std::queue<LinesList* > maUsedLines; // batches handed back by DataStreams::ConsumeLine(); execute() refills these before allocating new ones
+
+    ReaderThread(SvStream *pData):
+        Thread("ReaderThread")
+        ,mpStream(pData)
+        ,mbTerminateReading(false)
+    {
+    }
+
+    virtual ~ReaderThread() // frees the stream and every batch still sitting in either queue
+    {
+        delete mpStream;
+        while (!maPendingLines.empty())
+        {
+            delete maPendingLines.front();
+            maPendingLines.pop();
+        }
+        while (!maUsedLines.empty())
+        {
+            delete maUsedLines.front();
+            maUsedLines.pop();
+        }
+    }
+
+    void terminate() // asks execute() to leave its loop, then join()s the thread
+    {
+        mbTerminateReading = true; // NOTE(review): plain bool written here and read in execute() without holding maLinesProtector - confirm this is safe
+        maProduceResume.set(); // wake execute() in case it is paused on a full pending queue
+        join();
+    }
+
+private:
+    virtual void execute() SAL_OVERRIDE // thread body: fills and queues one batch per iteration until mbTerminateReading is set
+    {
+        while (!mbTerminateReading)
+        {
+            LinesList *pLines = 0;
+            osl::ResettableMutexGuard aGuard(maLinesProtector);
+            if (!maUsedLines.empty())
+            {
+                pLines = maUsedLines.front(); // recycle a returned batch instead of allocating
+                maUsedLines.pop();
+                aGuard.clear(); // unlock
+            }
+            else
+            {
+                aGuard.clear(); // unlock
+                pLines = new LinesList(10); // fresh batch with room for 10 lines
+            }
+            for (size_t i = 0; i < pLines->size(); ++i)
+                mpStream->ReadLine( pLines->at(i) ); // done with the mutex released; NOTE(review): the result of ReadLine is not checked - confirm what a batch holds once the stream is exhausted
+            aGuard.reset(); // lock
+            while (!mbTerminateReading && maPendingLines.size() >= 8)
+            { // pause reading while 8 or more batches are already pending
+                aGuard.clear(); // unlock
+                maProduceResume.wait();
+                maProduceResume.reset();
+                aGuard.reset(); // lock
+            }
+            maPendingLines.push(pLines);
+            maConsumeResume.set(); // signal that a batch is available
+            if (!mpStream->good())
+                mbTerminateReading = true; // stop after queuing this batch; NOTE(review): nothing here tells a waiting consumer that no further batches will arrive
+        }
+    }
+};
+
 }
 
 DataStreams::DataStreams(ScDocShell *pScDocShell):
@@ -64,6 +142,8 @@ DataStreams::DataStreams(ScDocShell *pScDocShell):
     , mpScDocument(mpScDocShell->GetDocument())
     , meMove(NO_MOVE)
     , mbRunning(false)
+    , mpLines(0)
+    , mnLinesCount(0)
 {
     mxThread = new datastreams::CallerThread( this );
     mxThread->launch();
@@ -76,6 +156,31 @@ DataStreams::~DataStreams()
     mxThread->mbTerminate = true;
     mxThread->maStart.set();
     mxThread->join();
+    if (mxReaderThread.is())
+        mxReaderThread->terminate();
+}
+
+OString DataStreams::ConsumeLine() // Returns the next line from the current batch, fetching a new batch from mxReaderThread when the current one is used up.
+{
+    if (!mpLines || mnLinesCount >= mpLines->size()) // no batch yet, or every line of the current batch has been returned
+    {
+        mnLinesCount = 0;
+        osl::ResettableMutexGuard aGuard(mxReaderThread->maLinesProtector); // NOTE(review): mxReaderThread is dereferenced without an is() check; it is created in Set() - confirm callers always call Set() first
+        if (mpLines)
+            mxReaderThread->maUsedLines.push(mpLines); // hand the finished batch back so the reader can refill it
+        while (mxReaderThread->maPendingLines.empty())
+        { // NOTE(review): no exit condition besides a new batch arriving - looks like this blocks forever once the reader has stopped; confirm
+            aGuard.clear(); // unlock
+            mxReaderThread->maConsumeResume.wait();
+            mxReaderThread->maConsumeResume.reset();
+            aGuard.reset(); // lock
+        }
+        mpLines = mxReaderThread->maPendingLines.front();
+        mxReaderThread->maPendingLines.pop();
+        if (mxReaderThread->maPendingLines.size() <= 4)
+            mxReaderThread->maProduceResume.set(); // start producer again
+    }
+    return mpLines->at(mnLinesCount++); // post-increment: the index advances for the next call
+}
 
 void DataStreams::Start()
@@ -117,13 +222,13 @@ void DataStreams::Stop()
     mpScDocument->EnableUndo(mbIsUndoEnabled);
 }
 
-void DataStreams::Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine,
+void DataStreams::Set(SvStream *pStream, bool bValuesInLine,
         const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove)
 {
-    if (bIsScript)
-        mpStream.reset( new SvScriptStream(rUrl) );
-    else
-        mpStream.reset( new SvFileStream(rUrl, STREAM_READ) );
+    if (mxReaderThread.is())
+        mxReaderThread->terminate();
+    mxReaderThread = new datastreams::ReaderThread( pStream );
+    mxReaderThread->launch();
 
     mpEndRange.reset( NULL );
     mpRange.reset ( new ScRange() );
@@ -170,14 +275,6 @@ void DataStreams::MoveData()
 
 bool DataStreams::ImportData()
 {
-    if (!mpStream->good())
-    {
-        // if there is a problem with SvStream, stop running
-        mbRunning = false;
-        return mbRunning;
-    }
-
-    OString sTmp;
     SolarMutexGuard aGuard;
     MoveData();
     if (mbValuesInLine)
@@ -186,8 +283,7 @@ bool DataStreams::ImportData()
         OStringBuffer aBuf;
         while (nHeight--)
         {
-            mpStream->ReadLine(sTmp);
-            aBuf.append(sTmp);
+            aBuf.append(ConsumeLine());
             aBuf.append('\n');
         }
         SvMemoryStream aMemoryStream((void *)aBuf.getStr(), aBuf.getLength(), STREAM_READ);
@@ -202,8 +298,7 @@ bool DataStreams::ImportData()
         // read more lines at once but not too much
         for (int i = 0; i < 10; ++i)
         {
-            mpStream->ReadLine(sTmp);
-            OUString sLine(OStringToOUString(sTmp, RTL_TEXTENCODING_UTF8));
+            OUString sLine( OStringToOUString(ConsumeLine(), RTL_TEXTENCODING_UTF8) );
             if (sLine.indexOf(',') <= 0)
                 continue;
 
diff --git a/sc/source/ui/miscdlgs/datastreamsdlg.cxx b/sc/source/ui/miscdlgs/datastreamsdlg.cxx
index cbe19a3..bacb67a 100644
--- a/sc/source/ui/miscdlgs/datastreamsdlg.cxx
+++ b/sc/source/ui/miscdlgs/datastreamsdlg.cxx
@@ -74,11 +74,14 @@ DataStreamsDlg::DataStreamsDlg(DataStreams *pDataStreams, Window* pParent)
 
 void DataStreamsDlg::Start()
 {
-    bool bIsScript = m_pRBScriptData->IsChecked();
     sal_Int32 nLimit = 0;
     if (m_pRBMaxLimit->IsChecked())
         nLimit = m_pEdLimit->GetText().toInt32();
-    mpDataStreams->Set( m_pCbUrl->GetText(), bIsScript, m_pRBValuesInLine->IsChecked(),
+    mpDataStreams->Set(
+            (m_pRBScriptData->IsChecked() ?
+                dynamic_cast<SvStream*>( new SvScriptStream(m_pCbUrl->GetText()) ) :
+                dynamic_cast<SvStream*>( new SvFileStream(m_pCbUrl->GetText(), STREAM_READ) )),
+            m_pRBValuesInLine->IsChecked(),
             m_pEdRange->GetText(), nLimit, (m_pRBNoMove->IsChecked() ? DataStreams::NO_MOVE :
             m_pRBRangeDown->IsChecked() ? DataStreams::RANGE_DOWN : DataStreams::MOVE_DOWN) );
     mpDataStreams->Start();


More information about the Libreoffice-commits mailing list