[Ezbench-dev] [PATCH 13/25] smartezbench: resume incomplete runs, if possible

Petri Latvala petri.latvala at intel.com
Fri Feb 24 11:19:14 UTC 2017


From: Martin Peres <martin.peres at linux.intel.com>

v2 (Petri): Allow resuming runs with subtests
v3 (Martin):
 - Resuming tasks should be done on the original version
 - Re-schedule a failed resume as a run
 - Change the task description for resume runs
---
 python-modules/ezbench/smartezbench.py | 126 +++++++++++++++++++++++----------
 1 file changed, 88 insertions(+), 38 deletions(-)

diff --git a/python-modules/ezbench/smartezbench.py b/python-modules/ezbench/smartezbench.py
index 5f3474b..300761b 100644
--- a/python-modules/ezbench/smartezbench.py
+++ b/python-modules/ezbench/smartezbench.py
@@ -25,7 +25,7 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 
-from collections import namedtuple
+from collections import namedtuple, deque
 from datetime import datetime, timedelta
 from enum import Enum
 import numpy as np
@@ -92,10 +92,11 @@ def list_smart_ezbench_report_names(ezbench_dir, updatedSince = 0):
     return reports
 
 class TaskEntry:
-    def __init__(self, commit, test, rounds):
+    def __init__(self, commit, test, rounds, resumeResultFile = None):
         self.commit = commit
         self.test = test
         self.rounds = rounds
+        self.resumeResultFile = resumeResultFile
         self.start_date = None
         self.exec_time = None
         self.build_time = None
@@ -133,7 +134,10 @@ class TaskEntry:
         return self.predicted_completion_time() - elapsed
 
     def __str__(self):
-        string = "{}: {}: {} run(s)".format(self.commit, self.test, self.rounds)
+        if self.resumeResultFile is None:
+            string = "{}: {}: {} run(s)".format(self.commit, self.test, self.rounds)
+        else:
+            string = "resume {}".format(self.resumeResultFile)
 
         total_delta = self.predicted_completion_time()
         if total_delta.total_seconds() > 0:
@@ -147,7 +151,10 @@ class TaskEntry:
                 else:
                     remaining_str = str(timedelta(0, math.floor(-remaining.total_seconds()))) + "s overtime"
 
-                string += "({:.2f}%, {})".format(progress, remaining_str)
+                if self.resumeResultFile is None:
+                    string += "({:.2f}%, {})".format(progress, remaining_str)
+                else:
+                    string += "(> {:.2f}%, up to {})".format(progress, remaining_str)
             else:
                 rounded_total_delta = timedelta(0, math.ceil(total_delta.total_seconds()))
                 string += "(estimated completion time: {}s)".format(rounded_total_delta)
@@ -445,33 +452,36 @@ class SmartEzbench:
         self.__write_attribute__('commit_url', commit_url, allow_updates = True)
         self.__log(Criticality.II, "Report commit URL has been changed to '{}'".format(commit_url))
 
-    def __add_test_unlocked__(self, commit, test, rounds = None):
+    def __task_tree_add_test__(self, task_tree, commit, test, rounds):
+        if commit not in task_tree:
+            task_tree[commit] = dict()
+            task_tree[commit]["tests"] = dict()
+
+        if test not in task_tree[commit]['tests']:
+            task_tree[commit]['tests'][test] = dict()
+            task_tree[commit]['tests'][test]['rounds'] = rounds
+        else:
+            task_tree[commit]['tests'][test]['rounds'] += rounds
+
+        # If the round count for a test drops to 0 or below, delete it
+        if task_tree[commit]['tests'][test]['rounds'] <= 0:
+            del task_tree[commit]['tests'][test]
+
+        # Delete a commit that has no test
+        if len(task_tree[commit]['tests']) == 0:
+            del task_tree[commit]
+
+    def __add_test_unlocked__(self, commit, test, rounds):
         scm = self.repo()
         if scm is not None:
             commit = scm.full_version_name(commit)
 
-        if commit not in self.state['commits']:
-            self.state['commits'][commit] = dict()
-            self.state['commits'][commit]["tests"] = dict()
-
         if rounds is None:
             rounds = 3
         else:
             rounds = int(rounds)
 
-        if test not in self.state['commits'][commit]['tests']:
-            self.state['commits'][commit]['tests'][test] = dict()
-            self.state['commits'][commit]['tests'][test]['rounds'] = rounds
-        else:
-            self.state['commits'][commit]['tests'][test]['rounds'] += rounds
-
-        # if the number of rounds is equal to 0 for a test, delete it
-        if self.state['commits'][commit]['tests'][test]['rounds'] <= 0:
-            del self.state['commits'][commit]['tests'][test]
-
-        # Delete a commit that has no test
-        if len(self.state['commits'][commit]['tests']) == 0:
-            del self.state['commits'][commit]
+        self.__task_tree_add_test__(self.state['commits'], commit, test, rounds)
 
         # If the state was DONE, set it back to RUN
         if self.__running_mode_unlocked__(check_running=False) == RunningMode.DONE:
@@ -563,8 +573,8 @@ class SmartEzbench:
 
         return c, tl, self._events_str
 
-    def __prioritize_runs(self, task_tree, deployed_version):
-        task_list = list()
+    def __prioritize_runs(self, task_tree, deployed_version, resumable_tasks):
+        task_list = deque()
 
         # Aggregate all the subtests
         for commit in task_tree:
@@ -587,7 +597,25 @@ class SmartEzbench:
                 task_tree[commit]["tests"][full_name] = dict()
                 task_tree[commit]["tests"][full_name]["rounds"] = test_rounds[basename]
 
-        # Schedule the tests using the already-deployed version
+        # Schedule resumable tasks. First the already-deployed
+        # versions, other versions later
+        for task in resumable_tasks:
+            result_file = task.get("result_file", None)
+            if result_file is not None:
+                entry = TaskEntry(task["version"], task["test"], 1, result_file)
+            else:
+                continue
+
+            if task["version"] == deployed_version:
+                task_list.appendleft(entry)
+            else:
+                task_list.append(entry)
+
+            # Get rid of the task
+            self.__task_tree_add_test__(task_tree, entry.commit, entry.test, -1)
+
+        # Schedule the tests using the already-deployed version after
+        # all resumable tasks
         if deployed_version is not None and deployed_version in task_tree:
             for test in task_tree[deployed_version]["tests"]:
                 rounds = task_tree[deployed_version]["tests"][test]["rounds"]
@@ -678,8 +706,8 @@ class SmartEzbench:
                     for key in result.results():
                         full_name = Test.partial_name(result.test.full_name, [key])
                         SmartEzbench.__remove_task_from_tasktree__(task_tree, commit.full_sha1, full_name, len(result.result(key)))
-                        # HACK: Remove this when all the new reports use the full_sha1 for storage
-                        SmartEzbench.__remove_task_from_tasktree__(task_tree, commit.sha1, full_name, len(result.result(key)))
+
+            resumable_tasks = report.journal.incomplete_tests()
 
             # Delete the tests on commits that do not compile
             for commit in report.commits:
@@ -696,7 +724,7 @@ class SmartEzbench:
             pass
 
         # Return the result
-        q.put((exit_code, task_tree, events_str))
+        q.put((exit_code, task_tree, events_str, resumable_tasks))
 
     def run(self):
         self.__log(Criticality.II, "----------------------")
@@ -731,7 +759,7 @@ class SmartEzbench:
         p = multiprocessing.Process(target=SmartEzbench.__generate_task_and_events_list__,
                                     args=(q, self.state, self.log_folder, self.repo()))
         p.start()
-        exit_code, task_tree, self._events_str = q.get()
+        exit_code, task_tree, self._events_str, resumable_tasks = q.get()
         p.join()
 
         if len(task_tree) == 0:
@@ -740,13 +768,14 @@ class SmartEzbench:
 
         task_tree_str = pprint.pformat(task_tree)
         self.__log(Criticality.II, "Task list: {tsk_str}".format(tsk_str=task_tree_str))
+        self.__log(Criticality.II, "Incomplete runs: {}".format([r['result_file'] for r in resumable_tasks]))
 
         # Lock the report for further changes (like for profiles)
         self.__write_attribute__('beenRunBefore', True)
 
         # Prioritize --> return a list of commits to do in order
         self._task_lock.acquire()
-        self._task_list = self.__prioritize_runs(task_tree, deployed_commit)
+        self._task_list = self.__prioritize_runs(task_tree, deployed_commit, resumable_tasks)
 
         # Call the hook file, telling we started running
         self.__call_hook__('start_running_tests')
@@ -763,7 +792,7 @@ class SmartEzbench:
                 self.__done_running__(runner)
                 return False
 
-            self._task_current = e = self._task_list.pop(0)
+            self._task_current = e = self._task_list.popleft()
             short_name=e.test[:80].rsplit('|', 1)[0]+'...'
             self.__log(Criticality.DD,
                        "make {count} runs for test {test} using commit {commit}".format(count=e.rounds,
@@ -784,22 +813,43 @@ class SmartEzbench:
             self._task_current.started()
             for r in range(0, e.rounds):
                 self._task_lock.release()
+
                 try:
-                    time, cmd_output = runner.run(e.commit, e.test, False)
+                    if e.resumeResultFile is not None:
+                        time, cmd_output = runner.resume(e.commit, e.test, e.resumeResultFile, False)
+                    else:
+                        time, cmd_output = runner.run(e.commit, e.test, False)
+                    self._task_lock.acquire()
                 except RunnerError as error:
-                    err_code = error.args[0]['err_code']
+                    # Acquire the lock as we are about to modify the task list
+                    self._task_lock.acquire()
+
                     # We got an error, let's see what we can do about it!
-                    if (err_code.value != RunnerErrorCode.NO_ERROR and
-                        err_code.value < RunnerErrorCode.COMP_DEP_UNK_ERROR.value):
+                    err_code = error.args[0]['err_code']
+                    err_str = error.args[0]['err_str']
+                    if (err_code == RunnerErrorCode.CMD_TEST_EXEC_TYPE_UNSUPPORTED or
+                        err_code == RunnerErrorCode.CMD_TEST_EXEC_TYPE_NEED_VALID_RESULT_FILE or
+                        err_code == RunnerErrorCode.CMD_RESULT_ALREADY_COMPLETE):
+                        # The result is un-resumable, schedule a full run at the end
+                        self._task_list.append(TaskEntry(e.commit, e.test, 1))
+                    elif err_code == RunnerErrorCode.REBOOT_NEEDED:
+                        # TODO: have some sort of hooks here to warn the rest of the world
+                        # that we are about to reboot
+                        self._task_list = []
+                        self._task_current = None
+                        self.__log(Criticality.II, "Rebooting...")
+                        runner.reboot()
+                        sys.exit(0)
+                    elif (err_code.value != RunnerErrorCode.NO_ERROR and
+                          err_code.value < RunnerErrorCode.COMP_DEP_UNK_ERROR.value):
                         # Error we cannot do anything about, probably a setup issue
                         # Let's mark the run as aborted until the user resets it!
-                        self.__log(Criticality.EE, "The run returned the error {}".format(err_code))
+                        self.__log(Criticality.EE,  error.args[0]['err_code'])
                         self.set_running_mode(RunningMode.ERROR)
                     elif (err_code == RunnerErrorCode.COMPILATION_FAILED or
-                        err_code == RunnerErrorCode.DEPLOYMENT_FAILED):
+                          err_code == RunnerErrorCode.DEPLOYMENT_FAILED):
                         # Cancel any other test on this commit
                         self._task_list = [x for x in self._task_list if not x.commit == e.commit]
-                self._task_lock.acquire()
 
         self._task_current = None
 
-- 
2.9.3



More information about the Ezbench-dev mailing list