Hi! I'm running into a problem when converting back and forth from a streaming table to a data stream. Given the following table DDL:
create table masterdata ( facility text, manufacturer text, serial integer, latitude double precision, longitude double precision, elevation double precision ); and a corresponding POJO public class MasterData { public Double elevation; public String facility; public Double latitude; public Double longitude; public String manufacturer; public Long serial; // getter/setter omitted } I register the database using JdbcCatalog like this: JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl); tableEnv.registerCatalog("cat", catalog); tableEnv.useCatalog("cat"); and if I try to create a table with either "SELECT * FROM masterdata" or via tableEnv.from("masterdata"); It will bail out with an exception similar to Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not match. Cause: Incompatible types for sink column 'elevation' at position 1. Query schema: [facility: STRING, manufacturer: STRING, serial: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE] Sink schema: [elevation: DOUBLE, facility: STRING, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, serial: BIGINT] If i explicitly set the order of the columns in the SELECT like this: tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata"); it works. In the debugger I can see that "queryFields" and "sinkField" in the call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast () are not aligned, i.e. the order of the fields in those two lists are not the same, hence the exception. According to relevant note in the docs [1]: the planner reorders fields and inserts implicit casts where possible to convert internal data structures to the desired structured type Which makes me think that as long as the names of the fields in the POJO correspond to the column names in the table, the planner should take care of reordering and the explicit "SELECT elevation, ..." should not be needed. What am I missing? Thanks! Oliver [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream