es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Mensajes que no se cargan en SilverTable desde el tema.

Intentando cargar mensajes desde Topic en una tabla SilverTable en WriteStream. Pero los mensajes no se están cargando en la SilverTable. ¿Cómo leer los mensajes en la SilverTable?

var df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.19.9.4:1111")
  .option("subscribe", "testTopic")
  .load()
df = df.select($"value",$"topic")

// seleccionar el valor codificado en avro y el nombre del tema del tema
df.writeStream
  .foreachBatch( (batchDF: DataFrame, batchId: Long)=>
    {
      batchDF.persist()
      val topics = batchDF.select("topic").distinct().collect().map(
         (row)=>row.getString(0))
      topics.foreach((topix)=>{
        val silverTable = mappings(topix)
        // filtrar el mensaje para el tema actual
        var writeDF = batchDF.where(s"topic = '${topix}'")
        // decodificar los registros avro a una estructura de spark
        val schemaReg = schemaRegistryMappings(topix)
        writeDF = writeDF.withColumn("avroconverted",
           from_avro($"value", topix+"-value", schemaReg))
        // agregar a la tabla SilverTable
        writeDF.write.format("delta").mode("append").saveAsTable("silverTable")
      })
    }
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece haber un problema en el código donde los mensajes no se están cargando en la tabla “silverTable”. Una posible razón podría ser que la variable silverTable en el fragmento de código se refiere a una cadena literal “silverTable”, que no es el nombre real de la tabla.

    Para leer los mensajes en la silverTable correcta, necesitamos reemplazar la cadena literal “silverTable” con el nombre real de la tabla. Podemos hacer esto utilizando interpolación de cadenas para insertar el valor de la variable silverTable en la función saveAsTable.

    Entonces, el fragmento de código actualizado se vería así:

    var df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.19.9.4:1111")
      .option("subscribe", "testTopic")
      .load()
    df = df.select($"value",$"topic")
    
    // seleccionar el valor codificado en avro y el nombre del tema del tema
    df.writeStream
      .foreachBatch( (batchDF: DataFrame, batchId: Long)=>
        {
          batchDF.persist()
          val topics = batchDF.select("topic").distinct().collect().map(
             (row)=>row.getString(0))
          topics.foreach((topix)=>{
            val silverTable = mappings(topix)
            // filtrar los mensajes para el tema actual
            var writeDF = batchDF.where(s"topic = '${topix}'")
            // decodificar los registros avro a una estructura spark
            val schemaReg = schemaRegistryMappings(topix)
            writeDF = writeDF.withColumn("avroconverted",
               from_avro($"value", topix+"-value", schemaReg))
            // apendizar a la silverTable
            writeDF.write.format("delta").mode("append").saveAsTable(silverTable)
          })
        }
    

    Aquí, hemos utilizado la sintaxis de interpolación de cadenas ${silverTable} para insertar el nombre real de la tabla en la función saveAsTable, lo cual debería cargar los mensajes en la silverTable correcta.

Comments are closed.