Cómo fusionar las secuencias de entrada/salida (IO) de Python en un único iterador, pero mantener la información de qué elemento proviene de qué secuencia?
La funcionalidad deseada es algo como esto:
import subprocess as sp
p = sp.Popen(('programa', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE)
for line in merge_streams((p.stdout, 'OUT'), (p.stderr, 'ERR')):
print(line)
Lo cual debería producir una salida similar a esta, en tiempo real:
('OUT', b'línea de salida 1')
('OUT', b'línea de salida 2')
('ERR', b'línea de error 1')
('ERR', b'línea de error 2')
('OUT', b'línea de salida 3')
Para ser claro, ejecutar el mismo programa desde CMD producirá:
línea de salida 1
línea de salida 2
línea de error 1
línea de error 2
línea de salida 3
Usar p = sp.Popen(('programa', 'arg1', ...), stdout=sp.PIPE, stderr=sp.STDOUT)
fusionará los flujos pero no habrá forma de distinguir entre ellos.
Usar itertools.chain
obviamente colocará todas las líneas de uno después de todas las líneas del otro.
Mi única solución semi-funcional implicó 2 hilos que empujaban a una collections.dequeue
y el programa principal leyendo de él,
pero este enfoque parece desordenar el orden de los bloques multilínea escritos en el flujo al mismo tiempo.
Es decir, una excepción como esta:
asdf : El término 'asdf' no se reconoce como el nombre de un cmdlet, función, archivo de script o programa ejecutable. Compruebe la ortografía del nombre o, si se incluyó una ruta, compruebe que la ruta sea correcta e inténtelo de nuevo.
En algo.ps1:2 carácter:5
+ asdf
+ ~~~~
+ CategoryInfo : ObjectNotFound: (asdf:String) [], CommandNotFoundException
+ FullyQualifiedErrorId : CommandNotFoundException
Podría imprimirse así:
b' + CategoryInfo : ObjectNotFound: (asdf:String) [], CommandNotFoundException'
b' + FullyQualifiedErrorId : CommandNotFoundException'
b'En algo.ps1:2 carácter:5'
b'+ asdf'
b'+ ~~~~'
b"asdf : El término 'asdf' no se reconoce como el nombre de un cmdlet, función, archivo de script o programa ejecutable. Compruebe la ortografía del nombre o, si se incluyó una ruta, compruebe que la ruta sea correcta e inténtelo de nuevo."
Para evitar el problema XY: El objetivo final es enviar la salida a un cliente en tiempo real utilizando fastapi.responses.StreamingResponse
, y ese cliente necesita saber qué es stderr y qué es stdout. Si usar un WebSocket facilita esto, también está bien.
davy.ai
Solución para combinar las salidas estándar y de error en tiempo real
Para combinar las salidas estándar y de error en tiempo real en Python utilizando
subprocess.Popen
, podemos definir una funciónmerge_streams
que tome como argumentos las salidas estándar y de error junto con sus respectivas etiquetas. En la función, podemos utilizar el móduloselect
para verificar si hay datos disponibles en las salidas y luego leer las líneas de las salidas utilizando el métodoreadline
. Luego podemos devolver una tupla que contenga la etiqueta y la línea.Aquí está la implementación de
merge_streams
:Podemos utilizar esta función con
subprocess.Popen
de la siguiente manera:Esto imprimirá las líneas de salida estándar y de error junto con sus respectivas etiquetas en tiempo real.
Manejo de bloques multilinea
Si queremos preservar el orden de los bloques multilinea escritos en la salida al mismo tiempo, podemos modificar la función
merge_streams
para leer todas las líneas hasta que se encuentre una línea en blanco o hasta que no haya más datos disponibles en la salida. Esto se puede lograr utilizando un bucle dentro del buclereadline
.Aquí está la implementación modificada de
merge_streams
:Esto asegurará que los bloques multilinea escritos en la salida al mismo tiempo se muestren juntos.
Envío de la salida al cliente.
Para enviar la salida a un cliente en tiempo real utilizando
fastapi.responses.StreamingResponse
, podemos modificar el código anterior de la siguiente manera:Esto creará una respuesta de transmisión que envía la salida al cliente en tiempo real. El cliente puede leer la transmisión abriendo la URL
/stream
en su navegador o utilizando un WebSocket para conectarse al servidor.