es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Obtener el mensaje/indicador de error de un proceso en cola en Python Multiprocessing.

Estoy preparando una herramienta de multiprocesamiento en Python en la que utilizo comandos Process y Queue. La cola pone otro script en un proceso para ejecutar en paralelo. Como comprobación sanitaria, en la cola, quiero comprobar si hay algún error ocurriendo en mi otro script y devolver una bandera/mensaje si hubo un error (status = os.system() ejecutará el proceso y status es una bandera para el error). Pero no puedo mostrar errores de la cola/hijo en el proceso consumer hacia el proceso principal. Los siguientes son las partes principales de mi código (acortado):

import os
import time
from multiprocessing import Process, Queue, Lock

command_queue = Queue()
lock = Lock()

p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock))
    consumers.append(c)

p.daemon = True
p.start()

for c in consumers:
    c.daemon = True
    c.start()

p.join()
for c in consumers:
    c.join()

if error_flag:
    ¡Detener este proceso y enviar un mensaje!


def producer(queue, lock, ...):
    for config_path in test_config_list_path:
        queue.put((config_path, process_to_be_queued))

def consumer(queue, lock):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag = 1
    time.sleep(3)

Ahora quiero obtener esa error_flag y utilizarla en el código principal para manejar las cosas. Pero parece que no puedo mostrar error_flag desde la parte de consumer (hijo) hasta la parte principal del código. Agradecería si alguien pudiera ayudar con esto.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para generar el error_flag del proceso consumer en el proceso principal, se puede utilizar un objeto multiprocessing.Value. Aquí hay un ejemplo de cómo modificar su código para usar un objeto Value compartido:

    import os
    import time
    from multiprocessing import Process, Queue, Lock, Value
    
    command_queue = Queue()
    lock = Lock()
    
    error_flag = Value('i', 0)  # valor entero compartido inicializado en 0
    
    p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
    for i in range(consumer_num):
        c = Process(target=consumer, args=(command_queue, lock, error_flag))
        consumers.append(c)
    
    p.daemon = True
    p.start()
    
    for c in consumers:
        c.daemon = True
        c.start()
    
    p.join()
    for c in consumers:
        c.join()
    
    if error_flag.value:
        # manejar el error
        print("¡El indicador de error está activado!")
    
    def producer(queue, lock, ...):
        for config_path in test_config_list_path:
            queue.put((config_path, process_to_be_queued))
    
    
    def consumer(queue, lock, error_flag):
        while True:
            elem = queue.get()
            if elem is None:
                return
            status = os.system(elem[1])
            if status:
                error_flag.value = 1  # establecer error_flag a 1
            time.sleep(3)
    

    En este código modificado, creamos un objeto Value compartido llamado error_flag y lo inicializamos en 0. Este objeto se puede acceder y modificar tanto por la función consumer como por el proceso principal. Pasamos este objeto como argumento a la función consumer.

    Dentro de la función consumer, establecemos la propiedad value del objeto error_flag en 1 si hay un error.

    En la parte principal del código, después de unir los procesos, podemos verificar la propiedad value del objeto error_flag para ver si hubo un error.

Comments are closed.