Hi,
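   Your two streams have different time attributes, so the planner does not treat the query as an interval join.
   It then matches a regular join instead, and because the join condition contains a time attribute, it reports this error.
   An interval join requires both streams to have the same kind of time attribute, so you need to use the same time attribute for both streams.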
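
   As a rough sketch of one way to do that, assuming processing time is acceptable for the left stream: the class below only builds the SQL strings, so it runs without Flink on the classpath. The class name SameTimeAttributeJoin, its method names, and the column l_pt are made up for illustration, and I have not run this against 1.11.2. The other option is to keep event time and declare a rowtime column with a WATERMARK on r_table as well.

```java
public class SameTimeAttributeJoin {

    // Left table: the event-time column l_rt (with its WATERMARK) from the
    // original program is swapped for a processing-time column l_pt.
    // This swap is an assumption of the sketch, not part of the original code.
    static String leftTableDdl() {
        return "CREATE TABLE l_table ("
                + " l_a INT,"
                + " l_b STRING,"
                + " l_pt AS PROCTIME()"
                + ") WITH ("
                + " 'connector' = 'datagen',"
                + " 'rows-per-second' = '5',"
                + " 'fields.l_a.min' = '1',"
                + " 'fields.l_a.max' = '5',"
                + " 'fields.l_b.length' = '5'"
                + ")";
    }

    // Same join condition as the failing query, but both sides of the
    // BETWEEN predicate are now processing-time attributes.
    static String intervalJoinQuery() {
        return "SELECT * FROM l_table JOIN r_table"
                + " ON l_table.l_a = r_table.r_a"
                + " AND l_table.l_pt BETWEEN r_table.r_pt - INTERVAL '10' SECOND"
                + " AND r_table.r_pt + INTERVAL '5' SECOND";
    }

    public static void main(String[] args) {
        // In the original program these strings would be passed to
        // bsTableEnv.executeSql(...) and bsTableEnv.sqlQuery(...).
        System.out.println(leftTableDdl());
        System.out.println(intervalJoinQuery());
    }
}
```

   r_table stays as in your program, since r_pt is already a processing-time attribute.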


Best,
Hailong

At 2020-11-25 16:23:27, "Asahi Lee" <[email protected]> wrote:
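>Hello! When I do an Interval Join between an event-time stream and a processing-time stream, I get an error. I am using Flink
>1.11.2, and my example program is below. Why am I told this is a regular join rather than an interval join? How can I fix it?
>With l_table.l_rt = r_table.r_pt it runs successfully, but with l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND
>AND r_table.r_pt + INTERVAL '5' SECOND it fails!
>
>package join;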
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
>    public static void main(String[] args) {
>        StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>        EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>        StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
>        String lTable = "CREATE TABLE l_table (  " +
>                " l_a INT,  " +
>                " l_b string,  " +
>                " l_rt AS localtimestamp,  " +
>                " WATERMARK FOR l_rt AS l_rt  " +
>                ") WITH (  " +
>                " 'connector' = 'datagen',  " +
>                " 'rows-per-second'='5',  " +
>                " 'fields.l_a.min'='1',  " +
>                " 'fields.l_a.max'='5',  " +
>                " 'fields.l_b.length'='5'  " +
>                ")";
>        bsTableEnv.executeSql(lTable);
>
>        String rTable = "CREATE TABLE r_table (  " +
>                " r_a INT,  " +
>                " r_b string,  " +
>                " r_pt AS proctime()  " +
>                ") WITH (  " +
>                " 'connector' = 'datagen',  " +
>                " 'rows-per-second'='5',  " +
>                " 'fields.r_a.min'='1',  " +
>                " 'fields.r_a.max'='5',  " +
>                " 'fields.r_b.length'='5'  " +
>                ")";
>        bsTableEnv.executeSql(rTable);
>
>        String printTable = "CREATE TABLE print (" +
>                "  l_a INT,  " +
>                "  l_b string,  " +
>                "  l_rt timestamp(3),  " +
>                "  r_a INT,  " +
>                "  r_b string,  " +
>                "  r_pt timestamp(3)  " +
>                ") WITH (  " +
>                " 'connector' = 'print' " +
>                ") ";
>
>        bsTableEnv.executeSql(printTable);
>
>        // Runs successfully
>//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join 
>r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>
>        // Fails with: Rowtime attributes must not be in the input rows of a 
> regular join. As a workaround you can cast the time attributes of input 
> tables to TIMESTAMP before.
>        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join 
> r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - 
> INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>
>        bsTableEnv.executeSql("insert into print select * from " + joinTable);
>
>    }
>
>}
