es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Error de PicklingError para llamada recursiva en Pyspark

Necesito implementar una función recursiva que utilice rdd (escribiré una función que enviará algo para ser calculado al clúster y luego enviará algunos cálculos a otros trabajadores).

Da un error de PicklingError debido a llamar a una función que utiliza sc:

PicklingError: No se pudo serializar el objeto: Excepción: Parece que estás intentando hacer referencia a SparkContext desde una variable de transmisión, acción o transformación. SparkContext solo se puede usar en el controlador, no en código que se ejecute en los trabajadores. Para obtener más información, consulta SPARK-5063.

¿Cómo puedo implementar este algoritmo? Estoy llamando a recursive(5) en la función que utiliza SparkContext sc dentro de ella, y esto no es posible al usar Spark, pero ¿cómo puedo manejar este estado de “enviar nuevamente algunos cálculos a otro clúster”?

El código se simplificó:

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName("test")\
        .getOrCreate()
sc = spark.sparkContext

def recursive(i):

def main_calculation(j):
    string_to_return = ""
    # calcular algo y concatenar la cadena con eso
    recursive(5) # necesito recursión debido a un algoritmo de árbol; por supuesto, no se llama infinitamente, se detendrá, pero el código se simplificó mucho
    return string_to_return

rdd = sc.parallelize([0,1,2,3,4])
log_values = rdd.map(lambda n: main_calculation(n))
llist = log_values.collect()

recursive(0)

La versión de Spark es 3.0.0.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para evitar el error de PicklingError, puedes pasar el objeto SparkContext como argumento a la función recursiva en lugar de llamarlo directamente dentro de la función. De esta manera, puedes evitar hacer referencia a SparkContext en un método de transformación o acción.

    Aquí tienes una implementación actualizada de la función recursiva:

    def recursiva(i, spark_context):
    
        def calculo_principal(j):
            string_a_retornar = ""
            # calcula algo y concatena la cadena con eso
            recursiva(5, spark_context) # llama a la función recursiva con el argumento spark_context
            return string_a_retornar
    
        rdd = spark_context.parallelize([0,1,2,3,4])
        valores_log = rdd.map(lambda n :  calculo_principal(n))
        lista = valores_log.collect()
    
    recursiva(0, sc) # llama a la función recursiva con el objeto SparkContext como argumento
    

    En la implementación actualizada, la función recursiva toma un argumento adicional, “spark_context”, que es el objeto SparkContext pasado desde el programa controlador. Este objeto luego puede ser utilizado en la función interna “calculo_principal” en lugar de llamar directamente a “sc”.

    Ten en cuenta que, dado que SparkContext solo puede ser utilizado en el controlador, debes pasarlo explícitamente a los trabajadores mediante argumentos de función.

    Además, asegúrate de llamar a la función recursiva con el argumento spark_context cada vez que la llames dentro de la función para evitar el error de PicklingError.

Comments are closed.