Flink: No se pudo encontrar ninguna fábrica para el identificador ‘kafka’ que implemente ‘org.apache.flink.table.factories.CatalogFactory’ en el classpath.
Estoy intentando conectar Kafka a Flink y ejecutarlo a través de sql-client.sh
. Sin embargo, sin importar lo que haga con .yaml
y las bibliotecas, sigo recibiendo el siguiente error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'myKafka'.
Catalog options are:
'type'='kafka'
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
at java.util.HashMap.forEach(HashMap.java:1289)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.
Available factory identifiers are:
generic_in_memory
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
... 11 more
Mi sql-conf es bastante simple (no incluí información confidencial como servidores de arranque):
catalogs:
- name: myKafka
type: kafka
Además, la carpeta library
incluye los siguientes archivos JAR:
flink-avro-confluent-registry-1.13.2.jar
flink-connector-kafka_2.12-1.13.2.jar
flink-sql-connector-kafka_2.12-1.13.2.jar
kafka-clients-2.0.0-cdh6.1.1.jar
La versión de Flink: 1.13.2
. La versión de Kafka: 2.0.0-cdh6.1.1
.
Solución (gracias a @Niko por señalarme en la dirección correcta):
Modifiqué el sql-conf.yaml
para usar un catálogo hive
y creé una tabla de Kafka dentro del SQL. Entonces, mi sql-conf.yaml
se ve así:
execution:
type: streaming
result-mode: table
planner: blink
current-database: default
current-catalog: myhive
catalogs:
- name: myhive
type: hive
hive-version: 2.1.1-cdh6.0.1
hive-conf-dir: /etc/hive/conf
deployment:
m: yarn-cluster
yqu: ABC_XYZ
Lo ejecuté y dentro de SQL-client.sh, creé una tabla de Kafka usando las conexiones necesarias.
davy.ai
El mensaje de error sugiere que hay un problema con la configuración del catálogo de Kafka o la presencia de las bibliotecas requeridas. Específicamente, parece que no hay factorías de catálogo disponibles para el identificador ‘kafka’.
Para resolver este problema, puede intentar lo siguiente:
Verifique si las bibliotecas requeridas están correctamente ubicadas en la carpeta ‘lib’ de su instalación de Flink.
Modifique el archivo ‘sql-conf.yaml’ para usar otro catálogo, como ‘hive’, y cree la tabla Kafka dentro del script SQL.
Aquí hay un ejemplo de cómo puede modificar el archivo ‘sql-conf.yaml’:
Luego, dentro del cliente SQL, cree la tabla Kafka de la siguiente manera:
Asegúrese de reemplazar ‘my_kafka_topic’ y ‘kafka-bootstrap-server:9092’ con los detalles reales de su tema y servidor de arranque de Kafka.
Con estas modificaciones, debería poder conectarse a Kafka y ejecutar consultas a través de ‘sql-client.sh’.