es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Por qué no veo tareas más pequeñas para mi solicitud de reparticionamiento?

Tengo un conjunto de datos que quiero repartir de manera equitativa en 10 cubetas por valor único de una columna, y quiero ajustar el tamaño de este resultado en un gran número de particiones para que cada una sea pequeña.

Se garantiza que col_1 será uno de los valores en ["CREATE", "UPDATE", "DELETE"]

Mi código se ve así:

df.show()
"""
+------+-----+-----+
| col<em>1|col</em>2|index|
+------+-----+-----+
|CREATE|    0|    0|
|CREATE|    0|    1|
|UPDATE|    0|    2|
|UPDATE|    0|    3|
|DELETE|    0|    4|
|DELETE|    0|    5|
|CREATE|    0|    6|
|CREATE|    0|    7|
|CREATE|    0|    8|
+------+-----+-----+
"""
df = df.withColumn(
  "columna<em>partición",
  F.concat(
    F.col("col</em>1"),
    F.round( # Elegir un número aleatorio entre 0 y 9
      F.random() * F.lit(10),
      0
    )
  )
)

df = df.repartition(1000, F.col("columna_partición"))

Veo que la mayoría de mis tareas se ejecutan y terminan sin ninguna fila de datos, ¿esperaría que los datos estén distribuidos de manera equitativa en mi columna_partición en 1000 particiones?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Aunque tu enfoque parece correcto a primera vista, es posible que tu columna de partición no esté distribuida de manera uniforme, lo que provoca que la mayoría de tus tareas finalicen sin filas de datos.

    Para asegurarnos de que cada valor único de col_1 se particione de manera uniforme en 10 fragmentos, puedes utilizar la función pyspark.sql.functions.spark_partition_id() para calcular el ID de partición en función de tu columna de partición. Aquí tienes una versión modificada de tu código que utiliza este enfoque:

    from pyspark.sql.functions import concat, round, rand, spark_partition_id
    
    df = df.withColumn(
      "columna_de_particion",
      concat(
        df.col_1,
        round(rand() * 10)
      )
    )
    
    df = df.repartition(spark_partition_id(), "columna_de_particion")
    

    En este código, spark_partition_id() se utiliza como el primer argumento del método repartition() para distribuir los datos de manera uniforme en todas las particiones, y luego especificamos columna_de_particion como segundo argumento para asegurarnos de que los datos se particionen adicionalmente según nuestra lógica deseada.

    Al utilizar este enfoque, podemos garantizar que los datos se distribuyan de manera uniforme en todas las particiones y que cada valor único de col_1 se particione en 10 fragmentos.

    Esto debería reducir la cantidad de tareas que finalizan sin filas de datos y acelerar el procesamiento.

Comments are closed.