Ejecutar la función map de manera asíncrona en una lista de tamaño desconocido en Python utilizando asyncio.
Tengo una aplicación que procesa datos transmitidos a través de tcp en lotes. El programa está codificado en Python y utilizo la biblioteca asyncio para alternar entre la recopilación y el procesamiento del flujo de datos.
Necesito analizar el lote línea por línea y ocasionalmente emitir algo a la interfaz gráfica. El flujo es demasiado rápido para enviar todo. Mi código actual es el siguiente:
await process_data( list( map( parse_line, await readlines( reader ) ) ) )
La función parse_line
se ejecutará cientos de veces (dependiendo de cuántos registros devuelva readlines
) y estoy bloqueado porque es una función regular. Mi solución inicial fue intentar dormir en la función parse_line
con asyncio
, pero Python no lo permite y lanza un error…
TypeError: el objeto no se puede utilizar en una expresión 'await'
Entonces, ¿cómo puedo solucionarlo? Quiero poder alternar mientras se procesa el lote, pero debo mantener la velocidad. ¿Alguna sugerencia?
Gracias de antemano…
Actualización
Código simplificado:
import asyncio
data = '1,1,1,1\n1,1,1,1\n1,1,1,1\n'
async def readlines(data):
lines = []
print('leyendo datos')
lines = data.split('\n')
return lines
def parse_lines(line):
print('analizando línea')
print(line)
(var1, var2, var3, var4) = line.split(',')
print(f'var1= {var1} var2 = {var2} var3= {var3} var4={var4}')
async def process_data():
print(list( map( parse_lines, await readlines( data) )))
El objetivo es dividir el análisis de los datos y permitirme volver a mi bucle de eventos y hacer otras cosas. Puedo tener 600 elementos en mis datos, lo que causa problemas ya que intento mantener la actualización de la interfaz gráfica y algunas otras cosas, y me bloqueo durante demasiado tiempo.
davy.ai
Para evitar que se bloquee al analizar cada línea, puedes modificar tu código para usar asyncio.create_task() y ejecutar cada llamada a parse_line como una tarea independiente que se pueda programar y ejecutar de forma independiente.
Así es como podrías modificar tu código para lograr esto:
En este código modificado, creamos una lista llamada “tasks” y añadimos una nueva tarea para cada línea de datos. Cada tarea se crea utilizando asyncio.create_task(), que devuelve un objeto de tipo coroutine que se puede programar de forma independiente del bucle principal de eventos.
Al utilizar asyncio.create_task() para ejecutar cada llamada a parse_line como una tarea independiente, permitimos que el bucle de eventos cambie entre las diferentes tareas según sea necesario, lo que nos permitirá realizar otras acciones mientras los datos se están procesando.
Para ejecutar las coroutines usamos asyncio.gather, que espera a que todas las tareas se completen antes de continuar.
¡Espero que esto te sea de ayuda!