es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo decodificar una cadena que contiene un Json codificado usando Apache Flink SQL?

Utilizando Flink 1.10

Mis registros de aplicación se escriben como Json y uso Filebeat para seguir los archivos de registro y enviarlos a Kafka. Filebeat envuelve los registros en su propio Json. Por lo tanto, mis registros están codificados como cadenas en la propiedad “mensaje” de Filbeat.

Ejemplo: { “@timestamp”: “2021-11-22…”, “message”: “{\”someProperty\”:\”someValue\”}” }

¿Hay alguna forma de decirle a Flink SQL que descodifique el campo en un objeto Json?

Supongo que se necesita crear una UDF, ¿no hay una funcionalidad existente?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, tienes razón. Flink SQL no tiene funcionalidad incorporada para decodificar campos codificados como cadenas Json. Deberás crear una función definida por el usuario (UDF) para realizar la decodificación.

    Para hacer esto, puedes crear una UDF que tome una cadena y devuelva un objeto Json. Luego, puede registrar esta UDF con Flink SQL y usarla en tus consultas para decodificar las cadenas Json.

    Aquí hay un ejemplo de cómo puedes crear una UDF en Flink 1.10 para decodificar cadenas Json:

    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.JsonNode;
    
    public class JsonDecode extends ScalarFunction {
        public static final String FUNC_NAME = "jsonDecode";
        private static final ObjectMapper objectMapper = new ObjectMapper();
    
        public JsonNode eval(String jsonStr) {
            try {
                return objectMapper.readTree(jsonStr);
            } catch (Exception e) {
                // manejar errores de decodificación de Json
            }
        }
    }
    

    En este ejemplo, estamos utilizando la biblioteca Jackson para decodificar la cadena Json. El método eval toma una cadena como entrada y devuelve un objeto JsonNode.

    Puedes registrar esta UDF con Flink SQL utilizando el siguiente código:

    tableEnv.createTemporaryFunction(JsonDecode.FUNC_NAME, JsonDecode.class);
    

    Una vez registrado, puedes usar la UDF en tus consultas para decodificar las cadenas Json:

    SELECT jsonDecode(message) as decoded_message
    FROM logs;
    

    Esto devolverá una tabla con el campo mensaje decodificado como un objeto Json.

Comments are closed.