es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Escribe Spark Stream a tabla Phoenix.

Estoy tratando de descubrir cómo escribir un Spark Stream en una tabla de Phoenix de la manera menos complicada.

Hasta ahora solo he encontrado esta solución: kafka-to-phoenix, que requiere alguna ingeniería ad-hoc profunda (a mis ojos de principiante).
Puedo adaptar la solución vinculada a mis necesidades sin mayores problemas, pero me pregunto si hay mejores soluciones disponibles.

Básicamente, llego a este punto en mi código:

pre>streamingData
.writeStream
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime(triggerInterval, TimeUnit.MILLISECONDS))
.foreach(
howToInsertToPhoenix?()
)

.start()
.awaitTermination()

Tengo una solución en ejecución que guarda los datos en HBase, pero realmente no es menos complicada que la propuesta de Phoenix, por lo que no es fácil / rápidamente generalizable para casos de uso similares.

Muchas gracias de antemano 🙂

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Un enfoque para escribir Spark Stream en una tabla de Phoenix es utilizando la biblioteca phoenix-spark. Aquí están los pasos para lograr esto:

    1. Agrega la biblioteca a tu proyecto de Spark:
    libraryDependencies += "org.apache.phoenix" % "phoenix-spark" % "<versión>"
    
    1. Configura la conexión a Phoenix:
    val zkUrl = "localhost:2181"
    val tableName = "mi_tabla"
    val props = new Properties()
    props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true")
    props.setProperty("phoenix.schema.mapSystemTablesToNamespace", "true")
    props.setProperty("phoenix.mapReduce.outputTableName", tableName)
    props.setProperty("zookeeper.znode.parent", "/hbase")
    props.setProperty("hbase.zookeeper.quorum", zkUrl)
    
    1. Utiliza el objeto PhoenixSpark para guardar los datos en Phoenix:
    streamingData
        .writeStream
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
            PhoenixSpark.saveToPhoenix(
              batchDF,
              tableName,
              props,
              zkUrl
            )
        }
        .start()
        .awaitTermination()
    

    Este enfoque aprovecha la funcionalidad proporcionada por la biblioteca phoenix-spark y simplifica el código al evitar la necesidad de ingeniería ad-hoc. Sin embargo, siempre se recomienda probar a fondo cualquier solución antes de utilizarla en producción.</versión>

Comments are closed.