??????
??????????????????????????????join????????????????????????????
------------------ 原始邮件 ------------------
发件人:
"user-zh"
<[email protected]>;
发送时间: 2020年11月25日(星期三) 晚上7:31
收件人: "user-zh"<[email protected]>;
主题: Re:flink 1.11.2 rowtime??proctime?? Interval Join ????????????
Hi,
???????? 2 ???????????????????????????????????? interval join??
???? match ??????regular join ?????????? join
????????????????????????????????
Interval join ???? 2 ?????????????????????????????????? 2
????????????????????????
Best,
Hailong
在 2020-11-25 16:23:27，"Asahi Lee" <[email protected]> 写道：
>?????? ????????????????????????????????????Interval
Join??????????????????????flink
1.11.2????????????????????????????????????????????????join????????????join??????????????????
???? l_table.l_rt = r_table.r_pt ????????????????l_table.l_rt BETWEEN
r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5'
SECOND??????????package join;
>
>import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
> public static void main(String[] args) {
> StreamExecutionEnvironment bsEnv
= StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment
bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
> String lTable = "CREATE TABLE
l_table ( " +
>
" l_a INT, " +
>
" l_b string, " +
>
" l_rt AS localtimestamp, " +
>
" WATERMARK FOR l_rt AS l_rt " +
>
") WITH ( " +
>
" 'connector' = 'datagen', " +
>
" 'rows-per-second'='5', " +
>
" 'fields.l_a.min'='1', " +
>
" 'fields.l_a.max'='5', " +
>
" 'fields.l_b.length'='5' " +
>
")";
> bsTableEnv.executeSql(lTable);
>
> String rTable = "CREATE TABLE
r_table ( " +
>
" r_a INT, " +
>
" r_b string, " +
>
" r_pt AS proctime() " +
>
") WITH ( " +
>
" 'connector' = 'datagen', " +
>
" 'rows-per-second'='5', " +
>
" 'fields.r_a.min'='1', " +
>
" 'fields.r_a.max'='5', " +
>
" 'fields.r_b.length'='5' " +
>
")";
> bsTableEnv.executeSql(rTable);
>
> String printTable = "CREATE
TABLE print (" +
>
" l_a INT, " +
>
" l_b string, " +
>
" l_rt timestamp(3), " +
>
" r_a INT, " +
>
" r_b string, " +
>
" r_pt timestamp(3) " +
>
") WITH ( " +
>
" 'connector' = 'print' " +
>
") ";
>
>
bsTableEnv.executeSql(printTable);
>
> // ????????
>// Table joinTable =
bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a =
r_table.r_a and l_table.l_rt = r_table.r_pt");
>
> // ??????????????Rowtime
attributes must not be in the input rows of a regular join. As a workaround you
can cast the time attributes of input tables to TIMESTAMP before.
> Table joinTable =
bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a =
r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND
r_table.r_pt + INTERVAL '5' SECOND");
>
> bsTableEnv.executeSql("insert
into print select * from " + joinTable);
>
> }
>
>}