es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

PySpark: Cómo llenar los valores NaN con un array en streaming?

Lo que estoy haciendo es llenar los valores nulos dentro de un dataframe con una matriz en un entorno de transmisión. Mi situación es la siguiente:
enter image description here

Necesito llenar el valor nulo con la matriz en la última fila, la que tiene el id = -1. Lo que he intentado hacer es la siguiente operación:

concat.withColumn('object_values2',
  F.when(concat['object_values'].isNull(), 
         array([lit(x) for x in concat.where(col("object_values").isNotNull()).rdd.collect()[0]['object_values']]))
)\
  .otherwise(concat['object_values'])).show()

¡Eso no funciona en la transmisión! Cuando uso este código en la transmisión, obtengo un mensaje de error “Las consultas con fuentes de transmisión deben ejecutarse con writeStream.start()”.

Mi pregunta es: ¿Cómo puedo evitar la operación “collect” en la transmisión para resolver mi problema?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para evitar la operación de collect en streaming, puedes utilizar una función de ventana para obtener la última fila con id = -1 y luego utilizar esa fila para completar los valores nulos en el dataframe. Aquí tienes un código de ejemplo:

    from pyspark.sql.functions import last, when, array, lit
    
    # Define la especificación de la ventana
    from pyspark.sql.window import Window
    windowSpec = Window.orderBy('timestamp').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    # Obtiene la última fila con id=-1
    last_row = concat.filter(concat.id == -1).select(last(concat.object_values).over(windowSpec).alias('last_object')).limit(1).collect()
    
    # Utiliza la última fila para completar los valores nulos en object_values
    concat = concat.withColumn('object_values2',
                  when(concat.object_values.isNull(),
                       array([lit(x) for x in last_row[0]['last_object']]))
                  .otherwise(concat.object_values))
    
    concat.show()
    

    En este código, definimos una especificación de ventana que ordena las filas por timestamp e incluye todas las filas hasta la fila actual. Luego utilizamos la función last y la aplicamos sobre la ventana para obtener la última fila con id=-1. Limitamos el resultado a solo una fila, ya que en un entorno de streaming debería haber solo una fila con id=-1.

    Luego utilizamos una instrucción when para completar los valores nulos en object_values con el array de la última fila, y utilizamos otherwise para mantener los valores originales en object_values.

    Ten en cuenta que para que este código funcione en un entorno de streaming, debes usarlo dentro de una consulta writeStream en lugar de llamar directamente a show().

Comments are closed.