Hi Wayne : 请查考Flink Join 官方文档, 右侧构建侧 需要定义 PRIMARY KEY 同时 在join 条件的时候带上。 参考地址: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html#%E5%9F%BA%E4%BA%8E%E4%BA%8B%E4%BB%B6%E6%97%B6%E9%97%B4%E7%9A%84%E6%97%B6%E6%80%81-join
内容: 注意: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。 注意: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。 [email protected] 发件人: Wayne 发送时间: 2021-09-09 23:29 收件人: user-zh 主题: Re:Re:Re: Re: Temporal Joins 报 Currently the join key in Temporal Table Join can not be empty Hello 打扰了我最近再次尝试,还是报这个错误 我的flink版本为 flink-1.12.2-bin-scala_2.12 使用sql client执行 我的sql 如下 CREATE TABLE stub_trans ( `uuid` STRING, `columnInfos` MAP<STRING NOT NULL,ROW<oldValue STRING NULL ,newValue STRING ,name STRING NOT NULL ,isKeyColumn BOOLEAN NOT NULL,type STRING NOT NULL > NOT NULL> NOT NULL, procTime TIMESTAMP(3) METADATA FROM 'timestamp' , WATERMARK FOR procTime AS procTime ) WITH ( 'connector' = 'kafka', .............. 'format' = 'avro' ); CREATE TABLE kapeng_test ( `event_id` STRING, `genre_id` STRING, `user_guid` STRING, procTime TIMESTAMP(3) METADATA FROM 'timestamp' , WATERMARK FOR procTime AS procTime ) WITH ( 'connector' = 'kafka', ........ 'format' = 'avro' ); CREATE TABLE purchase_hist ( rowkey STRING, d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, genreid STRING, quantity STRING >, PRIMARY KEY ( rowkey ) NOT ENFORCED ) WITH ( 'connector' = 'hbase-1.4', .........); INSERT INTO purchase_hist SELECT rowkey, ROW ( cost, crdate, currency, eventid, genreid, quantity ) FROM ( SELECT CONCAT_WS( '_', user_guid, CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey, columnInfos [ 'TICKET_COST' ].newValue AS cost, DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate, columnInfos [ 'EVENT_ID' ].newValue AS eventid, columnInfos [ 'CURRENCY_CODE' ].newValue AS currency, columnInfos [ 'QUANTITY' ].newValue AS quantity, genre_id AS genreid, user_guid AS userGuid FROM stub_trans LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id )m 报错如下 Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty. at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.immutable.Range.foreach(Range.scala:158) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) 如果我将 LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id 中的 stub_trans.columnInfos [ 'EVENT_ID' ].newValue 替换为 uuid 则这部分语句为 LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON stub_trans.uuid = kapeng_test.event_id 则报错如下: Exception in thread "main" org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is: FlinkLogicalJoin(condition=[AND(=($0, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left]) FlinkLogicalCalc(select=[uuid, columnInfos, Reinterpret(CAST(timestamp)) AS procTime]) FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stub_trans, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[uuid, columnInfos, timestamp]) FlinkLogicalSnapshot(period=[$cor0.procTime]) FlinkLogicalCalc(select=[event_id, genre_id, user_guid, Reinterpret(CAST(timestamp)) AS procTime]) FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, kapeng_test, watermark=[CAST($3):TIMESTAMP(3)]]], fields=[event_id, genre_id, user_guid, timestamp]) at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.org$apache$flink$table$planner$plan$rules$logical$TemporalJoinRewriteWithUniqueKeyRule$$validateRightPrimaryKey(TemporalJoinRewriteWithUniqueKeyRule.scala:127) at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:88) at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:109) at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.onMatch(TemporalJoinRewriteWithUniqueKeyRule.scala:70) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) 请问这个sql 我应该怎么写 ――――――――――――――――――――――――――――――――――――分割线―――――――――――――――――――――――――――――――――――――――――――――――― 在 2021-09-02 11:18:55,"Wayne" <[email protected]> 写道: > > > >我是使用tableEnv.executeSql 执行那两个create 语句 然后用 tableEnv.sqlQuery 执行 select语句 >,然后报错, 没有再执行其他sql额 , aaa.columnInfos [ 'EVENT_ID' ].newValue 就是取出map里面 >EVENT_ID 的key的 newValue 值,从create 语句可以看出 这个是个string > > 另外 tableEnv.explainSql(xxx) 也不能执行那个select语句,报的一样的错误, 我先在本地复现一下试试吧 > > > > > > > > >在 2021-09-02 10:31:20,"Caizhi Weng" <[email protected]> 写道: >>Hi! >> >>没有复现这个问题。后续是还有其他 SQL 吗?特别是对 aaa.columnInfos [ 'EVENT_ID' ].newValue 有 >>filter 的 SQL。 >> >>可以尝试先在本地找一个能复现的 case,通过 tableEnv.explainSql() 就能快速检查能否产生 plan。如果找到了能复现的 >>SQL,可以在 jira 提一个 issue。 >> >>Wayne <[email protected]> 于2021年9月2日周四 上午10:43写道: >> >>> 表a >>> CREATE TABLE aaa ( >>> >>> >>> `columnInfos` MAP<STRING NOT NULL,ROW<oldValue STRING NULL ,newValue >>> STRING ,name STRING NOT NULL ,isKeyColumn BOOLEAN NOT NULL,type STRING NOT >>> NULL > NOT NULL> NOT NULL, >>> ....... >>> procTime AS PROCTIME() >>> ) WITH ( >>> 'connector' = 'kafka' , >>> ..... >>> 'format' = 'avro' >>> ) >>> >>> >>> >>> >>> 表b >>> >>> >>> CREATE TABLE bbb ( >>> `event_id` STRING, >>> `genre_id` STRING, >>> `user_guid` STRING, >>> ...... >>> `uuid` STRING >>> ) WITH ( >>> 'connector' = 'kafka' , >>> ..... >>> 'format' = 'avro' >>> ) >>> >>> >>> >>> >>> >>> >>> 我的sql >>> >>> >>> SELECT >>> CONCAT_WS( >>> '_', >>> user_guid, >>> CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, >>> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey, >>> columnInfos [ 'TICKET_COST' ].newValue AS cost, >>> DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, >>> 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate, >>> columnInfos [ 'EVENT_ID' ].newValue AS eventid, >>> columnInfos [ 'QUANTITY' ].newValue AS quantity, >>> genre_id AS genreId, >>> user_guid AS userGuid >>> FROM >>> aaa >>> LEFT JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime ON aaa.columnInfos [ >>> 'EVENT_ID' ].newValue = bbb.event_id and aaa.columnInfos [ 'EVENT_ID' >>> ].newValue is not null and bbb.event_id is not null >>> >>> >>> 最后的 and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and >>> bbb.event_id is not null 无论加不加,都是报这个错误 >>> >>> 在 2021-09-02 09:13:11,"Caizhi Weng" <[email protected]> 写道: >>> >Hi! >>> > >>> >Orders 和 Customers 就直接是 source 表吗?还是说 source 表到维表 join 之间有关于 customer_id 或 >>> >id 的 filter 条件? >>> > >>> >有一定可能是之前有关于 customer_id 或 id 的 filter 条件(比如 customer_id = 1),导致维表 join >>> >这里的等值条件被优化成了 customer_id = 1 和 id = 1 并分别下推到维表 join 之前。 >>> > >>> >Wayne <[email protected]> 于2021年9月1日周三 下午6:27写道: >>> > >>> >> 我的flink 版本是 flink-1.12.2-bin-scala_2.12 >>> >> 我的sql 是 >>> >> SELECT o.order_id, o.total, c.country, c.zip >>> >> FROM Orders AS o >>> >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c >>> >> ON o.customer_id = c.id and o.customer_id is not null and c.id is >>> >> not null ; >>> >> 或者 >>> >> SELECT o.order_id, o.total, c.country, c.zip >>> >> FROM Orders AS o >>> >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c >>> >> ON o.customer_id = c.id ; >>> >> >>> >> 都会报如下错误,麻烦帮我看看正确的写法是什么样的,灰常感谢 >>> >> >>> >> Exception in thread "main" >>> org.apache.flink.table.api.ValidationException: >>> >> Currently the join key in Temporal Table Join can not be empty. >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272) >>> >> at >>> >> >>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) >>> >> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) >>> >> at >>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) >>> >> at >>> >> >>> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) >>> >> at >>> >> >>> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) >>> >> at >>> >> >>> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) >>> >> at >>> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) >>> >> at >>> >> >>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) >>> >> at >>> >> >>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) >>> >> at scala.collection.Iterator.foreach(Iterator.scala:943) >>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943) >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) >>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74) >>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73) >>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56) >>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) >>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) >>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) >>> >> at >>> >> >>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) >>> >> at >>> >> >>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) >>> >> at scala.collection.immutable.Range.foreach(Range.scala:158) >>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) >>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) >>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) >>> >> at >>> >> >>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) >>> >> at >>> >> >>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) >>> >> at scala.collection.Iterator.foreach(Iterator.scala:943) >>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943) >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) >>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74) >>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73) >>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56) >>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) >>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) >>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) >>> >> at >>> >> >>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >>> >> at >>> >> >>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) >>> >> at >>> >> >>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) >>> >> at >>> >> >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) >>> >> at >>> >> >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) >>> >> at >>> >> >>> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577) >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>>
