es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

nifi: el procesador “consumekafka” no preserva el orden – los registros obtenidos sin ningún orden.

Tengo una configuración simple para las pruebas, y durante las pruebas de rendimiento, me di cuenta de que NiFi lee los registros de Kafka de manera desordenada, no mantiene el orden. Tengo el procesador consumekafka_20 conectado a logmessage, como se muestra en la siguiente imagen:

enter image description here

Ambos procesadores están configurados para usar solo una tarea concurrente, pero el mensaje de registro muestra mensajes:

2021-12-30 17:10:44,612 INFO nifi-0 [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.LogMessage LogMessage[id=0c0931ad-017e-1000-ebfe-53193c54a8b8] caa6dcba-8d44-44be-b7c8-e9ea95481a1c - - se leyó un registro de kafka - 71b72dcb-8e31-488c-9fa4-bbeda6494014 - marca de tiempo de kafka: 1640884241030 - kafkadebug - desplazamiento: 10057, partición: 8
2021-12-30 17:10:47,132 INFO nifi-0 [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.LogMessage LogMessage[id=0c0931ad-017e-1000-ebfe-53193c54a8b8] 90c3cc6f-1142-4aa7-953e-8fa8810877c2 - - se leyó un registro de kafka - 71b72dcb-8e31-488c-9fa4-bbeda6494014 - marca de tiempo de kafka: 1640884239426 - kafkadebug - desplazamiento: 9985, partición: 8

Como se puede ver, el segundo desplazamiento (9985) está mucho antes del primero (10057). ¿Alguien sabe por qué ocurre esto? La configuración de ConsumeKafka_2.0 se ve de la siguiente manera:

enter image description here

enter image description here

Estuve experimentando con la configuración de Max poll records, pero esto no soluciona el problema.

Tags:  ,

Answer

  1. Avatar for davy.ai

    El problema de Nifi no preservar el orden de los registros mientras consume desde Kafka usando el procesador ConsumeKafka_2.0 puede deberse a varios factores, como el número de particiones en el tema de Kafka, la configuración del procesador ConsumeKafka_2.0 y los recursos de hardware del sistema.

    Una posible razón para el consumo desordenado de registros es la paralelización del procesador, ya que Nifi puede consumir registros de diferentes particiones de Kafka en paralelo, lo que resulta en la pérdida del orden original. Sin embargo, como ambos procesadores en la configuración dada están configurados para usar solo una tarea concurrente, esto puede no ser el problema.

    Otra razón podría ser la configuración del procesador ConsumeKafka_2.0. En la configuración dada, el procesador está configurado para usar la configuración predeterminada, como el uso de Kafka Auto Commit y el tamaño de lote predeterminado. Experimentar con cambiar la configuración Max Poll Records puede no solucionar este problema, ya que limita la cantidad de registros que el procesador puede recuperar en un ciclo de consulta, pero no garantiza el orden de los registros recuperados.

    Para solucionar este problema, puedes intentar usar una implementación personalizada del consumidor de Kafka que use el método poll() para recuperar registros de Kafka en lugar del método subscribe(), ya que el primero puede preservar el orden de los registros dentro de una partición. Además, puedes intentar configurar la propiedad enable.auto.commit del procesador en false y manejar los desplazamientos de confirmación manualmente, utilizando un procesador ControlRate para controlar la velocidad de confirmación.

    En general, se recomienda realizar pruebas exhaustivas y ajustar la configuración del procesador ConsumeKafka_2.0 y los recursos de hardware para garantizar la correcta preservación del orden de los registros durante el consumo desde Kafka.

Comments are closed.