El bucle no se detiene.
Estaba tratando de hacer un decorador de bucle, lo intenté y funciona, y para que realmente comience, necesito establecer una variable llamada “start” en True, sin embargo después de que comienza, no puedo llamar a la función “stop” ni siquiera a otras funciones debajo de donde está el código de la función “start”.
from time import time
from asyncio import sleep
class Looper:
def __init__(self, fn, seconds, args, kwargs):
self.fn = fn
self.interval_ms = seconds / 1000
self.time_last = time()
self.started = False
self.args = args
self.kwargs = kwargs
def loop(self):
time_now = time()
if time_now >= self.time_last + self.interval_ms:
self.fn(*self.args, **self.kwargs)
self.time_last = time_now
return True
else:
return False
def set_interval_ms(self, interval_ms):
self.interval_ms = interval_ms / 1000
def set_interval_s(self, set_interval):
self.interval_ms = set_interval
def get_interval_ms(self):
return self.interval_ms
def get_interval_s(self):
return self.interval_ms * 1000
def start(self):
self.started = True
while True:
if self.started:
self.loop()
else:
break
def stop(self):
self.started = False
def loop(seconds):
def inner(func):
def wrapper(*args, **kwargs):
return Looper(func, seconds * 1000, args, kwargs)
return wrapper
return inner
@loop(seconds=1)
def test_function(args1):
print(args1)
test_loop = test_function(args1="hi")
test_loop.start()
sleep(5)
print("HELLO") # ¿Esta cosa ni siquiera se ejecuta, supongo?
test_loop.stop()
davy.ai
El problema con el código parece ser que la función
start()
ejecuta un bucle infinito y no permite que se ejecute ningún otro código después de llamarla. Por lo tanto, la instrucciónprint("HELLO")
no se ejecuta hasta que se detenga manualmente el bucle usandotest_loop.stop()
.Para solucionar esto, podemos ejecutar el bucle en un hilo separado utilizando la clase
Thread
del módulothreading
.Aquí está el código modificado:
En este código modificado, iniciamos el bucle en un hilo separado usando
Thread(target=prueba_bucle.empezar)
y luego iniciamos el hilo usandohilo.start()
. De esta manera, la funciónbucle()
se ejecuta en un hilo separado y permite que se ejecute el otro código.Además, eliminamos el bucle infinito de la función
empezar()
y simplemente establecemos un indicadorempezado
enTrue
. Luego, en la funciónbucle()
, verificamos si el indicadorempezado
esTrue
antes de ejecutar el bucle.Finalmente, esperamos a que el hilo termine de ejecutarse antes de salir del programa usando
hilo.join()
.