[Piglit] [Patch V3] Simplify piglit threading

Kenneth Graunke kenneth at whitecape.org
Thu Jan 9 17:39:40 PST 2014


On 01/09/2014 02:20 PM, Dylan Baker wrote:
> This patch simplifies threading in piglit by removing the hand-rolled
> threadpool and instead using the Pool class from multiprocessing.dummy.
> This provides a map interface, allowing for very clear, succinct code.
> 
> The previous implementation ran all tests out of two thread pools: a
> serial pool and a multi-threaded pool. This patch keeps that structure,
> for a couple of reasons. First, the obvious solution for serial tests
> would be the map() builtin; however, map() in Python 3 returns an
> iterator instead of a list, so calling map(f, x) will not actually run
> f(x) until something consumes those values, and working around that
> would require considerable restructuring. Second, such a change could
> easily be split out into another patch, which limits the number of
> changes in this one.
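
For illustration only (this sketch is not part of the patch): on Python 3
the bare map() builtin is lazy, so nothing would actually execute until the
returned iterator is consumed. The execute() helper and test names below
are made up for the example.

def execute(name):
    print("running %s" % name)

results = map(execute, ["test-a", "test-b"])  # Python 3: no output yet
list(results)                                 # consuming it runs both calls
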
> 
> multiprocessing.dummy is a wrapper around the threading module,
> providing the multiprocessing API but with threads instead of
> processes.
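
As a rough illustration of the API being adopted (the run_one() helper,
test names, and chunksize below are made up, not taken from the patch):
multiprocessing.dummy.Pool offers the same interface as
multiprocessing.Pool but is backed by threads; imap() consumes the
iterable in chunks and returns results as they complete, and
close()/join() block until all submitted work has finished.

import multiprocessing.dummy

def run_one(name):
    # Stand-in for the real per-test work.
    print("executing %s" % name)

pool = multiprocessing.dummy.Pool()      # defaults to the CPU count
pool.imap(run_one, ["a", "b", "c"], 10)  # submit items in chunks of 10;
                                         # result iterator is discarded
pool.close()                             # no more work will be submitted
pool.join()                              # block until all work finishes
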
> 
> V2: - Renamed run() helper to test()
>     - use multiprocessing.dummy.Pool()'s default thread count, which is
>       equal to multiprocessing.cpu_count(), instead of passing
>       cpu_count() explicitly
> V3: - use Pool.imap() instead of Pool.map(), as it is slightly faster.
> 
> Signed-off-by: Dylan Baker <baker.dylan.c at gmail.com>
> ---
>  framework/core.py       | 54 +++++++++++++++++++++------------------
>  framework/threadpool.py | 67 -------------------------------------------------
>  2 files changed, 30 insertions(+), 91 deletions(-)
>  delete mode 100644 framework/threadpool.py
> 
> diff --git a/framework/core.py b/framework/core.py
> index 8bcda5b..4080c91 100644
> --- a/framework/core.py
> +++ b/framework/core.py
> @@ -36,14 +36,13 @@ from log import log
>  from cStringIO import StringIO
>  from textwrap import dedent
>  from threads import synchronized_self
> -import threading
>  import multiprocessing
> +import multiprocessing.dummy
>  try:
>      import simplejson as json
>  except ImportError:
>      import json
>  
> -from threadpool import ThreadPool
>  import status
>  
>  __all__ = ['Environment',
> @@ -566,31 +565,38 @@ class TestProfile:
>  
>          self.prepare_test_list(env)
>  
> -        # If concurrency is set to 'all' run all tests out of a concurrent
> -        # threadpool, if it's none, then run evey test serially. otherwise mix
> -        # and match them
> +        def test(pair):
> +            """ Function to call test.execute from .map
> +
> +            adds env and json_writer which are needed by Test.execute()
> +
> +            """
> +            name, test = pair
> +            test.execute(env, name, json_writer)
> +
> +        # Multiprocessing.dummy is a wrapper around Threading that provides a
> +        # multiprocessing compatible API
> +        #
> +        # The default value of pool is the number of virtual processor cores
> +        single = multiprocessing.dummy.Pool(1)
> +        multi = multiprocessing.dummy.Pool()
> +        chunksize = 50
> +
>          if env.concurrent == "all":
> -            pool = ThreadPool(multiprocessing.cpu_count())
> -            for (path, test) in self.test_list.items():
> -                pool.add(test.execute, (env, path, json_writer))
> -            pool.join()
> +            multi.imap(test, self.test_list.iteritems(), chunksize)
>          elif env.concurrent == "none":
> -            pool = ThreadPool(1)
> -            for (path, test) in self.test_list.items():
> -                pool.add(test.execute, (env, path, json_writer))
> -            pool.join()
> +            single.imap(test, self.test_list.iteritems(), chunksize)
>          else:
> -            pool = ThreadPool(multiprocessing.cpu_count())
> -            for (path, test) in self.test_list.items():
> -                if test.runConcurrent:
> -                    pool.add(test.execute, (env, path, json_writer))
> -            pool.join()
> -
> -            pool = ThreadPool(1)
> -            for (path, test) in self.test_list.items():
> -                if not test.runConcurrent:
> -                    pool.add(test.execute, (env, path, json_writer))
> -            pool.join()
> +            # Filter and return only thread safe tests to the threaded pool
> +            multi.imap(test, (x for x in self.test_list.iteritems() if
> +                              x[1].runConcurrent), chunksize)
> +            # Filter and return the non thread safe tests to the single pool
> +            single.imap(test, (x for x in self.test_list.iteritems() if not
> +                               x[1].runConcurrent), chunksize)
> +
> +        # Close and join the pools
> +        map(lambda x: x.close(), [multi, single])
> +        map(lambda x: x.join(), [multi, single])

I think this would be clearer as:

for pool in [multi, single]:
    pool.close()
    pool.join()

or:
multi.close()
single.close()
multi.join()
single.join()

With that change, v4 would be:
Reviewed-by: Kenneth Graunke <kenneth at whitecape.org>

>  
>      def remove_test(self, test_path):
>          """Remove a fully qualified test from the profile.
> diff --git a/framework/threadpool.py b/framework/threadpool.py
> deleted file mode 100644
> index 5d1fc56..0000000
> --- a/framework/threadpool.py
> +++ /dev/null
> @@ -1,67 +0,0 @@
> -# Copyright (c) 2013 Intel Corporation
> -#
> -# Permission is hereby granted, free of charge, to any person obtaining a
> -# copy of this software and associated documentation files (the "Software"),
> -# to deal in the Software without restriction, including without limitation
> -# the rights to use, copy, modify, merge, publish, distribute, sublicense,
> -# and/or sell copies of the Software, and to permit persons to whom the
> -# Software is furnished to do so, subject to the following conditions:
> -#
> -# The above copyright notice and this permission notice (including the next
> -# paragraph) shall be included in all copies or substantial portions of the
> -# Software.
> -#
> -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
> -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
> -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> -# IN THE SOFTWARE.
> -
> -# This code is based on the MIT licensed code by Emilio Monti found here:
> -# http://code.activestate.com/recipes/577187-python-thread-pool/
> -
> -from Queue import Queue
> -from threading import Thread
> -
> -
> -class Worker(Thread):
> -    """
> -    Simple worker thread
> -
> -    This worker simply consumes tasks off of the queue until it is empty and
> -    then waits for more tasks.
> -    """
> -
> -    def __init__(self, queue):
> -        Thread.__init__(self)
> -        self.queue = queue
> -        self.daemon = True
> -        self.start()
> -
> -    def run(self):
> -        """ This method is called in the constructor by self.start() """
> -        while True:
> -            func, args = self.queue.get()
> -            func(*args)  # XXX: Does this need to be try/except-ed?
> -            self.queue.task_done()
> -
> -
> -class ThreadPool(object):
> -    """
> -    A simple ThreadPool class that maintains a Queue object and a set of Worker
> -    threads.
> -    """
> -
> -    def __init__(self, thread_count):
> -        self.queue = Queue(thread_count)
> -        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]
> -
> -    def add(self, func, args):
> -        """ Add a function and it's arguments to the queue as a tuple """
> -        self.queue.put((func, args))
> -
> -    def join(self):
> -        """ Block until self.queue is empty """
> -        self.queue.join()
> 


