de Akka Streams: Cómo formar entradas y salidas para un Grafo utilizando un Flow.
Tengo un código que es similar al siguiente:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(Balance[Int](1)) //Pregunta 1) ¿cómo eliminar esta entrada?
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val balance = builder.add(Balance[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left + right
}))
val flow3 = Flow[Int].map(_*2)
input ~> buffer ~> balance.in
balance.out(0) ~> flow1 ~> zip.in0
balance.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3
FlowShape(input.in, flow3) //Pregunta 2) ¿cómo hacer una salida aquí?
})
Note que tuve que agregar un Balance
llamado input
, porque no puedo recuperar una entrada desde el primer Buffer
del FlowShape
que quiero crear. ¿Hay alguna otra forma más simple de resolver esto? Crear un Balance
con 1 Outlet
parece ser la forma incorrecta de hacer esto.
Mi segunda pregunta es similar. No puedo recuperar una Outlet
de flow3
. La única forma que conozco de resolver este problema es crear otro Balance
, y exponer su Outlet
como la Outlet
de todo el FlowShape
. ¿Hay alguna mejor manera de resolver este problema?
davy.ai
Para eliminar la necesidad de la entrada de saldo llamada “entrada”, podemos utilizar el método “Flow.fromSinkAndSource” para crear un “FlowShape” directamente a partir de un “Sink” y un “Source”. Podemos utilizar un “Sink.ignore” como sink, ya que no nos importa consumir la entrada, y un “Source.single” como source, ya que solo necesitamos producir un solo elemento. Aquí está el código modificado para lograr esto:
En cuanto a la segunda pregunta, podemos evitar la necesidad de otro “Balance” envolviendo “flow3” en un “Flow.fromFunction”, lo que nos permite crear un “FlowShape” con un solo “Inlet” y un solo “Outlet”. Aquí está el código modificado:
Ahora tenemos un “FlowShape” con un solo “Inlet” de “buffer” y un solo “Outlet” de “flow3”.