[Piglit] [PATCH v2 5/16] framework: Add a Mixin class for running multiple tests in a single process
Dylan Baker
dylan at pnwbakers.com
Fri Sep 30 21:17:38 UTC 2016
This Mixin makes writing classes for handling tests that run multiple
tests in a single process simpler. It does this through the use of the
subtest feature. It makes it possible to implement two new methods, and
an aware interpret_result method and have support for this feature,
including a cherry-like resume feature that starts again after a test
crashes, without rerunning the crashed test.
Signed-off-by: Dylan Baker <dylanx.c.baker at intel.com>
---
framework/test/base.py | 147 ++++++++++++++++++++++++++-
framework/test/deqp.py | 4 +-
unittests/framework/test/test_base.py | 138 ++++++++++++++++++++++++-
3 files changed, 280 insertions(+), 9 deletions(-)
diff --git a/framework/test/base.py b/framework/test/base.py
index b667b15..224ca61 100644
--- a/framework/test/base.py
+++ b/framework/test/base.py
@@ -40,6 +40,7 @@ import six
from six.moves import range
from framework import exceptions, options
+from framework import status
from framework.results import TestResult
# We're doing some special crazy here to make timeouts work on python 2. pylint
@@ -258,7 +259,9 @@ class Test(object):
try:
self.is_skip()
except TestIsSkip as e:
- self.result.result = 'skip'
+ self.result.result = status.SKIP
+ for each in six.iterkeys(self.result.subtests):
+ self.result.subtests[each] = status.SKIP
self.result.out = e.reason
self.result.returncode = None
return
@@ -267,6 +270,8 @@ class Test(object):
self._run_command()
except TestRunError as e:
self.result.result = six.text_type(e.status)
+ for each in six.iterkeys(self.result.subtests):
+ self.result.subtests[each] = six.text_type(e.status)
self.result.out = six.text_type(e)
self.result.returncode = None
return
@@ -282,13 +287,18 @@ class Test(object):
"""
pass
- def _run_command(self):
+ def _run_command(self, **kwargs):
""" Run the test command and get the result
This method sets environment options, then runs the executable. If the
executable isn't found it sets the result to skip.
"""
+ # This allows the ReducedProcessMixin to work without having to whack
+ # self.command (which should be treated as immutable), but is
+ # considered private.
+ command = kwargs.pop('_command', self.command)
+
# Setup the environment for the test. Environment variables are taken
# from the following sources, listed in order of increasing precedence:
#
@@ -314,7 +324,7 @@ class Test(object):
fullenv = {f(k): f(v) for k, v in _base}
try:
- proc = subprocess.Popen(self.command,
+ proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=self.cwd,
@@ -382,7 +392,7 @@ class WindowResizeMixin(object):
see: https://bugzilla.gnome.org/show_bug.cgi?id=680214
"""
- def _run_command(self):
+ def _run_command(self, *args, **kwargs):
"""Run a test up 5 times when window resize is detected.
Rerun the command up to 5 times if the window size changes, if it
@@ -391,7 +401,7 @@ class WindowResizeMixin(object):
"""
for _ in range(5):
- super(WindowResizeMixin, self)._run_command()
+ super(WindowResizeMixin, self)._run_command(*args, **kwargs)
if "Got spurious window resize" not in self.result.out:
return
@@ -439,3 +449,130 @@ class ValgrindMixin(object):
else:
# Test passed but has valgrind errors.
self.result.result = 'fail'
+
+
+ at six.add_metaclass(abc.ABCMeta)
+class ReducedProcessMixin(object):
+ """This Mixin simplifies writing Test classes that run more than one test
+ in a single process.
+
+ Although one of the benefits of piglit is it's process isolation, there are
+ times that process isolation is too expensive for day to day runs, and
+ running more than one test in a single process is a valid trade-off for
+ decreased run times. This class helps to ease writing a Test class for such
+ a purpose, while not suffering all of the drawback of the approach.
+
+ The first way that this helps is that it provides crash detection and
+ recovery, allowing a single subtest to crash
+ """
+
+ def __init__(self, command, subtests=None, **kwargs):
+ assert subtests # This covers both "not None" and len(subtests) > 1
+ super(ReducedProcessMixin, self).__init__(command, **kwargs)
+ self._expected = subtests
+ self._populate_subtests()
+
+ def __find_sub(self):
+ """Helper for getting the next index."""
+ return len([l for l in self.result.out.split('\n')
+ if self._is_subtest(l)])
+
+ @staticmethod
+ def _subtest_name(test):
+ """If the name provided isn't the subtest name, this method does."""
+ return test
+
+ def _stop_status(self):
+ """This method returns the status of the test that stopped the run.
+
+ By default this will return status.CRASH, but this may not be suitable
+ for some suites, which may require special considerations and need to
+ require a different status in some cases, like SKIP.
+ """
+ return status.CRASH
+
+ def _run_command(self, *args, **kwargs):
+ """Run the command until all of the subtests have completed or crashed.
+
+ This method will try to run all of the subtests, resuming the run if
+ it's interrupted, and combining the stdout and stderr attributes
+ together for parsing later. I will separate those values with
+ "\n\n====RESUME====\n\n".
+ """
+ super(ReducedProcessMixin, self)._run_command(*args, **kwargs)
+
+ if not self._is_cherry():
+ returncode = self.result.returncode
+ out = [self.result.out]
+ err = [self.result.err]
+ cur_sub = self.__find_sub() or 1
+ last = len(self._expected)
+
+ while cur_sub < last:
+ self.result.subtests[
+ self._subtest_name(self._expected[cur_sub - 1])] = \
+ self._stop_status()
+
+ super(ReducedProcessMixin, self)._run_command(
+ _command=self._resume(cur_sub) + list(args), **kwargs)
+
+ out.append(self.result.out)
+ err.append(self.result.err)
+
+ # If the index is 0 the next test failed without printing a
+ # name, increase by 1 so that test will be marked crash and we
+ # don't get stuck in an infinite loop, otherwise return the
+ # number of tests that did complete.
+ cur_sub += self.__find_sub() or 1
+
+ if not self._is_cherry():
+ self.result.subtests[
+ self._subtest_name(self._expected[cur_sub - 1])] = \
+ self._stop_status()
+
+ # Restore and keep the original returncode (so that it remains a
+ # non-pass, since only one test might fail and the resumed part
+ # might return 0)
+ self.result.returncode = returncode
+ self.result.out = '\n\n====RESUME====\n\n'.join(out)
+ self.result.err = '\n\n====RESUME====\n\n'.join(err)
+
+ def _is_cherry(self):
+ """Method used to determine if rerunning is required.
+
+ If this returns True then the rerun path will be entered, otherwise
+ _run_command is effectively a bare call to super().
+
+ Classes using this mixin may need to overwrite this if the binary
+ they're calling can stop prematurely but return 0.
+ """
+ return self.result.returncode == 0
+
+ def _populate_subtests(self):
+ """Default implementation of subtest prepopulation.
+
+ It may be necissary to override this depending on the subtest format.
+ """
+ self.result.subtests.update({x: status.NOTRUN for x in self._expected})
+
+ @abc.abstractmethod
+ def _resume(self, current):
+ """Method that defines how to resume the case if it crashes.
+
+ This method will be provided with a completed count, which is the index
+ into self._expected of the first subtest that hasn't been run. This
+ method should return the command to restart, and the ReduceProcessMixin
+ will handle actually restarting the the process with the new command.
+ """
+
+ @abc.abstractmethod
+ def _is_subtest(self, line):
+ """Determines if a line in stdout contains a subtest name.
+
+ This method is used during the resume detection phase of the
+ _run_command method to determine how many subtests have successfully
+ been run.
+
+ Should simply return True if the line reprents a test starting, or
+ False if it does not.
+ """
diff --git a/framework/test/deqp.py b/framework/test/deqp.py
index c3452b4..5b53efd 100644
--- a/framework/test/deqp.py
+++ b/framework/test/deqp.py
@@ -216,10 +216,10 @@ class DEQPBaseTest(Test):
if self.result.result == 'notrun':
self.result.result = 'fail'
- def _run_command(self):
+ def _run_command(self, *args, **kwargs):
"""Rerun the command if X11 connection failure happens."""
for _ in range(5):
- super(DEQPBaseTest, self)._run_command()
+ super(DEQPBaseTest, self)._run_command(*args, **kwargs)
if "FATAL ERROR: Failed to open display" not in self.result.err:
return
diff --git a/unittests/framework/test/test_base.py b/unittests/framework/test/test_base.py
index ba7719d..6b0c299 100644
--- a/unittests/framework/test/test_base.py
+++ b/unittests/framework/test/test_base.py
@@ -43,7 +43,7 @@ from framework.test import base
from ..test_status import PROBLEMS
from .. import skip
-# pylint: disable=invalid-name,no-self-use
+# pylint: disable=invalid-name,no-self-use,protected-access
class _Test(base.Test):
@@ -297,7 +297,7 @@ class TestWindowResizeMixin(object):
super(Mixin, self).__init__(*args, **kwargs)
self.__return_spurious = True
- def _run_command(self):
+ def _run_command(self, *args, **kwargs): # pylint: disable=unused-argument
self.result.returncode = None
# IF this is run only once we'll have "got spurious window resize"
@@ -407,3 +407,137 @@ class TestValgrindMixin(object):
test.result.returncode = 1
test.interpret_result()
assert test.result.result is status.FAIL
+
+
+class TestReducedProcessMixin(object):
+ """Tests for the ReducedProcessMixin class."""
+
+ class MPTest(base.ReducedProcessMixin, _Test):
+ def _resume(self, current):
+ return [self.command[0]] + self._expected[current:]
+
+ def _is_subtest(self, line):
+ return line.startswith('TEST')
+
+ def test_populate_subtests(self):
+ test = self.MPTest(['foobar'], subtests=['a', 'b', 'c'])
+ assert set(test.result.subtests.keys()) == {'a', 'b', 'c'}
+
+ class TestRunCommand(object):
+ """Tests for the _run_command method."""
+
+ @pytest.fixture(scope='module')
+ def test_class(self):
+ """Defines a test class that uses generators to ease testing."""
+ class _Shim(object):
+ """This shim goes between the Mixin and the Test class and
+ provides a way to set the output of the test.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(_Shim, self).__init__(*args, **kwargs)
+ self.gen_rcode = None
+ self.gen_out = None
+ self.get_err = None
+
+ def _run_command(self, *args, **kwargs): # pylint: disable=unused-argument
+ # pylint: disable=no-member
+ self.result.returncode = next(self.gen_rcode)
+ self.result.out = next(self.gen_out)
+ self.result.err = next(self.gen_err)
+
+ class Test(base.ReducedProcessMixin, _Shim, _Test):
+ """The actual Class returned by the fixture.
+
+ This class implements the abstract bits from
+ ReducedProcessMixin, and inserts the _Shim class. The
+ _is_subtest method is implemented such that any line starting
+ with SUBTEST is a subtest.
+ """
+
+ def _is_subtest(self, line):
+ return line.startswith('SUBTEST')
+
+ def _resume(self, cur, **kwargs): # pylint: disable=unused-argument
+ return self._expected[cur:]
+
+ def interpret_result(self):
+ name = None
+
+ for line in self.result.out.split('\n'):
+ if self._is_subtest(line):
+ name = line[len('SUBTEST: '):]
+ elif line.startswith('RESULT: '):
+ self.result.subtests[name] = line[len('RESULT: '):]
+ name = None
+
+ return Test
+
+ def test_result(self, test_class):
+ """Test result attributes."""
+ test = test_class(['foobar'], ['a', 'b'])
+ test.gen_out = iter(['SUBTEST: a', 'SUBTEST: b'])
+ test.gen_err = iter(['err output', 'err output'])
+ test.gen_rcode = iter([2, 0])
+ test._run_command()
+ assert test.result.out == \
+ 'SUBTEST: a\n\n====RESUME====\n\nSUBTEST: b'
+ assert test.result.err == \
+ 'err output\n\n====RESUME====\n\nerr output'
+ assert test.result.returncode == 2
+
+ @pytest.mark.timeout(5)
+ def test_infinite_loop(self, test_class):
+ """Test that we don't get into an infinite loop."""
+ test = test_class(['foobar'], ['a', 'b'])
+ test.gen_out = iter(['a', 'a'])
+ test.gen_err = iter(['a', 'a'])
+ test.gen_rcode = iter([1, 1])
+ test._run_command()
+
+ def test_crash_first(self, test_class):
+ """Handles the first test crashing."""
+ test = test_class(['foo'], ['a', 'b'])
+ test.gen_out = iter(['', 'SUBTEST: a'])
+ test.gen_err = iter(['', ''])
+ test.gen_rcode = iter([1, 0])
+
+ # Since interpret_result isn't called this would normally be left
+ # as NOTRUN, but we want to ensure that _run_command isn't mucking
+ # with it, so we set it to this PASS, which acts as a sentinal
+ test.result.subtests['b'] = status.PASS
+ test._run_command()
+
+ assert test.result.subtests['a'] is status.CRASH
+ assert test.result.subtests['b'] is status.PASS
+
+ def test_middle_crash(self, test_class):
+ """handle the final subtest crashing."""
+ test = test_class(['foo'], ['a', 'b', 'c'])
+ test.gen_out = iter(['SUBTEST: a\nRESULT: pass\nSUBTEST: b\n',
+ 'SUBTEST: c\nRESULT: pass\n'])
+ test.gen_err = iter(['', ''])
+ test.gen_rcode = iter([1, 0])
+
+ test._run_command()
+ test.interpret_result()
+
+ assert test.result.subtests['a'] == status.PASS
+ assert test.result.subtests['b'] == status.CRASH
+ assert test.result.subtests['c'] == status.PASS
+
+ def test_final_crash(self, test_class):
+ """handle the final subtest crashing."""
+ test = test_class(['foo'], ['a', 'b', 'c'])
+ test.gen_out = iter(['SUBTEST: a\nRESULT: pass\n'
+ 'SUBTEST: b\nRESULT: pass\n'
+ 'SUBTEST: c\n'])
+ test.gen_err = iter([''])
+ test.gen_rcode = iter([1])
+
+ test._run_command()
+ test.interpret_result()
+
+ assert test.result.subtests['a'] == status.PASS
+ assert test.result.subtests['b'] == status.PASS
+ assert test.result.subtests['c'] == status.CRASH
--
git-series 0.8.10
More information about the Piglit
mailing list