es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Spark.read o spark.sql son transformaciones perezosas?

En Spark, si los datos de origen han cambiado entre dos llamadas de acción, ¿por qué sigo obteniendo la salida anterior en lugar de la más reciente? A través de DAG, todas las operaciones se ejecutarán, incluida la operación de lectura, una vez que se llame a la acción. ¿No es así?

Por ejemplo:
df = spark.sql ("select * from dummy.table1")

Leyendo desde una tabla de Spark que tiene dos registros en un dataframe.

df.count()

Da como resultado un recuento de 2 registros.

Ahora, se inserta un registro en la tabla y se llama a la acción sin volver a ejecutar el comando1.

df.count()

Sigue dando como resultado un recuento de 2 registros.

Esperaba que Spark ejecutara la operación de lectura nuevamente y recuperara los 3 registros totales en el dataframe.

¿Dónde está mi comprensión equivocada?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Tu entendimiento es parcialmente correcto, pero existen ciertos mecanismos en Spark que pueden llevar al comportamiento que has observado.

    Spark utiliza la evaluación perezosa para optimizar el procesamiento de datos. Esto significa que las transformaciones de datos (como filtrado, ordenamiento, etc.) no se ejecutan inmediatamente al llamarlas, sino que se guardan como parte de un grafo acíclico dirigido (DAG). Solo cuando se llama a una operación de acción (como contar, recolectar, etc.), Spark ejecutará todo el DAG y producirá la salida.

    Ahora, en cuanto a tu ejemplo específico, cuando inicialmente lees el DataFrame de la tabla de Spark, se guarda en memoria como una colección de 2 registros que se refleja en la salida de df.count(). Sin embargo, después de insertar un nuevo registro en la tabla, el DataFrame en memoria no se actualiza automáticamente ya que Spark no monitorea continuamente la fuente de datos. Por lo tanto, cuando llamas a df.count() de nuevo, Spark simplemente ejecuta el DAG formado durante la operación de lectura inicial, el cual todavía tiene los datos viejos y, por lo tanto, devuelve la salida como 2.

    Si deseas ver los datos actualizados, necesitas leer la tabla de Spark de nuevo antes de llamar a la operación de contar volviendo a ejecutar el comando de lectura df = spark.sql("select * from dummy.table1"). Como alternativa, puedes usar el comando refreshTable para actualizar los metadatos de la tabla lo cual forzará a Spark a leer la tabla de nuevo en la próxima operación – spark.catalog.refreshTable("dummy.table1").

Comments are closed.