es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo manejar la presión de retroceso al transmitir archivos desde S3 con la interacción de actores.

Estoy intentando descargar un archivo grande de S3 y enviar sus datos a otro actor que está haciendo una solicitud http y luego persistir la respuesta. Quiero limitar el número de solicitudes enviadas por ese actor, por lo tanto necesito manejar una contrapresión. Intenté hacer algo como esto:

El problema es que veo que solo lee 30 líneas del archivo como límite establecido para la paralelización. No estoy seguro de que esta sea la forma correcta de lograr lo que busco.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para lograr retroalimentación de presión en este escenario, puede utilizar el operador throttle de Akka Streams. Esto reducirá la velocidad del flujo entrante de datos desde S3, disminuyéndolo para permitir que las fases del flujo posterior puedan mantenerse a la par.

    Aquí hay un ejemplo de cómo usar throttle en esta situación:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import akka.util.ByteString
    import scala.concurrent.duration._
    
    implicit val sys: ActorSystem = ActorSystem("mi-sistema")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import sys.dispatcher
    
    case class Buzz(request: String, response: String)
    
    // Defina sus actores y su carpeta / clave aquí
    val persistActor = ???
    val httpClientActor = ???
    val bckt = ???
    val bcktKey = ???
    
    val requestTimeout = Timeout(10.seconds)
    val httpClientFlow = Http().cachedHostConnectionPoolHttps[String]("https://example.com")
    
    S3.download(bckt, bcktKey)
      .map {
        case Some((file, _)) => file
          .via(CsvParsing.lineScanner())
          .map(_.map(_.utf8String)).drop(1)//eliminar los encabezados
          .map(p => Foo(p.head, p(1)))
          .throttle(30, 1.second) // Limitar a 30 elementos por segundo
          .mapAsync(1) { foo =>
            (httpClientFlow.ask[(HttpResponse, String)](foo.request)(requestTimeout, sender = None, httpClientActor)).map {
              case (HttpResponse(status, _, entity, _), foo.request) =>
                entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String) map { response =>
                  Buzz(foo.request, response)
                }
              case resp =>
                throw new RuntimeException(s"Se recibió una respuesta inesperada de $resp")
            }.flatten
          }
          .mapAsync(1){ buzz =>
            (persistActor ? buzz).mapTo[Done]
          }
          .runWith(Sink.ignore)
        case None => throw new java.io.IOException(s"$bckt/$bcktKey no se pudo descargar de S3")
      }
    

    Este código limitará el procesamiento de elementos a 30 por segundo. Puede ajustar los valores a su nivel deseado de retroalimentación de presión. Al configurar adecuadamente los valores de limitación, puede asegurarse de que el actor del cliente http no se abrume, mientras mantiene una buena tasa de procesamiento de datos.

Comments are closed.