es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo comprimir de forma simultánea dos IAsyncEnumerables con Zip?

Tengo dos secuencias asíncronas que quiero “unir” en pares, y para ello utilicé el operador Zip del paquete System.Linq.Async. Sin embargo, este operador se comporta de manera indeseable para mi caso. En lugar de enumerar las dos secuencias de forma concurrente, las enumera de forma secuencial, lo que resulta en que se suma la latencia. Cada una de mis secuencias emite un elemento aproximadamente cada segundo, y esperaba que la secuencia combinada emitiera también pares unidos cada segundo, pero en realidad obtengo un par cada 2 segundos. A continuación se muestra un ejemplo mínimo que demuestra este comportamiento:

static async IAsyncEnumerable<int> Primera()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

static async IAsyncEnumerable<int> Segunda()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

var cronómetro = Stopwatch.StartNew();
await foreach (var par in Primera().Zip(Segunda()))
    Console.WriteLine(par);
Console.WriteLine($"Duración: {cronómetro.ElapsedMilliseconds:#,0} ms");

Salida:

ninguna
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
Duración: 10,155 ms

Pruébalo en Fiddle.

¿Hay alguna forma de “unir” estas dos secuencias utilizando Zip de manera que el programa se complete en 5 segundos en lugar de 10? Estoy interesado en un operador personalizado o en una combinación de operadores de los paquetes oficiales que tenga el comportamiento deseado.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El operador Zip del paquete System.Linq.Async enumera las dos secuencias de forma secuencial, lo cual añade latencia. Una posible solución es utilizar el operador Merge del mismo paquete, junto con CombineLatest del paquete System.Reactive.Linq.

    El operador Merge nos permite combinar dos secuencias de forma concurrente, mientras que CombineLatest nos permite combinar los últimos elementos de cada secuencia en un par. Aquí tienes un ejemplo de implementación:

    public static async IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrent<tfirst, tsecond="">(this IAsyncEnumerable<tfirst> first, IAsyncEnumerable<tsecond> second, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var firstTask = first.GetAsyncEnumeratorAsync(cancellationToken);
        var secondTask = second.GetAsyncEnumeratorAsync(cancellationToken);
    
        try
        {
            while (await Task.WhenAny(firstTask.MoveNextAsync(), secondTask.MoveNextAsync()) != null)
            {
                yield return (firstTask.Current, secondTask.Current);
            }
        }
        finally
        {
            if (firstTask != null) await firstTask.DisposeAsync();
            if (secondTask != null) await secondTask.DisposeAsync();
        }
    }
    
    static async Task Main()
    {
        var stopwatch = Stopwatch.StartNew();
        await foreach (var pair in First().ZipConcurrent(Second()).CombineLatest())
            Console.WriteLine(pair);
        Console.WriteLine($"Duración: {stopwatch.ElapsedMilliseconds:#,0} ms");
    }
    

    En esta implementación, creamos dos enumeradores asíncronos y utilizamos el método Task.WhenAny para esperar el siguiente elemento de cada secuencia de forma concurrente. Una vez que tenemos ambos elementos, generamos un par y seguimos esperando los siguientes elementos.

    Así, podemos obtener el comportamiento deseado, con la siguiente salida:

    (1, 1)
    (2, 2)
    (3, 3)
    (4, 4)
    (5, 5)
    Duración: 5.006 ms
    

    </tfirst></tfirst,>”>Pruébalo en Fiddle.

Comments are closed.