es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Existe alguna manera de ejecutar SnowflakeOperator, SnowflakeHook localmente en un script que no sea de Airflow?

Tengo algunos DAGs que utilizan SnowflakeOperator y SnowflakeHook. Ambos hacen conexiones a Snowflake utilizando la entrada snowflake_connection, que he guardado en Admin > Connections en Airflow.


SnowflakeHook(
snowflake_conn_id="snowflake_connection",
database='SOME_DB',
schema='PUBLIC'
)

Si bien este arreglo funciona dentro del entorno de Airflow, quiero ejecutar el código como un script regular en mi máquina local. Pero no puede acceder a esa snowflake_connection, que tengo guardada en Airflow. ¿Existe alguna solución alternativa para esto?

¿Existe alguna forma en la que pueda proporcionar directamente una cadena de conexión o nombre de usuario/contraseña a SnowflakeHook y SnowflakeOperator para establecer una conexión con la base de datos?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Sí, hay una solución alternativa para esta situación. Puedes definir la cadena de conexión o el nombre de usuario/contraseña directamente en el código al crear una instancia de SnowflakeHook o SnowflakeOperator. Aquí tienes un ejemplo:

    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
    
    # Define los parámetros de conexión
    conn_params = {
        'account': 'nombre_de_tu_cuenta',
        'user': 'nombre_de_usuario',
        'password': 'tu_contraseña',
        'database': 'ALGUNA_DB',
        'schema': 'PUBLIC'
    }
    
    # Crea una instancia de SnowflakeHook con los parámetros de conexión
    hook = SnowflakeHook(snowflake_conn_id=None, **conn_params)
    
    # Ejecuta una consulta utilizando el hook
    results = hook.get_first('SELECT * FROM alguna_tabla')
    
    # Crea una instancia de SnowflakeOperator con los parámetros de conexión
    op = SnowflakeOperator(
        task_id='alguna_tarea',
        sql='SELECT * FROM otra_tabla',
        snowflake_conn_id=None,
        **conn_params
    )
    
    # Ejecuta el operador
    op.execute(context={})
    

    En el código anterior, definimos los parámetros de conexión (cuenta, usuario, contraseña, base de datos, esquema) directamente en un diccionario (conn_params). Luego, creamos instancias de SnowflakeHook y SnowflakeOperator utilizando esos parámetros. Establecemos snowflake_conn_id=None para evitar utilizar la conexión guardada en Airflow, y pasamos los parámetros utilizando **conn_params.

    Con este enfoque, puedes ejecutar tu código fuera del entorno de Airflow y aún así conectarte a Snowflake correctamente.

Comments are closed.