es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

PySpark: ¿Cómo leer de vuelta una tabla particionada escrita en un Bucket de S3?

Estoy tratando de trabajar con bucketing en PySpark, usando estos ejemplos generales:

https://gist.github.com/luminousmen/8dffa01a02bb58946b1299a621e44897

https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53

Escribo la tabla con bucketing en S3 de la siguiente manera:

spark = SparkSession.builder.appName(“bucketing test”).enableHiveSupport().config(
“spark.sql.sources.bucketing.enabled”, “true”).getOrCreate()
spark.conf.set(“spark.sql.autoBroadcastJoinThreshold”, -1)

Crear el DF

df = spark.range(1, 16000, 1, 16).select(
F.col(“id”).alias(“key”), F.rand(12).alias(“value”)
)

Escribir el DF al disco

df.write.bucketBy(8, “key”).sortBy(“value”).saveAsTable(
“bucketed_table”, format=”parquet”, mode=”overwrite”, path=”s3a://my/s3/path”
)

Luego, intento leerlo y probar la unión con bucketing.

df_bucketed = spark.sql(“””
CREATE EXTERNAL TABLE bucketed_table (
key int,
value float
) STORED AS PARQUET
CLUSTERED BY (key) SORTED BY (value) INTO 8 BUCKETS
LOCATION ‘s3a://my/s3/path’
“””)

Imprimir el describe extendido y confirmar que el bucketing funciona

logging.info(“Extended Describe: %s” % spark.sql(“DESCRIBE EXTENDED bucketed_table”)._jdf.showString(100, 40, False))
logging.info(“Columnas de la tabla bucketed: %s” % df_bucketed.columns)

Lo que conduce a la salida:

¿Por qué el DF resultante se lee sin columnas? ¿Hay una mejor manera de leer un DF con bucketing en PySpark desde S3?

Tenga en cuenta que spark.table(“”) no funciona para mí, porque necesito especificar una ruta de S3 (y no sé cómo hacerlo en la llamada table()).

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que tienes problemas para leer un DataFrame agrupado por bucket en PySpark desde S3. Parece que el DataFrame resultante se está leyendo sin columnas.

    Es posible que el problema esté relacionado con la configuración de spark.sql.autoBroadcastJoinThreshold. Es posible que desees intentar configurarla con un valor más alto y ver si eso ayuda.

    También puedes intentar utilizar el método spark.read.parquet() para leer el DataFrame desde S3. Este método te permite especificar la ruta de S3 y debería poder leer correctamente el DataFrame agrupado por bucket.

Comments are closed.