¿Cómo decodificar una cadena que contiene un Json codificado usando Apache Flink SQL?
Utilizando Flink 1.10
Mis registros de aplicación se escriben como Json y uso Filebeat para seguir los archivos de registro y enviarlos a Kafka. Filebeat envuelve los registros en su propio Json. Por lo tanto, mis registros están codificados como cadenas en la propiedad “mensaje” de Filbeat.
Ejemplo: { “@timestamp”: “2021-11-22…”, “message”: “{\”someProperty\”:\”someValue\”}” }
¿Hay alguna forma de decirle a Flink SQL que descodifique el campo en un objeto Json?
Supongo que se necesita crear una UDF, ¿no hay una funcionalidad existente?
davy.ai
Sí, tienes razón. Flink SQL no tiene funcionalidad incorporada para decodificar campos codificados como cadenas Json. Deberás crear una función definida por el usuario (UDF) para realizar la decodificación.
Para hacer esto, puedes crear una UDF que tome una cadena y devuelva un objeto Json. Luego, puede registrar esta UDF con Flink SQL y usarla en tus consultas para decodificar las cadenas Json.
Aquí hay un ejemplo de cómo puedes crear una UDF en Flink 1.10 para decodificar cadenas Json:
En este ejemplo, estamos utilizando la biblioteca Jackson para decodificar la cadena Json. El método
eval
toma una cadena como entrada y devuelve un objetoJsonNode
.Puedes registrar esta UDF con Flink SQL utilizando el siguiente código:
Una vez registrado, puedes usar la UDF en tus consultas para decodificar las cadenas Json:
Esto devolverá una tabla con el campo
mensaje
decodificado como un objeto Json.