es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Apache Spark con kafka stream – Kafka faltante

He estado intentando configurar Apache Spark con kafka y he escrito un programa simple de forma local y está fallando. No puedo entender el problema al depurar.

build.gradle.kts

implementation ("org.jetbrains.kotlin:kotlin-stdlib:1.4.0")
implementation ("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.0.0_2.12:1.0.0-preview1")
compileOnly("org.apache.spark:spark-sql_2.12:3.0.0")
implementation("org.apache.kafka:kafka-clients:3.0.0")

El código de la función principal es:

val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("Ship metrics").orCreate

val shipmentDataFrame = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("includeHeaders", "true")
    .load()

val query =  shipmentDataFrame.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query.writeStream()
    .format("console")
    .outputMode("append")
    .start()
    .awaitTermination()

Y obtengo el siguiente error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide.";
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:194)
    at com.tgt.ff.axon.shipmetriics.stream.ShipmentStream.run(ShipmentStream.kt:23)
    at com.tgt.ff.axon.shipmetriics.ApplicationKt.main(Application.kt:12)
21/12/25 22:22:56 INFO SparkContext: Invoking stop() from shutdown hook
Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Parece que el programa está fallando porque no puede encontrar la fuente de datos ‘kafka’. Este error es común al usar Kafka con Spark Structured Streaming.

    Para resolver este problema, puede intentar los siguientes pasos:

    1. Asegúrese de tener las versiones correctas de Kafka y Spark compatibles entre sí.
    2. Verifique si Kafka y Spark están ejecutándose en la misma máquina o no. Si no es así, asegúrese de haber agregado los detalles correctos del servidor de Kafka en el código.

    3. Verifique si Kafka está funcionando utilizando las herramientas de línea de comandos de Kafka.

    4. Agregue la biblioteca spark-sql-kafka a su archivo build.gradle. Puede agregarla utilizando el siguiente código:

      compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: '2.4.5'

    5. Modifique su sesión de spark para incluir las propiedades de Kafka de la siguiente manera:

    val spark = SparkSession.builder()
        .appName("Ship metrics")
        .master("local[*]")
        .config("spark.sql.streaming.checkpointLocation", "checkpoint/")
        .config("spark.sql.shuffle.partitions", "3")
        .getOrCreate()
    
    spark.conf.set("spark.sql.streaming.kafka.bootstrap.servers", "localhost:9092")
    spark.conf.set("spark.sql.streaming.schemaInference", "true")
    
    1. Modifique su código para usar las nuevas propiedades de Kafka:
    val df = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test")
        .load()
    
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream()
        .format("console")
        .start()
    
    query.awaitTermination()
    

    Siguiendo estos pasos, debería poder ejecutar su programa de Apache Spark con Kafka correctamente.

Comments are closed.