Hi, 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。 而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。 Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
Best, Hailong 在 2020-11-25 16:23:27,"Asahi Lee" <[email protected]> 写道: >你好! 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink >1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? 我的 l_table.l_rt = >r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND >AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join; > >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > > public static void main(String[] args) { > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = > StreamTableEnvironment.create(bsEnv, bsSettings); > > String lTable = "CREATE TABLE l_table ( " + > " l_a INT, " + > " l_b string, " + > " l_rt AS localtimestamp, " + > " WATERMARK FOR l_rt AS l_rt " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='5', " + > " 'fields.l_a.min'='1', " + > " 'fields.l_a.max'='5', " + > " 'fields.l_b.length'='5' " + > ")"; > bsTableEnv.executeSql(lTable); > > String rTable = "CREATE TABLE r_table ( " + > " r_a INT, " + > " r_b string, " + > " r_pt AS proctime() " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='5', " + > " 'fields.r_a.min'='1', " + > " 'fields.r_a.max'='5', " + > " 'fields.r_b.length'='5' " + > ")"; > bsTableEnv.executeSql(rTable); > > String printTable = "CREATE TABLE print (" + > " l_a INT, " + > " l_b string, " + > " l_rt timestamp(3), " + > " r_a INT, " + > " r_b string, " + > " r_pt timestamp(3) " + > ") WITH ( " + > " 'connector' = 'print' " + > ") "; > > bsTableEnv.executeSql(printTable); > > // 运行成功 >// Table joinTable = bsTableEnv.sqlQuery("select * from l_table join >r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > > // 运行错误,提示Rowtime attributes must not be in the input rows of a > regular join. As a workaround you can cast the time attributes of input > tables to TIMESTAMP before. > Table joinTable = bsTableEnv.sqlQuery("select * from l_table join > r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - > INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > > bsTableEnv.executeSql("insert into print select * from " + joinTable); > > } > >}
