[Beignet] [PATCH 4/9] Add command queue's enqueue thread.

junyan.he at inbox.com
Wed Sep 21 09:47:19 UTC 2016


From: Junyan He <junyan.he at intel.com>

According to the spec, events are better implemented in an
asynchronous manner, so we now attach a worker thread to each
command queue to handle the event commands. The basic idea is:
1. If a command depends on events which are not yet COMPLETE,
the command must be queued to that thread (sketched below). Every
event status change notifies the command queue, giving the thread
a chance to dequeue and run the enqueued commands.
2. For blocking APIs, such as MapBuffer with its blocking flag set,
we wait for all the events in the wait list to become ready and
execute the command synchronously; no event is queued to the thread.
3. For NDRange-like commands, where we want the best performance,
we check the wait list; if all of its events are COMPLETE, we
submit the NDRange command immediately and set its event to the
SUBMITTED status. The event is still queued to the thread, which
then waits for it to become COMPLETE.
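
For illustration only (not part of the patch): a minimal,
self-contained sketch of the worker pattern described above, with a
plain pthread mutex/condvar standing in for the CL_OBJECT_LOCK /
CL_OBJECT_WAIT_ON_COND macros. All names in the sketch are
hypothetical.

    #include <pthread.h>
    #include <stdbool.h>

    struct worker {
      pthread_mutex_t lock;
      pthread_cond_t cond;
      unsigned cookie; /* bumped on every state change */
      bool quit;
    };

    /* Producer side: change some state, bump the cookie, wake the
       worker. A destroy path would likewise set w->quit under the
       lock and broadcast. */
    static void notify(struct worker *w)
    {
      pthread_mutex_lock(&w->lock);
      w->cookie++;
      pthread_cond_broadcast(&w->cond);
      pthread_mutex_unlock(&w->lock);
    }

    /* Worker side: rescan the pending list only when the cookie has
       moved, so we neither miss a wakeup nor spin on an unchanged
       list. */
    static void *worker_loop(void *arg)
    {
      struct worker *w = arg;
      unsigned seen;

      pthread_mutex_lock(&w->lock);
      seen = w->cookie - 1; /* guarantee the first pass scans */
      while (!w->quit) {
        if (seen == w->cookie) { /* nothing changed since last scan */
          pthread_cond_wait(&w->cond, &w->lock);
          continue;
        }
        seen = w->cookie;
        /* ... move ready events off the pending list and execute
           them with the lock dropped, as the patch below does ... */
      }
      pthread_mutex_unlock(&w->lock);
      return NULL;
    }

In the patch, cl_command_queue_notify() and
cl_command_queue_enqueue_event() play the producer role, and
worker_thread_function() plays the worker role.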

Signed-off-by: Junyan He <junyan.he at intel.com>
---
 src/cl_command_queue.h         |  21 +++
 src/cl_command_queue_enqueue.c | 321 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 342 insertions(+)
 create mode 100644 src/cl_command_queue_enqueue.c

diff --git a/src/cl_command_queue.h b/src/cl_command_queue.h
index f0b421d..34886f8 100644
--- a/src/cl_command_queue.h
+++ b/src/cl_command_queue.h
@@ -29,9 +29,21 @@
 
 struct intel_gpgpu;
 
+typedef struct _cl_command_queue_enqueue_worker {
+  cl_command_queue queue;
+  pthread_t tid;
+  cl_uint cookie;            // Bumped on every status change; tells the worker to rescan.
+  cl_bool quit;              // Set at destroy time to stop the worker thread.
+  list_head enqueued_events; // Events waiting for their dependencies to complete.
+  cl_uint in_exec_status;    // Same values as CL_COMPLETE, CL_SUBMITTED, ...
+} _cl_command_queue_enqueue_worker;
+
+typedef _cl_command_queue_enqueue_worker *cl_command_queue_enqueue_worker;
+
 /* Basically, this is a (kind-of) batch buffer */
 struct _cl_command_queue {
   _cl_base_object base;
+  _cl_command_queue_enqueue_worker worker;
   cl_context ctx;                      /* Its parent context */
   cl_event* barrier_events;               /* Point to array of non-complete user events that block this command queue */
   cl_int    barrier_events_num;           /* Number of Non-complete user events */
@@ -102,5 +114,14 @@ extern void cl_command_queue_insert_barrier_event(cl_command_queue queue, cl_eve
 
 extern void cl_command_queue_remove_barrier_event(cl_command_queue queue, cl_event event);
 
+/* Wake the worker thread so it rescans the enqueued events. */
+extern void cl_command_queue_notify(cl_command_queue queue);
+/* Hand an event over to the worker thread for deferred execution. */
+extern void cl_command_queue_enqueue_event(cl_command_queue queue, cl_event event);
+extern cl_int cl_command_queue_init_enqueue(cl_command_queue queue);
+extern void cl_command_queue_destroy_enqueue(cl_command_queue queue);
+/* Block until every enqueued event has completed. */
+extern cl_int cl_command_queue_wait_finish(cl_command_queue queue);
+/* Block until every enqueued event has at least been submitted. */
+extern cl_int cl_command_queue_wait_flush(cl_command_queue queue);
+/* Note: must be called with the queue's lock held. The caller must unref each returned event and free the returned array. */
+extern cl_event *cl_command_queue_record_in_queue_events(cl_command_queue queue, cl_uint *list_num);
+
 #endif /* __CL_COMMAND_QUEUE_H__ */
 
diff --git a/src/cl_command_queue_enqueue.c b/src/cl_command_queue_enqueue.c
new file mode 100644
index 0000000..1848d50
--- /dev/null
+++ b/src/cl_command_queue_enqueue.c
@@ -0,0 +1,321 @@
+/*
+ * Copyright © 2012 Intel Corporation
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: He Junyan <junyan.he at intel.com>
+ */
+
+#include "cl_command_queue.h"
+#include "cl_event_new.h"
+#include "cl_alloc.h"
+#include <stdio.h>
+
+static void *worker_thread_function(void *Arg)
+{
+  cl_command_queue_enqueue_worker worker = (cl_command_queue_enqueue_worker)Arg;
+  cl_command_queue queue = worker->queue;
+  cl_event e;
+  cl_uint cookie = -1; /* (cl_uint)-1 never matches a fresh worker->cookie, so the first pass always scans */
+  list_head *pos;
+  list_head *n;
+  list_head ready_list;
+  cl_int exec_status;
+
+  CL_OBJECT_LOCK(queue);
+
+  while (1) {
+    /* The queue lock must be held at this point. */
+
+    if (worker->quit == CL_TRUE) {
+      CL_OBJECT_UNLOCK(queue);
+      return NULL;
+    }
+
+    if (list_empty(&worker->enqueued_events)) {
+      CL_OBJECT_WAIT_ON_COND(queue);
+      continue;
+    }
+
+    /* The cookie changes whenever an event's status changes or something happens
+       to this command queue. If we already scanned the event list and found
+       nothing to execute, wait for the cookie to change, to avoid spinning forever. */
+    if (cookie == worker->cookie) {
+      CL_OBJECT_WAIT_ON_COND(queue);
+      continue;
+    }
+
+    /* Hold the lock while checking event status, to avoid missing a status notification. */
+    list_init(&ready_list);
+    list_for_each_safe(pos, n, &worker->enqueued_events)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      if (cl_event_is_ready(e) <= CL_COMPLETE) {
+        list_del(&e->enqueue_node);
+        list_add_tail(&e->enqueue_node, &ready_list);
+      }
+    }
+
+    if (list_empty(&ready_list)) { /* Nothing to do, just wait. */
+      cookie = worker->cookie;
+      continue;
+    }
+
+    /* Notify waiters that we changed the event list. */
+    CL_OBJECT_NOTIFY_COND(queue);
+
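+    /* Handshake with the wait_flush/wait_finish paths: mark the worker busy before dropping the lock. */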
+    worker->in_exec_status = CL_QUEUED;
+    CL_OBJECT_UNLOCK(queue);
+
+    /* Do the real work without holding the lock. */
+    exec_status = CL_SUBMITTED;
+    list_for_each_safe(pos, n, &ready_list)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      cl_event_exec(e, exec_status);
+    }
+
+    /* Notify all waiting for flush. */
+    CL_OBJECT_LOCK(queue);
+    worker->in_exec_status = CL_SUBMITTED;
+    CL_OBJECT_NOTIFY_COND(queue);
+    CL_OBJECT_UNLOCK(queue);
+
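+    /* Walk every ready event through CL_RUNNING and then CL_COMPLETE (the status values count down to CL_COMPLETE == 0). */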
+    for (exec_status = CL_RUNNING; exec_status >= CL_COMPLETE; exec_status--) {
+      list_for_each_safe(pos, n, &ready_list)
+      {
+        e = list_entry(pos, _cl_event, enqueue_node);
+        cl_event_exec(e, exec_status);
+      }
+    }
+
+    /* Clear and delete all the events. */
+    list_for_each_safe(pos, n, &ready_list)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      list_del(&e->enqueue_node);
+      cl_event_delete(e);
+    }
+
+    CL_OBJECT_LOCK(queue);
+    worker->in_exec_status = CL_COMPLETE;
+
+    /* Notify finish waiters: all the ready events have been executed. */
+    CL_OBJECT_NOTIFY_COND(queue);
+  }
+}
+
+LOCAL void
+cl_command_queue_notify(cl_command_queue queue)
+{
+  if (CL_OBJECT_GET_REF(queue) < 1) {
+    return;
+  }
+
+  assert(CL_OBJECT_IS_COMMAND_QUEUE(queue));
+  CL_OBJECT_LOCK(queue);
+  queue->worker.cookie++;
+  CL_OBJECT_NOTIFY_COND(queue);
+  CL_OBJECT_UNLOCK(queue);
+}
+
+LOCAL void
+cl_command_queue_enqueue_event(cl_command_queue queue, cl_event event)
+{
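+  /* The queue keeps a reference on the event until the worker has executed it and released it via cl_event_delete(). */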
+  CL_OBJECT_INC_REF(event);
+  assert(CL_OBJECT_IS_COMMAND_QUEUE(queue));
+  CL_OBJECT_LOCK(queue);
+  assert(queue->worker.quit == CL_FALSE);
+  assert(list_empty(&event->enqueue_node));
+  list_add_tail(&event->enqueue_node, &queue->worker.enqueued_events);
+  queue->worker.cookie++;
+  CL_OBJECT_NOTIFY_COND(queue);
+  CL_OBJECT_UNLOCK(queue);
+}
+
+LOCAL cl_int
+cl_command_queue_init_enqueue(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  worker->queue = queue;
+  worker->quit = CL_FALSE;
+  worker->in_exec_status = CL_COMPLETE;
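+  /* The start value is arbitrary: the worker only compares the cookie against the last value it observed. */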
+  worker->cookie = 8;
+  list_init(&worker->enqueued_events);
+
+  if (pthread_create(&worker->tid, NULL, worker_thread_function, worker)) {
+    DEBUGP(DL_ERROR, "Cannot create worker thread for queue %p...\n", queue);
+    return CL_OUT_OF_RESOURCES;
+  }
+
+  return CL_SUCCESS;
+}
+
+LOCAL void
+cl_command_queue_destroy_enqueue(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+
+  assert(worker->queue == queue);
+  assert(worker->quit == CL_FALSE);
+
+  CL_OBJECT_LOCK(queue);
+  worker->quit = CL_TRUE;
+  CL_OBJECT_NOTIFY_COND(queue);
+  CL_OBJECT_UNLOCK(queue);
+
+  pthread_join(worker->tid, NULL);
+
+  /* The caller should have waited for all commands to finish before destroying the queue. */
+  if (!list_empty(&worker->enqueued_events)) {
+    DEBUGP(DL_WARNING, "Queue %p still has enqueued commands when it is destroyed,"
+                       " this may cause serious problems.\n",
+           queue);
+    assert(0);
+  }
+}
+
+/* Note: must be called with the queue's lock held. The caller must unref each returned event and free the returned array. */
+LOCAL cl_event *
+cl_command_queue_record_in_queue_events(cl_command_queue queue, cl_uint *list_num)
+{
+  int event_num = 0;
+  list_head *pos;
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  cl_event *enqueued_list = NULL;
+  int i;
+  cl_event tmp_e = NULL;
+
+  list_for_each(pos, &worker->enqueued_events)
+  {
+    event_num++;
+  }
+  assert(event_num > 0);
+
+  enqueued_list = CL_CALLOC(event_num, sizeof(cl_event));
+  assert(enqueued_list);
+
+  i = 0;
+  list_for_each(pos, &worker->enqueued_events)
+  {
+    tmp_e = list_entry(pos, _cl_event, enqueue_node);
+    cl_event_add_ref(tmp_e); // Take a temporary reference to keep the event alive.
+    enqueued_list[i] = tmp_e;
+    i++;
+  }
+  assert(i == event_num);
+
+  *list_num = event_num;
+  return enqueued_list;
+}
+
+LOCAL cl_int
+cl_command_queue_wait_flush(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  cl_event *enqueued_list = NULL;
+  cl_uint enqueued_num = 0;
+  int i;
+
+  CL_OBJECT_LOCK(queue);
+
+  if (worker->quit) { // queue already destroyed?
+    CL_OBJECT_UNLOCK(queue);
+    return CL_INVALID_COMMAND_QUEUE;
+  }
+
+  if (!list_empty(&worker->enqueued_events)) {
+    enqueued_list = cl_command_queue_record_in_queue_events(queue, &enqueued_num);
+    assert(enqueued_num > 0);
+    assert(enqueued_list);
+  }
+
+  while (worker->in_exec_status == CL_QUEUED) {
+    CL_OBJECT_WAIT_ON_COND(queue);
+
+    if (worker->quit) { // queue already destroyed? drop our refs and bail out
+      CL_OBJECT_UNLOCK(queue);
+      for (i = 0; i < enqueued_num; i++)
+        cl_event_delete(enqueued_list[i]);
+      if (enqueued_list)
+        CL_FREE(enqueued_list);
+      return CL_INVALID_COMMAND_QUEUE;
+    }
+  }
+
+  CL_OBJECT_UNLOCK(queue);
+
+  /* Wait for all events to reach at least SUBMITTED status. */
+  for (i = 0; i < enqueued_num; i++) {
+    CL_OBJECT_LOCK(enqueued_list[i]);
+    while (enqueued_list[i]->status > CL_SUBMITTED) {
+      CL_OBJECT_WAIT_ON_COND(enqueued_list[i]);
+    }
+    CL_OBJECT_UNLOCK(enqueued_list[i]);
+  }
+
+  for (i = 0; i < enqueued_num; i++) {
+    cl_event_delete(enqueued_list[i]);
+  }
+  if (enqueued_list)
+    CL_FREE(enqueued_list);
+
+  return CL_SUCCESS;
+}
+
+LOCAL cl_int
+cl_command_queue_wait_finish(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  cl_event *enqueued_list = NULL;
+  cl_uint enqueued_num = 0;
+  int i;
+
+  CL_OBJECT_LOCK(queue);
+
+  if (worker->quit) { // queue already destroyed?
+    CL_OBJECT_UNLOCK(queue);
+    return CL_INVALID_COMMAND_QUEUE;
+  }
+
+  if (!list_empty(&worker->enqueued_events)) {
+    enqueued_list = cl_command_queue_record_in_queue_events(queue, &enqueued_num);
+    assert(enqueued_num > 0);
+    assert(enqueued_list);
+  }
+
+  while (worker->in_exec_status > CL_COMPLETE) {
+    CL_OBJECT_WAIT_ON_COND(queue);
+
+    if (worker->quit) { // queue already destroyed? drop our refs and bail out
+      CL_OBJECT_UNLOCK(queue);
+      for (i = 0; i < enqueued_num; i++)
+        cl_event_delete(enqueued_list[i]);
+      if (enqueued_list)
+        CL_FREE(enqueued_list);
+      return CL_INVALID_COMMAND_QUEUE;
+    }
+  }
+
+  CL_OBJECT_UNLOCK(queue);
+
+  /* Wait for all events to reach COMPLETE status. */
+  for (i = 0; i < enqueued_num; i++) {
+    CL_OBJECT_LOCK(enqueued_list[i]);
+    while (enqueued_list[i]->status > CL_COMPLETE) {
+      CL_OBJECT_WAIT_ON_COND(enqueued_list[i]);
+    }
+    CL_OBJECT_UNLOCK(enqueued_list[i]);
+  }
+
+  for (i = 0; i < enqueued_num; i++) {
+    cl_event_delete(enqueued_list[i]);
+  }
+  if (enqueued_list)
+    CL_FREE(enqueued_list);
+
+  return CL_SUCCESS;
+}
-- 
2.7.4