[Mesa-dev] [PATCH 6/7] util/queue: add util_queue_adjust_num_threads

Ian Romanick idr at freedesktop.org
Thu Jan 3 20:05:38 UTC 2019


This patch is

Reviewed-by: Ian Romanick <ian.d.romanick at intel.com>

On 11/28/18 6:59 PM, Marek Olšák wrote:
> From: Marek Olšák <marek.olsak at amd.com>
> 
> for ARB_parallel_shader_compile
> ---
>  src/util/u_queue.c | 50 ++++++++++++++++++++++++++++++++++++++++------
>  src/util/u_queue.h |  8 ++++++++
>  2 files changed, 52 insertions(+), 6 deletions(-)
> 
> diff --git a/src/util/u_queue.c b/src/util/u_queue.c
> index 612ad5e83c6..383a9c09919 100644
> --- a/src/util/u_queue.c
> +++ b/src/util/u_queue.c
> @@ -27,42 +27,43 @@
>  #include "u_queue.h"
>  
>  #include <time.h>
>  
>  #include "util/os_time.h"
>  #include "util/u_string.h"
>  #include "util/u_thread.h"
>  #include "u_process.h"
>  
>  static void
> -util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads);
> +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
> +                        bool finish_locked);
>  
>  /****************************************************************************
>   * Wait for all queues to assert idle when exit() is called.
>   *
>   * Otherwise, C++ static variable destructors can be called while threads
>   * are using the static variables.
>   */
>  
>  static once_flag atexit_once_flag = ONCE_FLAG_INIT;
>  static struct list_head queue_list;
>  static mtx_t exit_mutex = _MTX_INITIALIZER_NP;
>  
>  static void
>  atexit_handler(void)
>  {
>     struct util_queue *iter;
>  
>     mtx_lock(&exit_mutex);
>     /* Wait for all queues to assert idle. */
>     LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
> -      util_queue_kill_threads(iter, 0);
> +      util_queue_kill_threads(iter, 0, false);
>     }
>     mtx_unlock(&exit_mutex);
>  }
>  
>  static void
>  global_init(void)
>  {
>     LIST_INITHEAD(&queue_list);
>     atexit(atexit_handler);
>  }
> @@ -333,20 +334,53 @@ util_queue_create_thread(struct util_queue *queue, unsigned index)
>         *
>         * Note that Linux only allows decreasing the priority. The original
>         * priority can't be restored.
>         */
>        pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param);
>  #endif
>     }
>     return true;
>  }
>  
> +void
> +util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
> +{
> +   num_threads = MIN2(num_threads, queue->max_threads);
> +   num_threads = MAX2(num_threads, 1);
> +
> +   mtx_lock(&queue->finish_lock);
> +   unsigned old_num_threads = queue->num_threads;
> +
> +   if (num_threads == old_num_threads) {
> +      mtx_unlock(&queue->finish_lock);
> +      return;
> +   }
> +
> +   if (num_threads < old_num_threads) {
> +      util_queue_kill_threads(queue, num_threads, true);
> +      mtx_unlock(&queue->finish_lock);
> +      return;
> +   }
> +
> +   /* Create threads.
> +    *
> +    * We need to update num_threads first, because threads terminate
> +    * when thread_index < num_threads.
> +    */
> +   queue->num_threads = num_threads;
> +   for (unsigned i = old_num_threads; i < num_threads; i++) {
> +      if (!util_queue_create_thread(queue, i))
> +         break;
> +   }
> +   mtx_unlock(&queue->finish_lock);
> +}
> +
>  bool
>  util_queue_init(struct util_queue *queue,
>                  const char *name,
>                  unsigned max_jobs,
>                  unsigned num_threads,
>                  unsigned flags)
>  {
>     unsigned i;
>  
>     /* Form the thread name from process_name and name, limited to 13
> @@ -371,20 +405,21 @@ util_queue_init(struct util_queue *queue,
>     memset(queue, 0, sizeof(*queue));
>  
>     if (process_len) {
>        util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
>                      process_len, process_name, name);
>     } else {
>        util_snprintf(queue->name, sizeof(queue->name), "%s", name);
>     }
>  
>     queue->flags = flags;
> +   queue->max_threads = num_threads;
>     queue->num_threads = num_threads;
>     queue->max_jobs = max_jobs;
>  
>     queue->jobs = (struct util_queue_job*)
>                   calloc(max_jobs, sizeof(struct util_queue_job));
>     if (!queue->jobs)
>        goto fail;
>  
>     (void) mtx_init(&queue->lock, mtx_plain);
>     (void) mtx_init(&queue->finish_lock, mtx_plain);
> @@ -422,48 +457,51 @@ fail:
>        cnd_destroy(&queue->has_queued_cond);
>        mtx_destroy(&queue->lock);
>        free(queue->jobs);
>     }
>     /* also util_queue_is_initialized can be used to check for success */
>     memset(queue, 0, sizeof(*queue));
>     return false;
>  }
>  
>  static void
> -util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads)
> +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
> +                        bool finish_locked)
>  {
>     unsigned i;
>  
>     /* Signal all threads to terminate. */
> -   mtx_lock(&queue->finish_lock);
> +   if (!finish_locked)
> +      mtx_lock(&queue->finish_lock);
>  
>     if (keep_num_threads >= queue->num_threads) {
>        mtx_unlock(&queue->finish_lock);
>        return;
>     }
>  
>     mtx_lock(&queue->lock);
>     unsigned old_num_threads = queue->num_threads;
>     queue->num_threads = keep_num_threads;
>     cnd_broadcast(&queue->has_queued_cond);
>     mtx_unlock(&queue->lock);
>  
>     for (i = keep_num_threads; i < old_num_threads; i++)
>        thrd_join(queue->threads[i], NULL);
>  
> -   mtx_unlock(&queue->finish_lock);
> +   if (!finish_locked)
> +      mtx_unlock(&queue->finish_lock);
>  }
>  
>  void
>  util_queue_destroy(struct util_queue *queue)
>  {
> -   util_queue_kill_threads(queue, 0);
> +   util_queue_kill_threads(queue, 0, false);
>     remove_from_atexit_list(queue);
>  
>     cnd_destroy(&queue->has_space_cond);
>     cnd_destroy(&queue->has_queued_cond);
>     mtx_destroy(&queue->finish_lock);
>     mtx_destroy(&queue->lock);
>     free(queue->jobs);
>     free(queue->threads);
>  }
>  
> diff --git a/src/util/u_queue.h b/src/util/u_queue.h
> index 756fa53e1bf..2d269099c20 100644
> --- a/src/util/u_queue.h
> +++ b/src/util/u_queue.h
> @@ -201,20 +201,21 @@ struct util_queue_job {
>  /* Put this into your context. */
>  struct util_queue {
>     char name[14]; /* 13 characters = the thread name without the index */
>     mtx_t finish_lock; /* for util_queue_finish and protects threads/num_threads */
>     mtx_t lock;
>     cnd_t has_queued_cond;
>     cnd_t has_space_cond;
>     thrd_t *threads;
>     unsigned flags;
>     int num_queued;
> +   unsigned max_threads;
>     unsigned num_threads; /* decreasing this number will terminate threads */
>     int max_jobs;
>     int write_idx, read_idx; /* ring buffer pointers */
>     struct util_queue_job *jobs;
>  
>     /* for cleanup at exit(), protected by exit_mutex */
>     struct list_head head;
>  };
>  
>  bool util_queue_init(struct util_queue *queue,
> @@ -228,20 +229,27 @@ void util_queue_destroy(struct util_queue *queue);
>  void util_queue_add_job(struct util_queue *queue,
>                          void *job,
>                          struct util_queue_fence *fence,
>                          util_queue_execute_func execute,
>                          util_queue_execute_func cleanup);
>  void util_queue_drop_job(struct util_queue *queue,
>                           struct util_queue_fence *fence);
>  
>  void util_queue_finish(struct util_queue *queue);
>  
> +/* Adjust the number of active threads. The new number of threads can't be
> + * greater than the initial number of threads at the creation of the queue,
> + * and it can't be less than 1.
> + */
> +void
> +util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads);
> +
>  int64_t util_queue_get_thread_time_nano(struct util_queue *queue,
>                                          unsigned thread_index);
>  
>  /* util_queue needs to be cleared to zeroes for this to work */
>  static inline bool
>  util_queue_is_initialized(struct util_queue *queue)
>  {
>     return queue->threads != NULL;
>  }
>  
> 



More information about the mesa-dev mailing list