es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Spark structured streaming lee mensajes de Kafka con esquemas variables.

Estoy intentando leer mensajes de un tema de Kafka de manera continua. Los mensajes en el tema tienen dos tipos de esquema.

  1. { “Request”: { } }
  2. { “Response”: { } }

Un mensaje tiene un esquema de solicitud y otro un esquema de respuesta. ¿Cómo puedo leer estos mensajes JSON en Spark, identificar si es un mensaje de solicitud o un mensaje de respuesta y luego tomar acciones en consecuencia? Necesito hacer esto en Scala.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para leer los mensajes de un tema de Kafka de manera continua e identificar el tipo de esquema de cada mensaje en Scala, puedes utilizar la API de streaming estructurado de Spark con Kafka como fuente de transmisión. A continuación, se detallan los pasos que puedes seguir:

    1. Crea una instancia de SparkSession e importa las bibliotecas necesarias:
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.functions.col
    
    val spark = SparkSession
      .builder
      .appName("KafkaStream")
      .getOrCreate()
    
    1. Define el esquema de los mensajes de Kafka:
    val schema = new StructType()
            .add("Request", new StructType().add("<algun json="" anidado="">", "<tipo de="" datos="">"))
            .add("Response", new StructType().add("<otro json="" anidado="">", "<tipo de="" datos="">"))
    
    1. Lee los mensajes de Kafka desde el tema utilizando el esquema y agrúpalos por el tipo de esquema:
    val kafkaStream = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", <servidores_kafka>)
            .option("subscribe", <tema_kafka>)
            .option("startingOffsets", "earliest")
            .load()
            .select(from_json(col("value").cast("string"), schema).alias("json"))
            .selectExpr("json.Request as Request", "json.Response as Response")
            .groupBy("Request", "Response")
    
    1. Realiza acciones según el tipo de esquema de cada mensaje utilizando foreachBatch():
    kafkaStream.writeStream
            .trigger(Trigger.ProcessingTime(5000))
            .outputMode(OutputMode.Append())
            .foreachBatch { (batchDF, batchId) =>
                // procesar mensajes de solicitud
                val requestMessages = batchDF.select("Request")
                // procesar mensajes de respuesta
                val responseMessages = batchDF.select("Response")
                // realizar acciones según corresponda
            }.start()
            .awaitTermination()
    

    Con el código anterior, deberías poder leer mensajes de un tema de Kafka de manera continua, identificar el tipo de esquema (Request o Response) y luego realizar acciones correspondientes en Scala con Spark.

Comments are closed.