[Original Chinese text lost to character-encoding corruption] https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
------------------ ???????? ------------------ ??????: "Leonard Xu"<xbjt...@gmail.com>; ????????: 2020??6??12??(??????) ????5:43 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: flink sql Temporal table join failed ????????????????????????flink??????????????time????????bug??1.10.1????????????(??????????1.11)???????????? ???? Leonard Xu > ?? 2020??6??12????17:38??Zhou Zach <wander...@163.com> ?????? > > > > > ??????1.10.0???? > > > > > > > > > ?? 2020-06-12 16:28:15??"Benchao Li" <libenc...@apache.org> ?????? >> ????????????????????????????????1.10.0??????????????1.10.1??????????????bug??????1.10.1?????????? >> >> Zhou Zach <wander...@163.com> ??2020??6??12?????? ????3:47?????? >> >>> ?????????? >>> SLF4J: Class path contains multiple SLF4J bindings. >>> SLF4J: Found binding in >>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: Found binding in >>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>> explanation. >>> SLF4J: Actual binding is of type >>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>> ERROR StatusLogger No log4j2 configuration file found. Using default >>> configuration: logging only errors to the console. >>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: >>> SQL parse failed. Encountered "time FROM" at line 1, column 44. >>> Was expecting one of: >>> "CURSOR" ... >>> "EXISTS" ... >>> "NOT" ... >>> "ROW" ... >>> "(" ... >>> "+" ... >>> "-" ... >>> <UNSIGNED_INTEGER_LITERAL> ... >>> <DECIMAL_NUMERIC_LITERAL> ... >>> <APPROX_NUMERIC_LITERAL> ... >>> <BINARY_STRING_LITERAL> ... >>> <PREFIXED_STRING_LITERAL> ... >>> <QUOTED_STRING> ... >>> <UNICODE_STRING_LITERAL> ... >>> "TRUE" ... >>> "FALSE" ... >>> "UNKNOWN" ... >>> "NULL" ... >>> <LBRACE_D> ... 
>>> <LBRACE_T> ... >>> <LBRACE_TS> ... >>> "DATE" ... >>> "TIME" <QUOTED_STRING> ... >>> "TIMESTAMP" ... >>> "INTERVAL" ... >>> "?" ... >>> "CAST" ... >>> "EXTRACT" ... >>> "POSITION" ... >>> "CONVERT" ... >>> "TRANSLATE" ... >>> "OVERLAY" ... >>> "FLOOR" ... >>> "CEIL" ... >>> "CEILING" ... >>> "SUBSTRING" ... >>> "TRIM" ... >>> "CLASSIFIER" ... >>> "MATCH_NUMBER" ... >>> "RUNNING" ... >>> "PREV" ... >>> "NEXT" ... >>> "JSON_EXISTS" ... >>> "JSON_VALUE" ... >>> "JSON_QUERY" ... >>> "JSON_OBJECT" ... >>> "JSON_OBJECTAGG" ... >>> "JSON_ARRAY" ... >>> "JSON_ARRAYAGG" ... >>> <LBRACE_FN> ... >>> "MULTISET" ... >>> "ARRAY" ... >>> "MAP" ... >>> "PERIOD" ... >>> "SPECIFIC" ... >>> <IDENTIFIER> ... >>> <QUOTED_IDENTIFIER> ... >>> <BACK_QUOTED_IDENTIFIER> ... >>> <BRACKET_QUOTED_IDENTIFIER> ... >>> <UNICODE_QUOTED_IDENTIFIER> ... >>> "ABS" ... >>> "AVG" ... >>> "CARDINALITY" ... >>> "CHAR_LENGTH" ... >>> "CHARACTER_LENGTH" ... >>> "COALESCE" ... >>> "COLLECT" ... >>> "COVAR_POP" ... >>> "COVAR_SAMP" ... >>> "CUME_DIST" ... >>> "COUNT" ... >>> "CURRENT_DATE" ... >>> "CURRENT_TIME" ... >>> "CURRENT_TIMESTAMP" ... >>> "DENSE_RANK" ... >>> "ELEMENT" ... >>> "EXP" ... >>> "FIRST_VALUE" ... >>> "FUSION" ... >>> "GROUPING" ... >>> "HOUR" ... >>> "LAG" ... >>> "LEAD" ... >>> "LEFT" ... >>> "LAST_VALUE" ... >>> "LN" ... >>> "LOCALTIME" ... >>> "LOCALTIMESTAMP" ... >>> "LOWER" ... >>> "MAX" ... >>> "MIN" ... >>> "MINUTE" ... >>> "MOD" ... >>> "MONTH" ... >>> "NTH_VALUE" ... >>> "NTILE" ... >>> "NULLIF" ... >>> "OCTET_LENGTH" ... >>> "PERCENT_RANK" ... >>> "POWER" ... >>> "RANK" ... >>> "REGR_COUNT" ... >>> "REGR_SXX" ... >>> "REGR_SYY" ... >>> "RIGHT" ... >>> "ROW_NUMBER" ... >>> "SECOND" ... >>> "SQRT" ... >>> "STDDEV_POP" ... >>> "STDDEV_SAMP" ... >>> "SUM" ... >>> "UPPER" ... >>> "TRUNCATE" ... >>> "USER" ... >>> "VAR_POP" ... >>> "VAR_SAMP" ... >>> "YEAR" ... >>> "CURRENT_CATALOG" ... >>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ... >>> "CURRENT_PATH" ... >>> "CURRENT_ROLE" ... 
>>> "CURRENT_SCHEMA" ... >>> "CURRENT_USER" ... >>> "SESSION_USER" ... >>> "SYSTEM_USER" ... >>> "NEW" ... >>> "CASE" ... >>> "CURRENT" ... >>> >>> at >>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >>> at >>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >>> at >>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>> 
at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) >>> at >>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >>> at >>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) >>> at >>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>> >>> >>> query: >>> >>> >>> streamTableEnv.sqlUpdate( >>> """ >>> | >>> |CREATE TABLE user_behavior ( >>> | uid VARCHAR, >>> | phoneType VARCHAR, >>> | clickCount INT, >>> | proctime AS PROCTIME(), >>> | `time` TIMESTAMP(3) >>> |) WITH ( >>> | 'connector.type' = 'kafka', >>> | 'connector.version' = 'universal', >>> | 'connector.topic' = 'user_behavior', >>> | 'connector.startup-mode' = 'earliest-offset', >>> | 'connector.properties.0.key' = 'zookeeper.connect', >>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', >>> | 'connector.properties.1.key' = 'bootstrap.servers', >>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', >>> | 'update-mode' = 'append', >>> | 'format.type' = 'json', >>> | 'format.derive-schema' = 'true' >>> |) >>> |""".stripMargin) >>> streamTableEnv.sqlUpdate( >>> """ >>> | >>> |insert into user_cnt >>> |SELECT >>> | cast(b.`time` as string), u.age >>> |FROM >>> | user_behavior AS b >>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u >>> | ON b.uid = u.uid >>> | >>> |""".stripMargin) >>> >>> >>> >>> >>> >>> >>> ??????PROCTIME() AS proctime ????select ??????????????????proctime AS PROCTIME() >>> ????select ???????????? >>> >>> >>> >>> >>> >>> >>> >>> >>> ?? 2020-06-12 15:29:49??"Benchao Li" <libenc...@apache.org> ?????? >>>> ????????????proctime AS PROCTIME()?? 
>>>> ????????????query??????AS?????????? >>>> >>>> Zhou Zach <wander...@163.com> ??2020??6??12?????? ????2:24?????? >>>> >>>>> flink 1.10.0: >>>>> ??create table??????PROCTIME() AS proctime???????? >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> ?? 2020-06-12 14:08:11??"Benchao Li" <libenc...@apache.org> ?????? >>>>>> Hi?? >>>>>> >>>>>> Temporal Table join????????????????????????????????b.`time`???????????????????????????????????? >>>>>> ??????????[1] >>>>>> >>>>>> [1] >>>>>> >>>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html >>>>>> >>>>>> Zhou Zach <wander...@163.com> ??2020??6??12?????? ????1:33?????? >>>>>> >>>>>>> SLF4J: Class path contains multiple SLF4J bindings. >>>>>>> >>>>>>> SLF4J: Found binding in >>>>>>> >>>>> >>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>> >>>>>>> SLF4J: Found binding in >>>>>>> >>>>> >>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>>>> >>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>>>>> explanation. >>>>>>> >>>>>>> SLF4J: Actual binding is of type >>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>>>>>> >>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default >>>>>>> configuration: logging only errors to the console. 
>>>>>>> >>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: >>>>>>> Cannot generate a valid execution plan for the given query: >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], >>>>>>> fields=[time, sum_age]) >>>>>>> >>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) >>>>>>> >>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) >>>>>>> >>>>>>> :- FlinkLogicalCalc(select=[uid, time]) >>>>>>> >>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid, >>>>> phoneType, >>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) >>>>>>> >>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time]) >>>>>>> >>>>>>> +- FlinkLogicalCalc(select=[uid, age]) >>>>>>> >>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, >>> sex, >>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time]) >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' >>> left >>>>>>> table's proctime field, doesn't support 'PROCTIME()' >>>>>>> >>>>>>> Please check the documentation for the set of currently supported SQL >>>>>>> features. 
>>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>>>>>> >>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>> >>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>> >>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >>>>>>> >>>>>>> at >>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>>>>>> >>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table >>>>> join >>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime >>>>>>> field, doesn't support 'PROCTIME()' >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> 
org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) >>>>>>> >>>>>>> at >>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) >>>>>>> >>>>>>> at >>>>>>> >>>>> >>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) >>>>>>> >>>>>>> ... 20 more >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> query: >>>>>>> >>>>>>> >>>>>>> val streamExecutionEnv = >>>>> StreamExecutionEnvironment.getExecutionEnvironment >>>>>>> val blinkEnvSettings = >>>>>>> >>>>> >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>>> val streamTableEnv = >>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) >>>>>>> >>>>>>> streamTableEnv.sqlUpdate( >>>>>>> """ >>>>>>> | >>>>>>> |CREATE TABLE user_behavior ( >>>>>>> | uid VARCHAR, >>>>>>> | phoneType VARCHAR, >>>>>>> | clickCount INT, >>>>>>> | `time` TIMESTAMP(3) >>>>>>> |) WITH ( >>>>>>> | 'connector.type' = 'kafka', >>>>>>> | 'connector.version' = 'universal', >>>>>>> | 'connector.topic' = 'user_behavior', >>>>>>> | 'connector.startup-mode' = 'earliest-offset', >>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect', >>>>>>> | 'connector.properties.0.value' = >>>>> 'cdh1:2181,cdh2:2181,cdh3:2181', >>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers', >>>>>>> | 'connector.properties.1.value' = >>>>> 'cdh1:9092,cdh2:9092,cdh3:9092', >>>>>>> | 'update-mode' = 'append', >>>>>>> | 'format.type' = 'json', >>>>>>> | 'format.derive-schema' = 'true' >>>>>>> |) >>>>>>> |""".stripMargin) >>>>>>> streamTableEnv.sqlUpdate( >>>>>>> """ >>>>>>> | >>>>>>> |CREATE TABLE user_cnt ( >>>>>>> | `time` VARCHAR, >>>>>>> | sum_age INT >>>>>>> |) WITH ( >>>>>>> | 'connector.type' = 'jdbc', >>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', >>>>>>> | 'connector.table' = 'user_cnt', >>>>>>> | 'connector.username' = 'root', >>>>>>> | 'connector.password' = '123456', >>>>>>> | 
'connector.write.flush.max-rows' = '1' >>>>>>> |) >>>>>>> |""".stripMargin) >>>>>>> val userTableSource = new MysqlAsyncLookupTableSource( >>>>>>> Array("uid", "sex", "age", "created_time"), >>>>>>> Array(), >>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) >>>>>>> streamTableEnv.registerTableSource("users", userTableSource) >>>>>>> streamTableEnv.sqlUpdate( >>>>>>> """ >>>>>>> | >>>>>>> |insert into user_cnt >>>>>>> |SELECT >>>>>>> | cast(b.`time` as string), u.age >>>>>>> |FROM >>>>>>> | user_behavior AS b >>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u >>>>>>> | ON b.uid = u.uid >>>>>>> | >>>>>>> |""".stripMargin) >>>>>>> streamTableEnv.execute("Temporal table join") >>>>> >>>