es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

No se pueden obtener registros de consumidores de Kafka con AUTO_OFFSET_RESET_CONFIG = “latest”.

Tengo un proyecto de automatización de pruebas. Estoy tratando de obtener los registros del consumidor de Kafka a partir del último registro con la configuración ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "latest". Pero no funciona. Aquí hay un código en el que intento recuperar datos:

for (int i = 0; i < 20; i++) {
    ConsumerRecords<string, string=""> consumerRecords = consumer.poll(Duration.ofMillis(500L));
    value = findValue(key, consumerRecords);
    if (value != null) {
        break;
    }
}

En este código, la variable consumerRecords tiene un tamaño de 0 en cada iteración.

Si cambio ConsumerConfig.AUTO_OFFSET_RESET_CONFIG a "earliest", entonces consumer.poll() funciona y la variable consumerRecords no tiene un tamaño de 0, pero los elementos en la colección comienzan desde el offset más antiguo, mientras que necesito los elementos que comienzan desde el último offset.

¿Cómo puedo lograr que consumerRecords tenga elementos en orden descendente por offset?

Intenté aumentar el tiempo de espera para la recuperación de datos a 10 segundos, pero no ayudó.

kafka-clients: 2.7.0</string,>

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Parece que estás intentando consumir registros de Kafka a partir del último offset, pero el método consumer.poll() está devolviendo registros vacíos. Esto podría ser debido a varias razones, como que el grupo de consumidores no esté activo o que no haya nuevos registros disponibles en el tema.

    Una posible forma de consumir registros en orden decreciente por offset es gestionar manualmente el offset con el método assign() de la clase KafkaConsumer.

    Aquí tienes un ejemplo de código:

    TopicPartition partition = new TopicPartition(topic, partitionNumber);
    Map<topicpartition, long=""> offset = new HashMap<>();
    offset.put(partition, consumer.endOffsets(Collections.singleton(partition)).get(partition) - 1);
    
    consumer.assign(Collections.singleton(partition));
    consumer.seek(partition, offset.get(partition));
    
    while (true) {
        ConsumerRecords<string, string=""> records = consumer.poll(Duration.ofMillis(500L));
        if (!records.isEmpty()) {
            records.records(partition).stream()
                .sorted((o1, o2) -> Long.compare(o2.offset(), o1.offset())) // ordenar por offset decreciente
                .forEach(record -> {
                    // procesar el registro
                });
            break; // solo consumir el último registro
        }
    }
    

    En este código, primero determinamos el último offset de la partición especificada llamando al método endOffsets(). Luego, asignamos manualmente la partición al consumidor y nos desplazamos al último offset menos uno.

    A continuación, continuamente solicitamos registros hasta recibir un conjunto de registros no vacío. Luego, ordenamos los registros por offset decreciente y procesamos únicamente el último registro.

    Ten en cuenta que este código asume que la partición tiene al menos un registro. Es posible que necesites agregar manejo adicional de errores y lógica de reintentos en caso de errores o particiones vacías.</string,></topicpartition,>

Comments are closed.