es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Actualizaciones simultáneas con psycopg2

En un servidor, tengo una API que es llamada por uno o varios clientes. El servidor puede recibir, casi al mismo tiempo, múltiples instrucciones de actualización para el mismo atributo de PostgreSQL. Estas solicitudes no llegan en el mismo momento exacto, y me gustaría ejecutarlas una tras otra. Debido a la API (Flask), no puedo controlar que una llamada anterior haya terminado, por lo que la nueva actualización puede ser llamada antes de que la anterior se complete. Esperaría que psycopg2/postgresql pudiera encolar las actualizaciones y proceder con ellas silenciosamente una tras otra (incluso quizás sin orden), pero no parece funcionar de esa manera.

A continuación se muestra una versión simplificada de la función llamada en el servidor cuando se requiere una actualización. El atributo a actualizar es un objeto JSONB, si eso importa. Por lo tanto, hay un ‘path’ para determinar qué parte del objeto JSONB se actualizará. Entonces la llamada a la API se ve así:

def pg_update(data, path):
    conn = psycopg2.connect(...) # siempre la misma base de datos.
    cur = conn.cursor()

    # Actualización JSONB para una sola fila y columna seleccionada.
    # 'column' y 'select_row' se determinan con los parámetros 'data' y 'path'.
    command = (
        f"""UPDATE MY_TABLE SET """
        f"""{column} = jsonb_merge({column}, %s) """
        f"""WHERE {select_row};"""
    )

    cur.execute(command, [Json(data)])
    conn.commit()

Cuando se llama dos veces seguidas, esta llamada produce el error (en “cur.execute”):

psycopg2.errors.InternalError_: tuple concurrently updated

Dado que el servidor sigue en ejecución todo el tiempo, también puedo definir conn fuera de la llamada de API “pg_update” para que todas las llamadas se procesen con la misma conexión psycopg2. Pero luego, cuando se solicitan dos actualizaciones para el mismo atributo, obtengo en su lugar:

psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway

Forcé async_ = False en psycopg2.connect, por si acaso. Sin cambios.

En resumen, ¿cómo puedo permitir múltiples actualizaciones de los mismos atributos, una tras otra, incluso cuando estas actualizaciones se soliciten casi al mismo tiempo desde solicitudes de clientes independientes e incontrolables? ¿Quizás necesito un mecanismo de bloqueo? ¿Un mecanismo de encolado?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para manejar múltiples actualizaciones del mismo atributo, una solución es utilizar bloqueo a nivel de fila. Esto permite que solo una transacción a la vez modifique una fila específica, evitando modificaciones concurrentes y posibles conflictos.

    Para implementar el bloqueo a nivel de fila en PostgreSQL y psycopg2, puedes seguir los siguientes pasos:

    1. Modifica tu consulta SQL para incluir una cláusula FOR UPDATE al final de la declaración SELECT. Esto bloquea la fila seleccionada para su actualización hasta que se complete la transacción.

      Ejemplo:

      comando = (
          f""""SELECT {columna} FROM MI_TABLA WHERE {fila_seleccionada} FOR UPDATE;""""
      )
      
    2. Encierra tu declaración de actualización en un bloque de transacción BEGIN y COMMIT. Esto garantiza que todas las actualizaciones se ejecuten en una única transacción, evitando conflictos.

      Ejemplo:

      comando = (
          f""""BEGIN;""""
          f""""UPDATE MI_TABLA SET {columna} = jsonb_merge({columna}, %s) WHERE {fila_seleccionada};""""
          f""""COMMIT;""""
      )
      
    3. Maneja apropiadamente cualquier excepción, como volver a intentar la transacción si ocurre un conflicto de bloqueo.

      Ejemplo:

      try:
          cur.execute(comando_seleccion)
          resultado = cur.fetchone()
      
          # Realiza la actualización dentro de la transacción
          cur.execute(comando_actualizacion, [Json(datos)])
      
          conn.commit()
      
      except psycopg2.errors.SerializationFailure:
          # Conflicto de bloqueo, reintenta la transacción
          conn.rollback()
      

    También puedes utilizar un mecanismo de cola para manejar las actualizaciones de forma secuencial. Esto involucra el uso de una cola separada (por ejemplo, una cola de Redis) para almacenar las solicitudes de actualización, y un proceso de trabajador separado para tomar las solicitudes una por una y ejecutarlas.

    En general, el bloqueo a nivel de fila es una solución más sencilla y debería ser suficiente para la mayoría de los casos. Sin embargo, puede ser necesario utilizar un mecanismo de cola si se requieren funcionalidades más avanzadas, como priorización o equilibrio de carga.

Comments are closed.