另一张表可以这么定义:
String rTable = "CREATE TABLE r_table ( " +
" r_a INT, " +
" r_b string, " +
" r_pt AS now(), " +
"WATERMARK FOR r_pt AS r_pt" +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='5', " +
" 'fields.r_a.min'='1', " +
" 'fields.r_a.max'='5', " +
" 'fields.r_b.length'='5' " +
")";
Best,
Hailong
在 2020-11-25 19:05:04,"Asahi Lee" <[email protected]> 写道:
>你好!
> 那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理?
>
>
>
>
>------------------ 原始邮件 ------------------
>发件人:
> "user-zh"
>
><[email protected]>;
>发送时间: 2020年11月25日(星期三) 晚上7:31
>收件人: "user-zh"<[email protected]>;
>
>主题: Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
>
>
>
>Hi,
> 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
> 而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
> Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
>
>
>Best,
>Hailong
>
>在 2020-11-25 16:23:27,"Asahi Lee" <[email protected]> 写道:
>>你好! 我需要将事件时间的流同处理时间的流做Interval
>Join时提示错误,我是用的是flink
>1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?
>我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt -
>INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;
>>
>>import
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>import org.apache.flink.table.api.EnvironmentSettings;
>>import org.apache.flink.table.api.Table;
>>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>
>>public class Test1 {
>>
>> public static void main(String[] args) {
>> StreamExecutionEnvironment
>bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings
>=
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment
>bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> String lTable = "CREATE TABLE
>l_table ( " +
>>
> " l_a INT, " +
>>
> " l_b string, " +
>>
> " l_rt AS localtimestamp, " +
>>
> " WATERMARK FOR l_rt AS l_rt " +
>>
> ") WITH ( " +
>>
> " 'connector' = 'datagen', " +
>>
> " 'rows-per-second'='5', " +
>>
> " 'fields.l_a.min'='1', " +
>>
> " 'fields.l_a.max'='5', " +
>>
> " 'fields.l_b.length'='5' " +
>>
> ")";
>> bsTableEnv.executeSql(lTable);
>>
>> String rTable = "CREATE TABLE
>r_table ( " +
>>
> " r_a INT, " +
>>
> " r_b string, " +
>>
> " r_pt AS proctime() " +
>>
> ") WITH ( " +
>>
> " 'connector' = 'datagen', " +
>>
> " 'rows-per-second'='5', " +
>>
> " 'fields.r_a.min'='1', " +
>>
> " 'fields.r_a.max'='5', " +
>>
> " 'fields.r_b.length'='5' " +
>>
> ")";
>> bsTableEnv.executeSql(rTable);
>>
>> String printTable = "CREATE
>TABLE print (" +
>>
> " l_a INT, " +
>>
> " l_b string, " +
>>
> " l_rt timestamp(3), " +
>>
> " r_a INT, " +
>>
> " r_b string, " +
>>
> " r_pt timestamp(3) " +
>>
> ") WITH ( " +
>>
> " 'connector' = 'print' " +
>>
> ") ";
>>
>>
>bsTableEnv.executeSql(printTable);
>>
>> // 运行成功
>>// Table joinTable =
>bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a =
>r_table.r_a and l_table.l_rt = r_table.r_pt");
>>
>> // 运行错误,提示Rowtime attributes
>must not be in the input rows of a regular join. As a workaround you can cast
>the time attributes of input tables to TIMESTAMP before.
>> Table joinTable =
>bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a =
>r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND
>r_table.r_pt + INTERVAL '5' SECOND");
>>
>> bsTableEnv.executeSql("insert
>into print select * from " + joinTable);
>>
>> }
>>
>>}