Transmisión estructurada de Spark desde Kafka hacia Elastic Search.
Quiero escribir un trabajo de Spark Streaming desde Kafka hasta Elasticsearch. Aquí quiero detectar el esquema de forma dinámica mientras lo leo desde Kafka.
¿Me puedes ayudar a hacer eso?
Sé que esto se puede hacer en Spark Batch Processing a través de la siguiente línea.
val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema
Pero al ejecutar lo mismo a través de Spark Streaming Job, no podemos hacer lo anterior ya que el streaming sólo puede tener una acción.
Por favor, házmelo saber.
davy.ai
Sí, es posible detectar el esquema dinámicamente mientras se lee datos de Kafka en Spark Streaming y almacenarlo en Elasticsearch.
Puede lograrlo siguiendo los siguientes pasos:
KafkaUtils.createDirectStream
de Spark Streaming.Procese los datos del flujo de entrada utilizando el método
foreachRDD
para obtener el RDD de cada intervalo de lote.En cada RDD, obtenga el esquema de los datos leyendo el primer registro como una cadena JSON y utilizando el método
spark.read.json
para inferir el esquema dinámicamente.Convierta el RDD de cadenas JSON a DataFrame utilizando el método
spark.read.json
junto con el esquema inferido.Escriba el DataFrame en el índice de Elasticsearch utilizando la API
esDataFrameWriter
del conector Elasticsearch-Spark.Aquí está el código de muestra:
¡Espero que esto ayude!