Hi!

I'm running into a problem when converting back and forth between a
streaming table and a data stream. Given the following table DDL:

   create table masterdata
   (
      facility text,
      manufacturer text,
      serial integer,
      latitude double precision,
      longitude double precision,
      elevation double precision
   );

and a corresponding POJO

   public class MasterData {

      public Double elevation;

      public String facility;

      public Double latitude;

      public Double longitude;

      public String manufacturer;

      public Long serial;

      // getter/setter omitted

   }

I register the database using JdbcCatalog like this:

   JdbcCatalog catalog =
      new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
   tableEnv.registerCatalog("cat", catalog);
   tableEnv.useCatalog("cat");

and if I try to create a table with either "SELECT * FROM masterdata" or via

   tableEnv.from("masterdata");

it bails out with an exception similar to

   Exception in thread "main" org.apache.flink.table.api.ValidationException:
   Column types of query result and sink for registered table
   'cat.postgres.Unregistered_DataStream_Sink_1' do not match.

   Cause: Incompatible types for sink column 'elevation' at position 1.

   Query schema: [facility: STRING, manufacturer: STRING, serial: INT,
                  latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
   Sink schema:  [elevation: DOUBLE, facility: STRING, latitude: DOUBLE,
                  longitude: DOUBLE, manufacturer: STRING, serial: BIGINT]

If I explicitly set the order of the columns in the SELECT like this:

   tableEnv.sqlQuery(
      "SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");

it works. In the debugger I can see that "queryFields" and "sinkField" in the
call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned,
i.e. the order of the fields in those two lists is not the same, hence the
exception.
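
The sink schema in the exception above lists the POJO fields in alphabetical
order, so as a stopgap the explicit column list can be derived from the POJO
instead of being typed by hand. A minimal sketch in plain Java (no Flink
calls), assuming the sink order really is the alphabetical order of the public
fields; the class SelectBuilder and the method selectInSinkOrder are
hypothetical names made up for this illustration:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.stream.Collectors;

class SelectBuilder {

    // Stand-in for the POJO from this mail (getters/setters omitted).
    static class MasterData {
        public Double elevation;
        public String facility;
        public Double latitude;
        public Double longitude;
        public String manufacturer;
        public Long serial;
    }

    // Hypothetical helper: builds a SELECT whose column list is the POJO's
    // public instance field names in alphabetical order. This mirrors the
    // order seen in the "Sink schema" line of the exception; it is an
    // assumption, not something taken from the Flink API.
    static String selectInSinkOrder(Class<?> pojo, String table) {
        String columns = Arrays.stream(pojo.getDeclaredFields())
            .filter(f -> Modifier.isPublic(f.getModifiers()))
            .filter(f -> !Modifier.isStatic(f.getModifiers()))
            .map(Field::getName)
            .sorted()
            .collect(Collectors.joining(", "));
        return "SELECT " + columns + " FROM " + table;
    }

    public static void main(String[] args) {
        // The resulting string could be passed to tableEnv.sqlQuery(...).
        System.out.println(selectInSinkOrder(MasterData.class, "masterdata"));
    }
}
```

This only automates the workaround; it does not explain why the reordering
does not happen on its own.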

According to the relevant note in the docs [1]:

   the planner reorders fields and inserts implicit casts where possible
   to convert internal data structures to the desired structured type

This makes me think that, as long as the names of the fields in the POJO
correspond to the column names in the table, the planner should take care of
the reordering and the explicit "SELECT elevation, ..." should not be needed.
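
To make that expectation concrete, here is a minimal sketch in plain Java of
what aligning by name would mean for the two schemas in the exception. It is
not Flink's implementation; FieldAlignment and alignByName are hypothetical
names used only for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FieldAlignment {

    // For each sink column, look up the index of the query column with the
    // same name. The result is the permutation a name-based reordering
    // would have to apply before any positional type check.
    static List<Integer> alignByName(List<String> queryFields, List<String> sinkFields) {
        List<Integer> mapping = new ArrayList<>();
        for (String sinkName : sinkFields) {
            int index = queryFields.indexOf(sinkName);
            if (index < 0) {
                throw new IllegalArgumentException(
                    "No query column named '" + sinkName + "'");
            }
            mapping.add(index);
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Column orders copied from the exception message.
        List<String> query = Arrays.asList(
            "facility", "manufacturer", "serial", "latitude", "longitude", "elevation");
        List<String> sink = Arrays.asList(
            "elevation", "facility", "latitude", "longitude", "manufacturer", "serial");
        System.out.println(alignByName(query, sink)); // prints [5, 0, 3, 4, 1, 2]
    }
}
```

With such a mapping every sink column finds a query column of the same name,
and the only remaining difference is serial (INT in the query, BIGINT in the
sink), which looks like a candidate for an implicit cast.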

What am I missing?

Thanks!

Oliver

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream
