es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Deserializar un objeto de archivo de secuencia Hadoop en memoria.

Pyspark tiene una función llamada sequenceFile que nos permite leer un archivo de secuencia que está almacenado en HDFS o en alguna ruta local disponible para todos los nodos.

Sin embargo, ¿qué pasa si ya tengo un objeto de bytes en la memoria del controlador que necesito deserializar y escribir como un archivo de secuencia?

Por ejemplo, la aplicación en la que estoy trabajando (no puedo cambiar la lógica de la aplicación) ejecuta un trabajo de spark que escribe este archivo en un sistema de archivos que no cumple con HDFS, el cual luego puedo recuperar como un objeto de bytes de Python en memoria, el cual parece contener únicamente un objeto de Secuencia serializado que debería poder deserializar en memoria.

Dado que este objeto ya se encuentra en memoria (por razones que no puedo controlar), la única forma que tengo de deserializarlo y ver el resultado (que es un archivo json) actualmente es escribirlo como un archivo local, mover ese archivo a HDFS y luego leer el archivo utilizando el método sequenceFile (ya que este método solo funciona con un archivo que se encuentra en una ruta de archivo HDFS o en una ruta local en cada nodo). Esto crea problemas en el flujo de trabajo de la aplicación.

Lo que necesito poder hacer es deserializar esto en memoria para poder escribirlo como un archivo json sin tener que escribirlo localmente y luego ponerlo en HDFS solo para leerlo de nuevo con spark.

¿Existe alguna forma en Python de tomar este objeto de bytes similar a un objeto NullWritable y deserializarlo en un diccionario de Python o ponerlo nuevamente en Hadoop como algo que pueda leer realmente?

Imagen: enlace a la imagen

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Desafortunadamente, el método sequenceFile en PySpark solo nos permite leer un archivo de secuencia almacenado en HDFS o una ruta local disponible para todos los nodos. Si ya tienes un objeto de bytes en la memoria del driver que necesita ser deserializado y escrito como un archivo de secuencia, la solución actual es escribirlo como un archivo local, mover ese archivo a HDFS y luego leer el archivo usando el método sequenceFile.

    Sin embargo, existen formas de deserializar el objeto en memoria sin tener que escribirlo en un archivo. Una posible solución es usar el módulo StringIO para crear un objeto similar a un archivo en memoria, y luego usar el módulo hadoop_fs en PySpark para escribir los datos en HDFS. Aquí tienes un ejemplo de código:

    import io
    from pyspark import SparkContext, SparkConf
    from pyspark.serializers import PickleSerializer
    from py4j.java_gateway import java_import
    
    # crear el contexto de Spark
    conf = SparkConf().setAppName("test")
    sc = SparkContext(conf=conf)
    
    # crear objeto de bytes en memoria
    bytes_obj = b'\x80\x03}q\x00.'
    
    # crear objeto similar a un archivo en memoria
    buffer = io.BytesIO(bytes_obj)
    
    # importar el módulo FileSystem de Hadoop
    java_import(sc._gateway.jvm, "org.apache.hadoop.fs.FileSystem")
    java_import(sc._gateway.jvm, "org.apache.hadoop.fs.Path")
    java_import(sc._gateway.jvm, "org.apache.hadoop.conf.Configuration")
    
    # configuración de Hadoop
    conf = sc._jsc.hadoopConfiguration()
    conf.set("fs.defaultFS", "hdfs://localhost:9000")
    
    # crear objeto FileSystem
    fs = sc._gateway.jvm.FileSystem.get(conf)
    
    # crear la ruta de salida en HDFS
    output_path = sc._gateway.jvm.Path("hdfs://localhost:9000/tmp/test.seq")
    
    # crear el flujo de salida
    output_stream = fs.create(output_path)
    
    # escribir los datos en el flujo de salida
    Pickler(writeable=True).dump(buffer, output_stream)
    
    # cerrar el flujo de salida
    output_stream.close()
    
    # leer los datos de HDFS
    data = sc.sequenceFile("hdfs://localhost:9000/tmp/test.seq", "org.apache.hadoop.io.NullWritable", "org.apache.hadoop.io.BytesWritable").collect()
    
    # deserializar los datos
    deserializer = PickleSerializer()
    deserialized_data = [deserializer.loads(x[1]) for x in data]
    
    # imprimir el resultado
    print(deserialized_data)
    

    Este código crea un objeto de bytes en memoria, luego crea un objeto similar a un archivo usando io.BytesIO. Luego utiliza el módulo hadoop_fs en PySpark para escribir los datos en HDFS sin tener que escribirlos en un archivo local primero. Finalmente, lee los datos de HDFS usando el método sequenceFile y los deserializa utilizando el PickleSerializer. El resultado se imprime en la consola.

    Ten en cuenta que esta es solo una posible solución, y puede haber otras formas de lograr el mismo resultado dependiendo de tu caso de uso específico.

Comments are closed.