es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Obtener información del tipo de tabla en Flink.

Tengo una tabla de flink, digamos CREATE TABLE source(id int, name string) with (...) y una tabla de destino, digamos CREATE TABLE destination(id int, unique_name string) with (...). El campo unique_name se calcula utilizando lógica empresarial en la función de proceso interna de flink.

Por lo tanto, podemos asumir de manera segura que el esquema de origen será exactamente el mismo (nombre y tipos de datos) que el esquema de destino.
Realicé un proceso de bajo nivel utilizando la API de DataStream para obtener el DataStream destination. Tiene el tipo de salida outputType como GenericType<org.apache.flink.types.Row>. Cuando vuelvo a convertir el DataStream destination en una tabla, obtengo el siguiente error.

org.apache.flink.table.api.ValidationException: Los tipos de columna del resultado de la consulta y del sink para la tabla registrada 'default_catalog.default_database.destination' no coinciden.
Causa: Diferente número de columnas.

Esquema de la consulta: [f0: RAW('org.apache.flink.types.Row', '...')]
Esquema del sink:  [id: INT, name: STRING]

Aunque puedo resolver este problema utilizando el siguiente código, me gustaría generalizar esto y obtener RowTypeInformation de la Tabla de destino. ¿Hay alguna manera de obtener TypeInformation de la Tabla de flink?

tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
Tags:  ,

Answer

  1. Avatar for davy.ai

    Sí, hay una forma de obtener TypeInformation de una Table de Flink. Puedes utilizar la clase TableSchema para obtener la información del esquema de la tabla y luego usarla para crear un objeto TypeInformation para el tipo de fila.

    Aquí tienes un ejemplo de código:

    // Obtén el esquema de la tabla
    TableSchema tableSchema = tableEnv.getTable("destination").getSchema();
    
    // Crea un objeto TypeInformation para el tipo de fila
    TypeInformation<row> rowTypeInformation = TypeInformation.of(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
    

    Ahora puedes utilizar el objeto rowTypeInformation para convertir destinationDataStream de vuelta a una Table de Flink sin tener que especificar manualmente el tipo de fila.

    Table destinationTable = tableEnv.fromDataStream(destinationDataStream, rowTypeInformation);
    

    Esto debería solucionar el error que estabas enfrentando anteriormente.

Comments are closed.