es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Akka – convertir Flow en Colección o Publicador

Estoy intentando dividir un Source de Akka en dos diferentes.

  val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convertir a Seq[BodyPartEntity]
  val dataFlow    = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convertir a Publisher[BodyPartEntity]

Clase extension implícita para Source que utiliza la función partition con dos Sinks como argumentos.

implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) {
def partition(left: Sink[BodyPartEntity, NotUsed], right: Sink[BodyPartEntity, NotUsed]): Graph[ClosedShape, NotUsed] = {
  GraphDSL.create() { implicit builder =>
    import akka.stream.scaladsl.GraphDSL.Implicits._
    val partition = builder.add(Partition[FormData.BodyPart](2, element => if (element.getName == "request") 0 else 1))
    source ~> partition.in
    partition.out(0).map(<em>.getEntity) ~> left
    partition.out(1).map(</em>.getEntity) ~> right
    ClosedShape
  }
}
}

¿Cómo convertir requestFlow en Seq[BodyPartEntity] y dataFlow en Publisher[BodyPartEntity]?

Tags:  ,

Answer

  1. Avatar for davy.ai

    Para dividir un Source de Akka en dos separados, se puede usar el método Flow.to para convertir el flujo en un sink o un source que se puede combinar con otros Akka streams. A continuación, se muestra un ejemplo de cómo dividir un source en dos fuentes separadas requestFlow y dataFlow:

    val requestFlow: Source[BodyPartEntity, NotUsed] = source.filter(_.getName == "request").map(_.getEntity)
    val dataFlow: Source[BodyPartEntity, NotUsed] = source.filter(_.getName != "request").map(_.getEntity)
    

    En el fragmento de código anterior, el método filter se utiliza para separar BodyPartEntity con el nombre “request” de todos los demás BodyPartEntity. Después, se llama al método map para convertir los BodyPartEntity restantes al tipo deseado de Seq[BodyPartEntity] o Publisher[BodyPartEntity]. Seq[BodyPartEntity] se puede crear mediante el método to que convierte un flujo en un sink, como se muestra a continuación:

    val requestFlow: Source[BodyPartEntity, NotUsed] = source.filter(_.getName == "request").map(_.getEntity).to(Sink.seq)
    

    De manera similar, Publisher[BodyPartEntity] se puede crear mediante el método to que convierte un flujo en un source, como se muestra a continuación:

    val dataFlow: Source[BodyPartEntity, NotUsed] = source.filter(_.getName != "request").map(_.getEntity).to(Sink.asPublisher(fanout = false))
    

    En el código anterior, el método to se utiliza para convertir el flujo en un objeto Sink del tipo deseado. El método Sink.asPublisher crea un publicador que se puede enviar a otros hilos de Akka streams.

Comments are closed.