es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Flink: No se pudo encontrar ninguna fábrica para el identificador ‘kafka’ que implemente ‘org.apache.flink.table.factories.CatalogFactory’ en el classpath.

Estoy intentando conectar Kafka a Flink y ejecutarlo a través de sql-client.sh. Sin embargo, sin importar lo que haga con .yaml y las bibliotecas, sigo recibiendo el siguiente error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'myKafka'.

Catalog options are:
'type'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
        at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
        at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
        ... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.

Available factory identifiers are:

generic_in_memory
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
        at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
        ... 11 more

Mi sql-conf es bastante simple (no incluí información confidencial como servidores de arranque):

catalogs:
 - name: myKafka
   type: kafka

Además, la carpeta library incluye los siguientes archivos JAR:

  • flink-avro-confluent-registry-1.13.2.jar
  • flink-connector-kafka_2.12-1.13.2.jar
  • flink-sql-connector-kafka_2.12-1.13.2.jar
  • kafka-clients-2.0.0-cdh6.1.1.jar

La versión de Flink: 1.13.2. La versión de Kafka: 2.0.0-cdh6.1.1.

Solución (gracias a @Niko por señalarme en la dirección correcta):
Modifiqué el sql-conf.yaml para usar un catálogo hive y creé una tabla de Kafka dentro del SQL. Entonces, mi sql-conf.yaml se ve así:

execution:
  type: streaming
  result-mode: table
  planner: blink
  current-database: default
  current-catalog: myhive

catalogs:
  - name: myhive
    type: hive
    hive-version: 2.1.1-cdh6.0.1
    hive-conf-dir: /etc/hive/conf

deployment:
  m: yarn-cluster
  yqu: ABC_XYZ

Lo ejecuté y dentro de SQL-client.sh, creé una tabla de Kafka usando las conexiones necesarias.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    El mensaje de error sugiere que hay un problema con la configuración del catálogo de Kafka o la presencia de las bibliotecas requeridas. Específicamente, parece que no hay factorías de catálogo disponibles para el identificador ‘kafka’.

    Para resolver este problema, puede intentar lo siguiente:

    1. Verifique las versiones de Flink y Kafka: asegúrese de que sean compatibles y verifique si se necesitan dependencias adicionales para la combinación de versiones que está utilizando.
    2. Verifique si las bibliotecas requeridas están correctamente ubicadas en la carpeta ‘lib’ de su instalación de Flink.

    3. Modifique el archivo ‘sql-conf.yaml’ para usar otro catálogo, como ‘hive’, y cree la tabla Kafka dentro del script SQL.

    Aquí hay un ejemplo de cómo puede modificar el archivo ‘sql-conf.yaml’:

    execution:
      type: streaming
      result-mode: table
      planner: blink
      current-database: default
      current-catalog: myhive
    
    catalogs:
      - name: myhive
        type: hive
        hive-version: 2.1.1-cdh6.0.1
        hive-conf-dir: /etc/hive/conf
    
    deployment:
      m: yarn-cluster
      yqu: ABC_XYZ
    

    Luego, dentro del cliente SQL, cree la tabla Kafka de la siguiente manera:

    CREATE TABLE kafka_table (
      field1 STRING,
      field2 INT,
      -- agregue más campos según sea necesario
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my_kafka_topic',
      'properties.bootstrap.servers' = 'kafka-bootstrap-server:9092',
      'properties.group.id' = 'flink-consumer-group',
      'format' = 'json'
    );
    

    Asegúrese de reemplazar ‘my_kafka_topic’ y ‘kafka-bootstrap-server:9092’ con los detalles reales de su tema y servidor de arranque de Kafka.

    Con estas modificaciones, debería poder conectarse a Kafka y ejecutar consultas a través de ‘sql-client.sh’.

Comments are closed.