Tag: AKKA-STREAM
Estoy intentando dividir un Source de Akka en dos diferentes. val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convertir a Seq[BodyPartEntity] val dataFlow = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convertir a Publisher[BodyPartEntity] Clase extension implícita para Source que utiliza la función partition con dos Sinks como argumentos. implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) . . . Read more
Actualmente estoy trabajando en un proyecto simple llamado crypto_wallet. Utilizo gestión de estado (BLoC) y igualdad de valores (freezed) para crear operaciones CRUD en una base de datos (Firebase). En el método watch, uso el código StreamSubscription: @injectable class CoinWatcherBloc extends Bloc<CoinWatcherEvent, CoinWatcherState> { final ICoinRepository _repository; CoinWatcherBloc(this._repository, this._coinStreamSubscription) : . . . Read more
Quiero devolver un flujo basado en el valor/evento de otro flujo. Por ejemplo, si tengo 2 flujos, stream1 y stream2, quiero crear una función que devuelva como flujo stream2 o nulo, dependiendo del valor de stream1. ¿Cómo puedo hacerlo? Intenté hacer un mapeo de stream1 y, basándome en el evento, . . . Read more
Tengo un código que es similar al siguiente: object Test extends App { val SomeComplicatedFlow: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val input = builder.add(Balance[Int](1)) //Pregunta 1) ¿cómo eliminar esta entrada? val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure) val balance = builder.add(Balance[Int](2)) val flow1 = Flow[Int].map(_*2) val . . . Read more
Estoy intentando descargar un archivo grande de S3 y enviar sus datos a otro actor que está haciendo una solicitud http y luego persistir la respuesta. Quiero limitar el número de solicitudes enviadas por ese actor, por lo tanto necesito manejar una contrapresión. Intenté hacer algo como esto: El problema . . . Read more