es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

de Akka Streams: Cómo formar entradas y salidas para un Grafo utilizando un Flow.

Tengo un código que es similar al siguiente:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

  val input = builder.add(Balance[Int](1)) //Pregunta 1) ¿cómo eliminar esta entrada?
  val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
  val balance = builder.add(Balance[Int](2))

  val flow1 = Flow[Int].map(_*2)
  val flow2 = Flow[Int].map(_*2)

  val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
    left + right
  }))

  val flow3 = Flow[Int].map(_*2)

  input ~> buffer ~> balance.in
  balance.out(0) ~> flow1 ~> zip.in0
  balance.out(1) ~> flow2 ~> zip.in1
  zip.out ~> flow3

  FlowShape(input.in, flow3) //Pregunta 2) ¿cómo hacer una salida aquí?
})

Note que tuve que agregar un Balance llamado input, porque no puedo recuperar una entrada desde el primer Buffer del FlowShape que quiero crear. ¿Hay alguna otra forma más simple de resolver esto? Crear un Balance con 1 Outlet parece ser la forma incorrecta de hacer esto.

Mi segunda pregunta es similar. No puedo recuperar una Outlet de flow3. La única forma que conozco de resolver este problema es crear otro Balance, y exponer su Outlet como la Outlet de todo el FlowShape. ¿Hay alguna mejor manera de resolver este problema?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para eliminar la necesidad de la entrada de saldo llamada “entrada”, podemos utilizar el método “Flow.fromSinkAndSource” para crear un “FlowShape” directamente a partir de un “Sink” y un “Source”. Podemos utilizar un “Sink.ignore” como sink, ya que no nos importa consumir la entrada, y un “Source.single” como source, ya que solo necesitamos producir un solo elemento. Aquí está el código modificado para lograr esto:

    object Test extends App {
      val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
        Flow.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._
    
          val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
          val balance = builder.add(Balance[Int](2))
    
          val flow1 = Flow[Int].map(_*2)
          val flow2 = Flow[Int].map(_*2)
    
          val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
            left + right
          }))
    
          val flow3 = Flow[Int].map(_*2)
    
          Source.single(0) ~> buffer ~> balance.in
          balance.out(0) ~> flow1 ~> zip.in0
          balance.out(1) ~> flow2 ~> zip.in1
          zip.out ~> flow3 ~> Sink.ignore
    
          FlowShape(buffer.in, flow3.outlet)
        })
    }
    

    En cuanto a la segunda pregunta, podemos evitar la necesidad de otro “Balance” envolviendo “flow3” en un “Flow.fromFunction”, lo que nos permite crear un “FlowShape” con un solo “Inlet” y un solo “Outlet”. Aquí está el código modificado:

    object Test extends App {
      val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
        Flow.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._
    
          val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
          val balance = builder.add(Balance[Int](2))
    
          val flow1 = Flow[Int].map(_*2)
          val flow2 = Flow[Int].map(_*2)
    
          val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
            left + right
          }))
    
          val flow3 = Flow.fromFunction((i: Int) => i*2)
    
          Source.single(0) ~> buffer ~> balance.in
          balance.out(0) ~> flow1 ~> zip.in0
          balance.out(1) ~> flow2 ~> zip.in1
          zip.out ~> flow3 ~> Sink.ignore
    
          FlowShape(buffer.in, flow3.outlet)
        })
    }
    

    Ahora tenemos un “FlowShape” con un solo “Inlet” de “buffer” y un solo “Outlet” de “flow3”.

Comments are closed.