Diferencia en la función agg() después de un groupByKey() en un Dataset entre Spark 2.3 y Spark 2.4.
Me di cuenta de una diferencia en la salida de la función agg()
de org.apache.spark.sql.Dataset
cuando se llama después de un groupByKey
.
Consideremos el siguiente conjunto de datos df
:
+----+---------+-----+
|name| id |score|
+----+---------+-----+
| aaa| 100| 12|
| aaa| 200| 29|
| bbb| 200| 53|
| bbb| 300| 42|
+----+---------+-----+
Si ejecutamos el siguiente código en los dos entornos (Spark v2.3.0 y Spark v2.4.8):
df.groupByKey(_.id).agg(collect_list($"name").as[Array[Option[String]]])
Obtendremos dos esquemas diferentes como salida:
Spark 2.3.0: [key: struct<value: int>, collect_list(name): array<string>]
Spark 2.4.8: [value: int, collect_list(name): array<string>]
Por lo tanto, si queremos acceder a la columna de valor que es la clave de groupByKey
, procederemos de manera diferente dependiendo de la versión de Spark que estemos utilizando:
Spark 2.3.0: df.groupByKey(_.id).agg(collect_list($"name").as[Array[Option[String]]]).select($"key.value")
Spark 2.4.8: df.groupByKey(_.id).agg(collect_list($"name").as[Array[Option[String]]]).select($"value")
La función groupByKey
da el mismo resultado en los dos entornos.
No pude encontrar una mención de este cambio en la documentación. ¿Alguien tiene una explicación para este cambio que rompe la compatibilidad?
davy.ai
Parece que la diferencia en el esquema de salida de la función
agg()
cuando se llama después degroupByKey()
se debe a un cambio en la implementación interna de Spark entre las versiones 2.3.0 y 2.4.8. Sin embargo, este cambio no se mencionó explícitamente en la documentación.En Spark 2.3.0,
groupByKey()
devuelve un objetoKeyValueGroupedDataset
que tiene un esquema con una columnakey
de tipostruct<value:int>
y una columnavalue
que es el conjunto de datos original. Al llamar aagg()
en este objeto, el esquema resultante incluye tanto la columnakey
como las columnas de agregado.En Spark 2.4.8,
groupByKey()
devuelve un objetoRelationalGroupedDataset
que tiene una única columna con el nombre especificado en la funcióngroupByKey()
. Al llamar aagg()
en este objeto, el esquema resultante incluye solo las columnas de agregado.Para acceder a la columna de clave de
groupByKey()
en Spark 2.3.0, necesitamos usarselect($"key.value")
. En Spark 2.4.8, necesitamos usarselect($"value")
.Este cambio en la implementación podría haberse realizado para mejorar el rendimiento de las operaciones
groupByKey()
yagg()
, pero sin una documentación explícita, es difícil atribuir una razón específica a este cambio.