Hi!

感谢持续反馈。这次确实在开发环境下复现了这个问题。

这是因为目前 event time temporal join 对 join key 的处理还不够完善,目前只能处理原原本本的输入列(比如直接用
uuid),暂时不能处理利用输入列做的运算(例如这里从 map 里取值)。我已经开了一个 issue 记录这个问题 [1]。一个绕过的方法是先在一个
view 里给 map 取值,比如这样:

CREATE TABLE A (
a MAP<STRING NOT NULL, INT>,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts
);

CREATE TABLE B (
id INT,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts,
PRIMARY KEY (id) NOT ENFORCED
);

CREATE VIEW myView AS SELECT A.a['ID'] AS id, ts FROM A;

SELECT * FROM myView AS a LEFT JOIN B FOR SYSTEM_TIME AS OF a.ts AS b ON
a.id = b.id;

后面那个 Temporal Table Join requires primary key in versioned table, but no
primary key can be found. 是说 event time temporal join 的维表必须定义 primary
key。这是预期行为,参见 [1] 中的 The event-time temporal join requires the primary key
contained in the equivalence condition of the temporal join condition.
Primary key 的定义方式见上述示例代码,也可以参见 [2]。

[1] https://issues.apache.org/jira/browse/FLINK-24239
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
[3]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/table/sql/create/#primary-key

Wayne <[email protected]> 于2021年9月9日周四 下午11:29写道:

> Hello
>
>
> 打扰了我最近再次尝试,还是报这个错误
> 我的flink版本为 flink-1.12.2-bin-scala_2.12
> 使用sql client执行
> 我的sql 如下
>
>
> CREATE TABLE stub_trans (
> `uuid` STRING,
> `columnInfos` MAP<STRING NOT NULL,ROW<oldValue STRING NULL ,newValue
> STRING  ,name STRING NOT NULL ,isKeyColumn BOOLEAN NOT NULL,type STRING NOT
> NULL > NOT NULL>  NOT NULL,
> procTime TIMESTAMP(3) METADATA FROM 'timestamp' ,
> WATERMARK FOR procTime AS procTime
> ) WITH (
>     'connector' = 'kafka',
> ..............
>     'format' = 'avro'
> );
>
>
> CREATE TABLE kapeng_test (
>
>
> `event_id` STRING,
> `genre_id` STRING,
> `user_guid` STRING,
> procTime TIMESTAMP(3) METADATA FROM 'timestamp'  ,
> WATERMARK FOR procTime AS procTime
> ) WITH (
>     'connector' = 'kafka',
> ........
>     'format' = 'avro'
> );
>
>
>
>
> CREATE TABLE purchase_hist (
>     rowkey STRING,
>     d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING,
> genreid STRING, quantity STRING >,
>     PRIMARY KEY ( rowkey ) NOT ENFORCED
> ) WITH (
>     'connector' = 'hbase-1.4',
> .........);
>
>
>
>
> INSERT INTO purchase_hist SELECT
> rowkey,
> ROW ( cost, crdate, currency, eventid, genreid, quantity )
> FROM
> ( SELECT
>     CONCAT_WS(
>         '_',
>         user_guid,
>         CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue,
> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey,
>     columnInfos [ 'TICKET_COST' ].newValue AS cost,
>     DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue,
> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate,
>     columnInfos [ 'EVENT_ID' ].newValue AS eventid,
>     columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
>     columnInfos [ 'QUANTITY' ].newValue AS quantity,
>     genre_id AS genreid,
>     user_guid AS userGuid
>     FROM
>     stub_trans
>    LEFT  JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON
> stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id )m
>
>
>
>
> 报错如下
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Currently the join key in Temporal Table Join can not be empty.
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.immutable.Range.foreach(Range.scala:158)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>
>
>
>
>
>
> 如果我将
> LEFT  JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON
> stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id
> 中的
>  stub_trans.columnInfos [ 'EVENT_ID' ].newValue  替换为 uuid  则这部分语句为
>
>
> LEFT  JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON
> stub_trans.uuid = kapeng_test.event_id
>
>
>
>
> 则报错如下:
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Temporal Table Join requires primary key in versioned table, but no primary
> key can be found. The physical plan is:
> FlinkLogicalJoin(condition=[AND(=($0, $3),
> __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($0),
> __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
>   FlinkLogicalCalc(select=[uuid, columnInfos, Reinterpret(CAST(timestamp))
> AS procTime])
>     FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
> stub_trans, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[uuid, columnInfos,
> timestamp])
>   FlinkLogicalSnapshot(period=[$cor0.procTime])
>     FlinkLogicalCalc(select=[event_id, genre_id, user_guid,
> Reinterpret(CAST(timestamp)) AS procTime])
>       FlinkLogicalTableSourceScan(table=[[default_catalog,
> default_database, kapeng_test, watermark=[CAST($3):TIMESTAMP(3)]]],
> fields=[event_id, genre_id, user_guid, timestamp])
>
>
>
>
> at
> org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.org
> $apache$flink$table$planner$plan$rules$logical$TemporalJoinRewriteWithUniqueKeyRule$$validateRightPrimaryKey(TemporalJoinRewriteWithUniqueKeyRule.scala:127)
> at
> org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:88)
> at
> org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
> at
> org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:109)
> at
> org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at
> org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.onMatch(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>
>
> 请问这个sql 我应该怎么写
>
>
>
>
>
>
>
>
> ————————————————————————————————————分割线————————————————————————————————————————————————
>
>
> 在 2021-09-02 11:18:55,"Wayne" <[email protected]> 写道:
> >
> >
> >
> >我是使用tableEnv.executeSql   执行那两个create 语句 然后用 tableEnv.sqlQuery  执行
> select语句 ,然后报错, 没有再执行其他sql额 , aaa.columnInfos [ 'EVENT_ID' ].newValue
> 就是取出map里面 EVENT_ID 的key的 newValue 值,从create 语句可以看出 这个是个string
> >
> > 另外   tableEnv.explainSql(xxx) 也不能执行那个select语句,报的一样的错误, 我先在本地复现一下试试吧
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-09-02 10:31:20,"Caizhi Weng" <[email protected]> 写道:
> >>Hi!
> >>
> >>没有复现这个问题。后续是还有其他 SQL 吗?特别是对 aaa.columnInfos [ 'EVENT_ID' ].newValue 有
> >>filter 的 SQL。
> >>
> >>可以尝试先在本地找一个能复现的 case,通过 tableEnv.explainSql() 就能快速检查能否产生 plan。如果找到了能复现的
> >>SQL,可以在 jira 提一个 issue。
> >>
> >>Wayne <[email protected]> 于2021年9月2日周四 上午10:43写道:
> >>
> >>> 表a
> >>> CREATE TABLE aaa (
> >>>
> >>>
> >>> `columnInfos` MAP<STRING NOT NULL,ROW<oldValue STRING NULL ,newValue
> >>> STRING  ,name STRING NOT NULL ,isKeyColumn BOOLEAN NOT NULL,type
> STRING NOT
> >>> NULL > NOT NULL>  NOT NULL,
> >>> .......
> >>> procTime AS PROCTIME()
> >>> ) WITH (
> >>>     'connector' = 'kafka' ,
> >>>     .....
> >>>     'format' = 'avro'
> >>> )
> >>>
> >>>
> >>>
> >>>
> >>> 表b
> >>>
> >>>
> >>> CREATE TABLE bbb (
> >>> `event_id` STRING,
> >>> `genre_id` STRING,
> >>> `user_guid` STRING,
> >>> ......
> >>> `uuid` STRING
> >>> ) WITH (
> >>>     'connector' = 'kafka' ,
> >>>     .....
> >>>     'format' = 'avro'
> >>> )
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> 我的sql
> >>>
> >>>
> >>> SELECT
> >>>     CONCAT_WS(
> >>>         '_',
> >>>         user_guid,
> >>>         CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS'
> ].newValue,
> >>> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey,
> >>>     columnInfos [ 'TICKET_COST' ].newValue AS cost,
> >>>     DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue,
> >>> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate,
> >>>     columnInfos [ 'EVENT_ID' ].newValue AS eventid,
> >>>     columnInfos [ 'QUANTITY' ].newValue AS quantity,
> >>>     genre_id AS genreId,
> >>>     user_guid AS userGuid
> >>>     FROM
> >>>     aaa
> >>>    LEFT  JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime ON
> aaa.columnInfos [
> >>> 'EVENT_ID' ].newValue = bbb.event_id  and aaa.columnInfos [ 'EVENT_ID'
> >>> ].newValue is not null and  bbb.event_id is not null
> >>>
> >>>
> >>> 最后的  and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and
> >>> bbb.event_id is not null   无论加不加,都是报这个错误
> >>>
> >>> 在 2021-09-02 09:13:11,"Caizhi Weng" <[email protected]> 写道:
> >>> >Hi!
> >>> >
> >>> >Orders 和 Customers 就直接是 source 表吗?还是说 source 表到维表 join 之间有关于
> customer_id 或
> >>> >id 的 filter 条件?
> >>> >
> >>> >有一定可能是之前有关于 customer_id 或 id 的 filter 条件(比如 customer_id = 1),导致维表 join
> >>> >这里的等值条件被优化成了 customer_id = 1 和 id = 1 并分别下推到维表 join 之前。
> >>> >
> >>> >Wayne <[email protected]> 于2021年9月1日周三 下午6:27写道:
> >>> >
> >>> >> 我的flink 版本是  flink-1.12.2-bin-scala_2.12
> >>> >> 我的sql 是
> >>> >> SELECT o.order_id, o.total, c.country, c.zip
> >>> >> FROM Orders AS o
> >>> >>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >>> >>     ON o.customer_id = c.id and o.customer_id is not null and  c.id
> is
> >>> >> not null ;
> >>> >> 或者
> >>> >> SELECT o.order_id, o.total, c.country, c.zip
> >>> >> FROM Orders AS o
> >>> >>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >>> >>     ON o.customer_id = c.id ;
> >>> >>
> >>> >> 都会报如下错误,麻烦帮我看看正确的写法是什么样的,灰常感谢
> >>> >>
> >>> >> Exception in thread "main"
> >>> org.apache.flink.table.api.ValidationException:
> >>> >> Currently the join key in Temporal Table Join can not be empty.
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
> >>> >> at
> >>> >>
> >>>
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> >>> >> at
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> >>> >> at
> >>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> >>> >> at
> >>> >>
> >>>
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> >>> >> at
> >>> >>
> >>>
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> >>> >> at
> >>> >>
> >>>
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> >>> >> at
> >>> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
> >>> >> at
> >>> >>
> >>>
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> >>> >> at
> >>> >>
> >>>
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> >>> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
> >>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> >>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> >>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> >>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> >>> >> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> >>> >> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> >>> >> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
> >>> >> at
> >>> >>
> >>>
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> >>> >> at
> >>> >>
> >>>
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> >>> >> at scala.collection.immutable.Range.foreach(Range.scala:158)
> >>> >> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> >>> >> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> >>> >> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> >>> >> at
> >>> >>
> >>>
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> >>> >> at
> >>> >>
> >>>
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> >>> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
> >>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> >>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> >>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> >>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> >>> >> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> >>> >> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> >>> >> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
> >>> >> at
> >>> >>
> >>>
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>>
>

回复