es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Transmisión estructurada de Spark desde Kafka hacia Elastic Search.

Quiero escribir un trabajo de Spark Streaming desde Kafka hasta Elasticsearch. Aquí quiero detectar el esquema de forma dinámica mientras lo leo desde Kafka.

¿Me puedes ayudar a hacer eso?

Sé que esto se puede hacer en Spark Batch Processing a través de la siguiente línea.

val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema

Pero al ejecutar lo mismo a través de Spark Streaming Job, no podemos hacer lo anterior ya que el streaming sólo puede tener una acción.

Por favor, házmelo saber.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, es posible detectar el esquema dinámicamente mientras se lee datos de Kafka en Spark Streaming y almacenarlo en Elasticsearch.

    Puede lograrlo siguiendo los siguientes pasos:

    1. Cree una fuente de entrada de streaming de Kafka utilizando la API KafkaUtils.createDirectStream de Spark Streaming.
    2. Procese los datos del flujo de entrada utilizando el método foreachRDD para obtener el RDD de cada intervalo de lote.

    3. En cada RDD, obtenga el esquema de los datos leyendo el primer registro como una cadena JSON y utilizando el método spark.read.json para inferir el esquema dinámicamente.

    4. Convierta el RDD de cadenas JSON a DataFrame utilizando el método spark.read.json junto con el esquema inferido.

    5. Escriba el DataFrame en el índice de Elasticsearch utilizando la API esDataFrameWriter del conector Elasticsearch-Spark.

    Aquí está el código de muestra:

    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.elasticsearch.spark.streaming._
    
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "myGroup",
      "auto.offset.reset" -> "latest"
    )
    
    val topics = Set("myTopic")
    
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    
    stream.foreachRDD { rdd =>
      val schema = spark.read.json(rdd.map(_.value).take(1)(0)).schema
      val df = spark.read.schema(schema).json(rdd.map(_.value))
      df.saveToEs("myIndex/myType")
    }
    
    streamingContext.start()
    streamingContext.awaitTermination()
    

    ¡Espero que esto ayude!

Comments are closed.