es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Ejecutando un script de Python en 16 CPUs en lugar de 1 CPU.

Tengo un script bash que activa un script de Python:

#!/bin/bash
#SBATCH -J XXXXX
#SBATCH –ntasks=1
#SBATCH –cpus-per-task=16

python my_python_script.py

El script de Python está escaneando un archivo muy grande (~480,000,000 filas) y crea un diccionario que se escribirá como archivo de salida:

with open (huge_file, ‘r’) as hugefile, open (final_file, ‘w’) as final:
reader = csv.reader(hugefile, delimiter=”\t”)
writer = csv.writer(final, delimiter=”\t”)

d = {}

for r in reader:
v = r[0]+r[1]
if v not in d.keys():
d[v] = [r[5], r[4]]
else:
d[v].append([r[5], r[4]])

for k,v in d.items():
#analyses
nl = [different variables]
writer.writerow(nl)

Debido al tamaño del archivo, quiero utilizar 16 CPUs para la ejecución, sin embargo, aunque definí 16 CPUs en mi script bash, solo usa 1 CPU.

Leí mucho sobre subprocess, pero no parece aplicarse en este caso.
Me encantaría escuchar cualquier sugerencia.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Como experto en TI especializado en python y bash, sugiero usar el módulo multiprocessing en Python para paralelizar el procesamiento del archivo.

    El objeto multiprocessing.Pool se puede utilizar para crear un grupo de procesos trabajadores, y el método map() se puede utilizar para distribuir el trabajo entre ellos. Aquí hay un ejemplo de cómo modificar el script para utilizar la multiprogramación:

    import multiprocessing
    import csv
    
    # Función para procesar un conjunto de filas
    def process_chunk(chunk):
        d = {}
        for r in chunk:
            v = r[0] + r[1]
            if v not in d.keys():
                d[v] = [r[5],r[4]]
            else:
                d[v].append([r[5],r[4]])
        # Analyses
        nl = [cantidad de variables diferentes]
        return nl
    
    # Función principal para leer el archivo y utilizar la multiprogramación
    def main():
        with open(huge_file, 'r') as hugefile, open(final_file, 'w') as final:
            reader = csv.reader(hugefile, delimiter="\t")
            writer = csv.writer(final, delimiter="\t")
    
            pool = multiprocessing.Pool(processes=16)
            chunk_size = 1000  # Ajuste esto en función de la memoria disponible y el rendimiento
            chunks = [chunk for chunk in zip(*[reader]*chunk_size)]
            results = pool.map(process_chunk, chunks)
    
            for nl in results:
                writer.writerow(nl)
    

    En esta versión modificada, la función process_chunk() procesa un conjunto de filas y devuelve el resultado como una lista de variables. La función main() lee el archivo y lo divide en fragmentos, y utiliza multiprocessing.Pool.map() para aplicar la función process_chunk() a cada fragmento en paralelo. Los resultados se recopilan y se escriben en el archivo final.

    Ajuste el chunk_size para optimizar el rendimiento según la memoria del sistema disponible y el rendimiento general.

Comments are closed.