Cómo acelerar groupby().sum() en un dataframe de Dask con 5 millones de filas y 500 mil grupos?
Tengo un marco de datos con:
- 5 millones de filas.
- una columna
group_id
cuyo número de elementos únicos es de 500.000. - miles de otras columnas llamadas
var1
,var2
, etc. Cada una devar1
,var2
, … contiene solo 0 y 1.
Me gustaría agrupar por group_id
y luego sumarlos. Para tener un mejor rendimiento, utilizo dask. Sin embargo, la velocidad sigue siendo lenta para esta simple agregación.
El tiempo dedicado a un marco de datos con 10 columnas es de 6.285385847091675 segundos
El tiempo dedicado a un marco de datos con 100 columnas es de 64.9060411453247 segundos
El tiempo dedicado a un marco de datos con 200 columnas es de 150.6109869480133 segundos
El tiempo dedicado a un marco de datos con 300 columnas es de 235.77087807655334 segundos
Mi conjunto de datos real contiene hasta 30.000 columnas. He leído respuestas (1 y 2) de @Divakar sobre el uso de numpy. Sin embargo, el primer hilo trata de contar y el segundo trata de sumar columnas.
¿Podría detallar algunas formas de acelerar esta agregación?
importar numpy como np
importar pandas como pd
importar os, tiempo
desde multiprocessing import dummy
importar dask.dataframe como dd
núcleo = os.cpu_count()
P = dummy.Pool(processes = núcleo)
n_docs = 500000
n_filas = n_docs * 10
datos = {}
def crear_col(i):
nombre = ‘var’ + str(i)
datos[nombre] = np.random.randint(0, 2, n_filas)
n_cols = 300
P.map(crear_col, rango(1, n_cols + 1))
df = pd.DataFrame(datos, dtype = ‘int8’)
df.insert(0, ‘group_id’, np.random.randint(1, n_docs + 1, n_filas))
df = dd.from_pandas(df, npartitions = 3 * núcleo)
inicio = tiempo.time()
df.groupby(‘group_id’).sum().compute()
fin = tiempo.time()
print(‘El tiempo dedicado a un marco de datos con {} columnas es’.format(n_cols), fin – inicio, ‘segundos’)
davy.ai
Hay algunas técnicas para acelerar este proceso de agregación:
var1
,var2
, etc. Dado que solo contienen0
y1
, el uso de un tipo de dato booleano en lugar deint8
puede reducir significativamente el uso de memoria y acelerar el cálculo.Utilice un algoritmo de agrupación más eficiente. Pandas y Dask utilizan algoritmos de agrupación diferentes: pandas utiliza un algoritmo basado en hash mientras que Dask utiliza un algoritmo basado en ordenación. En algunos casos, cambiar al otro algoritmo puede mejorar el rendimiento. En este caso, dado que tenemos un gran número de valores únicos de
group_id
(500,000), el uso del algoritmo basado en ordenación en pandas puede ser más rápido.Utilice un marco de trabajo de computación distribuida como Apache Spark o Dask para paralelizar el cálculo. Esto puede ayudar a acelerar el cálculo distribuyéndolo en varios nodos y procesadores.
Si el conjunto de datos es demasiado grande para caber en la memoria, considere utilizar un sistema de almacenamiento basado en disco como Apache Parquet o Apache Arrow. Estos formatos pueden almacenar grandes conjuntos de datos de manera eficiente en disco y permitir un rápido acceso de lectura/escritura.
Utilice la función
bincount
de numpy para realizar la agregación. Dado que las columnasvar1
,var2
, etc. solo contienen0
y1
, podemos tratarlas como valores binarios y utilizarbincount
para contar el número de valores1
para cada grupo. Esto puede ser mucho más rápido que utilizar la función de agrupación de pandas o dask.Aquí hay un ejemplo de cómo implementar el enfoque utilizando
bincount
de numpy:Este código convierte la columna
group_id
en un arreglo de numpy y luego aplicabincount
a las columnas restantes del dataframe para contar el número de valores1
para cada grupo. Los recuentos resultantes se convierten luego en un dataframe de pandas para su fácil manipulación y análisis adicional. Este enfoque puede ser mucho más rápido que utilizar la función de agrupación de pandas o dask, especialmente para conjuntos de datos grandes.