Hi Wayne:
Please refer to the official Flink Join documentation: the build (right) side of a temporal table join must define a PRIMARY KEY, and that key must also appear in the join condition.
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html#%E5%9F%BA%E4%BA%8E%E4%BA%8B%E4%BB%B6%E6%97%B6%E9%97%B4%E7%9A%84%E6%97%B6%E6%80%81-join
 

Relevant excerpt:
Note: an event-time temporal table join is triggered by the watermarks of both sides, so make sure appropriate watermarks are configured for both tables in the join.
Note: the join key of an event-time temporal table join must contain the primary key of the temporal table. For example, the primary key P.product_id of table product_changelog must be included in the join condition O.product_id = P.product_id.
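
For reference, here is a minimal sketch of that pattern, modeled on the product_changelog example from the linked page (the metadata column and connector options are illustrative assumptions, not your actual setup):

-- Versioned (build) side: declares a primary key plus an event-time watermark.
CREATE TABLE product_changelog (
    product_id STRING,
    product_name STRING,
    update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    PRIMARY KEY (product_id) NOT ENFORCED,
    WATERMARK FOR update_time AS update_time
) WITH (
    'connector' = 'kafka',
    ..............
    'format' = 'debezium-json'
);

-- The join condition contains the build side's primary key, and the
-- FOR SYSTEM_TIME AS OF attribute is the probe side's event time.
SELECT O.order_id, O.product_id, P.product_name
FROM orders AS O
LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
    ON O.product_id = P.product_id;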



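Applied to your tables below, one possible direction (an untested sketch; it assumes event_id uniquely identifies a row in kapeng_test, with procTime as its version time, and versioned_kapeng_test is a made-up name). The docs' versioned-view pattern derives the primary key through deduplication instead of declaring it on the Kafka source:

-- Versioned view: the deduplication query lets the planner infer
-- event_id as the primary key and procTime as the version time.
CREATE VIEW versioned_kapeng_test AS
SELECT event_id, genre_id, user_guid, procTime
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY procTime DESC) AS rownum
    FROM kapeng_test
)
WHERE rownum = 1;

-- Pull the map value out into a plain column first, so that the join key
-- is a simple field reference matching the build side's primary key.
SELECT t.uuid, k.genre_id, k.user_guid
FROM (
    SELECT uuid, procTime,
        columnInfos [ 'EVENT_ID' ].newValue AS event_id
    FROM stub_trans
) AS t
LEFT JOIN versioned_kapeng_test FOR SYSTEM_TIME AS OF t.procTime AS k
    ON t.event_id = k.event_id;
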
[email protected]
 
From: Wayne
Sent: 2021-09-09 23:29
To: user-zh
Subject: Re:Re:Re: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"
Hello 
 
 
Sorry to bother you again. I tried this again recently and still hit the same error.
My Flink version is flink-1.12.2-bin-scala_2.12, executed via the SQL Client.
My SQL is as follows:
 
 
CREATE TABLE stub_trans (
    `uuid` STRING,
    `columnInfos` MAP<STRING NOT NULL, ROW<oldValue STRING NULL, newValue STRING, name STRING NOT NULL, isKeyColumn BOOLEAN NOT NULL, type STRING NOT NULL> NOT NULL> NOT NULL,
    procTime TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR procTime AS procTime
) WITH (
    'connector' = 'kafka',
..............
    'format' = 'avro'
);
 
 
CREATE TABLE kapeng_test (
    `event_id` STRING,
    `genre_id` STRING,
    `user_guid` STRING,
    procTime TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR procTime AS procTime
) WITH (
    'connector' = 'kafka',
........
    'format' = 'avro'
);
 
 
 
 
CREATE TABLE purchase_hist (
    rowkey STRING,
    d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, genreid STRING, quantity STRING >,
    PRIMARY KEY ( rowkey ) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
.........);
 
 
 
 
INSERT INTO purchase_hist SELECT
    rowkey,
    ROW ( cost, crdate, currency, eventid, genreid, quantity )
FROM (
    SELECT
        CONCAT_WS(
            '_',
            user_guid,
            CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey,
        columnInfos [ 'TICKET_COST' ].newValue AS cost,
        DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate,
        columnInfos [ 'EVENT_ID' ].newValue AS eventid,
        columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
        columnInfos [ 'QUANTITY' ].newValue AS quantity,
        genre_id AS genreid,
        user_guid AS userGuid
    FROM
        stub_trans
    LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id
) m
 
 
 
 
The error is:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
 
 
 
 
 
 
If, in

LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id

I replace stub_trans.columnInfos [ 'EVENT_ID' ].newValue with uuid, so that the clause becomes

LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON stub_trans.uuid = kapeng_test.event_id
 
 
 
 
then the following error is reported instead:
 
 
Exception in thread "main" org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
  FlinkLogicalCalc(select=[uuid, columnInfos, Reinterpret(CAST(timestamp)) AS procTime])
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stub_trans, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[uuid, columnInfos, timestamp])
  FlinkLogicalSnapshot(period=[$cor0.procTime])
    FlinkLogicalCalc(select=[event_id, genre_id, user_guid, Reinterpret(CAST(timestamp)) AS procTime])
      FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, kapeng_test, watermark=[CAST($3):TIMESTAMP(3)]]], fields=[event_id, genre_id, user_guid, timestamp])
 
 
 
 
at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.org$apache$flink$table$planner$plan$rules$logical$TemporalJoinRewriteWithUniqueKeyRule$$validateRightPrimaryKey(TemporalJoinRewriteWithUniqueKeyRule.scala:127)
at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:88)
at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:109)
at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.onMatch(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
 
 
How should I write this SQL?
 
 
 
 
 
 
 
―――――――――――――――――――――――――――――――――――― divider ――――――――――――――――――――――――――――――――――――
 
 
On 2021-09-02 11:18:55, "Wayne" <[email protected]> wrote:
>
>
>
>I used tableEnv.executeSql to run the two CREATE statements and then tableEnv.sqlQuery to run the SELECT, which is when the error occurred; no other SQL was executed. aaa.columnInfos [ 'EVENT_ID' ].newValue simply takes the newValue of the 'EVENT_ID' key from the map, and as the CREATE statement shows, it is a STRING.
>
> Also, tableEnv.explainSql(xxx) cannot run that SELECT either and reports the same error. Let me try to reproduce it locally first.
>
>
>
>
>
>
>
>
>On 2021-09-02 10:31:20, "Caizhi Weng" <[email protected]> wrote:
>>Hi!
>>
>>I could not reproduce this problem. Is there any other SQL after this, in particular SQL that filters on aaa.columnInfos [ 'EVENT_ID' ].newValue?
>>
>>You could try to find a reproducible case locally first; tableEnv.explainSql() is a quick way to check whether a plan can be produced. If you find a SQL statement that reproduces the problem, please open an issue in JIRA.
>>
>>Wayne <[email protected]> wrote on Thu, Sep 2, 2021 at 10:43 AM:
>>
>>> Table a
>>> CREATE TABLE aaa (
>>>     `columnInfos` MAP<STRING NOT NULL, ROW<oldValue STRING NULL, newValue STRING, name STRING NOT NULL, isKeyColumn BOOLEAN NOT NULL, type STRING NOT NULL> NOT NULL> NOT NULL,
>>>     .......
>>>     procTime AS PROCTIME()
>>> ) WITH (
>>>     'connector' = 'kafka' ,
>>>     .....
>>>     'format' = 'avro'
>>> )
>>>
>>>
>>>
>>>
>>> Table b
>>>
>>>
>>> CREATE TABLE bbb (
>>>     `event_id` STRING,
>>>     `genre_id` STRING,
>>>     `user_guid` STRING,
>>>     ......
>>>     `uuid` STRING
>>> ) WITH (
>>>     'connector' = 'kafka' ,
>>>     .....
>>>     'format' = 'avro'
>>> )
>>>
>>>
>>>
>>>
>>>
>>>
>>> My SQL
>>>
>>>
>>> SELECT
>>>     CONCAT_WS(
>>>         '_',
>>>         user_guid,
>>>         CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey,
>>>     columnInfos [ 'TICKET_COST' ].newValue AS cost,
>>>     DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate,
>>>     columnInfos [ 'EVENT_ID' ].newValue AS eventid,
>>>     columnInfos [ 'QUANTITY' ].newValue AS quantity,
>>>     genre_id AS genreId,
>>>     user_guid AS userGuid
>>> FROM
>>>     aaa
>>> LEFT JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime ON aaa.columnInfos [ 'EVENT_ID' ].newValue = bbb.event_id and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and bbb.event_id is not null
>>>
>>>
>>> With or without the trailing "and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and bbb.event_id is not null", the same error is reported.
>>>
>>> On 2021-09-02 09:13:11, "Caizhi Weng" <[email protected]> wrote:
>>> >Hi!
>>> >
>>> >Are Orders and Customers source tables directly? Or is there a filter condition on customer_id or id between the source tables and the lookup join?
>>> >
>>> >It is quite possible that an earlier filter on customer_id or id (e.g. customer_id = 1) caused the equality condition of the lookup join to be optimized into customer_id = 1 and id = 1, each pushed down separately below the join.
>>> >
>>> >Wayne <[email protected]> wrote on Wed, Sep 1, 2021 at 6:27 PM:
>>> >
>>> >> My Flink version is flink-1.12.2-bin-scala_2.12.
>>> >> My SQL is:
>>> >> SELECT o.order_id, o.total, c.country, c.zip
>>> >> FROM Orders AS o
>>> >>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>>> >>     ON o.customer_id = c.id and o.customer_id is not null and c.id is not null;
>>> >> or
>>> >> SELECT o.order_id, o.total, c.country, c.zip
>>> >> FROM Orders AS o
>>> >>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>>> >>     ON o.customer_id = c.id ;
>>> >>
>>> >> Both report the error below. Could you take a look and tell me what the correct way to write this is? Many thanks.
>>> >>
>>> >> Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
>>> >> at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
>>> >> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>>> >> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>>> >> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>>> >> at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>>> >> at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>>> >> at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>>> >> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
>>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>>> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
>>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
>>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>>> >> at scala.collection.immutable.Range.foreach(Range.scala:158)
>>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>>> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
>>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>>> >> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>>> >> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>>> >> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>> >> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
>>> >> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
>>> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>>> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
>>> >> at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>>
