es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Node.js se detiene al leer de múltiples flujos legibles.

Después de crear un stream (A), crear otro stream (B) y leer el stream (B), el proceso de lectura se detiene del stream (A).
¿Cómo puedo solucionar este problema?

Node.js v14.18.1

import * as readline from 'readline';
import { Readable } from 'stream';

async function main() {

const streamA = Readable.from('a');
const readerA = readline.createInterface({
    input: streamA,
    crlfDelay: Infinity
});

var stopCase = false;
if (stopCase) {

    const streamB = Readable.from('b');
    const readerB = readline.createInterface({
        input: streamB,
        crlfDelay: Infinity
    });

    console.log('readB');
    for await (const line of readerB) {
        console.log(line);
    }
}
console.log(`readerA.closed = ${'closed' in readerA}`);

console.log('readA');
for await (const line of readerA) {
    console.log(line);
}
console.log('success');
}
main();

Salida (stopCase=true):

readB
b
readerA.closed = true
readA

Salida (stopCase=false):

readerA.closed = false
readA
a
success
Tags:  , , ,

Answer

  1. Avatar for davy.ai

    El problema es que cuando stopCase es true, el código lee primero de streamB y readerB antes de volver a leer de streamA y readerA. Cuando se crea readerB, llama implícitamente a pause() en streamA, lo que detiene el flujo de datos. Por lo tanto, cuando intentamos leer de readerA nuevamente, no hay datos que se estén enviando a través de streamA.

    Para solucionar este problema, podemos llamar explícitamente a resume() en streamA después de leer en readerB:

    import * as readline from 'readline';
    import { Readable } from 'stream';
    
    async function main() {
      const streamA = Readable.from('a');
      const readerA = readline.createInterface({
        input: streamA,
        crlfDelay: Infinity,
      });
    
      let stopCase = true;
      if (stopCase) {
        const streamB = Readable.from('b');
        const readerB = readline.createInterface({
          input: streamB,
          crlfDelay: Infinity,
        });
    
        console.log('readB');
        for await (const line of readerB) {
          console.log(line);
        }
    
        // llamamos explícitamente a resume() en streamA para continuar el flujo de datos
        streamA.resume();
      }
    
      console.log(`readerA.closed = ${'closed' in readerA}`);
      console.log('readA');
    
      for await (const line of readerA) {
        console.log(line);
      }
    
      console.log('success');
    }
    
    main();
    

    Al llamar a streamA.resume() después de terminar con readerB, permitimos que los datos continúen fluyendo a través de streamA, lo que nos permite leer de readerA nuevamente con la salida esperada.

    Resultado (stopCase=true):

    readB
    b
    readerA.closed = true
    readA
    a
    success
    

    Resultado (stopCase=false):

    readerA.closed = false
    readA
    a
    success
    

    Tenga en cuenta que si está trabajando con flujos más complejos, es posible que deba usar otros métodos para controlar el flujo de datos en lugar de resume() y pause(), como usar pipe() o crear un flujo personalizado Transform.

Comments are closed.