es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Error de configuración de servicio al ejecutar Spark 3.2

Estoy intentando actualizar el código escrito con spark 2.4 y haciendo algunas pruebas con spark 3.2. Soy capaz de crear una sesión de spark:

spark = (
    SparkSession.builder
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.2.0,com.crealytics:spark-excel_2.11:0.13.1')
        .config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .config("spark.hadoop.fs.azure.account.key." + storage_account + ".blob.core.windows.net", storage_account_key)
        .config("spark.driver.memory", "32G")
        .master("local[*]")
        .appName("Dev")
        .getOrCreate()
)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")

Pero cuando intento leer algo con:

spark.read.parquet(some_parquet_somewhere)

Recibo los siguientes errores:

Py4JJavaError: An error occurred while calling o104.parquet.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:596)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
    at org.apache.spark.sql.delta.sources.DeltaDataSource.<init>(DeltaDataSource.scala:43)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 31 more

Entiendo que es un problema de configuración, pero no estoy seguro de cuál. Realmente agradecería algo de ayuda.

Edit: Estoy obteniendo estos errores tratando de usar pyspark en Jupyter Notebook, traté de usar pyspark en la consola y no tuve problemas para leer desde Azure Blob Storage.

Porque obtengo este mensaje:

/opt/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
299                        int96RebaseMode=int96RebaseMode)
300 
--> 301         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    302 
    303     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,

~/.pyenv/versions/3.7.3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323 
   1324         for temp_arg in temp_args:

/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~/.pyenv/versions/3.7.3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Pensé que tal vez era algún problema con las variables de entorno, así que intenté usar:

export SPARK_HOME=carpeta/de/instalación/spark

y

import findspark
findspark.init()

sin éxito.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Parece que el error se debe a que no se pudo instanciar el proveedor DeltaDataSource. Este proveedor es utilizado por Spark para acceder a las tablas de Delta Lake.

    El mensaje de error también sugiere que puede haber un problema de configuración, y puede estar relacionado con el hecho de que el código fue originalmente escrito con Spark 2.4 y ahora se está probando con Spark 3.2.

    Para resolver este problema, es posible que desees probar los siguientes pasos:

    1. Asegúrate de que todas las dependencias requeridas se hayan agregado correctamente a tu entorno Spark. Puedes verificar esto inspeccionando la variable de entorno “SPARK_CLASSPATH” o mediante la página web de Spark en la pestaña “Environment”.
    2. Comprueba que tienes las versiones correctas de los paquetes “delta-core” y “spark-delta” instalados. Puedes hacer esto ejecutando pip list y buscando estos paquetes.

    3. Verifica si hay otros posibles problemas de configuración, como si la versión de Spark que estás usando es compatible con la versión de Hadoop que tienes instalada.

    4. Intenta ejecutar tu código con la opción “–packages” y especificando explícitamente las dependencias “delta-core” y “spark-delta”, como sigue:

      pyspark --packages io.delta:delta-core_2.12:1.1.0,io.delta:delta-core_2.12:1.1.0 \
             --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
             --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
      

    5. Si ninguno de los pasos anteriores funciona, es posible que desees intentar volver a la versión de Spark 2.4 para ver si se resuelve el problema.

    En cuanto a tu edición sobre el uso de PySpark en Jupyter Notebook, es posible que debas asegurarte de que el kernel de Jupyter Notebook apunte a la instalación correcta de Spark y que todas las dependencias requeridas estén instaladas para ese kernel. Puedes verificar esto ejecutando pip list desde dentro de Jupyter Notebook e inspeccionando la lista de paquetes instalados.

Comments are closed.