es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Es posible evitar un segundo intercambio cuando Spark une dos conjuntos de datos utilizando joinWith?

Para el siguiente fragmento de código:

case class SomeRow(key: String, value: String)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val ds1 = Seq(SomeRow("A", "1")).toDS().repartition(col("key"))
val ds2 = Seq(SomeRow("A", "1"), SomeRow("B","2")).toDS().repartition(col("key"))

val dataSetJoined = ds1.joinWith(ds2, ds1("key")===ds2("key"), "left")
val dataFrameJoined = ds1.join(ds2, ds1("key")===ds2("key"), "left")
dataSetJoined.explain(true)
dataFrameJoined.explain(true)

Spark genera el siguiente plan para el dataSet:

== Physical Plan ==
SortMergeJoin [_1#132.key], [_2#133.key], LeftOuter
:- *(2) Sort [_1#132.key ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#132.key, 2)
:     +- *(1) Project [named_struct(key, key#122, value, value#123) AS _1#132]
:        +- Exchange hashpartitioning(key#122, 2)
:           +- LocalTableScan [key#122, value#123]
+- *(4) Sort [_2#133.key ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_2#133.key, 2)
      +- *(3) Project [named_struct(key, key#128, value, value#129) AS _2#133]
         +- Exchange hashpartitioning(key#128, 2)
            +- LocalTableScan [key#128, value#129]

y para el dataFrame:

== Physical Plan ==
SortMergeJoin [key#122], [key#128], LeftOuter
:- *(1) Sort [key#122 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(key#122, 2)
:     +- LocalTableScan [key#122, value#123]
+- *(2) Sort [key#128 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#128, 2)
      +- LocalTableScan [key#128, value#129]

¿Es posible evitar otro intercambio igual al unir dos dataSets usando joinWith?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    No, no es posible evitar otro intercambio igual al unir dos conjuntos de datos usando joinWith porque joinWith devuelve un conjunto de datos de tuplas, en lugar de un dataframe. La operación de intercambio es necesaria para asegurar que los datos estén correctamente particionados antes de realizar la operación de unión. Sin embargo, cabe mencionar que el optimizador Catalyst de Spark puede elegir optimizar el plan físico generado por la operación joinWith de manera diferente dependiendo de los detalles específicos de la consulta y los datos que se estén procesando.

Comments are closed.