<div dir="ltr"><br><div class="gmail_extra"><br><br><div class="gmail_quote">On Tue, Jan 7, 2014 at 5:01 PM, Kenneth Graunke <span dir="ltr"><<a href="mailto:kenneth@whitecape.org" target="_blank">kenneth@whitecape.org</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div class="HOEnZb"><div class="h5">On 01/03/2014 06:07 AM, Dylan Baker wrote:<br>
> This patch simplifies threading in piglit by removing the hand-rolled<br>
> threadpool, and instead using the Pool class from multiprocessing.dummy.<br>
> This provides a map interface, allowing for very clear succinct code.<br>
><br>
> The previous implementation ran all tests out of thread pools, a serial<br>
> pool and a multi-threaded pool. This patch does the same thing for a<br>
> couple of reasons. First, the obvious solution is to use the map()<br>
> builtin for serial tests. However, map in python3 returns an iterator<br>
> instead of a list so calling map(f, x) will not actually run f(x) until<br>
> something tries to use those values. This would require considerable<br>
> restructuring to work around. Second, that could easily be split out<br>
> into another patch, and limits the number of changes in this patch.<br>
><br>
> Multiproccessing.dummy is a wrapper around the Threading module,<br>
> providing the multiproccessing API, but with threads instead of<br>
> processes.<br>
><br>
> Signed-off-by: Dylan Baker <<a href="mailto:baker.dylan.c@gmail.com">baker.dylan.c@gmail.com</a>><br>
> ---<br>
>  framework/core.py       | 47 +++++++++++++++++-----------------<br>
>  framework/threadpool.py | 67 -------------------------------------------------<br>
>  2 files changed, 23 insertions(+), 91 deletions(-)<br>
>  delete mode 100644 framework/threadpool.py<br>
><br>
> diff --git a/framework/core.py b/framework/core.py<br>
> index 8bcda5b..1e06690 100644<br>
> --- a/framework/core.py<br>
> +++ b/framework/core.py<br>
> @@ -36,14 +36,13 @@ from log import log<br>
>  from cStringIO import StringIO<br>
>  from textwrap import dedent<br>
>  from threads import synchronized_self<br>
> -import threading<br>
>  import multiprocessing<br>
> +import multiprocessing.dummy<br>
>  try:<br>
>      import simplejson as json<br>
>  except ImportError:<br>
>      import json<br>
><br>
> -from threadpool import ThreadPool<br>
>  import status<br>
><br>
>  __all__ = ['Environment',<br>
> @@ -566,31 +565,31 @@ class TestProfile:<br>
><br>
>          self.prepare_test_list(env)<br>
><br>
> -        # If concurrency is set to 'all' run all tests out of a concurrent<br>
> -        # threadpool, if it's none, then run evey test serially. otherwise mix<br>
> -        # and match them<br>
> +        def run(pair):<br>
> +            """ Function to call test.execute from .map<br>
> +<br>
> +            adds env and json_writer which are needed by Test.execute()<br>
> +<br>
> +            """<br>
> +            name, test = pair<br>
> +            test.execute(env, name, json_writer)<br>
<br>
</div></div>You just defined a function called run()...IN a function called run().<br>
<br>
*mind blown*<br>
<br>
probably shouldn't do that.  That's even worse than doRun() and<br>
doRunRun() or whatever we used to have.<br>
<br>
Would love to see a v2 with this fixed.<br></blockquote><div><br></div><div>haha, ok. <br></div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div class="im"><br>
> +<br>
> +        # Multiprocessing.dummy is a wrapper around Threading that provides a<br>
> +        # multiprocessing compatible API<br>
> +        single = multiprocessing.dummy.Pool(1)<br>
> +        multi = multiprocessing.dummy.Pool(multiprocessing.cpu_count())<br>
<br>
</div>You don't need multiprocessing.cpu_count() - that's the default.  So you<br>
can just do:<br>
<br>
multi = multiprocessing.dummy.Pool()<br></blockquote><div><br><br></div><div>Okay, I'll add a comment to make it clear that that is the default since I assumed that there was no default <br></div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">

<div class="im"><br>
> +<br>
>          if env.concurrent == "all":<br>
> -            pool = ThreadPool(multiprocessing.cpu_count())<br>
> -            for (path, test) in self.test_list.items():<br>
> -                pool.add(test.execute, (env, path, json_writer))<br>
> -            pool.join()<br>
> +            multi.map(run, self.test_list.iteritems())<br>
<br>
</div>I'm unclear whether we want map, imap, or imap_unordered here.  I guess<br>
it seems to work.  Still, thoughts?<br></blockquote><div><br></div><div>I'm not sure either, we're running into the limits of my understanding of python's threads. Without digging into SO and reading up on it I suspect that imap_unordered would be the best. I'll do some experimentation and see what happens.<br>
</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div class="HOEnZb"><div class="h5"><br>
>          elif env.concurrent == "none":<br>
> -            pool = ThreadPool(1)<br>
> -            for (path, test) in self.test_list.items():<br>
> -                pool.add(test.execute, (env, path, json_writer))<br>
> -            pool.join()<br>
> +            single.map(run, self.test_list.iteritems())<br>
>          else:<br>
> -            pool = ThreadPool(multiprocessing.cpu_count())<br>
> -            for (path, test) in self.test_list.items():<br>
> -                if test.runConcurrent:<br>
> -                    pool.add(test.execute, (env, path, json_writer))<br>
> -            pool.join()<br>
> -<br>
> -            pool = ThreadPool(1)<br>
> -            for (path, test) in self.test_list.items():<br>
> -                if not test.runConcurrent:<br>
> -                    pool.add(test.execute, (env, path, json_writer))<br>
> -            pool.join()<br>
> +            # Filter and return only thread safe tests to the threaded pool<br>
> +            multi.map(run, (x for x in self.test_list.iteritems() if<br>
> +                            x[1].runConcurrent))<br>
> +            # Filter and return the non thread safe tests to the single pool<br>
> +            single.map(run, (x for x in self.test_list.iteritems() if not<br>
> +                             x[1].runConcurrent))<br>
><br>
>      def remove_test(self, test_path):<br>
>          """Remove a fully qualified test from the profile.<br>
> diff --git a/framework/threadpool.py b/framework/threadpool.py<br>
> deleted file mode 100644<br>
> index 5d1fc56..0000000<br>
> --- a/framework/threadpool.py<br>
> +++ /dev/null<br>
> @@ -1,67 +0,0 @@<br>
> -# Copyright (c) 2013 Intel Corporation<br>
> -#<br>
> -# Permission is hereby granted, free of charge, to any person obtaining a<br>
> -# copy of this software and associated documentation files (the "Software"),<br>
> -# to deal in the Software without restriction, including without limitation<br>
> -# the rights to use, copy, modify, merge, publish, distribute, sublicense,<br>
> -# and/or sell copies of the Software, and to permit persons to whom the<br>
> -# Software is furnished to do so, subject to the following conditions:<br>
> -#<br>
> -# The above copyright notice and this permission notice (including the next<br>
> -# paragraph) shall be included in all copies or substantial portions of the<br>
> -# Software.<br>
> -#<br>
> -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR<br>
> -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,<br>
> -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL<br>
> -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER<br>
> -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING<br>
> -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS<br>
> -# IN THE SOFTWARE.<br>
> -<br>
> -# This code is based on the MIT licensed code by Emilio Monti found here:<br>
> -# <a href="http://code.activestate.com/recipes/577187-python-thread-pool/" target="_blank">http://code.activestate.com/recipes/577187-python-thread-pool/</a><br>
> -<br>
> -from Queue import Queue<br>
> -from threading import Thread<br>
> -<br>
> -<br>
> -class Worker(Thread):<br>
> -    """<br>
> -    Simple worker thread<br>
> -<br>
> -    This worker simply consumes tasks off of the queue until it is empty and<br>
> -    then waits for more tasks.<br>
> -    """<br>
> -<br>
> -    def __init__(self, queue):<br>
> -        Thread.__init__(self)<br>
> -        self.queue = queue<br>
> -        self.daemon = True<br>
> -        self.start()<br>
> -<br>
> -    def run(self):<br>
> -        """ This method is called in the constructor by self.start() """<br>
> -        while True:<br>
> -            func, args = self.queue.get()<br>
> -            func(*args)  # XXX: Does this need to be try/except-ed?<br>
> -            self.queue.task_done()<br>
> -<br>
> -<br>
> -class ThreadPool(object):<br>
> -    """<br>
> -    A simple ThreadPool class that maintains a Queue object and a set of Worker<br>
> -    threads.<br>
> -    """<br>
> -<br>
> -    def __init__(self, thread_count):<br>
> -        self.queue = Queue(thread_count)<br>
> -        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]<br>
> -<br>
> -    def add(self, func, args):<br>
> -        """ Add a function and it's arguments to the queue as a tuple """<br>
> -        self.queue.put((func, args))<br>
> -<br>
> -    def join(self):<br>
> -        """ Block until self.queue is empty """<br>
> -        self.queue.join()<br>
><br>
<br>
</div></div></blockquote></div><br></div></div>