es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

AWS Glue: Cómo generar sólo el último archivo en un bucket de s3.

Utilizo AWS Glue y Apache Hudi para replicar datos en RDS a S3. Si ejecuto el siguiente trabajo, se generarán 2 archivos parquet (el inicial y el actualizado) en el bucket S3 (rutaBase). En este caso, solo quiero tener 1 archivo más reciente y me gustaría eliminar el antiguo.

¿Alguien sabe cómo mantener solo 1 archivo más reciente en el bucket?

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

@params: [NOMBRE_DEL_TRABAJO]

args = getResolvedOptions(sys.argv, ['NOMBRE_DEL_TRABAJO'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['NOMBRE_DEL_TRABAJO'], args)

dataGen = sc.jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc.jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()

tablaNombre = 'hudi_morathena_sample' 
bucketNombre = 'cm-sato-hudi-sample--datalake'
rutaBase = f's3://{bucketNombre}/{tablaNombre}'

hudi_opciones = {
  'hoodie.table.name': tablaNombre,
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
  'hoodie.compact.inline': 'false',
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tablaNombre,
  'hoodie.datasource.write.operation': 'upsert',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2,
  'hoodie.insert.shuffle.parallelism': 2,
}

df.write.format("hudi"). \
  options(**hudi_opciones). \
  mode("overwrite"). \
  save(rutaBase)

updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.show()

print("update")

df.write.format("hudi"). \
  options(**hudi_opciones). \
  mode("append"). \
  save(rutaBase)

job.commit()
Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para mantener solo el archivo más reciente en el bucket de S3 utilizando AWS Glue y Apache Hudi, puede modificar el código para incluir un paso para eliminar el archivo anterior después de que se haya generado el nuevo archivo.

    Puede usar el paquete awsglue.utils de AWS Glue para listar los archivos en el bucket de S3 y luego eliminar el archivo anterior utilizando la función s3.deleteObject() del SDK de AWS para Python (boto3).

    Aquí hay un ejemplo de cómo puede modificar el código para mantener solo el archivo más reciente:

    import sys
    from awsglue.utils import getResolvedOptions
    from awsglue.utils import get_boto3_client
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
    sc = spark.sparkContext
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    df.show()
    
    tableName = 'hudi_mor_athena_sample' 
    bucketName = 'cm-sato-hudi-sample--datalake'
    basePath = f's3://{bucketName}/{tableName}'
    
    hudi_options = {
      'hoodie.table.name': tableName,
      'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
      'hoodie.compact.inline': 'false',
      'hoodie.datasource.write.recordkey.field': 'uuid', 
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': tableName,
      'hoodie.datasource.write.operation': 'upsert', 
      'hoodie.datasource.write.precombine.field': 'ts', 
      'hoodie.upsert.shuffle.parallelism': 2, 
      'hoodie.insert.shuffle.parallelism': 2,
    }
    
    df.write.format("hudi"). \
      options(**hudi_options). \
      mode("overwrite"). \
      save(basePath)
    
    updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
    df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.show()
    
    # update
    df.write.format("hudi"). \
      options(**hudi_options). \
      mode("append"). \
      save(basePath)
    
    # obtener cliente del bucket de S3
    s3 = get_boto3_client('s3')
    
    # listar archivos en el bucket de S3
    files = s3.list_objects_v2(Bucket=bucketName, Prefix=tableName)['Contents']
    
    # ordenar archivos por tiempo de última modificación
    sorted_files = sorted(files, key=lambda x:x['LastModified'], reverse=True)
    
    # eliminar todos los archivos excepto el más reciente
    for file in sorted_files[1:]:
        s3.delete_object(Bucket=bucketName, Key=file['Key'])
    
    job.commit()
    

    En este código modificado, después de la línea df.write.format("hudi").options(**hudi_options).mode("append").save(basePath), el código lista todos los archivos en el bucket de S3 (files = s3.list_objects_v2(Bucket=bucketName, Prefix=tableName)['Contents']) y los ordena por tiempo de última modificación (sorted_files = sorted(files, key=lambda x:x['LastModified'], reverse=True)). El código luego recorre todos los archivos excepto el más reciente (for file in sorted_files[1:]:) y los elimina utilizando la función s3.delete_object().

    Tenga en cuenta que este código eliminará todos los archivos excepto el más reciente. Si desea mantener más de un archivo más reciente, puede modificar el código en consecuencia.

Comments are closed.