[Piglit] [PATCH] Simplify piglit threading

Dylan Baker baker.dylan.c at gmail.com
Wed Jan 8 15:53:35 PST 2014


On Wednesday, January 08, 2014 02:16:39 PM Dylan Baker wrote:
> On Tue, Jan 7, 2014 at 5:01 PM, Kenneth Graunke <kenneth at whitecape.org> wrote:
> > On 01/03/2014 06:07 AM, Dylan Baker wrote:
> > > This patch simplifies threading in piglit by removing the hand-rolled
> > > threadpool, and instead using the Pool class from multiprocessing.dummy.
> > > This provides a map interface, allowing for very clear succinct code.
> > > 
> > > The previous implementation ran all tests out of thread pools, a serial
> > > pool and a multi-threaded pool. This patch does the same thing for a
> > > couple of reasons. First, the obvious solution is to use the map()
> > > builtin for serial tests. However, map in python3 returns an iterator
> > > instead of a list so calling map(f, x) will not actually run f(x) until
> > > something tries to use those values. This would require considerable
> > > restructuring to work around. Second, that could easily be split out
> > > into another patch, and limits the number of changes in this patch.
> > > 
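
(An aside on the map() point above, since it is easy to check in isolation.
This is only a sketch, not piglit code: `record` and `calls` are names made up
for the illustration. Run under python3, it should show that map() does no
work until something consumes the iterator.)

```python
calls = []


def record(x):
    # Hypothetical stand-in for running a test; notes that it was called.
    calls.append(x)
    return x


lazy = map(record, [1, 2, 3])

# On python3 map() is lazy, so nothing has been recorded at this point.
# (On python2 map() returns a list, so all three calls have already run.)
ran_before_consuming = list(calls)

# Forcing the iterator is what actually runs record() on python3.
results = list(lazy)
```
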
> > > multiprocessing.dummy is a wrapper around the threading module,
> > > providing the multiprocessing API, but with threads instead of
> > > processes.
> > > 
> > > Signed-off-by: Dylan Baker <baker.dylan.c at gmail.com>
> > > ---
> > > 
> > >  framework/core.py       | 47 +++++++++++++++++-----------------
> > >  framework/threadpool.py | 67 -------------------------------------------------
> > >  2 files changed, 23 insertions(+), 91 deletions(-)
> > >  delete mode 100644 framework/threadpool.py
> > > 
> > > diff --git a/framework/core.py b/framework/core.py
> > > index 8bcda5b..1e06690 100644
> > > --- a/framework/core.py
> > > +++ b/framework/core.py
> > > @@ -36,14 +36,13 @@ from log import log
> > > 
> > >  from cStringIO import StringIO
> > >  from textwrap import dedent
> > >  from threads import synchronized_self
> > > 
> > > -import threading
> > > 
> > >  import multiprocessing
> > > 
> > > +import multiprocessing.dummy
> > > 
> > >  try:
> > >      import simplejson as json
> > >  
> > >  except ImportError:
> > >      import json
> > > 
> > > -from threadpool import ThreadPool
> > > 
> > >  import status
> > >  
> > >  __all__ = ['Environment',
> > > 
> > > @@ -566,31 +565,31 @@ class TestProfile:
> > >          self.prepare_test_list(env)
> > > 
> > > -        # If concurrency is set to 'all' run all tests out of a concurrent
> > > -        # threadpool, if it's none, then run evey test serially. otherwise mix
> > > -        # and match them
> > > +        def run(pair):
> > > +            """ Function to call test.execute from .map
> > > +
> > > +            adds env and json_writer which are needed by Test.execute()
> > > +
> > > +            """
> > > +            name, test = pair
> > > +            test.execute(env, name, json_writer)
> > 
> > You just defined a function called run()...IN a function called run().
> > 
> > *mind blown*
> > 
> > probably shouldn't do that.  That's even worse than doRun() and
> > doRunRun() or whatever we used to have.
> > 
> > Would love to see a v2 with this fixed.
> 
> haha, ok.
> 
> > > +
> > > +        # Multiprocessing.dummy is a wrapper around Threading that provides a
> > > +        # multiprocessing compatible API
> > > +        single = multiprocessing.dummy.Pool(1)
> > > +        multi = multiprocessing.dummy.Pool(multiprocessing.cpu_count())
> > 
> > You don't need multiprocessing.cpu_count() - that's the default.  So you
> > can just do:
> > 
> > multi = multiprocessing.dummy.Pool()
> 
> Okay, I'll add a comment to make it clear that that is the default, since I
> assumed that there was no default.
> 
> > > +
> > > 
> > >          if env.concurrent == "all":
> > > -            pool = ThreadPool(multiprocessing.cpu_count())
> > > -            for (path, test) in self.test_list.items():
> > > -                pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > +            multi.map(run, self.test_list.iteritems())
> > 
> > I'm unclear whether we want map, imap, or imap_unordered here.  I guess
> > it seems to work.  Still, thoughts?
> 
> I'm not sure either; we're running into the limits of my understanding of
> python's threads. Without digging into SO and reading up on it, I suspect
> that imap_unordered would be the best. I'll do some experimentation and see
> what happens.

My poking at imap and friends leaves me believing that what we want is map().
imap and imap_unordered don't seem to run any faster, and they require
additional Pool.close() and Pool.join() calls (map blocks until all of the
work is done).
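
A minimal sketch of the difference, assuming nothing about piglit itself:
`square` is a made-up stand-in for test.execute, and the pool size of 4 is
arbitrary.

```python
from multiprocessing.dummy import Pool  # thread-backed Pool


def square(x):
    # Hypothetical stand-in for test.execute(); not piglit code.
    return x * x


pool = Pool(4)
try:
    # map() blocks until every item has been processed and returns a
    # list in input order.
    mapped = pool.map(square, range(5))

    # imap_unordered() hands back an iterator immediately; results come
    # out in completion order, so sort them before comparing.
    unordered = sorted(pool.imap_unordered(square, range(5)))
finally:
    # Tidy up the worker threads once we are done with the pool.
    pool.close()
    pool.join()
```

Both calls produce the same values here; only the ordering guarantee and the
point at which the caller blocks differ.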

> 
> > >          elif env.concurrent == "none":
> > > -            pool = ThreadPool(1)
> > > -            for (path, test) in self.test_list.items():
> > > -                pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > +            single.map(run, self.test_list.iteritems())
> > > 
> > >          else:
> > > -            pool = ThreadPool(multiprocessing.cpu_count())
> > > -            for (path, test) in self.test_list.items():
> > > -                if test.runConcurrent:
> > > -                    pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > -
> > > -            pool = ThreadPool(1)
> > > -            for (path, test) in self.test_list.items():
> > > -                if not test.runConcurrent:
> > > -                    pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > +            # Filter and return only thread safe tests to the threaded pool
> > > +            multi.map(run, (x for x in self.test_list.iteritems() if
> > > +                            x[1].runConcurrent))
> > > +            # Filter and return the non thread safe tests to the single pool
> > > +            single.map(run, (x for x in self.test_list.iteritems() if not
> > > +                             x[1].runConcurrent))
> > > 
> > >      def remove_test(self, test_path):
> > >          """Remove a fully qualified test from the profile.
> > > 
> > > diff --git a/framework/threadpool.py b/framework/threadpool.py
> > > deleted file mode 100644
> > > index 5d1fc56..0000000
> > > --- a/framework/threadpool.py
> > > +++ /dev/null
> > > @@ -1,67 +0,0 @@
> > > -# Copyright (c) 2013 Intel Corporation
> > > -#
> > > -# Permission is hereby granted, free of charge, to any person obtaining a
> > > -# copy of this software and associated documentation files (the "Software"),
> > > -# to deal in the Software without restriction, including without limitation
> > > -# the rights to use, copy, modify, merge, publish, distribute, sublicense,
> > > -# and/or sell copies of the Software, and to permit persons to whom the
> > > -# Software is furnished to do so, subject to the following conditions:
> > > -#
> > > -# The above copyright notice and this permission notice (including the next
> > > -# paragraph) shall be included in all copies or substantial portions of the
> > > -# Software.
> > > -#
> > > -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> > > -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> > > -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
> > > -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> > > -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
> > > -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> > > -# IN THE SOFTWARE.
> > > -
> > > -# This code is based on the MIT licensed code by Emilio Monti found here:
> > > -# http://code.activestate.com/recipes/577187-python-thread-pool/
> > > -
> > > -from Queue import Queue
> > > -from threading import Thread
> > > -
> > > -
> > > -class Worker(Thread):
> > > -    """
> > > -    Simple worker thread
> > > -
> > > -    This worker simply consumes tasks off of the queue until it is empty and
> > > -    then waits for more tasks.
> > > -    """
> > > -
> > > -    def __init__(self, queue):
> > > -        Thread.__init__(self)
> > > -        self.queue = queue
> > > -        self.daemon = True
> > > -        self.start()
> > > -
> > > -    def run(self):
> > > -        """ This method is called in the constructor by self.start() """
> > > -        while True:
> > > -            func, args = self.queue.get()
> > > -            func(*args)  # XXX: Does this need to be try/except-ed?
> > > -            self.queue.task_done()
> > > -
> > > -
> > > -class ThreadPool(object):
> > > -    """
> > > -    A simple ThreadPool class that maintains a Queue object and a set of Worker
> > > -    threads.
> > > -    """
> > > -
> > > -    def __init__(self, thread_count):
> > > -        self.queue = Queue(thread_count)
> > > -        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]
> > > -
> > > -    def add(self, func, args):
> > > -        """ Add a function and it's arguments to the queue as a tuple """
> > > -        self.queue.put((func, args))
> > > -
> > > -    def join(self):
> > > -        """ Block until self.queue is empty """
> > > -        self.queue.join()


