[Mesa-dev] [PATCH 4/7] util/queue: add ability to kill a subset of threads

Ian Romanick idr at freedesktop.org
Thu Jan 3 20:01:44 UTC 2019


On 11/28/18 6:59 PM, Marek Olšák wrote:
> From: Marek Olšák <marek.olsak at amd.com>
> 
> for ARB_parallel_shader_compile
> ---
>  src/util/u_queue.c | 49 +++++++++++++++++++++++++++++-----------------
>  src/util/u_queue.h |  5 ++---
>  2 files changed, 33 insertions(+), 21 deletions(-)
> 
> diff --git a/src/util/u_queue.c b/src/util/u_queue.c
> index 48c5c79552d..5aaf60ae78e 100644
> --- a/src/util/u_queue.c
> +++ b/src/util/u_queue.c
> @@ -26,42 +26,43 @@
>  
>  #include "u_queue.h"
>  
>  #include <time.h>
>  
>  #include "util/os_time.h"
>  #include "util/u_string.h"
>  #include "util/u_thread.h"
>  #include "u_process.h"
>  
> -static void util_queue_killall_and_wait(struct util_queue *queue);
> +static void
> +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads);
>  
>  /****************************************************************************
>   * Wait for all queues to assert idle when exit() is called.
>   *
>   * Otherwise, C++ static variable destructors can be called while threads
>   * are using the static variables.
>   */
>  
>  static once_flag atexit_once_flag = ONCE_FLAG_INIT;
>  static struct list_head queue_list;
>  static mtx_t exit_mutex = _MTX_INITIALIZER_NP;
>  
>  static void
>  atexit_handler(void)
>  {
>     struct util_queue *iter;
>  
>     mtx_lock(&exit_mutex);
>     /* Wait for all queues to assert idle. */
>     LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
> -      util_queue_killall_and_wait(iter);
> +      util_queue_kill_threads(iter, 0);
>     }
>     mtx_unlock(&exit_mutex);
>  }
>  
>  static void
>  global_init(void)
>  {
>     LIST_INITHEAD(&queue_list);
>     atexit(atexit_handler);
>  }
> @@ -259,55 +260,58 @@ util_queue_thread_func(void *input)
>        u_thread_setname(name);
>     }
>  
>     while (1) {
>        struct util_queue_job job;
>  
>        mtx_lock(&queue->lock);
>        assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
>  
>        /* wait if the queue is empty */
> -      while (!queue->kill_threads && queue->num_queued == 0)
> +      while (thread_index < queue->num_threads && queue->num_queued == 0)
>           cnd_wait(&queue->has_queued_cond, &queue->lock);
>  
> -      if (queue->kill_threads) {
> +      /* only kill threads that are above "num_threads" */
> +      if (thread_index >= queue->num_threads) {
>           mtx_unlock(&queue->lock);
>           break;
>        }
>  
>        job = queue->jobs[queue->read_idx];
>        memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
>        queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
>  
>        queue->num_queued--;
>        cnd_signal(&queue->has_space_cond);
>        mtx_unlock(&queue->lock);
>  
>        if (job.job) {
>           job.execute(job.job, thread_index);
>           util_queue_fence_signal(job.fence);
>           if (job.cleanup)
>              job.cleanup(job.job, thread_index);
>        }
>     }
>  
> -   /* signal remaining jobs before terminating */
> +   /* signal remaining jobs if all threads are being terminated */
>     mtx_lock(&queue->lock);
> -   for (unsigned i = queue->read_idx; i != queue->write_idx;
> -        i = (i + 1) % queue->max_jobs) {
> -      if (queue->jobs[i].job) {
> -         util_queue_fence_signal(queue->jobs[i].fence);
> -         queue->jobs[i].job = NULL;
> +   if (queue->num_threads == 0) {
> +      for (unsigned i = queue->read_idx; i != queue->write_idx;
> +           i = (i + 1) % queue->max_jobs) {
> +         if (queue->jobs[i].job) {
> +            util_queue_fence_signal(queue->jobs[i].fence);
> +            queue->jobs[i].job = NULL;
> +         }
>        }
> +      queue->read_idx = queue->write_idx;
> +      queue->num_queued = 0;
>     }
> -   queue->read_idx = queue->write_idx;
> -   queue->num_queued = 0;
>     mtx_unlock(&queue->lock);
>     return 0;
>  }
>  
>  static bool
>  util_queue_create_thread(struct util_queue *queue, unsigned index)
>  {
>     struct thread_input *input =
>        (struct thread_input *) malloc(sizeof(struct thread_input));
>     input->queue = queue;
> @@ -418,60 +422,69 @@ fail:
>        cnd_destroy(&queue->has_queued_cond);
>        mtx_destroy(&queue->lock);
>        free(queue->jobs);
>     }
>     /* also util_queue_is_initialized can be used to check for success */
>     memset(queue, 0, sizeof(*queue));
>     return false;
>  }
>  
>  static void
> -util_queue_killall_and_wait(struct util_queue *queue)
> +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads)
>  {
>     unsigned i;
>  
>     /* Signal all threads to terminate. */
> +   mtx_lock(&queue->finish_lock);
> +
> +   if (keep_num_threads >= queue->num_threads) {
> +      mtx_unlock(&queue->finish_lock);
> +      return;
> +   }
> +
>     mtx_lock(&queue->lock);
> -   queue->kill_threads = 1;
> +   unsigned old_num_threads = queue->num_threads;
> +   queue->num_threads = keep_num_threads;

Shouldn't this still be set below, after the threads are joined?

>     cnd_broadcast(&queue->has_queued_cond);
>     mtx_unlock(&queue->lock);
>  
> -   for (i = 0; i < queue->num_threads; i++)
> +   for (i = keep_num_threads; i < old_num_threads; i++)
>        thrd_join(queue->threads[i], NULL);
> -   queue->num_threads = 0;
> +
> +   mtx_unlock(&queue->finish_lock);
>  }
>  
>  void
>  util_queue_destroy(struct util_queue *queue)
>  {
> -   util_queue_killall_and_wait(queue);
> +   util_queue_kill_threads(queue, 0);
>     remove_from_atexit_list(queue);
>  
>     cnd_destroy(&queue->has_space_cond);
>     cnd_destroy(&queue->has_queued_cond);
>     mtx_destroy(&queue->finish_lock);
>     mtx_destroy(&queue->lock);
>     free(queue->jobs);
>     free(queue->threads);
>  }
>  
>  void
>  util_queue_add_job(struct util_queue *queue,
>                     void *job,
>                     struct util_queue_fence *fence,
>                     util_queue_execute_func execute,
>                     util_queue_execute_func cleanup)
>  {
>     struct util_queue_job *ptr;
>  
>     mtx_lock(&queue->lock);
> -   if (queue->kill_threads) {
> +   if (queue->num_threads == 0) {
>        mtx_unlock(&queue->lock);
>        /* well no good option here, but any leaks will be
>         * short-lived as things are shutting down..
>         */
>        return;
>     }
>  
>     util_queue_fence_reset(fence);
>  
>     assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
> diff --git a/src/util/u_queue.h b/src/util/u_queue.h
> index 4e63a76aab2..756fa53e1bf 100644
> --- a/src/util/u_queue.h
> +++ b/src/util/u_queue.h
> @@ -194,29 +194,28 @@ typedef void (*util_queue_execute_func)(void *job, int thread_index);
>  struct util_queue_job {
>     void *job;
>     struct util_queue_fence *fence;
>     util_queue_execute_func execute;
>     util_queue_execute_func cleanup;
>  };
>  
>  /* Put this into your context. */
>  struct util_queue {
>     char name[14]; /* 13 characters = the thread name without the index */
> -   mtx_t finish_lock; /* only for util_queue_finish */
> +   mtx_t finish_lock; /* for util_queue_finish and protects threads/num_threads */
>     mtx_t lock;
>     cnd_t has_queued_cond;
>     cnd_t has_space_cond;
>     thrd_t *threads;
>     unsigned flags;
>     int num_queued;
> -   unsigned num_threads;
> -   int kill_threads;
> +   unsigned num_threads; /* decreasing this number will terminate threads */
>     int max_jobs;
>     int write_idx, read_idx; /* ring buffer pointers */
>     struct util_queue_job *jobs;
>  
>     /* for cleanup at exit(), protected by exit_mutex */
>     struct list_head head;
>  };
>  
>  bool util_queue_init(struct util_queue *queue,
>                       const char *name,
> 



More information about the mesa-dev mailing list