es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Programación de llamadas a funciones periódicas en Quart/asyncio

Necesito programar una llamada de función periódica en python (es decir, llamada cada minuto), sin bloquear el bucle de eventos (estoy usando el marco Quart con asyncio). Básicamente, necesito enviar tareas al bucle de eventos, con un temporizador, para que el servidor web siga sirviendo las solicitudes entrantes en el mientras tanto y aproximadamente cada minuto llame a mi función.

He intentado de muchas maneras, por ejemplo:
“`
def do_work():
print("WORK", flush=True)

async def schedule():
await asyncio.sleep(0)
print("scheduling")
loop = asyncio.get<em>running</em>loop()
t = loop.call< em >later(2, dowork)
print("scheduled")

asyncio.run(schedule())
“`

Pero nunca se ejecuta (como el código anterior) o bloquea el bucle principal del servidor web. Por ejemplo, con el código anterior esperaría (ya que se realiza dentro de asyncio.run y schedule espera el temporizador) que “scheduling” se imprimiría después (o durante) de la configuración del servidor, pero ese no es el caso, se bloquea.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para programar una llamada de función periódica en Python utilizando Quart y asyncio, sin bloquear el bucle de eventos, puedes usar el método asyncio.create_task() para enviar la tarea al bucle de eventos y asyncio.sleep() para crear un temporizador para la llamada de función periódica. Aquí está un ejemplo de código:

    async def trabajo_periodico():
        while True:
            print("TRABAJO", flush=True)
            await asyncio.sleep(60)  # llamar la función cada 60 segundos
    
    async def iniciar_trabajo_periodico():
        tarea = asyncio.create_task(trabajo_periodico())
        await tarea
    
    async def principal():
        print("Iniciando el servidor web...")
        # iniciar el servidor web
        app.run()
        # enviar la tarea periódica al bucle de eventos
        asyncio.create_task(iniciar_trabajo_periodico())
    
    asyncio.run(principal())
    

    En este ejemplo, la función trabajo_periodico() imprime “TRABAJO” cada 60 segundos, usando el método asyncio.sleep() para crear el temporizador. iniciar_trabajo_periodico() usa asyncio.create_task() para enviar la llamada de función periódica al bucle de eventos sin bloquear el servidor de atender solicitudes entrantes. Finalmente, principal() ejecuta el servidor web y crea una nueva tarea para iniciar el trabajo periódico, utilizando de nuevo asyncio.create_task().

    Nota: Este código supone que app se refiere a una instancia de la aplicación Quart.

Comments are closed.