es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Aplicando una función personalizada en groupby en un conjunto de datos grande.

Tengo un marco de datos muy grande (millones de filas) y quiero aplicar una función personalizada en un agrupamiento.
El marco de datos está ordenado por “A” y también por una columna de fecha “C”. La función devuelve un nuevo vector para “C” que luego debería reemplazar la columna original “C”.

Esto es lo que estoy haciendo:

resultado = df.groupby(“A”, sort=False).apply(lambda x: myFunction(x[“B”], x[“C”], x[“D”])

La función en sí toma tres columnas como argumentos de entrada, realiza algunos cálculos dependientes del orden con if-else y devuelve “C” con valores posiblemente cambiados.

def myFunction(B, C, D):
df = pd.concat([B, C, D], axis=1).reset_index(drop=True)

... algunos cálculos if-else ...

return(df["C"])

Ahora, este cálculo tarda muchas horas, si no días, en completarse en todo el conjunto de datos.

Además, el resultado ahora es un marco de datos donde cada fila es el resultado de un grupo en el grupo por.
Sin embargo, quisiera que el resultado sea un vector continuo donde las columnas “C” recién devueltas reemplacen la columna “C” en el marco de datos original.

Estoy bastante seguro de que hay una forma mucho mejor de hacer esto, pero no puedo encontrarla.

Todo esto fue originalmente programado en R con dplyr y se ejecutaba mucho más rápido.

df <- df %>% group_by(A) %>%
mutate(C= myFunction(B,C,D))

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Solución propuesta

    Una solución posible para optimizar el proceso y reducir el tiempo de cálculo es dividir el dataframe grande en fragmentos más pequeños, aplicar la función a cada grupo y luego fusionar los resultados nuevamente. Esto se puede lograr utilizando la biblioteca dask, que puede paralelizar los cálculos y distribuirlos en varios núcleos o máquinas.

    Suponiendo que el dataframe es demasiado grande para caber en la memoria, podemos crear un dataframe dask a partir del dataframe original y especificar el tamaño de la partición para dividir los datos en fragmentos más pequeños:

    import dask.dataframe as dd
    
    ddf = dd.from_pandas(df, npartitions=<n_fragmentos>)
    

    A continuación, podemos definir una función retrasada que aplica la función personalizada a cada grupo del dataframe dask:

    from dask import delayed
    
    @delayed
    def procesar_grupo(df):
        return df.groupby("A", sort=False).apply(lambda x: miFuncion(x["B"], x["C"], x["D"]))
    
    

    Tenga en cuenta que se utiliza el decorador @delayed para convertir la función en un cálculo perezoso, que solo se evalúa cuando se necesitan los resultados.

    Luego podemos mapear la función retrasada en cada partición del dataframe dask y calcular los resultados:

    resultados = ddf.map_partitions(procesar_grupo).compute()
    

    Esto devolverá una lista de dataframes, donde cada dataframe contiene los resultados de la función personalizada aplicada a cada grupo del dataframe original.

    Para fusionar los resultados en un solo dataframe y reemplazar la columna original “C”, podemos usar la función concat de pandas:

    nueva_C = pd.concat(resultados)["C"]
    df["C"] = nueva_C
    

    Esto creará una nueva columna “C” con los valores actualizados.

    Tenga en cuenta que el enfoque dask no necesariamente será más rápido que el enfoque pandas, especialmente si los datos caben en la memoria y los cálculos se pueden paralelizar utilizando las bibliotecas multiprocessing o joblib. Sin embargo, proporciona una solución escalable y distribuida para manejar conjuntos de datos grandes.

Comments are closed.