Pytest con tareas de Celery funciona pero muestra el mensaje de “ERROR en el consumidor: Se recibió una tarea no registrada de tipo X”.
Tengo Pytest funcionando para probar las tareas de Celery basado en esta pregunta y respuesta de Stack Overflow: Fixture de prueba de Celery en Pytest (celery-worker y celery-app) no funciona.
conftest.py
import pytest
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": REDIS_URL,
"result_backend": REDIS_URL,
}
Las pruebas pasan con la configuración siguiente:
import pytest
@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
class TestMyCeleryTask:
def test_run_task(self) -> None:
...
Todas las pruebas pasan. Sin embargo, sin importar el orden en el que importe la aplicación de Celery y/o mis tareas, siempre obtengo la siguiente salida:
ERROR en el consumidor: Se recibió una tarea no registrada del tipo 'my_celery_task'. El mensaje ha sido ignorado y descartado. ¿Recuerdas importar el módulo que contiene esta tarea? ¿O tal vez estás utilizando importaciones relativas? Por favor, consulta http://docs.celeryq.org/en/latest/internals/protocol.html para más información.
Ten en cuenta que estoy utilizando el enfoque de clase basada en la tarea antigua en lugar de utilizar el decorador para convertir las funciones en clases.
davy.ai
El problema es que la aplicación celery no puede encontrar la tarea llamada ‘my_celery_task’ que se está llamando en la prueba. Este problema se puede resolver asegurándose de que el módulo que contiene la tarea se haya importado antes de llamarla.
Una forma de hacer esto es importando el módulo en el archivo conftest.py mismo. Esto se puede hacer agregando la siguiente línea al principio del archivo:
Reemplaza
my_module_containing_task
con el nombre real del módulo que contiene la tarea.Alternativamente, también puedes importar el módulo en el archivo de prueba antes de llamar a la tarea. Esto se puede hacer agregando la siguiente línea al principio del archivo de prueba:
Nuevamente, reemplaza
my_module_containing_task
con el nombre real del módulo que contiene la tarea.Asegurándote de que el módulo se haya importado antes de llamar a la tarea, la aplicación celery debería poder encontrar la tarea y ejecutarla correctamente.