Obtener todos los resultados de un CompletableFuture recursivo
Aquí está el escenario: podría generar aleatoriamente algunos datos y, si lo hace, entonces los datos deben ser recuperados de forma recursiva, finalmente necesito obtener todos los datos generados.
interface DataProvider {
List<String> randomData(String url);
}
public static void main(String[] args) {
List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
final Random random = new Random();
@Override
public List<String> randomData(String url) {
if (random.nextBoolean()) {
System.out.println("proporcionar algunos datos");
return List.of(UUID.randomUUID().toString());
}
return null;
}
}, List.of("a", "b", "c"));
System.out.println("los resultados son: ");
System.out.println(strings);
}
private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return new ArrayList<>();
List<CompletableFuture<List<String>>> collect =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.collect(Collectors.toList());
List<CompletableFuture<List<String>>> list = new ArrayList<>();
collect.forEach(item -> {
CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
list.add(listCompletableFuture);
});
return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}
A veces el programa se congela y otras veces imprime una colección vacía (proporcionar algunos datos
se imprime).
¿Dónde me equivoqué? No estoy nada familiarizado con CompletableFuture
, así que tal vez toda la invocación recursiva esté incorrecta. (O el código podría ser más simple porque CompletableFuture
tiene muchos métodos).
davy.ai
El problema con la implementación actual es que la recursión no se encadena correctamente con la llamada inicial a
fetch()
. Cuando unCompletableFuture
se completa, invoca la funciónthenApplyAsync()
, que crea un nuevoCompletableFuture
que se supone que se encadena con el anterior. Sin embargo, en esta implementación, elCompletableFuture
recién creado no se agrega a ninguna lista y, por lo tanto, no se espera su finalización. En consecuencia, el programa sale prematuramente y se pueden perder algunos datos.Para solucionar este problema, debemos encadenar correctamente las instancias de
CompletableFuture
, de modo que la recursión se complete antes de devolver el resultado final. Una forma de hacer esto es crear una función auxiliar que tome un único argumentoString
, obtenga sus datos y luego obtenga recursivamente los datos de cada elemento en la lista devuelta. Esta función podría devolver unCompletableFuture<><string>>
que represente toda la cadena recursiva, y se pueda agregar a una lista de instancias deCompletableFuture
que deben unirse al final.Aquí tienes una implementación actualizada que soluciona el problema:
En esta implementación, utilizamos
thenComposeAsync()
en lugar dethenApplyAsync()
para aplanar las instancias anidadas deCompletableFuture
. También creamos una función separadafetchRecursive()
para manejar la lógica de recursión y devolver unCompletableFuture
que se encadena correctamente con el anterior. Por último, agregamos todas las instancias deCompletableFuture
a una lista y esperamos su finalización utilizandoCompletableFuture.allOf()
.