Hi! 感谢持续反馈。这次确实在开发环境下复现了这个问题。
这是因为目前 event time temporal join 对 join key 的处理还不够完善,只能处理原原本本的输入列(比如直接用 uuid),暂时不能处理利用输入列做的运算(例如这里从 map 里取值)。我已经开了一个 issue 记录这个问题 [1]。一个绕过的方法是先在一个 view 里给 map 取值,比如这样: CREATE TABLE A ( a MAP<STRING NOT NULL, INT>, ts TIMESTAMP(3), WATERMARK FOR ts AS ts ); CREATE TABLE B ( id INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts, PRIMARY KEY (id) NOT ENFORCED ); CREATE VIEW myView AS SELECT A.a['ID'] AS id, ts FROM A; SELECT * FROM myView AS a LEFT JOIN B FOR SYSTEM_TIME AS OF a.ts AS b ON a.id = b.id; 后面那个 Temporal Table Join requires primary key in versioned table, but no primary key can be found. 是说 event time temporal join 的维表必须定义 primary key。这是预期行为,参见 [2] 中的 The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition. Primary key 的定义方式见上述示例代码,也可以参见 [3]。 [1] https://issues.apache.org/jira/browse/FLINK-24239 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join [3] https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/table/sql/create/#primary-key Wayne <[email protected]> 于2021年9月9日周四 下午11:29写道: > Hello > > > 打扰了我最近再次尝试,还是报这个错误 > 我的flink版本为 flink-1.12.2-bin-scala_2.12 > 使用sql client执行 > 我的sql 如下 > > > CREATE TABLE stub_trans ( > `uuid` STRING, > `columnInfos` MAP<STRING NOT NULL,ROW<oldValue STRING NULL ,newValue > STRING ,name STRING NOT NULL ,isKeyColumn BOOLEAN NOT NULL,type STRING NOT > NULL > NOT NULL> NOT NULL, > procTime TIMESTAMP(3) METADATA FROM 'timestamp' , > WATERMARK FOR procTime AS procTime > ) WITH ( > 'connector' = 'kafka', > .............. > 'format' = 'avro' > ); > > > CREATE TABLE kapeng_test ( > > > `event_id` STRING, > `genre_id` STRING, > `user_guid` STRING, > procTime TIMESTAMP(3) METADATA FROM 'timestamp' , > WATERMARK FOR procTime AS procTime > ) WITH ( > 'connector' = 'kafka', > ........ 
> 'format' = 'avro' > ); > > > > > CREATE TABLE purchase_hist ( > rowkey STRING, > d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, > genreid STRING, quantity STRING >, > PRIMARY KEY ( rowkey ) NOT ENFORCED > ) WITH ( > 'connector' = 'hbase-1.4', > .........); > > > > > INSERT INTO purchase_hist SELECT > rowkey, > ROW ( cost, crdate, currency, eventid, genreid, quantity ) > FROM > ( SELECT > CONCAT_WS( > '_', > user_guid, > CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, > 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey, > columnInfos [ 'TICKET_COST' ].newValue AS cost, > DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, > 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate, > columnInfos [ 'EVENT_ID' ].newValue AS eventid, > columnInfos [ 'CURRENCY_CODE' ].newValue AS currency, > columnInfos [ 'QUANTITY' ].newValue AS quantity, > genre_id AS genreid, > user_guid AS userGuid > FROM > stub_trans > LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON > stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id )m > > > > > 报错如下 > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Currently the join key in Temporal Table Join can not be empty. 
> at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) > at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > at scala.collection.immutable.Range.foreach(Range.scala:158) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) > > > > > > > 如果我将 > LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON > stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id > 中的 > stub_trans.columnInfos [ 'EVENT_ID' ].newValue 替换为 uuid 则这部分语句为 > > > LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON > stub_trans.uuid = kapeng_test.event_id > > > > > 则报错如下: > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Temporal Table Join requires primary key in versioned table, but no primary > key can be found. 
The physical plan is: > FlinkLogicalJoin(condition=[AND(=($0, $3), > __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($0), > __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left]) > FlinkLogicalCalc(select=[uuid, columnInfos, Reinterpret(CAST(timestamp)) > AS procTime]) > FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, > stub_trans, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[uuid, columnInfos, > timestamp]) > FlinkLogicalSnapshot(period=[$cor0.procTime]) > FlinkLogicalCalc(select=[event_id, genre_id, user_guid, > Reinterpret(CAST(timestamp)) AS procTime]) > FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, kapeng_test, watermark=[CAST($3):TIMESTAMP(3)]]], > fields=[event_id, genre_id, user_guid, timestamp]) > > > > > at > org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.org > $apache$flink$table$planner$plan$rules$logical$TemporalJoinRewriteWithUniqueKeyRule$$validateRightPrimaryKey(TemporalJoinRewriteWithUniqueKeyRule.scala:127) > at > org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:88) > at > org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) > at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) > at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) > at > org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:109) > at > org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) > at > 
org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.onMatch(TemporalJoinRewriteWithUniqueKeyRule.scala:70) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) > at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) > > > 请问这个sql 我应该怎么写 > > > > > > > > > ————————————————————————————————————分割线———————————————————————————————————————————————— > > > 在 2021-09-02 11:18:55,"Wayne" <[email protected]> 写道: > > > > > > > >我是使用tableEnv.executeSql 执行那两个create 语句 然后用 tableEnv.sqlQuery 执行 > select语句 ,然后报错, 没有再执行其他sql额 , aaa.columnInfos [ 'EVENT_ID' ].newValue > 就是取出map里面 EVENT_ID 的key的 newValue 值,从create 语句可以看出 这个是个string > > > > 另外 tableEnv.explainSql(xxx) 也不能执行那个select语句,报的一样的错误, 我先在本地复现一下试试吧 > > > > > > > > > > > > > > > > > >在 2021-09-02 10:31:20,"Caizhi Weng" <[email protected]> 写道: > >>Hi! 
> >> > >>没有复现这个问题。后续是还有其他 SQL 吗?特别是对 aaa.columnInfos [ 'EVENT_ID' ].newValue 有 > >>filter 的 SQL。 > >> > >>可以尝试先在本地找一个能复现的 case,通过 tableEnv.explainSql() 就能快速检查能否产生 plan。如果找到了能复现的 > >>SQL,可以在 jira 提一个 issue。 > >> > >>Wayne <[email protected]> 于2021年9月2日周四 上午10:43写道: > >> > >>> 表a > >>> CREATE TABLE aaa ( > >>> > >>> > >>> `columnInfos` MAP<STRING NOT NULL,ROW<oldValue STRING NULL ,newValue > >>> STRING ,name STRING NOT NULL ,isKeyColumn BOOLEAN NOT NULL,type > STRING NOT > >>> NULL > NOT NULL> NOT NULL, > >>> ....... > >>> procTime AS PROCTIME() > >>> ) WITH ( > >>> 'connector' = 'kafka' , > >>> ..... > >>> 'format' = 'avro' > >>> ) > >>> > >>> > >>> > >>> > >>> 表b > >>> > >>> > >>> CREATE TABLE bbb ( > >>> `event_id` STRING, > >>> `genre_id` STRING, > >>> `user_guid` STRING, > >>> ...... > >>> `uuid` STRING > >>> ) WITH ( > >>> 'connector' = 'kafka' , > >>> ..... > >>> 'format' = 'avro' > >>> ) > >>> > >>> > >>> > >>> > >>> > >>> > >>> 我的sql > >>> > >>> > >>> SELECT > >>> CONCAT_WS( > >>> '_', > >>> user_guid, > >>> CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' > ].newValue, > >>> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey, > >>> columnInfos [ 'TICKET_COST' ].newValue AS cost, > >>> DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, > >>> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate, > >>> columnInfos [ 'EVENT_ID' ].newValue AS eventid, > >>> columnInfos [ 'QUANTITY' ].newValue AS quantity, > >>> genre_id AS genreId, > >>> user_guid AS userGuid > >>> FROM > >>> aaa > >>> LEFT JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime ON > aaa.columnInfos [ > >>> 'EVENT_ID' ].newValue = bbb.event_id and aaa.columnInfos [ 'EVENT_ID' > >>> ].newValue is not null and bbb.event_id is not null > >>> > >>> > >>> 最后的 and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and > >>> bbb.event_id is not null 无论加不加,都是报这个错误 > >>> > >>> 在 2021-09-02 09:13:11,"Caizhi Weng" <[email protected]> 写道: > >>> >Hi! 
> >>> > > >>> >Orders 和 Customers 就直接是 source 表吗?还是说 source 表到维表 join 之间有关于 > customer_id 或 > >>> >id 的 filter 条件? > >>> > > >>> >有一定可能是之前有关于 customer_id 或 id 的 filter 条件(比如 customer_id = 1),导致维表 join > >>> >这里的等值条件被优化成了 customer_id = 1 和 id = 1 并分别下推到维表 join 之前。 > >>> > > >>> >Wayne <[email protected]> 于2021年9月1日周三 下午6:27写道: > >>> > > >>> >> 我的flink 版本是 flink-1.12.2-bin-scala_2.12 > >>> >> 我的sql 是 > >>> >> SELECT o.order_id, o.total, c.country, c.zip > >>> >> FROM Orders AS o > >>> >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c > >>> >> ON o.customer_id = c.id and o.customer_id is not null and c.id > is > >>> >> not null ; > >>> >> 或者 > >>> >> SELECT o.order_id, o.total, c.country, c.zip > >>> >> FROM Orders AS o > >>> >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c > >>> >> ON o.customer_id = c.id ; > >>> >> > >>> >> 都会报如下错误,麻烦帮我看看正确的写法是什么样的,灰常感谢 > >>> >> > >>> >> Exception in thread "main" > >>> org.apache.flink.table.api.ValidationException: > >>> >> Currently the join key in Temporal Table Join can not be empty. 
> >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272) > >>> >> at > >>> >> > >>> > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) > >>> >> at > org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) > >>> >> at > >>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) > >>> >> at > >>> >> > >>> > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) > >>> >> at > >>> >> > >>> > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > >>> >> at > >>> >> > >>> > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) > >>> >> at > >>> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) > >>> >> at > >>> >> > >>> > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > >>> >> at > >>> >> > >>> > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > >>> >> at scala.collection.Iterator.foreach(Iterator.scala:943) > >>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943) > >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > >>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74) > >>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > >>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > >>> >> at > 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > >>> >> at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > >>> >> at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) > >>> >> at > >>> >> > >>> > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > >>> >> at > >>> >> > >>> > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > >>> >> at scala.collection.immutable.Range.foreach(Range.scala:158) > >>> >> at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > >>> >> at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > >>> >> at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > >>> >> at > >>> >> > >>> > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > >>> >> at > >>> >> > >>> > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > >>> >> at scala.collection.Iterator.foreach(Iterator.scala:943) > >>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943) > >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > >>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74) > >>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > >>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > >>> 
>> at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > >>> >> at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > >>> >> at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) > >>> >> at > >>> >> > >>> > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) > >>> >> at > >>> >> > >>> > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > >>> >> at > >>> >> > >>> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) > >>> >> at > >>> >> > >>> > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577) > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >
