Obtener información del tipo de tabla en Flink.
Tengo una tabla de flink, digamos CREATE TABLE source(id int, name string) with (...)
y una tabla de destino, digamos CREATE TABLE destination(id int, unique_name string) with (...)
. El campo unique_name
se calcula utilizando lógica empresarial en la función de proceso interna de flink.
Por lo tanto, podemos asumir de manera segura que el esquema de origen será exactamente el mismo (nombre y tipos de datos) que el esquema de destino.
Realicé un proceso de bajo nivel utilizando la API de DataStream para obtener el DataStream destination
. Tiene el tipo de salida outputType
como GenericType<org.apache.flink.types.Row>
. Cuando vuelvo a convertir el DataStream destination
en una tabla, obtengo el siguiente error.
org.apache.flink.table.api.ValidationException: Los tipos de columna del resultado de la consulta y del sink para la tabla registrada 'default_catalog.default_database.destination' no coinciden.
Causa: Diferente número de columnas.
Esquema de la consulta: [f0: RAW('org.apache.flink.types.Row', '...')]
Esquema del sink: [id: INT, name: STRING]
Aunque puedo resolver este problema utilizando el siguiente código, me gustaría generalizar esto y obtener RowTypeInformation
de la Tabla
de destino. ¿Hay alguna manera de obtener TypeInformation
de la Tabla
de flink?
tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
davy.ai
Sí, hay una forma de obtener
TypeInformation
de unaTable
de Flink. Puedes utilizar la claseTableSchema
para obtener la información del esquema de la tabla y luego usarla para crear un objetoTypeInformation
para el tipo de fila.Aquí tienes un ejemplo de código:
Ahora puedes utilizar el objeto
rowTypeInformation
para convertirdestinationDataStream
de vuelta a unaTable
de Flink sin tener que especificar manualmente el tipo de fila.Esto debería solucionar el error que estabas enfrentando anteriormente.