??sql????????????????????????????????job??????????????
------------------ ???????? ------------------
发件人:
"user-zh"
<[email protected]>;
发送时间: 2020年8月23日(星期天) 下午2:29
收件人: "user-zh"<[email protected]>;
????: Re: 1.11??????????TableEnvironment.executeSql("insert into
...")??job??????????????
?????????????????????????????????? jobName
> 2020年8月21日 上午11:11，Asahi Lee <[email protected]> 写道：
>
> ??????
>      ????????????????insert
into??????????????????????????????job????????
>
>
> ??????
> EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().build();
> TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);
>
> String sourceDDL = "CREATE TABLE datagen ( " +
> " f_random INT, " +
> " f_random_str STRING, " +
> " ts AS localtimestamp, " +
> " WATERMARK FOR ts AS ts "
+
> ") WITH ( " +
> " 'connector' = 'datagen',
" +
> " 'rows-per-second'='10',
" +
> "
'fields.f_random.min'='1', " +
> "
'fields.f_random.max'='5', " +
> "
'fields.f_random_str.length'='10' " +
> ")";
>
> bsTableEnv.executeSql(sourceDDL);
> Table datagen = bsTableEnv.from("datagen");
>
> System.out.println(datagen.getSchema());
>
> String sinkDDL = "CREATE TABLE print_table (" +
> " f_random int," +
> " c_val bigint, " +
> " wStart TIMESTAMP(3) " +
> ") WITH ('connector' = 'print')
";
> bsTableEnv.executeSql(sinkDDL);
>
> System.out.println(bsTableEnv.from("print_table").getSchema());
>
> Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str),
TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by
TUMBLE(ts, INTERVAL '5' second), f_random");
> bsTableEnv.executeSql("insert into print_table select * from " + table);