[Libreoffice-commits] core.git: package/Library_package2.mk package/qa package/source

Mohammed Abdul Azeem azeemmysore at gmail.com
Thu Jun 8 12:28:09 UTC 2017


 package/Library_package2.mk                       |    2 
 package/qa/cppunit/test_package.cxx               |  116 +++++++++---
 package/source/zipapi/XBufferedThreadedStream.cxx |  200 ++++++++++++++++++++++
 package/source/zipapi/XBufferedThreadedStream.hxx |   79 ++++++++
 package/source/zipapi/ZipFile.cxx                 |   10 -
 5 files changed, 373 insertions(+), 34 deletions(-)

New commits:
commit 0632208977a204195a4f5b9e727aed511ece075f
Author: Mohammed Abdul Azeem <azeemmysore at gmail.com>
Date:   Sat May 27 13:17:04 2017 +0530

    First cut at moving unzipping into new thread:
    
    XBufferedThreadedStream class buffers data in a new thread,
    which will be available to be read from parent thread.
    
    Change-Id: I62d367fa1dec23da39aba24b5c765b57707956bb
    Reviewed-on: https://gerrit.libreoffice.org/38135
    Tested-by: Jenkins <ci at libreoffice.org>
    Reviewed-by: Michael Meeks <michael.meeks at collabora.com>

diff --git a/package/Library_package2.mk b/package/Library_package2.mk
index 3096a976601f..0ff715e031c1 100644
--- a/package/Library_package2.mk
+++ b/package/Library_package2.mk
@@ -31,6 +31,7 @@ $(eval $(call gb_Library_use_libraries,package2,\
 	sal \
 	sax \
 	ucbhelper \
+	salhelper \
 ))
 
 $(eval $(call gb_Library_use_externals,package2,\
@@ -51,6 +52,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\
 	package/source/zipapi/Deflater \
 	package/source/zipapi/Inflater \
 	package/source/zipapi/sha1context \
+	package/source/zipapi/XBufferedThreadedStream \
 	package/source/zipapi/XUnbufferedStream \
 	package/source/zipapi/ZipEnumeration \
 	package/source/zipapi/ZipFile \
diff --git a/package/qa/cppunit/test_package.cxx b/package/qa/cppunit/test_package.cxx
index 335f490ddaba..0e1f4778184d 100644
--- a/package/qa/cppunit/test_package.cxx
+++ b/package/qa/cppunit/test_package.cxx
@@ -27,19 +27,55 @@ namespace
     public:
         PackageTest() {}
 
+        virtual void setUp() override;
+
         virtual bool load(const OUString &,
             const OUString &rURL, const OUString &,
             SfxFilterFlags, SotClipboardFormatId, unsigned int) override;
 
         void test();
         void testThreadedStreams();
+        void testBufferedThreadedStreams();
 
         CPPUNIT_TEST_SUITE(PackageTest);
         CPPUNIT_TEST(test);
         CPPUNIT_TEST(testThreadedStreams);
+        CPPUNIT_TEST(testBufferedThreadedStreams);
         CPPUNIT_TEST_SUITE_END();
+
+    private:
+        uno::Reference<container::XNameAccess> mxNA;
+        void verifyStreams( std::vector<std::vector<char>> &aBuffers );
     };
 
+    void PackageTest::setUp()
+    {
+        BootstrapFixtureBase::setUp();
+
+        // Open the a2z.zip fixture through the ZipFileAccess service with
+        // buffered streams enabled; every test then reads entries via mxNA.
+        const OUString aZipURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");
+
+        uno::Sequence<beans::NamedValue> aNamedArgs(2);
+        aNamedArgs[0].Name = "URL";
+        aNamedArgs[0].Value <<= aZipURL;
+        aNamedArgs[1].Name = "UseBufferedStream";
+        aNamedArgs[1].Value <<= true;
+
+        uno::Sequence<uno::Any> aCtorArgs(1);
+        aCtorArgs[0] <<= aNamedArgs;
+
+        const uno::Reference<uno::XComponentContext> xContext = comphelper::getProcessComponentContext();
+        uno::Reference<packages::zip::XZipFileAccess2> xZip(
+            xContext->getServiceManager()->createInstanceWithArgumentsAndContext(
+                "com.sun.star.packages.zip.ZipFileAccess", aCtorArgs, xContext),
+            uno::UNO_QUERY);
+        CPPUNIT_ASSERT(xZip.is());
+
+        mxNA.set(xZip, uno::UNO_QUERY);
+        CPPUNIT_ASSERT(mxNA.is());
+    }
+
     bool PackageTest::load(const OUString &,
         const OUString &rURL, const OUString &,
         SfxFilterFlags, SotClipboardFormatId, unsigned int)
@@ -62,6 +98,20 @@ namespace
             m_directories.getURLFromSrc("/package/qa/cppunit/data/"));
     }
 
+    void PackageTest::verifyStreams( std::vector<std::vector<char>> &aBuffers )
+    {
+        // Expect exactly one buffer per letter 'a'..'z', in order, each 1 MB
+        // long and filled entirely with its letter.
+        CPPUNIT_ASSERT_EQUAL(size_t(26), aBuffers.size());
+
+        char cExpected = 'a';
+        for (const std::vector<char>& rBuf : aBuffers)
+        {
+            CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
+            for (char cActual : rBuf)
+                CPPUNIT_ASSERT_EQUAL(cExpected, cActual);
+            ++cExpected;
+        }
+    }
+
     // TODO : This test currently doesn't fail even when you set
     // UseBufferedStream to false. Look into this and replace it with a better
     // test that actually fails when the aforementioned flag is set to false.
@@ -95,30 +145,6 @@ namespace
             }
         };
 
-        OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");
-
-        uno::Sequence<beans::NamedValue> aNVs(2);
-        aNVs[0].Name = "URL";
-        aNVs[0].Value <<= aURL;
-        aNVs[1].Name = "UseBufferedStream";
-        aNVs[1].Value <<= true;
-
-        uno::Sequence<uno::Any> aArgs(1);
-        aArgs[0] <<= aNVs;
-
-        uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
-        uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();
-
-        uno::Reference<packages::zip::XZipFileAccess2> xZip(
-            xSvcMgr->createInstanceWithArgumentsAndContext(
-                "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
-            uno::UNO_QUERY);
-
-        CPPUNIT_ASSERT(xZip.is());
-
-        uno::Reference<container::XNameAccess> xNA(xZip, uno::UNO_QUERY);
-        CPPUNIT_ASSERT(xNA.is());
-
         {
             comphelper::ThreadPool aPool(4);
             std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
@@ -132,26 +158,50 @@ namespace
                 aName += ".txt";
 
                 uno::Reference<io::XInputStream> xStrm;
-                xNA->getByName(aName) >>= xStrm;
+                mxNA->getByName(aName) >>= xStrm;
 
                 CPPUNIT_ASSERT(xStrm.is());
                 aPool.pushTask(new Worker(pTag, xStrm, *itBuf));
             }
 
             aPool.waitUntilDone(pTag);
+            verifyStreams( aTestBuffers );
+        }
+    }
 
-            // Verify the streams.
-            CPPUNIT_ASSERT_EQUAL(size_t(26), aTestBuffers.size());
-            itBuf = aTestBuffers.begin();
+    void PackageTest::testBufferedThreadedStreams()
+    {
+        std::vector<std::vector<char>> aTestBuffers(26);
+        auto itBuf = aTestBuffers.begin();
+        sal_Int32 nReadSize = 0;
 
-            for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
+        for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
+        {
+            OUString aName(c);
+            aName += ".txt";
+
+            uno::Reference<io::XInputStream> xStrm;
+            // Each entry is 1 MB, well above ZipFile's 10000-byte threading
+            // threshold, so it is served by an XBufferedThreadedStream.
+            mxNA->getByName(aName) >>= xStrm;
+
+            CPPUNIT_ASSERT(xStrm.is());
+            sal_Int32 nSize = xStrm->available();
+
+            uno::Sequence<sal_Int8> aBytes;
+            // Read chunks of increasing size (1 KB more for every entry) to
+            // exercise reads that straddle internal buffer boundaries.
+            nReadSize += 1024;
+
+            while (nSize > 0)
             {
-                const std::vector<char>& rBuf = *itBuf;
-                CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
-                for (char check : rBuf)
-                    CPPUNIT_ASSERT_EQUAL(c, check);
+                sal_Int32 nBytesRead = xStrm->readBytes(aBytes, nReadSize);
+                // A stream that stops delivering data before available() bytes
+                // must fail the test rather than spin here forever.
+                CPPUNIT_ASSERT(nBytesRead > 0);
+                const sal_Int8* p = aBytes.getConstArray();
+                const sal_Int8* pEnd = p + nBytesRead;
+                std::copy(p, pEnd, std::back_inserter(*itBuf));
+                nSize -= nBytesRead;
              }
          }
+
+        verifyStreams( aTestBuffers );
+    }
 
     CPPUNIT_TEST_SUITE_REGISTRATION(PackageTest);
diff --git a/package/source/zipapi/XBufferedThreadedStream.cxx b/package/source/zipapi/XBufferedThreadedStream.cxx
new file mode 100644
index 000000000000..59a89f9c64e1
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.cxx
@@ -0,0 +1,200 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include <XBufferedThreadedStream.hxx>
+#include <com/sun/star/packages/zip/ZipIOException.hpp>
+
+using namespace css::uno;
+using com::sun::star::packages::zip::ZipIOException;
+
+namespace {
+
+/**
+ * Thread that runs XBufferedThreadedStream::produce(), i.e. fills the stream's
+ * buffer queue from its source stream.
+ *
+ * Any exception raised while producing is saved on the stream so the reader
+ * can rethrow it. Afterwards the stream's terminate flag is always set, so a
+ * reader waiting for data is woken up whether production succeeded or failed.
+ */
+class UnzippingThread: public salhelper::Thread
+{
+    XBufferedThreadedStream &mrStream;
+public:
+    explicit UnzippingThread(XBufferedThreadedStream &rStream): Thread("Unzipping"), mrStream(rStream) {}
+private:
+    virtual void execute() override
+    {
+        try
+        {
+            mrStream.produce();
+        }
+        catch( const RuntimeException &e )
+        {
+            SAL_WARN("package", "RuntimeException from unbuffered Stream " << e.Message );
+            mrStream.saveException( new RuntimeException( e ) );
+        }
+        catch( const ZipIOException &e )
+        {
+            SAL_WARN("package", "ZipIOException from unbuffered Stream " << e.Message );
+            mrStream.saveException( new ZipIOException( e ) );
+        }
+        catch( const Exception &e )
+        {
+            SAL_WARN("package", "Unexpected exception " << e.Message );
+            mrStream.saveException( new Exception( e ) );
+        }
+        catch( ... )
+        {
+            // Nothing may escape a thread entry point (that would call
+            // std::terminate); report e.g. std::bad_alloc to the reader as a
+            // RuntimeException instead.
+            SAL_WARN("package", "Unknown exception while unzipping" );
+            mrStream.saveException( new RuntimeException( "Unknown exception while unzipping" ) );
+        }
+
+        // Always wake the reader, whether production finished or failed.
+        mrStream.setTerminateThread();
+    }
+};
+
+}
+
+XBufferedThreadedStream::XBufferedThreadedStream(
+                    const Reference<XInputStream>& xSrcStream )
+    : mxSrcStream{ xSrcStream }
+    , mnPos{ 0 }
+    , mnStreamSize( xSrcStream->available() ) // sal_Int32 -> size_t, so no braces here
+    , mnOffset{ 0 }
+    , mxUnzippingThread{ new UnzippingThread( *this ) }
+    , mbTerminateThread{ false }
+    , maSavedException{ nullptr }
+{
+    // The total size is fixed up front from the source's available(); all
+    // members are initialised by now, so the producer may start right away.
+    mxUnzippingThread->launch();
+}
+
+XBufferedThreadedStream::~XBufferedThreadedStream()
+{
+    // The unzipping thread works on *this through a plain reference, so it is
+    // told to stop and then reaped before any member goes away.
+    setTerminateThread();
+    mxUnzippingThread.get()->join();
+}
+
+/**
+ * Reads from the source (unbuffered) stream in a separate thread and stores the
+ * buffer blocks in the maPendingBuffers queue for further use.
+ *
+ * Production stops once mnStreamSize bytes have been read, when the source
+ * reports EOF early, or when mbTerminateThread is set. The loop condition only
+ * looks at what this thread has produced and never at the reader's mnPos,
+ * which the reader updates without holding maBufferProtector.
+ */
+void XBufferedThreadedStream::produce()
+{
+    Buffer pProducedBuffer;
+    size_t nTotalBytesRead = 0;
+    std::unique_lock<std::mutex> aGuard( maBufferProtector );
+    while( !mbTerminateThread && nTotalBytesRead < mnStreamSize )
+    {
+        // Recycle a buffer the reader has finished with, if there is one.
+        if( !maUsedBuffers.empty() )
+        {
+            pProducedBuffer = maUsedBuffers.front();
+            maUsedBuffers.pop();
+        }
+
+        // Do the (potentially slow) read without holding the lock.
+        aGuard.unlock();
+        const sal_Int32 nBytesRead = mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
+        aGuard.lock();
+
+        // Source ran dry before mnStreamSize: queue no empty buffers, just
+        // stop. The thread's epilogue sets mbTerminateThread and wakes the reader.
+        if( nBytesRead <= 0 )
+            break;
+
+        nTotalBytesRead += static_cast<size_t>( nBytesRead );
+        maPendingBuffers.push( pProducedBuffer );
+        maBufferConsumeResume.notify_one();
+
+        // Throttle at nBufferHighWater pending buffers, unless everything has
+        // already been produced and the loop is about to end anyway.
+        if( nTotalBytesRead < mnStreamSize )
+            maBufferProduceResume.wait( aGuard, [&]{ return canProduce(); } );
+    }
+}
+
+/**
+ * Fetches next available block from maPendingBuffers for use in Reading thread.
+ *
+ * If the current block (maInUseBuffer) still has unread bytes it is returned
+ * as is. Otherwise the exhausted block is handed back to maUsedBuffers for
+ * reuse by the producer. The call then blocks until a pending block exists or
+ * mbTerminateThread is set.
+ *
+ * @return reference to maInUseBuffer: the next block with mnOffset reset to 0,
+ *         or an empty Buffer when the producer has stopped and nothing is pending.
+ * @throws the exception saved by the unzipping thread (if any) once no pending
+ *         data is left.
+ */
+const Buffer& XBufferedThreadedStream::getNextBlock()
+{
+    const sal_Int32 nBufSize = maInUseBuffer.getLength();
+    if( nBufSize <= 0 || mnOffset >= nBufSize )
+    {
+        std::unique_lock<std::mutex> aGuard( maBufferProtector );
+        // Current block fully consumed: return it to the producer for reuse.
+        if( mnOffset >= nBufSize )
+            maUsedBuffers.push( maInUseBuffer );
+
+        maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
+
+        // Per canConsume(), an empty queue here means mbTerminateThread is set,
+        // so no further block will arrive.
+        if( maPendingBuffers.empty() )
+        {
+            maInUseBuffer = Buffer();
+            // NOTE(review): this throws a copy of static type css::uno::Exception,
+            // so a saved RuntimeException/ZipIOException reaches the caller sliced
+            // to a plain Exception; the saved object is also never deleted.
+            if( maSavedException )
+                throw *maSavedException;
+        }
+        else
+        {
+            maInUseBuffer = maPendingBuffers.front();
+            maPendingBuffers.pop();
+            mnOffset = 0;
+
+            // Wake the producer once the queue has drained to the low-water mark.
+            if( maPendingBuffers.size() <= nBufferLowWater )
+                maBufferProduceResume.notify_one();
+        }
+    }
+
+    return maInUseBuffer;
+}
+
+/**
+ * Flags both threads to stop and wakes whichever of them is waiting.
+ *
+ * The flag is written under maBufferProtector because the condition-variable
+ * predicates (canProduce/canConsume) read it under that mutex. Writing it
+ * unlocked is a data race. It can also lose the wake-up: a waiter checks the
+ * predicate, the notify fires, and only then does the waiter block, forever.
+ * Must not be called with maBufferProtector already held.
+ */
+void XBufferedThreadedStream::setTerminateThread()
+{
+    {
+        std::lock_guard<std::mutex> aGuard( maBufferProtector );
+        mbTerminateThread = true;
+    }
+    maBufferProduceResume.notify_all();
+    maBufferConsumeResume.notify_all();
+}
+
+/**
+ * Copies up to nBytesToRead bytes out of the buffers filled by the unzipping
+ * thread, blocking until the needed data has been produced.
+ *
+ * @param rData         resized to exactly the number of bytes returned
+ *                      (empty at EOF or for a non-positive request).
+ * @param nBytesToRead  requested count; capped at the remaining stream size.
+ * @return number of bytes copied into rData.
+ * @throws whatever getNextBlock() rethrows from the unzipping thread.
+ */
+sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
+{
+    // Callers may rely on rData's length, so never leave stale contents from
+    // a previous call behind, and never realloc() to a negative size.
+    if( nBytesToRead <= 0 || !hasBytes() )
+    {
+        rData.realloc( 0 );
+        return 0;
+    }
+
+    const sal_Int32 nAvailableSize = std::min<sal_Int32>( nBytesToRead, remainingSize() );
+    rData.realloc( nAvailableSize );
+    sal_Int8* pDest = rData.getArray();
+    sal_Int32 nCopied = 0;
+
+    while( nCopied < nAvailableSize )
+    {
+        const Buffer &rBuffer = getNextBlock();
+        if( rBuffer.getLength() <= 0 )
+        {
+            // Producer stopped early: hand back what we have.
+            rData.realloc( nCopied );
+            return nCopied;
+        }
+        const sal_Int32 nChunk = std::min<sal_Int32>( nAvailableSize - nCopied, rBuffer.getLength() - mnOffset );
+
+        memcpy( pDest + nCopied, rBuffer.getConstArray() + mnOffset, nChunk );
+
+        nCopied += nChunk;
+        mnOffset += nChunk;
+        mnPos += nChunk;
+    }
+
+    return nAvailableSize;
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
+{
+    // Simply delegates to readBytes(); note that this may block until the
+    // unzipping thread has delivered the requested data.
+    const sal_Int32 nRead = readBytes( aData, nMaxBytesToRead );
+    return nRead;
+}
+
+/**
+ * Skips nBytesToSkip bytes by reading them into a scratch sequence and
+ * discarding it. Zero or negative counts are ignored.
+ */
+void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
+{
+    // A negative count would otherwise construct a Sequence with negative size.
+    if( nBytesToSkip <= 0 )
+        return;
+
+    // readBytes() resizes the scratch sequence itself (capped at the remaining
+    // stream size), so there is no need to pre-allocate nBytesToSkip bytes.
+    Sequence < sal_Int8 > aDiscarded;
+    readBytes( aDiscarded, nBytesToSkip );
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::available()
+{
+    // Everything not yet handed out by readBytes() counts as available; the
+    // total was fixed at construction from the source's available().
+    return hasBytes() ? static_cast<sal_Int32>( remainingSize() ) : 0;
+}
+
+void SAL_CALL XBufferedThreadedStream::closeInput()
+{
+    // Stop and reap the producer first, so that it can no longer be reading
+    // from mxSrcStream when the source is closed below.
+    setTerminateThread();
+    mxUnzippingThread.get()->join();
+
+    mxSrcStream->closeInput();
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/XBufferedThreadedStream.hxx b/package/source/zipapi/XBufferedThreadedStream.hxx
new file mode 100644
index 000000000000..b047b25fdf85
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.hxx
@@ -0,0 +1,79 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
+#define INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
+
+#include <salhelper/thread.hxx>
+#include <XUnbufferedStream.hxx>
+#include <queue>
+#include <vector>
+#include <mutex>
+#include <condition_variable>
+
+typedef css::uno::Sequence< sal_Int8 > Buffer;
+
+/**
+ * XInputStream that moves decompression off the reading thread: a helper
+ * thread reads nBufferSize-sized blocks from the wrapped source stream into a
+ * queue, and readBytes() on the calling thread copies data out of that queue.
+ */
+class XBufferedThreadedStream : public cppu::WeakImplHelper< css::io::XInputStream >
+{
+private:
+    const css::uno::Reference<XInputStream> mxSrcStream;   ///< source stream read by the unzipping thread
+    size_t mnPos;                                           ///< position in stream (bytes handed out by readBytes)
+    size_t mnStreamSize;                                    ///< size reported by the source's available() at construction
+
+    Buffer maInUseBuffer;                                   ///< Buffer block currently being read from
+    int mnOffset;                                           ///< position in maInUseBuffer
+    std::queue < Buffer > maPendingBuffers;                 ///< filled buffers waiting to be read
+    std::queue < Buffer > maUsedBuffers;                    ///< consumed buffers returned for reuse by the producer
+
+    rtl::Reference< salhelper::Thread > mxUnzippingThread;
+    std::mutex maBufferProtector;                           ///< mutex protecting Buffer queues.
+    std::condition_variable maBufferConsumeResume;          ///< wakes the reader waiting for a pending buffer
+    std::condition_variable maBufferProduceResume;          ///< wakes the producer waiting for room in the queue
+    bool mbTerminateThread;                                 ///< set when production ends (normally or by exception) or the stream is closed/destroyed
+
+    /// Exception caught during unzipping, saved to be thrown during reading.
+    /// NOTE(review): allocated with new and never deleted; since it is typed as
+    /// the base css::uno::Exception, rethrowing it by value loses the derived type.
+    css::uno::Exception *maSavedException;
+
+    static const size_t nBufferLowWater = 2;                ///< producer is woken once pending buffers drop to this count
+    static const size_t nBufferHighWater = 4;               ///< producer waits while this many buffers are pending
+    static const size_t nBufferSize = 32 * 1024;            ///< bytes requested from the source per block
+
+    const Buffer& getNextBlock();
+    size_t remainingSize() const { return mnStreamSize - mnPos; }
+    bool hasBytes() const { return mnPos < mnStreamSize; }
+
+    /// Wait predicate for the producer; evaluated with maBufferProtector held.
+    bool canProduce() const
+    {
+        return( mbTerminateThread || maPendingBuffers.size() < nBufferHighWater );
+    }
+
+    /// Wait predicate for the reader; evaluated with maBufferProtector held.
+    bool canConsume() const
+    {
+        return( mbTerminateThread || !maPendingBuffers.empty() );
+    }
+
+public:
+    /// Wraps xSrcStream and immediately starts the unzipping thread.
+    explicit XBufferedThreadedStream(
+                  const css::uno::Reference<XInputStream>& xSrcStream );
+
+    virtual ~XBufferedThreadedStream() override;
+
+    /// Producer loop; run by the unzipping thread, not meant for other callers.
+    void produce();
+    void setTerminateThread();
+    void saveException( css::uno::Exception *e ) { maSavedException = e; }
+
+    // XInputStream
+    virtual sal_Int32 SAL_CALL readBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override;
+    virtual sal_Int32 SAL_CALL readSomeBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override;
+    virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override;
+    virtual sal_Int32 SAL_CALL available(  ) override;
+    virtual void SAL_CALL closeInput(  ) override;
+};
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ZipFile.cxx b/package/source/zipapi/ZipFile.cxx
index ba41d5f10d1d..ddea09b3d824 100644
--- a/package/source/zipapi/ZipFile.cxx
+++ b/package/source/zipapi/ZipFile.cxx
@@ -44,6 +44,7 @@
 #include <ZipFile.hxx>
 #include <ZipEnumeration.hxx>
 #include <XUnbufferedStream.hxx>
+#include <XBufferedThreadedStream.hxx>
 #include <PackageConstants.hxx>
 #include <EncryptedDataHeader.hxx>
 #include <EncryptionData.hxx>
@@ -625,7 +626,14 @@ uno::Reference< XInputStream > ZipFile::createStreamForZipEntry(
     if (!mbUseBufferedStream)
         return xSrcStream;
 
-    uno::Reference<io::XInputStream> xBufStream(new XBufferedStream(xSrcStream));
+    uno::Reference<io::XInputStream> xBufStream;
+    static const sal_Int32 nThreadingThreshold = 10000;
+
+    if( xSrcStream->available() > nThreadingThreshold )
+        xBufStream = new XBufferedThreadedStream(xSrcStream);
+    else
+        xBufStream = new XBufferedStream(xSrcStream);
+
     return xBufStream;
 }
 


More information about the Libreoffice-commits mailing list