??????
      
??????????????????????????????join????????????????????????????




------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]&gt;;
????????:&nbsp;2020??11??25??(??????) ????7:31
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re:flink 1.11.2 rowtime??proctime?? Interval Join ????????????



Hi,
&nbsp;&nbsp; ???????? 2 ???????????????????????????????????? interval join??
&nbsp;&nbsp; ???? match ??????regular join ?????????? join 
????????????????????????????????
&nbsp;&nbsp; Interval join ???? 2 ?????????????????????????????????? 2 
????????????????????????


Best,
Hailong

?? 2020-11-25 16:23:27??"Asahi Lee" <[email protected]&gt; ??????
&gt;??????&nbsp;&nbsp;&nbsp;&nbsp; ????????????????????????????????????Interval 
Join??????????????????????flink 
1.11.2????????????????????????????????????????????????join????????????join??????????????????&nbsp;&nbsp;&nbsp;&nbsp;
 ???? l_table.l_rt = r_table.r_pt ????????????????l_table.l_rt BETWEEN 
r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' 
SECOND??????????package join;
&gt;
&gt;import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
&gt;import org.apache.flink.table.api.EnvironmentSettings;
&gt;import org.apache.flink.table.api.Table;
&gt;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
&gt;
&gt;public class Test1 {
&gt;
&gt;&nbsp;&nbsp;&nbsp; public static void main(String[] args) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment bsEnv 
= StreamExecutionEnvironment.getExecutionEnvironment();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamTableEnvironment 
bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String lTable = "CREATE TABLE 
l_table (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " l_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " l_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " l_rt AS localtimestamp,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " WATERMARK FOR l_rt AS l_rt&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ") WITH (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'connector' = 'datagen',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'rows-per-second'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'fields.l_a.min'='1',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'fields.l_a.max'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'fields.l_b.length'='5'&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ")";
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(lTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String rTable = "CREATE TABLE 
r_table (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " r_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " r_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " r_pt AS proctime()&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ") WITH (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'connector' = 'datagen',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'rows-per-second'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'fields.r_a.min'='1',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'fields.r_a.max'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'fields.r_b.length'='5'&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ")";
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(rTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String printTable = "CREATE 
TABLE print (" +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; l_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; l_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; l_rt timestamp(3),&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; r_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; r_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "&nbsp; r_pt timestamp(3)&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ") WITH (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " 'connector' = 'print' " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 ") ";
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
bsTableEnv.executeSql(printTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // ????????
&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = 
bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = 
r_table.r_a and l_table.l_rt = r_table.r_pt");
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // ??????????????Rowtime 
attributes must not be in the input rows of a regular join. As a workaround you 
can cast the time attributes of input tables to TIMESTAMP before.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = 
bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = 
r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND 
r_table.r_pt + INTERVAL '5' SECOND");
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql("insert 
into print select * from " + joinTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp; }
&gt;
&gt;}

回复