[Piglit] [Patch V3] Simplify piglit threading

Dylan Baker baker.dylan.c at gmail.com
Thu Jan 9 14:20:12 PST 2014


This patch simplifies threading in piglit by removing the hand-rolled
threadpool, and instead using the Pool class from multiprocessing.dummy.
This provides a map interface, allowing for very clear succinct code.

The previous implementation ran all tests out of thread pools, a serial
pool and a multi-threaded pool. This patch does the same thing for a
couple of reasons. First, the obvious solution is to use the map()
builtin for serial tests. However, map in python3 returns an iterator
instead of a list so calling map(f, x) will not actually run f(x) until
something tries to use those values. This would require considerable
restructuring to work around. Second, that could easily be split out
into another patch, and limits the number of changes in this patch.

Multiproccessing.dummy is a wrapper around the Threading module,
providing the multiproccessing API, but with threads instead of
processes.

V2: - Renamed run() helper to test()
    - use multiprocessing.dummy.Pool() default thread count value, which
      is equal to multiproccessing.cpu_count(), instead of cpu_count()
      explicitly
V3: - use Pool.imap() instead of Pool.map(), it is slightly faster.

Signed-off-by: Dylan Baker <baker.dylan.c at gmail.com>
---
 framework/core.py       | 54 +++++++++++++++++++++------------------
 framework/threadpool.py | 67 -------------------------------------------------
 2 files changed, 30 insertions(+), 91 deletions(-)
 delete mode 100644 framework/threadpool.py

diff --git a/framework/core.py b/framework/core.py
index 8bcda5b..4080c91 100644
--- a/framework/core.py
+++ b/framework/core.py
@@ -36,14 +36,13 @@ from log import log
 from cStringIO import StringIO
 from textwrap import dedent
 from threads import synchronized_self
-import threading
 import multiprocessing
+import multiprocessing.dummy
 try:
     import simplejson as json
 except ImportError:
     import json
 
-from threadpool import ThreadPool
 import status
 
 __all__ = ['Environment',
@@ -566,31 +565,38 @@ class TestProfile:
 
         self.prepare_test_list(env)
 
-        # If concurrency is set to 'all' run all tests out of a concurrent
-        # threadpool, if it's none, then run evey test serially. otherwise mix
-        # and match them
+        def test(pair):
+            """ Function to call test.execute from .map
+
+            adds env and json_writer which are needed by Test.execute()
+
+            """
+            name, test = pair
+            test.execute(env, name, json_writer)
+
+        # Multiprocessing.dummy is a wrapper around Threading that provides a
+        # multiprocessing compatible API
+        #
+        # The default value of pool is the number of virtual processor cores
+        single = multiprocessing.dummy.Pool(1)
+        multi = multiprocessing.dummy.Pool()
+        chunksize = 50
+
         if env.concurrent == "all":
-            pool = ThreadPool(multiprocessing.cpu_count())
-            for (path, test) in self.test_list.items():
-                pool.add(test.execute, (env, path, json_writer))
-            pool.join()
+            multi.imap(test, self.test_list.iteritems(), chunksize)
         elif env.concurrent == "none":
-            pool = ThreadPool(1)
-            for (path, test) in self.test_list.items():
-                pool.add(test.execute, (env, path, json_writer))
-            pool.join()
+            single.imap(test, self.test_list.iteritems(), chunksize)
         else:
-            pool = ThreadPool(multiprocessing.cpu_count())
-            for (path, test) in self.test_list.items():
-                if test.runConcurrent:
-                    pool.add(test.execute, (env, path, json_writer))
-            pool.join()
-
-            pool = ThreadPool(1)
-            for (path, test) in self.test_list.items():
-                if not test.runConcurrent:
-                    pool.add(test.execute, (env, path, json_writer))
-            pool.join()
+            # Filter and return only thread safe tests to the threaded pool
+            multi.imap(test, (x for x in self.test_list.iteritems() if
+                              x[1].runConcurrent), chunksize)
+            # Filter and return the non thread safe tests to the single pool
+            single.imap(test, (x for x in self.test_list.iteritems() if not
+                               x[1].runConcurrent), chunksize)
+
+        # Close and join the pools
+        map(lambda x: x.close(), [multi, single])
+        map(lambda x: x.join(), [multi, single])
 
     def remove_test(self, test_path):
         """Remove a fully qualified test from the profile.
diff --git a/framework/threadpool.py b/framework/threadpool.py
deleted file mode 100644
index 5d1fc56..0000000
--- a/framework/threadpool.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright (c) 2013 Intel Corporation
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the "Software"),
-# to deal in the Software without restriction, including without limitation
-# the rights to use, copy, modify, merge, publish, distribute, sublicense,
-# and/or sell copies of the Software, and to permit persons to whom the
-# Software is furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice (including the next
-# paragraph) shall be included in all copies or substantial portions of the
-# Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
-# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-# This code is based on the MIT licensed code by Emilio Monti found here:
-# http://code.activestate.com/recipes/577187-python-thread-pool/
-
-from Queue import Queue
-from threading import Thread
-
-
-class Worker(Thread):
-    """
-    Simple worker thread
-
-    This worker simply consumes tasks off of the queue until it is empty and
-    then waits for more tasks.
-    """
-
-    def __init__(self, queue):
-        Thread.__init__(self)
-        self.queue = queue
-        self.daemon = True
-        self.start()
-
-    def run(self):
-        """ This method is called in the constructor by self.start() """
-        while True:
-            func, args = self.queue.get()
-            func(*args)  # XXX: Does this need to be try/except-ed?
-            self.queue.task_done()
-
-
-class ThreadPool(object):
-    """
-    A simple ThreadPool class that maintains a Queue object and a set of Worker
-    threads.
-    """
-
-    def __init__(self, thread_count):
-        self.queue = Queue(thread_count)
-        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]
-
-    def add(self, func, args):
-        """ Add a function and it's arguments to the queue as a tuple """
-        self.queue.put((func, args))
-
-    def join(self):
-        """ Block until self.queue is empty """
-        self.queue.join()
-- 
1.8.5.2



More information about the Piglit mailing list