es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Comenzando un bucle infinito desde una función asíncrona.

Estoy intentando consultar periódicamente una API. La estructura de mi código es la siguiente:

async def QueryData():
print(datetime.datetime.now())

async def main():
await TestApiConnection()

scheduler = AsyncIOScheduler(timezone="Europe/Berlin")
scheduler.add_job(QueryData, 'cron', minute='0-59')
scheduler.start()
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
    scheduler.shutdown(wait=False)

if name == "main":
asyncio.run(main())

Se produce el siguiente error si ejecuto esto:

en runforever
assert self.
selfreadingfuture is None AssertionError

Si convierto la función principal en una función síncrona, el trabajo QueryData comienza, pero el resultado de TestApiConnection ya no se espera.

async def QueryData():
print(datetime.datetime.now())

def main():
TestApiConnection()

if name == "main":
main()

¿Cómo inicio el trabajo desde el método principal asíncrono? ¿O debería reestructurar el código?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    El mensaje de error sugiere que el objeto future utilizado por el ciclo de eventos para realizar operaciones de E / S no está en el estado correcto. Esto puede ser causado por un uso incorrecto de coroutines o llamadas de bloqueo dentro del contexto asíncrono.

    Para iniciar el trabajo periódico desde la función principal asíncrona, se puede utilizar el método create_task proporcionado por el ciclo de eventos para programar la coroutine como una tarea. Esto se puede hacer de la siguiente manera:

    async def main():
        await TestApiConnection()
    
        scheduler = AsyncIOScheduler(timezone="Europe/Berlin")
        scheduler.add_job(QueryData, 'cron', minute='0-59')
        scheduler.start()
    
        # programar el trabajo como una tarea
        asyncio.create_task(scheduler._run_jobs())
    
        try:
            # esperar indefinidamente el ciclo de eventos
            await asyncio.Event().wait()
        finally:
            # apagar el planificador en la salida
            scheduler.shutdown(wait=False)
    

    Esto programa el método _run_jobs del planificador como una tarea, que iniciará el trabajo periódico utilizando el horario cron especificado. Luego se espera indefinidamente el ciclo de eventos utilizando un objeto asyncio.Event, y el planificador se apaga en la salida utilizando el bloque finally.

    Alternativamente, se podría reestructurar el código para utilizar una función principal síncrona y esperar la llamada TestApiConnection utilizando asyncio.to_thread de la siguiente manera:

    import asyncio
    import datetime
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    
    async def QueryData():
        print(datetime.datetime.now())
    
    async def TestApiConnection():
        # código de conexión API aquí
        await asyncio.sleep(1) # retraso de conexión simulado
    
    def main():
        asyncio.run(_main())
    
    async def _main():
        await asyncio.to_thread(TestApiConnection)
    
        scheduler = AsyncIOScheduler(timezone="Europe/Berlin")
        scheduler.add_job(QueryData, 'cron', minute='0-59')
        scheduler.start()
        try:
            asyncio.get_event_loop().run_forever()
        except (KeyboardInterrupt, SystemExit):
            pass
        scheduler.shutdown(wait=False)
    
    if __name__ == "__main__":
        main()
    

    Esto utiliza asyncio.to_thread para llamar a la función de bloqueo TestApiConnection en un hilo separado, permitiendo que el ciclo de eventos continúe ejecutándose mientras se establece la conexión. El resto del código puede estructurarse de manera similar al primer ejemplo.

    Vale la pena señalar que ambas soluciones requieren algún tipo de llamada de bloqueo o retraso dentro de la función QueryData para evitar que ocupe todo el ciclo de eventos. En el ejemplo anterior, se utiliza una llamada a asyncio.sleep para simular alguna forma de trabajo realizado.

Comments are closed.