es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

SparkContext ha sido cerrado.

Tengo un trabajo Spark que se ve así

def run():
    spark_session = helper_class.create_spark_session()
    // realizar alguna lógica
    spark_session.stop()
ahora quiero escribir algunas pruebas de integración para el trabajo y obtener <br><code>java.lang.IllegalStateException: SparkContext ha sido apagado</code><be>
en dos casos <br>
1. Tengo dos casos de prueba que ejecutan el mismo trabajo pero con datos de prueba diferentes, el primer caso de prueba se ejecuta bien pero el segundo falla con el error anterior.
2. Incluso si mi primer caso de prueba se ejecuta bien, quiero verificar los resultados del trabajo (almacenados en una ubicación de prueba local), por lo que para lograr eso quiero crear una sesión de Spark, leer los archivos y luego comparar los marcos de datos, pero obtengo el mismo error.

¿Hay alguna forma de forzar a Spark a crear un nuevo contexto o cómo puedo resolver este problema? Gracias de antemano.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Sí, hay una forma de forzar a Spark a crear un nuevo contexto. Puede configurar la propiedad de configuración spark.driver.allowMultipleContexts como true antes de crear un nuevo contexto. Esto le permitirá crear múltiples contextos de Spark en el mismo JVM.

    Para hacer esto en su función run(), agregue el siguiente código antes de crear la nueva SparkSession:

    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setAppName("MiApp").set("spark.driver.allowMultipleContexts", "true")
    sc = SparkContext.getOrCreate(conf=conf)
    

    Esto creará un nuevo SparkContext con la opción allowMultipleContexts configurada como true. Luego puede usar el objeto sc para crear una nueva SparkSession.

    Alternativamente, puede usar el complemento pytest-spark, que proporciona un punto de referencia para crear automáticamente un nuevo contexto de Spark para cada caso de prueba. Esto se puede hacer instalando el complemento (pip install pytest-spark) y utilizando el punto de referencia spark_context en sus pruebas:

    def test_mi_tarea_spark(spark_context):
        spark_session = spark_context.sparkSession
        # realizar alguna lógica
        spark_session.stop()
    

    Esto creará un nuevo SparkContext y SparkSession para cada caso de prueba, evitando el problema de que múltiples casos de prueba utilicen el mismo contexto.

Comments are closed.