I used tableEnv.executeSql to run the two CREATE statements and then tableEnv.sqlQuery to run the SELECT, and that is where the error was thrown. No other SQL was executed after that. aaa.columnInfos [ 'EVENT_ID' ].newValue simply reads the newValue field of the map entry keyed by 'EVENT_ID'; as the CREATE statement shows, it is a STRING.

Also, tableEnv.explainSql(xxx) cannot process that SELECT either and reports the same error. I will try to reproduce it locally first.
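
For reference, the call pattern is roughly the following. This is only a minimal sketch to make the reproduction path concrete: the schemas are cut down to the columns the join touches, and the 'datagen' connector is a stand-in, since the real tables use kafka + avro.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TemporalJoinRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Both DDLs go through executeSql; schemas are trimmed to the relevant
        // columns and 'datagen' is a placeholder for the real kafka/avro connector.
        tableEnv.executeSql(
                "CREATE TABLE aaa ("
                        + "  columnInfos MAP<STRING NOT NULL, ROW<newValue STRING> NOT NULL> NOT NULL,"
                        + "  procTime AS PROCTIME()"
                        + ") WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
                "CREATE TABLE bbb ("
                        + "  event_id STRING"
                        + ") WITH ('connector' = 'datagen')");

        // explainSql already throws during plan generation, so the query never
        // reaches execution; sqlQuery fails at the same point once the Table is used.
        System.out.println(tableEnv.explainSql(
                "SELECT aaa.columnInfos['EVENT_ID'].newValue AS eventid"
                        + " FROM aaa"
                        + " LEFT JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime"
                        + " ON aaa.columnInfos['EVENT_ID'].newValue = bbb.event_id"));
    }
}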
On 2021-09-02 10:31:20, "Caizhi Weng" <[email protected]> wrote:
>Hi!
>
>I could not reproduce this problem. Is there any other SQL further down the
>pipeline? In particular, SQL that filters on aaa.columnInfos [ 'EVENT_ID' ].newValue.
>
>You could try to find a reproducible case locally first; tableEnv.explainSql() is a
>quick way to check whether a plan can be generated. If you find a SQL statement that
>reproduces the problem, please file an issue in JIRA.
>
>Wayne <[email protected]> wrote on Thu, Sep 2, 2021 at 10:43 AM:
>
>> Table a:
>> CREATE TABLE aaa (
>>     `columnInfos` MAP<STRING NOT NULL, ROW<oldValue STRING NULL, newValue STRING, name STRING NOT NULL, isKeyColumn BOOLEAN NOT NULL, type STRING NOT NULL> NOT NULL> NOT NULL,
>>     .......
>>     procTime AS PROCTIME()
>> ) WITH (
>>     'connector' = 'kafka' ,
>>     .....
>>     'format' = 'avro'
>> )
>>
>> Table b:
>> CREATE TABLE bbb (
>> `event_id` STRING,
>> `genre_id` STRING,
>> `user_guid` STRING,
>> ......
>> `uuid` STRING
>> ) WITH (
>>     'connector' = 'kafka' ,
>>     .....
>>     'format' = 'avro'
>> )
>>
>> My SQL:
>> SELECT
>>     CONCAT_WS(
>>         '_',
>>         user_guid,
>>         CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey,
>>     columnInfos [ 'TICKET_COST' ].newValue AS cost,
>>     DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate,
>>     columnInfos [ 'EVENT_ID' ].newValue AS eventid,
>>     columnInfos [ 'QUANTITY' ].newValue AS quantity,
>>     genre_id AS genreId,
>>     user_guid AS userGuid
>> FROM
>>     aaa
>>     LEFT JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime
>>         ON aaa.columnInfos [ 'EVENT_ID' ].newValue = bbb.event_id
>>         AND aaa.columnInfos [ 'EVENT_ID' ].newValue IS NOT NULL
>>         AND bbb.event_id IS NOT NULL
>>
>> Whether or not I add the trailing "and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and bbb.event_id is not null", the same error is thrown.
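
One thing that might be worth trying while reproducing locally, as an untested sketch rather than a confirmed fix: materialize the map access into an ordinary column in a temporary view first, so that the temporal join's ON clause becomes a plain column-to-column equality the planner can derive a join key from. The view name aaa_flat and the trimmed column list below are made up for illustration; this continues from the same tableEnv as above.

// Untested workaround sketch: project the map lookup into a regular column
// before the temporal join, so the ON clause is a simple equality.
tableEnv.executeSql(
        "CREATE TEMPORARY VIEW aaa_flat AS"
                + " SELECT columnInfos['EVENT_ID'].newValue AS event_id,"
                + "        columnInfos, procTime"
                + " FROM aaa");

tableEnv.sqlQuery(
        "SELECT a.columnInfos['TICKET_COST'].newValue AS cost, b.event_id"
                + " FROM aaa_flat AS a"
                + " LEFT JOIN bbb FOR SYSTEM_TIME AS OF a.procTime AS b"
                + " ON a.event_id = b.event_id");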
>>
>> On 2021-09-02 09:13:11, "Caizhi Weng" <[email protected]> wrote:
>> >Hi!
>> >
>> >Are Orders and Customers plain source tables? Or is there a filter on
>> >customer_id or id somewhere between the source tables and the lookup join?
>> >
>> >It is quite possible that an upstream filter on customer_id or id (for
>> >example customer_id = 1) caused the equality condition of the lookup join
>> >to be optimized into customer_id = 1 and id = 1, each pushed down below
>> >the lookup join.
>> >
>> >Wayne <[email protected]> wrote on Wed, Sep 1, 2021 at 6:27 PM:
>> >
>> >> My Flink version is flink-1.12.2-bin-scala_2.12.
>> >> My SQL is:
>> >> SELECT o.order_id, o.total, c.country, c.zip
>> >> FROM Orders AS o
>> >>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> >>     ON o.customer_id = c.id and o.customer_id is not null and  c.id is
>> >> not null ;
>> >> or:
>> >> SELECT o.order_id, o.total, c.country, c.zip
>> >> FROM Orders AS o
>> >>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> >>     ON o.customer_id = c.id ;
>> >>
>> >> Both fail with the error below. Could you please show me the correct way to write this? Many thanks.
>> >>
>> >> Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
>> >> at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
>> >> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>> >> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>> >> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>> >> at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>> >> at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>> >> at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>> >> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>> >> at scala.collection.immutable.Range.foreach(Range.scala:158)
>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
>> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>> >> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>> >> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>> >> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>> >> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
>> >> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
>> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
>> >> at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)