es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Ejecute condicionalmente Go Routines consecutivas.

Tengo el siguiente fragmento de código. Estoy intentando ejecutar 3 rutinas GO al mismo tiempo sin exceder tres. Esto funciona como se espera, pero el código se supone que está ejecutando actualizaciones en una tabla de la base de datos.

Entonces, la primera rutina procesa los primeros 50, luego los segundos 50 y luego los terceros 50, y esto se repite. No quiero que dos rutinas procesen las mismas filas al mismo tiempo y debido a la duración de la actualización, esto sucede casi siempre.

Para solucionar esto, empecé a marcar las filas con una nueva columna processing que es un booleano. Lo establezco en verdadero para todas las filas que se van a actualizar cuando comienza la rutina y duermo el script durante 6 segundos para permitir que la bandera se actualice.

Esto funciona durante un período de tiempo aleatorio, pero de vez en cuando, veo que 2-3 trabajos están procesando las mismas filas nuevamente. Siento que el método que estoy usando para evitar actualizaciones duplicadas es un poco chapucero y me preguntaba si hay una mejor manera.

stopper := make(chan struct{}, 3)
var counter int
for {
    counter++
    stopper <- struct{}{}
    go func(db *sqlx.DB, c int) {
        fmt.Println("start")
        updateTables(db)
        fmt.Println("stop")
        <-stopper
    }(db, counter)
    time.Sleep(6 * time.Second)
}
// in updateTables
var ids []string
err := sqlx.Select(db, &data, "select * from table_data where processing = false")
if err != nil {
    panic(err)
}
for _, row := range data {
    list = append(ids, row.Id)
}
if len(rows) == 0 {
    return
}

for _, row := range data {
    _, err = db.Exec("update table_data set processing = true where id = $1", row.Id)
    if err != nil {
        panic(err)
    }
}
// Additional row processing
Tags:  ,

Answer

  1. Avatar for davy.ai

    El enfoque actual para evitar actualizaciones duplicadas mediante el uso de una columna de bandera y esperando 6 segundos parece poco confiable y puede causar problemas de rendimiento. Un enfoque mejor sería utilizar un mecanismo de sincronización como un mutex o un semáforo para asegurar que solo una rutina esté actualizando una fila en cualquier momento dado.

    Una posible solución sería crear un canal para cada fila en la tabla y limitar el número de canales activos a 3, asegurando que no más de 3 rutinas puedan actualizar la misma fila al mismo tiempo. Así es como esto podría funcionar:

    type rowUpdate struct {
        id      string
        channel chan bool
    }
    
    func updateTable(db *sqlx.DB, row rowUpdate) {
        _, err := db.Exec(`update table_data set processing = true where id = $1`, row.id)
        if err != nil {
            panic(err)
        }
        // Procesamiento adicional de la fila
        _, err = db.Exec(`update table_data set processing = false where id = $1`, row.id)
        if err != nil {
            panic(err)
        }
        row.channel <- true
    }
    
    func main() {
        // Crear canales para cada fila
        rows := []rowUpdate{}
        data := []tableData{}
        err := sqlx.Select(db, &data, `select id from table_data where processing = false`)
        if err != nil {
            panic(err)
        }
        for _, row := range data {
            rows = append(rows, rowUpdate{id: row.id, channel: make(chan bool, 1)})
        }
    
        // Limitar a 3 canales activos a la vez
        sem := make(chan bool, 3)
    
        // Iniciar rutinas
        for {
            for i := range rows {
                select {
                case sem <- true:
                    go func(db *sqlx.DB, row rowUpdate) {
                        updateTable(db, row)
                        <-sem
                    }(db, rows[i])
                default:
                    // Omitir si ya se está ejecutando el número máximo de rutinas
                }
            }
            time.Sleep(6 * time.Second)
        }
    }
    

    En este código actualizado, creamos una lista de objetos rowUpdate que contienen el ID de la fila y un canal. Luego inicializamos la lista con todas las filas que aún no se han procesado.

    A continuación, creamos un canal sem con una capacidad de 3 para limitar el número de rutinas activas al mismo tiempo. Iniciamos un bucle que recorre cada fila y, para cada fila, intentamos enviar un valor al canal sem. Si el canal está lleno (es decir, ya hay 3 rutinas ejecutándose), omitimos la fila y pasamos a la siguiente.

    Si hay espacio disponible en el canal sem, iniciamos una nueva rutina que llama a la función updateTable. Esta función toma un objeto rowUpdate, actualiza la fila con el ID proporcionado y avisa al canal cuando haya terminado. Esto asegura que solo una rutina esté actualizando una fila en cualquier momento dado.

    Es importante tener en cuenta que este código asume que no hay problemas de concurrencia al actualizar la columna processing. Si existe el riesgo de que varias rutinas establezcan la columna como true simultáneamente, deberías utilizar un mutex u otro mecanismo de sincronización similar para evitar condiciones de carrera.

Comments are closed.