Error de configuración de servicio al ejecutar Spark 3.2
Estoy intentando actualizar el código escrito con spark 2.4 y haciendo algunas pruebas con spark 3.2. Soy capaz de crear una sesión de spark:
spark = (
SparkSession.builder
.config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.2.0,com.crealytics:spark-excel_2.11:0.13.1')
.config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
.config("spark.hadoop.fs.azure.account.key." + storage_account + ".blob.core.windows.net", storage_account_key)
.config("spark.driver.memory", "32G")
.master("local[*]")
.appName("Dev")
.getOrCreate()
)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
Pero cuando intento leer algo con:
spark.read.parquet(some_parquet_somewhere)
Recibo los siguientes errores:
Py4JJavaError: An error occurred while calling o104.parquet.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:596)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
at org.apache.spark.sql.delta.sources.DeltaDataSource.<init>(DeltaDataSource.scala:43)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 31 more
Entiendo que es un problema de configuración, pero no estoy seguro de cuál. Realmente agradecería algo de ayuda.
Edit: Estoy obteniendo estos errores tratando de usar pyspark en Jupyter Notebook, traté de usar pyspark en la consola y no tuve problemas para leer desde Azure Blob Storage.
Porque obtengo este mensaje:
/opt/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
299 int96RebaseMode=int96RebaseMode)
300
--> 301 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
302
303 def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
~/.pyenv/versions/3.7.3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1320 answer = self.gateway_client.send_command(command)
1321 return_value = get_return_value(
-> 1322 answer, self.gateway_client, self.target_id, self.name)
1323
1324 for temp_arg in temp_args:
/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
~/.pyenv/versions/3.7.3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Pensé que tal vez era algún problema con las variables de entorno, así que intenté usar:
export SPARK_HOME=carpeta/de/instalación/spark
y
import findspark
findspark.init()
sin éxito.
davy.ai
Parece que el error se debe a que no se pudo instanciar el proveedor DeltaDataSource. Este proveedor es utilizado por Spark para acceder a las tablas de Delta Lake.
El mensaje de error también sugiere que puede haber un problema de configuración, y puede estar relacionado con el hecho de que el código fue originalmente escrito con Spark 2.4 y ahora se está probando con Spark 3.2.
Para resolver este problema, es posible que desees probar los siguientes pasos:
Comprueba que tienes las versiones correctas de los paquetes “delta-core” y “spark-delta” instalados. Puedes hacer esto ejecutando
pip list
y buscando estos paquetes.Verifica si hay otros posibles problemas de configuración, como si la versión de Spark que estás usando es compatible con la versión de Hadoop que tienes instalada.
Intenta ejecutar tu código con la opción “–packages” y especificando explícitamente las dependencias “delta-core” y “spark-delta”, como sigue:
En cuanto a tu edición sobre el uso de PySpark en Jupyter Notebook, es posible que debas asegurarte de que el kernel de Jupyter Notebook apunte a la instalación correcta de Spark y que todas las dependencias requeridas estén instaladas para ese kernel. Puedes verificar esto ejecutando
pip list
desde dentro de Jupyter Notebook e inspeccionando la lista de paquetes instalados.