es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

La escritura de Pyspark en Minio (S3) particionado falla.

Estoy escribiendo archivos en Minio S3 usando Pyspark 3.1.2.
Estoy usando particionamiento para que los datos se almacenen en identificadores de lote, por ejemplo:

s3a://0001/transactions/batchid=1
s3a://0001/transactions/batchid=2
etc.

Todo funciona perfectamente bien al escribir en el sistema de archivos local.

Sin embargo, cuando uso S3 con partitioned commiter (https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html)

Con la opción: “partitionOverwriteMode” = “static”
por ejemplo:
data_frame.write.mode(“overwrite”).partitionBy(“batch_id”).orc(output_path)

Se sobrescribe todo el camino, incluyendo “transactions” (en lugar de sobrescribir solo la partición dada).

Configuración:

    spark_session.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    spark_session.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.path.style.access", "true"
    )
    spark_session.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.committer.magic.enabled", "true"
    )
    spark_session.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.committer.name", "partitioned"
    )
    spark_session.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.committer.staging.conflict-mode", "replace"
    )
    spark_session.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.committer.staging.abort.pending.uploads", "true"
    )
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema está en la configuración de partitionOverwriteMode. Intenta establecerlo en dynamic en lugar de static y verifica si eso ayuda.

Comments are closed.