Una pequeña receta sobre el patrón ThreadPool y su implementación en Python.

Introducción

ThreadPool es un patrón de diseño habitual en aplicaciones concurrentes y de comunicaciones. La idea es tener un «pool» de hilos (workers) que pueden realizar trabajos. Estos trabajos se le dan por medio de una cola FIFO. El usuario añade trabajos a la cola y cuando un hilo está libre extrae y procesa un trabajo de la cola.

Lo interesante es que el número de hilos se especifica cuando se crea el «pool» y no se crean ni destruyen hilos durante la ejecución de la aplicación. Con eso se consigue una aplicación más escalable, predecible y sin la sobrecarga que implica la creación y destrucción de recursos. A parte de las ventajas que implica el proceso en paralelo, este patrón permite tener un código más limpio y mejor estructurado para el cliente.

Implementación

La implementación que proponemos es una modificación/mejora de http://code.activestate.com/recipes/203871/. Se trata de una clase con los siguientes métodos “públicos”:

  • ThreadPool(numWorkers)
    • add(function, args, callback)
    • join()

El cliente simplemente tiene que añadir trabajos (funciones con argumentos) usando el método add(). Para terminar utiliza el méodo join() que obligará al pool a acabar impidiendo al mismo tiempo que se puedan añadir nuevos trabajos.

pool = ThreadPool(5)

pool.add(f1, [arg1, arg2])
[. . .]

pool.join()

A continuación aparece la implementación de la clase

class ThreadPool:
    class JoiningEx: pass
    """ Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread """

    def __init__(self, numThreads):
        """Initialize the thread pool with numThreads workers """
        self.__threads = []
        self.__resizeLock = threading.Lock()
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.resize(numThreads)

    def resize(self, newsize):
        """ public method to set the current pool size """
        if self.__isJoining:
            raise ThreadPool.JoiningEx()

        with self.__resizeLock:
            self.__resize(newsize)

        return True

    def __resize(self, newsize):
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        diff = newsize - len(self.__threads)

        # If we need to grow the pool, do so
        for i in range(diff):
            self.__threads.append(ThreadPool.Worker(self))

        # If we need to shrink the pool, do so
        for i in range(-diff):
            thread = self.__threads.pop()
            thread.stop = True

    def __len__(self):
        """Return the number of threads in the pool."""
        with self.__resizeLock:
            return len(self.__threads)

    def add(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""

        assert callable(task)

        if self.__isJoining:
            raise ThreadPool.JoiningEx()

        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
            self.__taskLock.notify()
            return True

    def nextTask(self):
        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolWorker objects contained in the pool """
        with self.__taskLock:
            while not self.__tasks:
                if self.__isJoining:
                    raise ThreadPool.JoiningEx()

                self.__taskLock.wait()

            assert self.__tasks
            return self.__tasks.pop(0)

    def join(self, waitForTasks=True, waitForThreads=True):
        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish """
        self.__isJoining = True  # prevent more task queueing

        if waitForTasks:
            while self.__tasks:
                time.sleep(0.1)

        with self.__resizeLock:
            if waitForThreads:
                with self.__taskLock:
                    self.__taskLock.notifyAll()

                for t in self.__threads:
                    t.join()

            # ready to reuse
            del self.__threads[:]
            self.__isJoining = False

    class Worker(threading.Thread):
        """ Pooled thread class """
        def __init__(self, pool):
            """ Initialize the thread and remember the pool. """

            threading.Thread.__init__(self)
            self.__pool = pool
            self.stop = False
            self.start()

        def run(self):
            """ Until told to quit, retrieve the next task and execute
            it, calling the callback if any.  """
            while not self.stop:
                try:
                    cmd, args, callback = self.__pool.nextTask()
                except ThreadPool.JoiningEx:
                    break

                logging.debug("thread %s taken %s" % (self, cmd))

                result = cmd(*args)
                if callback:
                    callback(result)

Próximamente la versión C++ del mismo invento.



blog comments powered by Disqus