es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tag: REACTIVE-STREAMS

Cómo procesar datos históricos utilizando Reactor?

Supongamos que tengo dos fuentes de eventos históricos, y que los eventos de cada fuente están ordenados cronológicamente. ¿Cómo puedo fusionar estas fuentes utilizando Reactor para que los eventos en el Flujo fusionado se emitan en orden cronológico? En RxNET, se puede utilizar una combinación de Observable.Generate() y HistoricalScheduler para . . . Read more

Escucha reactiva de SQS

Estoy implementando un listener de SQS reactivo y me enfrento a un problema con las suscripciones anidadas. Así es como se ve mi listener: @PostConstruct public void listener() { Mono<ReceiveMessageResponse> receiveMessageResponseMono = Mono.fromFuture( () -> sqsAsyncClient.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(MAX<em>NUMBER</em>OF<em>MESSAGES) .waitTimeSeconds(WAIT</em>TIME<em>IN</em>SECONDS) .build())); } receiveMessageResponseMono .repeat() .retry() .map(ReceiveMessageResponse::messages) .map(Flux::fromIterable) .flatMap(messageFlux -> messageFlux) . . . Read more

Impacto en el rendimiento al migrar del controlador Java Rx de MongoDB al controlador de transmisión reactiva.

Estamos tratando de actualizar el antiguo controlador de Mongo basado en RxJava mongodb-driver-rx (v1.5.0) al nuevo mongodb-driver-reactivestreams (v1.13.1), no el más nuevo debido a las dependencias, pero sí mucho más nuevo. El antiguo RxJava ha sido descontinuado durante años. Todo funciona correctamente con el nuevo controlador, pero bajo carga elevada . . . Read more

Transmitiendo un ByteArrayOutputStream a una respuesta de Akka HTTP.

Estoy creando un ByteArrayOutputStream utilizando ZIO Streams, es decir: lazy val byteArrayOutputStream = new ByteArrayOutputStream() val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunksString val data = ZStream.unwrap(callToFunction).run(sink) Esto funciona bien, ahora necesito transmitir en streaming estos datos de regreso al cliente utilizando akka http. Puedo hacer esto: val arr = byteArrayOutputStream.toByteArray complete(HttpEntity(ContentTypes.application/octet-stream, arr)) Lo . . . Read more