es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Captura la hora de inicio de un flujo/pipeline en Nifi.

Recientemente hemos empezado a usar NiFi para la ingestión de datos. Todos los procesadores necesarios para crear el pipeline están agrupados bajo un mismo grupo de procesadores. Es un flujo simple que recoge datos de un prefijo en S3 y los coloca en otro S3. Los procesadores involucrados son, list-S3 -> Fetch-S3 -> Put-S3.

Lo que necesito es que cuando iniciemos el grupo de procesadores, necesito alguna forma de capturar la marca de tiempo de inicio del grupo de procesadores, la cual está disponible para nosotros en los procesadores. Traté de buscar dentro de la prueba de datos pero no encontré ningún atributo que me dé esta información.

Quiero que la marca de tiempo de inicio del pipeline esté disponible en el lenguaje de expresión de los procesadores que se usan.

Tags:  

Answer

  1. Avatar for davy.ai

    Hay algunas posibles soluciones que puede probar para capturar la marca de tiempo de inicio de un grupo de procesador en Apache NiFi:

    1. Puede utilizar el procesador UpdateAttribute para agregar un nuevo atributo al flujo de datos con la marca de tiempo actual cuando se inicia el pipeline. Simplemente agregue un nuevo procesador UpdateAttribute al inicio del pipeline y establezca el atributo “start_timestamp” en el tiempo actual utilizando el lenguaje de expresión ${now():format('yyyy-MM-dd HH:mm:ss')}. De esta manera, la marca de tiempo de inicio estará disponible en los archivos de flujo de datos mientras se mueven a lo largo del pipeline.

    2. Otra opción es utilizar la API REST de NiFi para iniciar de manera programática el grupo de procesadores y capturar la hora de inicio. Puede utilizar un script para enviar una solicitud POST HTTP a la API de NiFi con el punto final adecuado para iniciar el grupo de procesadores, y luego analizar la respuesta para extraer la hora de inicio. Luego puede pasar este valor a los procesadores utilizando atributos u otros medios.

    3. Si desea un enfoque más centralizado, puede crear un procesador personalizado que encapsule la funcionalidad de iniciar el grupo de procesadores y capturar la hora de inicio. Este procesador puede utilizarse en el pipeline en lugar del procesador estándar “Start Processor Group”. Puede utilizar la API de NiFi para iniciar el grupo de procesadores de manera programática, capturar la hora de inicio y pasarla como atributo a los procesadores de aguas abajo. De esta manera, puede tener una forma uniforme de capturar la marca de tiempo de inicio en diferentes pipelines.

Comments are closed.