es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cuál es la forma correcta de consumir el flujo entrante de ReactorNettyWebSocketClient?

Intento aprender la programación reactiva con Spring Reactor Webflux. Particularmente en el siguiente ejemplo, trato de entender cómo componer correctamente el flujo de procesamiento para consumir el flujo entrante de una conexión WebSocket utilizando ReactorNettyWebSocketClient.

El siguiente fragmento de código funciona correctamente:

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create(“wss://stream.binance.com:9443/ws/btcusdt@trade”),
session -> {
Flux tradesFlux = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(event -> log.info(event));

            return tradesFlux.then();
        }

).subscribe();

En el fragmento de código anterior, uso doOnNext() en el flujo entrante Flux – tradesFlux para consumir cada evento que llega. Funciona bien con el fragmento de código anterior, pero según tengo entendido, doOnNext() es un operador de efecto secundario, así que intenté hacer lo siguiente:

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create(“wss://stream.binance.com:9443/ws/btcusdt@trade”),
session -> {
Flux tradesFlux = session.receive()
.map(WebSocketMessage::getPayloadAsText);

            tradesFlux.subscribe(new Subscriber<string>() {
                @Override
                public void onSubscribe(Subscription s) {
                }

                @Override
                public void onNext(String s) {
                    log.info("replicar operación de trading de Binance {}", s);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            });

            return tradesFlux.then();
        }

).subscribe();

En el segundo fragmento de código, intenté consumir el flujo entrante con un Subscriber llamando a subscribe en el flujo entrante – tradesFlux. Pero con este fragmento de código, obtuve la siguiente excepción:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Solo se permite un único suscriptor de recepción de conexión.
Causado por: java.lang.IllegalStateException: Solo se permite un único suscriptor de recepción de conexión.

De alguna manera no se puede consumir el flujo entrante Flux usando su propio Subscriber, así que la pregunta que quiero hacer es: ¿es el primer fragmento de código la forma adecuada de consumir el flujo entrante de ReactorNettyWebSocketClient utilizando el operador de efecto secundario doOnNext(), o estoy omitiendo algo aquí?

¡Muchas gracias!
Tung

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Como experto en TI especializado en Java, Spring, Programación Reactiva, Spring Webflux y Reactor-Netty, permíteme ayudarte a responder tu pregunta sobre cómo consumir el flujo entrante de una conexión WebSocket usando ReactorNettyWebSocketClient con Spring Reactor Webflux.

    En tu primer fragmento de código, utilizas doOnNext() en el Flux de flujo entrante – tradesFlux para consumir cada evento que llega. Esto funciona bien, ya que doOnNext() es una operación de efecto secundario. Te permite realizar alguna acción para cada elemento emitido por el Flux de origen sin modificar su contenido. En este caso, lo utilizas para registrar cada evento entrante.

    En el segundo fragmento de código, intentas consumir el flujo entrante con un Subscriber llamando a subscribe en el flux de flujo entrante – tradesFlux. Sin embargo, obtienes una excepción que indica que solo se permite un suscriptor de recepción de conexión. Esto significa que no puedes consumir el Flux de flujo entrante con tu propio Subscriber.

    Por lo tanto, el primer fragmento de código es la forma correcta de consumir el flujo entrante de ReactorNettyWebSocketClient utilizando la operación de efecto secundario doOnNext(). Esto te permite realizar alguna acción para cada elemento emitido por el Flux de origen sin modificar su contenido, y no necesitas crear tu propio Subscriber.

    ¡Espero que esta respuesta te ayude! Hazme saber si tienes alguna otra pregunta.

Comments are closed.