[Ezbench-dev] [PATCH 13/25] smartezbench: resume incomplete runs, if possible
Petri Latvala
petri.latvala at intel.com
Fri Feb 24 11:19:14 UTC 2017
From: Martin Peres <martin.peres at linux.intel.com>
v2 (Petri): Allow resuming runs with subtests
v3 (Martin):
- Resuming tasks should be done on the original version
- Re-schedule a failed resume as a run
- Change the task description for resume runs
---
python-modules/ezbench/smartezbench.py | 126 +++++++++++++++++++++++----------
1 file changed, 88 insertions(+), 38 deletions(-)
diff --git a/python-modules/ezbench/smartezbench.py b/python-modules/ezbench/smartezbench.py
index 5f3474b..300761b 100644
--- a/python-modules/ezbench/smartezbench.py
+++ b/python-modules/ezbench/smartezbench.py
@@ -25,7 +25,7 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
-from collections import namedtuple
+from collections import namedtuple, deque
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
@@ -92,10 +92,11 @@ def list_smart_ezbench_report_names(ezbench_dir, updatedSince = 0):
return reports
class TaskEntry:
- def __init__(self, commit, test, rounds):
+ def __init__(self, commit, test, rounds, resumeResultFile = None):
self.commit = commit
self.test = test
self.rounds = rounds
+ self.resumeResultFile = resumeResultFile
self.start_date = None
self.exec_time = None
self.build_time = None
@@ -133,7 +134,10 @@ class TaskEntry:
return self.predicted_completion_time() - elapsed
def __str__(self):
- string = "{}: {}: {} run(s)".format(self.commit, self.test, self.rounds)
+ if self.resumeResultFile is None:
+ string = "{}: {}: {} run(s)".format(self.commit, self.test, self.rounds)
+ else:
+ string = "resume {}".format(self.resumeResultFile)
total_delta = self.predicted_completion_time()
if total_delta.total_seconds() > 0:
@@ -147,7 +151,10 @@ class TaskEntry:
else:
remaining_str = str(timedelta(0, math.floor(-remaining.total_seconds()))) + "s overtime"
- string += "({:.2f}%, {})".format(progress, remaining_str)
+ if self.resumeResultFile is None:
+ string += "({:.2f}%, {})".format(progress, remaining_str)
+ else:
+ string += "(> {:.2f}%, up to {})".format(progress, remaining_str)
else:
rounded_total_delta = timedelta(0, math.ceil(total_delta.total_seconds()))
string += "(estimated completion time: {}s)".format(rounded_total_delta)
@@ -445,33 +452,36 @@ class SmartEzbench:
self.__write_attribute__('commit_url', commit_url, allow_updates = True)
self.__log(Criticality.II, "Report commit URL has been changed to '{}'".format(commit_url))
- def __add_test_unlocked__(self, commit, test, rounds = None):
+ def __task_tree_add_test__(self, task_tree, commit, test, rounds):
+ if commit not in task_tree:
+ task_tree[commit] = dict()
+ task_tree[commit]["tests"] = dict()
+
+ if test not in task_tree[commit]['tests']:
+ task_tree[commit]['tests'][test] = dict()
+ task_tree[commit]['tests'][test]['rounds'] = rounds
+ else:
+ task_tree[commit]['tests'][test]['rounds'] += rounds
+
+ # if the number of rounds is equal to 0 for a test, delete it
+ if task_tree[commit]['tests'][test]['rounds'] <= 0:
+ del task_tree[commit]['tests'][test]
+
+ # Delete a commit that has no test
+ if len(task_tree[commit]['tests']) == 0:
+ del task_tree[commit]
+
+ def __add_test_unlocked__(self, commit, test, rounds):
scm = self.repo()
if scm is not None:
commit = scm.full_version_name(commit)
- if commit not in self.state['commits']:
- self.state['commits'][commit] = dict()
- self.state['commits'][commit]["tests"] = dict()
-
if rounds is None:
rounds = 3
else:
rounds = int(rounds)
- if test not in self.state['commits'][commit]['tests']:
- self.state['commits'][commit]['tests'][test] = dict()
- self.state['commits'][commit]['tests'][test]['rounds'] = rounds
- else:
- self.state['commits'][commit]['tests'][test]['rounds'] += rounds
-
- # if the number of rounds is equal to 0 for a test, delete it
- if self.state['commits'][commit]['tests'][test]['rounds'] <= 0:
- del self.state['commits'][commit]['tests'][test]
-
- # Delete a commit that has no test
- if len(self.state['commits'][commit]['tests']) == 0:
- del self.state['commits'][commit]
+ self.__task_tree_add_test__(self.state['commits'], commit, test, rounds)
# If the state was DONE, set it back to RUN
if self.__running_mode_unlocked__(check_running=False) == RunningMode.DONE:
@@ -563,8 +573,8 @@ class SmartEzbench:
return c, tl, self._events_str
- def __prioritize_runs(self, task_tree, deployed_version):
- task_list = list()
+ def __prioritize_runs(self, task_tree, deployed_version, resumable_tasks):
+ task_list = deque()
# Aggregate all the subtests
for commit in task_tree:
@@ -587,7 +597,25 @@ class SmartEzbench:
task_tree[commit]["tests"][full_name] = dict()
task_tree[commit]["tests"][full_name]["rounds"] = test_rounds[basename]
- # Schedule the tests using the already-deployed version
+ # Schedule resumable tasks. First the already-deployed
+ # versions, other versions later
+ for task in resumable_tasks:
+ result_file = task.get("result_file", None)
+ if result_file is not None:
+ entry = TaskEntry(task["version"], task["test"], 1, result_file)
+ else:
+ continue
+
+ if task["version"] == deployed_version:
+ task_list.appendleft(entry)
+ else:
+ task_list.append(entry)
+
+ # Get rid of the task
+ self.__task_tree_add_test__(task_tree, entry.commit, entry.test, -1)
+
+ # Schedule the tests using the already-deployed version after
+ # all resumable tasks
if deployed_version is not None and deployed_version in task_tree:
for test in task_tree[deployed_version]["tests"]:
rounds = task_tree[deployed_version]["tests"][test]["rounds"]
@@ -678,8 +706,8 @@ class SmartEzbench:
for key in result.results():
full_name = Test.partial_name(result.test.full_name, [key])
SmartEzbench.__remove_task_from_tasktree__(task_tree, commit.full_sha1, full_name, len(result.result(key)))
- # HACK: Remove this when all the new reports use the full_sha1 for storage
- SmartEzbench.__remove_task_from_tasktree__(task_tree, commit.sha1, full_name, len(result.result(key)))
+
+ resumable_tasks = report.journal.incomplete_tests()
# Delete the tests on commits that do not compile
for commit in report.commits:
@@ -696,7 +724,7 @@ class SmartEzbench:
pass
# Return the result
- q.put((exit_code, task_tree, events_str))
+ q.put((exit_code, task_tree, events_str, resumable_tasks))
def run(self):
self.__log(Criticality.II, "----------------------")
@@ -731,7 +759,7 @@ class SmartEzbench:
p = multiprocessing.Process(target=SmartEzbench.__generate_task_and_events_list__,
args=(q, self.state, self.log_folder, self.repo()))
p.start()
- exit_code, task_tree, self._events_str = q.get()
+ exit_code, task_tree, self._events_str, resumable_tasks = q.get()
p.join()
if len(task_tree) == 0:
@@ -740,13 +768,14 @@ class SmartEzbench:
task_tree_str = pprint.pformat(task_tree)
self.__log(Criticality.II, "Task list: {tsk_str}".format(tsk_str=task_tree_str))
+ self.__log(Criticality.II, "Incomplete runs: {}".format([r['result_file'] for r in resumable_tasks]))
# Lock the report for further changes (like for profiles)
self.__write_attribute__('beenRunBefore', True)
# Prioritize --> return a list of commits to do in order
self._task_lock.acquire()
- self._task_list = self.__prioritize_runs(task_tree, deployed_commit)
+ self._task_list = self.__prioritize_runs(task_tree, deployed_commit, resumable_tasks)
# Call the hook file, telling we started running
self.__call_hook__('start_running_tests')
@@ -763,7 +792,7 @@ class SmartEzbench:
self.__done_running__(runner)
return False
- self._task_current = e = self._task_list.pop(0)
+ self._task_current = e = self._task_list.popleft()
short_name=e.test[:80].rsplit('|', 1)[0]+'...'
self.__log(Criticality.DD,
"make {count} runs for test {test} using commit {commit}".format(count=e.rounds,
@@ -784,22 +813,43 @@ class SmartEzbench:
self._task_current.started()
for r in range(0, e.rounds):
self._task_lock.release()
+
try:
- time, cmd_output = runner.run(e.commit, e.test, False)
+ if e.resumeResultFile is not None:
+ time, cmd_output = runner.resume(e.commit, e.test, e.resumeResultFile, False)
+ else:
+ time, cmd_output = runner.run(e.commit, e.test, False)
+ self._task_lock.acquire()
except RunnerError as error:
- err_code = error.args[0]['err_code']
+ # Acquire the lock as we are about to modify the task list
+ self._task_lock.acquire()
+
# We got an error, let's see what we can do about it!
- if (err_code.value != RunnerErrorCode.NO_ERROR and
- err_code.value < RunnerErrorCode.COMP_DEP_UNK_ERROR.value):
+ err_code = error.args[0]['err_code']
+ err_str = error.args[0]['err_str']
+ if (err_code == RunnerErrorCode.CMD_TEST_EXEC_TYPE_UNSUPPORTED or
+ err_code == RunnerErrorCode.CMD_TEST_EXEC_TYPE_NEED_VALID_RESULT_FILE or
+ err_code == RunnerErrorCode.CMD_RESULT_ALREADY_COMPLETE):
+ # The result is un-resumable, schedule a full run at the end
+ self._task_list.append(TaskEntry(e.commit, e.test, 1))
+ elif err_code == RunnerErrorCode.REBOOT_NEEDED:
+ # TODO: have some sort of hooks here to warn the rest of the world
+ # that we are about to reboot
+ self._task_list = []
+ self._task_current = None
+ self.__log(Criticality.II, "Rebooting...")
+ runner.reboot()
+ sys.exit(0)
+ elif (err_code.value != RunnerErrorCode.NO_ERROR and
+ err_code.value < RunnerErrorCode.COMP_DEP_UNK_ERROR.value):
# Error we cannot do anything about, probably a setup issue
# Let's mark the run as aborted until the user resets it!
- self.__log(Criticality.EE, "The run returned the error {}".format(err_code))
+ self.__log(Criticality.EE, error.args[0]['err_code'])
self.set_running_mode(RunningMode.ERROR)
elif (err_code == RunnerErrorCode.COMPILATION_FAILED or
- err_code == RunnerErrorCode.DEPLOYMENT_FAILED):
+ err_code == RunnerErrorCode.DEPLOYMENT_FAILED):
# Cancel any other test on this commit
self._task_list = [x for x in self._task_list if not x.commit == e.commit]
- self._task_lock.acquire()
self._task_current = None
--
2.9.3
More information about the Ezbench-dev mailing list