es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Apache Nifi: Enriquecimiento de datos de fila a columnar

Estoy utilizando Apache Nifi como mi procesador ETL. Pero después de buscar en tantos artículos, me di cuenta de que no hay un procesador JOIN disponible de forma predeterminada para tablas SQL (o flowfile).

Tengo los datos normalizados en el siguiente formato. Pero quiero volverlos desnormalizados.

student_id name
1 Jake
2 Sean
student_id subject marks
1 SUB01 30
1 SUB02 40
1 SUB03 50
2 SUB01 60
2 SUB02 70
2 SUB03 80

La salida final esperada es la siguiente:

student_id name sub01 sub02 sub03 total
1 Jake 30 40 50 120
2 Sean 60 70 80 210

Esto ha sido bastante difícil de hacer con el servicio de búsqueda ya que tengo que unir con 2 claves, lo cual no es posible.

¿Alguien ha resuelto esto?

PD: Las tablas son transaccionales, los datos siguen aumentando cada minuto.

Tags:  

Answer

  1. Avatar for davy.ai

    Dado que no hay un procesador JOIN listo para usar en Apache Nifi para tablas SQL o archivos de flujo, puedes abordar esto utilizando una combinación de procesadores:

    1. Utiliza un procesador QueryDatabaseTable para obtener los datos normalizados de la tabla “student”.
    2. Utiliza otro procesador QueryDatabaseTable para obtener los datos de las notas de la tabla “subject”.
    3. A continuación, utiliza un procesador MergeContent para combinar las dos corrientes de datos, utilizando “student_id” como estrategia de fusión.
    4. Después de eso, puedes utilizar un procesador RouteOnAttribute para dirigir los datos según “student_id”.
    5. Utiliza un procesador UpdateAttribute para establecer los nombres de columna como “sub01”, “sub02”, “sub03”, etc. según sus respectivos nombres de asignatura.
    6. A continuación, utiliza un procesador EvaluateJsonPath para extraer las respectivas columnas de la carga útil JSON y crear un nuevo atributo que incluya la columna “total” mediante los cálculos requeridos, es decir, “sub01 + sub02 + sub03”.
    7. Por último, utiliza un procesador PutDatabaseRecord para escribir la salida final en la base de datos de destino.

    Sin embargo, ten en cuenta que dado que las tablas son transaccionales, los datos seguirán aumentando con el tiempo. Por tanto, es esencial gestionar este proceso con técnicas adecuadas de almacenamiento en caché y amortiguación para evitar problemas de rendimiento.

Comments are closed.