感谢提醒
















在 2020-06-12 17:43:20,"Leonard Xu" <xbjt...@gmail.com> 写道:
>
>你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。
>
>祝好
>Leonard Xu
>
>> 在 2020年6月12日,17:38,Zhou Zach <wander...@163.com> 写道:
>> 
>> 
>> 
>> 
>> 是的,1.10.0版本
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-12 16:28:15,"Benchao Li" <libenc...@apache.org> 写道:
>>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>>> 
>>> Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午3:47写道:
>>> 
>>>> 还是不行,
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>> explanation.
>>>> SLF4J: Actual binding is of type
>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>> configuration: logging only errors to the console.
>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>>> Was expecting one of:
>>>>    "CURSOR" ...
>>>>    "EXISTS" ...
>>>>    "NOT" ...
>>>>    "ROW" ...
>>>>    "(" ...
>>>>    "+" ...
>>>>    "-" ...
>>>>    <UNSIGNED_INTEGER_LITERAL> ...
>>>>    <DECIMAL_NUMERIC_LITERAL> ...
>>>>    <APPROX_NUMERIC_LITERAL> ...
>>>>    <BINARY_STRING_LITERAL> ...
>>>>    <PREFIXED_STRING_LITERAL> ...
>>>>    <QUOTED_STRING> ...
>>>>    <UNICODE_STRING_LITERAL> ...
>>>>    "TRUE" ...
>>>>    "FALSE" ...
>>>>    "UNKNOWN" ...
>>>>    "NULL" ...
>>>>    <LBRACE_D> ...
>>>>    <LBRACE_T> ...
>>>>    <LBRACE_TS> ...
>>>>    "DATE" ...
>>>>    "TIME" <QUOTED_STRING> ...
>>>>    "TIMESTAMP" ...
>>>>    "INTERVAL" ...
>>>>    "?" ...
>>>>    "CAST" ...
>>>>    "EXTRACT" ...
>>>>    "POSITION" ...
>>>>    "CONVERT" ...
>>>>    "TRANSLATE" ...
>>>>    "OVERLAY" ...
>>>>    "FLOOR" ...
>>>>    "CEIL" ...
>>>>    "CEILING" ...
>>>>    "SUBSTRING" ...
>>>>    "TRIM" ...
>>>>    "CLASSIFIER" ...
>>>>    "MATCH_NUMBER" ...
>>>>    "RUNNING" ...
>>>>    "PREV" ...
>>>>    "NEXT" ...
>>>>    "JSON_EXISTS" ...
>>>>    "JSON_VALUE" ...
>>>>    "JSON_QUERY" ...
>>>>    "JSON_OBJECT" ...
>>>>    "JSON_OBJECTAGG" ...
>>>>    "JSON_ARRAY" ...
>>>>    "JSON_ARRAYAGG" ...
>>>>    <LBRACE_FN> ...
>>>>    "MULTISET" ...
>>>>    "ARRAY" ...
>>>>    "MAP" ...
>>>>    "PERIOD" ...
>>>>    "SPECIFIC" ...
>>>>    <IDENTIFIER> ...
>>>>    <QUOTED_IDENTIFIER> ...
>>>>    <BACK_QUOTED_IDENTIFIER> ...
>>>>    <BRACKET_QUOTED_IDENTIFIER> ...
>>>>    <UNICODE_QUOTED_IDENTIFIER> ...
>>>>    "ABS" ...
>>>>    "AVG" ...
>>>>    "CARDINALITY" ...
>>>>    "CHAR_LENGTH" ...
>>>>    "CHARACTER_LENGTH" ...
>>>>    "COALESCE" ...
>>>>    "COLLECT" ...
>>>>    "COVAR_POP" ...
>>>>    "COVAR_SAMP" ...
>>>>    "CUME_DIST" ...
>>>>    "COUNT" ...
>>>>    "CURRENT_DATE" ...
>>>>    "CURRENT_TIME" ...
>>>>    "CURRENT_TIMESTAMP" ...
>>>>    "DENSE_RANK" ...
>>>>    "ELEMENT" ...
>>>>    "EXP" ...
>>>>    "FIRST_VALUE" ...
>>>>    "FUSION" ...
>>>>    "GROUPING" ...
>>>>    "HOUR" ...
>>>>    "LAG" ...
>>>>    "LEAD" ...
>>>>    "LEFT" ...
>>>>    "LAST_VALUE" ...
>>>>    "LN" ...
>>>>    "LOCALTIME" ...
>>>>    "LOCALTIMESTAMP" ...
>>>>    "LOWER" ...
>>>>    "MAX" ...
>>>>    "MIN" ...
>>>>    "MINUTE" ...
>>>>    "MOD" ...
>>>>    "MONTH" ...
>>>>    "NTH_VALUE" ...
>>>>    "NTILE" ...
>>>>    "NULLIF" ...
>>>>    "OCTET_LENGTH" ...
>>>>    "PERCENT_RANK" ...
>>>>    "POWER" ...
>>>>    "RANK" ...
>>>>    "REGR_COUNT" ...
>>>>    "REGR_SXX" ...
>>>>    "REGR_SYY" ...
>>>>    "RIGHT" ...
>>>>    "ROW_NUMBER" ...
>>>>    "SECOND" ...
>>>>    "SQRT" ...
>>>>    "STDDEV_POP" ...
>>>>    "STDDEV_SAMP" ...
>>>>    "SUM" ...
>>>>    "UPPER" ...
>>>>    "TRUNCATE" ...
>>>>    "USER" ...
>>>>    "VAR_POP" ...
>>>>    "VAR_SAMP" ...
>>>>    "YEAR" ...
>>>>    "CURRENT_CATALOG" ...
>>>>    "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>>>    "CURRENT_PATH" ...
>>>>    "CURRENT_ROLE" ...
>>>>    "CURRENT_SCHEMA" ...
>>>>    "CURRENT_USER" ...
>>>>    "SESSION_USER" ...
>>>>    "SYSTEM_USER" ...
>>>>    "NEW" ...
>>>>    "CASE" ...
>>>>    "CURRENT" ...
>>>> 
>>>> at
>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>>> at
>>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>> 
>>>> 
>>>> query:
>>>> 
>>>> 
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>>    |
>>>>    |CREATE TABLE user_behavior (
>>>>    |    uid VARCHAR,
>>>>    |    phoneType VARCHAR,
>>>>    |    clickCount INT,
>>>>    |    proctime AS PROCTIME(),
>>>>    |    `time` TIMESTAMP(3)
>>>>    |) WITH (
>>>>    |    'connector.type' = 'kafka',
>>>>    |    'connector.version' = 'universal',
>>>>    |    'connector.topic' = 'user_behavior',
>>>>    |    'connector.startup-mode' = 'earliest-offset',
>>>>    |    'connector.properties.0.key' = 'zookeeper.connect',
>>>>    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>    |    'connector.properties.1.key' = 'bootstrap.servers',
>>>>    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>    |    'update-mode' = 'append',
>>>>    |    'format.type' = 'json',
>>>>    |    'format.derive-schema' = 'true'
>>>>    |)
>>>>    |""".stripMargin)
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>>    |
>>>>    |insert into  user_cnt
>>>>    |SELECT
>>>>    |  cast(b.`time` as string), u.age
>>>>    |FROM
>>>>    |  user_behavior AS b
>>>>    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>>>    |  ON b.uid = u.uid
>>>>    |
>>>>    |""".stripMargin)
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>>>> 放在select 后面也不行。
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 在 2020-06-12 15:29:49,"Benchao Li" <libenc...@apache.org> 写道:
>>>>> 你写反了,是proctime AS PROCTIME()。
>>>>> 计算列跟普通query里面的AS是反着的。
>>>>> 
>>>>> Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午2:24写道:
>>>>> 
>>>>>> flink 1.10.0:
>>>>>> 在create table中,加PROCTIME() AS proctime字段报错
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 在 2020-06-12 14:08:11,"Benchao Li" <libenc...@apache.org> 写道:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
>>>>>>> 可以参考下[1]
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>>>>> 
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>>>>>> 
>>>>>>> Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午1:33写道:
>>>>>>> 
>>>>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>>>>> 
>>>>>>>> SLF4J: Found binding in
>>>>>>>> 
>>>>>> 
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>> 
>>>>>>>> SLF4J: Found binding in
>>>>>>>> 
>>>>>> 
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>> 
>>>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>>>>> explanation.
>>>>>>>> 
>>>>>>>> SLF4J: Actual binding is of type
>>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>>>>>> 
>>>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>>>>>> configuration: logging only errors to the console.
>>>>>>>> 
>>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>>>>> Cannot generate a valid execution plan for the given query:
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>>>>>>> fields=[time, sum_age])
>>>>>>>> 
>>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>>>>>> 
>>>>>>>>   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>>>>>> 
>>>>>>>>      :- FlinkLogicalCalc(select=[uid, time])
>>>>>>>> 
>>>>>>>>      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid,
>>>>>> phoneType,
>>>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>>>>>> 
>>>>>>>>      +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>>>>>> 
>>>>>>>>         +- FlinkLogicalCalc(select=[uid, age])
>>>>>>>> 
>>>>>>>>            +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>>>> sex,
>>>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>>>> left
>>>>>>>> table's proctime field, doesn't support 'PROCTIME()'
>>>>>>>> 
>>>>>>>> Please check the documentation for the set of currently supported SQL
>>>>>>>> features.
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>> 
>>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>> 
>>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>> 
>>>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>> 
>>>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>>>>>> 
>>>>>>>> at
>>>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>>>>> 
>>>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table
>>>>>> join
>>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>>>>>>> field, doesn't support 'PROCTIME()'
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>>>>>> 
>>>>>>>> at
>>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>>>>>> 
>>>>>>>> at
>>>>>>>> 
>>>>>> 
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>>>>>> 
>>>>>>>> ... 20 more
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> query:
>>>>>>>> 
>>>>>>>> 
>>>>>>>> val streamExecutionEnv =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> val blinkEnvSettings =
>>>>>>>> 
>>>>>> 
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>>> val streamTableEnv =
>>>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>>>>>> 
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>>    |
>>>>>>>>    |CREATE TABLE user_behavior (
>>>>>>>>    |    uid VARCHAR,
>>>>>>>>    |    phoneType VARCHAR,
>>>>>>>>    |    clickCount INT,
>>>>>>>>    |    `time` TIMESTAMP(3)
>>>>>>>>    |) WITH (
>>>>>>>>    |    'connector.type' = 'kafka',
>>>>>>>>    |    'connector.version' = 'universal',
>>>>>>>>    |    'connector.topic' = 'user_behavior',
>>>>>>>>    |    'connector.startup-mode' = 'earliest-offset',
>>>>>>>>    |    'connector.properties.0.key' = 'zookeeper.connect',
>>>>>>>>    |    'connector.properties.0.value' =
>>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>>>>>    |    'connector.properties.1.key' = 'bootstrap.servers',
>>>>>>>>    |    'connector.properties.1.value' =
>>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>>>>>    |    'update-mode' = 'append',
>>>>>>>>    |    'format.type' = 'json',
>>>>>>>>    |    'format.derive-schema' = 'true'
>>>>>>>>    |)
>>>>>>>>    |""".stripMargin)
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>>    |
>>>>>>>>    |CREATE TABLE user_cnt (
>>>>>>>>    |    `time` VARCHAR,
>>>>>>>>    |    sum_age INT
>>>>>>>>    |) WITH (
>>>>>>>>    |    'connector.type' = 'jdbc',
>>>>>>>>    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>>>>>>>    |    'connector.table' = 'user_cnt',
>>>>>>>>    |    'connector.username' = 'root',
>>>>>>>>    |    'connector.password' = '123456',
>>>>>>>>    |    'connector.write.flush.max-rows' = '1'
>>>>>>>>    |)
>>>>>>>>    |""".stripMargin)
>>>>>>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>>>>>>> Array("uid", "sex", "age", "created_time"),
>>>>>>>> Array(),
>>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>>>>>>> streamTableEnv.registerTableSource("users", userTableSource)
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>>    |
>>>>>>>>    |insert into  user_cnt
>>>>>>>>    |SELECT
>>>>>>>>    |  cast(b.`time` as string), u.age
>>>>>>>>    |FROM
>>>>>>>>    |  user_behavior AS b
>>>>>>>>    |  JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>>>>>>>    |  ON b.uid = u.uid
>>>>>>>>    |
>>>>>>>>    |""".stripMargin)
>>>>>>>> streamTableEnv.execute("Temporal table join")
>>>>>> 
>>>> 

回复