[Piglit] [PATCH 1/2] threadpool.py: Completely rewrite threadpool.py

Kenneth Graunke kenneth at whitecape.org
Mon Oct 7 07:07:23 CEST 2013


On 09/16/2013 09:09 AM, Dylan Baker wrote:
[snip]
> +# Copyright (c) 2013 Intel Corporation
> +#
> +# Permission is hereby granted, free of charge, to any person obtaining a
> +# copy of this software and associated documentation files (the "Software"),
> +# to deal in the Software without restriction, including without limitation
> +# the rights to use, copy, modify, merge, publish, distribute, sublicense,
> +# and/or sell copies of the Software, and to permit persons to whom the
> +# Software is furnished to do so, subject to the following conditions:
> +#
> +# The above copyright notice and this permission notice (including the next
> +# paragraph) shall be included in all copies or substantial portions of the
> +# Software.
> +#
> +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
> +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
> +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> +# IN THE SOFTWARE.
> +
> +# This code is based on the MIT licensed code here:
> +# http://code.activestate.com/recipes/577187-python-thread-pool/

I might say:
# This code is based on Emilio Monti's MIT licensed snippet from:
# http://code.activestate.com/recipes/577187-python-thread-pool/

I was going to say you should add his copyright, but you've rewritten
most of the code anyway, so I don't think you really need to.

Either way, this looks great.
Reviewed-and-tested-by: Kenneth Graunke <kenneth at whitecape.org>

> +from Queue import Queue
> +from threading import Thread
> +
> +
> +class Worker(Thread):
>      """
> -    traceback.print_exception(*exc_info)
> -
> -
> -# utility functions
> -def makeRequests(callable_, args_list, callback=None,
> -        exc_callback=_handle_thread_exception):
> -    """Create several work requests for same callable with different arguments.
> -
> -    Convenience function for creating several work requests for the same
> -    callable where each invocation of the callable receives different values
> -    for its arguments.
> -
> -    ``args_list`` contains the parameters for each invocation of callable.
> -    Each item in ``args_list`` should be either a 2-item tuple of the list of
> -    positional arguments and a dictionary of keyword arguments or a single,
> -    non-tuple argument.
> -
> -    See docstring for ``WorkRequest`` for info on ``callback`` and
> -    ``exc_callback``.
> +    Simple worker thread
>  
> +    This worker simply consumes tasks off of the queue until it is empty and
> +    then waits for more tasks.
>      """
> -    requests = []
> -    for item in args_list:
> -        if isinstance(item, tuple):
> -            requests.append(
> -                WorkRequest(callable_, item[0], item[1], callback=callback,
> -                    exc_callback=exc_callback)
> -            )
> -        else:
> -            requests.append(
> -                WorkRequest(callable_, [item], None, callback=callback,
> -                    exc_callback=exc_callback)
> -            )
> -    return requests
> -
> -
> -# classes
> -class WorkerThread(threading.Thread):
> -    """Background thread connected to the requests/results queues.
> -
> -    A worker thread sits in the background and picks up work requests from
> -    one queue and puts the results in another until it is dismissed.
>  
> -    """
> -
> -    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
> -        """Set up thread in daemonic mode and start it immediatedly.
> -
> -        ``requests_queue`` and ``results_queue`` are instances of
> -        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
> -        worker thread.
> -
> -        """
> -        threading.Thread.__init__(self, **kwds)
> -        self.setDaemon(1)
> -        self._requests_queue = requests_queue
> -        self._results_queue = results_queue
> -        self._poll_timeout = poll_timeout
> -        self._dismissed = threading.Event()
> +    def __init__(self, queue):
> +        Thread.__init__(self)
> +        self.queue = queue
> +        self.daemon = True
>          self.start()
>  
>      def run(self):
> -        """Repeatedly process the job queue until told to exit."""
> +        """ This method is called in the constructor by self.start() """
>          while True:
> -            if self._dismissed.isSet():
> -                # we are dismissed, break out of loop
> -                break
> -            # get next work request. If we don't get a new request from the
> -            # queue after self._poll_timout seconds, we jump to the start of
> -            # the while loop again, to give the thread a chance to exit.
> -            try:
> -                request = self._requests_queue.get(True, self._poll_timeout)
> -            except Queue.Empty:
> -                continue
> -            else:
> -                if self._dismissed.isSet():
> -                    # we are dismissed, put back request in queue and exit loop
> -                    self._requests_queue.put(request)
> -                    break
> -                try:
> -                    result = request.callable(*request.args, **request.kwds)
> -                    self._results_queue.put((request, result))
> -                except:
> -                    request.exception = True
> -                    self._results_queue.put((request, sys.exc_info()))
> +            func, args = self.queue.get()
> +            func(*args)  # XXX: Does this need to be try/except-ed?
> +            self.queue.task_done()
>  
> -    def dismiss(self):
> -        """Sets a flag to tell the thread to exit when done with current job."""
> -        self._dismissed.set()
> -
> -
> -class WorkRequest:
> -    """A request to execute a callable for putting in the request queue later.
> -
> -    See the module function ``makeRequests`` for the common case
> -    where you want to build several ``WorkRequest`` objects for the same
> -    callable but with different arguments for each call.
>  
> +class ThreadPool(object):
>      """
> -
> -    def __init__(self, callable_, args=None, kwds=None, requestID=None,
> -            callback=None, exc_callback=_handle_thread_exception):
> -        """Create a work request for a callable and attach callbacks.
> -
> -        A work request consists of the a callable to be executed by a
> -        worker thread, a list of positional arguments, a dictionary
> -        of keyword arguments.
> -
> -        A ``callback`` function can be specified, that is called when the
> -        results of the request are picked up from the result queue. It must
> -        accept two anonymous arguments, the ``WorkRequest`` object and the
> -        results of the callable, in that order. If you want to pass additional
> -        information to the callback, just stick it on the request object.
> -
> -        You can also give custom callback for when an exception occurs with
> -        the ``exc_callback`` keyword parameter. It should also accept two
> -        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
> -        details as returned by ``sys.exc_info()``. The default implementation
> -        of this callback just prints the exception info via
> -        ``traceback.print_exception``. If you want no exception handler
> -        callback, just pass in ``None``.
> -
> -        ``requestID``, if given, must be hashable since it is used by
> -        ``ThreadPool`` object to store the results of that work request in a
> -        dictionary. It defaults to the return value of ``id(self)``.
> -
> -        """
> -        if requestID is None:
> -            self.requestID = id(self)
> -        else:
> -            try:
> -                self.requestID = hash(requestID)
> -            except TypeError:
> -                raise TypeError("requestID must be hashable.")
> -        self.exception = False
> -        self.callback = callback
> -        self.exc_callback = exc_callback
> -        self.callable = callable_
> -        self.args = args or []
> -        self.kwds = kwds or {}
> -
> -    def __str__(self):
> -        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
> -            (self.requestID, self.args, self.kwds, self.exception)
> -
> -class ThreadPool:
> -    """A thread pool, distributing work requests and collecting results.
> -
> -    See the module docstring for more information.
> -
> +    A simple ThreadPool class that maintains a Queue object and a set of Worker
> +    threads.
>      """
>  
> -    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
> -        """Set up the thread pool and start num_workers worker threads.
> -
> -        ``num_workers`` is the number of worker threads to start initially.
> -
> -        If ``q_size > 0`` the size of the work *request queue* is limited and
> -        the thread pool blocks when the queue is full and it tries to put
> -        more work requests in it (see ``putRequest`` method), unless you also
> -        use a positive ``timeout`` value for ``putRequest``.
> -
> -        If ``resq_size > 0`` the size of the *results queue* is limited and the
> -        worker threads will block when the queue is full and they try to put
> -        new results in it.
> -
> -        .. warning:
> -            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
> -            the possibilty of a deadlock, when the results queue is not pulled
> -            regularly and too many jobs are put in the work requests queue.
> -            To prevent this, always set ``timeout > 0`` when calling
> -            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
> -
> -        """
> -        self._requests_queue = Queue.Queue(q_size)
> -        self._results_queue = Queue.Queue(resq_size)
> -        self.workers = []
> -        self.dismissedWorkers = []
> -        self.workRequests = {}
> -        self.createWorkers(num_workers, poll_timeout)
> -
> -    def createWorkers(self, num_workers, poll_timeout=5):
> -        """Add num_workers worker threads to the pool.
> -
> -        ``poll_timout`` sets the interval in seconds (int or float) for how
> -        ofte threads should check whether they are dismissed, while waiting for
> -        requests.
> -
> -        """
> -        for i in range(num_workers):
> -            self.workers.append(WorkerThread(self._requests_queue,
> -                self._results_queue, poll_timeout=poll_timeout))
> -
> -    def dismissWorkers(self, num_workers, do_join=False):
> -        """Tell num_workers worker threads to quit after their current task."""
> -        dismiss_list = []
> -        for i in range(min(num_workers, len(self.workers))):
> -            worker = self.workers.pop()
> -            worker.dismiss()
> -            dismiss_list.append(worker)
> -
> -        if do_join:
> -            for worker in dismiss_list:
> -                worker.join()
> -        else:
> -            self.dismissedWorkers.extend(dismiss_list)
> -
> -    def joinAllDismissedWorkers(self):
> -        """Perform Thread.join() on all worker threads that have been dismissed.
> -        """
> -        for worker in self.dismissedWorkers:
> -            worker.join()
> -        self.dismissedWorkers = []
> -
> -    def putRequest(self, request, block=True, timeout=None):
> -        """Put work request into work queue and save its id for later."""
> -        assert isinstance(request, WorkRequest)
> -        # don't reuse old work requests
> -        assert not getattr(request, 'exception', None)
> -        self._requests_queue.put(request, block, timeout)
> -        self.workRequests[request.requestID] = request
> -
> -    def poll(self, block=False):
> -        """Process any new results in the queue."""
> -        while True:
> -            # still results pending?
> -            if not self.workRequests:
> -                raise NoResultsPending
> -            # are there still workers to process remaining requests?
> -            elif block and not self.workers:
> -                raise NoWorkersAvailable
> -            try:
> -                # get back next results
> -                request, result = self._results_queue.get(block=block)
> -                # has an exception occured?
> -                if request.exception and request.exc_callback:
> -                    request.exc_callback(request, result)
> -                # hand results to callback, if any
> -                if request.callback and not \
> -                       (request.exception and request.exc_callback):
> -                    request.callback(request, result)
> -                del self.workRequests[request.requestID]
> -            except Queue.Empty:
> -                break
> -
> -    def wait(self):
> -        """Wait for results, blocking until all have arrived."""
> -        while 1:
> -            try:
> -                self.poll(True)
> -            except NoResultsPending:
> -                break
> -
> -
> -################
> -# USAGE EXAMPLE
> -################
> -
> -if __name__ == '__main__':
> -    import random
> -    import time
> -
> -    # the work the threads will have to do (rather trivial in our example)
> -    def do_something(data):
> -        time.sleep(random.randint(1,5))
> -        result = round(random.random() * data, 5)
> -        # just to show off, we throw an exception once in a while
> -        if result > 5:
> -            raise RuntimeError("Something extraordinary happened!")
> -        return result
> -
> -    # this will be called each time a result is available
> -    def print_result(request, result):
> -        print "**** Result from request #%s: %r" % (request.requestID, result)
> -
> -    # this will be called when an exception occurs within a thread
> -    # this example exception handler does little more than the default handler
> -    def handle_exception(request, exc_info):
> -        if not isinstance(exc_info, tuple):
> -            # Something is seriously wrong...
> -            print request
> -            print exc_info
> -            raise SystemExit
> -        print "**** Exception occured in request #%s: %s" % \
> -          (request.requestID, exc_info)
> -
> -    # assemble the arguments for each job to a list...
> -    data = [random.randint(1,10) for i in range(20)]
> -    # ... and build a WorkRequest object for each item in data
> -    requests = makeRequests(do_something, data, print_result, handle_exception)
> -    # to use the default exception handler, uncomment next line and comment out
> -    # the preceding one.
> -    #requests = makeRequests(do_something, data, print_result)
> -
> -    # or the other form of args_lists accepted by makeRequests: ((,), {})
> -    data = [((random.randint(1,10),), {}) for i in range(20)]
> -    requests.extend(
> -        makeRequests(do_something, data, print_result, handle_exception)
> -        #makeRequests(do_something, data, print_result)
> -        # to use the default exception handler, uncomment next line and comment
> -        # out the preceding one.
> -    )
> -
> -    # we create a pool of 3 worker threads
> -    print "Creating thread pool with 3 worker threads."
> -    main = ThreadPool(3)
> -
> -    # then we put the work requests in the queue...
> -    for req in requests:
> -        main.putRequest(req)
> -        print "Work request #%s added." % req.requestID
> -    # or shorter:
> -    # [main.putRequest(req) for req in requests]
> +    def __init__(self, thread_count):
> +        self.queue = Queue(thread_count)
> +        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]
>  
> -    # ...and wait for the results to arrive in the result queue
> -    # by using ThreadPool.wait(). This would block until results for
> -    # all work requests have arrived:
> -    # main.wait()
> +    def add(self, func, args):
> +        """ Add a function and it's arguments to the queue as a tuple """
> +        self.queue.put((func, args))
>  
> -    # instead we can poll for results while doing something else:
> -    i = 0
> -    while True:
> -        try:
> -            time.sleep(0.5)
> -            main.poll()
> -            print "Main thread working...",
> -            print "(active worker threads: %i)" % (threading.activeCount()-1, )
> -            if i == 10:
> -                print "**** Adding 3 more worker threads..."
> -                main.createWorkers(3)
> -            if i == 20:
> -                print "**** Dismissing 2 worker threads..."
> -                main.dismissWorkers(2)
> -            i += 1
> -        except KeyboardInterrupt:
> -            print "**** Interrupted!"
> -            break
> -        except NoResultsPending:
> -            print "**** No pending results."
> -            break
> -    if main.dismissedWorkers:
> -        print "Joining all dismissed worker threads..."
> -        main.joinAllDismissedWorkers()
> +    def join(self):
> +        """ Block until self.queue is empty """
> +        self.queue.join()
> diff --git a/framework/threads.py b/framework/threads.py
> index fcc266e..ef037d1 100644
> --- a/framework/threads.py
> +++ b/framework/threads.py
> @@ -24,7 +24,7 @@
>  from weakref import WeakKeyDictionary
>  import multiprocessing
>  
> -from threadpool import ThreadPool, WorkRequest
> +from threadpool import ThreadPool
>  from patterns import Singleton
>  from threading import RLock
>  
> @@ -53,8 +53,8 @@ class ConcurrentTestPool(Singleton):
>          self.pool = ThreadPool(multiprocessing.cpu_count())
>  
>      @synchronized_self
> -    def put(self, callable_, args=None, kwds=None):
> -        self.pool.putRequest(WorkRequest(callable_, args=args, kwds=kwds))
> +    def put(self, callable, args=None):
> +        self.pool.add(callable, args)
>  
>      def join(self):
> -        self.pool.wait()
> +        self.pool.join()
> 



More information about the Piglit mailing list