es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo utilizar correctamente concurrent.futures con asyncio. La biblioteca concurrent.futures proporciona una forma sencilla de ejecutar código en paralelo en Python. Sin embargo, si estamos trabajando con asyncio, es importante usar concurrent.futures de manera adecuada para garantizar la integración perfecta con asyncio. Aquí tienes algunos pasos para utilizar concurrent.futures correctamente con asyncio: 1. Importar las bibliotecas necesarias: “`python import asyncio from concurrent import futures “` 2. Crear un bucle de eventos asyncio: “`python loop = asyncio.get_event_loop() “` 3. Definir una función que contenga la lógica de tu tarea asíncrona: “`python async def my_async_task(): # Lógica de tu tarea asíncrona “` 4. Crear un executor de concurrent.futures: “`python executor = futures.ThreadPoolExecutor() “` 5. Ejecutar tu tarea asíncrona utilizando el executor: “`python await loop.run_in_executor(executor, my_async_task) “` 6. Cerrar el executor al finalizar: “`python executor.shutdown(wait=True) “` Estos pasos te permitirán utilizar concurrent.futures en conjunción con asyncio de manera efectiva. Recuerda ajustar la lógica de tu tarea asíncrona a tus necesidades específicas. ¡A programar en paralelo con concurrent.futures y asyncio!

Estoy prototipando una aplicación FastAPI con un punto final que lanzará un proceso de larga duración utilizando el módulo subprocess. La solución obvia es usar concurrent.futures y ProcessPoolExecutor, sin embargo, no puedo obtener el comportamiento que deseo. Muestra de código:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random

pool = ProcessPoolExecutor(5)

def long_task(s):
    print("iniciado")
    time.sleep(random.randrange(5, 15))
    sb.check_output(["touch", str(s)])
    print("hecho")

async def async_task():
    loop = asyncio.get_event_loop()
    print("iniciado")
    tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
    while True:
        print("en la tarea asíncrona")
        done, _ = await asyncio.wait(tasks, timeout=1)
        for task in done:
            await task
        await asyncio.sleep(1)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_task())

if __name__ == "main":
    main()

Esta muestra funciona bien en la superficie, pero los procesos generados no se detienen después de que la ejecución se completa, veo todos los procesos de python en “ps aux | grep python”. ¿No debería detenerse el proceso completado esperando? Al final, no me importa mucho el resultado de la ejecución, solo debe ocurrir en segundo plano y salir de forma limpia, sin ningún proceso colgando.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Hay algunas cosas importantes a tener en cuenta al usar concurrent.futures y ProcessPoolExecutor para procesos de larga duración en un endpoint de FastAPI:

    1. Asegúrate de cerrar el ejecutor cuando hayas terminado de usarlo. Esto se puede hacer llamando a pool.shutdown(wait=True) al final de la función o dentro de un gestor de contexto.
    2. Es importante destacar que loop.run_in_executor() devuelve un objeto Future que representa la ejecución de la función en el ejecutor. En el ejemplo anterior, estás creando una lista de tareas que no se actualiza cuando las tareas se completan. Esto significa que estás esperando a que se completen el primer conjunto de tareas, y luego esperando al mismo conjunto de tareas nuevamente en la siguiente iteración del bucle while.

    Para solucionar esto, debes crear una nueva tarea en cada iteración del bucle while usando loop.run_in_executor(). Puedes hacer un seguimiento de las tareas en una lista y eliminarlas cuando se completen. Aquí tienes una versión actualizada de la función async_task():

    async def async_task():
        loop = asyncio.get_event_loop()
        print("started")
        tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
        while tasks:
            print("in async task")
            done, tasks = await asyncio.wait(tasks, timeout=1, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                task.result()
    

    Esta versión de la función espera a que se complete el primer conjunto de tareas, elimina esas tareas de la lista y crea nuevas tareas para los elementos restantes en la lista. El argumento return_when de asyncio.wait() especifica que la función debe devolver cuando al menos una tarea se haya completado.

    Con estos cambios, los subprocesos deberían ser limpiados después de que terminen de ejecutarse.

Comments are closed.