JsonEOFException: Fin inesperado de la entrada JSON al enviar un flujo a un punto final de RxJava2.
Estoy tratando de averiguar cómo llamar a un punto final que acepta un parámetro @Body
de Flowable<Integer>
, para suscribirse y procesar el flujo entrante (usando Micronaut y el marco RxJava). Tengo este controlador en Micronaut:
@Post(uri = "inner")
@ExecuteOn(TaskExecutors.IO)
HttpResponse inner(@Body Flowable<Integer> template){
template.subscribe(new Subscriber<Integer>() {
Subscription subscription
@Override
void onSubscribe(Subscription s) {
subscription = s
subscription.request(1l)
}
@Override
void onNext(Integer integer) {
log.info("inner:onNext called")
log.info("${integer.toString()} - ${System.currentTimeMillis()}")
subscription.request(1l)
}
@Override
void onError(Throwable t) {
log.info("inner:onError called")
t.printStackTrace()
}
@Override
void onComplete() {
log.info("inner:onComplete called")
}
})
ok()
}
Lo llamo desde otro punto final llamado /outer
utilizando el código:
rxStreamingHttpClient.exchangeStream(HttpRequest.POST("/inner", Flowable.fromIterable([1,2,3,4,5]))).subscribe()
Lo que espero es llamar al punto final /outer
, que a su vez llama al punto final /inner
y le proporciona el flujo de enteros para procesarlo. Esto parece funcionar, pero solo de vez en cuando: el punto final /inner
siempre se llama, pero a menudo se procesará parcialmente el flujo y luego se llamará a OnError con una excepción de com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input
. Sin embargo, a veces se completa correctamente. No puedo encontrar un ejemplo de cómo hacer esto, ¿alguien puede aconsejar qué estoy haciendo mal?
davy.ai
Parece que el problema está en la serialización/deserialización del parámetro
Flowable<integer>
cuando se pasa desde el punto final/outer
hacia el punto final/inner
. Parece que el métodorxStreamingHttpClient.exchangeStream()
utiliza Jackson para serializar elFlowable<integer>
como JSON, que luego es deserializado por Micronaut en el punto final/inner
.Para solucionar este problema, puedes intentar configurar la cabecera
Content-Type
de la solicitud como"application/octet-stream"
, que utiliza una codificación binaria en lugar de JSON. Esto se puede hacer agregando la siguiente línea antes de hacer la solicitud:En el lado del servidor, puedes cambiar la firma del método
inner()
para aceptar un parámetroPublisher<integer>
en lugar de un parámetroFlowable<integer>
:Esto debería permitir que el
Flowable
se deserialice correctamente, independientemente de la codificación utilizada.Además, es posible que desees considerar manejar el control de flujo en el método
inner()
para evitar desbordamientos de búfer y asegurar que el procesamiento se mantenga dentro de los límites de recursos aceptables. Esto se puede hacer modificando la implementación delSubscriber
para solicitar más elementos solo cuando esté listo para procesarlos:Este código solicita hasta 10 elementos a la vez, hasta el número de elementos que se han solicitado pero aún no se han procesado. Esto se puede ajustar según la velocidad a la que se estén procesando los elementos y la cantidad de memoria disponible.