es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Obtener todos los resultados de un CompletableFuture recursivo

Aquí está el escenario: podría generar aleatoriamente algunos datos y, si lo hace, entonces los datos deben ser recuperados de forma recursiva, finalmente necesito obtener todos los datos generados.

interface DataProvider {
  List<String> randomData(String url);
}

public static void main(String[] args) {
    List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
        final Random random = new Random();
        @Override
        public List<String> randomData(String url) {
            if (random.nextBoolean()) {
                System.out.println("proporcionar algunos datos");
                return List.of(UUID.randomUUID().toString());
            }
            return null;
        }
    }, List.of("a", "b", "c"));
    System.out.println("los resultados son: ");
    System.out.println(strings);
}

private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return new ArrayList<>();
    List<CompletableFuture<List<String>>> collect =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .collect(Collectors.toList());
    List<CompletableFuture<List<String>>> list = new ArrayList<>();
    collect.forEach(item -> {
        CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
        list.add(listCompletableFuture);
    });
    return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}

A veces el programa se congela y otras veces imprime una colección vacía (proporcionar algunos datos se imprime).

¿Dónde me equivoqué? No estoy nada familiarizado con CompletableFuture, así que tal vez toda la invocación recursiva esté incorrecta. (O el código podría ser más simple porque CompletableFuture tiene muchos métodos).

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    El problema con la implementación actual es que la recursión no se encadena correctamente con la llamada inicial a fetch(). Cuando un CompletableFuture se completa, invoca la función thenApplyAsync(), que crea un nuevo CompletableFuture que se supone que se encadena con el anterior. Sin embargo, en esta implementación, el CompletableFuture recién creado no se agrega a ninguna lista y, por lo tanto, no se espera su finalización. En consecuencia, el programa sale prematuramente y se pueden perder algunos datos.

    Para solucionar este problema, debemos encadenar correctamente las instancias de CompletableFuture, de modo que la recursión se complete antes de devolver el resultado final. Una forma de hacer esto es crear una función auxiliar que tome un único argumento String, obtenga sus datos y luego obtenga recursivamente los datos de cada elemento en la lista devuelta. Esta función podría devolver un CompletableFuture<><string>> que represente toda la cadena recursiva, y se pueda agregar a una lista de instancias de CompletableFuture que deben unirse al final.

    Aquí tienes una implementación actualizada que soluciona el problema:

    private static CompletableFuture<><string>> fetchRecursive(ExecutorService es, DataProvider dataProvider, String item) {
        return CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es)
                .thenComposeAsync(strings -> {
                    List<><><string>>> futures = strings.stream()
                            .map(str -> fetchRecursive(es, dataProvider, str))
                            .collect(Collectors.toList());
                    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                            .thenApplyAsync(v -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .flatMap(List::stream)
                                    .collect(Collectors.toList()));
                }, es);
    }
    
    private static List<string> fetch(ExecutorService es, DataProvider dataProvider, List<string> items) {
        if (items == null || items.isEmpty()) {
            return new ArrayList<>();
        }
        List<><><string>>> collect = items.stream()
                .map(item -> fetchRecursive(es, dataProvider, item))
                .collect(Collectors.toList());
        CompletableFuture.allOf(collect.toArray(new CompletableFuture[0])).join();
        return collect.stream().flatMap(future -> future.join().stream())
                .collect(Collectors.toList());
    }
    

    En esta implementación, utilizamos thenComposeAsync() en lugar de thenApplyAsync() para aplanar las instancias anidadas de CompletableFuture. También creamos una función separada fetchRecursive() para manejar la lógica de recursión y devolver un CompletableFuture que se encadena correctamente con el anterior. Por último, agregamos todas las instancias de CompletableFuture a una lista y esperamos su finalización utilizando CompletableFuture.allOf().

Comments are closed.