es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Computación en cadena distribuida con Dask en un clúster con alta tasa de fallos?

Estoy utilizando Dask Bag para ejecutar algunos cálculos de map-reduce simples en un clúster especial:

import dask.bag as bag

summed_image = bag.from_sequence(my_ids).map(gen_image_from_ids).reduction(sum, sum).compute()

Este código genera una secuencia de cálculos encadenados, comienza mapeando desde from_sequence y gen_image_from_ids, y luego reduce todos los resultados en uno solo con sum. Gracias a la funcionalidad de Dask Bag, la suma se realiza en paralelo en un árbol multinivel.

Mi configuración de clúster especial tiene una tasa de falla más alta, ya que mis procesadores pueden ser finalizados en cualquier momento y la CPU es utilizada por otros procesos de mayor orden y luego liberada después de un tiempo. La finalización puede ocurrir solo una vez en un solo nodo cada 5 minutos, pero mi trabajo de reducción total puede llevar más de 5 minutos.

Aunque Dask es bueno en la recuperación de fallos, a veces mi trabajo simplemente no termina. Si se finaliza algún nodo interno en el árbol de trabajo, los resultados intermedios temporales de todas las computaciones anteriores faltarán. Y la computación debe reiniciarse desde el principio.

Hay una función llamada replicate para los objetos Future de Dask, pero no pude encontrar una característica similar en Dask Bag o Dataframe de nivel superior para garantizar la resistencia de los datos. Por favor, avíseme si hay un tratamiento común para mantener los resultados intermedios en un clúster de Dask con una tasa de falla muy alta.

Actualización – Mi solución alternativa

Tal vez cualquier sistema de computación distribuida sufrirá fallas frecuentes aunque pueda recuperarse de ellas. En mi caso, el cierre de los workers no es esencialmente una falla del sistema, sino que es desencadenada por el proceso de mayor orden. Así que en lugar de finalizar directamente mis workers, el proceso de mayor orden ahora ejecuta un pequeño script de Python para enviar el comando retire_worker() cuando comienza a ejecutarse.

Como se documenta, con retire_worker() el planificador moverá los datos del worker retirado a otro disponible. Así que mi problema queda resuelto temporalmente. Sin embargo, mantengo la pregunta abierta porque creo que la computación replicada y redundante sería una solución más rápida y aprovecharía mejor los nodos inactivos en el clúster.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Solución para mantener los resultados intermedios en un clúster Dask con una alta tasa de fallos

    Si está ejecutando cálculos en un clúster Dask con una alta tasa de fallos, es importante asegurar la resiliencia de los datos para evitar la pérdida de resultados intermedios. Si bien las Dask Futures tienen un método replicate, no existe una característica similar para las Dask Bags o DataFrames de nivel superior.

    Una solución alternativa para este problema es implementar una política de retirada para sus trabajadores Dask. Al implementar una política que retire los trabajadores en lugar de matarlos directamente, puede asegurarse de que Dask pueda mover los datos del trabajador retirado a otro nodo disponible. Esto ayuda a evitar la pérdida de resultados intermedios en caso de fallo de un trabajador.

    Para retirar un trabajador Dask, puede utilizar el método retire_worker() del cliente Dask. Este método envía una solicitud al planificador para retirar el trabajador especificado. Luego, el planificador mueve cualquier dato ubicado en ese trabajador a otro nodo disponible antes de retirarlo.

    Si bien esta solución puede no ser tan rápida como la computación replicada o redundante, aún puede ayudar a garantizar la resiliencia de los datos intermedios en un clúster Dask con una alta tasa de fallos.

Comments are closed.