Hi!
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Field types of query result and registered TableSink [aggregationTableSink] do 
not match.

SQL = SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as 
tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)
使用table.sqlQuery(SQL),返回的table schema 是 Query result schema: [cnt: Long, 
tumTime: Timestamp]。
而使用 
JsonRowSchemaConverter.convert("{" +
"    type:'object'," +
"    properties:{" +
"        cnt: {" +
"            type: 'number'" +
"        }," +
"        tumTime:{" +
"            type:'string'," +
"            format:'date-time'" +
"        }" +
"    }" +
"}");
创建Elasticsearch6UpsertTableSink table schema 是 TableSink schema:   [cnt: 
BigDecimal, tumTime: Timestamp]
而且我看了 JsonRowSchemaConverter.convert 所有的数字类型都被转成BigDecimal,导致SQL返回的schema 和 
json定义的schema无法匹配。

请问是我使用的问题还是说框架存在这个问题?

附上源代码:

public class AggregationFunction {



    public static void main(String[] args) {
        String sql = "SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL 
'10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' 
SECOND)";
        StreamExecutionEnvironment senv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
        DataStream<User> source = senv.addSource(new SourceFunction<User>() {
            @Override
            public void run(SourceContext<User> sourceContext) throws Exception 
{
                int i = 1000;
                String[] names = {"Hanmeimei", "Lilei"};
                while (i > 1) {
                    sourceContext.collect(new User(names[i%2], i, new 
Timestamp(System.currentTimeMillis())));
                    Thread.sleep(10);
                    i--;
                }
            }
            @Override
            public void cancel() {

            }
        });
        tenv.registerDataStream("abc", source, "name, age, timestamp, 
rowtime.rowtime");
        Table table = tenv.sqlQuery(sql);
        List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, 
"http"));
        TypeInformation<Row> typeInformation = 
JsonRowSchemaConverter.convert("{" +
                "    type:'object'," +
                "    properties:{" +
                "        cnt: {" +
                "            type: 'number'" +
                "        }," +
                "        tumTime:{" +
                "            type:'string'," +
                "            format:'date-time'" +
                "        }" +
                "    }" +
                "}");
        RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
        TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();

        String[] fieldNames = typeInfo.getFieldNames();
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < typeInformations.length; i ++) {
            builder.field(fieldNames[i], typeInformations[i]);
        }
        Elasticsearch6UpsertTableSink establesink = new 
Elasticsearch6UpsertTableSink(
                true,
                builder.build(),
                hosts,
                "aggregation",
                "data",
                "$",
                "n/a",
                new JsonRowSerializationSchema.Builder(typeInformation).build(),
                XContentType.JSON,
                new IgnoringFailureHandler(),
                new HashMap<>()
        );
        tenv.registerTableSink("aggregationTableSink", establesink);
        table.insertInto("aggregationTableSink");
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class User {
        private String name;

        private Integer age;

        private Timestamp timestamp;
    }


}



Best wishes!

回复