es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Escucha reactiva de SQS

Estoy implementando un listener de SQS reactivo y me enfrento a un problema con las suscripciones anidadas.

Así es como se ve mi listener:

@PostConstruct
public void listener() {
  Mono<ReceiveMessageResponse> receiveMessageResponseMono =
    Mono.fromFuture(
      () ->
        sqsAsyncClient.receiveMessage(
          ReceiveMessageRequest.builder()
            .queueUrl(queueUrl)
            .maxNumberOfMessages(MAX<em>NUMBER</em>OF<em>MESSAGES)
            .waitTimeSeconds(WAIT</em>TIME<em>IN</em>SECONDS)
            .build()));
}
receiveMessageResponseMono
  .repeat()
  .retry()
  .map(ReceiveMessageResponse::messages)
  .map(Flux::fromIterable)
  .flatMap(messageFlux -> messageFlux)
  .subscribe(
    message -> {
      boolean isProcessed;

      isProcessed = process(message.body(), message.attributesAsStrings());

      if (isProcessed) {
        sqsAsyncClient.deleteMessage(
          DeleteMessageRequest.builder()
            .queueUrl(queueUrl)
            .receiptHandle(message.receiptHandle())
            .build());
      }
    });
}

Y el método process:

Boolean process(String message, Map<String, String> attributes) {
  reactiveCrudRepository
    .findById(1)
    .subscribe(b -> log.info("Elemento recuperado {}", b.getId()));
  return true;
}

Ahora el problema es que necesito datos de un repositorio reactivo y la suscripción principal no espera a que se complete la suscripción secundaria y la mayoría de las veces no obtengo este registro “Elemento recuperado”. No puedo devolver un Mono desde la función ya que el método process debe ser genérico y cualquier evento puede ser procesado y dentro del método process, necesito usar un cliente de rest-reactivo y un cliente de base de datos.

Soy nuevo en la programación reactiva, así que no estoy seguro de cómo puedo resolver esto o si estoy siguiendo un anti-patrón.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema es causado por la suscripción anidada en el método process. La suscripción padre no espera a que la suscripción hijo se complete, lo que provoca que la declaración de registro no se ejecute siempre.

    Una forma de resolver este problema es utilizando operadores que te permitan realizar operaciones asíncronas y esperar sus resultados, como flatMap o concatMap.

    Por ejemplo, en lugar de suscribirte al reactiveCrudRepository dentro del método process, podrías usar flatMap para esperar el resultado antes de continuar con el procesamiento:

      Mono<boolean> process(String message, Map<string, string=""> attributes) {
        return reactiveCrudRepository
            .findById(1)
            .doOnNext(b -> log.info("Item obtenido {}", b.getId()))
            .map(b -> true);
      }
    

    Luego, puedes modificar tu listener para usar este método encadenándolo después del operador map:

    receiveMessageResponseMono
            .repeat()
            .retry()
            .map(ReceiveMessageResponse::messages)
            .map(Flux::fromIterable)
            .flatMap(messageFlux -> messageFlux)
            .map(message -> process(message.body(), message.attributesAsStrings()))
            .flatMap(Mono::subscribeOn)
            .subscribe();
    

    Esta modificación asegurará que la suscripción padre espere el resultado del método process antes de avanzar al siguiente mensaje.

    En resumen, para resolver el problema con suscripciones anidadas en un listener SQS reactivo, usa flatMap o concatMap para asegurarte de que la suscripción padre espere el resultado de cualquier operación asíncrona antes de avanzar.</string,>

Comments are closed.