[PATCH 5/6] Compression in separate thread

Eugene Velesevich evel at ispras.ru
Fri May 31 14:11:21 PDT 2013


This patch implements compression in a separate thread via
ThreadedFile, a subclass of File that supports the LZ4 and Snappy trace
formats; the compressor is selected with the APITRACE_COMPRESSOR
environment variable.

ThreadedFile can be used only for writing.

Platform-specific semaphores are used to implement synchronization.
---
 CMakeLists.txt                 |    1 +
 cli/CMakeLists.txt             |    1 +
 cli/cli_repack.cpp             |    3 +-
 common/trace_file.hpp          |    3 +
 common/trace_threaded_file.cpp |  239 ++++++++++++++++++++++++++++++++++++++++
 common/trace_threaded_file.hpp |  177 +++++++++++++++++++++++++++++
 common/trace_writer.cpp        |    2 +-
 common/trace_writer.hpp        |    7 +-
 common/trace_writer_local.cpp  |    2 +-
 9 files changed, 431 insertions(+), 4 deletions(-)
 create mode 100644 common/trace_threaded_file.cpp
 create mode 100644 common/trace_threaded_file.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 15a3ee5..fa32f7d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -331,6 +331,7 @@ add_library (common STATIC
     common/trace_option.cpp
     common/${os}
     common/trace_backtrace.cpp
+    common/trace_threaded_file.cpp
     common/trace_file_common.cpp
 )
 
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index f68b3cf..f177aa7 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -29,6 +29,7 @@ add_executable (apitrace
 target_link_libraries (apitrace
     common
     ${COMPRESSION_LIBRARIES}
+    ${CMAKE_THREAD_LIBS_INIT}
     ${GETOPT_LIBRARIES}
 )
 
diff --git a/cli/cli_repack.cpp b/cli/cli_repack.cpp
index 6275c1d..1f61d04 100644
--- a/cli/cli_repack.cpp
+++ b/cli/cli_repack.cpp
@@ -32,6 +32,7 @@
 #include "cli.hpp"
 
 #include "trace_file.hpp"
+#include "trace_threaded_file.hpp"
 
 
 static const char *synopsis = "Repack a trace file with Snappy compression.";
@@ -65,7 +66,7 @@ repack(const char *inFileName, const char *outFileName)
         return 1;
     }
 
-    trace::File *outFile = trace::File::createCommonFile(trace::File::SNAPPY);
+    trace::File *outFile = new ThreadedFile(new trace::SnappyLibrary());
     if (!outFile) {
         delete inFile;
         return 1;
diff --git a/common/trace_file.hpp b/common/trace_file.hpp
index 78c0ad4..1a98943 100644
--- a/common/trace_file.hpp
+++ b/common/trace_file.hpp
@@ -38,6 +38,8 @@
 #include <assert.h>
 
 
+class ThreadedFile;
+
 namespace trace {
 
 class CompressionLibrary {
@@ -248,6 +250,7 @@ public:
 public:
     static File *createZLib(void);
     static File *createForRead(const char *filename);
+    static ThreadedFile *createThreadedFile();
     static File *createCommonFile(File::Compressor compressor);
 public:
     File(const std::string &filename = std::string(),
diff --git a/common/trace_threaded_file.cpp b/common/trace_threaded_file.cpp
new file mode 100644
index 0000000..9553c75
--- /dev/null
+++ b/common/trace_threaded_file.cpp
@@ -0,0 +1,311 @@
+/**************************************************************************
+ *
+ * Copyright 2012 Samsung
+ * Contributed by Vladimir Platonov
+ * All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ **************************************************************************/
+
+#include <algorithm>
+#include <vector>
+
+#include "trace_threaded_file.hpp"
+
+using namespace trace;
+
+
+/**
+ * Allocate NUM_BUFFERS staging buffers of chunkSize bytes each and
+ * initialize their access locks.  The library is borrowed, not owned.
+ */
+CompressionCache::CompressionCache(size_t chunkSize, trace::CompressionLibrary *library)
+    : m_chunkSize(chunkSize),
+      m_writeID(0),
+      m_readID(0),
+      m_library(library)
+{
+    for (int i = 0; i < NUM_BUFFERS; ++i) {
+        m_caches[i] = new char[m_chunkSize];
+        m_cachePtr[i] = m_caches[i];
+        // Never leave a size indeterminate: the compressor thread reads
+        // m_sizes[] and treats 0 as the end-of-stream marker.
+        m_sizes[i] = 0;
+        MutexInit(m_readAccess[i]);
+        MutexInit(m_writeAccess[i]);
+    }
+}
+
+/** Free the staging buffers. */
+CompressionCache::~CompressionCache() {
+    for (int i = 0; i < NUM_BUFFERS; ++i) {
+        delete [] m_caches[i];
+    }
+}
+
+/**
+ * Append `length` bytes to the current write buffer.  Whenever a buffer
+ * becomes full it is handed to the compressor via nextWriteBuffer(), which
+ * may wait until the compressor gives the next buffer back.
+ */
+void CompressionCache::write(const char *buffer, size_t length) {
+    while (length > 0) {
+        size_t chunk = std::min(freeCacheSize(m_writeID), length);
+        memcpy(m_cachePtr[m_writeID], buffer, chunk);
+        m_cachePtr[m_writeID] += chunk;
+        buffer += chunk;
+        length -= chunk;
+        if (freeCacheSize(m_writeID) == 0) {
+            nextWriteBuffer();
+        }
+    }
+}
+
+/**
+ * Copy `length` bytes out of the read-side buffer(s).
+ *
+ * NOTE(review): ThreadedFile never calls this, and it measures available
+ * bytes with freeCacheSize() rather than m_sizes[]; verify before using it.
+ */
+size_t CompressionCache::read(char *buffer, size_t length) {
+    if (freeCacheSize(m_readID) >= length) {
+        memcpy(buffer, m_cachePtr[m_readID], length);
+        m_cachePtr[m_readID] += length;
+    }
+    else {
+        size_t sizeToRead = length;
+        size_t offset = 0;
+        while (sizeToRead) {
+            size_t chunkSize = std::min(freeCacheSize(m_readID), sizeToRead);
+            offset = length - sizeToRead;
+            memcpy((char*)buffer + offset, m_cachePtr[m_readID], chunkSize);
+            m_cachePtr[m_readID] += chunkSize;
+            sizeToRead -= chunkSize;
+            if (sizeToRead > 0) {
+                nextReadBuffer();
+            }
+        }
+    }
+    return length;
+}
+
+/**
+ * Compress the buffer currently owned by the compressor thread into
+ * `buffer` (sized by the caller for maxCompressedLength() of a full chunk)
+ * and give the buffer back to the writer.
+ *
+ * @param buffer       receives the compressed bytes
+ * @param inputLength  set to the number of uncompressed bytes consumed
+ * @return the compressed length, or 0 if the buffer is empty (the
+ *         end-of-stream marker); an empty buffer is not given back and
+ *         inputLength is set to 0.
+ */
+size_t CompressionCache::readAndCompressBuffer(char *buffer, size_t &inputLength) {
+    const size_t length = m_sizes[m_readID];
+    if (length == 0) {
+        inputLength = 0;
+        return 0;
+    }
+    size_t compressedLength = 0;
+    m_library->compress(m_caches[m_readID], length, buffer, &compressedLength);
+    inputLength = length;
+    nextReadBuffer();
+    return compressedLength;
+}
+
+/**
+ * Compressor thread body: compress each published buffer and append it to
+ * the file as <length><compressed data>, until an empty buffer arrives (a
+ * zero length is still written for it).
+ *
+ * m_stream is flushed after every chunk from this thread, so the tracing
+ * thread never has to touch m_stream while this thread is running.
+ */
+THREAD_ROUTINE void * ThreadedFile::compressorThread(void * param) {
+    ThreadedFile *file = (ThreadedFile *) param;
+    // RAII buffer: released on every exit path.
+    std::vector<char> compressedData(file->m_library->maxCompressedLength(CACHE_SIZE));
+    file->m_cache->acquireReadControl();
+    size_t compressedLength = 0;
+    do {
+        size_t inputLength = 0;
+        compressedLength = file->m_cache->readAndCompressBuffer(&compressedData[0], inputLength);
+        file->writeLength(compressedLength);
+        file->m_stream.write(&compressedData[0], compressedLength);
+        file->m_stream.flush();
+    }
+    while (compressedLength);
+    return NULL;
+}
+
+/**
+ * Construct an unopened file that compresses with `lib`.  `filename` and
+ * `mode` are currently ignored; rawOpen() does the actual opening.
+ * NOTE(review): `lib` is never deleted by ThreadedFile.
+ */
+ThreadedFile::ThreadedFile(trace::CompressionLibrary * lib, const std::string &filename,
+        File::Mode mode)
+    : File(),
+      m_cache(NULL),
+      m_library(lib)
+{
+}
+
+/**
+ * Open `filename` for writing, emit the two-byte library signature and
+ * start the compressor thread.  Reading is unsupported, so any other mode
+ * fails without opening the file.
+ */
+bool ThreadedFile::rawOpen(const std::string &filename, enum Mode mode) {
+    if (mode != File::Write) {
+        return false;
+    }
+
+    m_stream.open(filename.c_str(),
+                  std::fstream::binary | std::fstream::out | std::fstream::trunc);
+    if (!m_stream.is_open()) {
+        return false;
+    }
+
+    m_cache = new CompressionCache(CACHE_SIZE, m_library);
+
+    // Signature, most significant byte first.
+    unsigned int sig = m_library->getSignature();
+    m_stream << ((unsigned char)((sig >> 8) & 0xFF));
+    m_stream << ((unsigned char)(sig & 0xFF));
+
+    // Take the first write buffer (and block all reads) before the
+    // compressor thread starts, so it waits for published data.
+    m_cache->acquireWriteControl();
+    m_thread = os::ThreadCreate(compressorThread, this);
+    return true;
+}
+
+/* Reading is not supported: every read-side entry point logs and aborts.
+ * The trailing return statements only avoid falling off the end of a
+ * non-void function in case os::abort() is not declared noreturn. */
+
+size_t ThreadedFile::rawRead(void *buffer, size_t length) {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    return 0;
+}
+
+int ThreadedFile::rawGetc() {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    return -1;
+}
+
+/**
+ * Publish any buffered data, release the (now empty) current buffer so the
+ * compressor thread sees the end-of-stream marker, join that thread and
+ * close the stream.
+ */
+void ThreadedFile::rawClose() {
+    if (!m_stream.is_open()) {
+        return;
+    }
+    m_cache->flushWrite();
+    m_cache->releaseLocks();
+    os::ThreadWait(m_thread);
+    m_stream.close();
+    delete m_cache;
+    m_cache = NULL;
+}
+
+/**
+ * Close the file if it is still open, so the compressor thread is joined
+ * and buffered data reaches the stream.  NOTE(review): relies on
+ * File::close() only calling rawClose() for an open file.
+ */
+ThreadedFile::~ThreadedFile() {
+    close();
+}
+
+/**
+ * Publish the data buffered so far to the compressor thread.  m_stream is
+ * not flushed here: the compressor thread writes and flushes it concurrently,
+ * and std::fstream must not be used from two threads at once.
+ */
+void ThreadedFile::rawFlush() {
+    assert(m_mode == File::Write);
+    m_cache->flushWrite();
+}
+
+bool ThreadedFile::rawSkip(size_t length) {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    return false;
+}
+
+bool ThreadedFile::supportsOffsets() const {
+    return false;
+}
+
+File::Offset ThreadedFile::currentOffset() {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    return m_currentOffset;
+}
+
+void ThreadedFile::setCurrentOffset(const File::Offset &offset) {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+}
+
+/**
+ * Write a chunk-length prefix of m_lengthSize bytes, encoded by the
+ * compression library.  Only called from compressorThread().
+ */
+void ThreadedFile::writeLength(size_t length) {
+    // Heap buffer instead of `unsigned char buf[m_library->m_lengthSize]`,
+    // which is a variable-length array (a GCC extension that MSVC rejects)
+    // unless m_lengthSize is a compile-time constant.
+    std::vector<unsigned char> buf(m_library->m_lengthSize);
+    m_library->setLength(&buf[0], length);
+    m_stream.write((const char *)&buf[0], buf.size());
+}
+
+int ThreadedFile::rawPercentRead() {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    return 0;
+}
+
+/**
+ * Create an unopened ThreadedFile whose compressor is chosen by the
+ * APITRACE_COMPRESSOR environment variable: "LZ4HC" -> LZ4Library(true),
+ * "LZ4" -> LZ4Library(false), anything else (or unset) -> SnappyLibrary.
+ */
+ThreadedFile* File::createThreadedFile() {
+    const char *compressor = std::getenv("APITRACE_COMPRESSOR");
+    trace::CompressionLibrary *library;
+    if (compressor != NULL && strcmp(compressor, "LZ4HC") == 0) {
+        library = new LZ4Library(true);
+    }
+    else if (compressor != NULL && strcmp(compressor, "LZ4") == 0) {
+        library = new LZ4Library(false);
+    }
+    else {
+        library = new SnappyLibrary();
+    }
+    return new ThreadedFile(library);
+}
diff --git a/common/trace_threaded_file.hpp b/common/trace_threaded_file.hpp
new file mode 100644
index 0000000..84bbe0c
--- /dev/null
+++ b/common/trace_threaded_file.hpp
@@ -0,0 +1,229 @@
+/**************************************************************************
+ *
+ * Copyright 2012 Samsung
+ * Contributed by Vladimir Platonov
+ * All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ **************************************************************************/
+
+/*
+ * ThreadedFile stores a trace as a sequence of compressed chunks (similar to
+ * the original Snappy file format), but compression runs on a separate
+ * thread and the compression library is pluggable.
+ *
+ * The compression cache holds NUM_BUFFERS (two) buffers of CACHE_SIZE bytes:
+ * while the compressor thread compresses a full buffer, the tracing thread
+ * fills the other one.  Each buffer has a read-access and a write-access
+ * lock that the two threads pass back and forth.
+ *
+ * NOTE(review): locks are released by a different thread than the one that
+ * acquired them, which is only valid if os::Mutex behaves like a binary
+ * semaphore (a regular mutex must be unlocked by its owner) -- confirm in
+ * os_thread.hpp.
+ *
+ * For now ThreadedFile is write-only (used for tracing); calling any read
+ * function logs an error and aborts the process.
+ *
+ * write() calls ThreadedFile::rawWrite() directly (a non-virtual call) to
+ * keep the per-write cost low on the tracing path.
+ *
+ * The compressor is selected via the APITRACE_COMPRESSOR environment
+ * variable (see File::createThreadedFile()).
+ */
+
+#ifndef TRACE_THREADED_FILE_HPP_
+#define TRACE_THREADED_FILE_HPP_
+
+#include <assert.h>
+#include <string.h>
+#include <cstdlib>
+#include "os.hpp"
+#include "os_thread.hpp"
+#include "trace_file.hpp"
+
+// NOTE(review): a using-directive in a header leaks trace:: into every
+// includer; kept because this header (and possibly its includers) relies on
+// the unqualified names (File, LZ4Library).
+using namespace trace;
+
+/**
+ * Double-buffered staging area shared by the tracing (writer) thread and the
+ * compressor (reader) thread.
+ */
+class CompressionCache {
+
+private:
+    static const int NUM_BUFFERS = 2;
+    size_t m_chunkSize;                     // capacity of each buffer
+
+    char *m_caches[NUM_BUFFERS];            // buffer storage (owned)
+    char *m_cachePtr[NUM_BUFFERS];          // current fill position per buffer
+    os::Mutex m_writeAccess[NUM_BUFFERS];   // held by the writer while it fills a buffer
+    os::Mutex m_readAccess[NUM_BUFFERS];    // released by the writer once a buffer is ready
+
+    size_t m_sizes[NUM_BUFFERS];            // bytes handed over; 0 means end of stream
+
+    int m_writeID;                          // buffer being filled (tracing thread)
+    int m_readID;                           // buffer being compressed (compressor thread)
+
+    trace::CompressionLibrary * m_library;  // borrowed, not owned
+
+    // Owns raw buffers, so copying would double-free them (not implemented).
+    CompressionCache(const CompressionCache &);
+    CompressionCache &operator=(const CompressionCache &);
+
+    inline size_t usedCacheSize(int id) const {
+        assert(m_cachePtr[id] >= m_caches[id]);
+        return m_cachePtr[id] - m_caches[id];
+    }
+
+    inline size_t freeCacheSize(int id) const {
+        assert(m_chunkSize >= usedCacheSize(id));
+        return m_chunkSize - usedCacheSize(id);
+    }
+
+    // Writer side: publish the current buffer (recording its size) to the
+    // compressor, then wait for the next buffer to be released back.
+    inline void nextWriteBuffer() {
+        m_sizes[m_writeID] = m_cachePtr[m_writeID] - m_caches[m_writeID];
+        m_cachePtr[m_writeID] = m_caches[m_writeID];
+        MutexUnlock(m_readAccess[m_writeID]);
+        m_writeID = (m_writeID + 1) % NUM_BUFFERS;
+        MutexLock(m_writeAccess[m_writeID]);
+        m_sizes[m_writeID] = 0;
+    }
+
+    // Compressor side: give the consumed buffer back to the writer, then wait
+    // for the next buffer to be published.
+    inline void nextReadBuffer() {
+        m_cachePtr[m_readID] = m_caches[m_readID];
+        MutexUnlock(m_writeAccess[m_readID]);
+        m_readID = (m_readID + 1) % NUM_BUFFERS;
+        MutexLock(m_readAccess[m_readID]);
+    }
+
+public:
+
+    CompressionCache(size_t chunkSize, trace::CompressionLibrary *library);
+    ~CompressionCache();
+    void write(const char *buffer, size_t length);
+    size_t read(char *buffer, size_t length);
+    size_t readAndCompressBuffer(char *buffer, size_t &inputLength);
+
+    /**
+     * Hand the partially filled write buffer to the compressor.
+     *
+     * An empty buffer is not handed over: the compressor treats a zero size as
+     * end of stream and exits, after which the writer would block forever on
+     * its next buffer switch and later data would never be compressed.  The
+     * size is still set to 0 so that a following releaseLocks() delivers the
+     * end-of-stream marker.
+     */
+    void flushWrite() {
+        if (usedCacheSize(m_writeID) == 0) {
+            m_sizes[m_writeID] = 0;
+            return;
+        }
+        nextWriteBuffer();
+    }
+
+    /**
+     * Writer side, before the compressor thread starts: take the first write
+     * buffer and mark every buffer as not yet ready for compression.
+     */
+    void acquireWriteControl() {
+        MutexLock(m_writeAccess[m_writeID]);
+        for (int i = 0; i < NUM_BUFFERS; ++i) {
+            MutexLock(m_readAccess[i]);
+        }
+    }
+
+    /**
+     * Writer side: publish the current write buffer without switching to the
+     * next one.  Used on close after flushWrite(), when the current buffer
+     * has size 0 and therefore acts as the end-of-stream marker.
+     */
+    void releaseLocks() {
+        MutexUnlock(m_readAccess[m_writeID]);
+    }
+
+    /** Compressor side: wait until the first buffer is published. */
+    void acquireReadControl() {
+        MutexLock(m_readAccess[m_readID]);
+    }
+};
+
+/**
+ * Write-only trace::File that compresses on a background thread.
+ */
+class ThreadedFile : public File {
+public:
+    // The compression library is never deleted by ThreadedFile; filename and
+    // mode are currently ignored (rawOpen() does the actual opening).
+    ThreadedFile(trace::CompressionLibrary * lib = new LZ4Library(false),
+                    const std::string &filename = std::string(),
+                    File::Mode mode = File::Read);
+    virtual ~ThreadedFile();
+
+    virtual bool supportsOffsets() const;
+    virtual File::Offset currentOffset();
+    virtual void setCurrentOffset(const File::Offset &offset);
+
+    // Fast path: the qualified call bypasses virtual dispatch of rawWrite().
+    bool write(const void *buffer, size_t length)
+    {
+        if (!m_isOpened || m_mode != File::Write) {
+            return false;
+        }
+        return ThreadedFile::rawWrite(buffer, length);
+    }
+
+protected:
+    virtual bool rawOpen(const std::string &filename, File::Mode mode);
+    virtual bool rawWrite(const void *buffer, size_t length)
+    {
+        m_cache->write((const char *)buffer, length);
+        return true;
+    }
+
+    virtual size_t rawRead(void *buffer, size_t length);
+    virtual int rawGetc();
+    virtual void rawClose();
+    virtual void rawFlush();
+    virtual bool rawSkip(size_t length);
+    virtual int rawPercentRead();
+
+private:
+    std::fstream m_stream;
+    std::streampos m_endPos;
+    CompressionCache *m_cache;
+    File::Offset m_currentOffset;
+    static const size_t CACHE_SIZE = 1 * 1024 * 1024;
+    trace::CompressionLibrary *m_library;
+    os::Thread m_thread;
+
+    void writeLength(size_t length);
+
+    THREAD_ROUTINE static void * compressorThread(void * param);
+};
+
+
+#endif /* TRACE_THREADED_FILE_HPP_ */
diff --git a/common/trace_writer.cpp b/common/trace_writer.cpp
index dbf00b5..5efe188 100644
--- a/common/trace_writer.cpp
+++ b/common/trace_writer.cpp
@@ -43,7 +43,7 @@ namespace trace {
 Writer::Writer() :
     call_no(0)
 {
-    m_file = File::createCommonFile(File::LZ4);
+    m_file = File::createThreadedFile(); // compressor from APITRACE_COMPRESSOR; Snappy when unset
     close();
 }
 
diff --git a/common/trace_writer.hpp b/common/trace_writer.hpp
index a75d5ac..8695c47 100644
--- a/common/trace_writer.hpp
+++ b/common/trace_writer.hpp
@@ -37,13 +37,18 @@
 
 #include "trace_model.hpp"
 #include "trace_backtrace.hpp"
+#include "trace_threaded_file.hpp"
 
 namespace trace {
     class File;
 
     class Writer {
     protected:
-        File *m_file;
+        /*
+         * m_file type changed to ThreadedFile since tests shows that getting rid
+         * of virtual calls and heavy inlining helps performance a lot.
+         */
+        ThreadedFile *m_file;
         unsigned call_no;
 
         std::vector<bool> functions;
diff --git a/common/trace_writer_local.cpp b/common/trace_writer_local.cpp
index 5f86f7d..f5644a3 100644
--- a/common/trace_writer_local.cpp
+++ b/common/trace_writer_local.cpp
@@ -146,7 +146,7 @@ void LocalWriter::checkProcessId(void) {
         // create a new file.  We can't call any method of the current
         // file, as it may cause it to flush and corrupt the parent's
         // trace, so we effectively leak the old file object.
-        m_file = File::createCommonFile(File::SNAPPY);
+        m_file = File::createThreadedFile();
         // Don't want to open the same file again
         os::unsetEnvironment("TRACE_FILE");
         open();
-- 
1.7.9.5



More information about the apitrace mailing list