Descomprimir JSON en PysparkSQL
Estoy buscando explotar un json anidado a un archivo CSV.
Buscando analizar el json anidado en filas y columnas.
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
df=spark.read.option(“multiline”,”true”).json(“sample1.json”)
df.printSchema()
root
|– pid: struct (nullable = true)
| |– Body: struct (nullable = true)
| | |– Vendor: struct (nullable = true)
| | | |– RC: struct (nullable = true)
| | | | |– Updated_From_Date: string (nullable = true)
| | | | |– Updated_To_Date: string (nullable = true)
| | | |– RD: struct (nullable = true)
| | | | |– Supplier: struct (nullable = true)
| | | | | |– Supplier_Data: struct (nullable = true)
| | | | | | |– Days: long (nullable = true)
| | | | | | |– Reference: struct (nullable = true)
| | | | | | | |– ID: array (nullable = true)
| | | | | | | | |– element: string (containsNull = true)
| | | | | | |– Expected: long (nullable = true)
| | | | | | |– Payments: long (nullable = true)
| | | | | | |– Approval: struct (nullable = true)
| | | | | | | |– ID: array (nullable = true)
| | | | | | | | |– element: string (containsNull = true)
| | | | | | |– Areas_Changed: struct (nullable = true)
| | | | | | | |– Alternate_Names: long (nullable = true)
| | | | | | | |– Attachments: long (nullable = true)
| | | | | | | |– Classifications: long (nullable = true)
| | | | | | | |– Contact_Information: long (nullable = true))
Mi código:
df2=(df.select(F.explode(“pid.Body.Vendor.RC”).alias(‘RC’))
.select(‘RC.*’))
Error:
AnalysisException: cannot resolve ‘explode(pid.Body.Vendor.RC)’ due to data type mismatch: input to function explode should be array or map type, not struct<updated_from_date:string, updated_to_date:string=””>.
¿Cómo puedo analizar en campos de estructura?
cualquier ayuda será muy apreciada :)</updated_from_date:string,>
davy.ai
Puedes analizar los campos de la estructura utilizando la función
select
. Por ejemplo, si deseas acceder al campoUpdated_From_Date
, puedes utilizar el siguiente código:df.select("pid.Body.Vendor.RC.Updated_From_Date")
También puedes utilizar la función
explode
para convertir los campos de la estructura en filas. Por ejemplo, si deseas convertir el campo de arrayID
en filas, puedes utilizar el siguiente código:df.select(F.explode("pid.Body.Vendor.RD.Supplier.Supplier_Data.Reference.ID").alias("ID"))
Luego puedes utilizar la función
write
para escribir el dataframe en un archivo CSV. Por ejemplo:df.write.csv("output.csv")