es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Ejecutar la función map de manera asíncrona en una lista de tamaño desconocido en Python utilizando asyncio.

Tengo una aplicación que procesa datos transmitidos a través de tcp en lotes. El programa está codificado en Python y utilizo la biblioteca asyncio para alternar entre la recopilación y el procesamiento del flujo de datos.

Necesito analizar el lote línea por línea y ocasionalmente emitir algo a la interfaz gráfica. El flujo es demasiado rápido para enviar todo. Mi código actual es el siguiente:

await process_data( list( map( parse_line, await readlines( reader ) ) ) )

La función parse_line se ejecutará cientos de veces (dependiendo de cuántos registros devuelva readlines) y estoy bloqueado porque es una función regular. Mi solución inicial fue intentar dormir en la función parse_line con asyncio, pero Python no lo permite y lanza un error…

TypeError: el objeto no se puede utilizar en una expresión 'await'

Entonces, ¿cómo puedo solucionarlo? Quiero poder alternar mientras se procesa el lote, pero debo mantener la velocidad. ¿Alguna sugerencia?

Gracias de antemano…

Actualización

Código simplificado:

import asyncio

data = '1,1,1,1\n1,1,1,1\n1,1,1,1\n'

async def readlines(data):
    lines = []
    print('leyendo datos')
    lines = data.split('\n')
    return lines

def parse_lines(line):
    print('analizando línea')
    print(line)
    (var1, var2, var3, var4) = line.split(',')
    print(f'var1= {var1} var2 = {var2} var3= {var3} var4={var4}')

async def process_data():
    print(list( map( parse_lines, await readlines( data) )))

El objetivo es dividir el análisis de los datos y permitirme volver a mi bucle de eventos y hacer otras cosas. Puedo tener 600 elementos en mis datos, lo que causa problemas ya que intento mantener la actualización de la interfaz gráfica y algunas otras cosas, y me bloqueo durante demasiado tiempo.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para evitar que se bloquee al analizar cada línea, puedes modificar tu código para usar asyncio.create_task() y ejecutar cada llamada a parse_line como una tarea independiente que se pueda programar y ejecutar de forma independiente.

    Así es como podrías modificar tu código para lograr esto:

    import asyncio
    
    data = '1,1,1,1\n1,1,1,1\n1,1,1,1\n'
    
    async def readlines(data):
        lines = []
        print('leyendo datos')
        lines = data.split('\n')
        return lines
    
    async def parse_lines(line):
        print('analizando línea')
        print(line)
        ( var1, var2, var3, var4 ) = line.split(',')
        print(f'var1= {var1} var2 = {var2} var3= {var3} var4={var4}')
    
    async def process_data():
        tasks = []
        for line in await readlines(data):
            tasks.append(asyncio.create_task(parse_lines(line)))
        await asyncio.gather(*tasks)
    
    # llamar a la función asíncrona
    asyncio.run(process_data())
    

    En este código modificado, creamos una lista llamada “tasks” y añadimos una nueva tarea para cada línea de datos. Cada tarea se crea utilizando asyncio.create_task(), que devuelve un objeto de tipo coroutine que se puede programar de forma independiente del bucle principal de eventos.

    Al utilizar asyncio.create_task() para ejecutar cada llamada a parse_line como una tarea independiente, permitimos que el bucle de eventos cambie entre las diferentes tareas según sea necesario, lo que nos permitirá realizar otras acciones mientras los datos se están procesando.

    Para ejecutar las coroutines usamos asyncio.gather, que espera a que todas las tareas se completen antes de continuar.

    ¡Espero que esto te sea de ayuda!

Comments are closed.