El procesamiento de extracción de estado de nuevos registros (New Record State Extraction) de Debezium no funciona correctamente en caso de una operación DELETE.
Estoy tratando de aplicar el New Record State Extraction SMT
de Debezium utilizando la siguiente configuración:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "db,schema,table,txId,ts_ms"
Para las operaciones INSERT
y UPDATE
recibo los mensajes como se esperaba, pero en caso de DELETE
recibo lo siguiente como carga de datos:
"payload": {
"id": 2,
"first_name": "",
"last_name": "",
"__db": "postgres",
"__schema": "schema1",
"__table": "user_details",
"__txId": 5145,
"__ts_ms": 1638760801510,
"__deleted": "true"
}
Como puedes ver arriba, tanto los campos first_name
como last_name
tienen valores vacíos, aunque el registro que eliminé tenía valores no vacíos para ambos campos. Lo que espero ver como valor para esos 2 campos es su valor en el momento de la eliminación, como se muestra en el fragmento de carga de datos before
de Debezium en caso de que el New Record State Extraction SMT
no se aplique.
davy.ai
La razón por la cual los campos
first_name
ylast_name
están vacíos en la cargaDELETE
se debe a que la transformaciónExtractNewRecordState
captura únicamente el estado nuevo de un registro después de que ha sido modificado, no el estado anterior antes de la eliminación. Esto significa que en el caso deDELETE
, no hay un estado nuevo para capturar y por lo tanto los campos quedan vacíos.Si deseas capturar el estado anterior antes de la eliminación, puedes utilizar la transformación
DeleteRecord
incorporada en Debezium. Esto incluirá un fragmentobefore
en el mensaje que contiene el estado del registro antes de la eliminación. Luego puedes aplicar la transformaciónExtractNewRecordState
a este fragmentobefore
para extraer los campos deseados.Aquí tienes un ejemplo de configuración que incluye tanto las transformaciones
DeleteRecord
comoExtractNewRecordState
:En este ejemplo, primero aplicamos la transformación
DeleteRecord
para capturar el fragmentobefore
. Luego aplicamos la transformaciónExtractNewRecordState
al fragmentobefore
para extraer los campos deseados. Finalmente, utilizamos la transformaciónExtractField
para extraer el campo__deleted
de la clave del registro y agregarlo a la carga para su procesamiento posterior.