es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Problema del administrador de contexto ThreadPool con el método appy_async?

Estoy teniendo un problema con ThreadPool cuando se usa como gestor de contexto. Cuando utilizo el método apply_async y trato de obtener los resultados con el método get(), simplemente se bloquea.

El código proporcionado tiene el ThreadPool como gestor de contexto. Siempre se bloquea en la parte inferior cuando intento obtener los resultados.
He comentado el código que funciona correctamente (opción #2) donde creo y cierro manualmente el pool.

¿Alguna idea de por qué se bloquea?

import threading
import time
import random
import os
import logging
from multiprocessing.pool import ThreadPool


glock = threading.Lock()

def run(pause_time, lock=glock):
    thread_name = threading.currentThread().name
    logging.info(f'intentando bloqueo para el hilo: {thread_name}')
    with lock:
        logging.info(f'obtuve el bloqueo para el hilo: {thread_name}')
        logging.info(f'pausando {pause_time} segundos para {thread_name}')
        time.sleep(pause_time)

    logging.info(f'liberé el bloqueo para el hilo: {thread_name}')

    return 'abc';


if __name__ == '__main__':
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=format)

    jobs = []
    job = None
    num_threads = os.cpu_count()

    """Opción #1 no funciona, ¡jobs.get() se bloqueará!"""
    with ThreadPool(num_threads) as pool:
        for i in range(1):
            pause_time = random.randint(1,5)
            jobs.append(pool.apply_async(run, args=(pause_time,glock)))

    """La opción #2 funciona bien. Necesitará descomentarlo"""
    # pool = ThreadPool(num_threads)
    # for i in range(2):
    #     pause_time = random.randint(1,5)
    #     jobs.append(pool.apply_async(run, args=(pause_time,glock)))
    # pool.close()

    pool.join()
    for job in jobs:
        job.get()

    logging.info('terminado')
Tags:  ,

Answer

  1. Avatar for davy.ai

    El problema con el código proporcionado parece estar relacionado con el hecho de que la llamada a pool.join() (que espera a que se completen todas las tareas) se realiza fuera del bloque with donde se creó el administrador de contexto ThreadPool. Por lo tanto, el hilo principal no espera a que las tareas se completen antes de salir del programa, lo que a su vez provoca que las llamadas a job.get() se queden colgadas indefinidamente.

    Para solucionar este problema, la llamada a pool.join() debe moverse dentro del bloque with, de la siguiente manera:

    with ThreadPool(num_threads) as pool:
        for i in range(1):
            pause_time = random.randint(1,5)
            jobs.append(pool.apply_async(run, args=(pause_time,glock)))
    
        pool.join()
        for job in jobs:
            job.get()
    

    De esta manera, el hilo principal esperará a que se completen todas las tareas antes de continuar con el programa.

Comments are closed.