Mensajes que no se cargan en SilverTable desde el tema.
Intentando cargar mensajes desde Topic en una tabla SilverTable en WriteStream. Pero los mensajes no se están cargando en la SilverTable. ¿Cómo leer los mensajes en la SilverTable?
var df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "10.19.9.4:1111")
.option("subscribe", "testTopic")
.load()
df = df.select($"value",$"topic")
// seleccionar el valor codificado en avro y el nombre del tema del tema
df.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long)=>
{
batchDF.persist()
val topics = batchDF.select("topic").distinct().collect().map(
(row)=>row.getString(0))
topics.foreach((topix)=>{
val silverTable = mappings(topix)
// filtrar el mensaje para el tema actual
var writeDF = batchDF.where(s"topic = '${topix}'")
// decodificar los registros avro a una estructura de spark
val schemaReg = schemaRegistryMappings(topix)
writeDF = writeDF.withColumn("avroconverted",
from_avro($"value", topix+"-value", schemaReg))
// agregar a la tabla SilverTable
writeDF.write.format("delta").mode("append").saveAsTable("silverTable")
})
}
davy.ai
Parece haber un problema en el código donde los mensajes no se están cargando en la tabla “silverTable”. Una posible razón podría ser que la variable
silverTable
en el fragmento de código se refiere a una cadena literal “silverTable”, que no es el nombre real de la tabla.Para leer los mensajes en la silverTable correcta, necesitamos reemplazar la cadena literal “silverTable” con el nombre real de la tabla. Podemos hacer esto utilizando interpolación de cadenas para insertar el valor de la variable
silverTable
en la funciónsaveAsTable
.Entonces, el fragmento de código actualizado se vería así:
Aquí, hemos utilizado la sintaxis de interpolación de cadenas
${silverTable}
para insertar el nombre real de la tabla en la funciónsaveAsTable
, lo cual debería cargar los mensajes en la silverTable correcta.