¿Cuál es la forma correcta de consumir el flujo entrante de ReactorNettyWebSocketClient?
Intento aprender la programación reactiva con Spring Reactor Webflux. Particularmente en el siguiente ejemplo, trato de entender cómo componer correctamente el flujo de procesamiento para consumir el flujo entrante de una conexión WebSocket utilizando ReactorNettyWebSocketClient.
El siguiente fragmento de código funciona correctamente:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create(“wss://stream.binance.com:9443/ws/btcusdt@trade”),
session -> {
Flux
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(event -> log.info(event));
return tradesFlux.then();
}
).subscribe();
En el fragmento de código anterior, uso doOnNext() en el flujo entrante Flux – tradesFlux para consumir cada evento que llega. Funciona bien con el fragmento de código anterior, pero según tengo entendido, doOnNext() es un operador de efecto secundario, así que intenté hacer lo siguiente:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create(“wss://stream.binance.com:9443/ws/btcusdt@trade”),
session -> {
Flux
.map(WebSocketMessage::getPayloadAsText);
tradesFlux.subscribe(new Subscriber<string>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
log.info("replicar operación de trading de Binance {}", s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
return tradesFlux.then();
}
).subscribe();
En el segundo fragmento de código, intenté consumir el flujo entrante con un Subscriber llamando a subscribe en el flujo entrante – tradesFlux. Pero con este fragmento de código, obtuve la siguiente excepción:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Solo se permite un único suscriptor de recepción de conexión.
Causado por: java.lang.IllegalStateException: Solo se permite un único suscriptor de recepción de conexión.
De alguna manera no se puede consumir el flujo entrante Flux usando su propio Subscriber, así que la pregunta que quiero hacer es: ¿es el primer fragmento de código la forma adecuada de consumir el flujo entrante de ReactorNettyWebSocketClient utilizando el operador de efecto secundario doOnNext(), o estoy omitiendo algo aquí?
¡Muchas gracias!
Tung
davy.ai
Como experto en TI especializado en Java, Spring, Programación Reactiva, Spring Webflux y Reactor-Netty, permíteme ayudarte a responder tu pregunta sobre cómo consumir el flujo entrante de una conexión WebSocket usando ReactorNettyWebSocketClient con Spring Reactor Webflux.
En tu primer fragmento de código, utilizas
doOnNext()
en el Flux de flujo entrante –tradesFlux
para consumir cada evento que llega. Esto funciona bien, ya quedoOnNext()
es una operación de efecto secundario. Te permite realizar alguna acción para cada elemento emitido por elFlux
de origen sin modificar su contenido. En este caso, lo utilizas para registrar cada evento entrante.En el segundo fragmento de código, intentas consumir el flujo entrante con un
Subscriber
llamando asubscribe
en el flux de flujo entrante –tradesFlux
. Sin embargo, obtienes una excepción que indica que solo se permite un suscriptor de recepción de conexión. Esto significa que no puedes consumir elFlux
de flujo entrante con tu propioSubscriber
.Por lo tanto, el primer fragmento de código es la forma correcta de consumir el flujo entrante de
ReactorNettyWebSocketClient
utilizando la operación de efecto secundariodoOnNext()
. Esto te permite realizar alguna acción para cada elemento emitido por elFlux
de origen sin modificar su contenido, y no necesitas crear tu propioSubscriber
.¡Espero que esta respuesta te ayude! Hazme saber si tienes alguna otra pregunta.