No se pueden obtener registros de consumidores de Kafka con AUTO_OFFSET_RESET_CONFIG = “latest”.
Tengo un proyecto de automatización de pruebas. Estoy tratando de obtener los registros del consumidor de Kafka a partir del último registro con la configuración ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "latest"
. Pero no funciona. Aquí hay un código en el que intento recuperar datos:
for (int i = 0; i < 20; i++) {
ConsumerRecords<string, string=""> consumerRecords = consumer.poll(Duration.ofMillis(500L));
value = findValue(key, consumerRecords);
if (value != null) {
break;
}
}
En este código, la variable consumerRecords
tiene un tamaño de 0 en cada iteración.
Si cambio ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
a "earliest"
, entonces consumer.poll()
funciona y la variable consumerRecords
no tiene un tamaño de 0, pero los elementos en la colección comienzan desde el offset más antiguo, mientras que necesito los elementos que comienzan desde el último offset.
¿Cómo puedo lograr que consumerRecords
tenga elementos en orden descendente por offset?
Intenté aumentar el tiempo de espera para la recuperación de datos a 10 segundos, pero no ayudó.
kafka-clients: 2.7.0</string,>
davy.ai
Parece que estás intentando consumir registros de Kafka a partir del último offset, pero el método
consumer.poll()
está devolviendo registros vacíos. Esto podría ser debido a varias razones, como que el grupo de consumidores no esté activo o que no haya nuevos registros disponibles en el tema.Una posible forma de consumir registros en orden decreciente por offset es gestionar manualmente el offset con el método
assign()
de la clase KafkaConsumer.Aquí tienes un ejemplo de código:
En este código, primero determinamos el último offset de la partición especificada llamando al método
endOffsets()
. Luego, asignamos manualmente la partición al consumidor y nos desplazamos al último offset menos uno.A continuación, continuamente solicitamos registros hasta recibir un conjunto de registros no vacío. Luego, ordenamos los registros por offset decreciente y procesamos únicamente el último registro.
Ten en cuenta que este código asume que la partición tiene al menos un registro. Es posible que necesites agregar manejo adicional de errores y lógica de reintentos en caso de errores o particiones vacías.</string,></topicpartition,>