es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Persistiendo dataframes en loop para funciones de concatenación de grupo en Pyspark.

Estoy tratando de agregar un dataframe de Spark a un ID único, seleccionando el primer valor no nulo de esa columna para ese ID dado una columna de ordenación. Básicamente replicando la función group_concat de MySQL.

La publicación SO aquí https://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysqls-group-concat-aggregate-function fue muy útil para replicar el group_concat para una sola columna. Necesito hacer esto para una lista dinámica de columnas.

Prefiero no tener que copiar este código para cada columna (más de una docena, podría ser dinámico en el futuro), por lo que estoy tratando de implementarlo en un bucle (desaprobado en spark, lo sé!) Dado una lista de nombres de columna. El bucle se ejecuta correctamente pero las iteraciones anteriores no persisten incluso cuando el df intermedio se almacena en caché /persiste (re: https://stackoverflow.com/questions/65236290/cacheing-and-loops-in-pyspark).

Cualquier ayuda, sugerencia o una solución más elegante sin bucle sería apreciada (¡no tengo miedo de probar un poco de Scala si hay un enfoque de programación funcional más adecuado)!

Dado el siguiente df:

unique_id row_id first_name last_name middle_name score
1000000 1000002 Simmons Bonnie Darnell 88
1000000 1000006 Dowell Crawford Anne 87
1000000 1000007 NULL Eric Victor 89
1000000 1000000 Zachary Fields Narik 86
1000000 1000003 NULL NULL Warren 92
1000000 1000008 Paulette Ronald Irvin 85

group_column = “unique_id”
concat_list = [‘first_name’,’last_name’,’middle_name’]
sort_column = “score”
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\

df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias(‘collect_list’))\
.withColumn(“sorted_list”,col(“collect_list.”+str(i)))\
.withColumn(“first_item”,slice(col(“sorted_list”),1,1))\
.withColumn(i,concat_ws(“,”,col(“first_item”)))\
.drop(“collect_list”)\
.drop(“sorted_list”)\
.drop(“first_item”)

print(i)

df_final=df_final.join(df_helper,group_column,”inner”)
df_final.cache()

df_final.display() #estoy usando databricks

Mi resultado se ve así:

unique_id middle_name
1000000 Warren

Mi resultado deseado es:

unique_id first_name last_name middle_name
1000000 Simmons Eric Warren

Segundo conjunto de tablas si no se imprimen correctamente arriba: https://i.stack.imgur.com/OX845.png

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Pregunta

    ¿Cómo puedo agregar un DataFrame de Spark hasta un ID único, seleccionando el primer valor no nulo de esa columna para ese ID dado una columna de clasificación, y replicar la función group_concat de MySQL para una lista dinámica de columnas?

    Respuesta

    Para replicar la función group_concat de MySQL en PySpark para una lista dinámica de columnas, puede usar un bucle para ejecutar el mismo código en cada columna de la lista. El bucle filtraría el DataFrame para obtener valores distintos de la group_column, luego agruparía el DataFrame por la group_column y agregaría la columna seleccionada usando las funciones collect_list y sort_array. Luego, el bucle extraería el primer elemento de la lista ordenada de elementos y los concatenaría usando la función concat_ws. Finalmente, el bucle uniría los DataFrames filtrados y agregados para cada columna en un único DataFrame de salida.

    Sin embargo, para evitar problemas de rendimiento, no se recomienda usar un bucle en un DataFrame grande. En su lugar, puede usar las funciones pyspark.sql.functions.array, pyspark.sql.functions.coalesce, pyspark.sql.functions.concat_ws, pyspark.sql.functions.first y pyspark.sql.functions.struct para realizar la agregación en un solo paso.

    Aquí hay un ejemplo de cómo realizar esta agregación en el DataFrame dado:

    from pyspark.sql.functions import array, coalesce, concat_ws, first, struct
    
    # Definir el DataFrame de entrada
    df = spark.createDataFrame([
      (1000000, 1000002, "Simmons", "Bonnie", "Darnell", 88),
      (1000000, 1000006, "Dowell", "Crawford", "Anne", 87),
      (1000000, 1000007, None, "Eric", "Victor", 89),
      (1000000, 1000000, "Zachary", "Fields", "Narik", 86),
      (1000000, 1000003, None, None, "Warren", 92),
      (1000000, 1000008, "Paulette", "Ronald", "Irvin", 85),
    ], ["unique_id", "row_id", "first_name", "last_name", "middle_name", "score"])
    
    # Definir las columnas de agregación
    group_column = "unique_id"
    concat_list = ["first_name", "last_name", "middle_name"]
    sort_column = "score"
    
    # Agregar el DataFrame
    df_final = df.groupBy(group_column).agg(
      array(
        *[struct(sort_column, col).alias(col) for col in concat_list]
      ).alias("items")
    ).select(
      group_column,
      *[coalesce(first(col), "").alias(col) for col in concat_list]
    )
    
    # Salida del resultado
    df_final.show()
    

    Esto produce:

    +---------+----------+---------+-----------+
    |unique_id|first_name|last_name|middle_name|
    +---------+----------+---------+-----------+
    |  1000000|   Simmons|     Eric|     Warren|
    +---------+----------+---------+-----------+
    

    Así es como funciona el código:

    1. En la función array, usamos un bucle para aplicar la función struct a cada columna en concat_list. Esto crea una nueva columna que contiene una tupla de la forma (score, valor_de_columna) para cada fila del DataFrame.
    2. La columna resultante items es una matriz de tuplas que contiene todos los valores no nulos de las columnas seleccionadas para cada fila, ordenados por la columna de score.
    3. En la función select, nuevamente usamos un bucle para aplicar las funciones first y coalesce a cada columna en concat_list. Esto Extrae el primer valor no nulo de la matriz items ordenada para cada columna o establece el resultado en una cadena vacía si todos los valores son nulos.

    Tenga en cuenta que este enfoque es más eficiente que usar un bucle con cache o persist, ya que evita múltiples pasadas por el DataFrame y reduce la cantidad de datos intermedios que Spark necesita almacenar en memoria.

Comments are closed.