??????     ????????????????????????????????????Interval 
Join??????????????????????flink 
1.11.2????????????????????????????????????????????????join????????????join??????????????????
     ???? l_table.l_rt = r_table.r_pt ????????????????l_table.l_rt BETWEEN 
r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' 
SECOND??????????package join;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test1 {

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);

        String lTable = "CREATE TABLE l_table (  " +
                " l_a INT,  " +
                " l_b string,  " +
                " l_rt AS localtimestamp,  " +
                " WATERMARK FOR l_rt AS l_rt  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.l_a.min'='1',  " +
                " 'fields.l_a.max'='5',  " +
                " 'fields.l_b.length'='5'  " +
                ")";
        bsTableEnv.executeSql(lTable);

        String rTable = "CREATE TABLE r_table (  " +
                " r_a INT,  " +
                " r_b string,  " +
                " r_pt AS proctime()  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.r_a.min'='1',  " +
                " 'fields.r_a.max'='5',  " +
                " 'fields.r_b.length'='5'  " +
                ")";
        bsTableEnv.executeSql(rTable);

        String printTable = "CREATE TABLE print (" +
                "  l_a INT,  " +
                "  l_b string,  " +
                "  l_rt timestamp(3),  " +
                "  r_a INT,  " +
                "  r_b string,  " +
                "  r_pt timestamp(3)  " +
                ") WITH (  " +
                " 'connector' = 'print' " +
                ") ";

        bsTableEnv.executeSql(printTable);

        // ????????
//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join 
r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");

        // ??????????????Rowtime attributes must not be in the input rows of a 
regular join. As a workaround you can cast the time attributes of input tables 
to TIMESTAMP before.
        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join 
r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - 
INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");

        bsTableEnv.executeSql("insert into print select * from " + joinTable);

    }

}

回复