es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Java Reactor -> servicio sobrecargado con llamadas http

Tengo un servicio que se comunica con otro servicio a través de REST. La llamada está envuelta en un Mono:

return Mono.from(buildPublisher(requestDto))
            .subscribeOn(Schedulers.boundedElastic())
            .onErrorResume(e -> Mono.empty())
            .map(this::mapResponse);

Necesito hacer decenas de miles de llamadas a ese servicio, así que estoy llamando al método anterior desde aquí:

return Flux.fromStream(hugeListOfObjects.stream())
                .flatMap(serviceWhichReturnsMono::makeACallAndReturnMono);

Funciona bien para 200 o 2000 elementos, pero para un número mayor como más de 20000 obtengo un 503 (sin error/excepción ni en el servicio que llama ni en el servicio cliente).

Tengo una solución temporal (que es la única forma en que en realidad funciona) y se ve así:

List<><someobject>> partitions = Lists.partition(hugeListOfObjects, SECURITIES_PARTITION_SIZE);

return Flux.fromStream(partitions.stream())
            .flatMap(s -> {
                try {
                    Thread.sleep(PARTITION_DELAY);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return Flux.fromStream(s.stream()).flatMap(serviceWhichReturnsMono::makeACallAndReturnMono);
            });

Sin embargo, esto es feo y debería estar utilizando correctamente la API de Reactor. Intenté diferentes formas como:

return Flux.fromStream(securities.stream())
                .buffer(200)
                .delayElements(Duration.ofSeconds(15))
                .flatMap(s -> Flux.fromStream(s.stream()).flatMap(serviceWhichReturnsMono::makeACallAndReturnMono, 3));

Así que utilicé buffer, window, flatMap((), númeroDeHilos), etc. para controlar de alguna manera el tráfico y permitir que el servicio cliente finalice la ejecución de los fragmentos antes de recibir más solicitudes. Todos estos retornan un 503 como en el primer ejemplo anterior. ¿Alguna idea de qué podría estar mal? ¿Hay alguna forma de enviar solo un número de solicitudes y esperar a que se completen antes de enviar las siguientes?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El problema parece estar relacionado con la sobrecarga del servicio de aguas abajo con solicitudes. El hecho de que la solución alternativa funcione mediante la partición de la lista de objetos y la demora de cada partición sugiere que el servicio de aguas abajo necesita tiempo para procesar cada solicitud antes de recibir más.

    Una solución sería utilizar un mecanismo de regulación que limite el número de solicitudes por unidad de tiempo, evitando una sobrecarga. El operador flatMap se puede utilizar para este propósito al especificar un parámetro de concurrencia, que limita el número de solicitudes activas en un momento dado.

    Por ejemplo, el siguiente código crea un Flux de solicitudes que se realizan con un máximo de 10 solicitudes activas en un momento dado, con un retraso de 1 segundo entre cada solicitud:

    Flux.fromIterable(hugeListOfObjects)
        .flatMap(object -> serviceWhichReturnsMono.makeACallAndReturnMono(object)
                .subscribeOn(Schedulers.boundedElastic()), 10)
        .delayElements(Duration.ofSeconds(1))
        .subscribe();
    

    El método subscribeOn se utiliza para asegurar que cada solicitud se ejecuta en su propio hilo, evitando el bloqueo del hilo principal.

    Al limitar el nivel de concurrencia e introducir una demora entre cada solicitud, el servicio de aguas abajo debería ser capaz de manejar la carga y dejar de devolver errores 503.

Comments are closed.