[Beignet] [PATCH 14/57] Add cl_command_queue define to runtime.

junyan.he at inbox.com junyan.he at inbox.com
Sun Jun 11 05:50:00 UTC 2017


From: Junyan He <junyan.he at intel.com>

We create a worker thread in each command queue to handle the event
requirements. Each enqueueXXX API generates an enqueue event, which
is added to the exec list of the command queue's worker thread.

Signed-off-by: Junyan He <junyan.he at intel.com>
---
 runtime/cl_command_queue.c | 465 +++++++++++++++++++++++++++++++++++++++++++++
 runtime/cl_command_queue.h |  75 ++++++++
 2 files changed, 540 insertions(+)
 create mode 100644 runtime/cl_command_queue.c
 create mode 100644 runtime/cl_command_queue.h

diff --git a/runtime/cl_command_queue.c b/runtime/cl_command_queue.c
new file mode 100644
index 0000000..b54f1cb
--- /dev/null
+++ b/runtime/cl_command_queue.c
@@ -0,0 +1,465 @@
+/*
+ * Copyright © 2012 Intel Corporation
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: He Junyan <junyan.he at intel.com>
+ */
+
+#include "cl_command_queue.h"
+#include "cl_alloc.h"
+#include "cl_device_id.h"
+#include "cl_event.h"
+
+/* Allocate a command queue with CL_CALLOC, initialise its base object with
+ * the command queue magic, start its enqueue worker and register the queue
+ * with ctx.
+ *
+ * Returns the new queue, or NULL when the allocation fails or the enqueue
+ * worker cannot be initialised. */
+static cl_command_queue
+cl_command_queue_new(cl_context ctx)
+{
+  cl_command_queue new_queue;
+
+  assert(ctx);
+
+  new_queue = CL_CALLOC(1, sizeof(_cl_command_queue));
+  if (!new_queue)
+    return NULL;
+
+  CL_OBJECT_INIT_BASE(new_queue, CL_OBJECT_COMMAND_QUEUE_MAGIC);
+
+  if (cl_command_queue_init_enqueue(new_queue) == CL_SUCCESS) {
+    /* Only a queue with a running worker is appended to the context's list. */
+    cl_context_add_queue(ctx, new_queue);
+    return new_queue;
+  }
+
+  CL_FREE(new_queue);
+  return NULL;
+}
+
+/* Create a command queue for `device` inside context `ctx`.
+ *
+ *   ctx          context the new queue is registered with
+ *   device       device the queue belongs to; its api.command_queue_create
+ *                hook is called to set up the device specific part
+ *   properties   stored in queue->props
+ *   queue_size   stored in queue->size
+ *   errcode_ret  must not be NULL; receives CL_OUT_OF_HOST_MEMORY when the
+ *                generic queue cannot be created, otherwise the value
+ *                returned by the device hook
+ *
+ * Returns the new queue, or NULL on failure. */
+LOCAL cl_command_queue
+cl_command_queue_create(cl_context ctx, cl_device_id device, cl_command_queue_properties properties,
+                        cl_uint queue_size, cl_int *errcode_ret)
+{
+  cl_command_queue queue = cl_command_queue_new(ctx);
+  if (queue == NULL) {
+    /* Stop here: everything below dereferences queue. */
+    *errcode_ret = CL_OUT_OF_HOST_MEMORY;
+    return NULL;
+  }
+
+  queue->props = properties;
+  queue->device = device;
+  queue->size = queue_size;
+
+  *errcode_ret = device->api.command_queue_create(device, queue);
+  if (*errcode_ret != CL_SUCCESS) {
+    /* Undo cl_command_queue_new(): stops the worker and frees the queue. */
+    cl_command_queue_delete(queue);
+    return NULL;
+  }
+
+  return queue;
+}
+
+/* Drop one reference on `queue` and destroy the queue when it was the last.
+ *
+ * Nothing else happens while CL_OBJECT_DEC_REF() reports a value above 1
+ * (presumably the count before the decrement -- confirm against
+ * cl_base_object.h). Otherwise the worker thread is stopped, the device is
+ * asked to release its part, the queue is removed from its context, and the
+ * barrier array and the queue itself are freed. */
+LOCAL void
+cl_command_queue_delete(cl_command_queue queue)
+{
+  assert(queue);
+  if (CL_OBJECT_DEC_REF(queue) > 1)
+    return;
+
+  /* Stop and join the worker before any queue state is torn down. */
+  cl_command_queue_destroy_enqueue(queue);
+
+  /* Release the device specific part. This used to call
+   * command_queue_create, which sets device state up again on a dying queue
+   * instead of releasing it.
+   * NOTE(review): assumes the device API table provides command_queue_delete
+   * as the counterpart of command_queue_create -- confirm in cl_device_id.h. */
+  queue->device->api.command_queue_delete(queue->device, queue);
+
+  cl_context_remove_queue(queue->ctx, queue);
+  if (queue->barrier_events) {
+    CL_FREE(queue->barrier_events);
+  }
+  CL_OBJECT_DESTROY_BASE(queue);
+  CL_FREE(queue);
+}
+
+/* Take one more reference on `queue` via CL_OBJECT_INC_REF(). */
+LOCAL void
+cl_command_queue_add_ref(cl_command_queue queue)
+{
+  CL_OBJECT_INC_REF(queue);
+}
+
+/* Append `event` to the queue's barrier_events array.
+ *
+ * A reference on the event is taken before it is stored. The array is
+ * created with 4 slots on first use and doubled whenever it is full. The
+ * event must not be in the array already and allocations must succeed; both
+ * are only checked by assert. The array is modified under the queue lock. */
+LOCAL void
+cl_command_queue_insert_barrier_event(cl_command_queue queue, cl_event event)
+{
+  cl_int idx;
+
+  cl_event_add_ref(event);
+
+  assert(queue != NULL);
+  CL_OBJECT_LOCK(queue);
+
+  /* First barrier event on this queue: allocate the initial array. */
+  if (queue->barrier_events == NULL) {
+    queue->barrier_events_size = 4;
+    queue->barrier_events = CL_CALLOC(queue->barrier_events_size, sizeof(cl_event));
+    assert(queue->barrier_events);
+  }
+
+  /* Debug-only check that the event is not inserted twice. */
+  for (idx = 0; idx < queue->barrier_events_num; idx++) {
+    assert(queue->barrier_events[idx] != event);
+  }
+
+  if (queue->barrier_events_num >= queue->barrier_events_size) {
+    /* Array is full, double it before appending. */
+    queue->barrier_events_size *= 2;
+    queue->barrier_events = CL_REALLOC(queue->barrier_events,
+                                       queue->barrier_events_size * sizeof(cl_event));
+    assert(queue->barrier_events);
+  }
+
+  queue->barrier_events[queue->barrier_events_num] = event;
+  queue->barrier_events_num += 1;
+  CL_OBJECT_UNLOCK(queue);
+}
+
+/* Remove `event` from the queue's barrier_events array and release it with
+ * cl_event_delete().
+ *
+ * The event is expected to be in the array (checked by assert). Entries
+ * behind it are moved one slot towards the front; when it is the last entry
+ * its slot is simply cleared. The array is modified under the queue lock,
+ * the event is released after the lock is dropped. */
+LOCAL void
+cl_command_queue_remove_barrier_event(cl_command_queue queue, cl_event event)
+{
+  cl_int idx = 0;
+  cl_int last;
+
+  assert(queue != NULL);
+
+  CL_OBJECT_LOCK(queue);
+
+  assert(queue->barrier_events_num > 0);
+  assert(queue->barrier_events);
+
+  while (idx < queue->barrier_events_num && queue->barrier_events[idx] != event)
+    idx++;
+  assert(idx < queue->barrier_events_num); // Must find it.
+
+  last = queue->barrier_events_num - 1;
+  if (idx == last) { // The last one.
+    queue->barrier_events[last] = NULL;
+  } else {
+    while (idx < last) { // Move forward.
+      queue->barrier_events[idx] = queue->barrier_events[idx + 1];
+      idx++;
+    }
+  }
+  queue->barrier_events_num = last;
+  CL_OBJECT_UNLOCK(queue);
+
+  cl_event_delete(event);
+}
+
+/* Body of the per-queue worker thread started by
+ * cl_command_queue_init_enqueue(); Arg is the queue's
+ * _cl_command_queue_enqueue_worker.
+ *
+ * The loop runs with the queue lock held at the top of every iteration:
+ *   - return NULL as soon as worker->quit is set;
+ *   - wait on the queue condition while enqueued_events is empty, or while
+ *     worker->cookie still has the value seen by the last scan that found
+ *     nothing ready;
+ *   - move every event for which cl_event_is_ready() <= CL_COMPLETE from
+ *     enqueued_events to a local ready_list;
+ *   - without the lock, call cl_event_exec() on the ready events, first with
+ *     CL_SUBMITTED and then with CL_COMPLETE, then unlink and release them.
+ * worker->in_exec_status is set to CL_QUEUED before the first pass, to
+ * CL_SUBMITTED after it and to CL_COMPLETE after the events were released;
+ * waiters on the queue condition are notified along the way. */
+static void *
+worker_thread_function(void *Arg)
+{
+  cl_command_queue_enqueue_worker worker = (cl_command_queue_enqueue_worker)Arg;
+  cl_command_queue queue = worker->queue;
+  cl_event e;
+  cl_uint cookie = -1; /* worker->cookie value at the last scan that found nothing ready */
+  list_node *pos;
+  list_node *n;
+  list_head ready_list;
+  cl_int exec_status;
+
+  CL_OBJECT_LOCK(queue);
+
+  while (1) {
+    /* Must have locked here. */
+
+    if (worker->quit == CL_TRUE) {
+      CL_OBJECT_UNLOCK(queue);
+      return NULL;
+    }
+
+    if (list_empty(&worker->enqueued_events)) {
+      CL_OBJECT_WAIT_ON_COND(queue);
+      continue;
+    }
+
+    /* The cookie changes when an event status changes or something happens to
+       this command queue. If we already checked the event list and did not find
+       anything to exec, we need to wait for a cookie update, to avoid looping forever. */
+    if (cookie == worker->cookie) {
+      CL_OBJECT_WAIT_ON_COND(queue);
+      continue;
+    }
+
+    /* Here we hold the lock while checking event status, to avoid missing a status notify. */
+    list_init(&ready_list);
+    list_for_each_safe(pos, n, &worker->enqueued_events)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      if (cl_event_is_ready(e) <= CL_COMPLETE) {
+        list_node_del(&e->enqueue_node);
+        list_add_tail(&ready_list, &e->enqueue_node);
+      }
+    }
+
+    if (list_empty(&ready_list)) { /* Nothing to do, just wait. */
+      /* Remember the cookie so the check above waits until it changes. */
+      cookie = worker->cookie;
+      continue;
+    }
+
+    /* Notify waiters, we changed the event list. */
+    CL_OBJECT_NOTIFY_COND(queue);
+
+    worker->in_exec_status = CL_QUEUED;
+    CL_OBJECT_UNLOCK(queue);
+
+    /* Do the real job without the lock. */
+    exec_status = CL_SUBMITTED;
+    list_for_each_safe(pos, n, &ready_list)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      cl_event_exec(e, exec_status, CL_FALSE);
+    }
+
+    /* Notify everyone waiting for a flush. */
+    CL_OBJECT_LOCK(queue);
+    worker->in_exec_status = CL_SUBMITTED;
+    CL_OBJECT_NOTIFY_COND(queue);
+    CL_OBJECT_UNLOCK(queue);
+
+    list_for_each_safe(pos, n, &ready_list)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      cl_event_exec(e, CL_COMPLETE, CL_FALSE);
+    }
+
+    /* Clear and delete all the events. */
+    list_for_each_safe(pos, n, &ready_list)
+    {
+      e = list_entry(pos, _cl_event, enqueue_node);
+      list_node_del(&e->enqueue_node);
+      cl_event_delete(e);
+    }
+
+    /* Re-take the lock; it stays held into the next iteration. */
+    CL_OBJECT_LOCK(queue);
+    worker->in_exec_status = CL_COMPLETE;
+
+    /* Notify finish waiters, we have done all the ready events. */
+    CL_OBJECT_NOTIFY_COND(queue);
+  }
+}
+
+/* Tell the queue's worker that something changed: bump worker.cookie under
+ * the queue lock and signal the queue condition.
+ *
+ * Does nothing when the queue's reference count is below 1. */
+LOCAL void
+cl_command_queue_notify(cl_command_queue queue)
+{
+  if (CL_OBJECT_GET_REF(queue) >= 1) {
+    assert(queue && (((cl_base_object)queue)->magic == CL_OBJECT_COMMAND_QUEUE_MAGIC));
+
+    CL_OBJECT_LOCK(queue);
+    queue->worker.cookie += 1;
+    CL_OBJECT_NOTIFY_COND(queue);
+    CL_OBJECT_UNLOCK(queue);
+  }
+}
+
+/* Hand `event` to the queue's worker thread.
+ *
+ * A reference on the event is taken, then, under the queue lock, the event
+ * is appended to the worker's enqueued_events list, the cookie is bumped and
+ * the queue condition is signalled. The event must not be linked into a list
+ * already and the worker must not have been told to quit (both asserted). */
+LOCAL void
+cl_command_queue_enqueue_event(cl_command_queue queue, cl_event event)
+{
+  cl_command_queue_enqueue_worker worker;
+
+  CL_OBJECT_INC_REF(event);
+  assert(CL_OBJECT_IS_COMMAND_QUEUE(queue));
+
+  CL_OBJECT_LOCK(queue);
+  worker = &queue->worker;
+  assert(worker->quit == CL_FALSE);
+  assert(list_node_out_of_list(&event->enqueue_node));
+
+  list_add_tail(&worker->enqueued_events, &event->enqueue_node);
+  worker->cookie += 1;
+
+  CL_OBJECT_NOTIFY_COND(queue);
+  CL_OBJECT_UNLOCK(queue);
+}
+
+/* Initialise the queue's embedded worker state and start the worker thread.
+ *
+ * Returns CL_SUCCESS, or CL_OUT_OF_RESOURCES (after logging an error) when
+ * pthread_create() fails. */
+LOCAL cl_int
+cl_command_queue_init_enqueue(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  int rc;
+
+  list_init(&worker->enqueued_events);
+  worker->queue = queue;
+  worker->cookie = 8;
+  worker->in_exec_status = CL_COMPLETE;
+  worker->quit = CL_FALSE;
+
+  /* All fields above must be set before the thread can start reading them. */
+  rc = pthread_create(&worker->tid, NULL, worker_thread_function, worker);
+  if (rc == 0)
+    return CL_SUCCESS;
+
+  CL_LOG_ERROR("Can not create worker thread for queue %p...\n", queue);
+  return CL_OUT_OF_RESOURCES;
+}
+
+/* Stop the queue's worker thread and dispose of events it never ran.
+ *
+ * worker->quit is set under the queue lock, the queue condition is signalled
+ * and the thread is joined. Events still on enqueued_events afterwards are
+ * unlinked, given status -1 and released, after a warning is logged. */
+LOCAL void
+cl_command_queue_destroy_enqueue(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  list_node *cur;
+  list_node *next;
+  cl_event leftover;
+
+  assert(worker->queue == queue);
+  assert(worker->quit == CL_FALSE);
+
+  /* Ask the worker to exit and wake it in case it waits on the condition. */
+  CL_OBJECT_LOCK(queue);
+  worker->quit = 1;
+  CL_OBJECT_NOTIFY_COND(queue);
+  CL_OBJECT_UNLOCK(queue);
+
+  pthread_join(worker->tid, NULL);
+
+  /* We will wait for finish before destroy the command queue. */
+  if (list_empty(&worker->enqueued_events))
+    return;
+
+  CL_LOG_WARNING("There are still some enqueued works in the queue %p when this"
+                 " queue is destroyed, this may cause very serious problems.\n",
+                 queue);
+
+  list_for_each_safe(cur, next, &worker->enqueued_events)
+  {
+    leftover = list_entry(cur, _cl_event, enqueue_node);
+    list_node_del(&leftover->enqueue_node);
+    cl_event_set_status(leftover, -1); // Give waiters a chance to wakeup.
+    cl_event_delete(leftover);
+  }
+}
+
+/* Note: Must call this function with queue's lock. */
+/* Take a snapshot of the events currently on the worker's enqueued_events
+ * list.
+ *
+ * Returns a CL_CALLOC'ed array holding every enqueued event, each with one
+ * reference added, and stores the number of entries in *list_num. The list
+ * must not be empty (asserted). The caller releases the events and frees the
+ * array. */
+LOCAL cl_event *
+cl_command_queue_record_in_queue_events(cl_command_queue queue, cl_uint *list_num)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  cl_event *snapshot = NULL;
+  list_node *node;
+  int total = 0;
+  int filled = 0;
+
+  /* First pass: count the entries so the array can be sized. */
+  list_for_each(node, &worker->enqueued_events)
+  {
+    total += 1;
+  }
+  assert(total > 0);
+
+  snapshot = CL_CALLOC(total, sizeof(cl_event));
+  assert(snapshot);
+
+  /* Second pass: pin every event and store it. */
+  list_for_each(node, &worker->enqueued_events)
+  {
+    cl_event ev = list_entry(node, _cl_event, enqueue_node);
+    cl_event_add_ref(ev); // Add ref temp avoid delete.
+    snapshot[filled++] = ev;
+  }
+  assert(filled == total);
+
+  *list_num = total;
+  return snapshot;
+}
+
+/* Block until the events that were enqueued on `queue` when this function
+ * was called are no longer above CL_SUBMITTED.
+ *
+ * Under the queue lock the pending events are snapshotted (each with an
+ * extra reference), then the function waits while the worker reports
+ * in_exec_status == CL_QUEUED. After that each snapshotted event is waited
+ * on individually.
+ *
+ * Returns CL_SUCCESS, or CL_INVALID_COMMAND_QUEUE when the worker has been
+ * told to quit, either before the call or while waiting. The snapshot and
+ * its references are released on every path. */
+LOCAL cl_int
+cl_command_queue_wait_flush(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  cl_event *enqueued_list = NULL;
+  cl_uint enqueued_num = 0;
+  cl_uint i;
+  cl_int ret = CL_SUCCESS;
+
+  CL_OBJECT_LOCK(queue);
+
+  if (worker->quit) { // already destroy the queue?
+    CL_OBJECT_UNLOCK(queue);
+    return CL_INVALID_COMMAND_QUEUE;
+  }
+
+  if (!list_empty(&worker->enqueued_events)) {
+    enqueued_list = cl_command_queue_record_in_queue_events(queue, &enqueued_num);
+    assert(enqueued_num > 0);
+    assert(enqueued_list);
+  }
+
+  while (worker->in_exec_status == CL_QUEUED) {
+    CL_OBJECT_WAIT_ON_COND(queue);
+
+    if (worker->quit) { // already destroy the queue?
+      /* Do not return from here: the snapshot taken above still has to be
+       * released, otherwise the array and the event references leak. */
+      ret = CL_INVALID_COMMAND_QUEUE;
+      break;
+    }
+  }
+
+  CL_OBJECT_UNLOCK(queue);
+
+  if (ret == CL_SUCCESS) {
+    /* Wait all event enter submitted status. */
+    for (i = 0; i < enqueued_num; i++) {
+      CL_OBJECT_LOCK(enqueued_list[i]);
+      while (enqueued_list[i]->status > CL_SUBMITTED) {
+        CL_OBJECT_WAIT_ON_COND(enqueued_list[i]);
+      }
+      CL_OBJECT_UNLOCK(enqueued_list[i]);
+    }
+  }
+
+  /* Drop the references taken by the snapshot. */
+  for (i = 0; i < enqueued_num; i++) {
+    cl_event_delete(enqueued_list[i]);
+  }
+  if (enqueued_list)
+    CL_FREE(enqueued_list);
+
+  return ret;
+}
+
+/* Block until the events that were enqueued on `queue` when this function
+ * was called are no longer above CL_COMPLETE.
+ *
+ * Under the queue lock the pending events are snapshotted (each with an
+ * extra reference), then the function waits while the worker reports
+ * in_exec_status > CL_COMPLETE. After that each snapshotted event is waited
+ * on individually.
+ *
+ * Returns CL_SUCCESS, or CL_INVALID_COMMAND_QUEUE when the worker has been
+ * told to quit, either before the call or while waiting. The snapshot and
+ * its references are released on every path. */
+LOCAL cl_int
+cl_command_queue_wait_finish(cl_command_queue queue)
+{
+  cl_command_queue_enqueue_worker worker = &queue->worker;
+  cl_event *enqueued_list = NULL;
+  cl_uint enqueued_num = 0;
+  cl_uint i;
+  cl_int ret = CL_SUCCESS;
+
+  CL_OBJECT_LOCK(queue);
+
+  if (worker->quit) { // already destroy the queue?
+    CL_OBJECT_UNLOCK(queue);
+    return CL_INVALID_COMMAND_QUEUE;
+  }
+
+  if (!list_empty(&worker->enqueued_events)) {
+    enqueued_list = cl_command_queue_record_in_queue_events(queue, &enqueued_num);
+    assert(enqueued_num > 0);
+    assert(enqueued_list);
+  }
+
+  while (worker->in_exec_status > CL_COMPLETE) {
+    CL_OBJECT_WAIT_ON_COND(queue);
+
+    if (worker->quit) { // already destroy the queue?
+      /* Do not return from here: the snapshot taken above still has to be
+       * released, otherwise the array and the event references leak. */
+      ret = CL_INVALID_COMMAND_QUEUE;
+      break;
+    }
+  }
+
+  CL_OBJECT_UNLOCK(queue);
+
+  if (ret == CL_SUCCESS) {
+    /* Wait all event enter complete status. */
+    for (i = 0; i < enqueued_num; i++) {
+      CL_OBJECT_LOCK(enqueued_list[i]);
+      while (enqueued_list[i]->status > CL_COMPLETE) {
+        CL_OBJECT_WAIT_ON_COND(enqueued_list[i]);
+      }
+      CL_OBJECT_UNLOCK(enqueued_list[i]);
+    }
+  }
+
+  /* Drop the references taken by the snapshot. */
+  for (i = 0; i < enqueued_num; i++) {
+    cl_event_delete(enqueued_list[i]);
+  }
+  if (enqueued_list)
+    CL_FREE(enqueued_list);
+
+  return ret;
+}
diff --git a/runtime/cl_command_queue.h b/runtime/cl_command_queue.h
new file mode 100644
index 0000000..5058443
--- /dev/null
+++ b/runtime/cl_command_queue.h
@@ -0,0 +1,75 @@
+/* 
+ * Copyright © 2012 Intel Corporation
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Benjamin Segovia <benjamin.segovia at intel.com>
+ */
+
+#ifndef __CL_COMMAND_QUEUE_H__
+#define __CL_COMMAND_QUEUE_H__
+
+#include "cl_base_object.h"
+#include "cl_context.h"
+#include "CL/cl.h"
+
+/* State of the worker thread that executes a command queue's events. One
+ * instance is embedded in every _cl_command_queue as its `worker` member. */
+typedef struct _cl_command_queue_enqueue_worker {
+  cl_command_queue queue; /* Back pointer to the owning queue */
+  pthread_t tid; /* Thread started by cl_command_queue_init_enqueue() */
+  cl_uint cookie; /* Bumped on enqueue/notify; the worker rescans the list when it changes */
+  cl_bool quit; /* Set by cl_command_queue_destroy_enqueue() to make the worker return */
+  list_head enqueued_events; /* Events handed to the worker, linked through their enqueue_node */
+  cl_uint in_exec_status; // Same value as CL_COMPLETE, CL_SUBMITTED ...
+} _cl_command_queue_enqueue_worker;
+
+/* Pointer form used throughout the implementation. */
+typedef _cl_command_queue_enqueue_worker *cl_command_queue_enqueue_worker;
+
+/* Basically, this is a (kind-of) batch buffer */
+typedef struct _cl_command_queue {
+  _cl_base_object base;              /* Common object header, initialised with CL_OBJECT_COMMAND_QUEUE_MAGIC */
+  _cl_command_queue_enqueue_worker worker; /* Worker thread state for this queue */
+  cl_context ctx;                    /* Its parent context */
+  cl_device_id device;               /* Its device */
+  cl_event *barrier_events;          /* Point to array of non-complete user events that block this command queue */
+  cl_int barrier_events_num;         /* Number of Non-complete user events */
+  cl_int barrier_events_size;        /* The size of array that barrier_events point to */
+  cl_command_queue_properties props; /* Queue properties */
+  cl_uint size;                      /* Store the specified size for queue */
+  void *command_queue_for_device;    /* device's specific content */
+} _cl_command_queue;
+
+/* Magic value a command queue's base object is initialised with. */
+#define CL_OBJECT_COMMAND_QUEUE_MAGIC 0x83650a12b79ce4efLL
+/* True when obj is non-NULL, carries the command queue magic and has a
+ * reference count of at least 1. The argument is parenthesised so that any
+ * pointer expression can be passed; note that it is evaluated more than once. */
+#define CL_OBJECT_IS_COMMAND_QUEUE(obj) (((obj) &&                                                           \
+                                          ((cl_base_object)(obj))->magic == CL_OBJECT_COMMAND_QUEUE_MAGIC && \
+                                          CL_OBJECT_GET_REF(obj) >= 1))
+
+/* Allocate and initialize a new command queue. Also insert it in the list of
+ * command queue in the associated context. The last argument receives the
+ * error code; NULL is returned on failure. */
+extern cl_command_queue cl_command_queue_create(cl_context, cl_device_id,
+                                                cl_command_queue_properties, cl_uint, cl_int *);
+/* Drop a reference; destroy and deallocate the command queue when it was the last one */
+extern void cl_command_queue_delete(cl_command_queue);
+/* Keep one more reference on the queue */
+extern void cl_command_queue_add_ref(cl_command_queue);
+/* Add event to the queue's barrier_events array, taking a reference on it */
+extern void cl_command_queue_insert_barrier_event(cl_command_queue queue, cl_event event);
+/* Remove event from the queue's barrier_events array and release it */
+extern void cl_command_queue_remove_barrier_event(cl_command_queue queue, cl_event event);
+/* Bump the worker cookie and signal the queue condition so the worker rescans its events */
+extern void cl_command_queue_notify(cl_command_queue queue);
+/* Take a reference on event and append it to the worker's enqueued_events list */
+extern void cl_command_queue_enqueue_event(cl_command_queue queue, cl_event event);
+/* Initialise the worker state and start the worker thread; CL_OUT_OF_RESOURCES if the thread cannot be created */
+extern cl_int cl_command_queue_init_enqueue(cl_command_queue queue);
+/* Stop and join the worker thread; events still enqueued get status -1 and are released */
+extern void cl_command_queue_destroy_enqueue(cl_command_queue queue);
+/* Wait until the events enqueued at call time are no longer above CL_COMPLETE */
+extern cl_int cl_command_queue_wait_finish(cl_command_queue queue);
+/* Wait until the events enqueued at call time are no longer above CL_SUBMITTED */
+extern cl_int cl_command_queue_wait_flush(cl_command_queue queue);
+/* Note: Must call this function with queue's lock.
+ * Returns an allocated array of the currently enqueued events, each with an
+ * added reference, and stores the count in *list_num. */
+extern cl_event *cl_command_queue_record_in_queue_events(cl_command_queue queue, cl_uint *list_num);
+#endif /* __CL_COMMAND_QUEUE_H__ */
-- 
2.7.4





More information about the Beignet mailing list