[Libreoffice-commits] core.git: package/inc package/source

Dennis Francis (via logerrit) logerrit at kemper.freedesktop.org
Mon Jan 13 11:12:20 UTC 2020


 package/inc/ThreadedDeflater.hxx           |   29 ++++---
 package/source/zipapi/ThreadedDeflater.cxx |  118 +++++++++++++++++++----------
 package/source/zipapi/ZipOutputEntry.cxx   |   32 ++-----
 3 files changed, 109 insertions(+), 70 deletions(-)

New commits:
commit 353d4528b8ad8abca9a13f3016632e42bab7afde
Author:     Dennis Francis <dennis.francis at collabora.com>
AuthorDate: Sat Jan 11 11:51:34 2020 +0530
Commit:     Dennis Francis <dennis.francis at collabora.com>
CommitDate: Mon Jan 13 12:11:44 2020 +0100

    tdf#125662: do parallel-zip in batches
    
    In this approach the input stream is read one batch (of constant size)
    at a time and each batch is compressed by ThreadedDeflater. As soon as
    a batch is done, its deflated buffer is processed straight away
    (directed to file-backed storage).
    
    Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230
    Reviewed-on: https://gerrit.libreoffice.org/c/core/+/86596
    Tested-by: Jenkins
    Reviewed-by: Luboš Luňák <l.lunak at collabora.com>

diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx
index 3bd7e4bc966a..f22a40a0c941 100644
--- a/package/inc/ThreadedDeflater.hxx
+++ b/package/inc/ThreadedDeflater.hxx
@@ -21,37 +21,48 @@
 #define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX
 
 #include <com/sun/star/uno/Sequence.hxx>
+#include <com/sun/star/io/XInputStream.hpp>
+#include <com/sun/star/uno/Reference.hxx>
 #include <package/packagedllapi.hxx>
 #include <comphelper/threadpool.hxx>
 #include <atomic>
 #include <memory>
+#include <vector>
+#include <functional>
 
 namespace ZipUtils
 {
 /// Parallel compression a stream using the libz deflate algorithm.
 ///
-/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
-/// check with finished() or waitForTasks() and retrieve result with getOutput().
-/// The class will internally split into multiple threads.
+/// Call deflateWrite() with the input stream and input/output processing functions.
+/// This will use multiple threads for compression on each batch of data from the stream.
 class ThreadedDeflater final
 {
     class Task;
     // Note: All this should be lock-less. Each task writes only to its part
-    // of the data, flags are atomic.
+    // of the data.
     std::vector<std::vector<sal_Int8>> outBuffers;
     std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
     css::uno::Sequence<sal_Int8> inBuffer;
+    css::uno::Sequence<sal_Int8> prevDataBlock;
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> maProcessOutputFunc;
+    sal_Int64 totalIn;
+    sal_Int64 totalOut;
     int zlibLevel;
-    std::atomic<int> pendingTasksCount;
 
 public:
     // Unlike with Deflater class, bNoWrap is always true.
     ThreadedDeflater(sal_Int32 nSetLevel);
     ~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE;
-    void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
-    void waitForTasks();
-    bool finished() const;
-    css::uno::Sequence<sal_Int8> getOutput() const;
+    void deflateWrite(
+        const css::uno::Reference<css::io::XInputStream>& xInStream,
+        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc);
+    sal_Int64 getTotalIn() const { return totalIn; }
+    sal_Int64 getTotalOut() const { return totalOut; }
+
+private:
+    void processDeflatedBuffers();
     void clear();
 };
 
diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
index 19bbda01bbb7..73725c580c02 100644
--- a/package/source/zipapi/ThreadedDeflater.cxx
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
     ThreadedDeflater* deflater;
     int sequence;
     int blockSize;
+    bool firstTask : 1;
+    bool lastTask : 1;
 
 public:
-    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
+         bool lastTask_)
         : comphelper::ThreadTask(deflater_->threadTaskTag)
         , stream()
         , deflater(deflater_)
         , sequence(sequence_)
         , blockSize(blockSize_)
+        , firstTask(firstTask_)
+        , lastTask(lastTask_)
     {
     }
 
@@ -61,58 +66,83 @@ private:
 
 ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
     : threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
+    , totalIn(0)
+    , totalOut(0)
     , zlibLevel(nSetLevel)
-    , pendingTasksCount(0)
 {
 }
 
-ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
-{
-    waitForTasks();
-    clear();
-}
+ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }
 
-void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+void ThreadedDeflater::deflateWrite(
+    const css::uno::Reference<css::io::XInputStream>& xInStream,
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
 {
-    inBuffer = rBuffer;
-    sal_Int64 size = inBuffer.getLength();
-    int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
-    tasksCount = std::max(tasksCount, 1);
-    pendingTasksCount = tasksCount;
-    outBuffers.resize(pendingTasksCount);
-    for (int sequence = 0; sequence < tasksCount; ++sequence)
+    sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
+    sal_Int64 batchSize = MaxBlockSize * nThreadCount;
+    inBuffer.realloc(batchSize);
+    prevDataBlock.realloc(MaxBlockSize);
+    outBuffers.resize(nThreadCount);
+    maProcessOutputFunc = aProcessOutputFunc;
+    bool firstTask = true;
+
+    while (xInStream->available() > 0)
     {
-        sal_Int64 thisSize = std::min(MaxBlockSize, size);
-        size -= thisSize;
-        comphelper::ThreadPool::getSharedOptimalPool().pushTask(
-            std::make_unique<Task>(this, sequence, thisSize));
+        sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
+        aProcessInputFunc(inBuffer, inputBytes);
+        totalIn += inputBytes;
+        int sequence = 0;
+        bool lastBatch = xInStream->available() <= 0;
+        sal_Int64 bytesPending = inputBytes;
+        while (bytesPending > 0)
+        {
+            sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
+            bytesPending -= taskSize;
+            bool lastTask = lastBatch && !bytesPending;
+            comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+                std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));
+
+            if (firstTask)
+                firstTask = false;
+        }
+
+        assert(bytesPending == 0);
+
+        comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+
+        if (!lastBatch)
+        {
+            assert(inputBytes == batchSize);
+            std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
+                        prevDataBlock.begin());
+        }
+
+        processDeflatedBuffers();
     }
-    assert(size == 0);
 }
 
-bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
-
-css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+void ThreadedDeflater::processDeflatedBuffers()
 {
-    assert(finished());
-    sal_Int64 totalSize = 0;
+    sal_Int64 batchOutputSize = 0;
     for (const auto& buffer : outBuffers)
-        totalSize += buffer.size();
-    uno::Sequence<sal_Int8> outBuffer(totalSize);
+        batchOutputSize += buffer.size();
+
+    css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);
+
     auto pos = outBuffer.begin();
-    for (const auto& buffer : outBuffers)
+    for (auto& buffer : outBuffers)
+    {
         pos = std::copy(buffer.begin(), buffer.end(), pos);
-    return outBuffer;
-}
+        buffer.clear();
+    }
 
-void ThreadedDeflater::waitForTasks()
-{
-    comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+    maProcessOutputFunc(outBuffer, batchOutputSize);
+    totalOut += batchOutputSize;
 }
 
 void ThreadedDeflater::clear()
 {
-    assert(finished());
     inBuffer = uno::Sequence<sal_Int8>();
     outBuffers.clear();
 }
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
     // zlib doesn't handle const properly
     unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
         const_cast<signed char*>(deflater->inBuffer.getConstArray()));
-    if (sequence != 0)
+    if (!firstTask)
     {
         // the window size is 32k, so set last 32k of previous data as the dictionary
         assert(MAX_WBITS == 15);
         assert(MaxBlockSize >= 32768);
-        deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+        if (sequence > 0)
+        {
+            deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+        }
+        else
+        {
+            unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
+                const_cast<signed char*>(deflater->prevDataBlock.getConstArray()));
+            deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
+        }
     }
     stream.next_in = inBufferPtr + myInBufferStart;
     stream.avail_in = blockSize;
     stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
     stream.avail_out = outputMaxSize;
-    bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
+
     // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
     // and since we use a raw stream, the data blocks then can be simply concatenated.
-    int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+    int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
     assert(stream.avail_in == 0); // Check that everything has been deflated.
-    if (last ? res == Z_STREAM_END : res == Z_OK)
+    if (lastTask ? res == Z_STREAM_END : res == Z_OK)
     { // ok
         sal_Int64 outSize = outputMaxSize - stream.avail_out;
         deflater->outBuffers[sequence].resize(outSize);
-        --deflater->pendingTasksCount;
     }
     else
     {
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index bee9d0aeb70c..f08e687c43a4 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(
 
 void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
 {
-    sal_Int64 toRead = xInStream->available();
-    uno::Sequence< sal_Int8 > inBuffer( toRead );
-    sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
-    if (read < toRead)
-        inBuffer.realloc( read );
-    while( xInStream->available() > 0 )
-    {   // We didn't get the full size from available().
-        uno::Sequence< sal_Int8 > buf( xInStream->available());
-        read = xInStream->readBytes( buf, xInStream->available());
-        sal_Int64 oldSize = inBuffer.getLength();
-        inBuffer.realloc( oldSize + read );
-        std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
-    }
     ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
-    totalIn = inBuffer.getLength();
-    deflater.startDeflate( inBuffer );
-    processInput( inBuffer );
-    deflater.waitForTasks();
-    uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
-    deflater.clear(); // release memory
-    totalOut = outBuffer.getLength();
-    processDeflated(outBuffer, outBuffer.getLength());
+    deflater.deflateWrite(xInStream,
+            [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+                if (!m_bEncryptCurrentEntry)
+                    m_aCRC.updateSegment(rBuffer, nLen);
+            },
+            [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+                processDeflated(rBuffer, nLen);
+            }
+    );
+    totalIn = deflater.getTotalIn();
+    totalOut = deflater.getTotalOut();
     closeEntry();
 }
 


More information about the Libreoffice-commits mailing list