nifi: el procesador “consumekafka” no preserva el orden – los registros obtenidos sin ningún orden.
Tengo una configuración simple para las pruebas, y durante las pruebas de rendimiento, me di cuenta de que NiFi lee los registros de Kafka de manera desordenada, no mantiene el orden. Tengo el procesador consumekafka_20 conectado a logmessage, como se muestra en la siguiente imagen:
Ambos procesadores están configurados para usar solo una tarea concurrente, pero el mensaje de registro muestra mensajes:
2021-12-30 17:10:44,612 INFO nifi-0 [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.LogMessage LogMessage[id=0c0931ad-017e-1000-ebfe-53193c54a8b8] caa6dcba-8d44-44be-b7c8-e9ea95481a1c - - se leyó un registro de kafka - 71b72dcb-8e31-488c-9fa4-bbeda6494014 - marca de tiempo de kafka: 1640884241030 - kafkadebug - desplazamiento: 10057, partición: 8
2021-12-30 17:10:47,132 INFO nifi-0 [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.LogMessage LogMessage[id=0c0931ad-017e-1000-ebfe-53193c54a8b8] 90c3cc6f-1142-4aa7-953e-8fa8810877c2 - - se leyó un registro de kafka - 71b72dcb-8e31-488c-9fa4-bbeda6494014 - marca de tiempo de kafka: 1640884239426 - kafkadebug - desplazamiento: 9985, partición: 8
Como se puede ver, el segundo desplazamiento (9985) está mucho antes del primero (10057). ¿Alguien sabe por qué ocurre esto? La configuración de ConsumeKafka_2.0
se ve de la siguiente manera:
Estuve experimentando con la configuración de Max poll records
, pero esto no soluciona el problema.
davy.ai
El problema de Nifi no preservar el orden de los registros mientras consume desde Kafka usando el procesador
ConsumeKafka_2.0
puede deberse a varios factores, como el número de particiones en el tema de Kafka, la configuración del procesadorConsumeKafka_2.0
y los recursos de hardware del sistema.Una posible razón para el consumo desordenado de registros es la paralelización del procesador, ya que Nifi puede consumir registros de diferentes particiones de Kafka en paralelo, lo que resulta en la pérdida del orden original. Sin embargo, como ambos procesadores en la configuración dada están configurados para usar solo una tarea concurrente, esto puede no ser el problema.
Otra razón podría ser la configuración del procesador
ConsumeKafka_2.0
. En la configuración dada, el procesador está configurado para usar la configuración predeterminada, como el uso de Kafka Auto Commit y el tamaño de lote predeterminado. Experimentar con cambiar la configuraciónMax Poll Records
puede no solucionar este problema, ya que limita la cantidad de registros que el procesador puede recuperar en un ciclo de consulta, pero no garantiza el orden de los registros recuperados.Para solucionar este problema, puedes intentar usar una implementación personalizada del consumidor de Kafka que use el método
poll()
para recuperar registros de Kafka en lugar del métodosubscribe()
, ya que el primero puede preservar el orden de los registros dentro de una partición. Además, puedes intentar configurar la propiedadenable.auto.commit
del procesador enfalse
y manejar los desplazamientos de confirmación manualmente, utilizando un procesadorControlRate
para controlar la velocidad de confirmación.En general, se recomienda realizar pruebas exhaustivas y ajustar la configuración del procesador
ConsumeKafka_2.0
y los recursos de hardware para garantizar la correcta preservación del orden de los registros durante el consumo desde Kafka.