es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo ejecutar un pipeline solo una vez sin importar cuántos blobs se creen? (Azure Data Factory)

He creado un Pipeline que se ejecuta mediante un disparador cada vez que se crea un blob, el problema es que hay escenarios en los que el proceso necesita subir varios archivos al mismo tiempo, cuando esto sucede, el Pipeline se ejecuta tantas veces como el número de blobs y esto provoca que los datos sean incorrectos. Intenté configurar una Actividad de Copia de Datos en el Pipeline principal para copiar cada blob creado, pero como este pipeline está dentro del primero, también se ejecuta muchas veces.

El resultado esperado sería que el pipeline se ejecute solo una vez (independientemente de cuántos blobs se creen) y la actividad de copia de datos pueda copiar todos los blobs en la carpeta.

Aquí está mi configuración del Pipeline y del disparador:

Enlace a la imagen

La configuración de Copia de Datos a continuación:

Enlace a la imagen

He estado intentándolo durante meses, y aquí está otro intento que hice:

Microsoft Answers

¿Podría decirme qué estoy haciendo mal?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema es que el desencadenador se dispara por cada archivo cargado, lo que provoca que el flujo de trabajo se ejecute varias veces y dé lugar a datos incorrectos. Una solución podría ser modificar el desencadenador para que se dispare solo una vez después de haber cargado todos los archivos. Esto se puede lograr configurando el tamaño de lote en “1” en la configuración del desencadenador, lo cual esperará a que se carguen todos los archivos antes de activar el flujo de trabajo.

    Otra solución podría ser utilizar una actividad de bucle (ForEach) en el flujo de trabajo para iterar a través de todos los archivos en la carpeta y copiarlos utilizando la actividad de copia de datos (Copy Data). De esta manera, el flujo de trabajo se ejecutará solo una vez y copiará todos los archivos de la carpeta.

    Aquí tienes un ejemplo de configuración de la actividad de bucle (ForEach):

    {
        "name": "ForEach Blob",
        "type": "ForEach",
        "dependsOn": [],
        "activities": [
            {
                "name": "Copy Data",
                "type": "Copy",
                "inputs": [
                    {
                        "referenceName": "Blob Input",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "Blob Output",
                        "type": "DatasetReference"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "path": "@item().name",
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "copyBehavior": "MergeFiles"
                    }
                }
            }
        ],
        "typeProperties": {
            "items": {
                "value": "@triggerBody().folderPath",
                "type": "Expression"
            },
            "batchCount": 1,
            "parallelism": 1
        }
    }
    

    Observa el uso de la expresión “@triggerBody().folderPath” para obtener la ruta de la carpeta del desencadenador, y el comportamiento de copia “MergeFiles” en el destino para combinar todos los archivos en un único archivo de salida.

    Espero que esto te ayude a resolver el problema y lograr el resultado esperado.

Comments are closed.