es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tag: APACHE-FLINK

Deserialize Protobuf kafka messages with Flink Deserializar mensajes de kafka en formato Protobuf con Flink

Estoy tratando de leer e imprimir mensajes Protobuf desde Kafka utilizando Apache Flink. Seguí la documentación oficial sin éxito: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third-party-serializers/ El código consumidor de Flink es: “` def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI() env.enableCheckpointing(5000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointStorage(s”$targetPath/checkpoints”) env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer]) val source = KafkaSource.builder[User] .setBootstrapServers(brokers) .setTopics(topic) .setGroupId(consumerGroupId) .setValueOnlyDeserializer(new . . . Read more

Objeto de configuración pasado por el método open de la clase AbstractRichFunction de Flink.

¿Cómo podríamos utilizar la instancia de configuración que se pasa en open() en AbstractRichFunction? He intentado agregar la siguiente lógica al main() del trabajo, pero me dice que la configuración no se puede modificar (es inmutable según se indica en la documentación de Java): var config = (Configuration) StreamExecutionEnvironment.getExecutionEnvironment().getConfiguration(); config.setString(/* . . . Read more

cómo configurar el cluster-id en Flink

Quiero utilizar un servicio en Flink con Standalone Kubernetes, pero no sé cómo configurar el identificador de clúster para cada clúster de Flink. ¿Alguien puede ayudar? Gracias. Enlace: alta disponibilidad con Standalone Kubernetes

Aplicación Kinesis – Excepción de tiempo de espera en Flink 1.11

Estoy trabajando con la aplicación Kinesis usando Flink 1.11, pero obtengo el siguiente error al iniciar mi aplicación: java.util.concurrent.TimeoutException: El heartbeat del TaskManager con id 421563c271e57acb4592f9d447d45b42 ha excedido el tiempo de espera. at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202) at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) . . . Read more