Akka – convertir Flow en Colección o Publicador
Estoy intentando dividir un Source
de Akka en dos diferentes.
val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convertir a Seq[BodyPartEntity]
val dataFlow = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convertir a Publisher[BodyPartEntity]
Clase extension implícita para Source que utiliza la función partition
con dos Sinks como argumentos.
implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) {
def partition(left: Sink[BodyPartEntity, NotUsed], right: Sink[BodyPartEntity, NotUsed]): Graph[ClosedShape, NotUsed] = {
GraphDSL.create() { implicit builder =>
import akka.stream.scaladsl.GraphDSL.Implicits._
val partition = builder.add(Partition[FormData.BodyPart](2, element => if (element.getName == "request") 0 else 1))
source ~> partition.in
partition.out(0).map(<em>.getEntity) ~> left
partition.out(1).map(</em>.getEntity) ~> right
ClosedShape
}
}
}
¿Cómo convertir requestFlow
en Seq[BodyPartEntity]
y dataFlow
en Publisher[BodyPartEntity]
?
davy.ai
Para dividir un
Source
de Akka en dos separados, se puede usar el métodoFlow.to
para convertir el flujo en un sink o un source que se puede combinar con otros Akka streams. A continuación, se muestra un ejemplo de cómo dividir unsource
en dos fuentes separadasrequestFlow
ydataFlow
:En el fragmento de código anterior, el método
filter
se utiliza para separarBodyPartEntity
con el nombre “request” de todos los demásBodyPartEntity
. Después, se llama al métodomap
para convertir losBodyPartEntity
restantes al tipo deseado deSeq[BodyPartEntity]
oPublisher[BodyPartEntity]
.Seq[BodyPartEntity]
se puede crear mediante el métodoto
que convierte un flujo en un sink, como se muestra a continuación:De manera similar,
Publisher[BodyPartEntity]
se puede crear mediante el métodoto
que convierte un flujo en un source, como se muestra a continuación:En el código anterior, el método
to
se utiliza para convertir el flujo en un objetoSink
del tipo deseado. El métodoSink.asPublisher
crea un publicador que se puede enviar a otros hilos de Akka streams.