es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo consultar una tabla de Postgres que fue creada por un conector de Kafka.

Estoy utilizando Docker, Postgres, Pgadmin, Kafka y KSQL para analizar algunos datos.

Mi docker-compose para Postgres y Pgadmin se ve así:

postgres:
  image: postgres
  hostname: postgres
  ports:
    - 5432:5432
  environment:
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: postgres
    POSTGRES_DB: postgres
  volumes:
    - postgres-data:/var/lib/postgresql/data
  restart: unless-stopped

pgadmin:
  image: dpage/pgadmin4
  depends_on:
    - postgres
  ports:
    - 5555:80
  environment:
    PGADMIN_DEFAULT_EMAIL: admin@admin.com
    PGADMIN_DEFAULT_PASSWORD: admin
  restart: unless-stopped

Después de iniciar el Docker, creo un conector de origen con un tema de Kafka proveniente de Twitter. Luego creo un flujo que escucha el conector de origen. Después, creo un conector de destino que va desde el tema de Kafka hacia Postgres.

Cuando ingreso a la página de pgadmin, puedo ver que se ha creado una tabla con el nombre del tema y se insertan valores. En PGADMIN puedo hacer consultas a esta tabla usando: SELECT * FROM public."TWEETS_THIN"

Ahora estoy tratando de consultar esta tabla a través de psycopg2. Creo la conexión al servidor de la siguiente manera:

import psycopg2

try:
    conn = psycopg2.connect("dbname='postgres' user='postgres' host='postgres' password='postgres'")
except psycopg2.Error as e:
    print(e)

try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM public.TWEETS_THIN")
    for item in cursor.fetchall():
        print(item)
except psycopg2.Error as e:
    print(e)
cursor.close()

Pero obtengo el siguiente error y no sé por qué: “current transaction is aborted, commands ignored until end of transaction block”.

¿Cómo puedo hacer consultas a esta tabla con psycopg2?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    El mensaje de error “la transacción actual ha sido abortada, los comandos serán ignorados hasta el final del bloque de transacción” indica que hay una transacción en curso que ha sido abortada y cualquier comando posterior será ignorado hasta el final del bloque de transacción.

    Para solucionar esto, debes iniciar una nueva transacción antes de ejecutar la consulta. Puedes hacer esto agregando la siguiente línea antes de la llamada cursor.execute:

    conn.autocommit = True
    

    Esto iniciará una nueva transacción y la consulta se ejecutará correctamente.

    Alternativamente, puedes envolver la llamada cursor.execute dentro de una declaración conn.commit(), de esta manera:

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM public.TWEETS_THIN")
    conn.commit()
    for item in cursor.fetchall():
        print(item)
    

    Esto confirmará explícitamente la transacción en curso y la consulta se ejecutará correctamente.

    Ten en cuenta que al usar psycopg2, debes confirmar explícitamente cualquier cambio realizado en la base de datos, o serán revertidos cuando se cierre la conexión.

Comments are closed.