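[The non-ASCII text of this message was lost to an encoding error; only the
ASCII fragments survive. The summary below is reconstructed from those
fragments and from the code that follows, and should be confirmed against the
original post.]

Question about Interval Join on Flink 1.11.2: the program below joins a table
with a rowtime attribute (l_table.l_rt) to a table with a processing-time
attribute (r_table.r_pt). Two join conditions on the time attributes are
involved: the equality condition
    l_table.l_rt = r_table.r_pt
and the range condition
    l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND
                     AND r_table.r_pt + INTERVAL '5' SECOND
The equality variant is commented out in the code next to the quoted error
message; the range variant is the one the code actually runs.

package join;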
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Test1 {
public static void main(String[] args) {
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String lTable = "CREATE TABLE l_table ( " +
" l_a INT, " +
" l_b string, " +
" l_rt AS localtimestamp, " +
" WATERMARK FOR l_rt AS l_rt " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='5', " +
" 'fields.l_a.min'='1', " +
" 'fields.l_a.max'='5', " +
" 'fields.l_b.length'='5' " +
")";
bsTableEnv.executeSql(lTable);
String rTable = "CREATE TABLE r_table ( " +
" r_a INT, " +
" r_b string, " +
" r_pt AS proctime() " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='5', " +
" 'fields.r_a.min'='1', " +
" 'fields.r_a.max'='5', " +
" 'fields.r_b.length'='5' " +
")";
bsTableEnv.executeSql(rTable);
String printTable = "CREATE TABLE print (" +
" l_a INT, " +
" l_b string, " +
" l_rt timestamp(3), " +
" r_a INT, " +
" r_b string, " +
" r_pt timestamp(3) " +
") WITH ( " +
" 'connector' = 'print' " +
") ";
bsTableEnv.executeSql(printTable);
// ????????
// Table joinTable = bsTableEnv.sqlQuery("select * from l_table join
r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
// ??????????????Rowtime attributes must not be in the input rows of a
regular join. As a workaround you can cast the time attributes of input tables
to TIMESTAMP before.
Table joinTable = bsTableEnv.sqlQuery("select * from l_table join
r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt -
INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
bsTableEnv.executeSql("insert into print select * from " + joinTable);
}
}