es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

No se puede iniciar un multiprocessing Pool en una tarea dask.

Estoy tratando de calcular un gráfico personalizado de dask en un clúster remoto utilizando el método Client.get, pero me he encontrado con el siguiente error: AssertionError: no se permiten procesos daemon con hijos.

Me di cuenta de que uno de los métodos subyacentes en el gráfico de dask utiliza un grupo de procesos, lo que provoca que se genere este error, ya que dask también intenta utilizar un grupo de procesos. Así que la forma en que solucioné esto fue usando un LocalCluster y pasando el argumento processes=False. Sin embargo, lo desafortunado es que dask no me permite pasar process=False si estoy usando un Client conectado a un clúster remoto, y tampoco funciona inicializar los workers remotos con el argumento –nprocs=1.

En definitiva, puedo ejecutar el gráfico, pero es bastante frustrante que no pueda utilizar las capacidades de multiprocessing al calcular el gráfico de dask y no pueda usar clústeres remotos. ¿Alguna idea sobre cómo implementar uno (o tal vez ambos) de estos requisitos?

Gracias de antemano.

Muestra de código

Esto es lo que pretendía hacer, pero se generó la AssertionError:

from multiprocessing import Pool
from dask.distributed import Client

client = Client(<alguna-dirección-ip-remota-aquí>)

def foo():
    pool = Pool()  # la excepción se genera aquí, en la inicialización del objeto Pool
    ... hace algo aquí ...
    retorna True

graph = {'resultado': foo}

client.get(graph, 'resultado')

Esto es cómo lo “arreglé”, eliminando las capacidades de multiprocessing y configurando un clúster local:

“`
from multiprocessing import Pool
from dask.distributed import Client

client = Client(processes=False) # esto proporciona un LocalCluster que no tiene capacidades de multiprocessing (la documentación es muy breve y no muy útil: http://distributed.dask.org/en/stable/api.html#distributed.LocalCluster)

def foo():
pool = Pool() # no se genera ninguna excepción
… hace algo aquí …
retorna True

graph = {'resultado': foo}

client.get(graph, 'resultado')

“`</alguna-dirección-ip-remota-aquí>

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el error que encontraste se debe a un conflicto entre el grupo de procesos en paralelo utilizado por dask y el que intentas crear en tu código. Una solución que propusiste fue usar un LocalCluster en lugar de uno remoto, pero esto elimina las capacidades de multiprocesamiento, lo cual es una limitación que deseas superar.

    Una solución que se me ocurre es usar un programador enroscado (threaded scheduler) en lugar del predeterminado, que se basa en un programador basado en procesos. Esto podría evitar el conflicto entre los grupos de procesos en paralelo y debería funcionar con grupos remotos.

    Aquí te muestro cómo puedes crear un cliente con un programador enroscado:

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=False, threads_per_worker=4)
    client = Client(cluster)
    

    Esto te dará un cliente con un LocalCluster que utiliza hilos en lugar de procesos. Puedes ajustar el parámetro threads_per_worker según tus necesidades. Esto debería permitirte utilizar capacidades de multiprocesamiento con grupos remotos, sin encontrar el error de “procesos demoníacos”.

Comments are closed.