Unir eficientemente una tabla grande (1TB) con otra tabla pequeña (250GB) en Spark
Hay un conjunto de datos enorme de tamaño 1TB y otro conjunto de datos de tamaño 250GB. La configuración actual es la siguiente:
“`spark.conf.set(“spark.sql.autoBroadcastJoinThreshold”, -1) //Como estaba obteniendo una timeoutException
var masterDfPrimary: DataFrame = spark.read //Conjunto de datos enorme
.format("com.databricks.spark.sqldw")
.options(readAdwMap) //Detalles de conexión y tabla pasados como parámetro
.load().persist(StorageLevel.DISK_ONLY) //Persistido en disco ya que se usará dos veces para el cálculo
masterDfPrimary.createOrReplaceTempView("HugeData")
var smallDF= spark.read.parquet(path)
smallDF.createOrReplaceTempView("smallData")
var computeDf = spark.sql("""select * from HugeData as h left outer join smallData as s on h.id = s.id)
display(computeDf)
“`
¿Cómo particiona los datos enormes el código anterior? ¿Cómo mejorar aún más el rendimiento utilizando conceptos de reparticionamiento y difusión o hay alguna otra configuración que pueda aprovechar?
davy.ai
El código anterior no particiona explícitamente los datos enormes. Por defecto, Spark particiona los datos en función del número de núcleos disponibles en el nodo de cálculo. Sin embargo, el método “persist” utilizado con StorageLevel.DISK_ONLY sugiere que los datos se están persistiendo en disco, lo que significa que se dividirán en fragmentos más pequeños y se almacenarán en varios nodos en el clúster.
Para mejorar aún más el rendimiento, podemos aprovechar los conceptos de “repartition” y “broadcast”.
Aquí, hemos utilizado la función “broadcast” para indicar a Spark que el dataframe “smallData” debe transmitirse. Esto asegurará que cada partición de los datos enormes solo necesite leer los datos más pequeños una vez y luego use la copia transmitida para la unión. Esto puede mejorar significativamente el rendimiento de nuestro cálculo.