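Hi everyone, I have a question I'd like to ask.
Problem description:
I have two tables (both backed by Kafka streams):
  table1(EVENTTIME,NEW_EVENT_ID,F4,F6), table2(EVENTTIME,NEW_EVENT_ID,F2,F3)
Both tables define a rowtime attribute on EVENTTIME.

I want to combine the two streams with UNION ALL and deduplicate the result, using the following statement: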

Table id_distinct = tableEnv.sqlQuery(
    "select distinct EVENTTIME,NEW_EVENT_ID from "
    + "(select EVENTTIME,NEW_EVENT_ID FROM table1 union all "
    + "select EVENTTIME,NEW_EVENT_ID FROM table2)");

Problem: the query fails with the exception below. How can I fix this? Thanks!
Exception in thread "main" java.lang.AssertionError: Type mismatch:
rowtype of new rel:
RecordType(TIMESTAMP(3) NOT NULL EVENTTIME, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NEW_EVENT_ID) NOT NULL
rowtype of set:
RecordType(TIMESTAMP(3) NOT NULL EVENTTIME, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NEW_EVENT_ID) NOT NULL
at org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
at org.apache.calcite.plan.RelOptUtil.equal(RelOptUtil.java:1857)
at org.apache.calcite.plan.volcano.RelSubset.add(RelSubset.java:276)
at org.apache.calcite.plan.volcano.RelSet.add(RelSet.java:148)
at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1633)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
at org.apache.calcite.rel.rules.ProjectSetOpTransposeRule.onMatch(ProjectSetOpTransposeRule.java:109)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)
at com.nsn.flink.service.DealRegisterFile12.main(DealRegisterFile12.java:80)



