es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Akka Streams divide el flujo para el manejo de errores.

Estoy utilizando akka http y streams para cumplir las solicitudes de API.
Cuando la solicitud es inválida, quiero devolver un 400 y si es válida, quiero continuar con la computación y devolver el resultado posteriormente.
El problema al que me enfrento es que el Payload que recibo de la solicitud POST es un Source y no puedo convertirlo en 2 Streams (uno para datos de entrada válidos y otro para datos de entrada inválidos) y completar correctamente la solicitud.

path("alarms")(
  post(entity(asSourceOf[String]) { message =>
    val flow = message.via(Flow[String].map((it) =>
         Try(if valid(it) then it else throw Exception("Entrada incorrecta"))
    ))
    complete(repository.create(flow).run) // <-- aquí solo quiero pasar todos los eventos que sean válidos. Para los otros eventos, se debe usar complete(HttpResponse(NotFound, entity = "Entrada inválida"))
  })
)
/// La firma de repository.create se ve así
def create(message: Source[String, NotUsed]): RunnableGraph[Future[Done]]
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para resolver este problema, podemos utilizar el método partition disponible en akka.stream.scaladsl.Source para dividir la entrada en dos flujos, uno para la entrada válida y otro para la entrada no válida. Luego podemos usar RunnableGraph para crear el grafo que procesa la entrada válida e ignora la entrada no válida.

    Aquí hay un ejemplo de implementación:

        path("alarms")(
          post(
            entity(
              asSourceOf[String]
            ) { message =>
              val (validInput, invalidInput) =
                message.partition(valid(_))
              val validGraph =
                repository.create(validInput)
              val invalidGraph =
                invalidInput.toMat(Sink.ignore)(Keep.right)
              val resultGraph =
                validGraph
                  .viaMat(KillSwitches.single)(Keep.right)
                  .viaMat(KillSwitches.single)(Keep.left)
                  .mergeMat(invalidGraph)(Keep.right)
                  .map(_ => HttpResponse(StatusCodes.OK))
                  .recover {
                    case e: Exception =>
                      HttpResponse(
                        StatusCodes.BadRequest,
                        entity = e.getMessage
                      )
                  }
    
              complete(resultGraph.run())
            }
          )
        )
    

    En esta implementación, utilizamos el método partition para dividir el flujo de entrada en dos flujos, validInput e invalidInput. Luego creamos dos RunnableGraph separados para cada flujo: validGraph para procesar la entrada válida y invalidGraph para ignorar la entrada no válida.

    El validGraph está conectado a un KillSwitch para poder detener el procesamiento de la entrada si es necesario. También fusionamos el invalidGraph en el grafo y mapeamos la salida a una HttpResponse. Finalmente, agregamos un bloque recover para capturar cualquier excepción que se produzca durante el procesamiento y devolver una HttpResponse de error apropiada.

    En general, esta implementación asegura que la entrada no válida se maneje correctamente mientras que la entrada válida se procesa y se devuelve como resultado.

Comments are closed.