es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tareas periódicas y no periódicas con Django + Telegram + Celery.

Estoy construyendo un proyecto basado en Django y una de mis intenciones es tener un bot de Telegram que reciba información desde un grupo de Telegram. Pude implementar el bot para enviar mensajes en Telegram sin problemas.

En este momento, tengo un par de tareas de Celery que se están ejecutando con Beat y también la web de Django, están desacopladas. Todo bien aquí.

He visto que python-telegram-bot está ejecutando una función en uno de los ejemplos (https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/echobot.py) que está esperando ocioso para recibir datos de Telegram. Ahora, todas mis tareas en Celery son periódicas y son llamadas cada 10 o 60 minutos por Beat. ¿Cómo puedo ejecutar esta tarea no periódica con Celery en mi configuración? Me refiero a no periódica porque entendí que esperará por contenido hasta que sea interrumpida manualmente.

  • Django~=3.2.6
  • celery~=5.1.2

CELERY_BEAT_SCHEDULE = {
‘task_1’: {
‘task’: ‘apps.envc.tasks.Fetch1’,
‘schedule’: 600.0,
},
‘task_2’: {
‘task’: ‘apps.envc.tasks.Fetch2’,
‘schedule’: crontab(minute=’*/60′),
},
‘task_3’: {
‘task’: ‘apps.envc.tasks.Analyze’,
‘schedule’: 600,
},
}

En mi archivo tasks.py, tengo una tarea como esta:

@celery_app.task(name=’apps.envc.tasks.TelegramBot’)
def TelegramBot():
status = start_bot()
return status

Y, como implementación de start_bot, simplemente copié el ejemplo de echobot.py y agregué mi TOKEN allí (por supuesto, también están allí las funciones para diferentes comandos del ejemplo).

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para ejecutar una tarea no periódica con Celery, puedes utilizar el método delay() de Celery. En lugar de programar la tarea con CELERY_BEAT_SCHEDULE, puedes llamarla cuando sea necesario importando la función de la tarea y utilizando el método delay() para ejecutarla. Por ejemplo, en este caso, puedes importar la función de la tarea TelegramBot() en tu archivo views.py de Django o cualquier otro módulo relevante, y luego llamarla utilizando TelegramBot.delay(). Esto añadirá la tarea a la cola de tareas de Celery y la ejecutará de manera asíncrona.

    Aquí tienes un ejemplo:

    from apps.envc.tasks import TelegramBot
    
    # llamar a la tarea cuando sea necesario
    TelegramBot.delay()
    

    De esta manera, la función no se ejecutará periódicamente, sino sólo cuando la llames explícitamente.

    Ten en cuenta que delay() devuelve un objeto de tipo AsyncResult que se puede utilizar para comprobar el estado de la tarea o conseguir su resultado.

Comments are closed.