es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Puedo aplicar una función a un dataframe de pandas de manera asíncrona?

En mi caso de uso, necesito obtener datos de un servidor remoto. El código es aproximadamente equivalente a:

def obtener_datos_de_usuario(id_de_usuario):
    time.sleep(5)
    ...
    return data

df = pd.DataFrame({'id_de_usuario': ['uid1', 'uid2', 'uid3', ..., 'uid9999']})

respuesta = df['id_de_usuario'].apply(obtener_datos_de_usuario)

Me parece que pandas podría estar ejecutando la función obtener_datos_de_usuario de forma asíncrona.

Nota: Ya he intentado usar df['id_de_usuario'].swifter.apply(obtener_datos_de_usuario) y dask. Ambos me dan una buena aceleración al ejecutar múltiples funciones en paralelo, pero la utilización de mi CPU, red y servidor remoto sigue siendo muy baja.

¿Hay alguna forma de hacer un apply asíncrono?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Sí, existe una manera de hacer una aplicación .apply() asincrónica en pandas usando asyncio y concurrent.futures. Aquí hay un ejemplo de cómo se puede hacer:

    import pandas as pd
    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    
    async def get_user_data(user_id):
        await asyncio.sleep(5)
        ...
        return data
    
    async def async_apply(df, func):
        with ThreadPoolExecutor() as executor:
            loop = asyncio.get_event_loop()
            tasks = [
                loop.run_in_executor(executor, func, row)
                for _, row in df.iterrows()
            ]
            results = await asyncio.gather(*tasks)
        return pd.Series(results)
    
    df = pd.DataFrame({'user_id': ['uid1', 'uid2', 'uid3', ..., 'uid9999']})
    
    respuesta = await async_apply(df['user_id'], get_user_data)
    

    En este ejemplo de código, definimos una nueva función async_apply que toma un DataFrame de pandas y una función para aplicar a cada fila de manera asincrónica. Dentro de async_apply, primero creamos un ThreadPoolExecutor para ejecutar la función en hilos separados, y luego usamos concurrent.futures para ejecutar la función en cada fila de forma asincrónica. Finalmente, usamos asyncio.gather para esperar a que regresen todos los resultados.

    Este código debería darte una mejora significativa en la velocidad al ejecutar la función get_user_data de manera asincrónica, y también aprovechar mejor tu CPU, red y los recursos del servidor remoto.

Comments are closed.