[Mesa-dev] [PATCH 3/6] gallium/u_queue: add an option to have multiple worker threads

Nicolai Hähnle nhaehnle at gmail.com
Tue Jun 21 14:41:34 UTC 2016



On 21.06.2016 14:17, Marek Olšák wrote:
> From: Marek Olšák <marek.olsak at amd.com>
>
> independent jobs don't have to be stuck on only one thread
>
> (there is just one deadlock situation in util_queue_destroy that
>   will be fixed in a later patch)
> ---
>   src/gallium/auxiliary/util/u_queue.c              | 66 +++++++++++++++++++----
>   src/gallium/auxiliary/util/u_queue.h              | 12 +++--
>   src/gallium/winsys/amdgpu/drm/amdgpu_cs.c         |  4 +-
>   src/gallium/winsys/amdgpu/drm/amdgpu_cs.h         |  2 +-
>   src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c     |  2 +-
>   src/gallium/winsys/radeon/drm/radeon_drm_cs.c     |  4 +-
>   src/gallium/winsys/radeon/drm/radeon_drm_cs.h     |  2 +-
>   src/gallium/winsys/radeon/drm/radeon_drm_winsys.c |  2 +-
>   8 files changed, 71 insertions(+), 23 deletions(-)
>
> diff --git a/src/gallium/auxiliary/util/u_queue.c b/src/gallium/auxiliary/util/u_queue.c
> index e6b0357..d14d850 100644
> --- a/src/gallium/auxiliary/util/u_queue.c
> +++ b/src/gallium/auxiliary/util/u_queue.c
> @@ -25,6 +25,7 @@
>    */
>
>   #include "u_queue.h"
> +#include "u_memory.h"
>   #include "os/os_time.h"
>
>   static void
> @@ -48,15 +49,23 @@ util_queue_job_wait(struct util_queue_fence *fence)
>      pipe_mutex_unlock(fence->mutex);
>   }
>
> -static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
> +struct thread_input {
> +   struct util_queue *queue;
> +   int thread_index;
> +};
> +
> +static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
>   {
> -   struct util_queue *queue = (struct util_queue*)param;
> +   struct util_queue *queue = ((struct thread_input*)input)->queue;
> +   int thread_index = ((struct thread_input*)input)->thread_index;
> +
> +   FREE(input);
>
>      while (1) {
>         struct util_queue_job job;
>
>         pipe_semaphore_wait(&queue->queued);
> -      if (queue->kill_thread)
> +      if (queue->kill_threads)
>            break;
>
>         pipe_mutex_lock(queue->lock);
> @@ -68,7 +77,7 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
>         pipe_semaphore_signal(&queue->has_space);
>
>         if (job.job) {
> -         queue->execute_job(job.job);
> +         queue->execute_job(job.job, thread_index);
>            util_queue_fence_signal(job.fence);
>         }
>      }
> @@ -88,9 +97,13 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
>   bool
>   util_queue_init(struct util_queue *queue,
>                   unsigned max_jobs,
> -                void (*execute_job)(void *))
> +                unsigned num_threads,
> +                void (*execute_job)(void *, int))
>   {
> +   unsigned i;
> +
>      memset(queue, 0, sizeof(*queue));
> +   queue->num_threads = num_threads;
>      queue->max_jobs = max_jobs;
>
>      queue->jobs = (struct util_queue_job*)
> @@ -103,13 +116,36 @@ util_queue_init(struct util_queue *queue,
>      pipe_semaphore_init(&queue->has_space, max_jobs);
>      pipe_semaphore_init(&queue->queued, 0);
>
> -   queue->thread = pipe_thread_create(util_queue_thread_func, queue);
> -   if (!queue->thread)
> +   queue->threads = (pipe_thread*)calloc(num_threads, sizeof(pipe_thread));

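Same comment about CALLOC/FREE consistency as on patch #1: this array is allocated with plain calloc() and released with plain free(), while the thread_input structs in this same patch use MALLOC_STRUCT()/FREE() from u_memory.h.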

Nicolai

> +   if (!queue->threads)
>         goto fail;
>
> +   /* start threads */
> +   for (i = 0; i < num_threads; i++) {
> +      struct thread_input *input = MALLOC_STRUCT(thread_input);
> +      input->queue = queue;
> +      input->thread_index = i;
> +
> +      queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
> +
> +      if (!queue->threads[i]) {
> +         FREE(input);
> +
> +         if (i == 0) {
> +            /* no threads created, fail */
> +            goto fail;
> +         } else {
> +            /* at least one thread created, so use it */
> +            queue->num_threads = i+1;
> +            break;
> +         }
> +      }
> +   }
>      return true;
>
>   fail:
> +   free(queue->threads);
> +
>      if (queue->jobs) {
>         pipe_semaphore_destroy(&queue->has_space);
>         pipe_semaphore_destroy(&queue->queued);
> @@ -124,13 +160,23 @@ fail:
>   void
>   util_queue_destroy(struct util_queue *queue)
>   {
> -   queue->kill_thread = 1;
> -   pipe_semaphore_signal(&queue->queued);
> -   pipe_thread_wait(queue->thread);
> +   unsigned i;
> +
> +   /* Signal all threads to terminate. */
> +   pipe_mutex_lock(queue->queued.mutex);
> +   queue->kill_threads = 1;
> +   queue->queued.counter = queue->num_threads;
> +   pipe_condvar_broadcast(queue->queued.cond);
> +   pipe_mutex_unlock(queue->queued.mutex);
> +
> +   for (i = 0; i < queue->num_threads; i++)
> +      pipe_thread_wait(queue->threads[i]);
> +
>      pipe_semaphore_destroy(&queue->has_space);
>      pipe_semaphore_destroy(&queue->queued);
>      pipe_mutex_destroy(queue->lock);
>      free(queue->jobs);
> +   free(queue->threads);
>   }
>
>   void
> diff --git a/src/gallium/auxiliary/util/u_queue.h b/src/gallium/auxiliary/util/u_queue.h
> index acebb51..f3aa4f6 100644
> --- a/src/gallium/auxiliary/util/u_queue.h
> +++ b/src/gallium/auxiliary/util/u_queue.h
> @@ -54,17 +54,19 @@ struct util_queue {
>      pipe_mutex lock;
>      pipe_semaphore has_space;
>      pipe_semaphore queued;
> -   pipe_thread thread;
> -   int kill_thread;
> +   pipe_thread *threads;
> +   unsigned num_threads;
> +   int kill_threads;
>      int max_jobs;
>      int write_idx, read_idx; /* ring buffer pointers */
>      struct util_queue_job *jobs;
> -   void (*execute_job)(void *job);
> +   void (*execute_job)(void *job, int thread_index);
>   };
>
>   bool util_queue_init(struct util_queue *queue,
>                        unsigned max_jobs,
> -                     void (*execute_job)(void *));
> +                     unsigned num_threads,
> +                     void (*execute_job)(void *, int));
>   void util_queue_destroy(struct util_queue *queue);
>   void util_queue_fence_init(struct util_queue_fence *fence);
>   void util_queue_fence_destroy(struct util_queue_fence *fence);
> @@ -78,7 +80,7 @@ void util_queue_job_wait(struct util_queue_fence *fence);
>   static inline bool
>   util_queue_is_initialized(struct util_queue *queue)
>   {
> -   return queue->thread != 0;
> +   return queue->threads != NULL;
>   }
>
>   static inline bool
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> index 4a7302a..5636f83 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> @@ -872,7 +872,7 @@ static void amdgpu_add_fence_dependencies(struct amdgpu_cs *acs)
>      }
>   }
>
> -void amdgpu_cs_submit_ib(void *job)
> +void amdgpu_cs_submit_ib(void *job, int thread_index)
>   {
>      struct amdgpu_cs *acs = (struct amdgpu_cs*)job;
>      struct amdgpu_winsys *ws = acs->ctx->ws;
> @@ -1054,7 +1054,7 @@ static void amdgpu_cs_flush(struct radeon_winsys_cs *rcs,
>             util_queue_is_initialized(&ws->cs_queue)) {
>            util_queue_add_job(&ws->cs_queue, cs, &cs->flush_completed);
>         } else {
> -         amdgpu_cs_submit_ib(cs);
> +         amdgpu_cs_submit_ib(cs, 0);
>         }
>      } else {
>         amdgpu_cs_context_cleanup(cs->csc);
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> index 354e403..a7f3414 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> @@ -218,6 +218,6 @@ bool amdgpu_fence_wait(struct pipe_fence_handle *fence, uint64_t timeout,
>                          bool absolute);
>   void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs);
>   void amdgpu_cs_init_functions(struct amdgpu_winsys *ws);
> -void amdgpu_cs_submit_ib(void *job);
> +void amdgpu_cs_submit_ib(void *job, int thread_index);
>
>   #endif
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> index ddcdc86..22a8122 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> @@ -493,7 +493,7 @@ amdgpu_winsys_create(int fd, radeon_screen_create_t screen_create)
>      pipe_mutex_init(ws->bo_fence_lock);
>
>      if (sysconf(_SC_NPROCESSORS_ONLN) > 1 && debug_get_option_thread())
> -      util_queue_init(&ws->cs_queue, 8, amdgpu_cs_submit_ib);
> +      util_queue_init(&ws->cs_queue, 8, 1, amdgpu_cs_submit_ib);
>
>      /* Create the screen at the end. The winsys must be initialized
>       * completely.
> diff --git a/src/gallium/winsys/radeon/drm/radeon_drm_cs.c b/src/gallium/winsys/radeon/drm/radeon_drm_cs.c
> index 9552bd5..9532a6a 100644
> --- a/src/gallium/winsys/radeon/drm/radeon_drm_cs.c
> +++ b/src/gallium/winsys/radeon/drm/radeon_drm_cs.c
> @@ -427,7 +427,7 @@ static unsigned radeon_drm_cs_get_buffer_list(struct radeon_winsys_cs *rcs,
>       return cs->csc->crelocs;
>   }
>
> -void radeon_drm_cs_emit_ioctl_oneshot(void *job)
> +void radeon_drm_cs_emit_ioctl_oneshot(void *job, int thread_index)
>   {
>       struct radeon_cs_context *csc = ((struct radeon_drm_cs*)job)->cst;
>       unsigned i;
> @@ -590,7 +590,7 @@ static void radeon_drm_cs_flush(struct radeon_winsys_cs *rcs,
>               if (!(flags & RADEON_FLUSH_ASYNC))
>                   radeon_drm_cs_sync_flush(rcs);
>           } else {
> -            radeon_drm_cs_emit_ioctl_oneshot(cs);
> +            radeon_drm_cs_emit_ioctl_oneshot(cs, 0);
>           }
>       } else {
>           radeon_cs_context_cleanup(cs->cst);
> diff --git a/src/gallium/winsys/radeon/drm/radeon_drm_cs.h b/src/gallium/winsys/radeon/drm/radeon_drm_cs.h
> index a5f243d..53a3ae0 100644
> --- a/src/gallium/winsys/radeon/drm/radeon_drm_cs.h
> +++ b/src/gallium/winsys/radeon/drm/radeon_drm_cs.h
> @@ -122,6 +122,6 @@ radeon_bo_is_referenced_by_any_cs(struct radeon_bo *bo)
>
>   void radeon_drm_cs_sync_flush(struct radeon_winsys_cs *rcs);
>   void radeon_drm_cs_init_functions(struct radeon_drm_winsys *ws);
> -void radeon_drm_cs_emit_ioctl_oneshot(void *job);
> +void radeon_drm_cs_emit_ioctl_oneshot(void *job, int thread_index);
>
>   #endif
> diff --git a/src/gallium/winsys/radeon/drm/radeon_drm_winsys.c b/src/gallium/winsys/radeon/drm/radeon_drm_winsys.c
> index 453cbfc..32d58b9 100644
> --- a/src/gallium/winsys/radeon/drm/radeon_drm_winsys.c
> +++ b/src/gallium/winsys/radeon/drm/radeon_drm_winsys.c
> @@ -783,7 +783,7 @@ radeon_drm_winsys_create(int fd, radeon_screen_create_t screen_create)
>       ws->info.gart_page_size = sysconf(_SC_PAGESIZE);
>
>       if (ws->num_cpus > 1 && debug_get_option_thread())
> -        util_queue_init(&ws->cs_queue, 8,
> +        util_queue_init(&ws->cs_queue, 8, 1,
>                           radeon_drm_cs_emit_ioctl_oneshot);
>
>       /* Create the screen at the end. The winsys must be initialized
>


More information about the mesa-dev mailing list