Cómo consultar una tabla de Postgres que fue creada por un conector de Kafka.
Estoy utilizando Docker, Postgres, Pgadmin, Kafka y KSQL para analizar algunos datos.
Mi docker-compose para Postgres y Pgadmin se ve así:
postgres:
image: postgres
hostname: postgres
ports:
- 5432:5432
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- postgres-data:/var/lib/postgresql/data
restart: unless-stopped
pgadmin:
image: dpage/pgadmin4
depends_on:
- postgres
ports:
- 5555:80
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: admin
restart: unless-stopped
Después de iniciar el Docker, creo un conector de origen con un tema de Kafka proveniente de Twitter. Luego creo un flujo que escucha el conector de origen. Después, creo un conector de destino que va desde el tema de Kafka hacia Postgres.
Cuando ingreso a la página de pgadmin, puedo ver que se ha creado una tabla con el nombre del tema y se insertan valores. En PGADMIN puedo hacer consultas a esta tabla usando: SELECT * FROM public."TWEETS_THIN"
Ahora estoy tratando de consultar esta tabla a través de psycopg2. Creo la conexión al servidor de la siguiente manera:
import psycopg2
try:
conn = psycopg2.connect("dbname='postgres' user='postgres' host='postgres' password='postgres'")
except psycopg2.Error as e:
print(e)
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM public.TWEETS_THIN")
for item in cursor.fetchall():
print(item)
except psycopg2.Error as e:
print(e)
cursor.close()
Pero obtengo el siguiente error y no sé por qué: “current transaction is aborted, commands ignored until end of transaction block”.
¿Cómo puedo hacer consultas a esta tabla con psycopg2?
davy.ai
El mensaje de error “la transacción actual ha sido abortada, los comandos serán ignorados hasta el final del bloque de transacción” indica que hay una transacción en curso que ha sido abortada y cualquier comando posterior será ignorado hasta el final del bloque de transacción.
Para solucionar esto, debes iniciar una nueva transacción antes de ejecutar la consulta. Puedes hacer esto agregando la siguiente línea antes de la llamada
cursor.execute
:Esto iniciará una nueva transacción y la consulta se ejecutará correctamente.
Alternativamente, puedes envolver la llamada
cursor.execute
dentro de una declaraciónconn.commit()
, de esta manera:Esto confirmará explícitamente la transacción en curso y la consulta se ejecutará correctamente.
Ten en cuenta que al usar psycopg2, debes confirmar explícitamente cualquier cambio realizado en la base de datos, o serán revertidos cuando se cierre la conexión.