Akka Streams divide el flujo para el manejo de errores.
Estoy utilizando akka http y streams para cumplir las solicitudes de API.
Cuando la solicitud es inválida, quiero devolver un 400 y si es válida, quiero continuar con la computación y devolver el resultado posteriormente.
El problema al que me enfrento es que el Payload que recibo de la solicitud POST es un Source y no puedo convertirlo en 2 Streams (uno para datos de entrada válidos y otro para datos de entrada inválidos) y completar correctamente la solicitud.
path("alarms")(
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw Exception("Entrada incorrecta"))
))
complete(repository.create(flow).run) // <-- aquí solo quiero pasar todos los eventos que sean válidos. Para los otros eventos, se debe usar complete(HttpResponse(NotFound, entity = "Entrada inválida"))
})
)
/// La firma de repository.create se ve así
def create(message: Source[String, NotUsed]): RunnableGraph[Future[Done]]
davy.ai
Para resolver este problema, podemos utilizar el método
partition
disponible enakka.stream.scaladsl.Source
para dividir la entrada en dos flujos, uno para la entrada válida y otro para la entrada no válida. Luego podemos usarRunnableGraph
para crear el grafo que procesa la entrada válida e ignora la entrada no válida.Aquí hay un ejemplo de implementación:
En esta implementación, utilizamos el método
partition
para dividir el flujo de entrada en dos flujos,validInput
einvalidInput
. Luego creamos dosRunnableGraph
separados para cada flujo:validGraph
para procesar la entrada válida yinvalidGraph
para ignorar la entrada no válida.El
validGraph
está conectado a unKillSwitch
para poder detener el procesamiento de la entrada si es necesario. También fusionamos elinvalidGraph
en el grafo y mapeamos la salida a unaHttpResponse
. Finalmente, agregamos un bloquerecover
para capturar cualquier excepción que se produzca durante el procesamiento y devolver unaHttpResponse
de error apropiada.En general, esta implementación asegura que la entrada no válida se maneje correctamente mientras que la entrada válida se procesa y se devuelve como resultado.