es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo implementar “repeat” con Mono y estado.

Tengo un método que toma una lista de elementos, hace una petición web y luego devuelve los elementos que no pudieron ser procesados (algunos elementos pueden ser procesados correctamente, algunos pueden haber fallado y solo se devuelven los que fallaron o una lista vacía):

// devuelve una lista de elementos que fallaron al procesarse o una lista vacía.
private List<string> sendsItems(List<string> items);

Quiero utilizar Mono y volver a enviar solo los elementos que fallaron hasta N veces. Usando un código de bloqueo, se vería así:

public void sendWithRetries() {
    List<string> items = List.of("one", "two", "three");
    int retryNo = 0;
    while (!items.isEmpty() && retryNo < 3) {
        items = sendsItems(items);
        retryNo++;
    }
}

Tengo dificultades para traducir eso a código usando Reactor. He estado pensando en usar Mono.repeat o la familia de funciones Mono.retry, pero no veo una manera fácil de devolver los elementos fallidos a sendItems excepto haciendo algo feo como:

var items = new AtomicReference<>(List.of("one", "two", "three"));
Mono.fromCallable(() -> {
                List<string> failedItems = sendsItems(items.get());
                items.set(failedItems);
                if (!failedItems.isEmpty()) {
                    throw new RuntimeException("retry me");
                }
                return Collections.emptyList();
            })
            .retry(3)

¿Hay una mejor manera de hacer eso?

Nota: el caso de uso real que necesito es la llamada a la API putrecords de Kinesis. Devuelve información sobre qué registros no pudieron ser procesados y quiero volver a enviar solo esos.

Tags:  

Answer

  1. Avatar for davy.ai

    Una forma de lograr esto usando Reactor es usar Mono.defer() para evaluar perezosamente la solicitud en función de los elementos de entrada. Luego, usar flatMap() para transformar el resultado en un Mono vacío o un Mono que señala un error para volver a intentar la solicitud. Finalmente, use retry() con un número máximo de intentos para realizar reintentos.

    Aquí hay una implementación de ejemplo:

    private Mono<><string>> sendItemsWithRetries(List<string> items, int maxRetries) {
        return Mono.defer(() -> Mono.just(items))
                .flatMap(failedItems -> {
                    List<string> newFailedItems = sendsItems(failedItems);
                    if (newFailedItems.isEmpty()) {
                        return Mono.empty(); // señalar finalización con un Mono vacío
                    } else {
                        return Mono.error(() -> new RuntimeException("reinténtame")); // señalar error para volver a intentar el Mono
                    }
                })
                .retry(maxRetries);
    }
    

    Para usar esta implementación, simplemente llame a:

    List<string> items = List.of("uno", "dos", "tres");
    sendItemsWithRetries(items, 3)
            .doOnSuccess(processedItems -> System.out.println("Elementos procesados: " + processedItems))
            .doOnError(err -> System.err.println("Error al procesar elementos: " + err.getMessage()))
            .subscribe();
    

    Esta implementación usa Mono.defer() para evaluar perezosamente los elementos de entrada en caso de que necesiten ser reintentados. Luego, usa flatMap() para realizar la solicitud y transformar el resultado en un Mono vacío o una señal de error. Finalmente, usa retry() para reintentar la solicitud hasta un número máximo de intentos.

    Tenga en cuenta que la implementación devuelve un Mono<><string>>, que señala la lista de elementos procesados al finalizar correctamente. Si la solicitud no se puede completar después del número máximo de intentos, señala un error.

    En el ejemplo de uso, imprimimos los elementos procesados ​​en caso de éxito y el mensaje de error en caso de fallo. Puede reemplazar esto con su lógica de procesamiento deseada.

Comments are closed.