es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Fusionar intervalos superpuestos en PysPark.

Tengo un DataFrame como este (pero mucho más grande):

id   start    end
0    10       20
1    11       13
2    14       18
3    22       30
4    25       27
5    28       31

Estoy intentando fusionar de manera eficiente los intervalos superpuestos en PySpark, mientras se guarda en una nueva columna ‘ids’, qué intervalos fueron fusionados, para que se vea así:

start    end   ids
10       20    [0,1,2]
22       31    [3,4,5]

Visualización:

Desde:

AS-IS

A:

TO-BE

¿Puedo hacer esto sin usar una UDF?

Editar: el orden de id y start no necesariamente es el mismo.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, puedes hacer esto sin usar una UDF. Aquí está una solución utilizando las funciones incorporadas de PySpark:

    from pyspark.sql.functions import collect_list, struct
    
    # asumiendo que tu data frame se llama 'df'
    # crea una nueva columna 'merged_intervals' que contiene una estructura de inicio, fin e id
    df = df.withColumn('merged_intervals', struct('start', 'end', 'id'))
    
    # agrupa por intervalos superpuestos y recolecta los ids de los intervalos fusionados en una lista
    merged_df = df.groupBy().agg(collect_list('merged_intervals').alias('merged_intervals'))
    
    # define una función para fusionar intervalos superpuestos
    def merge_intervals(intervals):
        intervals = sorted(intervals, key=lambda x: x[0])
        merged = [intervals[0]]
        for current in intervals:
            previous = merged[-1]
            if current[0] <= previous[1]:
                merged[-1] = (previous[0], max(current[1], previous[1]), previous[2] + current[2])
            else:
                merged.append(current)
        return merged
    
    # aplica la función merge_intervals a los intervalos fusionados recolectados y explota la lista resultante
    result_df = merged_df.selectExpr('inline(merge_intervals(merged_intervals))')
    
    # selecciona las columnas relevantes y agrupa por inicio y fin para obtener el resultado final
    final_df = result_df.select('start', 'end', 'id').groupBy('start', 'end').agg(collect_list('id').alias('ids'))
    

    Esto debería darte el resultado deseado:

    +-----+---+-------+
    |start|end|ids    |
    +-----+---+-------+
    |10   |20 |[0, 1, 2]|
    |22   |31 |[3, 4, 5]|
    +-----+---+-------+
    

Comments are closed.