es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Spark scala agrupa en un array y concaténalo.

Tengo un conjunto de datos con una serie de columnas que se ve así: (columnas: nombre, marca de tiempo, plataforma, conteo de clics, id)

Joy  2021-10-10T10:27:16  apple      5   1
May  2020-12-12T22:28:08  android    6   2
June 2021-09-15T20:20:06  Microsoft  9   3
Joy  2021-09-09T09:30:09  android    10  1
May  2021-08-08T05:05:05  apple      8   2

Quiero agrupar por id y después se verá así:

Joy  2021-10-10T10:27:16,2021-09-09T09:30:09   apple,android         5,10   1
May  2020-12-12T22:28:08,2021-08-08T05:05:05   android,apple         6,8    2
June 2021-09-15T20:20:06                       Microsoft             9      3

Después de llamar a otra API que convierte el id a seudónimo, quiero mapear ese id y que se vea así:

Joy  2021-10-10T10:27:16,2021-09-09T09:30:09   apple,android         5,10   1   A12
May  2020-12-12T22:28:08,2021-08-08T05:05:05   android,apple         6,8    2   B23
June 2021-09-15T20:20:06                       Microsoft             9      3   C34

He intentado usar groupBy y forEach, pero estoy atascado y no puedo continuar.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para obtener el resultado deseado, puedes realizar los siguientes pasos:

    1. Agrupa el conjunto de datos por la columna “id” usando la función “groupBy”.
    2. Utiliza la función “agg” para agregar las columnas en función de la columna “id” agrupada.
    3. Define la lógica de agregación para cada columna utilizando las funciones “concat_ws” y “collect_list”.
    4. Une el conjunto de datos resultante con el conjunto de datos de pseudo id utilizando la función “join”.
    5. Mapea la columna “id” al pseudo id correspondiente utilizando la función “withColumn”.

    Aquí tienes el fragmento de código que puedes utilizar:

    // Carga el conjunto de datos en un DataFrame
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/ruta/al/conjunto_de_datos.csv")
    
    // Define la lógica de agregación utilizando la función "agg"
    val agregado = df.groupBy("id")
      .agg(
        concat_ws(",", collect_list("timestamp")).as("timestamp"),
        concat_ws(",", collect_list("platform")).as("platform"),
        concat_ws(",", collect_list("clickcount")).as("clickcount")
      )
    
    // Carga el conjunto de datos de pseudo id en un DataFrame
    val pseudoIds = Seq(
      (1, "A12"),
      (2, "B23"),
      (3, "C34")
    ).toDF("id", "pseudo_id")
    
    // Une el conjunto de datos resultante con el conjunto de datos de pseudo id utilizando la función "join"
    val resultado = agregado.join(pseudoIds, Seq("id"))
    
    // Mapea la columna "id" al pseudo id correspondiente utilizando la función "withColumn"
    val resultadoFinal = resultado.withColumn("id", $"pseudo_id").drop("pseudo_id")
    
    // Muestra el resultado final
    resultadoFinal.show()
    

    ¡Espero que esto te ayude!

Comments are closed.