[Intel-gfx] [PATCH 1/2] A lockless Buffering Utility for Concurrency

Krzysztof E. Olinski krzysztof.e.olinski at intel.com
Thu Apr 27 08:59:19 UTC 2017


The proposed buffering method utilizes atomic operations to manage
data buffering. This methodology does not use a classic locking approach
(mutexes, semaphores, blocking calls, etc.), therefore no "hard"
serialization takes place.

Signed-off-by: Krzysztof E. Olinski <krzysztof.e.olinski at intel.com>
---
 lib/buc.c | 208 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/buc.h | 242 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 450 insertions(+)
 create mode 100755 lib/buc.c
 create mode 100755 lib/buc.h

diff --git a/lib/buc.c b/lib/buc.c
new file mode 100755
index 0000000..1a5b833
--- /dev/null
+++ b/lib/buc.c
@@ -0,0 +1,208 @@
+/*
+ * Copyright © 2017 Intel Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ *
+ * Authors:
+ *    Krzysztof E. Olinski <krzysztof.e.olinski at intel.com>
+ *
+ */
+
+#include "buc.h"
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+
+
+#include "igt.h"
+#define buc__assert igt_assert
+
+// Customized allocator pair.
+// You can pick your favorite allocator here, as long as buc__free()
+// stays the exact counterpart of buc__alloc().
+//
+// Every block is preceded by a hidden header that records the total
+// locked length. The header is a union with the most strictly aligned
+// scalar types, so its size is a multiple of their alignment and the
+// payload that follows it stays aligned for pointers and atomic updates.
+typedef union
+{
+    size_t      total;  // header + payload length, as passed to mlock()
+    long double ld;     // present for alignment only
+    long long   ll;     // present for alignment only
+    void        *ptr;   // present for alignment only
+} buc__hdr_t;
+
+// Allocates 'size' usable bytes and locks them in RAM with mlock().
+// Allocation and locking failures are checked with buc__assert; there is
+// no path that returns NULL.
+static void* buc__alloc(size_t size)
+{
+    size_t total = sizeof(buc__hdr_t) + size;
+    buc__hdr_t *hdr = malloc(total);
+    int ret;
+
+    buc__assert(hdr);
+
+    ret = mlock(hdr, total);
+    buc__assert(!ret);
+
+    // Remember the locked length so buc__free() can munlock the very same
+    // range (header included).
+    hdr->total = total;
+
+    return hdr + 1;
+}
+
+// Unlocks and frees a block previously returned by buc__alloc().
+static void buc__free(void* addr)
+{
+    buc__hdr_t *hdr = (buc__hdr_t*)addr - 1;
+
+    munlock(hdr, hdr->total);
+    free(hdr);
+}
+
+// Collector thread entry point.
+//
+// 'p' is the owning buc_t. The thread keeps swapping the active buffer
+// with the spare one and writes whatever the emitters left in the retired
+// buffer to out_fd. It runs until out_fd is observed to be -1.
+//
+// NOTE(review): this loop does no final pass after out_fd becomes -1, so
+// data still buffered at that point is not written by this thread.
+static void* collector_thread(void *p)
+{
+    buc_t *my_buc = (buc_t*)p;
+    bufdesc_t *process_buffer = my_buc->bufferB;
+    int fd;
+
+    // Take one snapshot of out_fd per pass: it may be set to -1 at any
+    // moment and the write() below must not pick that value up.
+    while((fd = __atomic_load_n(&my_buc->out_fd, __ATOMIC_SEQ_CST)) != -1)
+    {
+        // Swap buffers A <-> B.
+        process_buffer = (bufdesc_t*)__atomic_exchange_n(
+            &my_buc->active_base,
+            process_buffer,
+            __ATOMIC_SEQ_CST);
+
+        if(process_buffer->cursor > 0)
+        {
+            const char *data = (const char*)process_buffer + sizeof(bufdesc_t);
+            size_t len;
+            size_t done = 0;
+
+            // Wait until nobody is writing to the buffer.
+            // The only lock in this design is here in the collector.
+            while(process_buffer->ref_pointer > (void*)process_buffer)
+                sched_yield();
+
+            // After an overflow only the bytes below cursor_of_last_hope
+            // are consistent; otherwise everything below cursor is.
+            len = process_buffer->cursor_of_last_hope ?
+                  process_buffer->cursor_of_last_hope :
+                  (size_t)process_buffer->cursor;
+
+            // A cursor at or past the end without a last-hope mark means
+            // the overflowing append started at offset 0: nothing in the
+            // buffer is valid and the length must not run past its end.
+            if(len >= my_buc->buffer_size)
+                len = 0;
+
+            // write() reports errors as -1, which a plain truth test would
+            // accept, and it may also write less than requested.
+            while(done < len)
+            {
+                ssize_t ret = write(fd, data + done, len - done);
+                buc__assert(ret > 0);
+                done += (size_t)ret;
+            }
+
+            process_buffer->cursor = 0;
+            process_buffer->cursor_of_last_hope = 0;
+        }
+        sched_yield();
+    }
+
+    pthread_exit(NULL);
+}
+
+// Creates a BUC instance that dumps into 'out_fd' and starts its collector
+// thread.
+//
+// out_fd       destination file descriptor; it is only stored here.
+// buffer_size  payload capacity in bytes of each of the two buffers.
+//
+// Returns the new instance. Every failure is checked with buc__assert;
+// there is no path that returns NULL.
+buc_t* buc__create(int out_fd, unsigned int buffer_size)
+{
+    // Distance between the two buffer headers. It is rounded up to a
+    // multiple of the header size so that buffer B's header, whose
+    // ref_pointer is updated atomically, is aligned like buffer A's.
+    size_t stride = sizeof(bufdesc_t) + buffer_size;
+    buc_t *new_buflogger;
+    int r;
+
+    stride = (stride + sizeof(bufdesc_t) - 1) / sizeof(bufdesc_t)
+             * sizeof(bufdesc_t);
+
+    // Create main structure.
+    new_buflogger = buc__alloc(sizeof(buc_t));
+    buc__assert(new_buflogger);
+
+    // Allocate buffers A and B as one block; B starts one stride after A.
+    new_buflogger->bufferA = buc__alloc(2 * stride);
+    buc__assert(new_buflogger->bufferA);
+    new_buflogger->bufferB =
+        (bufdesc_t*)((char*)new_buflogger->bufferA + stride);
+
+    // Both buffers start idle: ref_pointer at the buffer's own base and
+    // both cursors at zero. cursor_of_last_hope has to be cleared as well,
+    // because the collector tests it before any overflow could set it.
+    new_buflogger->bufferA->ref_pointer = new_buflogger->bufferA;
+    new_buflogger->bufferA->cursor = 0;
+    new_buflogger->bufferA->cursor_of_last_hope = 0;
+
+    new_buflogger->bufferB->ref_pointer = new_buflogger->bufferB;
+    new_buflogger->bufferB->cursor = 0;
+    new_buflogger->bufferB->cursor_of_last_hope = 0;
+
+    new_buflogger->active_base = new_buflogger->bufferA;
+    new_buflogger->buffer_size = buffer_size;
+    new_buflogger->out_fd = out_fd;
+    new_buflogger->overflow_counter = 0;
+
+    // Create collector thread.
+    r = pthread_create(&new_buflogger->collector_thread,
+                       NULL,
+                       collector_thread,
+                       new_buflogger);
+    buc__assert(r==0);
+
+    return new_buflogger;
+}
+
+// Appends 'size' bytes from 'buf' to the active buffer of 'this_buc'.
+//
+// this_buc  instance returned by buc__create.
+// buf       data to be appended.
+// size      data size in bytes.
+//
+// Returns 0 on success and -1 when the data does not fit into the target
+// buffer; every rejected call also increments the overflow counter.
+//
+// NOTE(review): active_base is read before the reference is taken, so the
+// collector may swap buffers in between - confirm this window is acceptable.
+int buc__append(buc_t* this_buc, void* buf, unsigned int size)
+{
+    unsigned int   offset;
+    int            status = 0;
+    bufdesc_t      *target_buffer;
+
+    // Get a reference to the active buffer.
+    unsigned long ref_pointer = __atomic_fetch_add(
+        (volatile unsigned long*)(
+        &((bufdesc_t*)this_buc->active_base)->ref_pointer),
+        1,
+        __ATOMIC_SEQ_CST);
+
+    // Identify the target buffer from the fetched value.
+    // We assume here some reasonable maximum number of emitters which
+    // should be less than the quantity representing the size of the buffer.
+    if(ref_pointer >= (unsigned long)this_buc->bufferB)
+        target_buffer = this_buc->bufferB;
+    else
+        target_buffer = this_buc->bufferA;
+
+    // Reserve space for data within the target buffer.
+    offset = __atomic_fetch_add(
+                   &target_buffer->cursor,
+                   size,
+                   __ATOMIC_SEQ_CST);
+
+    // Verify fit conditions (each emitter works on its own memory slot).
+    // Same test as 'offset + size >= buffer_size', written so that the
+    // unsigned sum cannot wrap around for very large sizes.
+    if(size >= this_buc->buffer_size ||
+       offset >= this_buc->buffer_size - size)
+    {
+        // Check whether we are the evil one who has overflowed first.
+        if(offset < this_buc->buffer_size)
+        {
+            // Store the cursor_of_last_hope value so the collector can
+            // save as much as possible.
+            target_buffer->cursor_of_last_hope = offset;
+        }
+
+        __atomic_fetch_add(&this_buc->overflow_counter, 1, __ATOMIC_SEQ_CST);
+        status = -1;
+    }
+    else
+    {
+        memcpy((char*)target_buffer + sizeof(bufdesc_t) + offset, buf, size);
+    }
+
+    // Release the reference on every path. The collector waits for
+    // ref_pointer to fall back to the buffer base, so a reference kept
+    // after an overflow would make it wait forever.
+    __atomic_fetch_sub(
+        (volatile unsigned long*)&target_buffer->ref_pointer,
+        1,
+        __ATOMIC_SEQ_CST);
+
+    return status;
+}
+
+// Writes the consistent part of 'buffer' to 'fd' and resets its cursors.
+// Best effort: a failing write() just ends the flush, since this only runs
+// while the instance is being torn down.
+static void buc__drain(int fd, bufdesc_t *buffer, unsigned int buffer_size)
+{
+    const char *data = (const char*)buffer + sizeof(bufdesc_t);
+    size_t len = 0;
+    size_t done = 0;
+
+    if(buffer->cursor > 0)
+        len = buffer->cursor_of_last_hope ?
+              buffer->cursor_of_last_hope : (size_t)buffer->cursor;
+
+    // A cursor at or past the end without a last-hope mark means the
+    // overflowing append started at offset 0: nothing is valid.
+    if(len >= buffer_size)
+        len = 0;
+
+    while(done < len)
+    {
+        ssize_t ret = write(fd, data + done, len - done);
+        if(ret <= 0)
+            break;
+        done += (size_t)ret;
+    }
+
+    buffer->cursor = 0;
+    buffer->cursor_of_last_hope = 0;
+}
+
+// Stops the collector thread, flushes what is still buffered and releases
+// the instance. The destination descriptor is not closed.
+//
+// The caller must make sure that no emitter is still inside buc__append(),
+// because both buffers are freed here.
+void buc__finalize(buc_t* this_buc)
+{
+    int fd = this_buc->out_fd;
+    bufdesc_t *active;
+    bufdesc_t *spare;
+
+    // Signal the collector thread to end and wait for it.
+    __atomic_store_n(&this_buc->out_fd, -1, __ATOMIC_SEQ_CST);
+    pthread_join(this_buc->collector_thread, NULL);
+
+    // The collector leaves its loop without a final pass. Flush the
+    // leftovers ourselves: the inactive buffer first, because anything in
+    // it was reserved before the last swap and is therefore older.
+    if(fd != -1)
+    {
+        active = (bufdesc_t*)this_buc->active_base;
+        spare = (active == this_buc->bufferA) ?
+                this_buc->bufferB : this_buc->bufferA;
+        buc__drain(fd, spare, this_buc->buffer_size);
+        buc__drain(fd, active, this_buc->buffer_size);
+    }
+
+    // Free memory. Buffers A and B live in one allocation based at A.
+    buc__free(this_buc->bufferA);
+    buc__free(this_buc);
+}
+
+// Returns how many buc__append() calls have been rejected so far because
+// the data did not fit into the target buffer.
+unsigned int buc__get_overflow(buc_t* this_buc)
+{
+    // Emitters bump the counter with atomic adds, so read it atomically too.
+    return __atomic_load_n(&this_buc->overflow_counter, __ATOMIC_SEQ_CST);
+}
diff --git a/lib/buc.h b/lib/buc.h
new file mode 100755
index 0000000..0fa9acc
--- /dev/null
+++ b/lib/buc.h
@@ -0,0 +1,242 @@
+/*
+ * Copyright © 2017 Intel Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ *
+ * Authors:
+ *    Krzysztof E. Olinski <krzysztof.e.olinski at intel.com>
+ *
+ */
+
+/**
+ * A Buffering Utility for Concurrency (BUC)
+ *
+ * Glossary:
+ * emitter - a functional block working within some thread, who wants
+ *           to store a memory block of a known size
+ *
+ * collector - an event-triggered action responsible for dumping
+ *             buffered data to some target destination (e.g. a disk file)
+ *
+ * memory slot - a subset of a memory buffer reserved for a given emitter
+ *               for data buffering purposes
+ *
+ * The proposed buffering method utilizes atomic operations to manage
+ * data buffering. This methodology does not use classic locking approach
+ * (mutex, semaphores, blocking calls, etc.), therefore no "hard"
+ * serialization takes place.
+ *
+ * The current design is based on two memory buffers called A and B
+ * (in general there can be more than two).
+ * Let's consider the following scheme:
+ *
+ *
+ *                                +---------------+
+ *                                |  active_base  |
+ *                       +--------+---------------+
+ * ,---------.           |
+ * |Emitter 1|       ____V_______                     ____________
+ * `---------'.     |BUFFER A   |<-ref_pointer       |BUFFER B   |
+ *             `.   |11111111111|                    |###########|
+ *               -  |11111111222|                    |###########|
+ * ,---------.      |22222222222|                    |###########|
+ * |Emitter 2|_____ |2222222222.|<-cursor            |###########|
+ * `---------'      |...........|                    |###########|
+ *                  |...........|                    |###########|
+ *     ...       _- |...........|                    |###########|
+ * ,---------.  /   |...........|                    |#####......|
+ * |Emitter n| '    '-----------'                    '----+------'
+ * `---------'                                            |
+ *                                                        |
+ *                                                        |
+ *                                                        |
+ *                                                      +---------+
+ *                                                      |collector|
+ *                                                      +---------+
+ *
+ *
+ * One of the two buffers is called active when 'active_base' points
+ * at it. An emitter can start writing to the buffer which is active.
+ * Each of the buffers has three variables assigned: 'ref_pointer',
+ * 'cursor' and 'cursor_of_last_hope'.
+ *
+ * 'ref_pointer' is used to track the number of emitters which are currently
+ * working on that buffer. Initially, 'ref_pointer' points at the buffer
+ * base. Each emitter which wants to mark the buffer as "in-use" atomically
+ * increments this value. When the emitter finishes all write operations
+ * on that buffer, it atomically decrements the 'ref_pointer' value.
+ *
+ * 'cursor' points at a free place within the buffer.
+ *
+ * 'cursor_of_last_hope' points at the end of consistent data when
+ * overflow occurs.
+ *
+ *
+ * An emitter that wants to store data in the active buffer performs
+ * the following steps:
+ * 1. Make an atomic fetch_and_increment operation on
+ *    'active_base->ref_pointer'. This step marks the active buffer
+ *    as "in-use". At the same time the retrieved 'ref_pointer' value
+ *    identifies the buffer which the emitter can write into
+ *    (called the target buffer for that emitter). We assume here
+ *    some reasonable maximum number of emitters which should be less than
+ *    a quantity representing the size of the buffer.
+ *
+ * 2. Make atomic addition (cursor = cursor + required_space) on the target
+ *    buffer. This way we are reserving a memory slot of 'required_space'
+ *    size for a given emitter. Additionally, we can check if we are not
+ *    overflowing the target buffer.
+ *
+ * 3. Write into the reserved memory slot.
+ *
+ * 4. Make atomic decrement on 'ref_pointer' of the target buffer.
+ *
+ *
+ * The collector is performing the following steps:
+ * 1. Make atomic exchange on 'active_base' to swap A and B buffers.
+ *
+ * 2. Wait until non-active buffer's 'ref_pointer' points at its base
+ *    address. In this step we are just waiting until all jobs on that
+ *    buffer are finished.
+ *
+ * 3. Dump non-active buffer content to the final destination
+ *    (ex. disk file).
+ *
+ * 4. Set the non-active buffer's cursors to zero.
+ *
+ *
+ * Exemplary usage pattern:
+ * ------------------------
+ *
+ * #include "buc.h"
+ *
+ * // declare BUC handler & destination file desc.
+ * static buc_t* my_buc = NULL;
+ * int outfile_fd;
+ *
+ * your_module_init(){
+ *    . . .
+ *    // open destination file
+ *    outfile_fd = open("mylogs.bin", ...);
+ *    . . .
+ *    // create BUC instance
+ *    my_buc = buc__create(outfile_fd, size_of_buffers);
+ *    . . .
+ * }
+ *
+ *
+ * your_thread_N(){
+ *     . . .
+ *     if(buc__append(my_buc, data_ptr, data_size))
+ *         //unsuccessful attempt :(
+ *     . . .
+ * }
+ *
+ *
+ * your_module_finalize(){
+ *     . . .
+ *     buc__finalize(my_buc);
+ *     . . .
+ *     close(outfile_fd);
+ *     . . .}
+ */
+
+#pragma once
+#include <pthread.h>
+
+// Buffer header. One header sits at the start of each of the two buffers;
+// the payload bytes follow it directly in memory.
+typedef struct _bufdesc
+{
+    // Reference pointer: set to the buffer's own base address when idle and
+    // atomically incremented/decremented by emitters while they use the
+    // buffer, so (value - base) is the number of active emitters.
+    // NOTE(review): 'volatile' qualifies the pointed-to type here, not the
+    // pointer object itself - confirm this is what was intended.
+    volatile void   *ref_pointer;
+
+    // Cursor denoting free space start, as a byte offset into the payload.
+    // Emitters advance it atomically to reserve their slot.
+    volatile int    cursor;
+
+    // End of consistent data in case of overflow: the offset at which the
+    // first overflowing append would have started. Zero means "not set".
+    unsigned int    cursor_of_last_hope;
+
+} bufdesc_t;
+
+// BUC instance: one per buc__create() call. It ties together the two
+// buffers, the collector thread and the destination descriptor.
+typedef struct _buc
+{
+    // Final destination file descriptor. Setting it to -1 tells the
+    // collector thread to stop.
+    int                       out_fd;
+
+    // Active buffer pointer: the buffer emitters currently append to.
+    // The collector swaps it atomically between bufferA and bufferB.
+    // NOTE(review): 'volatile' qualifies the pointed-to type here, not the
+    // pointer object itself - confirm this is what was intended.
+    volatile void                      *active_base;
+
+    // Predefined payload size, in bytes, of each of the two buffers.
+    unsigned int              buffer_size;
+
+    // Overflow indicator: number of appends rejected for lack of space.
+    unsigned int              overflow_counter;
+
+    // Collector thread handle.
+    pthread_t                 collector_thread;
+
+    // Buffer A and B pointers. Both live in a single allocation that
+    // starts at bufferA.
+    bufdesc_t                 *bufferA;
+    bufdesc_t                 *bufferB;
+} buc_t;
+
+
+/**
+ * Creates a new BUC instance.
+ *
+ * This routine initializes a new instance of BUC.
+ * For each instance two buffers A, B are created and a separate
+ * collector thread is started.
+ *
+ * @param out_fd        destination file descriptor.
+ * @param buffer_size   size in bytes of each of the two buffers.
+ *
+ * @return buc_t structure pointer on success.
+ *         NOTE(review): the implementation in buc.c checks every failure
+ *         with buc__assert and has no path that returns NULL.
+ */
+buc_t*  buc__create(int out_fd, unsigned int buffer_size);
+
+/**
+ * Appends data to the active buffer.
+ *
+ * @param this_buc pointer to buc_t structure returned by \b buc__create.
+ * @param data     pointer to data to be appended.
+ * @param size     data size in bytes.
+ *
+ * @return zero value on success, -1 when the data does not fit into the
+ *         buffer (the overflow counter is incremented in that case).
+ */
+int    buc__append(buc_t* this_buc, void* data, unsigned int size);
+
+/**
+ * Finalizes BUC instance and releases all related resources: the collector
+ * thread is stopped and joined, then the buffers and the instance are freed.
+ * The instance must not be used after this call.
+ * Notice that this function does not close the destination file descriptor
+ * \b out_fd passed to \b buc__create.
+ *
+ * @param this_buc   pointer to buc_t structure returned by \b buc__create.
+ */
+void   buc__finalize(buc_t *this_buc);
+
+/**
+ * Returns the overflow counter value, i.e. the number of \b buc__append
+ * calls that were rejected because the data did not fit into the buffer.
+ *
+ * @param this_buc   pointer to buc_t structure returned by \b buc__create.
+ *
+ * @return current overflow counter value.
+ */
+unsigned int buc__get_overflow(buc_t *this_buc);
-- 
2.9.3



More information about the Intel-gfx mailing list