es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo cargar un archivo csv.gpg desde GCS a Bigquery utilizando airflow o python.

Tenemos un requisito para descifrar y cargar archivos .gpg desde GCS a BigQuery en Airflow. ¿Existe alguna manera más fácil y rápida de descifrar y cargar al mismo tiempo desde GCS a BQ?

El proceso que he utilizado:
1. Descargar el archivo al servidor local.
2. Descifrar y cargar el mismo en GCS.
3. Cargar desde GCS a BQ.

    import pgpy
    import os
    from google.cloud import secretmanager
    from google.cloud import storage
    from io import BytesIO

    def upload_to_bucket(blob_name, output, bucket_name):
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(data=output.read())
        output.close()
        return blob.public_url

    def get_secret(project, key_id, version):

        client = secretmanager.SecretManagerServiceClient()
        name=f"projects/{project}/secrets/{key_id}/versions/{version}"
        response = client.access_secret_version(request={"name": name})
        return response.payload.data.decode('ascii')

    fetch_key = get_secret('project', 'secret_key', version)
    key, _ = pgpy.PGPKey.from_blob(fetch_key) 
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('bucket_name')

    blob = bucket.blob('test.csv.gpg')
    local_path=os.path.join(os.getcwd(),'teest.csv.gpg')
    blob.download_to_filename(os.path.join(os.getcwd(),'test.csv.gpg'))
    print(local_path)
    msg_encrypted = pgpy.PGPMessage.from_file(local_path)
    msg_plaintext = BytesIO((key.decrypt(msg_encrypted).message))
    print(msg_plaintext)
    g=upload_to_bucket('test.csv', msg_plaintext, 'buckeet')
    print(g)

os.remove(local_path)

¿Tenemos alguna opción para leer directamente desde GCS y cargar en BQ?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, puede utilizar Google Cloud Storage para cargar directamente los datos descifrados en BigQuery utilizando el método load de la API bigquery. Puede leer el archivo .gpg desde GCS utilizando la biblioteca google-cloud-storage, y luego descifrarlo utilizando pgpy. Una vez que tenga el archivo descifrado como una cadena, puede pasarlo al método load para cargarlo en BigQuery.

    Aquí hay una posible implementación:

    import pgpy
    from google.cloud import secretmanager
    from google.cloud import storage
    from google.cloud import bigquery
    from io import BytesIO
    
    def obtener_secreto(proyecto, id_clave, version):
        cliente = secretmanager.SecretManagerServiceClient()
        nombre = f"projects/{proyecto}/secrets/{id_clave}/versions/{version}"
        respuesta = cliente.access_secret_version(request={"name": nombre})
        return respuesta.payload.data.decode('ascii')
    
    clave_secreta = obtener_secreto('proyecto', 'clave_secreta', version)
    clave, _ = pgpy.PGPKey.from_blob(clave_secreta) 
    cliente_storage = storage.Client()
    cubeta = cliente_storage.get_bucket('nombre_cubeta')
    blob = cubeta.blob('test.csv.gpg')
    mensaje_encriptado = pgpy.PGPMessage.from_blob(blob.download_as_string())
    mensaje_plano = clave.decrypt(mensaje_encriptado).message.decode()
    
    cliente_bq = bigquery.Client()
    referencia_dataset = cliente_bq.dataset('mi_dataset')
    referencia_tabla = referencia_dataset.table('mi_tabla')
    
    configuracion_trabajo = bigquery.LoadJobConfig()
    configuracion_trabajo.source_format = bigquery.SourceFormat.CSV
    configuracion_trabajo.skip_leading_rows = 1 # omitir fila de encabezado si está presente
    configuracion_trabajo.autodetect = True # detectar esquema automáticamente
    
    trabajo_carga = cliente_bq.load_table_from_file(
        BytesIO(mensaje_plano.encode()),
        referencia_tabla,
        job_config=configuracion_trabajo,
    )  # El trabajo es asíncrono, por lo que necesitamos esperar hasta que se complete
    trabajo_carga.result()
    

    Este código lee la clave secreta de Google Cloud Secret Manager, luego descarga el archivo .gpg desde GCS y lo descifra utilizando pgpy. Los datos descifrados se cargan en BigQuery utilizando el método load_table_from_file, que acepta un objeto similar a un archivo que contiene datos CSV. El objeto configuracion_trabajo especifica el formato de entrada y algunas configuraciones opcionales. Una vez que se lanza el trabajo, esperamos hasta que se complete utilizando el método result.

    Este método debería ser más rápido y eficiente que descargar el archivo a un servidor local y volver a cargarlo en GCS.

Comments are closed.