es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

El procesamiento de extracción de estado de nuevos registros (New Record State Extraction) de Debezium no funciona correctamente en caso de una operación DELETE.

Estoy tratando de aplicar el New Record State Extraction SMT de Debezium utilizando la siguiente configuración:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "db,schema,table,txId,ts_ms"

Para las operaciones INSERT y UPDATE recibo los mensajes como se esperaba, pero en caso de DELETE recibo lo siguiente como carga de datos:

"payload": {
    "id": 2,
    "first_name": "",
    "last_name": "",
    "__db": "postgres",
    "__schema": "schema1",
    "__table": "user_details",
    "__txId": 5145,
    "__ts_ms": 1638760801510,
    "__deleted": "true"
}

Como puedes ver arriba, tanto los campos first_name como last_name tienen valores vacíos, aunque el registro que eliminé tenía valores no vacíos para ambos campos. Lo que espero ver como valor para esos 2 campos es su valor en el momento de la eliminación, como se muestra en el fragmento de carga de datos before de Debezium en caso de que el New Record State Extraction SMT no se aplique.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    La razón por la cual los campos first_name y last_name están vacíos en la carga DELETE se debe a que la transformación ExtractNewRecordState captura únicamente el estado nuevo de un registro después de que ha sido modificado, no el estado anterior antes de la eliminación. Esto significa que en el caso de DELETE, no hay un estado nuevo para capturar y por lo tanto los campos quedan vacíos.

    Si deseas capturar el estado anterior antes de la eliminación, puedes utilizar la transformación DeleteRecord incorporada en Debezium. Esto incluirá un fragmento before en el mensaje que contiene el estado del registro antes de la eliminación. Luego puedes aplicar la transformación ExtractNewRecordState a este fragmento before para extraer los campos deseados.

    Aquí tienes un ejemplo de configuración que incluye tanto las transformaciones DeleteRecord como ExtractNewRecordState:

    "transforms": "unwrap,delete,extract",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": true,
    "transforms.delete.type": "io.debezium.transforms.DeleteRecord",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "__deleted",
    "transforms.extract.skip.missing.or.null": true,
    

    En este ejemplo, primero aplicamos la transformación DeleteRecord para capturar el fragmento before. Luego aplicamos la transformación ExtractNewRecordState al fragmento before para extraer los campos deseados. Finalmente, utilizamos la transformación ExtractField para extraer el campo __deleted de la clave del registro y agregarlo a la carga para su procesamiento posterior.

Comments are closed.