es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

NullpointException al leer un archivo con RowCsvInputFormat en Flink.

Soy principiante en Flink streaming.

Cuando leo un archivo con RowCsvInputFormat, el código que el serializador Kryo crea para Row no funciona correctamente.

El código es el siguiente.

val readLocalCsvFile = new RowCsvInputFormat(
  new Path("flink-test/000000_1"),
  Array(Types.STRING, Types.STRING, Types.STRING),
  "\n",
  ","
)
val read = env.readFile(
  readLocalCsvFile,
  "flink-test/000000_1",
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  1000000
)

read.print()
env.execute("test")

El contenido del archivo 000000_1 es el siguiente.

aa,bb,cc

aaa,bbb,ccc

Como resultado de la depuración, obtengo los valores divididos de aa, bb y cc correctamente. Pero cuando coloco esos valores en los campos de Row uno por uno, se produce una excepción de punto nulo porque los campos son nulos.

La siguiente imagen muestra que los campos de Row son nulos.

Imagen

El código que crea un Row cuando se ejecuta el código anterior es el siguiente. KryoSerializer genera la fila.

val kryo = new EmptyFlinkScalaKryoInstantiator().newKryo
val Row = kryo.newInstance(classOf[Row])

El error de salida es el siguiente.

java.lang.NullPointerException
at org.apache.flink.types.Row.setField(Row.java:140)
at org.apache.flink.api.java.io.RowCsvInputFormat.fillRecord(RowCsvInputFormat.java:162)
at org.apache.flink.api.java.io.RowCsvInputFormat.fillRecord(RowCsvInputFormat.java:33)
at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:113)
at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:551)
at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:80)
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:387)
at
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema radica en la forma en que se crea y se poblada el objeto Row. El serializador Kryo crea una instancia de Row, pero sus campos no se establecen correctamente al leer el archivo CSV.

    Una posible solución podría ser crear manualmente una instancia de Row y establecer sus campos con los valores del CSV, en lugar de confiar en el serializador Kryo para hacerlo. Aquí tienes un ejemplo de cómo se podría hacer:

    read.map(new MapFunction[String, Row]() {
      override def map(value: String): Row = {
        val split = value.split(",")
        val row = new Row(3)
        row.setField(0, split(0))
        row.setField(1, split(1))
        row.setField(2, split(2))
        row
      }
    }).print()
    

    Este código reemplaza la llamada a read.print() y mapea manualmente cada línea leída en un objeto Row con los campos adecuados establecidos. La llamada new Row(3) crea un nuevo objeto Row con 3 campos, y luego cada campo se establece con el valor correspondiente del CSV. Finalmente, se imprime la transmisión resultante de Rows.

    Este enfoque debería funcionar independientemente del tipo de formato de entrada de archivo utilizado, ya que crea y popula directamente el objeto Row en lugar de confiar en la implementación del formato.

Comments are closed.