es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo fusionar las secuencias de entrada/salida (IO) de Python en un único iterador, pero mantener la información de qué elemento proviene de qué secuencia?

La funcionalidad deseada es algo como esto:

import subprocess as sp

p = sp.Popen(('programa', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE)

for line in merge_streams((p.stdout, 'OUT'), (p.stderr, 'ERR')):
    print(line)

Lo cual debería producir una salida similar a esta, en tiempo real:

('OUT', b'línea de salida 1')
('OUT', b'línea de salida 2')
('ERR', b'línea de error 1')
('ERR', b'línea de error 2')
('OUT', b'línea de salida 3')

Para ser claro, ejecutar el mismo programa desde CMD producirá:

línea de salida 1
línea de salida 2
línea de error 1
línea de error 2
línea de salida 3

Usar p = sp.Popen(('programa', 'arg1', ...), stdout=sp.PIPE, stderr=sp.STDOUT) fusionará los flujos pero no habrá forma de distinguir entre ellos.

Usar itertools.chain obviamente colocará todas las líneas de uno después de todas las líneas del otro.

Mi única solución semi-funcional implicó 2 hilos que empujaban a una collections.dequeue y el programa principal leyendo de él,
pero este enfoque parece desordenar el orden de los bloques multilínea escritos en el flujo al mismo tiempo.

Es decir, una excepción como esta:

asdf : El término 'asdf' no se reconoce como el nombre de un cmdlet, función, archivo de script o programa ejecutable. Compruebe la ortografía del nombre o, si se incluyó una ruta, compruebe que la ruta sea correcta e inténtelo de nuevo.
En algo.ps1:2 carácter:5
+     asdf
+     ~~~~
    + CategoryInfo          : ObjectNotFound: (asdf:String) [], CommandNotFoundException
    + FullyQualifiedErrorId : CommandNotFoundException

Podría imprimirse así:

b'    + CategoryInfo          : ObjectNotFound: (asdf:String) [], CommandNotFoundException'
b'    + FullyQualifiedErrorId : CommandNotFoundException'
b'En algo.ps1:2 carácter:5'
b'+     asdf'
b'+     ~~~~'
b"asdf : El término 'asdf' no se reconoce como el nombre de un cmdlet, función, archivo de script o programa ejecutable. Compruebe la ortografía del nombre o, si se incluyó una ruta, compruebe que la ruta sea correcta e inténtelo de nuevo."

Para evitar el problema XY: El objetivo final es enviar la salida a un cliente en tiempo real utilizando fastapi.responses.StreamingResponse, y ese cliente necesita saber qué es stderr y qué es stdout. Si usar un WebSocket facilita esto, también está bien.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Solución para combinar las salidas estándar y de error en tiempo real

    Para combinar las salidas estándar y de error en tiempo real en Python utilizando subprocess.Popen, podemos definir una función merge_streams que tome como argumentos las salidas estándar y de error junto con sus respectivas etiquetas. En la función, podemos utilizar el módulo select para verificar si hay datos disponibles en las salidas y luego leer las líneas de las salidas utilizando el método readline. Luego podemos devolver una tupla que contenga la etiqueta y la línea.

    Aquí está la implementación de merge_streams:

    import select
    
    def merge_streams(*streams):
        """
        Combinar las salidas estándar y de error en tiempo real.
        """
        streams = [(stream, label) for stream, label in streams if stream is not None]
        while streams:
            readable, _, _ = select.select(streams, [], [])
            for stream, label in readable:
                line = stream.readline()
                if line:
                    yield label, line
                else:
                    streams.remove((stream, label))
    

    Podemos utilizar esta función con subprocess.Popen de la siguiente manera:

    import subprocess as sp
    
    p = sp.Popen(('programa', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE)
    
    for label, line in merge_streams((p.stdout, 'SALIDA'), (p.stderr, 'ERROR')):
        print(label, line)
    

    Esto imprimirá las líneas de salida estándar y de error junto con sus respectivas etiquetas en tiempo real.

    Manejo de bloques multilinea

    Si queremos preservar el orden de los bloques multilinea escritos en la salida al mismo tiempo, podemos modificar la función merge_streams para leer todas las líneas hasta que se encuentre una línea en blanco o hasta que no haya más datos disponibles en la salida. Esto se puede lograr utilizando un bucle dentro del bucle readline.

    Aquí está la implementación modificada de merge_streams:

    import select
    
    def merge_streams(*streams):
        """
        Combinar las salidas estándar y de error en tiempo real.
        """
        streams = [(stream, label) for stream, label in streams if stream is not None]
        while streams:
            readable, _, _ = select.select(streams, [], [])
            for stream, label in readable:
                lines = []
                while True:
                    line = stream.readline()
                    if line:
                        lines.append(line)
                        if not line.strip():
                            break
                    else:
                        streams.remove((stream, label))
                        break
                if lines:
                    yield label, b''.join(lines)
    

    Esto asegurará que los bloques multilinea escritos en la salida al mismo tiempo se muestren juntos.

    Envío de la salida al cliente.

    Para enviar la salida a un cliente en tiempo real utilizando fastapi.responses.StreamingResponse, podemos modificar el código anterior de la siguiente manera:

    import fastapi.responses
    import fastapi.routing
    
    router = fastapi.routing.APIRouter()
    
    @router.get('/stream')
    async def stream_handler():
        p = sp.Popen(('programa', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE)
        async def generate():
            async for label, line in merge_streams((p.stdout, 'SALIDA'), (p.stderr, 'ERROR')):
                yield label.encode(), line
        return fastapi.responses.StreamingResponse(generate())
    

    Esto creará una respuesta de transmisión que envía la salida al cliente en tiempo real. El cliente puede leer la transmisión abriendo la URL /stream en su navegador o utilizando un WebSocket para conectarse al servidor.

Comments are closed.