??????
     ????????????????insert 
into??????????????????????????????job????????


??????
// Build the environment settings with the Blink planner selected, then create
// the TableEnvironment that every statement below runs against.
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .build();
TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);

// DDL for the 'datagen' source table:
//   - f_random     : INT, configured with min 1 / max 5
//   - f_random_str : STRING, configured with length 10
//   - ts           : computed column (localtimestamp) that also carries the watermark
// The connector is configured for 10 rows per second.
// The fragments are joined with an empty separator, so the statement text is
// exactly the concatenation of the pieces below.
String sourceDDL = String.join("",
        "CREATE TABLE datagen (  ",
        " f_random INT,  ",
        " f_random_str STRING,  ",
        " ts AS localtimestamp,  ",
        " WATERMARK FOR ts AS ts  ",
        ") WITH (  ",
        " 'connector' = 'datagen',  ",
        " 'rows-per-second'='10',  ",
        " 'fields.f_random.min'='1',  ",
        " 'fields.f_random.max'='5',  ",
        " 'fields.f_random_str.length'='10'  ",
        ")");

// Register the source table in the catalog.
bsTableEnv.executeSql(sourceDDL);

// Look the table up again and print its schema as a sanity check.
System.out.println(bsTableEnv.from("datagen").getSchema());

// DDL for the 'print' sink table with three columns: the grouping key
// (f_random), a bigint value (c_val) and a TIMESTAMP(3) column (wStart).
// The fragments are joined with an empty separator, so the statement text is
// exactly the concatenation of the pieces below.
String sinkDDL = String.join("",
        "CREATE TABLE print_table (",
        " f_random int,",
        " c_val bigint, ",
        " wStart TIMESTAMP(3) ",
        ") WITH ('connector' = 'print') ");

// Register the sink table in the catalog.
bsTableEnv.executeSql(sinkDDL);

// Look the table up again and print its schema as a sanity check.
System.out.println(bsTableEnv.from("print_table").getSchema());

Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), 
TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by 
TUMBLE(ts, INTERVAL '5' second), f_random");
bsTableEnv.executeSql("insert into print_table select * from " + table);

回复