es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo obtener todos los mensajes actuales presentes en un tema de Kafka a través de una aplicación de Spring Boot.

Requisito: Filtrar datos del tema de Kafka en función de un ID.
Necesito crear una API y, en función de la entrada proporcionada, debo filtrar los datos de los mensajes presentes en el tema de Kafka en ese momento.

Ejemplo: A continuación se muestran los registros presentes en el tema:

{"id": "1", "name": "ABC", "dept": "Computer"}
{"id": "1", "name": "FGH", "dept": "Mechanical"}

Por lo tanto, si accedo a una API “/getAllMessages”, debería obtener todos los registros presentes en el tema.

PD. – No estamos guardando los mensajes de Kafka en ninguna base de datos.

¿Alguien puede ayudarme a resolverlo, por favor?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para filtrar los datos de un tema de Kafka en base a un ID y crear una API, puedes utilizar la API de Consumidor de Kafka junto con la biblioteca Spring Kafka. Aquí están los pasos para lograr esto:

    1. Configurar el Consumidor de Kafka: Crea una instancia de Consumidor de Kafka con las propiedades de configuración adecuadas. Suscríbete al tema que contiene los datos que deseas filtrar.
    Properties props = new Properties();
    props.put("bootstrap.servers", "tu-broker-de-kafka");
    props.put("group.id", "tu-id-de-grupo");
    
    KafkaConsumer<string, string=""> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("nombre-del-tema"));
    
    1. Leer los datos del Consumidor: Lee los datos del consumidor en un bucle hasta que se consuman todos los mensajes.
    List<string> mensajesFiltrados = new ArrayList<>(); // para almacenar los mensajes filtrados
    
    while (true) {
        ConsumerRecords<string, string=""> registros = consumer.poll(Duration.ofMillis(100));
    
        for (ConsumerRecord<string, string=""> registro : registros) {
            // Verifica si el campo de ID del registro entrante coincide con el valor de ID proporcionado
            if (registro.value().contains("\"id\":\"tu-id\"")) {
                mensajesFiltrados.add(registro.value());
            }
        }
    }
    
    1. Crear el punto de acceso de la API: Crea un punto de acceso de API (utilizando el Framework Spring) que acepte un valor de ID como entrada y devuelva los mensajes filtrados.
    @RestController
    public class KafkaController {
    
        @Autowired
        private KafkaConsumer<string, string=""> consumidorKafka;
    
        @RequestMapping(value = "/obtenerTodosLosMensajes", method = RequestMethod.GET)
        public List<string> obtenerMensajes(@RequestParam("id") String id) {
            List<string> mensajesFiltrados = new ArrayList<>();
    
            while (true) {
                ConsumerRecords<string, string=""> registros = consumidorKafka.poll(Duration.ofMillis(100));
    
                for (ConsumerRecord<string, string=""> registro : registros) {
                    // Verifica si el campo de ID del registro entrante coincide con el valor de ID proporcionado
                    if (registro.value().contains("\"id\":\"" + id + "\"")) {
                        mensajesFiltrados.add(registro.value());
                    }
                }
            }
    
            return mensajesFiltrados;
        }
    }
    
    1. Ejecutar la aplicación: Ejecuta la aplicación y accede al punto de acceso de la API con el valor de ID deseado para obtener los mensajes filtrados del tema de Kafka.

    http://localhost:8080/obtenerTodosLosMensajes?id=tu-id</string,></string,></string,></string,></string,></string,>

Comments are closed.