es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Enviar fila de Flink a Kafka

Tengo un objeto Row de Flink y nombres de columnas. Este objeto Row puede ser accedido mediante nombres de campo o índice. Quiero enviarlo a Kafka en formato JSON utilizando el productor de Kafka vanilla de Flink. ¿Cómo puedo hacerlo? ¿Se requiere un esquema JSON de destino para enviarlo a Kafka?

Tags:  ,

Answer

  1. Avatar for davy.ai

    Para hundir un Row de Flink en formato JSON en Kafka utilizando el productor de Kafka vanilla de Flink, puedes seguir estos pasos:

    1. En primer lugar, agrega el conector de Kafka de Flink a las dependencias de tu proyecto. Puedes hacer esto agregando la siguiente dependencia a tu archivo pom.xml:
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-connector-kafka_2.11</artifactid>
        <version>${flink.version}</version>
    </dependency>
    
    1. Crea un objeto Properties para configurar el productor de Kafka. Por ejemplo:
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("acks", "all");
    
    1. Crea un objeto FlinkKafkaProducer para escribir los datos en el tema de Kafka. Especifica el nombre del tema, un SerializationSchema, y el objeto Properties. Por ejemplo:
    FlinkKafkaProducer<row> kafkaProducer = new FlinkKafkaProducer<>(
        "my-topic", 
        new JsonRowSerializationSchema(fields), 
        kafkaProps, 
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    );
    

    Aquí, fields es un arreglo de String que contiene los nombres de las columnas en el Row.

    1. Define un SerializationSchema para convertir el Row en una cadena JSON. Por ejemplo:
    public class JsonRowSerializationSchema implements SerializationSchema<row> {
    
        private final String[] fieldNames;
        private final ObjectMapper mapper = new ObjectMapper();
    
        public JsonRowSerializationSchema(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }
    
        @Override
        public byte[] serialize(Row row) {
            try {
                ObjectNode objectNode = mapper.createObjectNode();
                for (int i = 0; i < row.getArity(); i++) {
                    objectNode.put(fieldNames[i], row.getField(i).toString());
                }
                return mapper.writeValueAsBytes(objectNode);
            } catch (Exception e) {
                throw new RuntimeException("Error serializing row", e);
            }
        }
    }
    
    1. Por último, agrega una SinkFunction para escribir los datos en el productor de Kafka. Por ejemplo:
    dataStream.addSink(kafkaProducer);
    

    En cuanto al esquema JSON de destino, depende de los requisitos de los consumidores de Kafka que leerán los datos. Si los consumidores esperan un esquema específico, deberás ajustar tu lógica de serialización en consecuencia. Si no es así, puedes utilizar un esquema genérico que incluya todos los campos en el Row.

Comments are closed.