es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

JsonEOFException: Fin inesperado de la entrada JSON al enviar un flujo a un punto final de RxJava2.

Estoy tratando de averiguar cómo llamar a un punto final que acepta un parámetro @Body de Flowable<Integer>, para suscribirse y procesar el flujo entrante (usando Micronaut y el marco RxJava). Tengo este controlador en Micronaut:

@Post(uri = "inner")
@ExecuteOn(TaskExecutors.IO)
HttpResponse inner(@Body Flowable<Integer> template){

    template.subscribe(new Subscriber<Integer>() {
        Subscription subscription

        @Override
        void onSubscribe(Subscription s) {
            subscription = s
            subscription.request(1l)
        }

        @Override
        void onNext(Integer integer) {
            log.info("inner:onNext called")
            log.info("${integer.toString()} - ${System.currentTimeMillis()}")
            subscription.request(1l)
        }

        @Override
        void onError(Throwable t) {
            log.info("inner:onError called")
            t.printStackTrace()
        }

        @Override
        void onComplete() {
            log.info("inner:onComplete called")
        }
    })
    ok()
}

Lo llamo desde otro punto final llamado /outer utilizando el código:

rxStreamingHttpClient.exchangeStream(HttpRequest.POST("/inner", Flowable.fromIterable([1,2,3,4,5]))).subscribe()

Lo que espero es llamar al punto final /outer, que a su vez llama al punto final /inner y le proporciona el flujo de enteros para procesarlo. Esto parece funcionar, pero solo de vez en cuando: el punto final /inner siempre se llama, pero a menudo se procesará parcialmente el flujo y luego se llamará a OnError con una excepción de com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input. Sin embargo, a veces se completa correctamente. No puedo encontrar un ejemplo de cómo hacer esto, ¿alguien puede aconsejar qué estoy haciendo mal?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema está en la serialización/deserialización del parámetro Flowable<integer> cuando se pasa desde el punto final /outer hacia el punto final /inner. Parece que el método rxStreamingHttpClient.exchangeStream() utiliza Jackson para serializar el Flowable<integer> como JSON, que luego es deserializado por Micronaut en el punto final /inner.

    Para solucionar este problema, puedes intentar configurar la cabecera Content-Type de la solicitud como "application/octet-stream", que utiliza una codificación binaria en lugar de JSON. Esto se puede hacer agregando la siguiente línea antes de hacer la solicitud:

    HttpRequest.POST("/inner", Flowable.fromIterable([1,2,3,4,5])).contentType(MediaType.APPLICATION_OCTET_STREAM_TYPE)
    

    En el lado del servidor, puedes cambiar la firma del método inner() para aceptar un parámetro Publisher<integer> en lugar de un parámetro Flowable<integer>:

    @Post(uri = "inner")
    @ExecuteOn(TaskExecutors.IO)
    HttpResponse inner(@Body Publisher<integer> template) {
        ...
    }
    

    Esto debería permitir que el Flowable se deserialice correctamente, independientemente de la codificación utilizada.

    Además, es posible que desees considerar manejar el control de flujo en el método inner() para evitar desbordamientos de búfer y asegurar que el procesamiento se mantenga dentro de los límites de recursos aceptables. Esto se puede hacer modificando la implementación del Subscriber para solicitar más elementos solo cuando esté listo para procesarlos:

    template.subscribe(new Subscriber<integer>() {
        Subscription subscription;
    
        @Override
        void onSubscribe(Subscription s) {
            subscription = s;
            subscription.request(1L);
        }
    
        @Override
        void onNext(Integer integer) {
            log.info("inner:onNext llamado");
            log.info("${integer.toString()} - ${System.currentTimeMillis()}");
    
            // Utiliza una forma simple de control de flujo para
            // limitar la cantidad de elementos solicitados a la vez
            if (subscription != null) {
                long requested = subscription.requested();
                if (requested > 0) {
                    subscription.request(Math.min(requested, 10));
                }
            }
        }
    
        @Override
        void onError(Throwable t) {
            log.info("inner:onError llamado");
            t.printStackTrace();
        }
    
        @Override
        void onComplete() {
            log.info("inner:onComplete llamado");
        }
    });
    

    Este código solicita hasta 10 elementos a la vez, hasta el número de elementos que se han solicitado pero aún no se han procesado. Esto se puede ajustar según la velocidad a la que se estén procesando los elementos y la cantidad de memoria disponible.

Comments are closed.