es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Unir eficientemente una tabla grande (1TB) con otra tabla pequeña (250GB) en Spark

Hay un conjunto de datos enorme de tamaño 1TB y otro conjunto de datos de tamaño 250GB. La configuración actual es la siguiente:

“`spark.conf.set(“spark.sql.autoBroadcastJoinThreshold”, -1) //Como estaba obteniendo una timeoutException

var masterDfPrimary: DataFrame = spark.read //Conjunto de datos enorme
.format("com.databricks.spark.sqldw")
.options(readAdwMap) //Detalles de conexión y tabla pasados como parámetro
.load().persist(StorageLevel.DISK_ONLY) //Persistido en disco ya que se usará dos veces para el cálculo

masterDfPrimary.createOrReplaceTempView("HugeData")

var smallDF= spark.read.parquet(path)
smallDF.createOrReplaceTempView("smallData")

var computeDf = spark.sql("""select * from HugeData as h left outer join smallData as s on h.id = s.id)

display(computeDf)
“`

¿Cómo particiona los datos enormes el código anterior? ¿Cómo mejorar aún más el rendimiento utilizando conceptos de reparticionamiento y difusión o hay alguna otra configuración que pueda aprovechar?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El código anterior no particiona explícitamente los datos enormes. Por defecto, Spark particiona los datos en función del número de núcleos disponibles en el nodo de cálculo. Sin embargo, el método “persist” utilizado con StorageLevel.DISK_ONLY sugiere que los datos se están persistiendo en disco, lo que significa que se dividirán en fragmentos más pequeños y se almacenarán en varios nodos en el clúster.

    Para mejorar aún más el rendimiento, podemos aprovechar los conceptos de “repartition” y “broadcast”.

    1. Repartition: Podemos particionar explícitamente los datos enormes en particiones más pequeñas utilizando el método “repartition”. Esto nos permite controlar el número de particiones, lo que a su vez puede mejorar la paralelización de nuestro cálculo. Por ejemplo, podemos particionar los datos en 1000 particiones de la siguiente manera:
    var masterDfPrimary: DataFrame = spark.read // Conjunto de datos enorme
          .format("com.databricks.spark.sqldw")
          .options(readAdwMap)
          .load()
          .repartition(1000) // Repartición en 1000 particiones
          .persist(StorageLevel.DISK_ONLY)
    
    masterDfPrimary.createOrReplaceTempView("HugeData")
    
    1. Broadcast: También podemos aprovechar el concepto de “broadcast” para optimizar la unión entre los conjuntos de datos enormes y pequeños. Por defecto, Spark utiliza un valor umbral para decidir cuándo realizar la transmisión en el conjunto de datos más pequeño. Sin embargo, en este caso, ya hemos desactivado el umbral de transmisión automática utilizando “spark.sql.autoBroadcastJoinThreshold = -1”. Por lo tanto, podemos transmitir explícitamente el dataframe “smallData” de la siguiente manera:
    import org.apache.spark.sql.functions.broadcast
    
    var computeDf = spark.sql("""
                   SELECT * 
                   FROM HugeData as h 
                   LEFT OUTER JOIN (
                       SELECT /*+ BROADCAST(smallData) */ *
                       FROM smallData
                   ) as s on h.id = s.id
                   """)
    
    display(computeDf)
    

    Aquí, hemos utilizado la función “broadcast” para indicar a Spark que el dataframe “smallData” debe transmitirse. Esto asegurará que cada partición de los datos enormes solo necesite leer los datos más pequeños una vez y luego use la copia transmitida para la unión. Esto puede mejorar significativamente el rendimiento de nuestro cálculo.

Comments are closed.