es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo leer un archivo Parquet particionado por la carpeta de fecha en un DataFrame desde S3 utilizando Python?

Usando python, debería ir hasta la carpeta cwp y entrar en la carpeta de la fecha para leer el archivo parquet.
Tengo esta estructura de carpetas dentro de s3.

Ruta de muestra de s3:

Nombre del bucket = lla.analytics.dev

Ruta = bigdata/dna/fixed/cwp/dt=YYYY-MM-DD/file.parquet

s3://lla.analytics.dev/bigdata/dna/fixed/cwp/dt=2021-11-24/file.parquet
dt=2021-11-25/file.parquet
dt=2021-11-26/file.parquet
……………………
……………………
dt=YYYY-MM-DD/file.parquet

Debo acceder a la carpeta de fecha más reciente y leer los archivos en un dataframe desde s3.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para lograr esto, primero necesitamos asegurarnos de que tenemos instaladas las bibliotecas necesarias, como boto3 y pyarrow. Una vez que tengamos estas bibliotecas, podemos utilizar el siguiente código para leer el último archivo parquet del ruta especificada en s3:

    import boto3
    import pandas as pd
    import pyarrow.parquet as pq
    
    #Credenciales de S3
    ACCESS_KEY = 'tu_access_key'
    SECRET_KEY = 'tu_secret_key'
    
    #Crear sesión de boto3
    s3 = boto3.Session(
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    
    #Establecer el bucket de S3 y la ruta del directorio
    bucket_name = 'lla.analytics.dev'
    prefix = 'bigdata/dna/fixed/cwp/'
    
    # Obtener la lista de todos los directorios
    all_keys = []
    s3_client = s3.client('s3')
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    for obj in response.get('Contents', []):
        all_keys.append(obj.get('Key'))
    
    # Ordenar los directorios según la fecha
    all_keys_sorted_by_date = sorted(all_keys)
    keys = [key for key in all_keys_sorted_by_date if 'dt=' in key]
    
    # Obtener el directorio más reciente
    recent_directory = keys[-1]
    
    # Obtener la lista de todos los archivos en el directorio más reciente
    files = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=recent_directory)
    file_keys = [file['Key'] for file in files['Contents']]
    
    # Leer el archivo parquet en un dataframe de pandas y devolver el dataframe
    df = pd.concat([pq.read_table("s3://" + bucket_name + "/" + file).to_pandas() for file in file_keys])
    

    El código anterior utiliza boto3 para conectarse al bucket de s3 y obtener una lista de todos los directorios bajo el prefijo especificado. Luego, el código ordena esta lista por fecha (suponiendo que los directorios estén nombrados con fechas) y selecciona el directorio más reciente. A continuación, recupera una lista de todos los archivos en este directorio y lee cada archivo parquet en un dataframe de pandas. Finalmente, concatena todos los dataframes en uno y lo devuelve.

Comments are closed.