Patrón ThreadPool en Python

PythonArco

Una pequeña receta sobre el patrón ThreadPool y su implementación en Python.

Introducción

ThreadPool es un patrón de diseño habitual en aplicaciones concurrentes y de comunicaciones. La idea es tener un «pool» de hilos (workers) que pueden realizar trabajos. Estos trabajos se le dan por medio de una cola FIFO. El usuario añade trabajos a la cola y cuando un hilo está libre extrae y procesa un trabajo de la cola.

Lo interesante es que el número de hilos se especifica cuando se crea el «pool» y no se crean ni destruyen hilos durante la ejecución de la aplicación. Con eso se consigue una aplicación más escalable, predecible y sin la sobrecarga que implica la creación y destrucción de recursos. A parte de las ventajas que implica el proceso en paralelo, este patrón permite tener un código más limpio y mejor estructurado para el cliente.

Implementación

La implementación que proponemos es una modificación/mejora de http://code.activestate.com/recipes/203871/. Se trata de una clase con los siguientes métodos “públicos”:

  • ThreadPool(numWorkers)
    • add(function, args, callback)
    • join()

El cliente simplemente tiene que añadir trabajos (funciones con argumentos) usando el método add(). Para terminar utiliza el méodo join() que obligará al pool a acabar impidiendo al mismo tiempo que se puedan añadir nuevos trabajos.

pool = ThreadPool(5)
 
pool.add(f1, [arg1, arg2])
[. . .]
 
pool.join()

A continuación aparece la implementación de la clase

class ThreadPool:
    class JoiningEx: pass
    """ Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread """
 
    def init(self, numThreads):
        """Initialize the thread pool with numThreads workers """
        self.threads = []
        self.__resizeLock = threading.Lock()
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.resize(numThreads)
 
    def resize(self, newsize):
        """ public method to set the current pool size """
        if self.__isJoining:
            raise ThreadPool.JoiningEx()
 
        with self.__resizeLock:
            self.__resize(newsize)
 
        return True
 
    def __resize(self, newsize):
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        diff = newsize – len(self.__threads)
 
        # If we need to grow the pool, do so
        for i in range(diff):
            self.__threads.append(ThreadPool.Worker(self))
 
        # If we need to shrink the pool, do so
        for i in range(-diff):
            thread = self.__threads.pop()
            thread.stop = True
 
    def __len(self):
        """Return the number of threads in the pool."""
        with self.resizeLock:
            return len(self.__threads)
 
    def add(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""
 
        assert callable(task)
 
        if self.__isJoining:
            raise ThreadPool.JoiningEx()
 
        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
            self.__taskLock.notify()
            return True
 
    def nextTask(self):
        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolWorker objects contained in the pool """
        with self.__taskLock:
            while not self.__tasks:
                if self.__isJoining:
                    raise ThreadPool.JoiningEx()
 
                self.__taskLock.wait()
 
            assert self.__tasks
            return self.__tasks.pop(0)
 
    def join(self, waitForTasks=True, waitForThreads=True):
        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish """
        self.__isJoining = True  # prevent more task queueing
 
        if waitForTasks:
            while self.__tasks:
                time.sleep(0.1)
 
        with self.__resizeLock:
            if waitForThreads:
                with self.__taskLock:
                    self.__taskLock.notifyAll()
 
                for t in self.__threads:
                    t.join()
 
            # ready to reuse
            del self.__threads[:]
            self.__isJoining = False
 
    class Worker(threading.Thread):
        """ Pooled thread class """
        def __init(self, pool):
            """ Initialize the thread and remember the pool. """
 
            threading.Thread.init(self)
            self.__pool = pool
            self.stop = False
            self.start()
 
        def run(self):
            """ Until told to quit, retrieve the next task and execute
            it, calling the callback if any.  """
            while not self.stop:
                try:
                    cmd, args, callback = self.__pool.nextTask()
                except ThreadPool.JoiningEx:
                    break
 
                logging.debug("thread %s taken %s" % (self, cmd))
 
                result = cmd(*args)
                if callback:
                    callback(result)

Próximamente la versión C++ del mismo invento.

Comentarios

Opciones de visualización de comentarios

Seleccione la forma que prefiera para mostrar los comentarios y haga clic en «Guardar las opciones» para activar los cambios.
Imagen de david.villa

ThreadPool en la librería estándar

La librería estándar de python-2.7 viene con una implementación de thread-pool en multiprocessing.pool.ThreadPool. Lo interesante además es que tiene la misma interfaz que multiprocessing.Pool de modo que se pueden hacer programas con pool de hilos o pool de procesos sin más que cambiar la clase a instanciar.

No soy portavoz de ningún colectivo, grupo o facción. Mi opinión es personal e intransferible.

Error

A día de hoy, el método add de la clase ThreadPool no permite obviar el segundo parámetro (la lista de argumentos que pasar a la función) porque no acepta None (el valor por defecto).
Sería buena idea añadir un bug tacker para este tipo de librerías.

Anybody can explain the below

Anybody can explain the below snippet of code extracted from above code.

def resize(self, newsize):
""" public method to set the current pool size """
if self.__isJoining:
raise ThreadPool.JoiningEx()

with self.__resizeLock:
self.__resize(newsize)

Is is possible to implement is in another alternate way.
Waiting for responses.
David Mayer

Imagen de nacho

spam?

It is obvious that spambots are becoming more and more sophisticated every day.

Whoa, that sounded pretty much like the introduction to some paper's abstract. Laughing out loud

Nacho

Imagen de int-0

mmm...

....yo creo que sí... porque además los libros esos no parecen funcionar... o a lo mejor es esta conexion xurrera q tengo aquí...

------------------------------------------------------------
$ python -c "print 'VG9udG8gZWwgcXVlIGxvIGxlYSA6KQ==\n'.decode('base64')"
------------------------------------------------------------

Imagen de david.villa

ale, sin link ya no puede ser

ale, sin link ya no puede ser spam Sticking out tongue

No soy portavoz de ningún colectivo, grupo o facción. Mi opinión es personal e intransferible.