Ejecute condicionalmente Go Routines consecutivas.
Tengo el siguiente fragmento de código. Estoy intentando ejecutar 3 rutinas GO al mismo tiempo sin exceder tres. Esto funciona como se espera, pero el código se supone que está ejecutando actualizaciones en una tabla de la base de datos.
Entonces, la primera rutina procesa los primeros 50, luego los segundos 50 y luego los terceros 50, y esto se repite. No quiero que dos rutinas procesen las mismas filas al mismo tiempo y debido a la duración de la actualización, esto sucede casi siempre.
Para solucionar esto, empecé a marcar las filas con una nueva columna processing
que es un booleano. Lo establezco en verdadero para todas las filas que se van a actualizar cuando comienza la rutina y duermo el script durante 6 segundos para permitir que la bandera se actualice.
Esto funciona durante un período de tiempo aleatorio, pero de vez en cuando, veo que 2-3 trabajos están procesando las mismas filas nuevamente. Siento que el método que estoy usando para evitar actualizaciones duplicadas es un poco chapucero y me preguntaba si hay una mejor manera.
stopper := make(chan struct{}, 3)
var counter int
for {
counter++
stopper <- struct{}{}
go func(db *sqlx.DB, c int) {
fmt.Println("start")
updateTables(db)
fmt.Println("stop")
<-stopper
}(db, counter)
time.Sleep(6 * time.Second)
}
// in updateTables
var ids []string
err := sqlx.Select(db, &data, "select * from table_data where processing = false")
if err != nil {
panic(err)
}
for _, row := range data {
list = append(ids, row.Id)
}
if len(rows) == 0 {
return
}
for _, row := range data {
_, err = db.Exec("update table_data set processing = true where id = $1", row.Id)
if err != nil {
panic(err)
}
}
// Additional row processing
davy.ai
El enfoque actual para evitar actualizaciones duplicadas mediante el uso de una columna de bandera y esperando 6 segundos parece poco confiable y puede causar problemas de rendimiento. Un enfoque mejor sería utilizar un mecanismo de sincronización como un mutex o un semáforo para asegurar que solo una rutina esté actualizando una fila en cualquier momento dado.
Una posible solución sería crear un canal para cada fila en la tabla y limitar el número de canales activos a 3, asegurando que no más de 3 rutinas puedan actualizar la misma fila al mismo tiempo. Así es como esto podría funcionar:
En este código actualizado, creamos una lista de objetos
rowUpdate
que contienen el ID de la fila y un canal. Luego inicializamos la lista con todas las filas que aún no se han procesado.A continuación, creamos un canal
sem
con una capacidad de 3 para limitar el número de rutinas activas al mismo tiempo. Iniciamos un bucle que recorre cada fila y, para cada fila, intentamos enviar un valor al canalsem
. Si el canal está lleno (es decir, ya hay 3 rutinas ejecutándose), omitimos la fila y pasamos a la siguiente.Si hay espacio disponible en el canal
sem
, iniciamos una nueva rutina que llama a la funciónupdateTable
. Esta función toma un objetorowUpdate
, actualiza la fila con el ID proporcionado y avisa al canal cuando haya terminado. Esto asegura que solo una rutina esté actualizando una fila en cualquier momento dado.Es importante tener en cuenta que este código asume que no hay problemas de concurrencia al actualizar la columna
processing
. Si existe el riesgo de que varias rutinas establezcan la columna comotrue
simultáneamente, deberías utilizar un mutex u otro mecanismo de sincronización similar para evitar condiciones de carrera.