Mesa (main): ci/lava: Encapsulate job data in a class

GitLab Mirror gitlab-mirror at kemper.freedesktop.org
Thu Apr 28 07:34:16 UTC 2022


Module: Mesa
Branch: main
Commit: 84a5ea422826c678fb36eaa294b172f6387f01f1
URL:    http://cgit.freedesktop.org/mesa/mesa/commit/?id=84a5ea422826c678fb36eaa294b172f6387f01f1

Author: Guilherme Gallo <guilherme.gallo at collabora.com>
Date:   Wed Feb 16 18:06:20 2022 +0000

ci/lava: Encapsulate job data in a class

Less free-form passing stuff around, and also makes it easier to
implement log-based following in future.

The new class has:
- job log polling: This allows us to get rid of some more function-local
  state; the job now contains where we are, and the timeout etc is
  localised within the thing polling it.
- has-started detection into job class
- heartbeat logic to update the job instance state with the start time
  when the submitter begins to track the logs from the LAVA device

Besides:

- Split LAVA jobs and Mesa CI policy
- Update unit tests with LAVAJob class

Signed-off-by: Guilherme Gallo <guilherme.gallo at collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/15938>

---

 .gitlab-ci/lava/lava_job_submitter.py       | 195 ++++++++++++++++------------
 .gitlab-ci/tests/test_lava_job_submitter.py |  51 ++++++--
 2 files changed, 151 insertions(+), 95 deletions(-)

diff --git a/.gitlab-ci/lava/lava_job_submitter.py b/.gitlab-ci/lava/lava_job_submitter.py
index 1f9eb8bffcf..406a0dd3095 100755
--- a/.gitlab-ci/lava/lava_job_submitter.py
+++ b/.gitlab-ci/lava/lava_job_submitter.py
@@ -31,7 +31,6 @@ import time
 import traceback
 import urllib.parse
 import xmlrpc
-
 from datetime import datetime, timedelta
 from os import getenv
 
@@ -211,6 +210,62 @@ def _call_proxy(fn, *args):
             fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
 
 
+class MesaCIException(Exception):
+    pass
+
+
+class LAVAJob():
+    def __init__(self, proxy, definition):
+        self.job_id = None
+        self.proxy = proxy
+        self.definition = definition
+        self.last_log_line = 0
+        self.last_log_time = None
+        self.is_finished = False
+
+    def heartbeat(self):
+        self.last_log_time = datetime.now()
+
+    def validate(self):
+        try:
+            return _call_proxy(
+                self.proxy.scheduler.jobs.validate, self.definition, True
+            )
+        except MesaCIException:
+            return False
+
+    def submit(self):
+        try:
+            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
+        except MesaCIException:
+            return False
+        return True
+
+    def cancel(self):
+        if self.job_id:
+            self.proxy.scheduler.jobs.cancel(self.job_id)
+
+    def is_started(self):
+        waiting_states = ["Submitted", "Scheduling", "Scheduled"]
+        job_state = _call_proxy(self.proxy.scheduler.job_state, self.job_id)
+        return job_state["job_state"] not in waiting_states
+
+    def get_logs(self):
+        try:
+            (finished, data) = _call_proxy(
+                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
+            )
+            lines = yaml.load(str(data), Loader=loader(False))
+            self.is_finished = finished
+            if not lines:
+                return []
+            self.heartbeat()
+            self.last_log_line += len(lines)
+            return lines
+        except MesaCIException as mesa_exception:
+            fatal_err(f"Could not get LAVA job logs. Reason: {mesa_exception}")
+
+
 def get_job_results(proxy, job_id, test_suite, test_case):
     # Look for infrastructure errors and retry if we see them.
     results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id)
@@ -220,125 +275,103 @@ def get_job_results(proxy, job_id, test_suite, test_case):
         if "result" not in metadata or metadata["result"] != "fail":
             continue
         if 'error_type' in metadata and metadata['error_type'] == "Infrastructure":
-            print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
-            return False
+            raise MesaCIException("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
         if 'case' in metadata and metadata['case'] == "validate":
-            print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
-            return False
+            raise MesaCIException("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
 
     results_yaml = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case)
     results = yaml.load(results_yaml, Loader=loader(False))
     if not results:
-        fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))
+        raise MesaCIException("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))
 
     print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result']))
     if results[0]['result'] != 'pass':
-        fatal_err("FAIL")
+        return False
 
     return True
 
-def wait_until_job_is_started(proxy, job_id):
-    print_log(f"Waiting for job {job_id} to start.")
-    current_state = "Submitted"
-    waiting_states = ["Submitted", "Scheduling", "Scheduled"]
-    while current_state in waiting_states:
-        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
-        job_state = _call_proxy(proxy.scheduler.job_state, job_id)
-        current_state = job_state["job_state"]
-
-    print_log(f"Job {job_id} started.")
-
-def follow_job_execution(proxy, job_id):
-    line_count = 0
-    finished = False
-    last_time_logs = datetime.now()
-    while not finished:
-        # `proxy.scheduler.jobs.logs` does not block, even when there is no
-        # new log to be fetched. To avoid dosing the LAVA dispatcher
-        # machine, let's add a sleep to save them some stamina.
-        time.sleep(LOG_POLLING_TIME_SEC)
-
-        (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, line_count)
-        if logs := yaml.load(str(data), Loader=loader(False)):
-            # Reset the timeout
-            last_time_logs = datetime.now()
-            for line in logs:
-                print("{} {}".format(line["dt"], line["msg"]))
-
-            line_count += len(logs)
-        else:
-            time_limit = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
-            if datetime.now() - last_time_logs > time_limit:
-                print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id))
-                return False
-
-    return True
 
-def show_job_data(proxy, job_id):
-    show = _call_proxy(proxy.scheduler.jobs.show, job_id)
+def show_job_data(job):
+    show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
     for field, value in show.items():
         print("{}\t: {}".format(field, value))
 
 
-def validate_job(proxy, job_file):
+def follow_job_execution(job):
     try:
-        return _call_proxy(proxy.scheduler.jobs.validate, job_file, True)
-    except:
-        return False
-
-def submit_job(proxy, job_file):
-    return _call_proxy(proxy.scheduler.jobs.submit, job_file)
+        job.submit()
+    except MesaCIException as mesa_exception:
+        fatal_err(f"Could not submit LAVA job. Reason: {mesa_exception}")
 
+    print_log(f"Waiting for job {job.job_id} to start.")
+    while not job.is_started():
+        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
+    print_log(f"Job {job.job_id} started.")
+
+    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
+    # Start to check job's health
+    job.heartbeat()
+    while not job.is_finished:
+        # Poll to check for new logs, assuming that a prolonged period of
+        # silence means that the device has died and we should try it again
+        if datetime.now() - job.last_log_time > max_idle_time:
+            print_log(
+                f"No log output for {max_idle_time} seconds; assuming device has died, retrying"
+            )
+
+            raise MesaCIException(
+                f"LAVA job {job.job_id} does not respond for {max_idle_time}. Retry."
+            )
 
-def retriable_follow_job(proxy, yaml_file):
-    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
-
-    while retry_count >= 0:
-        job_id = submit_job(proxy, yaml_file)
+        time.sleep(LOG_POLLING_TIME_SEC)
 
-        print_log("LAVA job id: {}".format(job_id))
+        new_lines = job.get_logs()
 
-        wait_until_job_is_started(proxy, job_id)
+        for line in new_lines:
+            print(line)
 
-        if not follow_job_execution(proxy, job_id):
-            print_log(f"Job {job_id} has timed out. Cancelling it.")
-            # Cancel the job as it is considered unreachable by Mesa CI.
-            proxy.scheduler.jobs.cancel(job_id)
+    show_job_data(job)
+    return get_job_results(job.proxy, job.job_id, "0_mesa", "mesa")
 
-            retry_count -= 1
-            continue
 
-        show_job_data(proxy, job_id)
-
-        if get_job_results(proxy, job_id, "0_mesa", "mesa") == True:
-            break
-    else:
-        # The script attempted all the retries. The job seemed to fail.
-        return False
+def retriable_follow_job(proxy, job_definition):
+    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
 
-    return True
+    for attempt_no in range(1, retry_count + 2):
+        job = LAVAJob(proxy, job_definition)
+        try:
+            return follow_job_execution(job)
+        except MesaCIException as mesa_exception:
+            print_log(mesa_exception)
+            job.cancel()
+        finally:
+            print_log(f"Finished executing LAVA job in the attempt #{attempt_no}")
+
+    fatal_err(
+        "Job failed after it exceeded the number of "
+        f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
+    )
 
 
 def main(args):
     proxy = setup_lava_proxy()
 
-    yaml_file = generate_lava_yaml(args)
+    job_definition = generate_lava_yaml(args)
 
     if args.dump_yaml:
-        print(hide_sensitive_data(generate_lava_yaml(args)))
+        print("LAVA job definition (YAML):")
+        print(hide_sensitive_data(job_definition))
 
     if args.validate_only:
-        ret = validate_job(proxy, yaml_file)
+        job = LAVAJob(proxy, job_definition)
+        ret = job.validate()
         if not ret:
             fatal_err("Error in LAVA job definition")
         print("LAVA job definition validated successfully")
         return
 
-    if not retriable_follow_job(proxy, yaml_file):
-        fatal_err(
-            "Job failed after it exceeded the number of"
-            f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
-        )
+    ret = retriable_follow_job(proxy, job_definition)
+    sys.exit(ret)
 
 
 def create_parser():
diff --git a/.gitlab-ci/tests/test_lava_job_submitter.py b/.gitlab-ci/tests/test_lava_job_submitter.py
index 0ed19efeea4..43896db20fe 100644
--- a/.gitlab-ci/tests/test_lava_job_submitter.py
+++ b/.gitlab-ci/tests/test_lava_job_submitter.py
@@ -25,23 +25,27 @@
 import xmlrpc.client
 from contextlib import nullcontext as does_not_raise
 from datetime import datetime
-from itertools import repeat
-from typing import Tuple
+from itertools import cycle, repeat
+from typing import Iterable, Union, Generator, Tuple
 from unittest.mock import MagicMock, patch
 
 import pytest
 import yaml
 from freezegun import freeze_time
 from lava.lava_job_submitter import (
+    NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
     DEVICE_HANGING_TIMEOUT_SEC,
     follow_job_execution,
     hide_sensitive_data,
     retriable_follow_job,
+    LAVAJob
 )
 
+NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
 
-def jobs_logs_response(finished=False, msg=None) -> Tuple[bool, str]:
-    timed_msg = {"dt": str(datetime.now()), "msg": "New message"}
+
+def jobs_logs_response(finished=False, msg=None, lvl="target") -> Tuple[bool, str]:
+    timed_msg = {"dt": str(datetime.now()), "msg": "New message", "lvl": lvl}
     logs = [timed_msg] if msg is None else msg
 
     return finished, yaml.safe_dump(logs)
@@ -114,17 +118,35 @@ def frozen_time(mock_sleep):
 @pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
 def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
     with pytest.raises(exception):
-        follow_job_execution(mock_proxy(side_effect=exception), "")
+        proxy = mock_proxy(side_effect=exception)
+        job = LAVAJob(proxy, '')
+        follow_job_execution(job)
+
 
+def level_generator():
+    # Tests all known levels by default
+    yield from cycle(( "results", "feedback", "warning", "error", "debug", "target" ))
 
-def generate_n_logs(n=1, tick_sec=1):
+def generate_n_logs(n=1, tick_fn: Union[Generator, Iterable[int], int]=1, level_fn=level_generator):
     """Simulate a log partitionated in n components"""
+    level_gen = level_fn()
+
+    if isinstance(tick_fn, Generator):
+        tick_gen = tick_fn
+    elif isinstance(tick_fn, Iterable):
+        tick_gen = cycle(tick_fn)
+    else:
+        tick_gen = cycle((tick_fn,))
+
     with freeze_time(datetime.now()) as time_travel:
+        tick_sec: int = next(tick_gen)
         while True:
             # Simulate a scenario where the target job is waiting for being started
             for _ in range(n - 1):
+                level: str = next(level_gen)
+
                 time_travel.tick(tick_sec)
-                yield jobs_logs_response(finished=False, msg=[])
+                yield jobs_logs_response(finished=False, msg=[], lvl=level)
 
             time_travel.tick(tick_sec)
             yield jobs_logs_response(finished=True)
@@ -136,23 +158,23 @@ XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")
 PROXY_SCENARIOS = {
     "finish case": (generate_n_logs(1), does_not_raise(), True),
     "works at last retry": (
-        generate_n_logs(n=3, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1),
+        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS, tick_fn=[ DEVICE_HANGING_TIMEOUT_SEC + 1 ] * NUMBER_OF_RETRIES_TIMEOUT_DETECTION + [1]),
         does_not_raise(),
         True,
     ),
     "timed out more times than retry attempts": (
-        generate_n_logs(n=4, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1),
-        does_not_raise(),
+        generate_n_logs(n=4, tick_fn=DEVICE_HANGING_TIMEOUT_SEC + 1),
+        pytest.raises(SystemExit),
         False,
     ),
     "long log case, no silence": (
-        generate_n_logs(n=1000, tick_sec=0),
+        generate_n_logs(n=1000, tick_fn=0),
         does_not_raise(),
         True,
     ),
     "very long silence": (
-        generate_n_logs(n=4, tick_sec=100000),
-        does_not_raise(),
+        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
+        pytest.raises(SystemExit),
         False,
     ),
     # If a protocol error happens, _call_proxy will retry without affecting timeouts
@@ -181,7 +203,8 @@ def test_retriable_follow_job(
     mock_sleep, side_effect, expectation, has_finished, mock_proxy
 ):
     with expectation:
-        result = retriable_follow_job(mock_proxy(side_effect=side_effect), "")
+        proxy = mock_proxy(side_effect=side_effect)
+        result = retriable_follow_job(proxy, "")
         assert has_finished == result
 
 



More information about the mesa-commit mailing list