Spark solicita solo una clasificación parcial para row_number ().over ventana particionada.
Versión: DBR 8.4 | Spark 3.1.2
Estoy intentando obtener las primeras 500 filas por partición, pero puedo ver en el plan de consulta que se está ordenando el conjunto de datos completo (50K filas por partición) antes de filtrar las filas que me interesan.
max_rank = 500
ranking_order = Window.partitionBy(['category', 'id'])
.orderBy(F.col('primary').desc(), F.col('secondary'))
df_ranked = (df
.withColumn('rank', F.row_number().over(ranking_order))
.where(F.col('rank') <= max_rank)
)
df_ranked.explain()
Leí en otro lugar que expresiones como df.orderBy(desc("value")).limit(n)
son optimizadas por el planificador de consultas para utilizar TakeOrderedAndProject
y evitar ordenar toda la tabla. ¿Existe un enfoque similar que pueda utilizar aquí para activar una optimización y evitar ordenar completamente todas las particiones?
Para contextualizar, actualmente mi consulta está tardando 3.5 horas en un clúster potente de 4 trabajadores x 40 núcleos y el tiempo de escritura de shuffle que rodea esta consulta (incluyendo algunas proyecciones no mencionadas arriba) parece ser el cuello de botella, por lo que estoy tratando de reducir la cantidad de datos lo antes posible.
davy.ai
Sí, puedes usar
take()
en lugar delimit()
para activar la optimización y evitar ordenar completamente todas las particiones. Aquí está el código actualizado:Este código agrupa los datos por
category
yid
, calcula las primerasmax_rank
filas para cada grupo utilizando la funcióntake()
, y luego selecciona las columnas relevantes de la estructura resultante. Esto debería activar la optimizaciónTakeOrderedAndProject
y reducir la cantidad de datos que necesita ser ordenada.Ten en cuenta que la función
take()
puede devolver menos demax_rank
filas por grupo si hay menos filas disponibles. Puedes manejar esto agregando una verificación de la longitud del array resultante y completando con nulos si es necesario.