[Beignet] [PATCH V2] Refine the cl thread implement for queue.
Zhigang Gong
zhigang.gong at linux.intel.com
Thu May 29 23:59:15 PDT 2014
LGTM, pushed, thanks.
On Fri, May 30, 2014 at 02:28:30PM +0800, junyan.he at inbox.com wrote:
> From: Junyan He <junyan.he at linux.intel.com>
>
> Because a cl_command_queue can be used by several threads simultaneously
> without anyone taking a reference to it, we now handle it as follows:
> Keep one thread slot array; whenever a thread gets a gpgpu or batch buffer
> and does not yet have a slot, assign it one.
> The per-thread resources are kept in the queue's private data, which is resized as needed.
> When a thread exits, its slot is marked invalid.
> When the queue is released, all of its resources are released. If the user still enqueues to,
> flushes, or finishes the queue after it has been released, the behavior is undefined.
> TODO: Shrink the slot map.
>
> Signed-off-by: Junyan He <junyan.he at linux.intel.com>
> ---
> src/cl_command_queue.c | 4 +-
> src/cl_command_queue_gen7.c | 2 +-
> src/cl_context.c | 1 -
> src/cl_device_id.c | 1 -
> src/cl_thread.c | 254 ++++++++++++++++++++++++++++++++------------
> src/cl_thread.h | 6 +-
> 6 files changed, 192 insertions(+), 76 deletions(-)
>
> diff --git a/src/cl_command_queue.c b/src/cl_command_queue.c
> index 6a699c0..a2109d7 100644
> --- a/src/cl_command_queue.c
> +++ b/src/cl_command_queue.c
> @@ -89,7 +89,7 @@ cl_command_queue_delete(cl_command_queue queue)
> queue->fulsim_out = NULL;
> }
>
> - cl_thread_data_destroy(queue->thread_data);
> + cl_thread_data_destroy(queue);
> queue->thread_data = NULL;
> cl_mem_delete(queue->perf);
> cl_context_delete(queue->ctx);
> @@ -430,7 +430,7 @@ cl_command_queue_flush(cl_command_queue queue)
> LOCAL cl_int
> cl_command_queue_finish(cl_command_queue queue)
> {
> - cl_gpgpu_sync(cl_get_thread_batch_buf());
> + cl_gpgpu_sync(cl_get_thread_batch_buf(queue));
> return CL_SUCCESS;
> }
>
> diff --git a/src/cl_command_queue_gen7.c b/src/cl_command_queue_gen7.c
> index 891d6f1..d875021 100644
> --- a/src/cl_command_queue_gen7.c
> +++ b/src/cl_command_queue_gen7.c
> @@ -327,7 +327,7 @@ cl_command_queue_ND_range_gen7(cl_command_queue queue,
> /* Start a new batch buffer */
> batch_sz = cl_kernel_compute_batch_sz(ker);
> cl_gpgpu_batch_reset(gpgpu, batch_sz);
> - cl_set_thread_batch_buf(cl_gpgpu_ref_batch_buf(gpgpu));
> + cl_set_thread_batch_buf(queue, cl_gpgpu_ref_batch_buf(gpgpu));
> cl_gpgpu_batch_start(gpgpu);
>
> /* Issue the GPGPU_WALKER command */
> diff --git a/src/cl_context.c b/src/cl_context.c
> index 8190e6a..1911bf2 100644
> --- a/src/cl_context.c
> +++ b/src/cl_context.c
> @@ -203,7 +203,6 @@ cl_context_delete(cl_context ctx)
> assert(ctx->buffers == NULL);
> assert(ctx->drv);
> cl_free(ctx->prop_user);
> - cl_set_thread_batch_buf(NULL);
> cl_driver_delete(ctx->drv);
> ctx->magic = CL_MAGIC_DEAD_HEADER; /* For safety */
> cl_free(ctx);
> diff --git a/src/cl_device_id.c b/src/cl_device_id.c
> index 427c50e..d2b3bed 100644
> --- a/src/cl_device_id.c
> +++ b/src/cl_device_id.c
> @@ -107,7 +107,6 @@ LOCAL cl_device_id
> cl_get_gt_device(void)
> {
> cl_device_id ret = NULL;
> - cl_set_thread_batch_buf(NULL);
> const int device_id = cl_driver_get_device_id();
> cl_device_id device = NULL;
>
> diff --git a/src/cl_thread.c b/src/cl_thread.c
> index cadc3cd..4692ed7 100644
> --- a/src/cl_thread.c
> +++ b/src/cl_thread.c
> @@ -15,113 +15,231 @@
> * License along with this library. If not, see <http://www.gnu.org/licenses/>.
> *
> */
> +#include <string.h>
> +#include <stdio.h>
>
> #include "cl_thread.h"
> #include "cl_alloc.h"
> #include "cl_utils.h"
>
> -static __thread void* thread_batch_buf = NULL;
> -
> -typedef struct _cl_thread_spec_data {
> +/* Because a cl_command_queue can be used by several threads simultaneously
> + without anyone taking a reference to it, we now handle it as follows:
> + Keep one thread slot array; whenever a thread gets a gpgpu or batch buffer
> + and does not yet have a slot, assign it one.
> + The per-thread resources are kept in the queue's private data, which is resized as needed.
> + When a thread exits, its slot is marked invalid.
> + When the queue is released, all of its resources are released. If the user still enqueues to,
> + flushes, or finishes the queue after it has been released, the behavior is undefined.
> + TODO: Shrink the slot map.
> + */
> +
> +static int thread_array_num = 1;
> +static int *thread_slot_map = NULL;
> +static int thread_magic_num = 1;
> +static pthread_mutex_t thread_queue_map_lock = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_key_t destroy_key;
> +
> +static __thread int thread_id = -1;
> +static __thread int thread_magic = -1;
> +
> +typedef struct _thread_spec_data {
> cl_gpgpu gpgpu ;
> int valid;
> -}cl_thread_spec_data;
> + void* thread_batch_buf;
> + int thread_magic;
> +} thread_spec_data;
> +
> +typedef struct _queue_thread_private {
> + thread_spec_data** threads_data;
> + int threads_data_num;
> + pthread_mutex_t thread_data_lock;
> +} queue_thread_private;
> +
> +static void thread_data_destructor(void *dummy) {
> + pthread_mutex_lock(&thread_queue_map_lock);
> + thread_slot_map[thread_id] = 0;
> + pthread_mutex_unlock(&thread_queue_map_lock);
> + free(dummy);
> +}
> +
> +static thread_spec_data * __create_thread_spec_data(cl_command_queue queue, int create)
> +{
> + queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
> + thread_spec_data* spec = NULL;
> + int i = 0;
> +
> + if (thread_id == -1) {
> + void * dummy = malloc(sizeof(int));
> +
> + pthread_mutex_lock(&thread_queue_map_lock);
> + for (i = 0; i < thread_array_num; i++) {
> + if (thread_slot_map[i] == 0) {
> + thread_id = i;
> + break;
> + }
> + }
> +
> + if (i == thread_array_num) {
> + thread_array_num *= 2;
> + thread_slot_map = realloc(thread_slot_map, sizeof(int) * thread_array_num);
> + memset(thread_slot_map + thread_array_num/2, 0, sizeof(int) * (thread_array_num/2));
> + thread_id = thread_array_num/2;
> + }
> +
> + thread_slot_map[thread_id] = 1;
> +
> + thread_magic = thread_magic_num++;
> + pthread_mutex_unlock(&thread_queue_map_lock);
>
> -void cl_set_thread_batch_buf(void* buf) {
> - if (thread_batch_buf) {
> - cl_gpgpu_unref_batch_buf(thread_batch_buf);
> + pthread_setspecific(destroy_key, dummy);
> + }
> +
> + pthread_mutex_lock(&thread_private->thread_data_lock);
> + if (thread_array_num > thread_private->threads_data_num) {// just enlarge
> + int old_num = thread_private->threads_data_num;
> + thread_private->threads_data_num = thread_array_num;
> + thread_private->threads_data = realloc(thread_private->threads_data,
> + thread_private->threads_data_num * sizeof(void *));
> + memset(thread_private->threads_data + old_num, 0,
> + sizeof(void*) * (thread_private->threads_data_num - old_num));
> }
> - thread_batch_buf = buf;
> -}
>
> -void* cl_get_thread_batch_buf(void) {
> - return thread_batch_buf;
> + assert(thread_id != -1 && thread_id < thread_array_num);
> + spec = thread_private->threads_data[thread_id];
> + if (!spec && create) {
> + spec = CALLOC(thread_spec_data);
> + spec->thread_magic = thread_magic;
> + thread_private->threads_data[thread_id] = spec;
> + }
> +
> + pthread_mutex_unlock(&thread_private->thread_data_lock);
> +
> + return spec;
> }
>
> -cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue)
> +void* cl_thread_data_create(void)
> {
> - pthread_key_t* key = queue->thread_data;
> - cl_thread_spec_data* thread_spec_data = pthread_getspecific(*key);
> -
> - if (!thread_spec_data) {
> - TRY_ALLOC_NO_ERR(thread_spec_data, CALLOC(struct _cl_thread_spec_data));
> - if (pthread_setspecific(*key, thread_spec_data)) {
> - cl_free(thread_spec_data);
> - return NULL;
> - }
> - }
> + queue_thread_private* thread_private = CALLOC(queue_thread_private);
> +
> + if (thread_private == NULL)
> + return NULL;
>
> - if (!thread_spec_data->valid) {
> - TRY_ALLOC_NO_ERR(thread_spec_data->gpgpu, cl_gpgpu_new(queue->ctx->drv));
> - thread_spec_data->valid = 1;
> + if (thread_slot_map == NULL) {
> + pthread_mutex_lock(&thread_queue_map_lock);
> + thread_slot_map = calloc(thread_array_num, sizeof(int));
> + pthread_mutex_unlock(&thread_queue_map_lock);
> +
> + pthread_key_create(&destroy_key, thread_data_destructor);
> }
>
> -error:
> - return thread_spec_data->gpgpu;
> + pthread_mutex_init(&thread_private->thread_data_lock, NULL);
> +
> + pthread_mutex_lock(&thread_private->thread_data_lock);
> + thread_private->threads_data = malloc(thread_array_num * sizeof(void *));
> + memset(thread_private->threads_data, 0, sizeof(void*) * thread_array_num);
> + thread_private->threads_data_num = thread_array_num;
> + pthread_mutex_unlock(&thread_private->thread_data_lock);
> +
> + return thread_private;
> }
>
> -void cl_invalid_thread_gpgpu(cl_command_queue queue)
> +cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue)
> {
> - pthread_key_t* key = queue->thread_data;
> - cl_thread_spec_data* thread_spec_data = pthread_getspecific(*key);
> + thread_spec_data* spec = __create_thread_spec_data(queue, 1);
>
> - if (!thread_spec_data) {
> - return;
> + if (!spec->thread_magic && spec->thread_magic != thread_magic) {
> + //We may have inherited this slot from a previous thread, so free its resources.
> + spec->valid = 0;
> }
>
> - if (!thread_spec_data->valid) {
> - return;
> + if (!spec->valid) {
> + if (spec->thread_batch_buf) {
> + cl_gpgpu_unref_batch_buf(spec->thread_batch_buf);
> + spec->thread_batch_buf = NULL;
> + }
> + if (spec->gpgpu) {
> + cl_gpgpu_delete(spec->gpgpu);
> + spec->gpgpu = NULL;
> + }
> + TRY_ALLOC_NO_ERR(spec->gpgpu, cl_gpgpu_new(queue->ctx->drv));
> + spec->valid = 1;
> }
>
> - assert(thread_spec_data->gpgpu);
> - cl_gpgpu_delete(thread_spec_data->gpgpu);
> - thread_spec_data->valid = 0;
> + error:
> + return spec->gpgpu;
> }
>
> -static void thread_data_destructor(void *data) {
> - cl_thread_spec_data* thread_spec_data = (cl_thread_spec_data *)data;
> +void cl_set_thread_batch_buf(cl_command_queue queue, void* buf)
> +{
> + thread_spec_data* spec = __create_thread_spec_data(queue, 1);
> +
> + assert(spec && spec->thread_magic == thread_magic);
>
> - if (thread_batch_buf) {
> - cl_gpgpu_unref_batch_buf(thread_batch_buf);
> - thread_batch_buf = NULL;
> + if (spec->thread_batch_buf) {
> + cl_gpgpu_unref_batch_buf(spec->thread_batch_buf);
> }
> + spec->thread_batch_buf = buf;
> +}
> +
> +void* cl_get_thread_batch_buf(cl_command_queue queue) {
> + thread_spec_data* spec = __create_thread_spec_data(queue, 1);
>
> - if (thread_spec_data->valid)
> - cl_gpgpu_delete(thread_spec_data->gpgpu);
> - cl_free(thread_spec_data);
> + assert(spec && spec->thread_magic == thread_magic);
> +
> + return spec->thread_batch_buf;
> }
>
> -/* Create the thread specific data. */
> -void* cl_thread_data_create(void)
> +void cl_invalid_thread_gpgpu(cl_command_queue queue)
> {
> - int rc = 0;
> + queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
> + thread_spec_data* spec = NULL;
>
> - pthread_key_t *thread_specific_key = CALLOC(pthread_key_t);
> - if (thread_specific_key == NULL)
> - return NULL;
> -
> - rc = pthread_key_create(thread_specific_key, thread_data_destructor);
> + pthread_mutex_lock(&thread_private->thread_data_lock);
> + spec = thread_private->threads_data[thread_id];
> + assert(spec);
> + pthread_mutex_unlock(&thread_private->thread_data_lock);
>
> - if (rc != 0)
> - return NULL;
> + if (!spec->valid) {
> + return;
> + }
>
> - return thread_specific_key;
> + assert(spec->gpgpu);
> + cl_gpgpu_delete(spec->gpgpu);
> + spec->gpgpu = NULL;
> + spec->valid = 0;
> }
>
> /* The destructor for clean the thread specific data. */
> -void cl_thread_data_destroy(void * data)
> +void cl_thread_data_destroy(cl_command_queue queue)
> {
> - pthread_key_t *thread_specific_key = (pthread_key_t *)data;
> -
> - /* First release self spec data. */
> - cl_thread_spec_data* thread_spec_data =
> - pthread_getspecific(*thread_specific_key);
> - if (thread_spec_data && thread_spec_data->valid) {
> - cl_gpgpu_delete(thread_spec_data->gpgpu);
> - if (thread_spec_data)
> - cl_free(thread_spec_data);
> + int i = 0;
> + queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
> + int threads_data_num;
> + thread_spec_data** threads_data;
> +
> + pthread_mutex_lock(&thread_private->thread_data_lock);
> + assert(thread_private->threads_data_num == thread_array_num);
> + threads_data_num = thread_private->threads_data_num;
> + threads_data = thread_private->threads_data;
> + thread_private->threads_data_num = 0;
> + thread_private->threads_data = NULL;
> + pthread_mutex_unlock(&thread_private->thread_data_lock);
> + cl_free(thread_private);
> + queue->thread_data = NULL;
> +
> + for (i = 0; i < threads_data_num; i++) {
> + if (threads_data[i] != NULL && threads_data[i]->thread_batch_buf) {
> + cl_gpgpu_unref_batch_buf(threads_data[i]->thread_batch_buf);
> + threads_data[i]->thread_batch_buf = NULL;
> + }
> +
> + if (threads_data[i] != NULL && threads_data[i]->valid) {
> + cl_gpgpu_delete(threads_data[i]->gpgpu);
> + threads_data[i]->gpgpu = NULL;
> + threads_data[i]->valid = 0;
> + }
> + cl_free(threads_data[i]);
> }
>
> - pthread_key_delete(*thread_specific_key);
> - cl_free(thread_specific_key);
> + cl_free(threads_data);
> }
> diff --git a/src/cl_thread.h b/src/cl_thread.h
> index c8ab63c..bf855a2 100644
> --- a/src/cl_thread.h
> +++ b/src/cl_thread.h
> @@ -27,7 +27,7 @@
> void* cl_thread_data_create(void);
>
> /* The destructor for clean the thread specific data. */
> -void cl_thread_data_destroy(void * data);
> +void cl_thread_data_destroy(cl_command_queue queue);
>
> /* Used to get the gpgpu struct of each thread. */
> cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue);
> @@ -36,9 +36,9 @@ cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue);
> void cl_invalid_thread_gpgpu(cl_command_queue queue);
>
> /* Used to set the batch buffer of each thread. */
> -void cl_set_thread_batch_buf(void* buf);
> +void cl_set_thread_batch_buf(cl_command_queue queue, void* buf);
>
> /* Used to get the batch buffer of each thread. */
> -void* cl_get_thread_batch_buf(void);
> +void* cl_get_thread_batch_buf(cl_command_queue queue);
>
> #endif /* __CL_THREAD_H__ */
> --
> 1.8.3.2
>
> _______________________________________________
> Beignet mailing list
> Beignet at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/beignet
More information about the Beignet
mailing list