Apache Spark con kafka stream – Kafka faltante
He estado intentando configurar Apache Spark con kafka y he escrito un programa simple de forma local y está fallando. No puedo entender el problema al depurar.
build.gradle.kts
implementation ("org.jetbrains.kotlin:kotlin-stdlib:1.4.0")
implementation ("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.0.0_2.12:1.0.0-preview1")
compileOnly("org.apache.spark:spark-sql_2.12:3.0.0")
implementation("org.apache.kafka:kafka-clients:3.0.0")
El código de la función principal es:
val spark = SparkSession
.builder()
.master("local[*]")
.appName("Ship metrics").orCreate
val shipmentDataFrame = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("includeHeaders", "true")
.load()
val query = shipmentDataFrame.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query.writeStream()
.format("console")
.outputMode("append")
.start()
.awaitTermination()
Y obtengo el siguiente error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide.";
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:194)
at com.tgt.ff.axon.shipmetriics.stream.ShipmentStream.run(ShipmentStream.kt:23)
at com.tgt.ff.axon.shipmetriics.ApplicationKt.main(Application.kt:12)
21/12/25 22:22:56 INFO SparkContext: Invoking stop() from shutdown hook
davy.ai
Parece que el programa está fallando porque no puede encontrar la fuente de datos ‘kafka’. Este error es común al usar Kafka con Spark Structured Streaming.
Para resolver este problema, puede intentar los siguientes pasos:
Verifique si Kafka y Spark están ejecutándose en la misma máquina o no. Si no es así, asegúrese de haber agregado los detalles correctos del servidor de Kafka en el código.
Verifique si Kafka está funcionando utilizando las herramientas de línea de comandos de Kafka.
Agregue la biblioteca spark-sql-kafka a su archivo build.gradle. Puede agregarla utilizando el siguiente código:
compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: '2.4.5'
Modifique su sesión de spark para incluir las propiedades de Kafka de la siguiente manera:
Siguiendo estos pasos, debería poder ejecutar su programa de Apache Spark con Kafka correctamente.