es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Pytest con tareas de Celery funciona pero muestra el mensaje de “ERROR en el consumidor: Se recibió una tarea no registrada de tipo X”.

Tengo Pytest funcionando para probar las tareas de Celery basado en esta pregunta y respuesta de Stack Overflow: Fixture de prueba de Celery en Pytest (celery-worker y celery-app) no funciona.

conftest.py

import pytest

@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": REDIS_URL,
        "result_backend": REDIS_URL,
    }

Las pruebas pasan con la configuración siguiente:

import pytest

@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
class TestMyCeleryTask:

    def test_run_task(self) -> None:
        ...

Todas las pruebas pasan. Sin embargo, sin importar el orden en el que importe la aplicación de Celery y/o mis tareas, siempre obtengo la siguiente salida:

ERROR en el consumidor: Se recibió una tarea no registrada del tipo 'my_celery_task'.
El mensaje ha sido ignorado y descartado.

¿Recuerdas importar el módulo que contiene esta tarea?
¿O tal vez estás utilizando importaciones relativas?

Por favor, consulta
http://docs.celeryq.org/en/latest/internals/protocol.html
para más información.

Ten en cuenta que estoy utilizando el enfoque de clase basada en la tarea antigua en lugar de utilizar el decorador para convertir las funciones en clases.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    El problema es que la aplicación celery no puede encontrar la tarea llamada ‘my_celery_task’ que se está llamando en la prueba. Este problema se puede resolver asegurándose de que el módulo que contiene la tarea se haya importado antes de llamarla.

    Una forma de hacer esto es importando el módulo en el archivo conftest.py mismo. Esto se puede hacer agregando la siguiente línea al principio del archivo:

    import my_module_containing_task
    

    Reemplaza my_module_containing_task con el nombre real del módulo que contiene la tarea.

    Alternativamente, también puedes importar el módulo en el archivo de prueba antes de llamar a la tarea. Esto se puede hacer agregando la siguiente línea al principio del archivo de prueba:

    import my_module_containing_task
    

    Nuevamente, reemplaza my_module_containing_task con el nombre real del módulo que contiene la tarea.

    Asegurándote de que el módulo se haya importado antes de llamar a la tarea, la aplicación celery debería poder encontrar la tarea y ejecutarla correctamente.

Comments are closed.