[Mesa-dev] [PATCH 1/2] gallium/util: import the multithreaded job queue from amdgpu winsys
Marek Olšák
maraeo at gmail.com
Tue Jun 14 13:04:21 UTC 2016
The event doesn't limit the number of outstanding jobs. It's like a fence -
you can wait for it or not. The limitation is that you must have
exactly one event instance for each active job. util_queue_fence is a
better name IMO. I don't plan to extend the feature set beyond that.
Marek
On Jun 14, 2016 1:26 PM, "Nicolai Hähnle" <nhaehnle at gmail.com> wrote:
> On 13.06.2016 19:34, Marek Olšák wrote:
>
>> From: Marek Olšák <marek.olsak at amd.com>
>>
>> ---
>> src/gallium/auxiliary/Makefile.sources | 2 +
>> src/gallium/auxiliary/util/u_queue.c | 129
>> ++++++++++++++++++++++++++
>> src/gallium/auxiliary/util/u_queue.h | 80 ++++++++++++++++
>> src/gallium/winsys/amdgpu/drm/amdgpu_cs.c | 23 ++---
>> src/gallium/winsys/amdgpu/drm/amdgpu_cs.h | 4 +-
>> src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c | 63 +------------
>> src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h | 11 +--
>> 7 files changed, 229 insertions(+), 83 deletions(-)
>> create mode 100644 src/gallium/auxiliary/util/u_queue.c
>> create mode 100644 src/gallium/auxiliary/util/u_queue.h
>>
>> diff --git a/src/gallium/auxiliary/Makefile.sources
>> b/src/gallium/auxiliary/Makefile.sources
>> index 7b3853e..ab58358 100644
>> --- a/src/gallium/auxiliary/Makefile.sources
>> +++ b/src/gallium/auxiliary/Makefile.sources
>> @@ -274,6 +274,8 @@ C_SOURCES := \
>> util/u_pstipple.c \
>> util/u_pstipple.h \
>> util/u_pwr8.h \
>> + util/u_queue.c \
>> + util/u_queue.h \
>> util/u_range.h \
>> util/u_rect.h \
>> util/u_resource.c \
>> diff --git a/src/gallium/auxiliary/util/u_queue.c
>> b/src/gallium/auxiliary/util/u_queue.c
>> new file mode 100644
>> index 0000000..311b591
>> --- /dev/null
>> +++ b/src/gallium/auxiliary/util/u_queue.c
>> @@ -0,0 +1,129 @@
>> +/*
>> + * Copyright © 2016 Advanced Micro Devices, Inc.
>> + * All Rights Reserved.
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining
>> + * a copy of this software and associated documentation files (the
>> + * "Software"), to deal in the Software without restriction, including
>> + * without limitation the rights to use, copy, modify, merge, publish,
>> + * distribute, sub license, and/or sell copies of the Software, and to
>> + * permit persons to whom the Software is furnished to do so, subject to
>> + * the following conditions:
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
>> + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
>> + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
>> + * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
>> + * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
>> + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
>> + * USE OR OTHER DEALINGS IN THE SOFTWARE.
>> + *
>> + * The above copyright notice and this permission notice (including the
>> + * next paragraph) shall be included in all copies or substantial
>> portions
>> + * of the Software.
>> + */
>> +
>> +#include "u_queue.h"
>> +
>> +static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
>> +{
>> + struct util_queue *queue = (struct util_queue*)param;
>> + unsigned i;
>> +
>> + while (1) {
>> + struct util_queue_job job;
>> +
>> + pipe_semaphore_wait(&queue->queued);
>> + if (queue->kill_thread)
>> + break;
>> +
>> + pipe_mutex_lock(queue->lock);
>> + job = queue->jobs[0];
>> + for (i = 1; i < queue->num_jobs; i++)
>> + queue->jobs[i - 1] = queue->jobs[i];
>> + queue->jobs[--queue->num_jobs].job = NULL;
>> + pipe_mutex_unlock(queue->lock);
>> +
>> + pipe_semaphore_signal(&queue->has_space);
>> +
>> + if (job.job) {
>> + queue->execute_job(job.job);
>> + pipe_semaphore_signal(&job.event->done);
>> + }
>> + }
>> +
>> + /* signal remaining jobs before terminating */
>> + pipe_mutex_lock(queue->lock);
>> + for (i = 0; i < queue->num_jobs; i++) {
>> + pipe_semaphore_signal(&queue->jobs[i].event->done);
>> + queue->jobs[i].job = NULL;
>> + }
>> + queue->num_jobs = 0;
>> + pipe_mutex_unlock(queue->lock);
>> + return 0;
>> +}
>> +
>> +void
>> +util_queue_init(struct util_queue *queue,
>> + void (*execute_job)(void *))
>> +{
>> + memset(queue, 0, sizeof(*queue));
>> + queue->execute_job = execute_job;
>> + pipe_mutex_init(queue->lock);
>> + pipe_semaphore_init(&queue->has_space, ARRAY_SIZE(queue->jobs));
>> + pipe_semaphore_init(&queue->queued, 0);
>> + queue->thread = pipe_thread_create(util_queue_thread_func, queue);
>> +}
>> +
>> +void
>> +util_queue_destroy(struct util_queue *queue)
>> +{
>> + queue->kill_thread = 1;
>> + pipe_semaphore_signal(&queue->queued);
>> + pipe_thread_wait(queue->thread);
>> + pipe_semaphore_destroy(&queue->has_space);
>> + pipe_semaphore_destroy(&queue->queued);
>> + pipe_mutex_destroy(queue->lock);
>> +}
>> +
>> +void
>> +util_queue_event_init(struct util_queue_event *event)
>> +{
>> + pipe_semaphore_init(&event->done, 1);
>> +}
>> +
>> +void
>> +util_queue_event_destroy(struct util_queue_event *event)
>> +{
>> + pipe_semaphore_destroy(&event->done);
>> +}
>> +
>> +void
>> +util_queue_add_job(struct util_queue *queue,
>> + void *job,
>> + struct util_queue_event *event)
>> +{
>> + /* Set the semaphore to "busy". */
>> + pipe_semaphore_wait(&event->done);
>> +
>> + /* if the queue is full, wait until there is space */
>> + pipe_semaphore_wait(&queue->has_space);
>> +
>> + pipe_mutex_lock(queue->lock);
>> + assert(queue->num_jobs < ARRAY_SIZE(queue->jobs));
>> + queue->jobs[queue->num_jobs].job = job;
>> + queue->jobs[queue->num_jobs].event = event;
>> + queue->num_jobs++;
>> + pipe_mutex_unlock(queue->lock);
>> + pipe_semaphore_signal(&queue->queued);
>> +}
>> +
>> +void
>> +util_queue_job_wait(struct util_queue_event *event)
>> +{
>> + /* wait and set the semaphore to "busy" */
>> + pipe_semaphore_wait(&event->done);
>> + /* set the semaphore to "idle" */
>> + pipe_semaphore_signal(&event->done);
>> +}
>> diff --git a/src/gallium/auxiliary/util/u_queue.h
>> b/src/gallium/auxiliary/util/u_queue.h
>> new file mode 100644
>> index 0000000..b7c1f44
>> --- /dev/null
>> +++ b/src/gallium/auxiliary/util/u_queue.h
>> @@ -0,0 +1,80 @@
>> +/*
>> + * Copyright © 2016 Advanced Micro Devices, Inc.
>> + * All Rights Reserved.
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining
>> + * a copy of this software and associated documentation files (the
>> + * "Software"), to deal in the Software without restriction, including
>> + * without limitation the rights to use, copy, modify, merge, publish,
>> + * distribute, sub license, and/or sell copies of the Software, and to
>> + * permit persons to whom the Software is furnished to do so, subject to
>> + * the following conditions:
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
>> + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
>> + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
>> + * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
>> + * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
>> + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
>> + * USE OR OTHER DEALINGS IN THE SOFTWARE.
>> + *
>> + * The above copyright notice and this permission notice (including the
>> + * next paragraph) shall be included in all copies or substantial
>> portions
>> + * of the Software.
>> + */
>> +
>> +/* Job queue with execution in a separate thread.
>> + *
>> + * Jobs can be added from any thread. After that, the wait call can be
>> used
>> + * to wait for completion of the job.
>> + */
>> +
>> +#ifndef U_QUEUE_H
>> +#define U_QUEUE_H
>> +
>> +#include "os/os_thread.h"
>> +
>> +/* Job completion event.
>> + * Put this into your job structure.
>> + */
>> +struct util_queue_event {
>> + pipe_semaphore done;
>> +};
>> +
>> +struct util_queue_job {
>> + void *job;
>> + struct util_queue_event *event;
>> +};
>> +
>> +/* Put this into your context. */
>> +struct util_queue {
>> + pipe_mutex lock;
>> + pipe_semaphore has_space;
>> + pipe_semaphore queued;
>> + pipe_thread thread;
>> + int kill_thread;
>> + int num_jobs;
>> + struct util_queue_job jobs[8];
>> + void (*execute_job)(void *job);
>> +};
>> +
>> +void util_queue_init(struct util_queue *queue,
>> + void (*execute_job)(void *));
>> +void util_queue_destroy(struct util_queue *queue);
>> +void util_queue_event_init(struct util_queue_event *event);
>> +void util_queue_event_destroy(struct util_queue_event *event);
>> +
>> +void util_queue_add_job(struct util_queue *queue,
>> + void *job,
>> + struct util_queue_event *event);
>> +void util_queue_job_wait(struct util_queue_event *event);
>>
>
> I think the util_queue_event part of the interface is basically impossible
> to understand without knowledge of the code. It does two things:
>
> - limit the number of outstanding jobs
> - wait for outstanding jobs to finish
>
> I think it should be called util_queue_writer for this reason (or perhaps
> _submitter, but _writer is shorter):
>
> void util_queue_writer_init(struct util_queue_writer *writer);
> void util_queue_writer_destroy(struct util_queue_writer *writer);
> void util_queue_writer_wait(struct util_queue_writer *writer);
>
> void util_queue_add_job(struct util_queue *queue,
> struct util_queue_writer *writer,
> void *job);
>
> This would also logically allow a future extension where each writer can
> have multiple jobs outstanding. In that case, the _writer structure would
> live outside the job structure.
>
> [An alternative that occurs to me now is to rename util_queue_event to
> util_queue_job and expect users of this utility to use that as a "base
> struct" for their job structure. That would simplify util_queue_add_job to
> take only one parameter. I think both alternatives have some merit.]
>
> Nicolai
>
> +
>> +/* util_queue needs to be cleared to zeroes for this to work */
>> +static inline bool
>> +util_queue_is_initialized(struct util_queue *queue)
>> +{
>> + return queue->thread != 0;
>> +}
>> +
>> +#endif
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> index fefa5d6..737f0c4 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
>> @@ -605,7 +605,7 @@ amdgpu_cs_create(struct radeon_winsys_ctx *rwctx,
>> return NULL;
>> }
>>
>> - pipe_semaphore_init(&cs->flush_completed, 1);
>> + util_queue_event_init(&cs->flush_completed);
>>
>> cs->ctx = ctx;
>> cs->flush_cs = flush;
>> @@ -872,8 +872,9 @@ static void amdgpu_add_fence_dependencies(struct
>> amdgpu_cs *acs)
>> }
>> }
>>
>> -void amdgpu_cs_submit_ib(struct amdgpu_cs *acs)
>> +void amdgpu_cs_submit_ib(void *job)
>> {
>> + struct amdgpu_cs *acs = (struct amdgpu_cs*)job;
>> struct amdgpu_winsys *ws = acs->ctx->ws;
>> struct amdgpu_cs_context *cs = acs->cst;
>> int i, r;
>> @@ -957,14 +958,11 @@ cleanup:
>> void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs)
>> {
>> struct amdgpu_cs *cs = amdgpu_cs(rcs);
>> + struct amdgpu_winsys *ws = cs->ctx->ws;
>>
>> /* Wait for any pending ioctl of this CS to complete. */
>> - if (cs->ctx->ws->thread) {
>> - /* wait and set the semaphore to "busy" */
>> - pipe_semaphore_wait(&cs->flush_completed);
>> - /* set the semaphore to "idle" */
>> - pipe_semaphore_signal(&cs->flush_completed);
>> - }
>> + if (util_queue_is_initialized(&ws->cs_queue))
>> + util_queue_job_wait(&cs->flush_completed);
>> }
>>
>> DEBUG_GET_ONCE_BOOL_OPTION(noop, "RADEON_NOOP", FALSE)
>> @@ -1052,10 +1050,9 @@ static void amdgpu_cs_flush(struct
>> radeon_winsys_cs *rcs,
>> cs->cst = cur;
>>
>> /* Submit. */
>> - if (ws->thread && (flags & RADEON_FLUSH_ASYNC)) {
>> - /* Set the semaphore to "busy". */
>> - pipe_semaphore_wait(&cs->flush_completed);
>> - amdgpu_ws_queue_cs(ws, cs);
>> + if ((flags & RADEON_FLUSH_ASYNC) &&
>> + util_queue_is_initialized(&ws->cs_queue)) {
>> + util_queue_add_job(&ws->cs_queue, cs, &cs->flush_completed);
>> } else {
>> amdgpu_cs_submit_ib(cs);
>> }
>> @@ -1077,7 +1074,7 @@ static void amdgpu_cs_destroy(struct
>> radeon_winsys_cs *rcs)
>> struct amdgpu_cs *cs = amdgpu_cs(rcs);
>>
>> amdgpu_cs_sync_flush(rcs);
>> - pipe_semaphore_destroy(&cs->flush_completed);
>> + util_queue_event_destroy(&cs->flush_completed);
>> p_atomic_dec(&cs->ctx->ws->num_cs);
>> pb_reference(&cs->main.big_ib_buffer, NULL);
>> FREE(cs->main.base.prev);
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> index cc1516c..ff50345 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
>> @@ -111,7 +111,7 @@ struct amdgpu_cs {
>> void (*flush_cs)(void *ctx, unsigned flags, struct pipe_fence_handle
>> **fence);
>> void *flush_data;
>>
>> - pipe_semaphore flush_completed;
>> + struct util_queue_event flush_completed;
>> };
>>
>> struct amdgpu_fence {
>> @@ -218,6 +218,6 @@ bool amdgpu_fence_wait(struct pipe_fence_handle
>> *fence, uint64_t timeout,
>> bool absolute);
>> void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs);
>> void amdgpu_cs_init_functions(struct amdgpu_winsys *ws);
>> -void amdgpu_cs_submit_ib(struct amdgpu_cs *cs);
>> +void amdgpu_cs_submit_ib(void *job);
>>
>> #endif
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> index 7016221..7ef3529 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
>> @@ -308,14 +308,9 @@ static void amdgpu_winsys_destroy(struct
>> radeon_winsys *rws)
>> {
>> struct amdgpu_winsys *ws = (struct amdgpu_winsys*)rws;
>>
>> - if (ws->thread) {
>> - ws->kill_thread = 1;
>> - pipe_semaphore_signal(&ws->cs_queued);
>> - pipe_thread_wait(ws->thread);
>> - }
>> - pipe_semaphore_destroy(&ws->cs_queue_has_space);
>> - pipe_semaphore_destroy(&ws->cs_queued);
>> - pipe_mutex_destroy(ws->cs_queue_lock);
>> + if (util_queue_is_initialized(&ws->cs_queue))
>> + util_queue_destroy(&ws->cs_queue);
>> +
>> pipe_mutex_destroy(ws->bo_fence_lock);
>> pb_cache_deinit(&ws->bo_cache);
>> pipe_mutex_destroy(ws->global_bo_list_lock);
>> @@ -400,53 +395,7 @@ static int compare_dev(void *key1, void *key2)
>> return key1 != key2;
>> }
>>
>> -void amdgpu_ws_queue_cs(struct amdgpu_winsys *ws, struct amdgpu_cs *cs)
>> -{
>> - pipe_semaphore_wait(&ws->cs_queue_has_space);
>> -
>> - pipe_mutex_lock(ws->cs_queue_lock);
>> - assert(ws->num_enqueued_cs < ARRAY_SIZE(ws->cs_queue));
>> - ws->cs_queue[ws->num_enqueued_cs++] = cs;
>> - pipe_mutex_unlock(ws->cs_queue_lock);
>> - pipe_semaphore_signal(&ws->cs_queued);
>> -}
>> -
>> -static PIPE_THREAD_ROUTINE(amdgpu_cs_thread_func, param)
>> -{
>> - struct amdgpu_winsys *ws = (struct amdgpu_winsys *)param;
>> - struct amdgpu_cs *cs;
>> - unsigned i;
>> -
>> - while (1) {
>> - pipe_semaphore_wait(&ws->cs_queued);
>> - if (ws->kill_thread)
>> - break;
>> -
>> - pipe_mutex_lock(ws->cs_queue_lock);
>> - cs = ws->cs_queue[0];
>> - for (i = 1; i < ws->num_enqueued_cs; i++)
>> - ws->cs_queue[i - 1] = ws->cs_queue[i];
>> - ws->cs_queue[--ws->num_enqueued_cs] = NULL;
>> - pipe_mutex_unlock(ws->cs_queue_lock);
>> -
>> - pipe_semaphore_signal(&ws->cs_queue_has_space);
>> -
>> - if (cs) {
>> - amdgpu_cs_submit_ib(cs);
>> - pipe_semaphore_signal(&cs->flush_completed);
>> - }
>> - }
>> - pipe_mutex_lock(ws->cs_queue_lock);
>> - for (i = 0; i < ws->num_enqueued_cs; i++) {
>> - pipe_semaphore_signal(&ws->cs_queue[i]->flush_completed);
>> - ws->cs_queue[i] = NULL;
>> - }
>> - pipe_mutex_unlock(ws->cs_queue_lock);
>> - return 0;
>> -}
>> -
>> DEBUG_GET_ONCE_BOOL_OPTION(thread, "RADEON_THREAD", TRUE)
>> -static PIPE_THREAD_ROUTINE(amdgpu_cs_thread_func, param);
>>
>> static bool amdgpu_winsys_unref(struct radeon_winsys *rws)
>> {
>> @@ -541,14 +490,10 @@ amdgpu_winsys_create(int fd, radeon_screen_create_t
>> screen_create)
>>
>> LIST_INITHEAD(&ws->global_bo_list);
>> pipe_mutex_init(ws->global_bo_list_lock);
>> - pipe_mutex_init(ws->cs_queue_lock);
>> pipe_mutex_init(ws->bo_fence_lock);
>>
>> - pipe_semaphore_init(&ws->cs_queue_has_space,
>> ARRAY_SIZE(ws->cs_queue));
>> - pipe_semaphore_init(&ws->cs_queued, 0);
>> -
>> if (sysconf(_SC_NPROCESSORS_ONLN) > 1 && debug_get_option_thread())
>> - ws->thread = pipe_thread_create(amdgpu_cs_thread_func, ws);
>> + util_queue_init(&ws->cs_queue, amdgpu_cs_submit_ib);
>>
>> /* Create the screen at the end. The winsys must be initialized
>> * completely.
>> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> index d6734f7..b13a17e 100644
>> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
>> @@ -35,7 +35,7 @@
>> #include "pipebuffer/pb_cache.h"
>> #include "gallium/drivers/radeon/radeon_winsys.h"
>> #include "addrlib/addrinterface.h"
>> -#include "os/os_thread.h"
>> +#include "util/u_queue.h"
>> #include <amdgpu.h>
>>
>> struct amdgpu_cs;
>> @@ -59,13 +59,7 @@ struct amdgpu_winsys {
>> struct radeon_info info;
>>
>> /* multithreaded IB submission */
>> - pipe_mutex cs_queue_lock;
>> - pipe_semaphore cs_queue_has_space;
>> - pipe_semaphore cs_queued;
>> - pipe_thread thread;
>> - int kill_thread;
>> - int num_enqueued_cs;
>> - struct amdgpu_cs *cs_queue[8];
>> + struct util_queue cs_queue;
>>
>> struct amdgpu_gpu_info amdinfo;
>> ADDR_HANDLE addrlib;
>> @@ -84,7 +78,6 @@ amdgpu_winsys(struct radeon_winsys *base)
>> return (struct amdgpu_winsys*)base;
>> }
>>
>> -void amdgpu_ws_queue_cs(struct amdgpu_winsys *ws, struct amdgpu_cs *cs);
>> void amdgpu_surface_init_functions(struct amdgpu_winsys *ws);
>> ADDR_HANDLE amdgpu_addr_create(struct amdgpu_winsys *ws);
>>
>>
>>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/mesa-dev/attachments/20160614/146df215/attachment-0001.html>
More information about the mesa-dev
mailing list