[Mesa-dev] [PATCH 1/2] gallium/util: import the multithreaded job queue from amdgpu winsys

Marek Olšák maraeo at gmail.com
Tue Jun 14 13:04:21 UTC 2016


The event doesn't limit the number of outstanding jobs. It's like a fence -
you can wait for it or not. The limitation is that you must have
exactly one event instance for each active job. util_queue_fence is a
better name IMO. I don't plan to extend the feature set beyond that.

Marek
On Jun 14, 2016 1:26 PM, "Nicolai Hähnle" <nhaehnle at gmail.com> wrote:

> On 13.06.2016 19:34, Marek Olšák wrote:
>
>> From: Marek Olšák <marek.olsak at amd.com>
>>
>> ---
>>   src/gallium/auxiliary/Makefile.sources        |   2 +
>>   src/gallium/auxiliary/util/u_queue.c          | 129
>> ++++++++++++++++++++++++++
>>   src/gallium/auxiliary/util/u_queue.h          |  80 ++++++++++++++++
>>   src/gallium/winsys/amdgpu/drm/amdgpu_cs.c     |  23 ++---
>>   src/gallium/winsys/amdgpu/drm/amdgpu_cs.h     |   4 +-
>>   src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c |  63 +------------
>>   src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h |  11 +--
>>   7 files changed, 229 insertions(+), 83 deletions(-)
>>   create mode 100644 src/gallium/auxiliary/util/u_queue.c
>>   create mode 100644 src/gallium/auxiliary/util/u_queue.h
>>
>> diff --git a/src/gallium/auxiliary/Makefile.sources
>> b/src/gallium/auxiliary/Makefile.sources
>> index 7b3853e..ab58358 100644
>> --- a/src/gallium/auxiliary/Makefile.sources
>> +++ b/src/gallium/auxiliary/Makefile.sources
>> @@ -274,6 +274,8 @@ C_SOURCES := \
>>         util/u_pstipple.c \
>>         util/u_pstipple.h \
>>         util/u_pwr8.h \
>> +       util/u_queue.c \
>> +       util/u_queue.h \
>>         util/u_range.h \
>>         util/u_rect.h \
>>         util/u_resource.c \
>> diff --git a/src/gallium/auxiliary/util/u_queue.c
>> b/src/gallium/auxiliary/util/u_queue.c
>> new file mode 100644
>> index 0000000..311b591
>> --- /dev/null
>> +++ b/src/gallium/auxiliary/util/u_queue.c
>> @@ -0,0 +1,129 @@
>> +/*
>> + * Copyright © 2016 Advanced Micro Devices, Inc.
>> + * All Rights Reserved.
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining
>> + * a copy of this software and associated documentation files (the
>> + * "Software"), to deal in the Software without restriction, including
>> + * without limitation the rights to use, copy, modify, merge, publish,
>> + * distribute, sub license, and/or sell copies of the Software, and to
>> + * permit persons to whom the Software is furnished to do so, subject to
>> + * the following conditions:
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
>> + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
>> + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
>> + * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
>> + * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
>> + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
>> + * USE OR OTHER DEALINGS IN THE SOFTWARE.
>> + *
>> + * The above copyright notice and this permission notice (including the
>> + * next paragraph) shall be included in all copies or substantial
>> portions
>> + * of the Software.
>> + */
>> +
>> +#include "u_queue.h"
>> +
>> +static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
>> +{
>> +   struct util_queue *queue = (struct util_queue*)param;
>> +   unsigned i;
>> +
>> +   while (1) {
>> +      struct util_queue_job job;
>> +
>> +      pipe_semaphore_wait(&queue->queued);
>> +      if (queue->kill_thread)
>> +         break;
>> +
>> +      pipe_mutex_lock(queue->lock);
>> +      job = queue->jobs[0];
>> +      for (i = 1; i < queue->num_jobs; i++)
>> +         queue->jobs[i - 1] = queue->jobs[i];
>> +      queue->jobs[--queue->num_jobs].job = NULL;
>> +      pipe_mutex_unlock(queue->lock);
>> +
>> +      pipe_semaphore_signal(&queue->has_space);
>> +
>> +      if (job.job) {
>> +         queue->execute_job(job.job);
>> +         pipe_semaphore_signal(&job.event->done);
>> +      }
>> +   }
>> +
>> +   /* signal remaining jobs before terminating */
>> +   pipe_mutex_lock(queue->lock);
>> +   for (i = 0; i < queue->num_jobs; i++) {
>> +      pipe_semaphore_signal(&queue->jobs[i].event->done);
>> +      queue->jobs[i].job = NULL;
>> +   }
>> +   queue->num_jobs = 0;
>> +   pipe_mutex_unlock(queue->lock);
>> +   return 0;
>> +}
>> +
>> +void
>> +util_queue_init(struct util_queue *queue,
>> +                void (*execute_job)(void *))
>> +{
>> +   memset(queue, 0, sizeof(*queue));
>> +   queue->execute_job = execute_job;
>> +   pipe_mutex_init(queue->lock);
>> +   pipe_semaphore_init(&queue->has_space, ARRAY_SIZE(queue->jobs));
>> +   pipe_semaphore_init(&queue->queued, 0);
>> +   queue->thread = pipe_thread_create(util_queue_thread_func, queue);
>> +}
>> +
>> +void
>> +util_queue_destroy(struct util_queue *queue)
>> +{
>> +   queue->kill_thread = 1;
>> +   pipe_semaphore_signal(&queue->queued);
>> +   pipe_thread_wait(queue->thread);
>> +   pipe_semaphore_destroy(&queue->has_space);
>> +   pipe_semaphore_destroy(&queue->queued);
>> +   pipe_mutex_destroy(queue->lock);
>> +}
>> +
>> +void
>> +util_queue_event_init(struct util_queue_event *event)
>> +{
>> +   pipe_semaphore_init(&event->done, 1);
>> +}
>> +
>> +void
>> +util_queue_event_destroy(struct util_queue_event *event)
>> +{
>> +   pipe_semaphore_destroy(&event->done);
>> +}
>> +
>> +void
>> +util_queue_add_job(struct util_queue *queue,
>> +                   void *job,
>> +                   struct util_queue_event *event)
>> +{
>> +   /* Set the semaphore to "busy". */
>> +   pipe_semaphore_wait(&event->done);
>> +
>> +   /* if the queue is full, wait until there is space */
>> +   pipe_semaphore_wait(&queue->has_space);
>> +
>> +   pipe_mutex_lock(queue->lock);
>> +   assert(queue->num_jobs < ARRAY_SIZE(queue->jobs));
>> +   queue->jobs[queue->num_jobs].job = job;
>> +   queue->jobs[queue->num_jobs].event = event;
>> +   queue->num_jobs++;
>> +   pipe_mutex_unlock(queue->lock);
>> +   pipe_semaphore_signal(&queue->queued);
>> +}
>> +
>> +void
>> +util_queue_job_wait(struct util_queue_event *event)
>> +{
>> +   /* wait and set the semaphore to "busy" */
>> +   pipe_semaphore_wait(&event->done);
>> +   /* set the semaphore to "idle" */
>> +   pipe_semaphore_signal(&event->done);
>> +}
>> diff --git a/src/gallium/auxiliary/util/u_queue.h
>> b/src/gallium/auxiliary/util/u_queue.h
>> new file mode 100644
>> index 0000000..b7c1f44
>> --- /dev/null
>> +++ b/src/gallium/auxiliary/util/u_queue.h
>> @@ -0,0 +1,80 @@
>> +/*
>> + * Copyright © 2016 Advanced Micro Devices, Inc.
>> + * All Rights Reserved.
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining
>> + * a copy of this software and associated documentation files (the
>> + * "Software"), to deal in the Software without restriction, including
>> + * without limitation the rights to use, copy, modify, merge, publish,
>> + * distribute, sub license, and/or sell copies of the Software, and to
>> + * permit persons to whom the Software is furnished to do so, subject to
>> + * the following conditions:
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
>> + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
>> + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
>> + * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
>> + * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
>> + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
>> + * USE OR OTHER DEALINGS IN THE SOFTWARE.
>> + *
>> + * The above copyright notice and this permission notice (including the
>> + * next paragraph) shall be included in all copies or substantial
>> portions
>> + * of the Software.
>> + */
>> +
>> +/* Job queue with execution in a separate thread.
>> + *
>> + * Jobs can be added from any thread. After that, the wait call can be
>> used
>> + * to wait for completion of the job.
>> + */
>> +
>> +#ifndef U_QUEUE_H
>> +#define U_QUEUE_H
>> +
>> +#include "os/os_thread.h"
>> +
>> +/* Job completion event.
>> + * Put this into your job structure.
>> + */
>> +struct util_queue_event {
>> +   pipe_semaphore done;
>> +};
>> +
>> +struct util_queue_job {
>> +   void *job;
>> +   struct util_queue_event *event;
>> +};
>> +
>> +/* Put this into your context. */
>> +struct util_queue {
>> +   pipe_mutex lock;
>> +   pipe_semaphore has_space;
>> +   pipe_semaphore queued;
>> +   pipe_thread thread;
>> +   int kill_thread;
>> +   int num_jobs;
>> +   struct util_queue_job jobs[8];
>> +   void (*execute_job)(void *job);
>> +};
>> +
>> +void util_queue_init(struct util_queue *queue,
>> +                     void (*execute_job)(void *));
>> +void util_queue_destroy(struct util_queue *queue);
>> +void util_queue_event_init(struct util_queue_event *event);
>> +void util_queue_event_destroy(struct util_queue_event *event);
>> +
>> +void util_queue_add_job(struct util_queue *queue,
>> +                        void *job,
>> +                        struct util_queue_event *event);
>> +void util_queue_job_wait(struct util_queue_event *event);
>>
>
> I think the util_queue_event part of the interface is basically impossible
> to understand without knowledge of the code. It does two things:
>
> - limit the number of outstanding jobs
> - wait for outstanding jobs to finish
>
> I think it should be called util_queue_writer for this reason (or perhaps
> _submitter, but _writer is shorter):
>
> void util_queue_writer_init(struct util_queue_writer *writer);
> void util_queue_writer_destroy(struct util_queue_writer *writer);
> void util_queue_writer_wait(struct util_queue_writer *writer);
>
> void util_queue_add_job(struct util_queue *queue,
>                         struct util_queue_writer *writer,
>                         void *job);
>
> This would also logically allow a future extension where each writer can
> have multiple jobs outstanding. In that case, the _writer structure would
> live outside the job structure.
>
> [An alternative that occurs to me now is to rename util_queue_event to
> util_queue_job and expect users of this utility to use that as a "base
> struct" for their job structure. That would simplify util_queue_add_job to
> take only one parameter. I think both alternatives have some merit.]
>
> Nicolai
>
> +
>> +/* util_queue needs to be cleared to zeroes for this to work */
>> +static inline bool
>> +util_queue_is_initialized(struct util_queue *queue)
>> +{
>> +   return queue->thread != 0;
>> +}
>> +
>> +#endif
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> index fefa5d6..737f0c4 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> @@ -605,7 +605,7 @@ amdgpu_cs_create(struct radeon_winsys_ctx *rwctx,
>>         return NULL;
>>      }
>>
>> -   pipe_semaphore_init(&cs->flush_completed, 1);
>> +   util_queue_event_init(&cs->flush_completed);
>>
>>      cs->ctx = ctx;
>>      cs->flush_cs = flush;
>> @@ -872,8 +872,9 @@ static void amdgpu_add_fence_dependencies(struct
>> amdgpu_cs *acs)
>>      }
>>   }
>>
>> -void amdgpu_cs_submit_ib(struct amdgpu_cs *acs)
>> +void amdgpu_cs_submit_ib(void *job)
>>   {
>> +   struct amdgpu_cs *acs = (struct amdgpu_cs*)job;
>>      struct amdgpu_winsys *ws = acs->ctx->ws;
>>      struct amdgpu_cs_context *cs = acs->cst;
>>      int i, r;
>> @@ -957,14 +958,11 @@ cleanup:
>>   void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs)
>>   {
>>      struct amdgpu_cs *cs = amdgpu_cs(rcs);
>> +   struct amdgpu_winsys *ws = cs->ctx->ws;
>>
>>      /* Wait for any pending ioctl of this CS to complete. */
>> -   if (cs->ctx->ws->thread) {
>> -      /* wait and set the semaphore to "busy" */
>> -      pipe_semaphore_wait(&cs->flush_completed);
>> -      /* set the semaphore to "idle" */
>> -      pipe_semaphore_signal(&cs->flush_completed);
>> -   }
>> +   if (util_queue_is_initialized(&ws->cs_queue))
>> +      util_queue_job_wait(&cs->flush_completed);
>>   }
>>
>>   DEBUG_GET_ONCE_BOOL_OPTION(noop, "RADEON_NOOP", FALSE)
>> @@ -1052,10 +1050,9 @@ static void amdgpu_cs_flush(struct
>> radeon_winsys_cs *rcs,
>>         cs->cst = cur;
>>
>>         /* Submit. */
>> -      if (ws->thread && (flags & RADEON_FLUSH_ASYNC)) {
>> -         /* Set the semaphore to "busy". */
>> -         pipe_semaphore_wait(&cs->flush_completed);
>> -         amdgpu_ws_queue_cs(ws, cs);
>> +      if ((flags & RADEON_FLUSH_ASYNC) &&
>> +          util_queue_is_initialized(&ws->cs_queue)) {
>> +         util_queue_add_job(&ws->cs_queue, cs, &cs->flush_completed);
>>         } else {
>>            amdgpu_cs_submit_ib(cs);
>>         }
>> @@ -1077,7 +1074,7 @@ static void amdgpu_cs_destroy(struct
>> radeon_winsys_cs *rcs)
>>      struct amdgpu_cs *cs = amdgpu_cs(rcs);
>>
>>      amdgpu_cs_sync_flush(rcs);
>> -   pipe_semaphore_destroy(&cs->flush_completed);
>> +   util_queue_event_destroy(&cs->flush_completed);
>>      p_atomic_dec(&cs->ctx->ws->num_cs);
>>      pb_reference(&cs->main.big_ib_buffer, NULL);
>>      FREE(cs->main.base.prev);
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> index cc1516c..ff50345 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> @@ -111,7 +111,7 @@ struct amdgpu_cs {
>>      void (*flush_cs)(void *ctx, unsigned flags, struct pipe_fence_handle
>> **fence);
>>      void *flush_data;
>>
>> -   pipe_semaphore flush_completed;
>> +   struct util_queue_event flush_completed;
>>   };
>>
>>   struct amdgpu_fence {
>> @@ -218,6 +218,6 @@ bool amdgpu_fence_wait(struct pipe_fence_handle
>> *fence, uint64_t timeout,
>>                          bool absolute);
>>   void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs);
>>   void amdgpu_cs_init_functions(struct amdgpu_winsys *ws);
>> -void amdgpu_cs_submit_ib(struct amdgpu_cs *cs);
>> +void amdgpu_cs_submit_ib(void *job);
>>
>>   #endif
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> index 7016221..7ef3529 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> @@ -308,14 +308,9 @@ static void amdgpu_winsys_destroy(struct
>> radeon_winsys *rws)
>>   {
>>      struct amdgpu_winsys *ws = (struct amdgpu_winsys*)rws;
>>
>> -   if (ws->thread) {
>> -      ws->kill_thread = 1;
>> -      pipe_semaphore_signal(&ws->cs_queued);
>> -      pipe_thread_wait(ws->thread);
>> -   }
>> -   pipe_semaphore_destroy(&ws->cs_queue_has_space);
>> -   pipe_semaphore_destroy(&ws->cs_queued);
>> -   pipe_mutex_destroy(ws->cs_queue_lock);
>> +   if (util_queue_is_initialized(&ws->cs_queue))
>> +      util_queue_destroy(&ws->cs_queue);
>> +
>>      pipe_mutex_destroy(ws->bo_fence_lock);
>>      pb_cache_deinit(&ws->bo_cache);
>>      pipe_mutex_destroy(ws->global_bo_list_lock);
>> @@ -400,53 +395,7 @@ static int compare_dev(void *key1, void *key2)
>>      return key1 != key2;
>>   }
>>
>> -void amdgpu_ws_queue_cs(struct amdgpu_winsys *ws, struct amdgpu_cs *cs)
>> -{
>> -   pipe_semaphore_wait(&ws->cs_queue_has_space);
>> -
>> -   pipe_mutex_lock(ws->cs_queue_lock);
>> -   assert(ws->num_enqueued_cs < ARRAY_SIZE(ws->cs_queue));
>> -   ws->cs_queue[ws->num_enqueued_cs++] = cs;
>> -   pipe_mutex_unlock(ws->cs_queue_lock);
>> -   pipe_semaphore_signal(&ws->cs_queued);
>> -}
>> -
>> -static PIPE_THREAD_ROUTINE(amdgpu_cs_thread_func, param)
>> -{
>> -   struct amdgpu_winsys *ws = (struct amdgpu_winsys *)param;
>> -   struct amdgpu_cs *cs;
>> -   unsigned i;
>> -
>> -   while (1) {
>> -      pipe_semaphore_wait(&ws->cs_queued);
>> -      if (ws->kill_thread)
>> -         break;
>> -
>> -      pipe_mutex_lock(ws->cs_queue_lock);
>> -      cs = ws->cs_queue[0];
>> -      for (i = 1; i < ws->num_enqueued_cs; i++)
>> -         ws->cs_queue[i - 1] = ws->cs_queue[i];
>> -      ws->cs_queue[--ws->num_enqueued_cs] = NULL;
>> -      pipe_mutex_unlock(ws->cs_queue_lock);
>> -
>> -      pipe_semaphore_signal(&ws->cs_queue_has_space);
>> -
>> -      if (cs) {
>> -         amdgpu_cs_submit_ib(cs);
>> -         pipe_semaphore_signal(&cs->flush_completed);
>> -      }
>> -   }
>> -   pipe_mutex_lock(ws->cs_queue_lock);
>> -   for (i = 0; i < ws->num_enqueued_cs; i++) {
>> -      pipe_semaphore_signal(&ws->cs_queue[i]->flush_completed);
>> -      ws->cs_queue[i] = NULL;
>> -   }
>> -   pipe_mutex_unlock(ws->cs_queue_lock);
>> -   return 0;
>> -}
>> -
>>   DEBUG_GET_ONCE_BOOL_OPTION(thread, "RADEON_THREAD", TRUE)
>> -static PIPE_THREAD_ROUTINE(amdgpu_cs_thread_func, param);
>>
>>   static bool amdgpu_winsys_unref(struct radeon_winsys *rws)
>>   {
>> @@ -541,14 +490,10 @@ amdgpu_winsys_create(int fd, radeon_screen_create_t
>> screen_create)
>>
>>      LIST_INITHEAD(&ws->global_bo_list);
>>      pipe_mutex_init(ws->global_bo_list_lock);
>> -   pipe_mutex_init(ws->cs_queue_lock);
>>      pipe_mutex_init(ws->bo_fence_lock);
>>
>> -   pipe_semaphore_init(&ws->cs_queue_has_space,
>> ARRAY_SIZE(ws->cs_queue));
>> -   pipe_semaphore_init(&ws->cs_queued, 0);
>> -
>>      if (sysconf(_SC_NPROCESSORS_ONLN) > 1 && debug_get_option_thread())
>> -      ws->thread = pipe_thread_create(amdgpu_cs_thread_func, ws);
>> +      util_queue_init(&ws->cs_queue, amdgpu_cs_submit_ib);
>>
>>      /* Create the screen at the end. The winsys must be initialized
>>       * completely.
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> index d6734f7..b13a17e 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> @@ -35,7 +35,7 @@
>>   #include "pipebuffer/pb_cache.h"
>>   #include "gallium/drivers/radeon/radeon_winsys.h"
>>   #include "addrlib/addrinterface.h"
>> -#include "os/os_thread.h"
>> +#include "util/u_queue.h"
>>   #include <amdgpu.h>
>>
>>   struct amdgpu_cs;
>> @@ -59,13 +59,7 @@ struct amdgpu_winsys {
>>      struct radeon_info info;
>>
>>      /* multithreaded IB submission */
>> -   pipe_mutex cs_queue_lock;
>> -   pipe_semaphore cs_queue_has_space;
>> -   pipe_semaphore cs_queued;
>> -   pipe_thread thread;
>> -   int kill_thread;
>> -   int num_enqueued_cs;
>> -   struct amdgpu_cs *cs_queue[8];
>> +   struct util_queue cs_queue;
>>
>>      struct amdgpu_gpu_info amdinfo;
>>      ADDR_HANDLE addrlib;
>> @@ -84,7 +78,6 @@ amdgpu_winsys(struct radeon_winsys *base)
>>      return (struct amdgpu_winsys*)base;
>>   }
>>
>> -void amdgpu_ws_queue_cs(struct amdgpu_winsys *ws, struct amdgpu_cs *cs);
>>   void amdgpu_surface_init_functions(struct amdgpu_winsys *ws);
>>   ADDR_HANDLE amdgpu_addr_create(struct amdgpu_winsys *ws);
>>
>>
>>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/mesa-dev/attachments/20160614/146df215/attachment-0001.html>


More information about the mesa-dev mailing list