es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Analizando mensajes Avro en Flink, generando una excepción de puntero nulo si el campo es nullable en el esquema de Avro.

Necesito analizar los mensajes de Kafka de Confluent almacenados en Avro. Pero al aplicar el filtro me da una excepción de puntero nulo, sin filtro pude escribir de vuelta en Kafka, pero al aplicar el filtro me da una excepción de puntero nulo.

 public static void main(String[] args) throws Exception {

    Properties config = new Properties();
    config.setProperty("bootstrap.servers", "localhost:9092");
    config.setProperty("group.id","topic");
    String schemaRegistryUrl  = "http://localhost:8091";
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<topic1> streamIn = env
            .addSource(
                    new FlinkKafkaConsumer<>(
                            "topic1",
                            ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
                            config
                    ).setStartFromEarliest());

    //Pregunta aquí: quiero devolver solo las filas que necesiten calificar con lo siguiente, ya que me está dando una excepción de puntero nulo

    DataStreamSink fltrtsrm_so=streamIn.filter((new FilterFunction<topic1>() {
        public boolean filter(topic1 user) throws Exception {
            return user.get("col3").toString().equals("New");
        }
    })).print();        
    //Además, avísame si hay alguna mejor manera de hacerlo, ya que para mí esto es solo el comienzo...)
}

Aquí está el esquema:
json
{
"namespace": "testclass",
"type": "record",
"name": "topic1",
"fields": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "string"
},
{
"default": null,
"name": "col3",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "col4",
"type": [
"null",
"string"
]
}
]
}

Tags:  , ,

Answer

  1. Avatar for davy.ai

    La excepción de puntero nulo en el código proporcionado puede ocurrir porque algunos de los objetos topic1 recuperados del flujo de Kafka pueden tener valores nulos para el campo “col3”. Por lo tanto, llamar al método toString() en un objeto nulo provocaría una excepción de puntero nulo.

    Para evitar esta excepción de puntero nulo, podemos modificar el cierre de filtro para comprobar si el campo “col3” no es nulo antes de llamar al método toString(). A continuación, se proporciona un fragmento de código mejorado para filtrar el flujo de Kafka con valores no nulos para el campo “col3”:

    DataStream<topic1> filteredStream = streamIn.filter(new FilterFunction<topic1>() {
                public boolean filter(topic1 record) throws Exception {
                    return (record.getCol3() != null) && (record.getCol3().equals("New"));
                }
            });
    
    DataStreamSink fltrtsrm_so = filteredStream.print();
    

    El cierre anterior solo devolvería registros que tengan valores no nulos para el campo “col3” y que tengan “New” como su valor. Tenga en cuenta que reemplazamos record.get("col3").toString().equals("New") con (record.getCol3() != null) && (record.getCol3().equals("New")), que es una forma más segura de comprobar valores no nulos y evitar excepciones de puntero nulo.

    Puede haber otras formas alternativas de lograr el mismo resultado, pero la solución anterior debería funcionar dada la estructura y el código proporcionados.

Comments are closed.