[Mesa-dev] [PATCH 1/2] gallium/util: import the multithreaded job queue from amdgpu winsys
Nicolai Hähnle
nhaehnle at gmail.com
Tue Jun 14 13:26:25 UTC 2016
On 14.06.2016 15:04, Marek Olšák wrote:
> The event doesn't limit the number of outstanding jobs. It's like a
> fence - you can wait on it or not. The limitation is that you
> must have exactly one event instance for each active job.
Right, that's what I meant: only one job can be outstanding for each
util_queue_event. I didn't express myself very clearly.
> util_queue_fence is a better name IMO. I don't plan to extend the
> feature set beyond that.
I guess that works as well.
Nicolai
>
> Marek
>
> On Jun 14, 2016 1:26 PM, "Nicolai Hähnle" <nhaehnle at gmail.com
> <mailto:nhaehnle at gmail.com>> wrote:
>
> On 13.06.2016 19:34, Marek Olšák wrote:
>
> From: Marek Olšák <marek.olsak at amd.com <mailto:marek.olsak at amd.com>>
>
> ---
> src/gallium/auxiliary/Makefile.sources | 2 +
> src/gallium/auxiliary/util/u_queue.c | 129
> ++++++++++++++++++++++++++
> src/gallium/auxiliary/util/u_queue.h | 80
> ++++++++++++++++
> src/gallium/winsys/amdgpu/drm/amdgpu_cs.c | 23 ++---
> src/gallium/winsys/amdgpu/drm/amdgpu_cs.h | 4 +-
> src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c | 63 +------------
> src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h | 11 +--
> 7 files changed, 229 insertions(+), 83 deletions(-)
> create mode 100644 src/gallium/auxiliary/util/u_queue.c
> create mode 100644 src/gallium/auxiliary/util/u_queue.h
>
> diff --git a/src/gallium/auxiliary/Makefile.sources
> b/src/gallium/auxiliary/Makefile.sources
> index 7b3853e..ab58358 100644
> --- a/src/gallium/auxiliary/Makefile.sources
> +++ b/src/gallium/auxiliary/Makefile.sources
> @@ -274,6 +274,8 @@ C_SOURCES := \
> util/u_pstipple.c \
> util/u_pstipple.h \
> util/u_pwr8.h \
> + util/u_queue.c \
> + util/u_queue.h \
> util/u_range.h \
> util/u_rect.h \
> util/u_resource.c \
> diff --git a/src/gallium/auxiliary/util/u_queue.c
> b/src/gallium/auxiliary/util/u_queue.c
> new file mode 100644
> index 0000000..311b591
> --- /dev/null
> +++ b/src/gallium/auxiliary/util/u_queue.c
> @@ -0,0 +1,129 @@
> +/*
> + * Copyright © 2016 Advanced Micro Devices, Inc.
> + * All Rights Reserved.
> + *
> + * Permission is hereby granted, free of charge, to any person
> obtaining
> + * a copy of this software and associated documentation files (the
> + * "Software"), to deal in the Software without restriction,
> including
> + * without limitation the rights to use, copy, modify, merge,
> publish,
> + * distribute, sub license, and/or sell copies of the Software,
> and to
> + * permit persons to whom the Software is furnished to do so,
> subject to
> + * the following conditions:
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
> + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
> + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
> + * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS,
> AUTHORS
> + * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
> + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
> + * USE OR OTHER DEALINGS IN THE SOFTWARE.
> + *
> + * The above copyright notice and this permission notice
> (including the
> + * next paragraph) shall be included in all copies or
> substantial portions
> + * of the Software.
> + */
> +
> +#include "u_queue.h"
> +
> +static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
> +{
> + struct util_queue *queue = (struct util_queue*)param;
> + unsigned i;
> +
> + while (1) {
> + struct util_queue_job job;
> +
> + pipe_semaphore_wait(&queue->queued);
> + if (queue->kill_thread)
> + break;
> +
> + pipe_mutex_lock(queue->lock);
> + job = queue->jobs[0];
> + for (i = 1; i < queue->num_jobs; i++)
> + queue->jobs[i - 1] = queue->jobs[i];
> + queue->jobs[--queue->num_jobs].job = NULL;
> + pipe_mutex_unlock(queue->lock);
> +
> + pipe_semaphore_signal(&queue->has_space);
> +
> + if (job.job) {
> + queue->execute_job(job.job);
> + pipe_semaphore_signal(&job.event->done);
> + }
> + }
> +
> + /* signal remaining jobs before terminating */
> + pipe_mutex_lock(queue->lock);
> + for (i = 0; i < queue->num_jobs; i++) {
> + pipe_semaphore_signal(&queue->jobs[i].event->done);
> + queue->jobs[i].job = NULL;
> + }
> + queue->num_jobs = 0;
> + pipe_mutex_unlock(queue->lock);
> + return 0;
> +}
> +
> +void
> +util_queue_init(struct util_queue *queue,
> + void (*execute_job)(void *))
> +{
> + memset(queue, 0, sizeof(*queue));
> + queue->execute_job = execute_job;
> + pipe_mutex_init(queue->lock);
> + pipe_semaphore_init(&queue->has_space, ARRAY_SIZE(queue->jobs));
> + pipe_semaphore_init(&queue->queued, 0);
> + queue->thread = pipe_thread_create(util_queue_thread_func,
> queue);
> +}
> +
> +void
> +util_queue_destroy(struct util_queue *queue)
> +{
> + queue->kill_thread = 1;
> + pipe_semaphore_signal(&queue->queued);
> + pipe_thread_wait(queue->thread);
> + pipe_semaphore_destroy(&queue->has_space);
> + pipe_semaphore_destroy(&queue->queued);
> + pipe_mutex_destroy(queue->lock);
> +}
> +
> +void
> +util_queue_event_init(struct util_queue_event *event)
> +{
> + pipe_semaphore_init(&event->done, 1);
> +}
> +
> +void
> +util_queue_event_destroy(struct util_queue_event *event)
> +{
> + pipe_semaphore_destroy(&event->done);
> +}
> +
> +void
> +util_queue_add_job(struct util_queue *queue,
> + void *job,
> + struct util_queue_event *event)
> +{
> + /* Set the semaphore to "busy". */
> + pipe_semaphore_wait(&event->done);
> +
> + /* if the queue is full, wait until there is space */
> + pipe_semaphore_wait(&queue->has_space);
> +
> + pipe_mutex_lock(queue->lock);
> + assert(queue->num_jobs < ARRAY_SIZE(queue->jobs));
> + queue->jobs[queue->num_jobs].job = job;
> + queue->jobs[queue->num_jobs].event = event;
> + queue->num_jobs++;
> + pipe_mutex_unlock(queue->lock);
> + pipe_semaphore_signal(&queue->queued);
> +}
> +
> +void
> +util_queue_job_wait(struct util_queue_event *event)
> +{
> + /* wait and set the semaphore to "busy" */
> + pipe_semaphore_wait(&event->done);
> + /* set the semaphore to "idle" */
> + pipe_semaphore_signal(&event->done);
> +}
> diff --git a/src/gallium/auxiliary/util/u_queue.h
> b/src/gallium/auxiliary/util/u_queue.h
> new file mode 100644
> index 0000000..b7c1f44
> --- /dev/null
> +++ b/src/gallium/auxiliary/util/u_queue.h
> @@ -0,0 +1,80 @@
> +/*
> + * Copyright © 2016 Advanced Micro Devices, Inc.
> + * All Rights Reserved.
> + *
> + * Permission is hereby granted, free of charge, to any person
> obtaining
> + * a copy of this software and associated documentation files (the
> + * "Software"), to deal in the Software without restriction,
> including
> + * without limitation the rights to use, copy, modify, merge,
> publish,
> + * distribute, sub license, and/or sell copies of the Software,
> and to
> + * permit persons to whom the Software is furnished to do so,
> subject to
> + * the following conditions:
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
> + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
> + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
> + * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS,
> AUTHORS
> + * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
> + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
> + * USE OR OTHER DEALINGS IN THE SOFTWARE.
> + *
> + * The above copyright notice and this permission notice
> (including the
> + * next paragraph) shall be included in all copies or
> substantial portions
> + * of the Software.
> + */
> +
> +/* Job queue with execution in a separate thread.
> + *
> + * Jobs can be added from any thread. After that, the wait call
> can be used
> + * to wait for completion of the job.
> + */
> +
> +#ifndef U_QUEUE_H
> +#define U_QUEUE_H
> +
> +#include "os/os_thread.h"
> +
> +/* Job completion event.
> + * Put this into your job structure.
> + */
> +struct util_queue_event {
> + pipe_semaphore done;
> +};
> +
> +struct util_queue_job {
> + void *job;
> + struct util_queue_event *event;
> +};
> +
> +/* Put this into your context. */
> +struct util_queue {
> + pipe_mutex lock;
> + pipe_semaphore has_space;
> + pipe_semaphore queued;
> + pipe_thread thread;
> + int kill_thread;
> + int num_jobs;
> + struct util_queue_job jobs[8];
> + void (*execute_job)(void *job);
> +};
> +
> +void util_queue_init(struct util_queue *queue,
> + void (*execute_job)(void *));
> +void util_queue_destroy(struct util_queue *queue);
> +void util_queue_event_init(struct util_queue_event *event);
> +void util_queue_event_destroy(struct util_queue_event *event);
> +
> +void util_queue_add_job(struct util_queue *queue,
> + void *job,
> + struct util_queue_event *event);
> +void util_queue_job_wait(struct util_queue_event *event);
>
>
> I think the util_queue_event part of the interface is basically
> impossible to understand without knowledge of the code. It does two
> things:
>
> - limit the number of outstanding jobs
> - wait for outstanding jobs to finish
>
> I think it should be called util_queue_writer for this reason (or
> perhaps _submitter, but _writer is shorter):
>
> void util_queue_writer_init(struct util_queue_writer *writer);
> void util_queue_writer_destroy(struct util_queue_writer *writer);
> void util_queue_writer_wait(struct util_queue_writer *writer);
>
> void util_queue_add_job(struct util_queue *queue,
> struct util_queue_writer *writer,
> void *job);
>
> This would also logically allow a future extension where each writer
> can have multiple jobs outstanding. In that case, the _writer
> structure would live outside the job structure.
>
> [An alternative that occurs to me now is to rename util_queue_event
> to util_queue_job and expect users of this utility to use that as a
> "base struct" for their job structure. That would simplify
> util_queue_add_job to take only one parameter. I think both
> alternatives have some merit.]
>
> Nicolai
>
> +
> +/* util_queue needs to be cleared to zeroes for this to work */
> +static inline bool
> +util_queue_is_initialized(struct util_queue *queue)
> +{
> + return queue->thread != 0;
> +}
> +
> +#endif
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> index fefa5d6..737f0c4 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
> @@ -605,7 +605,7 @@ amdgpu_cs_create(struct radeon_winsys_ctx
> *rwctx,
> return NULL;
> }
>
> - pipe_semaphore_init(&cs->flush_completed, 1);
> + util_queue_event_init(&cs->flush_completed);
>
> cs->ctx = ctx;
> cs->flush_cs = flush;
> @@ -872,8 +872,9 @@ static void
> amdgpu_add_fence_dependencies(struct amdgpu_cs *acs)
> }
> }
>
> -void amdgpu_cs_submit_ib(struct amdgpu_cs *acs)
> +void amdgpu_cs_submit_ib(void *job)
> {
> + struct amdgpu_cs *acs = (struct amdgpu_cs*)job;
> struct amdgpu_winsys *ws = acs->ctx->ws;
> struct amdgpu_cs_context *cs = acs->cst;
> int i, r;
> @@ -957,14 +958,11 @@ cleanup:
> void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs)
> {
> struct amdgpu_cs *cs = amdgpu_cs(rcs);
> + struct amdgpu_winsys *ws = cs->ctx->ws;
>
> /* Wait for any pending ioctl of this CS to complete. */
> - if (cs->ctx->ws->thread) {
> - /* wait and set the semaphore to "busy" */
> - pipe_semaphore_wait(&cs->flush_completed);
> - /* set the semaphore to "idle" */
> - pipe_semaphore_signal(&cs->flush_completed);
> - }
> + if (util_queue_is_initialized(&ws->cs_queue))
> + util_queue_job_wait(&cs->flush_completed);
> }
>
> DEBUG_GET_ONCE_BOOL_OPTION(noop, "RADEON_NOOP", FALSE)
> @@ -1052,10 +1050,9 @@ static void amdgpu_cs_flush(struct
> radeon_winsys_cs *rcs,
> cs->cst = cur;
>
> /* Submit. */
> - if (ws->thread && (flags & RADEON_FLUSH_ASYNC)) {
> - /* Set the semaphore to "busy". */
> - pipe_semaphore_wait(&cs->flush_completed);
> - amdgpu_ws_queue_cs(ws, cs);
> + if ((flags & RADEON_FLUSH_ASYNC) &&
> + util_queue_is_initialized(&ws->cs_queue)) {
> + util_queue_add_job(&ws->cs_queue, cs,
> &cs->flush_completed);
> } else {
> amdgpu_cs_submit_ib(cs);
> }
> @@ -1077,7 +1074,7 @@ static void amdgpu_cs_destroy(struct
> radeon_winsys_cs *rcs)
> struct amdgpu_cs *cs = amdgpu_cs(rcs);
>
> amdgpu_cs_sync_flush(rcs);
> - pipe_semaphore_destroy(&cs->flush_completed);
> + util_queue_event_destroy(&cs->flush_completed);
> p_atomic_dec(&cs->ctx->ws->num_cs);
> pb_reference(&cs->main.big_ib_buffer, NULL);
> FREE(cs->main.base.prev);
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> index cc1516c..ff50345 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
> @@ -111,7 +111,7 @@ struct amdgpu_cs {
> void (*flush_cs)(void *ctx, unsigned flags, struct
> pipe_fence_handle **fence);
> void *flush_data;
>
> - pipe_semaphore flush_completed;
> + struct util_queue_event flush_completed;
> };
>
> struct amdgpu_fence {
> @@ -218,6 +218,6 @@ bool amdgpu_fence_wait(struct
> pipe_fence_handle *fence, uint64_t timeout,
> bool absolute);
> void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs);
> void amdgpu_cs_init_functions(struct amdgpu_winsys *ws);
> -void amdgpu_cs_submit_ib(struct amdgpu_cs *cs);
> +void amdgpu_cs_submit_ib(void *job);
>
> #endif
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> index 7016221..7ef3529 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
> @@ -308,14 +308,9 @@ static void amdgpu_winsys_destroy(struct
> radeon_winsys *rws)
> {
> struct amdgpu_winsys *ws = (struct amdgpu_winsys*)rws;
>
> - if (ws->thread) {
> - ws->kill_thread = 1;
> - pipe_semaphore_signal(&ws->cs_queued);
> - pipe_thread_wait(ws->thread);
> - }
> - pipe_semaphore_destroy(&ws->cs_queue_has_space);
> - pipe_semaphore_destroy(&ws->cs_queued);
> - pipe_mutex_destroy(ws->cs_queue_lock);
> + if (util_queue_is_initialized(&ws->cs_queue))
> + util_queue_destroy(&ws->cs_queue);
> +
> pipe_mutex_destroy(ws->bo_fence_lock);
> pb_cache_deinit(&ws->bo_cache);
> pipe_mutex_destroy(ws->global_bo_list_lock);
> @@ -400,53 +395,7 @@ static int compare_dev(void *key1, void *key2)
> return key1 != key2;
> }
>
> -void amdgpu_ws_queue_cs(struct amdgpu_winsys *ws, struct
> amdgpu_cs *cs)
> -{
> - pipe_semaphore_wait(&ws->cs_queue_has_space);
> -
> - pipe_mutex_lock(ws->cs_queue_lock);
> - assert(ws->num_enqueued_cs < ARRAY_SIZE(ws->cs_queue));
> - ws->cs_queue[ws->num_enqueued_cs++] = cs;
> - pipe_mutex_unlock(ws->cs_queue_lock);
> - pipe_semaphore_signal(&ws->cs_queued);
> -}
> -
> -static PIPE_THREAD_ROUTINE(amdgpu_cs_thread_func, param)
> -{
> - struct amdgpu_winsys *ws = (struct amdgpu_winsys *)param;
> - struct amdgpu_cs *cs;
> - unsigned i;
> -
> - while (1) {
> - pipe_semaphore_wait(&ws->cs_queued);
> - if (ws->kill_thread)
> - break;
> -
> - pipe_mutex_lock(ws->cs_queue_lock);
> - cs = ws->cs_queue[0];
> - for (i = 1; i < ws->num_enqueued_cs; i++)
> - ws->cs_queue[i - 1] = ws->cs_queue[i];
> - ws->cs_queue[--ws->num_enqueued_cs] = NULL;
> - pipe_mutex_unlock(ws->cs_queue_lock);
> -
> - pipe_semaphore_signal(&ws->cs_queue_has_space);
> -
> - if (cs) {
> - amdgpu_cs_submit_ib(cs);
> - pipe_semaphore_signal(&cs->flush_completed);
> - }
> - }
> - pipe_mutex_lock(ws->cs_queue_lock);
> - for (i = 0; i < ws->num_enqueued_cs; i++) {
> - pipe_semaphore_signal(&ws->cs_queue[i]->flush_completed);
> - ws->cs_queue[i] = NULL;
> - }
> - pipe_mutex_unlock(ws->cs_queue_lock);
> - return 0;
> -}
> -
> DEBUG_GET_ONCE_BOOL_OPTION(thread, "RADEON_THREAD", TRUE)
> -static PIPE_THREAD_ROUTINE(amdgpu_cs_thread_func, param);
>
> static bool amdgpu_winsys_unref(struct radeon_winsys *rws)
> {
> @@ -541,14 +490,10 @@ amdgpu_winsys_create(int fd,
> radeon_screen_create_t screen_create)
>
> LIST_INITHEAD(&ws->global_bo_list);
> pipe_mutex_init(ws->global_bo_list_lock);
> - pipe_mutex_init(ws->cs_queue_lock);
> pipe_mutex_init(ws->bo_fence_lock);
>
> - pipe_semaphore_init(&ws->cs_queue_has_space,
> ARRAY_SIZE(ws->cs_queue));
> - pipe_semaphore_init(&ws->cs_queued, 0);
> -
> if (sysconf(_SC_NPROCESSORS_ONLN) > 1 &&
> debug_get_option_thread())
> - ws->thread = pipe_thread_create(amdgpu_cs_thread_func, ws);
> + util_queue_init(&ws->cs_queue, amdgpu_cs_submit_ib);
>
> /* Create the screen at the end. The winsys must be
> initialized
> * completely.
> diff --git a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
> b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
> index d6734f7..b13a17e 100644
> --- a/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
> +++ b/src/gallium/winsys/amdgpu/drm/amdgpu_winsys.h
> @@ -35,7 +35,7 @@
> #include "pipebuffer/pb_cache.h"
> #include "gallium/drivers/radeon/radeon_winsys.h"
> #include "addrlib/addrinterface.h"
> -#include "os/os_thread.h"
> +#include "util/u_queue.h"
> #include <amdgpu.h>
>
> struct amdgpu_cs;
> @@ -59,13 +59,7 @@ struct amdgpu_winsys {
> struct radeon_info info;
>
> /* multithreaded IB submission */
> - pipe_mutex cs_queue_lock;
> - pipe_semaphore cs_queue_has_space;
> - pipe_semaphore cs_queued;
> - pipe_thread thread;
> - int kill_thread;
> - int num_enqueued_cs;
> - struct amdgpu_cs *cs_queue[8];
> + struct util_queue cs_queue;
>
> struct amdgpu_gpu_info amdinfo;
> ADDR_HANDLE addrlib;
> @@ -84,7 +78,6 @@ amdgpu_winsys(struct radeon_winsys *base)
> return (struct amdgpu_winsys*)base;
> }
>
> -void amdgpu_ws_queue_cs(struct amdgpu_winsys *ws, struct
> amdgpu_cs *cs);
> void amdgpu_surface_init_functions(struct amdgpu_winsys *ws);
> ADDR_HANDLE amdgpu_addr_create(struct amdgpu_winsys *ws);
>
>
More information about the mesa-dev mailing list