好的
在 2020-06-12 17:46:22,"咖啡泡油条" <9329...@qq.com> 写道: >可以参考之前的邮件列表 >https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E > > > > >------------------ 原始邮件 ------------------ >发件人: "Leonard Xu"<xbjt...@gmail.com>; >发送时间: 2020年6月12日(星期五) 下午5:43 >收件人: "user-zh"<user-zh@flink.apache.org>; > >主题: Re: flink sql Temporal table join failed > > > > >你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。 > >祝好 >Leonard Xu > >> 在 2020年6月12日,17:38,Zhou Zach <wander...@163.com> 写道: >> >> >> >> >> 是的,1.10.0版本 >> >> >> >> >> >> >> >> >> 在 2020-06-12 16:28:15,"Benchao Li" <libenc...@apache.org> 写道: >>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 >>> >>> Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午3:47写道: >>> >>>> 还是不行, >>>> SLF4J: Class path contains multiple SLF4J bindings. >>>> SLF4J: Found binding in >>>> >[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>> SLF4J: Found binding in >>>> >[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for >an >>>> explanation. >>>> SLF4J: Actual binding is of type >>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>> ERROR StatusLogger No log4j2 configuration file found. Using >default >>>> configuration: logging only errors to the console. >>>> Exception in thread "main" >org.apache.flink.table.api.SqlParserException: >>>> SQL parse failed. Encountered "time FROM" at line 1, column 44. >>>> Was expecting one of: >>>> "CURSOR" ... >>>> "EXISTS" ... >>>> "NOT" ... >>>> "ROW" ... >>>> "(" ... >>>> "+" ... >>>> "-" ... >>>> <UNSIGNED_INTEGER_LITERAL> ... >>>> <DECIMAL_NUMERIC_LITERAL> ... >>>> <APPROX_NUMERIC_LITERAL> ... >>>> <BINARY_STRING_LITERAL> ... >>>> <PREFIXED_STRING_LITERAL> ... >>>> <QUOTED_STRING> ... >>>> <UNICODE_STRING_LITERAL> ... >>>> "TRUE" ... >>>> "FALSE" ... >>>> "UNKNOWN" ... >>>> "NULL" ... >>>> <LBRACE_D> ... >>>> <LBRACE_T> ... >>>> <LBRACE_TS> ... >>>> "DATE" ... >>>> "TIME" <QUOTED_STRING> ... >>>> "TIMESTAMP" ... >>>> "INTERVAL" ... >>>> "?" ... >>>> "CAST" ... >>>> "EXTRACT" ... >>>> "POSITION" ... >>>> "CONVERT" ... >>>> "TRANSLATE" ... >>>> "OVERLAY" ... >>>> "FLOOR" ... >>>> "CEIL" ... >>>> "CEILING" ... >>>> "SUBSTRING" ... >>>> "TRIM" ... >>>> "CLASSIFIER" ... >>>> "MATCH_NUMBER" ... >>>> "RUNNING" ... >>>> "PREV" ... >>>> "NEXT" ... >>>> "JSON_EXISTS" ... >>>> "JSON_VALUE" ... >>>> "JSON_QUERY" ... >>>> "JSON_OBJECT" ... >>>> "JSON_OBJECTAGG" ... >>>> "JSON_ARRAY" ... >>>> "JSON_ARRAYAGG" ... >>>> <LBRACE_FN> ... >>>> "MULTISET" ... >>>> "ARRAY" ... >>>> "MAP" ... >>>> "PERIOD" ... >>>> "SPECIFIC" ... >>>> <IDENTIFIER> ... >>>> <QUOTED_IDENTIFIER> ... >>>> <BACK_QUOTED_IDENTIFIER> ... >>>> <BRACKET_QUOTED_IDENTIFIER> ... >>>> <UNICODE_QUOTED_IDENTIFIER> ... >>>> "ABS" ... >>>> "AVG" ... >>>> "CARDINALITY" ... >>>> "CHAR_LENGTH" ... >>>> "CHARACTER_LENGTH" ... >>>> "COALESCE" ... >>>> "COLLECT" ... >>>> "COVAR_POP" ... >>>> "COVAR_SAMP" ... >>>> "CUME_DIST" ... >>>> "COUNT" ... >>>> "CURRENT_DATE" ... >>>> "CURRENT_TIME" ... >>>> "CURRENT_TIMESTAMP" ... >>>> "DENSE_RANK" ... >>>> "ELEMENT" ... >>>> "EXP" ... >>>> "FIRST_VALUE" ... >>>> "FUSION" ... >>>> "GROUPING" ... >>>> "HOUR" ... >>>> "LAG" ... >>>> "LEAD" ... >>>> "LEFT" ... >>>> "LAST_VALUE" ... >>>> "LN" ... >>>> "LOCALTIME" ... >>>> "LOCALTIMESTAMP" ... >>>> "LOWER" ... >>>> "MAX" ... >>>> "MIN" ... >>>> "MINUTE" ... >>>> "MOD" ... >>>> "MONTH" ... >>>> "NTH_VALUE" ... >>>> "NTILE" ... >>>> "NULLIF" ... >>>> "OCTET_LENGTH" ... >>>> "PERCENT_RANK" ... >>>> "POWER" ... >>>> "RANK" ... >>>> "REGR_COUNT" ... >>>> "REGR_SXX" ... >>>> "REGR_SYY" ... >>>> "RIGHT" ... >>>> "ROW_NUMBER" ... >>>> "SECOND" ... >>>> "SQRT" ... >>>> "STDDEV_POP" ... >>>> "STDDEV_SAMP" ... >>>> "SUM" ... >>>> "UPPER" ... >>>> "TRUNCATE" ... >>>> "USER" ... >>>> "VAR_POP" ... >>>> "VAR_SAMP" ... >>>> "YEAR" ... >>>> "CURRENT_CATALOG" ... >>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ... >>>> "CURRENT_PATH" ... >>>> "CURRENT_ROLE" ... >>>> "CURRENT_SCHEMA" ... >>>> "CURRENT_USER" ... >>>> "SESSION_USER" ... >>>> "SYSTEM_USER" ... >>>> "NEW" ... >>>> "CASE" ... >>>> "CURRENT" ... >>>> >>>> at >>>> >org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >>>> at >>>> >org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >>>> at >>>> >org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>>> at >>>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>> >$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>>> at >>>> >org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>>> at >>>> >org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>>> at >>>> >org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>>> at >>>> >org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>>> at >>>> >org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) >>>> at >>>> >org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) >>>> at >>>> >org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>>> at >>>> >org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >>>> at >>>> >org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) >>>> at >>>> >org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>> >>>> >>>> query: >>>> >>>> >>>> streamTableEnv.sqlUpdate( >>>> """ >>>> | >>>> |CREATE TABLE user_behavior ( >>>> | uid VARCHAR, >>>> | phoneType VARCHAR, >>>> | clickCount INT, >>>> | proctime AS PROCTIME(), >>>> | `time` TIMESTAMP(3) >>>> |) WITH ( >>>> | 'connector.type' = 'kafka', >>>> | 'connector.version' = >'universal', >>>> | 'connector.topic' = >'user_behavior', >>>> | 'connector.startup-mode' = >'earliest-offset', >>>> | >'connector.properties.0.key' = 'zookeeper.connect', >>>> | >'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', >>>> | >'connector.properties.1.key' = 'bootstrap.servers', >>>> | >'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', >>>> | 'update-mode' = 'append', >>>> | 'format.type' = 'json', >>>> | 'format.derive-schema' = >'true' >>>> |) >>>> |""".stripMargin) >>>> streamTableEnv.sqlUpdate( >>>> """ >>>> | >>>> |insert into user_cnt >>>> |SELECT >>>> | cast(b.`time` as string), u.age >>>> |FROM >>>> | user_behavior AS b >>>> | JOIN users FOR SYSTEM_TIME AS OF >b.`proctime` AS u >>>> | ON b.uid = u.uid >>>> | >>>> |""".stripMargin) >>>> >>>> >>>> >>>> >>>> >>>> >>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME() >>>> 放在select 后面也不行。 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> 在 2020-06-12 15:29:49,"Benchao Li" <libenc...@apache.org> 写道: >>>>> 你写反了,是proctime AS PROCTIME()。 >>>>> 计算列跟普通query里面的AS是反着的。 >>>>> >>>>> Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午2:24写道: >>>>> >>>>>> flink 1.10.0: >>>>>> 在create table中,加PROCTIME() AS proctime字段报错 >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> 在 2020-06-12 14:08:11,"Benchao Li" ><libenc...@apache.org> 写道: >>>>>>> Hi, >>>>>>> >>>>>>> Temporal Table >join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 >>>>>>> 可以参考下[1] >>>>>>> >>>>>>> [1] >>>>>>> >>>>>> >>>> >https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html >>>>>>> >>>>>>> Zhou Zach <wander...@163.com> 于2020年6月12日周五 >下午1:33写道: >>>>>>> >>>>>>>> SLF4J: Class path contains multiple SLF4J >bindings. >>>>>>>> >>>>>>>> SLF4J: Found binding in >>>>>>>> >>>>>> >>>> >[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>>> >>>>>>>> SLF4J: Found binding in >>>>>>>> >>>>>> >>>> >[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>>> >>>>>>>> SLF4J: See >http://www.slf4j.org/codes.html#multiple_bindings for an >>>>>>>> explanation. >>>>>>>> >>>>>>>> SLF4J: Actual binding is of type >>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>>>>>> >>>>>>>> ERROR StatusLogger No log4j2 configuration file >found. Using default >>>>>>>> configuration: logging only errors to the console. >>>>>>>> >>>>>>>> Exception in thread "main" >org.apache.flink.table.api.TableException: >>>>>>>> Cannot generate a valid execution plan for the >given query: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>> >FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], >>>>>>>> fields=[time, sum_age]) >>>>>>>> >>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, >age]) >>>>>>>> >>>>>>>> +- FlinkLogicalJoin(condition=[=($0, >$2)], joinType=[inner]) >>>>>>>> >>>>>>>> :- >FlinkLogicalCalc(select=[uid, time]) >>>>>>>> >>>>>>>> : +- >FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>>> default_database, user_behavior, source: >[KafkaTableSource(uid, >>>>>> phoneType, >>>>>>>> clickCount, time)]]], fields=[uid, phoneType, >clickCount, time]) >>>>>>>> >>>>>>>> +- >FlinkLogicalSnapshot(period=[$cor0.time]) >>>>>>>> >>>>>>>> >+- FlinkLogicalCalc(select=[uid, age]) >>>>>>>> >>>>>>>> > +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>>> default_database, users, source: >[MysqlAsyncLookupTableSource(uid, >>>> sex, >>>>>>>> age, created_time)]]], fields=[uid, sex, age, >created_time]) >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Temporal table join currently only supports 'FOR >SYSTEM_TIME AS OF' >>>> left >>>>>>>> table's proctime field, doesn't support >'PROCTIME()' >>>>>>>> >>>>>>>> Please check the documentation for the set of >currently supported SQL >>>>>>>> features. >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>>> >>>>>>>> at >scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>>> >>>>>>>> at >scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>>> >>>>>>>> at >scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>>> >>>>>>>> at >scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >>>>>>>> >>>>>>>> at >>>> >scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>>>>>> >>>>>>>> Caused by: >org.apache.flink.table.api.TableException: Temporal table >>>>>> join >>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' >left table's proctime >>>>>>>> field, doesn't support 'PROCTIME()' >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) >>>>>>>> >>>>>>>> at >>>>>> >org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) >>>>>>>> >>>>>>>> at >>>>>>>> >>>>>> >>>> >org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) >>>>>>>> >>>>>>>> ... 20 more >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> query: >>>>>>>> >>>>>>>> >>>>>>>> val streamExecutionEnv = >>>>>> StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>> val blinkEnvSettings = >>>>>>>> >>>>>> >>>> >EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>>>> val streamTableEnv = >>>>>>>> >StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) >>>>>>>> >>>>>>>> streamTableEnv.sqlUpdate( >>>>>>>> """ >>>>>>>> | >>>>>>>> |CREATE TABLE user_behavior ( >>>>>>>> | uid VARCHAR, >>>>>>>> | phoneType >VARCHAR, >>>>>>>> | clickCount >INT, >>>>>>>> | `time` >TIMESTAMP(3) >>>>>>>> |) WITH ( >>>>>>>> | >'connector.type' = 'kafka', >>>>>>>> | >'connector.version' = 'universal', >>>>>>>> | >'connector.topic' = 'user_behavior', >>>>>>>> | >'connector.startup-mode' = 'earliest-offset', >>>>>>>> | >'connector.properties.0.key' = 'zookeeper.connect', >>>>>>>> | >'connector.properties.0.value' = >>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181', >>>>>>>> | >'connector.properties.1.key' = 'bootstrap.servers', >>>>>>>> | >'connector.properties.1.value' = >>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092', >>>>>>>> | >'update-mode' = 'append', >>>>>>>> | >'format.type' = 'json', >>>>>>>> | >'format.derive-schema' = 'true' >>>>>>>> |) >>>>>>>> |""".stripMargin) >>>>>>>> streamTableEnv.sqlUpdate( >>>>>>>> """ >>>>>>>> | >>>>>>>> |CREATE TABLE user_cnt ( >>>>>>>> | `time` >VARCHAR, >>>>>>>> | sum_age INT >>>>>>>> |) WITH ( >>>>>>>> | >'connector.type' = 'jdbc', >>>>>>>> | >'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', >>>>>>>> | >'connector.table' = 'user_cnt', >>>>>>>> | >'connector.username' = 'root', >>>>>>>> | >'connector.password' = '123456', >>>>>>>> | >'connector.write.flush.max-rows' = '1' >>>>>>>> |) >>>>>>>> |""".stripMargin) >>>>>>>> val userTableSource = new >MysqlAsyncLookupTableSource( >>>>>>>> Array("uid", "sex", "age", "created_time"), >>>>>>>> Array(), >>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, >Types.STRING)) >>>>>>>> streamTableEnv.registerTableSource("users", >userTableSource) >>>>>>>> streamTableEnv.sqlUpdate( >>>>>>>> """ >>>>>>>> | >>>>>>>> |insert into user_cnt >>>>>>>> |SELECT >>>>>>>> | cast(b.`time` as >string), u.age >>>>>>>> |FROM >>>>>>>> | user_behavior AS b >>>>>>>> | JOIN users FOR >SYSTEM_TIME AS OF b.`time` AS u >>>>>>>> | ON b.uid = u.uid >>>>>>>> | >>>>>>>> |""".stripMargin) >>>>>>>> streamTableEnv.execute("Temporal table join") >>>>>> >>>>