[Piglit] [PATCH 1/2] threadpool.py: Completely rewrite threadpool.py
Dylan Baker
baker.dylan.c at gmail.com
Mon Sep 30 14:57:15 PDT 2013
On Monday, September 16, 2013 09:09:34 AM Dylan Baker wrote:
> This patch completely rewrites the threadpool module. The new
> implementation is less complicated than the previous version, and it is
> an original implementation rather than an external project pulled in.
>
> This tries to be largely API compatible with the previous ThreadPool
> implementation, and for piglit's use case it is; however, it does not
> implement the full API supported by the previous implementation.
>
> Signed-off-by: Dylan Baker <baker.dylan.c at gmail.com>
> ---
>  framework/core.py       |   3 +-
>  framework/threadpool.py | 455 ++++++------------------------------------------
>  framework/threads.py    |   8 +-
>  3 files changed, 57 insertions(+), 409 deletions(-)
>
> diff --git a/framework/core.py b/framework/core.py
> index 150a70c..bebe1b8 100644
> --- a/framework/core.py
> +++ b/framework/core.py
> @@ -428,9 +428,8 @@ class Test:
>
>          See ``Test.doRun`` for a description of the parameters.
>          '''
> -        args = (env, path, json_writer)
>          if self.runConcurrent:
> -            ConcurrentTestPool().put(self.doRun, args=args)
> +            ConcurrentTestPool().put(self.doRun, args=(env, path, json_writer))
>
>      def doRun(self, env, path, json_writer):
>          '''
> diff --git a/framework/threadpool.py b/framework/threadpool.py
> index 1b4c12c..ffe8123 100644
> --- a/framework/threadpool.py
> +++ b/framework/threadpool.py
> @@ -1,418 +1,67 @@
> -# -*- coding: UTF-8 -*-
> -"""Easy to use object-oriented thread pool framework.
> -
> -A thread pool is an object that maintains a pool of worker threads to perform
> -time consuming operations in parallel. It assigns jobs to the threads
> -by putting them in a work request queue, where they are picked up by the
> -next available thread. This then performs the requested operation in the
> -background and puts the results in another queue.
> -
> -The thread pool object can then collect the results from all threads from
> -this queue as soon as they become available or after all threads have
> -finished their work. It's also possible, to define callbacks to handle
> -each result as it comes in.
> -
> -The basic concept and some code was taken from the book "Python in a Nutshell,
> -2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
> -14.5 "Threaded Program Architecture". I wrapped the main program logic in the
> -ThreadPool class, added the WorkRequest class and the callback system and
> -tweaked the code here and there. Kudos also to Florent Aide for the exception
> -handling mechanism.
> -
> -Basic usage::
> -
> - >>> pool = ThreadPool(poolsize)
> - >>> requests = makeRequests(some_callable, list_of_args, callback)
> - >>> [pool.putRequest(req) for req in requests]
> - >>> pool.wait()
> -
> -See the end of the module code for a brief, annotated usage example.
> -
> -Website : http://chrisarndt.de/projects/threadpool/
> -
> -"""
> -__docformat__ = "restructuredtext en"
> -
> -__all__ = [
> - 'makeRequests',
> - 'NoResultsPending',
> - 'NoWorkersAvailable',
> - 'ThreadPool',
> - 'WorkRequest',
> - 'WorkerThread'
> -]
> -
> -__author__ = "Christopher Arndt"
> -__version__ = '1.2.7'
> -__revision__ = "$Revision: 416 $"
> -__date__ = "$Date: 2009-10-07 05:41:27 +0200 (Wed, 07 Oct 2009) $"
> -__license__ = "MIT license"
> -
> -
> -# standard library modules
> -import sys
> -import threading
> -import Queue
> -import traceback
> -
> -
> -# exceptions
> -class NoResultsPending(Exception):
> - """All work requests have been processed."""
> - pass
> -
> -class NoWorkersAvailable(Exception):
> - """No worker threads available to process remaining requests."""
> - pass
> -
> -
> -# internal module helper functions
> -def _handle_thread_exception(request, exc_info):
> - """Default exception handler callback function.
> -
> - This just prints the exception info via ``traceback.print_exception``.
> -
> +# Copyright (c) 2013 Intel Corporation
> +#
> +# Permission is hereby granted, free of charge, to any person obtaining a
> +# copy of this software and associated documentation files (the "Software"),
> +# to deal in the Software without restriction, including without limitation
> +# the rights to use, copy, modify, merge, publish, distribute, sublicense,
> +# and/or sell copies of the Software, and to permit persons to whom the
> +# Software is furnished to do so, subject to the following conditions:
> +#
> +# The above copyright notice and this permission notice (including the next
> +# paragraph) shall be included in all copies or substantial portions of the
> +# Software.
> +#
> +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
> +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> +# IN THE SOFTWARE.
> +
> +# This code is based on the MIT licensed code here:
> +# http://code.activestate.com/recipes/577187-python-thread-pool/
> +
> +from Queue import Queue
> +from threading import Thread
> +
> +
> +class Worker(Thread):
> """
> - traceback.print_exception(*exc_info)
> -
> -
> -# utility functions
> -def makeRequests(callable_, args_list, callback=None,
> -        exc_callback=_handle_thread_exception):
> -    """Create several work requests for same callable with different arguments.
> -
> -    Convenience function for creating several work requests for the same
> -    callable where each invocation of the callable receives different values
> -    for its arguments.
> -
> -    ``args_list`` contains the parameters for each invocation of callable.
> -    Each item in ``args_list`` should be either a 2-item tuple of the list of
> -    positional arguments and a dictionary of keyword arguments or a single,
> -    non-tuple argument.
> -
> -    See docstring for ``WorkRequest`` for info on ``callback`` and
> -    ``exc_callback``.
> +    Simple worker thread
>
> +    This worker simply consumes tasks off of the queue until it is empty and
> +    then waits for more tasks.
> """
> - requests = []
> - for item in args_list:
> - if isinstance(item, tuple):
> - requests.append(
> - WorkRequest(callable_, item[0], item[1], callback=callback,
> - exc_callback=exc_callback)
> - )
> - else:
> - requests.append(
> - WorkRequest(callable_, [item], None, callback=callback,
> - exc_callback=exc_callback)
> - )
> - return requests
> -
> -
> -# classes
> -class WorkerThread(threading.Thread):
> - """Background thread connected to the requests/results queues.
> -
> - A worker thread sits in the background and picks up work requests from
> - one queue and puts the results in another until it is dismissed.
>
> - """
> -
> -    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
> -        """Set up thread in daemonic mode and start it immediatedly.
> -
> -        ``requests_queue`` and ``results_queue`` are instances of
> -        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
> -        worker thread.
> -
> - """
> - threading.Thread.__init__(self, **kwds)
> - self.setDaemon(1)
> - self._requests_queue = requests_queue
> - self._results_queue = results_queue
> - self._poll_timeout = poll_timeout
> - self._dismissed = threading.Event()
> +    def __init__(self, queue):
> +        Thread.__init__(self)
> +        self.queue = queue
> +        self.daemon = True
>          self.start()
>
>      def run(self):
> -        """Repeatedly process the job queue until told to exit."""
> +        """ This method is called in the constructor by self.start() """
>          while True:
> -            if self._dismissed.isSet():
> -                # we are dismissed, break out of loop
> -                break
> -            # get next work request. If we don't get a new request from the
> -            # queue after self._poll_timout seconds, we jump to the start of
> -            # the while loop again, to give the thread a chance to exit.
> -            try:
> -                request = self._requests_queue.get(True, self._poll_timeout)
> -            except Queue.Empty:
> -                continue
> -            else:
> -                if self._dismissed.isSet():
> -                    # we are dismissed, put back request in queue and exit loop
> -                    self._requests_queue.put(request)
> -                    break
> -                try:
> -                    result = request.callable(*request.args, **request.kwds)
> -                    self._results_queue.put((request, result))
> -                except:
> -                    request.exception = True
> -                    self._results_queue.put((request, sys.exc_info()))
> +            func, args = self.queue.get()
> +            func(*args) # XXX: Does this need to be try/except-ed?
> +            self.queue.task_done()
>
> -    def dismiss(self):
> -        """Sets a flag to tell the thread to exit when done with current job."""
> -        self._dismissed.set()
> -
> -
> -class WorkRequest:
> - """A request to execute a callable for putting in the request queue
> later. -
> - See the module function ``makeRequests`` for the common case
> - where you want to build several ``WorkRequest`` objects for the same
> - callable but with different arguments for each call.
>
> +class ThreadPool(object):
> """
> -
> -    def __init__(self, callable_, args=None, kwds=None, requestID=None,
> -            callback=None, exc_callback=_handle_thread_exception):
> -        """Create a work request for a callable and attach callbacks.
> -
> -        A work request consists of the a callable to be executed by a
> -        worker thread, a list of positional arguments, a dictionary
> -        of keyword arguments.
> -
> -        A ``callback`` function can be specified, that is called when the
> -        results of the request are picked up from the result queue. It must
> -        accept two anonymous arguments, the ``WorkRequest`` object and the
> -        results of the callable, in that order. If you want to pass additional
> -        information to the callback, just stick it on the request object.
> -
> -        You can also give custom callback for when an exception occurs with
> -        the ``exc_callback`` keyword parameter. It should also accept two
> -        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
> -        details as returned by ``sys.exc_info()``. The default implementation
> -        of this callback just prints the exception info via
> -        ``traceback.print_exception``. If you want no exception handler
> -        callback, just pass in ``None``.
> -
> -        ``requestID``, if given, must be hashable since it is used by
> -        ``ThreadPool`` object to store the results of that work request in a
> -        dictionary. It defaults to the return value of ``id(self)``.
> -
> -        """
> - if requestID is None:
> - self.requestID = id(self)
> - else:
> - try:
> - self.requestID = hash(requestID)
> - except TypeError:
> - raise TypeError("requestID must be hashable.")
> - self.exception = False
> - self.callback = callback
> - self.exc_callback = exc_callback
> - self.callable = callable_
> - self.args = args or []
> - self.kwds = kwds or {}
> -
> - def __str__(self):
> - return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
> - (self.requestID, self.args, self.kwds, self.exception)
> -
> -class ThreadPool:
> - """A thread pool, distributing work requests and collecting results.
> -
> - See the module docstring for more information.
> -
> +    A simple ThreadPool class that maintains a Queue object and a set of Worker
> +    threads.
> """
>
> -    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
> -        """Set up the thread pool and start num_workers worker threads.
> -
> -        ``num_workers`` is the number of worker threads to start initially.
> -
> -        If ``q_size > 0`` the size of the work *request queue* is limited and
> -        the thread pool blocks when the queue is full and it tries to put
> -        more work requests in it (see ``putRequest`` method), unless you also
> -        use a positive ``timeout`` value for ``putRequest``.
> -
> -        If ``resq_size > 0`` the size of the *results queue* is limited and the
> -        worker threads will block when the queue is full and they try to put
> -        new results in it.
> -
> -        .. warning:
> -            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
> -            the possibilty of a deadlock, when the results queue is not pulled
> -            regularly and too many jobs are put in the work requests queue.
> -            To prevent this, always set ``timeout > 0`` when calling
> -            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
> -
> -        """
> -        self._requests_queue = Queue.Queue(q_size)
> -        self._results_queue = Queue.Queue(resq_size)
> -        self.workers = []
> -        self.dismissedWorkers = []
> -        self.workRequests = {}
> -        self.createWorkers(num_workers, poll_timeout)
> -
> -    def createWorkers(self, num_workers, poll_timeout=5):
> -        """Add num_workers worker threads to the pool.
> -
> -        ``poll_timout`` sets the interval in seconds (int or float) for how
> -        ofte threads should check whether they are dismissed, while waiting for
> -        requests.
> -
> -        """
> -        for i in range(num_workers):
> -            self.workers.append(WorkerThread(self._requests_queue,
> -                self._results_queue, poll_timeout=poll_timeout))
> -
> -    def dismissWorkers(self, num_workers, do_join=False):
> -        """Tell num_workers worker threads to quit after their current task."""
> -        dismiss_list = []
> - for i in range(min(num_workers, len(self.workers))):
> - worker = self.workers.pop()
> - worker.dismiss()
> - dismiss_list.append(worker)
> -
> - if do_join:
> - for worker in dismiss_list:
> - worker.join()
> - else:
> - self.dismissedWorkers.extend(dismiss_list)
> -
> -    def joinAllDismissedWorkers(self):
> -        """Perform Thread.join() on all worker threads that have been dismissed.
> -        """
> -        for worker in self.dismissedWorkers:
> -            worker.join()
> -        self.dismissedWorkers = []
> -
> - def putRequest(self, request, block=True, timeout=None):
> - """Put work request into work queue and save its id for later."""
> - assert isinstance(request, WorkRequest)
> - # don't reuse old work requests
> - assert not getattr(request, 'exception', None)
> - self._requests_queue.put(request, block, timeout)
> - self.workRequests[request.requestID] = request
> -
> - def poll(self, block=False):
> - """Process any new results in the queue."""
> - while True:
> - # still results pending?
> - if not self.workRequests:
> - raise NoResultsPending
> - # are there still workers to process remaining requests?
> - elif block and not self.workers:
> - raise NoWorkersAvailable
> - try:
> - # get back next results
> - request, result = self._results_queue.get(block=block)
> - # has an exception occured?
> - if request.exception and request.exc_callback:
> - request.exc_callback(request, result)
> - # hand results to callback, if any
> - if request.callback and not \
> - (request.exception and request.exc_callback):
> - request.callback(request, result)
> - del self.workRequests[request.requestID]
> - except Queue.Empty:
> - break
> -
> - def wait(self):
> - """Wait for results, blocking until all have arrived."""
> - while 1:
> - try:
> - self.poll(True)
> - except NoResultsPending:
> - break
> -
> -
> -################
> -# USAGE EXAMPLE
> -################
> -
> -if __name__ == '__main__':
> - import random
> - import time
> -
> - # the work the threads will have to do (rather trivial in our example)
> - def do_something(data):
> - time.sleep(random.randint(1,5))
> - result = round(random.random() * data, 5)
> - # just to show off, we throw an exception once in a while
> - if result > 5:
> - raise RuntimeError("Something extraordinary happened!")
> - return result
> -
> - # this will be called each time a result is available
> - def print_result(request, result):
> - print "**** Result from request #%s: %r" % (request.requestID,
> result) -
> - # this will be called when an exception occurs within a thread
> -    # this example exception handler does little more than the default handler
> -    def handle_exception(request, exc_info):
> - if not isinstance(exc_info, tuple):
> - # Something is seriously wrong...
> - print request
> - print exc_info
> - raise SystemExit
> - print "**** Exception occured in request #%s: %s" % \
> - (request.requestID, exc_info)
> -
> - # assemble the arguments for each job to a list...
> - data = [random.randint(1,10) for i in range(20)]
> - # ... and build a WorkRequest object for each item in data
> -    requests = makeRequests(do_something, data, print_result, handle_exception)
> -    # to use the default exception handler, uncomment next line and comment out
> -    # the preceding one.
> - #requests = makeRequests(do_something, data, print_result)
> -
> - # or the other form of args_lists accepted by makeRequests: ((,), {})
> - data = [((random.randint(1,10),), {}) for i in range(20)]
> - requests.extend(
> - makeRequests(do_something, data, print_result, handle_exception)
> - #makeRequests(do_something, data, print_result)
> -        # to use the default exception handler, uncomment next line and comment
> -        # out the preceding one.
> - )
> -
> - # we create a pool of 3 worker threads
> - print "Creating thread pool with 3 worker threads."
> - main = ThreadPool(3)
> -
> - # then we put the work requests in the queue...
> - for req in requests:
> - main.putRequest(req)
> - print "Work request #%s added." % req.requestID
> - # or shorter:
> - # [main.putRequest(req) for req in requests]
> +    def __init__(self, thread_count):
> +        self.queue = Queue(thread_count)
> +        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]
>
> - # ...and wait for the results to arrive in the result queue
> - # by using ThreadPool.wait(). This would block until results for
> - # all work requests have arrived:
> - # main.wait()
> +    def add(self, func, args):
> +        """ Add a function and its arguments to the queue as a tuple """
> +        self.queue.put((func, args))
>
> - # instead we can poll for results while doing something else:
> - i = 0
> - while True:
> - try:
> - time.sleep(0.5)
> - main.poll()
> - print "Main thread working...",
> - print "(active worker threads: %i)" %
> (threading.activeCount()-1, ) - if i == 10:
> - print "**** Adding 3 more worker threads..."
> - main.createWorkers(3)
> - if i == 20:
> - print "**** Dismissing 2 worker threads..."
> - main.dismissWorkers(2)
> - i += 1
> - except KeyboardInterrupt:
> - print "**** Interrupted!"
> - break
> - except NoResultsPending:
> - print "**** No pending results."
> - break
> - if main.dismissedWorkers:
> - print "Joining all dismissed worker threads..."
> - main.joinAllDismissedWorkers()
> +    def join(self):
> +        """ Block until self.queue is empty """
> +        self.queue.join()
> diff --git a/framework/threads.py b/framework/threads.py
> index fcc266e..ef037d1 100644
> --- a/framework/threads.py
> +++ b/framework/threads.py
> @@ -24,7 +24,7 @@
> from weakref import WeakKeyDictionary
> import multiprocessing
>
> -from threadpool import ThreadPool, WorkRequest
> +from threadpool import ThreadPool
> from patterns import Singleton
> from threading import RLock
>
> @@ -53,8 +53,8 @@ class ConcurrentTestPool(Singleton):
>          self.pool = ThreadPool(multiprocessing.cpu_count())
>
>      @synchronized_self
> -    def put(self, callable_, args=None, kwds=None):
> -        self.pool.putRequest(WorkRequest(callable_, args=args, kwds=kwds))
> +    def put(self, callable, args=None):
> +        self.pool.add(callable, args)
>
>      def join(self):
> -        self.pool.wait()
> +        self.pool.join()
I'm hoping to get someone to at least test this before it lands.
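
For whoever picks this up to test: here is the quick standalone smoke test
I would start with (my own sketch, not part of the patch -- it assumes
framework/ is on PYTHONPATH so "from threadpool import ThreadPool"
resolves, and Python 2, which the module already requires given its use of
Queue and xrange):

    import threading

    from threadpool import ThreadPool

    lock = threading.Lock()
    results = []

    def work(n):
        # Simulate a test job; guard the shared list since this runs on
        # several worker threads at once.
        with lock:
            results.append(n * n)

    pool = ThreadPool(4)      # four workers, queue bounded at 4 entries
    for i in range(32):
        pool.add(work, (i,))  # same (func, args) shape ConcurrentTestPool.put() passes
    pool.join()               # blocks until every queued job is done

    assert sorted(results) == [i * i for i in range(32)]
    print "OK: %d jobs ran" % len(results)

One case worth exercising explicitly, given the XXX in Worker.run(): if
func(*args) raises, the exception kills that worker thread and task_done()
is never called, so ThreadPool.join() will block forever. Wrapping the
call in try/finally would at least keep join() from hanging.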