Patrón ThreadPool en Python
Una pequeña receta sobre el patrón ThreadPool y su implementación en Python.
Introducción
ThreadPool es un patrón de diseño habitual en aplicaciones concurrentes y de comunicaciones. La idea es tener un «pool» de hilos (workers) que pueden realizar trabajos. Estos trabajos se le dan por medio de una cola FIFO. El usuario añade trabajos a la cola y cuando un hilo está libre extrae y procesa un trabajo de la cola.
Lo interesante es que el número de hilos se especifica cuando se crea el «pool» y no se crean ni destruyen hilos durante la ejecución de la aplicación. Con eso se consigue una aplicación más escalable, predecible y sin la sobrecarga que implica la creación y destrucción de recursos. A parte de las ventajas que implica el proceso en paralelo, este patrón permite tener un código más limpio y mejor estructurado para el cliente.
Implementación
La implementación que proponemos es una modificación/mejora de http://code.activestate.com/recipes/203871/. Se trata de una clase con los siguientes métodos “públicos”:
- ThreadPool(numWorkers)
- add(function, args, callback)
- join()
El cliente simplemente tiene que añadir trabajos (funciones con argumentos) usando el método add(). Para terminar utiliza el méodo join() que obligará al pool a acabar impidiendo al mismo tiempo que se puedan añadir nuevos trabajos.
pool = ThreadPool(5)
pool.add(f1, [arg1, arg2])
[. . .]
pool.join()
A continuación aparece la implementación de la clase
class ThreadPool:
class JoiningEx: pass
""" Flexible thread pool class. Creates a pool of threads, then
accepts tasks that will be dispatched to the next available
thread """
def __init__(self, numThreads):
"""Initialize the thread pool with numThreads workers """
self.__threads = []
self.__resizeLock = threading.Lock()
self.__taskLock = threading.Condition(threading.Lock())
self.__tasks = []
self.__isJoining = False
self.resize(numThreads)
def resize(self, newsize):
""" public method to set the current pool size """
if self.__isJoining:
raise ThreadPool.JoiningEx()
with self.__resizeLock:
self.__resize(newsize)
return True
def __resize(self, newsize):
"""Set the current pool size, spawning or terminating threads
if necessary. Internal use only; assumes the resizing lock is
held."""
diff = newsize - len(self.__threads)
# If we need to grow the pool, do so
for i in range(diff):
self.__threads.append(ThreadPool.Worker(self))
# If we need to shrink the pool, do so
for i in range(-diff):
thread = self.__threads.pop()
thread.stop = True
def __len__(self):
"""Return the number of threads in the pool."""
with self.__resizeLock:
return len(self.__threads)
def add(self, task, args=None, taskCallback=None):
"""Insert a task into the queue. task must be callable;
args and taskCallback can be None."""
assert callable(task)
if self.__isJoining:
raise ThreadPool.JoiningEx()
with self.__taskLock:
self.__tasks.append((task, args, taskCallback))
self.__taskLock.notify()
return True
def nextTask(self):
""" Retrieve the next task from the task queue. For use
only by ThreadPoolWorker objects contained in the pool """
with self.__taskLock:
while not self.__tasks:
if self.__isJoining:
raise ThreadPool.JoiningEx()
self.__taskLock.wait()
assert self.__tasks
return self.__tasks.pop(0)
def join(self, waitForTasks=True, waitForThreads=True):
""" Clear the task queue and terminate all pooled threads,
optionally allowing the tasks and threads to finish """
self.__isJoining = True # prevent more task queueing
if waitForTasks:
while self.__tasks:
time.sleep(0.1)
with self.__resizeLock:
if waitForThreads:
with self.__taskLock:
self.__taskLock.notifyAll()
for t in self.__threads:
t.join()
# ready to reuse
del self.__threads[:]
self.__isJoining = False
class Worker(threading.Thread):
""" Pooled thread class """
def __init__(self, pool):
""" Initialize the thread and remember the pool. """
threading.Thread.__init__(self)
self.__pool = pool
self.stop = False
self.start()
def run(self):
""" Until told to quit, retrieve the next task and execute
it, calling the callback if any. """
while not self.stop:
try:
cmd, args, callback = self.__pool.nextTask()
except ThreadPool.JoiningEx:
break
logging.debug("thread %s taken %s" % (self, cmd))
result = cmd(*args)
if callback:
callback(result)
Próximamente la versión C++ del mismo invento.