[PATCH 5/6] Compression in separate thread
Eugene Velesevich
evel at ispras.ru
Fri May 31 14:11:21 PDT 2013
This patch implements compression in a separate thread via
ThreadedFile, a subclass of File that supports the LZ4 (optionally LZ4HC)
and Snappy trace formats; the compressor is selected with the
APITRACE_COMPRESSOR environment variable.
ThreadedFile can be used only for writing.
Platform-specific semaphores are used to implement synchronization.
---
CMakeLists.txt | 1 +
cli/CMakeLists.txt | 1 +
cli/cli_repack.cpp | 3 +-
common/trace_file.hpp | 3 +
common/trace_threaded_file.cpp | 239 ++++++++++++++++++++++++++++++++++++++++
common/trace_threaded_file.hpp | 177 +++++++++++++++++++++++++++++
common/trace_writer.cpp | 2 +-
common/trace_writer.hpp | 7 +-
common/trace_writer_local.cpp | 2 +-
9 files changed, 431 insertions(+), 4 deletions(-)
create mode 100644 common/trace_threaded_file.cpp
create mode 100644 common/trace_threaded_file.hpp
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 15a3ee5..fa32f7d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -331,6 +331,7 @@ add_library (common STATIC
common/trace_option.cpp
common/${os}
common/trace_backtrace.cpp
+ common/trace_threaded_file.cpp
common/trace_file_common.cpp
)
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index f68b3cf..f177aa7 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -29,6 +29,7 @@ add_executable (apitrace
target_link_libraries (apitrace
common
${COMPRESSION_LIBRARIES}
+ ${CMAKE_THREAD_LIBS_INIT}
${GETOPT_LIBRARIES}
)
diff --git a/cli/cli_repack.cpp b/cli/cli_repack.cpp
index 6275c1d..1f61d04 100644
--- a/cli/cli_repack.cpp
+++ b/cli/cli_repack.cpp
@@ -32,6 +32,7 @@
#include "cli.hpp"
#include "trace_file.hpp"
+#include "trace_threaded_file.hpp"
static const char *synopsis = "Repack a trace file with Snappy compression.";
@@ -65,7 +66,7 @@ repack(const char *inFileName, const char *outFileName)
return 1;
}
- trace::File *outFile = trace::File::createCommonFile(trace::File::SNAPPY);
+ trace::File *outFile = new ThreadedFile(new trace::SnappyLibrary());
if (!outFile) {
delete inFile;
return 1;
diff --git a/common/trace_file.hpp b/common/trace_file.hpp
index 78c0ad4..1a98943 100644
--- a/common/trace_file.hpp
+++ b/common/trace_file.hpp
@@ -38,6 +38,8 @@
#include <assert.h>
+class ThreadedFile;
+
namespace trace {
class CompressionLibrary {
@@ -248,6 +250,7 @@ public:
public:
static File *createZLib(void);
static File *createForRead(const char *filename);
+ static ThreadedFile *createThreadedFile();
static File *createCommonFile(File::Compressor compressor);
public:
File(const std::string &filename = std::string(),
diff --git a/common/trace_threaded_file.cpp b/common/trace_threaded_file.cpp
new file mode 100644
index 0000000..9553c75
--- /dev/null
+++ b/common/trace_threaded_file.cpp
@@ -0,0 +1,239 @@
+/**************************************************************************
+ *
+ * Copyright 2012 Samsung
+ * Contributed by Vladimir Platonov
+ * All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ **************************************************************************/
+
+#include "trace_threaded_file.hpp"
+
+using namespace trace;
+
+
+
+/*
+ * Allocate NUM_BUFFERS chunk buffers of `chunkSize` bytes each and
+ * initialise their hand-off locks.  `library` is only stored; the cache does
+ * not release it.
+ */
+CompressionCache::CompressionCache(size_t chunkSize, trace::CompressionLibrary *library)
+    : m_chunkSize(chunkSize),
+      m_writeID(0),
+      m_readID(0),
+      m_library(library)
+{
+    for (int i = 0; i < NUM_BUFFERS; ++i) {
+        m_caches[i] = new char[m_chunkSize];
+        m_cachePtr[i] = m_caches[i];
+        // The compressor decides from m_sizes whether a buffer holds data or
+        // is the stop signal, so give every buffer a defined (empty) size
+        // before any hand-off instead of leaving it indeterminate.
+        m_sizes[i] = 0;
+        MutexInit(m_readAccess[i]);
+        MutexInit(m_writeAccess[i]);
+    }
+}
+
+/* Free the chunk buffers allocated by the constructor. */
+CompressionCache::~CompressionCache() {
+    for (int id = NUM_BUFFERS - 1; id >= 0; --id) {
+        delete [] m_caches[id];
+    }
+}
+
+/*
+ * Append `length` bytes from `buffer` to the write buffers.  Each time the
+ * current buffer becomes exactly full it is handed to the compressor via
+ * nextWriteBuffer(), which may block until the next buffer is released.
+ */
+void CompressionCache::write(const char *buffer, size_t length) {
+    const char *src = buffer;
+    size_t remaining = length;
+    while (remaining != 0) {
+        const size_t room = freeCacheSize(m_writeID);
+        const size_t step = remaining < room ? remaining : room;
+        memcpy(m_cachePtr[m_writeID], src, step);
+        m_cachePtr[m_writeID] += step;
+        src += step;
+        remaining -= step;
+        if (step == room) {
+            // Buffer filled up: publish it and continue in the other one.
+            nextWriteBuffer();
+        }
+    }
+}
+
+/*
+ * Copy `length` bytes out of the cache into `buffer`, starting at the read
+ * position of the current read buffer and moving on to the next buffer via
+ * nextReadBuffer() when the current one runs out.  Always returns `length`.
+ *
+ * NOTE(review): nothing in this patch calls read(); ThreadedFile only uses
+ * readAndCompressBuffer().  Available data is measured with freeCacheSize(),
+ * i.e. against the chunk capacity rather than m_sizes[m_readID], so a
+ * partially filled (flushed) buffer would be read past its valid bytes.
+ * Confirm the intended contract before relying on it.
+ */
+size_t CompressionCache::read(char *buffer, size_t length) {
+    // Fast path: the whole request fits in the current read buffer.
+    if (freeCacheSize(m_readID) >= length) {
+        memcpy(buffer, m_cachePtr[m_readID], length);
+        m_cachePtr[m_readID] += length;
+    }
+    else {
+        // Slow path: copy piecewise, advancing to the next buffer as needed.
+        size_t sizeToRead = length;
+        size_t offset = 0;
+        while (sizeToRead) {
+            size_t chunkSize = std::min(freeCacheSize(m_readID), sizeToRead);
+            offset = length - sizeToRead;
+            memcpy((char*)buffer + offset, m_cachePtr[m_readID], chunkSize);
+            m_cachePtr[m_readID] += chunkSize;
+            sizeToRead -= chunkSize;
+            if (sizeToRead > 0) {
+                nextReadBuffer();
+            }
+        }
+    }
+    return length;
+}
+
+/*
+ * Compress the current read buffer into `buffer` and advance to the next
+ * one.  On success, stores the uncompressed size in `inputLength` and
+ * returns the compressed size.  Returns 0 without touching `inputLength`
+ * when the buffer was published with size 0; the compressor thread stops
+ * on that value.
+ */
+size_t CompressionCache::readAndCompressBuffer(char *buffer, size_t &inputLength) {
+    const size_t pending = m_sizes[m_readID];
+    if (!pending) {
+        return 0;
+    }
+
+    size_t outLength;
+    m_library->compress(m_caches[m_readID], pending, buffer, &outLength);
+    inputLength = pending;
+    nextReadBuffer();
+    return outLength;
+}
+
+/*
+ * Compressor thread entry point.  It repeatedly takes a published cache
+ * buffer, compresses it and appends it to the trace as a chunk: the length
+ * (via writeLength) followed by the compressed bytes.  It stops when the
+ * cache hands over a buffer of size 0, which rawClose() arranges on close.
+ */
+THREAD_ROUTINE void * ThreadedFile::compressorThread(void * param) {
+    ThreadedFile *file = static_cast<ThreadedFile *>(param);
+    char *compressedData = new char[file->m_library->maxCompressedLength(file->CACHE_SIZE)];
+    file->m_cache->acquireReadControl();
+    size_t inputLength = 0;
+    for (;;) {
+        const size_t compressedLength =
+            file->m_cache->readAndCompressBuffer(compressedData, inputLength);
+        if (compressedLength == 0) {
+            // Shutdown signal only: do not emit an empty chunk into the trace.
+            break;
+        }
+        file->writeLength(compressedLength);
+        file->m_stream.write(compressedData, compressedLength);
+        // Flush each finished chunk from the thread that writes it, so the
+        // stream never has to be flushed concurrently from another thread.
+        file->m_stream.flush();
+    }
+
+    delete [] compressedData;
+    return NULL;
+}
+
+/*
+ * Create a threaded (write-only) file that compresses chunks with `lib`.
+ * If `filename` is non-empty the file is opened immediately with `mode`;
+ * otherwise call open() later.
+ */
+ThreadedFile::ThreadedFile(trace::CompressionLibrary * lib, const std::string &filename,
+                           File::Mode mode)
+    : File(), m_cache(NULL), m_library(lib) {
+    // The base is constructed without a filename because a base-class
+    // constructor cannot dispatch to our rawOpen(); open here instead so the
+    // filename/mode arguments are no longer silently ignored.
+    if (!filename.empty()) {
+        open(filename, mode);
+    }
+}
+
+/*
+ * Open `filename` for writing.  Read mode is refused (returns false).  In
+ * write mode this truncates the file, writes the two-byte compressor
+ * signature, creates the cache and starts the compressor thread.
+ */
+bool ThreadedFile::rawOpen(const std::string &filename, enum Mode mode) {
+    if (mode == File::Read) {
+        return false;
+    }
+
+    std::ios_base::openmode openFlags = std::fstream::binary;
+    if (mode == File::Write) {
+        openFlags |= std::fstream::out | std::fstream::trunc;
+    }
+    m_stream.open(filename.c_str(), openFlags);
+    if (!m_stream.is_open()) {
+        return false;
+    }
+
+    if (mode == File::Write) {
+        m_cache = new CompressionCache(CACHE_SIZE, m_library);
+
+        // Signature, high byte first.
+        const unsigned int signature = m_library->getSignature();
+        m_stream << static_cast<unsigned char>((signature >> 8) & 0xFF);
+        m_stream << static_cast<unsigned char>(signature & 0xFF);
+
+        // Writer-side locks must be held before the compressor thread runs.
+        m_cache->acquireWriteControl();
+        m_thread = os::ThreadCreate(compressorThread, this);
+    }
+    return true;
+}
+
+
+/* Read is unsupported on a write-only ThreadedFile: log and abort. */
+size_t ThreadedFile::rawRead(void *buffer, size_t length) {
+    (void)buffer;
+    (void)length;
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    // Not reached; keeps the non-void function well-formed in case
+    // os::abort() is not declared noreturn.
+    return 0;
+}
+
+/* Read is unsupported on a write-only ThreadedFile: log and abort. */
+int ThreadedFile::rawGetc() {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    // Not reached; keeps the non-void function well-formed in case
+    // os::abort() is not declared noreturn.
+    return -1;
+}
+
+/*
+ * Flush buffered data through the compressor thread, stop and join it,
+ * then close the stream and free the cache.  No-op if the stream is not open.
+ */
+void ThreadedFile::rawClose() {
+    if (!m_stream.is_open()) {
+        return;
+    }
+    // Hand any buffered data to the compressor, then release the read lock
+    // of the (now empty) current buffer: a zero-size buffer makes the
+    // compressor thread exit.
+    m_cache->flushWrite();
+    m_cache->releaseLocks();
+    os::ThreadWait(m_thread);
+    m_stream.close();
+    delete m_cache;
+    // Do not leave a dangling pointer for a later flush/close to reach.
+    m_cache = NULL;
+}
+
+/*
+ * Close the file if the caller did not.  Without this, deleting an open
+ * ThreadedFile would leave the compressor thread running on a destroyed
+ * object and drop any data still in the cache.
+ */
+ThreadedFile::~ThreadedFile() {
+    close();
+    // NOTE(review): m_library is created with `new` by callers and nothing in
+    // this patch deletes it; adding `delete m_library` here requires
+    // CompressionLibrary to have a virtual destructor -- confirm first.
+}
+
+/*
+ * Hand the currently buffered data to the compressor thread.
+ */
+void ThreadedFile::rawFlush() {
+    assert(m_mode == File::Write);
+    if (!m_cache) {
+        // Not opened yet (or already closed): nothing is buffered.
+        return;
+    }
+    m_cache->flushWrite();
+    // m_stream is written by the compressor thread; std::fstream must not be
+    // used from two threads at once, so it is not flushed from here.  Any
+    // stream flushing has to be done by the compressor thread itself.
+}
+
+/* Skipping is a read operation, unsupported on a write-only ThreadedFile. */
+bool ThreadedFile::rawSkip(size_t length) {
+    (void)length;
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    // Not reached; keeps the non-void function well-formed in case
+    // os::abort() is not declared noreturn.
+    return false;
+}
+
+/* Offsets are a reader-side feature; this write-only file reports none. */
+bool ThreadedFile::supportsOffsets() const {
+    const bool hasOffsets = false;
+    return hasOffsets;
+}
+
+/* Offsets are unsupported on a write-only ThreadedFile: log and abort. */
+File::Offset ThreadedFile::currentOffset() {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    // Not reached; keeps the non-void function well-formed in case
+    // os::abort() is not declared noreturn.
+    return m_currentOffset;
+}
+
+/* Seeking is unsupported on a write-only ThreadedFile: log and abort. */
+void ThreadedFile::setCurrentOffset(const File::Offset &offset) {
+    (void)offset;
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+}
+
+/*
+ * Write a chunk length prefix, encoded by the compression library into
+ * m_library->m_lengthSize bytes.
+ */
+void ThreadedFile::writeLength(size_t length) {
+    // Variable-length arrays are a GCC extension (MSVC rejects them), so use a
+    // fixed stack buffer and fall back to the heap for unusually wide fields.
+    const size_t lengthSize = m_library->m_lengthSize;
+    unsigned char stackBuf[16];
+    unsigned char *buf = lengthSize <= sizeof stackBuf
+                         ? stackBuf
+                         : new unsigned char[lengthSize];
+    m_library->setLength(buf, length);
+    m_stream.write((const char *)buf, lengthSize);
+    if (buf != stackBuf) {
+        delete [] buf;
+    }
+}
+
+/* Read progress is meaningless for a write-only ThreadedFile: log and abort. */
+int ThreadedFile::rawPercentRead() {
+    os::log("apitrace: threaded file read function access \n");
+    os::abort();
+    // Not reached; keeps the non-void function well-formed in case
+    // os::abort() is not declared noreturn.
+    return 0;
+}
+
+/*
+ * Create a ThreadedFile whose compressor is chosen by APITRACE_COMPRESSOR:
+ * "LZ4HC" -> LZ4Library(true), "LZ4" -> LZ4Library(false), anything else
+ * (including unset) -> SnappyLibrary.
+ */
+ThreadedFile* File::createThreadedFile() {
+    const char *choice = std::getenv("APITRACE_COMPRESSOR");
+    trace::CompressionLibrary *library;
+    if (choice && strcmp(choice, "LZ4HC") == 0) {
+        library = new LZ4Library(true);
+    } else if (choice && strcmp(choice, "LZ4") == 0) {
+        library = new LZ4Library(false);
+    } else {
+        library = new SnappyLibrary();
+    }
+    return new ThreadedFile(library);
+}
diff --git a/common/trace_threaded_file.hpp b/common/trace_threaded_file.hpp
new file mode 100644
index 0000000..84bbe0c
--- /dev/null
+++ b/common/trace_threaded_file.hpp
@@ -0,0 +1,177 @@
+/**************************************************************************
+ *
+ * Copyright 2012 Samsung
+ * Contributed by Vladimir Platonov
+ * All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ **************************************************************************/
+
+/*
+ * A threaded file stores the trace as a sequence of compressed data chunks
+ * (similar to the original Snappy file format), but compression runs in a
+ * separate thread and can use different compression libraries.
+ * The compression cache consists of two CACHE_SIZE buffers: when one of them
+ * is full, the tracing thread switches to the other one while the first is
+ * compressed by the compression thread. Each buffer has a read-access and a
+ * write-access semaphore, which are taken by the corresponding thread.
+ *
+ * For now ThreadedFile is used only for tracing (writing); calling any of
+ * the read functions logs an error and aborts the process.
+ *
+ * ThreadedFile::write() calls rawWrite() non-virtually (qualified call) to
+ * avoid virtual dispatch on every write and gain performance.
+ *
+ * The compressor is selected with the APITRACE_COMPRESSOR environment
+ * variable ("LZ4" or "LZ4HC"; any other value, or unset, selects Snappy).
+ */
+
+#ifndef TRACE_THREADED_FILE_HPP_
+#define TRACE_THREADED_FILE_HPP_
+
+#include <assert.h>
+#include <string.h>
+#include <cstdlib>
+#include "os.hpp"
+#include "os_thread.hpp"
+#include "trace_file.hpp"
+
+using namespace trace;
+
+/*
+ * Double-buffered hand-off between the tracing thread (writer) and the
+ * compressor thread (reader).  The writer fills one buffer while the reader
+ * compresses the other; a buffer published with size 0 tells the reader to
+ * stop.
+ */
+class CompressionCache {
+
+private:
+    static const int NUM_BUFFERS = 2;
+    size_t m_chunkSize;
+
+    char *m_caches[NUM_BUFFERS];     /* buffer storage, m_chunkSize bytes each */
+    char *m_cachePtr[NUM_BUFFERS];   /* current fill position in each buffer */
+
+    /*
+     * Per-buffer hand-off primitives.  Each one is locked by one thread and
+     * unlocked by the other, which is only valid for semaphore-like objects.
+     * NOTE(review): confirm os::Mutex has semaphore semantics on every
+     * platform; a pthread mutex or CRITICAL_SECTION must be unlocked by the
+     * thread that locked it.
+     */
+    os::Mutex m_writeAccess[NUM_BUFFERS];
+    os::Mutex m_readAccess[NUM_BUFFERS];
+
+    /*
+     * Number of valid bytes in each buffer, set by the writer when it
+     * publishes the buffer.  A size of 0 makes readAndCompressBuffer()
+     * return 0, which ends the compressor loop.
+     */
+    size_t m_sizes[NUM_BUFFERS];
+
+    int m_writeID;   /* buffer currently filled by the writer */
+    int m_readID;    /* buffer currently consumed by the compressor */
+
+    trace::CompressionLibrary * m_library;
+
+    /* Not copyable: owns raw buffers and per-buffer synchronization objects. */
+    CompressionCache(const CompressionCache &);
+    CompressionCache &operator=(const CompressionCache &);
+
+    inline size_t usedCacheSize(int id) const {
+        assert(m_cachePtr[id] >= m_caches[id]);
+        return m_cachePtr[id] - m_caches[id];
+    }
+
+    inline size_t freeCacheSize(int id) const {
+        assert(m_chunkSize >= usedCacheSize(id));
+        return m_chunkSize - usedCacheSize(id);
+    }
+
+    /*
+     * Publish the current write buffer to the compressor and switch to the
+     * next one, blocking until the compressor has released it.
+     */
+    inline void nextWriteBuffer() {
+        m_sizes[m_writeID] = usedCacheSize(m_writeID);
+        m_cachePtr[m_writeID] = m_caches[m_writeID];
+        MutexUnlock(m_readAccess[m_writeID]);
+        m_writeID = (m_writeID + 1) % NUM_BUFFERS;
+        MutexLock(m_writeAccess[m_writeID]);
+        m_sizes[m_writeID] = 0;
+    }
+
+    /*
+     * Give the consumed buffer back to the writer and wait until the next
+     * one is published.
+     */
+    inline void nextReadBuffer() {
+        m_cachePtr[m_readID] = m_caches[m_readID];
+        MutexUnlock(m_writeAccess[m_readID]);
+        m_readID = (m_readID + 1) % NUM_BUFFERS;
+        MutexLock(m_readAccess[m_readID]);
+    }
+
+public:
+
+    CompressionCache(size_t chunkSize, trace::CompressionLibrary *library);
+    ~CompressionCache();
+    void write(const char *buffer, size_t length);
+    size_t read(char *buffer, size_t length);
+    size_t readAndCompressBuffer(char *buffer, size_t &inputLength);
+
+    /*
+     * Hand the partially filled write buffer to the compressor.
+     *
+     * An empty buffer is NOT published: size 0 is the compressor's stop
+     * signal.  Publishing an empty buffer (e.g. two flushes in a row, or a
+     * flush right after a buffer filled up exactly) would end the compressor
+     * thread early.  The writer would then block forever at the next buffer
+     * switch, waiting for a buffer the compressor never returns.  Shutdown
+     * is signalled by releaseLocks() instead.
+     */
+    void flushWrite() {
+        if (usedCacheSize(m_writeID) == 0) {
+            return;
+        }
+        nextWriteBuffer();
+    }
+
+    /* Writer side: take the initial locks before the compressor thread starts. */
+    void acquireWriteControl() {
+        MutexLock(m_writeAccess[m_writeID]);
+        for (int i = 0; i < NUM_BUFFERS; ++i) {
+            MutexLock(m_readAccess[i]);
+        }
+        // The first buffer has not been published yet; give it a defined
+        // (empty) size so that releasing it unfilled stops the compressor.
+        m_sizes[m_writeID] = 0;
+    }
+
+    /*
+     * Release the current write buffer to the compressor.  Meant to be called
+     * after flushWrite() on close, when that buffer is empty (size 0), so the
+     * compressor thread exits.
+     */
+    void releaseLocks() {
+        MutexUnlock(m_readAccess[m_writeID]);
+    }
+
+    /* Reader side: wait until the first buffer is published. */
+    void acquireReadControl() {
+        MutexLock(m_readAccess[m_readID]);
+    }
+};
+
+/*
+ * Write-only trace file whose chunks are compressed on a separate thread.
+ * Read-side entry points log an error and abort.
+ */
+class ThreadedFile : public File {
+public:
+    /*
+     * `lib` compresses every chunk.  NOTE(review): ownership of `lib` is
+     * unclear -- callers pass a `new` object and nothing in this patch
+     * deletes it (the default argument allocates one as well).
+     */
+    ThreadedFile(trace::CompressionLibrary * lib = new LZ4Library(false),
+                 const std::string &filename = std::string(),
+                 File::Mode mode = File::Read);
+    virtual ~ThreadedFile();
+
+    /* Offset queries: not supported by this write-only file. */
+    virtual bool supportsOffsets() const;
+    virtual File::Offset currentOffset();
+    virtual void setCurrentOffset(const File::Offset &offset);
+
+    /*
+     * Fast write path: calls ThreadedFile::rawWrite through a qualified,
+     * non-virtual call.  Returns false unless the file is open for writing.
+     */
+    bool write(const void *buffer, size_t length)
+    {
+        const bool writable = m_isOpened && m_mode == File::Write;
+        if (writable) {
+            return ThreadedFile::rawWrite(buffer, length);
+        }
+        return false;
+    }
+
+protected:
+    virtual bool rawOpen(const std::string &filename, File::Mode mode);
+
+    /* Copy into the cache; may block while the compressor frees a buffer. */
+    virtual bool rawWrite(const void *buffer, size_t length)
+    {
+        m_cache->write(static_cast<const char *>(buffer), length);
+        return true;
+    }
+
+    virtual size_t rawRead(void *buffer, size_t length);
+    virtual int rawGetc();
+    virtual void rawClose();
+    virtual void rawFlush();
+    virtual bool rawSkip(size_t length);
+    virtual int rawPercentRead();
+
+private:
+    std::fstream m_stream;            /* written by the compressor thread once started */
+    std::streampos m_endPos;
+    CompressionCache *m_cache;        /* created in rawOpen(), deleted in rawClose() */
+    File::Offset m_currentOffset;
+    static const size_t CACHE_SIZE = 1 * 1024 * 1024;   /* bytes per cache buffer */
+    trace::CompressionLibrary *m_library;
+    os::Thread m_thread;              /* compressor thread handle */
+
+    void writeLength(size_t length);
+
+    THREAD_ROUTINE static void * compressorThread(void * param);
+};
+
+
+#endif /* TRACE_THREADED_FILE_HPP_ */
diff --git a/common/trace_writer.cpp b/common/trace_writer.cpp
index dbf00b5..5efe188 100644
--- a/common/trace_writer.cpp
+++ b/common/trace_writer.cpp
@@ -43,7 +43,7 @@ namespace trace {
Writer::Writer() :
call_no(0)
{
- m_file = File::createCommonFile(File::LZ4);
+ // File::createThreadedFile() picks the compressor from APITRACE_COMPRESSOR;
+ // the file object is created here and closed, to be opened later.
+ m_file = File::createThreadedFile();
close();
}
diff --git a/common/trace_writer.hpp b/common/trace_writer.hpp
index a75d5ac..8695c47 100644
--- a/common/trace_writer.hpp
+++ b/common/trace_writer.hpp
@@ -37,13 +37,18 @@
#include "trace_model.hpp"
#include "trace_backtrace.hpp"
+#include "trace_threaded_file.hpp"
namespace trace {
class File;
class Writer {
protected:
- File *m_file;
+ /*
+ * m_file's type was changed to ThreadedFile because tests show that
+ * avoiding virtual calls and inlining heavily help performance a lot.
+ */
+ ThreadedFile *m_file;
unsigned call_no;
std::vector<bool> functions;
diff --git a/common/trace_writer_local.cpp b/common/trace_writer_local.cpp
index 5f86f7d..f5644a3 100644
--- a/common/trace_writer_local.cpp
+++ b/common/trace_writer_local.cpp
@@ -146,7 +146,7 @@ void LocalWriter::checkProcessId(void) {
// create a new file. We can't call any method of the current
// file, as it may cause it to flush and corrupt the parent's
// trace, so we effectively leak the old file object.
- m_file = File::createCommonFile(File::SNAPPY);
+ m_file = File::createThreadedFile();
// Don't want to open the same file again
os::unsetEnvironment("TRACE_FILE");
open();
--
1.7.9.5
More information about the apitrace
mailing list