es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Spark solicita solo una clasificación parcial para row_number ().over ventana particionada.

Versión: DBR 8.4 | Spark 3.1.2

Estoy intentando obtener las primeras 500 filas por partición, pero puedo ver en el plan de consulta que se está ordenando el conjunto de datos completo (50K filas por partición) antes de filtrar las filas que me interesan.

  max_rank = 500
  ranking_order = Window.partitionBy(['category', 'id'])
    .orderBy(F.col('primary').desc(), F.col('secondary'))
  df_ranked = (df
    .withColumn('rank', F.row_number().over(ranking_order))
    .where(F.col('rank') <= max_rank)
  )
  df_ranked.explain()

Leí en otro lugar que expresiones como df.orderBy(desc("value")).limit(n) son optimizadas por el planificador de consultas para utilizar TakeOrderedAndProject y evitar ordenar toda la tabla. ¿Existe un enfoque similar que pueda utilizar aquí para activar una optimización y evitar ordenar completamente todas las particiones?

Para contextualizar, actualmente mi consulta está tardando 3.5 horas en un clúster potente de 4 trabajadores x 40 núcleos y el tiempo de escritura de shuffle que rodea esta consulta (incluyendo algunas proyecciones no mencionadas arriba) parece ser el cuello de botella, por lo que estoy tratando de reducir la cantidad de datos lo antes posible.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Sí, puedes usar take() en lugar de limit() para activar la optimización y evitar ordenar completamente todas las particiones. Aquí está el código actualizado:

      max_rank = 500
      ranking_order = Window.partitionBy(['category', 'id'])
        .orderBy(F.col('primary').desc(), F.col('secondary'))
    
      df_ranked = (df
        .withColumn('rank', F.row_number().over(ranking_order))
        .groupBy(['category', 'id'])
        .agg(F.expr('take({}, struct(rank, primary, secondary, other_cols)) as top_rows'.format(max_rank)))
        .select([F.col(col).alias(col.split('.')[-1]) for col in df_ranked.schema.fieldNames()])
      )
      df_ranked.explain()
    

    Este código agrupa los datos por category y id, calcula las primeras max_rank filas para cada grupo utilizando la función take(), y luego selecciona las columnas relevantes de la estructura resultante. Esto debería activar la optimización TakeOrderedAndProject y reducir la cantidad de datos que necesita ser ordenada.

    Ten en cuenta que la función take() puede devolver menos de max_rank filas por grupo si hay menos filas disponibles. Puedes manejar esto agregando una verificación de la longitud del array resultante y completando con nulos si es necesario.

Comments are closed.