es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Convierta rdd.PipelinedRDD en un sql.dataframe.DataFrame sin utilizar el método collect().

Me gustaría convertir un rdd.PipelinedRDD en un sql.dataframe.DataFrame con PySpark. El PipelinedRDD se ve así:

[(1.0, ‘#1John #2Doe’),
(2.0, ‘#1Adam #2Smith’),
(3.0, ‘#1Benjamin #2Miller’)]

El DataFrame debería verse así:

[Row(_1=1, _2=’#1John #2Doe’),
Row(_1=2, _2=’#1Adam #2Smith’),
Row(_1=3, _2=’#1Benjamin #2Miller’)]

Logré hacer esto con el siguiente código:

df = sc.parallelize(rdd.collect()).map(lambda x: [int(x[0]), x[1]]).toDF()

Sin embargo, al usar este código para un conjunto de datos grande, obtengo un OutOfMemoryError. Creo que el problema está en el método collect(). ¿Existe una forma de hacer esto de manera más eficiente?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para evitar el error OutOfMemoryError al convertir un PipelinedRDD en un DataFrame, podrías intentar usar el método toDF() directamente en el rdd sin llamar primero al método collect(). Aquí tienes un ejemplo de código:

    from pyspark.sql import Row
    
    rdd = sc.parallelize([(1.0, '#1John #2Doe'),
                          (2.0, '#1Adam #2Smith'),
                          (3.0, '#1Benjamin #2Miller')])
    
    df = rdd.map(lambda x: Row(_1=int(x[0]), _2=x[1])).toDF()
    
    df.show()
    

    Este código utiliza el método map() para crear un nuevo RDD de objetos Row, donde _1 se convierte en un entero, y luego aplica toDF() para crear un DataFrame directamente desde el RDD.

    Al evitar el método collect(), no es necesario cargar todo el RDD en memoria de una vez, lo que reduce el riesgo de un error OutOfMemoryError.

Comments are closed.