es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo leer y escribir en el mismo archivo en Palantir Foundry?

Tengo una tarea muy sencilla de actualizar el contenido de un archivo de control en Palantir Foundry. Necesito leer el contenido del archivo, realizar una verificación y luego escribirlo nuevamente en el mismo archivo. Sin embargo, si proporciono el mismo archivo como entrada y salida en la transformación, obtengo el siguiente error:

Existe una dependencia cíclica en el código para los siguientes conjuntos de datos de Foundry: ...

¿Hay alguna solución alternativa para esto?

Pregunta actualizada con más detalles y fragmento de código según sugerencia a continuación

Tengo un requerimiento para agregar el contenido de un archivo de transacción a un archivo de instantánea incremental, y este proceso se supone que se ejecuta una vez al día.
Sin embargo, si el proceso se ejecuta más de una vez en un día (inadvertidamente o reiniciando después de una falla, etc.), necesito asegurarme de que los mismos registros no se agreguen nuevamente.
Por lo tanto, estoy intentando usar el siguiente fragmento de código (como se aconseja en la documentación).

@incremental()
@transform(
    inp=Input("<path>/daily_trans_file"),
    op=Output("<path>/hist_snapshot_file")
)

def my_compute_function(inp, op):

    # Calcula la fecha de hoy y la agrega a los datos de transacción diarios
    proc_date = (datetime.today()).strftime("%Y-%m-%d")
    inp_df = inp.dataframe()
    inp_df = inp_df.withColumn("Processed_Date", lit(proc_date))

    # Agrega el dataframe anterior al final del archivo op si aún no está presente
    op.write_dataframe(
        inp_df.subtract(
        op.dataframe('previous', schema=inp_df.schema)))

Sin embargo, parece haber algún problema con esto. El archivo solo conserva los datos que originalmente tenía y no agrega nuevos datos. No estoy seguro de por qué la resta no está funcionando como debería. Verifiqué el ejemplo de la documentación y no parece ser necesario establecer ningún otro modo en este caso.

Tags:  ,

Answer

  1. Avatar for davy.ai

    Según el mensaje de error que proporcionaste, parece que hay una dependencia cíclica en tu código. Esto significa que tu transformación tiene una dependencia de entrada y salida en el mismo conjunto de datos, lo cual está causando el error.

    Para solucionar esto, puedes crear un nuevo conjunto de datos que actúe como conjunto de datos intermedio entre tus conjuntos de datos de entrada y salida. De esta manera, tu transformación tendrá una dependencia clara de entrada y salida, y se resolverá el error de dependencia cíclica.

    Aquí tienes un ejemplo de cómo puedes modificar tu código para crear un conjunto de datos intermedio y utilizarlo en tu transformación:

    intermediate_dataset = ds.output('intermediate_dataset')
    
    @transform(
        inp=Input("<ruta>/archivo_control"),
        intermediate=Output(intermediate_dataset),
        out=Output("<ruta>/archivo_control")
    )
    def mi_funcion_de_calculo(inp, intermediate, out):
        # Leer contenido del archivo de control
        contenido = inp.read()
    
        # Realizar alguna verificación en el contenido
        nuevo_contenido = hacer_verificacion(contenido)
    
        # Escribir el nuevo contenido en el conjunto de datos intermedio
        intermediate.write(nuevo_contenido)
    
        # Copiar el nuevo contenido del conjunto de datos intermedio al conjunto de datos de salida
        out.write(intermediate.read())
    

    En este ejemplo, primero creamos un conjunto de datos intermedio utilizando ds.output. Luego modificamos la transformación para utilizar este conjunto de datos intermedio como una salida, junto con los conjuntos de datos de entrada y salida originales. Leemos el contenido del archivo de control utilizando inp.read(), realizamos alguna verificación en el contenido, escribimos el nuevo contenido en el conjunto de datos intermedio y finalmente copiamos el nuevo contenido del conjunto de datos intermedio al conjunto de datos de salida utilizando out.write(intermediate.read()).

    ¡Espero que esto te ayude! Avísame si tienes alguna pregunta o inquietud adicional.

Comments are closed.