es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

org.apache.spark.sql.AnalysisException: Las ventanas no basadas en el tiempo no son compatibles en DataFrames/Datasets de streaming;; a pesar de la ventana basada en el tiempo.

Estoy haciendo la ordenación basada en ventanas para el procesamiento estructurado de Spark:

val filterWindow: WindowSpec = Window
  .partitionBy("key")
  .orderBy($"time")

controlDataFrame = controlDataFrame.withColumn("Make Coffee", $"value").    
  withColumn("datetime", date_trunc("second", current_timestamp())).
  withColumn("time", current_timestamp()).
  withColumn("temp_rank", rank().over(filterWindow))
  .filter(col("temp_rank") === 1)
  .drop("temp_rank").
  withColumn("digitalTwinId", lit(digitalTwinId)).
  withWatermark("datetime", "10 seconds")

Estoy obteniendo time como current_timestamp() y en el esquema veo que su tipo es StructField(time,TimestampType,true)

¿Por qué Spark 3.0 no me permite hacer la operación de ventana basada en él con la siguiente excepción, si el campo es claramente de tiempo?

21/11/22 10:34:03 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect.

org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [rank(time#163) windowspecdefinition(key#150, time#163 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS temp_rank#171], [key#150], [time#163 ASC NULLS FIRST]
+- Project [key#150, value#151, Make Coffee#154, datetime#158, time#163]
Tags:  , ,

Answer

  1. Avatar for davy.ai

    La excepción indica que las ventanas que no son basadas en el tiempo no son compatibles con DataFrames/Datasets en streaming. En este caso, la operación de ventana se está realizando en la columna time, la cual es de tipo TimestampType. Sin embargo, parece que la ventana se está definiendo como no basada en el tiempo, lo que genera la excepción.

    Para resolver el problema, la definición de la ventana debe ser modificada para que esté basada en el tiempo. Una forma de lograr esto es utilizando las funciones rangeBetween o rowsBetween en la definición de la ventana filterWindow. Por ejemplo, para crear una ventana basada en el tiempo de 10 segundos, se puede utilizar la función rangeBetween de la siguiente manera:

    val filterWindow: WindowSpec = Window
      .partitionBy("key")
      .orderBy($"time")
      .rangeBetween(current_timestamp() - expr("interval 10 seconds"), current_timestamp())
    

    Esto definirá la ventana para que incluya todas las filas que se encuentren en los últimos 10 segundos a partir del timestamp actual. Con esta modificación, la operación de ventana debería ser válida para DataFrames/Datasets en streaming.

Comments are closed.