Tag: REACTIVE-STREAMS
Supongamos que tengo dos fuentes de eventos históricos, y que los eventos de cada fuente están ordenados cronológicamente. ¿Cómo puedo fusionar estas fuentes utilizando Reactor para que los eventos en el Flujo fusionado se emitan en orden cronológico? En RxNET, se puede utilizar una combinación de Observable.Generate() y HistoricalScheduler para . . . Read more
Estoy implementando un listener de SQS reactivo y me enfrento a un problema con las suscripciones anidadas. Así es como se ve mi listener: @PostConstruct public void listener() { Mono<ReceiveMessageResponse> receiveMessageResponseMono = Mono.fromFuture( () -> sqsAsyncClient.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(MAX<em>NUMBER</em>OF<em>MESSAGES) .waitTimeSeconds(WAIT</em>TIME<em>IN</em>SECONDS) .build())); } receiveMessageResponseMono .repeat() .retry() .map(ReceiveMessageResponse::messages) .map(Flux::fromIterable) .flatMap(messageFlux -> messageFlux) . . . Read more
Estamos tratando de actualizar el antiguo controlador de Mongo basado en RxJava mongodb-driver-rx (v1.5.0) al nuevo mongodb-driver-reactivestreams (v1.13.1), no el más nuevo debido a las dependencias, pero sí mucho más nuevo. El antiguo RxJava ha sido descontinuado durante años. Todo funciona correctamente con el nuevo controlador, pero bajo carga elevada . . . Read more
Estoy creando un ByteArrayOutputStream utilizando ZIO Streams, es decir: lazy val byteArrayOutputStream = new ByteArrayOutputStream() val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunksString val data = ZStream.unwrap(callToFunction).run(sink) Esto funciona bien, ahora necesito transmitir en streaming estos datos de regreso al cliente utilizando akka http. Puedo hacer esto: val arr = byteArrayOutputStream.toByteArray complete(HttpEntity(ContentTypes.application/octet-stream, arr)) Lo . . . Read more