PySpark: Cómo llenar los valores NaN con un array en streaming?
Lo que estoy haciendo es llenar los valores nulos dentro de un dataframe con una matriz en un entorno de transmisión. Mi situación es la siguiente:
Necesito llenar el valor nulo con la matriz en la última fila, la que tiene el id = -1. Lo que he intentado hacer es la siguiente operación:
concat.withColumn('object_values2',
F.when(concat['object_values'].isNull(),
array([lit(x) for x in concat.where(col("object_values").isNotNull()).rdd.collect()[0]['object_values']]))
)\
.otherwise(concat['object_values'])).show()
¡Eso no funciona en la transmisión! Cuando uso este código en la transmisión, obtengo un mensaje de error “Las consultas con fuentes de transmisión deben ejecutarse con writeStream.start()”.
Mi pregunta es: ¿Cómo puedo evitar la operación “collect” en la transmisión para resolver mi problema?
davy.ai
Para evitar la operación de collect en streaming, puedes utilizar una función de ventana para obtener la última fila con id = -1 y luego utilizar esa fila para completar los valores nulos en el dataframe. Aquí tienes un código de ejemplo:
En este código, definimos una especificación de ventana que ordena las filas por timestamp e incluye todas las filas hasta la fila actual. Luego utilizamos la función
last
y la aplicamos sobre la ventana para obtener la última fila con id=-1. Limitamos el resultado a solo una fila, ya que en un entorno de streaming debería haber solo una fila con id=-1.Luego utilizamos una instrucción
when
para completar los valores nulos enobject_values
con el array de la última fila, y utilizamosotherwise
para mantener los valores originales enobject_values
.Ten en cuenta que para que este código funcione en un entorno de streaming, debes usarlo dentro de una consulta
writeStream
en lugar de llamar directamente ashow()
.