[Beignet] [PATCH V2] Refine the cl thread implement for queue.

Zhigang Gong zhigang.gong at linux.intel.com
Thu May 29 23:59:15 PDT 2014


LGTM, pushed, thanks.

On Fri, May 30, 2014 at 02:28:30PM +0800, junyan.he at inbox.com wrote:
> From: Junyan He <junyan.he at linux.intel.com>
> 
> Because a cl_command_queue can be used by several threads simultaneously
> without any of them taking a reference on it, we now handle it like this:
> Keep one thread slot array; every time a thread gets a gpgpu or batch buffer, if it
> does not have a slot yet, one is assigned.
> The resources are kept in the queue's private data, which is resized if needed.
> When a thread exits, its slot is marked invalid.
> When the queue is released, all the resources are released. If the user still enqueues to,
> flushes or finishes the queue after it has been released, the behavior is undefined.
> TODO: Need to shrink the slot map.
> 
> Signed-off-by: Junyan He <junyan.he at linux.intel.com>
> ---
>  src/cl_command_queue.c      |   4 +-
>  src/cl_command_queue_gen7.c |   2 +-
>  src/cl_context.c            |   1 -
>  src/cl_device_id.c          |   1 -
>  src/cl_thread.c             | 254 ++++++++++++++++++++++++++++++++------------
>  src/cl_thread.h             |   6 +-
>  6 files changed, 192 insertions(+), 76 deletions(-)
> 
> diff --git a/src/cl_command_queue.c b/src/cl_command_queue.c
> index 6a699c0..a2109d7 100644
> --- a/src/cl_command_queue.c
> +++ b/src/cl_command_queue.c
> @@ -89,7 +89,7 @@ cl_command_queue_delete(cl_command_queue queue)
>      queue->fulsim_out = NULL;
>    }
>  
> -  cl_thread_data_destroy(queue->thread_data);
> +  cl_thread_data_destroy(queue);
>    queue->thread_data = NULL;
>    cl_mem_delete(queue->perf);
>    cl_context_delete(queue->ctx);
> @@ -430,7 +430,7 @@ cl_command_queue_flush(cl_command_queue queue)
>  LOCAL cl_int
>  cl_command_queue_finish(cl_command_queue queue)
>  {
> -  cl_gpgpu_sync(cl_get_thread_batch_buf());
> +  cl_gpgpu_sync(cl_get_thread_batch_buf(queue));
>    return CL_SUCCESS;
>  }
>  
> diff --git a/src/cl_command_queue_gen7.c b/src/cl_command_queue_gen7.c
> index 891d6f1..d875021 100644
> --- a/src/cl_command_queue_gen7.c
> +++ b/src/cl_command_queue_gen7.c
> @@ -327,7 +327,7 @@ cl_command_queue_ND_range_gen7(cl_command_queue queue,
>    /* Start a new batch buffer */
>    batch_sz = cl_kernel_compute_batch_sz(ker);
>    cl_gpgpu_batch_reset(gpgpu, batch_sz);
> -  cl_set_thread_batch_buf(cl_gpgpu_ref_batch_buf(gpgpu));
> +  cl_set_thread_batch_buf(queue, cl_gpgpu_ref_batch_buf(gpgpu));
>    cl_gpgpu_batch_start(gpgpu);
>  
>    /* Issue the GPGPU_WALKER command */
> diff --git a/src/cl_context.c b/src/cl_context.c
> index 8190e6a..1911bf2 100644
> --- a/src/cl_context.c
> +++ b/src/cl_context.c
> @@ -203,7 +203,6 @@ cl_context_delete(cl_context ctx)
>    assert(ctx->buffers == NULL);
>    assert(ctx->drv);
>    cl_free(ctx->prop_user);
> -  cl_set_thread_batch_buf(NULL);
>    cl_driver_delete(ctx->drv);
>    ctx->magic = CL_MAGIC_DEAD_HEADER; /* For safety */
>    cl_free(ctx);
> diff --git a/src/cl_device_id.c b/src/cl_device_id.c
> index 427c50e..d2b3bed 100644
> --- a/src/cl_device_id.c
> +++ b/src/cl_device_id.c
> @@ -107,7 +107,6 @@ LOCAL cl_device_id
>  cl_get_gt_device(void)
>  {
>    cl_device_id ret = NULL;
> -  cl_set_thread_batch_buf(NULL);
>    const int device_id = cl_driver_get_device_id();
>    cl_device_id device = NULL;
>  
> diff --git a/src/cl_thread.c b/src/cl_thread.c
> index cadc3cd..4692ed7 100644
> --- a/src/cl_thread.c
> +++ b/src/cl_thread.c
> @@ -15,113 +15,231 @@
>   * License along with this library. If not, see <http://www.gnu.org/licenses/>.
>   *
>   */
> +#include <string.h>
> +#include <stdio.h>
>  
>  #include "cl_thread.h"
>  #include "cl_alloc.h"
>  #include "cl_utils.h"
>  
> -static __thread void* thread_batch_buf = NULL;
> -
> -typedef struct _cl_thread_spec_data {
> +/* Because a cl_command_queue can be used by several threads simultaneously
> +   without any of them taking a reference on it, we now handle it like this:
> +   Keep one thread slot array; every time a thread gets a gpgpu or batch buffer, if it
> +   does not have a slot yet, one is assigned.
> +   The resources are kept in the queue's private data, which is resized if needed.
> +   When a thread exits, its slot is marked invalid.
> +   When the queue is released, all the resources are released. If the user still enqueues to,
> +   flushes or finishes the queue after it has been released, the behavior is undefined.
> +   TODO: Need to shrink the slot map.
> +   */
> +
> +static int thread_array_num = 1;
> +static int *thread_slot_map = NULL;
> +static int thread_magic_num = 1;
> +static pthread_mutex_t thread_queue_map_lock = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_key_t destroy_key;
> +
> +static __thread int thread_id = -1;
> +static __thread int thread_magic = -1;
> +
> +typedef struct _thread_spec_data {
>    cl_gpgpu gpgpu ;
>    int valid;
> -}cl_thread_spec_data;
> +  void* thread_batch_buf;
> +  int thread_magic;
> +} thread_spec_data;
> +
> +typedef struct _queue_thread_private {
> +  thread_spec_data**  threads_data;
> +  int threads_data_num;
> +  pthread_mutex_t thread_data_lock;
> +} queue_thread_private;
> +
> +static void thread_data_destructor(void *dummy) {
> +  pthread_mutex_lock(&thread_queue_map_lock);
> +  thread_slot_map[thread_id] = 0;
> +  pthread_mutex_unlock(&thread_queue_map_lock);
> +  free(dummy);
> +}
> +
> +static thread_spec_data * __create_thread_spec_data(cl_command_queue queue, int create)
> +{
> +  queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
> +  thread_spec_data* spec = NULL;
> +  int i = 0;
> +
> +  if (thread_id == -1) {
> +    void * dummy = malloc(sizeof(int));
> +
> +    pthread_mutex_lock(&thread_queue_map_lock);
> +    for (i = 0; i < thread_array_num; i++) {
> +      if (thread_slot_map[i] == 0) {
> +        thread_id = i;
> +        break;
> +      }
> +    }
> +
> +    if (i == thread_array_num) {
> +      thread_array_num *= 2;
> +      thread_slot_map = realloc(thread_slot_map, sizeof(int) * thread_array_num);
> +      memset(thread_slot_map + thread_array_num/2, 0, sizeof(int) * (thread_array_num/2));
> +      thread_id = thread_array_num/2;
> +    }
> +
> +    thread_slot_map[thread_id] = 1;
> +
> +    thread_magic = thread_magic_num++;
> +    pthread_mutex_unlock(&thread_queue_map_lock);
>  
> -void cl_set_thread_batch_buf(void* buf) {
> -  if (thread_batch_buf) {
> -    cl_gpgpu_unref_batch_buf(thread_batch_buf);
> +    pthread_setspecific(destroy_key, dummy);
> +  }
> +
> +  pthread_mutex_lock(&thread_private->thread_data_lock);
> +  if (thread_array_num > thread_private->threads_data_num) {// just enlarge
> +    int old_num = thread_private->threads_data_num;
> +    thread_private->threads_data_num = thread_array_num;
> +    thread_private->threads_data = realloc(thread_private->threads_data,
> +                thread_private->threads_data_num * sizeof(void *));
> +    memset(thread_private->threads_data + old_num, 0,
> +           sizeof(void*) * (thread_private->threads_data_num - old_num));
>    }
> -  thread_batch_buf = buf;
> -}
>  
> -void* cl_get_thread_batch_buf(void) {
> -  return thread_batch_buf;
> +  assert(thread_id != -1 && thread_id < thread_array_num);
> +  spec = thread_private->threads_data[thread_id];
> +  if (!spec && create) {
> +       spec = CALLOC(thread_spec_data);
> +       spec->thread_magic = thread_magic;
> +       thread_private->threads_data[thread_id] = spec;
> +  }
> +
> +  pthread_mutex_unlock(&thread_private->thread_data_lock);
> +
> +  return spec;
>  }
>  
> -cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue)
> +void* cl_thread_data_create(void)
>  {
> -  pthread_key_t* key = queue->thread_data;
> -  cl_thread_spec_data* thread_spec_data = pthread_getspecific(*key);
> -
> -  if (!thread_spec_data) {
> -    TRY_ALLOC_NO_ERR(thread_spec_data, CALLOC(struct _cl_thread_spec_data));
> -    if (pthread_setspecific(*key, thread_spec_data)) {
> -      cl_free(thread_spec_data);
> -      return NULL;
> -    }
> -  }
> +  queue_thread_private* thread_private = CALLOC(queue_thread_private);
> +
> +  if (thread_private == NULL)
> +    return NULL;
>  
> -  if (!thread_spec_data->valid) {
> -    TRY_ALLOC_NO_ERR(thread_spec_data->gpgpu, cl_gpgpu_new(queue->ctx->drv));
> -    thread_spec_data->valid = 1;
> +  if (thread_slot_map == NULL) {
> +    pthread_mutex_lock(&thread_queue_map_lock);
> +    thread_slot_map = calloc(thread_array_num, sizeof(int));
> +    pthread_mutex_unlock(&thread_queue_map_lock);
> +
> +    pthread_key_create(&destroy_key, thread_data_destructor);
>    }
>  
> -error:
> -  return thread_spec_data->gpgpu;
> +  pthread_mutex_init(&thread_private->thread_data_lock, NULL);
> +
> +  pthread_mutex_lock(&thread_private->thread_data_lock);
> +  thread_private->threads_data = malloc(thread_array_num * sizeof(void *));
> +  memset(thread_private->threads_data, 0, sizeof(void*) * thread_array_num);
> +  thread_private->threads_data_num = thread_array_num;
> +  pthread_mutex_unlock(&thread_private->thread_data_lock);
> +
> +  return thread_private;
>  }
>  
> -void cl_invalid_thread_gpgpu(cl_command_queue queue)
> +cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue)
>  {
> -  pthread_key_t* key = queue->thread_data;
> -  cl_thread_spec_data* thread_spec_data = pthread_getspecific(*key);
> +  thread_spec_data* spec = __create_thread_spec_data(queue, 1);
>  
> -  if (!thread_spec_data) {
> -    return;
> +  if (!spec->thread_magic && spec->thread_magic != thread_magic) {
> +    //We may get the slot from last thread. So free the resource.
> +    spec->valid = 0;
>    }
>  
> -  if (!thread_spec_data->valid) {
> -    return;
> +  if (!spec->valid) {
> +    if (spec->thread_batch_buf) {
> +      cl_gpgpu_unref_batch_buf(spec->thread_batch_buf);
> +      spec->thread_batch_buf = NULL;
> +    }
> +    if (spec->gpgpu) {
> +      cl_gpgpu_delete(spec->gpgpu);
> +      spec->gpgpu = NULL;
> +    }
> +    TRY_ALLOC_NO_ERR(spec->gpgpu, cl_gpgpu_new(queue->ctx->drv));
> +    spec->valid = 1;
>    }
>  
> -  assert(thread_spec_data->gpgpu);
> -  cl_gpgpu_delete(thread_spec_data->gpgpu);
> -  thread_spec_data->valid = 0;
> + error:
> +  return spec->gpgpu;
>  }
>  
> -static void thread_data_destructor(void *data) {
> -  cl_thread_spec_data* thread_spec_data = (cl_thread_spec_data *)data;
> +void cl_set_thread_batch_buf(cl_command_queue queue, void* buf)
> +{
> +  thread_spec_data* spec = __create_thread_spec_data(queue, 1);
> +
> +  assert(spec && spec->thread_magic == thread_magic);
>  
> -  if (thread_batch_buf) {
> -    cl_gpgpu_unref_batch_buf(thread_batch_buf);
> -    thread_batch_buf = NULL;
> +  if (spec->thread_batch_buf) {
> +    cl_gpgpu_unref_batch_buf(spec->thread_batch_buf);
>    }
> +  spec->thread_batch_buf = buf;
> +}
> +
> +void* cl_get_thread_batch_buf(cl_command_queue queue) {
> +  thread_spec_data* spec = __create_thread_spec_data(queue, 1);
>  
> -  if (thread_spec_data->valid)
> -    cl_gpgpu_delete(thread_spec_data->gpgpu);
> -  cl_free(thread_spec_data);
> +  assert(spec && spec->thread_magic == thread_magic);
> +
> +  return spec->thread_batch_buf;
>  }
>  
> -/* Create the thread specific data. */
> -void* cl_thread_data_create(void)
> +void cl_invalid_thread_gpgpu(cl_command_queue queue)
>  {
> -  int rc = 0;
> +  queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
> +  thread_spec_data* spec = NULL;
>  
> -  pthread_key_t *thread_specific_key = CALLOC(pthread_key_t);
> -  if (thread_specific_key == NULL)
> -    return NULL;
> -
> -  rc = pthread_key_create(thread_specific_key, thread_data_destructor);
> +  pthread_mutex_lock(&thread_private->thread_data_lock);
> +  spec = thread_private->threads_data[thread_id];
> +  assert(spec);
> +  pthread_mutex_unlock(&thread_private->thread_data_lock);
>  
> -  if (rc != 0)
> -    return NULL;
> +  if (!spec->valid) {
> +    return;
> +  }
>  
> -  return thread_specific_key;
> +  assert(spec->gpgpu);
> +  cl_gpgpu_delete(spec->gpgpu);
> +  spec->gpgpu = NULL;
> +  spec->valid = 0;
>  }
>  
>  /* The destructor for clean the thread specific data. */
> -void cl_thread_data_destroy(void * data)
> +void cl_thread_data_destroy(cl_command_queue queue)
>  {
> -  pthread_key_t *thread_specific_key = (pthread_key_t *)data;
> -
> -  /* First release self spec data. */
> -  cl_thread_spec_data* thread_spec_data =
> -         pthread_getspecific(*thread_specific_key);
> -  if (thread_spec_data && thread_spec_data->valid) {
> -    cl_gpgpu_delete(thread_spec_data->gpgpu);
> -    if (thread_spec_data)
> -      cl_free(thread_spec_data);
> +  int i = 0;
> +  queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
> +  int threads_data_num;
> +  thread_spec_data** threads_data;
> +
> +  pthread_mutex_lock(&thread_private->thread_data_lock);
> +  assert(thread_private->threads_data_num == thread_array_num);
> +  threads_data_num = thread_private->threads_data_num;
> +  threads_data = thread_private->threads_data;
> +  thread_private->threads_data_num = 0;
> +  thread_private->threads_data = NULL;
> +  pthread_mutex_unlock(&thread_private->thread_data_lock);
> +  cl_free(thread_private);
> +  queue->thread_data = NULL;
> +
> +  for (i = 0; i < threads_data_num; i++) {
> +    if (threads_data[i] != NULL && threads_data[i]->thread_batch_buf) {
> +      cl_gpgpu_unref_batch_buf(threads_data[i]->thread_batch_buf);
> +      threads_data[i]->thread_batch_buf = NULL;
> +    }
> +
> +    if (threads_data[i] != NULL && threads_data[i]->valid) {
> +      cl_gpgpu_delete(threads_data[i]->gpgpu);
> +      threads_data[i]->gpgpu = NULL;
> +      threads_data[i]->valid = 0;
> +    }
> +    cl_free(threads_data[i]);
>    }
>  
> -  pthread_key_delete(*thread_specific_key);
> -  cl_free(thread_specific_key);
> +  cl_free(threads_data);
>  }
> diff --git a/src/cl_thread.h b/src/cl_thread.h
> index c8ab63c..bf855a2 100644
> --- a/src/cl_thread.h
> +++ b/src/cl_thread.h
> @@ -27,7 +27,7 @@
>  void* cl_thread_data_create(void);
>  
>  /* The destructor for clean the thread specific data. */
> -void cl_thread_data_destroy(void * data);
> +void cl_thread_data_destroy(cl_command_queue queue);
>  
>  /* Used to get the gpgpu struct of each thread. */
>  cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue);
> @@ -36,9 +36,9 @@ cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue);
>  void cl_invalid_thread_gpgpu(cl_command_queue queue);
>  
>  /* Used to set the batch buffer of each thread. */
> -void cl_set_thread_batch_buf(void* buf);
> +void cl_set_thread_batch_buf(cl_command_queue queue, void* buf);
>  
>  /* Used to get the batch buffer of each thread. */
> -void* cl_get_thread_batch_buf(void);
> +void* cl_get_thread_batch_buf(cl_command_queue queue);
>  
>  #endif /* __CL_THREAD_H__ */
> -- 
> 1.8.3.2
> 
> _______________________________________________
> Beignet mailing list
> Beignet at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/beignet


More information about the Beignet mailing list