Tag: APACHE-FLINK
Estoy tratando de leer e imprimir mensajes Protobuf desde Kafka utilizando Apache Flink. Seguí la documentación oficial sin éxito: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third-party-serializers/ El código consumidor de Flink es: “` def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI() env.enableCheckpointing(5000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointStorage(s”$targetPath/checkpoints”) env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer]) val source = KafkaSource.builder[User] .setBootstrapServers(brokers) .setTopics(topic) .setGroupId(consumerGroupId) .setValueOnlyDeserializer(new . . . Read more
¿Cómo podríamos utilizar la instancia de configuración que se pasa en open() en AbstractRichFunction? He intentado agregar la siguiente lógica al main() del trabajo, pero me dice que la configuración no se puede modificar (es inmutable según se indica en la documentación de Java): var config = (Configuration) StreamExecutionEnvironment.getExecutionEnvironment().getConfiguration(); config.setString(/* . . . Read more
Tenemos un requisito para reemplazar la interfaz de usuario de la consola de Flink y habilitar todas las funcionalidades de la consola web de Flink utilizando utilidades de línea de comandos. Para algunas de las funcionalidades como iniciar un trabajo o crear puntos de guardado, estamos utilizando la CLI de . . . Read more
Quiero utilizar un servicio en Flink con Standalone Kubernetes, pero no sé cómo configurar el identificador de clúster para cada clúster de Flink. ¿Alguien puede ayudar? Gracias. Enlace: alta disponibilidad con Standalone Kubernetes
Estoy trabajando con la aplicación Kinesis usando Flink 1.11, pero obtengo el siguiente error al iniciar mi aplicación: java.util.concurrent.TimeoutException: El heartbeat del TaskManager con id 421563c271e57acb4592f9d447d45b42 ha excedido el tiempo de espera. at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202) at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) . . . Read more