es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

PySpark: escribiendo en modo ‘append’ y sobrescribiendo si ciertos criterios coinciden

Estoy añadiendo el siguiente dataframe de Spark a una base de datos existente de Redshift. Y quiero usar ‘month’ y ‘state’ como criterios para verificar y reemplazar datos en la tabla de Redshift si month = '2021-12' y state = 'ga'

Dataframe que queremos añadir: df

month state producto
2021-12 ca ho0
2021-12 ca ho1
2021-12 ca ho2
2021-12 ca ho3
2021-12 ca ho4
2021-12 ga ho5
2021-12 ga ho6
2021-12 ga ho7
2021-12 ga ho8
2021-12 ga ho9

He intentado el siguiente script para añadirlo, parece que solo añade el dataframe df, sin reemplazar (sobrescribir) los registros existentes para el mes ‘2021-12’ y el estado ‘ga’.


df.write \
.format("xxx") \
.option("url", "xxx") \
.option("dbtable", "table1") \
.option("tempdir", "xxxx") \
.option("aws_iam_role", "xxxx") \
.mode("append") \
.option("replaceWhere", "month == '2021-12' AND state == 'ga'") \
.save()

Supongo que .option("replaceWhere", "month == '2021-12' AND state == 'ga'") no está funcionando. ¿Cómo puedo realizar el cambio? ¡Gracias!
(También probé la siguiente parte, parece que los registros existentes desaparecen y son reemplazados por df)


.mode("overwrite") \
.option("replaceWhere", "month == '2021-12' AND state == 'ga'") \

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para reemplazar (sobrescribir) los registros existentes para el mes ‘2021-12’ y el estado ‘ga’ con los datos del dataframe df durante la operación de anexar, puedes modificar el script de la siguiente manera:

    from pyspark.sql.functions import col
    
    df.write \
      .format("xxx") \
      .option("url", "xxx") \
      .option("dbtable", "table1") \
      .option("tempdir", "xxxx") \
      .option("aws_iam_role", "xxxx") \
      .mode("append") \
      .option("batchsize", "100000") \ # opcional, puede mejorar el rendimiento
      .option("truncate", "true") \ # truncar la tabla antes de anexar los datos
      .option("filterCondition", "(month != '2021-12' OR state != 'ga')") \ # filtrar los registros existentes
      .save()
    
    # seleccionar y filtrar los registros existentes en Redshift
    existing_df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:redshift://host:port/database") \
      .option("dbtable", "table1") \
      .option("user","username") \
      .option("password","password") \
      .load() \
      .filter((col("month") == '2021-12') & (col("state") == 'ga'))
    
    # unir los registros existentes con los nuevos datos y sobrescribir los registros existentes en Redshift
    combined_df = df.union(existing_df)
    combined_df.write \
      .format("jdbc") \
      .option("url", "jdbc:redshift://host:port/database") \
      .option("dbtable", "table1") \
      .option("user","username") \
      .option("password","password") \
      .mode("overwrite") \
      .save()
    

    Este script trunca la tabla table1 en Redshift antes de anexar los datos de df. También filtra los registros existentes para el mes ‘2021-12’ y el estado ‘ga’. Luego combina los registros existentes restantes con los nuevos datos de df utilizando union(). Finalmente, sobrescribe los registros existentes en table1 con los datos combinados utilizando mode("overwrite").

    Ten en cuenta que truncate y mode("overwrite") pueden causar pérdida de datos, así que úsalos con precaución. También debes ajustar las opciones como url, dbtable, user, password según tu entorno.

Comments are closed.