[Piglit] [RFC 09/10] framework: allow specifying the number of jobs for concurrency

Nicolai Hähnle nhaehnle at gmail.com
Wed Oct 11 10:26:58 UTC 2017


From: Nicolai Hähnle <nicolai.haehnle at amd.com>

The default remains the same: number of CPUs. But on systems with lots of
cores but comparatively little (V)RAM it can make sense to reduce the
number of jobs to avoid random failures caused by out-of-memory conditions.
---
 framework/options.py      |  1 +
 framework/profile.py      |  7 +++++--
 framework/programs/run.py | 24 ++++++++++++++++++++++--
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/framework/options.py b/framework/options.py
index 211159a45..b6ff2b406 100644
--- a/framework/options.py
+++ b/framework/options.py
@@ -51,20 +51,21 @@ class _Options(object):  # pylint: disable=too-many-instance-attributes
     env -- environment variables set for each test before run
     deqp_mustpass -- True to enable the use of the deqp mustpass list feature.
     """
 
     def __init__(self):
         self.execute = True
         self.valgrind = False
         self.sync = False
         self.deqp_mustpass = False
         self.process_isolation = True
+        self.jobs = -1
 
         # env is used to set some base environment variables that are not going
         # to change across runs, without sending them to os.environ which is
         # fickle and easy to break
         self.env = {
             'PIGLIT_SOURCE_DIR':
                 os.environ.get(
                     'PIGLIT_SOURCE_DIR',
                     os.path.abspath(os.path.join(os.path.dirname(__file__),
                                                  '..')))
diff --git a/framework/profile.py b/framework/profile.py
index 1fadb39a8..a4f18a456 100644
--- a/framework/profile.py
+++ b/framework/profile.py
@@ -414,40 +414,41 @@ class WorkItemBuilder(object):
                 max_tests = runner.max_tests or len(tests)
                 for idx in range(0, len(tests), max_tests):
                     self._add_workitem(workitems, runner, tests[idx:idx + max_tests])
 
         # Run large work items first
         workitems.sort(key=lambda w: len(w.tests), reverse=True)
         return workitems
 
 
 def run(profiles, logger, backend, concurrency='some',
-        process_isolation=True, results=None):
+        process_isolation=True, results=None, jobs=-1):
     """Runs all tests using Thread pool.
 
     When called this method will flatten out self.tests into self.test_list,
     then will prepare a logger, and begin executing tests through it's Thread
     pools.
 
     Based on the value of concurrency it will either run all the tests
     concurrently, all serially, or first the thread safe tests then the
     serial tests.
 
     Finally it will print a final summary of the tests.
 
     Arguments:
     profiles -- a list of Profile instances.
     logger   -- a log.LogManager instance.
     backend  -- a results.Backend derived instance.
     results  -- list of TestResult instances from a previous interrupted run
                 using the same profiles and settings (resulting in the same
                 workitems).
+    jobs     -- maximum number of concurrent jobs. Use os.cpu_count() by default
     """
     chunksize = 1
 
     # The logger needs to know how many tests are running. Because of filters
     # there's no way to do that without making a concrete list out of the
     # filters profiles.
     workitem_builder = WorkItemBuilder(process_isolation=process_isolation,
                                        results=results)
     profiles = [(p, workitem_builder(p.itertests())) for p in profiles]
     log = LogManager(logger, sum(len(l) for _, l in profiles))
@@ -496,21 +497,23 @@ def run(profiles, logger, backend, concurrency='some',
             # pool
             run_threads(single, profile, workitems,
                         lambda x: not x.run_concurrent)
         profile.teardown()
 
     # Multiprocessing.dummy is a wrapper around Threading that provides a
     # multiprocessing compatible API
     #
     # The default value of pool is the number of virtual processor cores
     single = multiprocessing.dummy.Pool(1)
-    multi = multiprocessing.dummy.Pool()
+    if not jobs or jobs < 0:
+        jobs = os.cpu_count()
+    multi = multiprocessing.dummy.Pool(jobs)
 
     try:
         for p in profiles:
             run_profile(*p)
 
         for pool in [single, multi]:
             pool.close()
             pool.join()
     finally:
         log.get().summary()
diff --git a/framework/programs/run.py b/framework/programs/run.py
index 03562cd97..bf8a0e612 100644
--- a/framework/programs/run.py
+++ b/framework/programs/run.py
@@ -201,20 +201,28 @@ def _run_parser(input_):
                         dest='process_isolation',
                         action='store',
                         type=booltype,
                         default=core.PIGLIT_CONFIG.safe_get(
                             'core', 'process isolation', 'true'),
                         metavar='<bool>',
                         help='Set this to allow tests to run without process '
                              'isolation. This allows, but does not require, '
                              'tests to run multiple tests per process. '
                              'This value can also be set in piglit.conf.')
+    parser.add_argument('-j', '--jobs',
+                        dest='jobs',
+                        action='store',
+                        type=int,
+                        default=core.PIGLIT_CONFIG.safe_get(
+                            'core', 'jobs', '-1'),
+                        help='Set the maximum number of jobs to run concurrently. '
+                             'By default, the reported number of CPUs is used.')
     parser.add_argument("--ignore-missing",
                         dest="ignore_missing",
                         action="store_true",
                         help="missing tests are considered as 'notrun'")
     parser.add_argument("test_profile",
                         metavar="<Profile path(s)>",
                         nargs='+',
                         help="Path to one or more test profiles to run. "
                              "If more than one profile is provided then they "
                              "will be merged.")
@@ -289,20 +297,21 @@ def run(input_):
     # isn't reliable with threaded run
     if args.dmesg or args.monitored:
         args.concurrency = "none"
 
     # Pass arguments into Options
     options.OPTIONS.execute = args.execute
     options.OPTIONS.valgrind = args.valgrind
     options.OPTIONS.sync = args.sync
     options.OPTIONS.deqp_mustpass = args.deqp_mustpass
     options.OPTIONS.process_isolation = args.process_isolation
+    options.OPTIONS.jobs = args.jobs
 
     # Set the platform to pass to waffle
     options.OPTIONS.env['PIGLIT_PLATFORM'] = args.platform
 
     # Change working directory to the root of the piglit directory
     piglit_dir = path.dirname(path.realpath(sys.argv[0]))
     os.chdir(piglit_dir)
 
     # If the results directory already exists and if overwrite was set, then
     # clear the directory. If it wasn't set, then raise fatal error.
@@ -359,21 +368,22 @@ def run(input_):
         if args.exclude_tests:
             p.filters.append(profile.RegexFilter(args.exclude_tests,
                                                  inverse=True))
         if args.include_tests:
             p.filters.append(profile.RegexFilter(args.include_tests))
 
     time_elapsed = TimeAttribute(start=time.time())
 
     profile.run(profiles, args.log_level, backend,
                 concurrency=args.concurrency,
-                process_isolation=args.process_isolation)
+                process_isolation=args.process_isolation,
+                jobs=args.jobs)
 
     time_elapsed.end = time.time()
     backend.finalize({'time_elapsed': time_elapsed.to_json()})
 
     print('Thank you for running Piglit!\n'
           'Results have been written to ' + args.results_path)
 
 
 @exceptions.handler
 def resume(input_):
@@ -384,29 +394,38 @@ def resume(input_):
                         help="Path to results folder")
     parser.add_argument("-f", "--config",
                         dest="config_file",
                         type=argparse.FileType("r"),
                         help="Optionally specify a piglit config file to use. "
                              "Default is piglit.conf")
     parser.add_argument("-n", "--no-retry",
                         dest="no_retry",
                         action="store_true",
                         help="Do not retry incomplete tests")
+    parser.add_argument('-j', '--jobs',
+                        dest='jobs',
+                        action='store',
+                        type=int,
+                        default=core.PIGLIT_CONFIG.safe_get(
+                            'core', 'jobs', '-1'),
+                        help='Set the maximum number of jobs to run concurrently. '
+                             'By default, the reported number of CPUs is used.')
     args = parser.parse_args(input_)
     _disable_windows_exception_messages()
 
     results = backends.load(args.results_path)
     options.OPTIONS.execute = results.options['execute']
     options.OPTIONS.valgrind = results.options['valgrind']
     options.OPTIONS.sync = results.options['sync']
     options.OPTIONS.deqp_mustpass = results.options['deqp_mustpass']
     options.OPTIONS.process_isolation = results.options['process_isolation']
+    options.OPTIONS.jobs = args.jobs
 
     core.get_config(args.config_file)
 
     options.OPTIONS.env['PIGLIT_PLATFORM'] = results.options['platform']
 
     results.options['env'] = core.collect_system_info()
     results.options['name'] = results.name
 
     # Resume only works with the JSON backend
     backend = backends.get_backend('json')(
@@ -441,19 +460,20 @@ def resume(input_):
             p.forced_test_list = results.options['forced_test_list']
 
     # This is resumed, don't bother with time since it won't be accurate anyway
     try:
         profile.run(
             profiles,
             results.options['log_level'],
             backend,
             concurrency=results.options['concurrent'],
             process_isolation=results.options['process_isolation'],
-            results=results.results)
+            results=results.results,
+            jobs=args.jobs)
     except exceptions.PiglitUserError as e:
         if str(e) != 'no matching tests':
             raise
 
     backend.finalize()
 
     print("Thank you for running Piglit!\n"
           "Results have been written to {0}".format(args.results_path))
-- 
2.11.0



More information about the Piglit mailing list