??????????????????????
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E




------------------ ???????? ------------------
??????:&nbsp;"Leonard Xu"<xbjt...@gmail.com&gt;;
????????:&nbsp;2020??6??12??(??????) ????5:43
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: flink sql Temporal table join failed




????????????????????????flink??????????????time????????bug??1.10.1????????????(??????????1.11)????????????

????
Leonard Xu

&gt; ?? 2020??6??12????17:38??Zhou Zach <wander...@163.com&gt; ??????
&gt; 
&gt; 
&gt; 
&gt; 
&gt; ??????1.10.0????
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; ?? 2020-06-12 16:28:15??"Benchao Li" <libenc...@apache.org&gt; ??????
&gt;&gt; 
????????????????????????????????1.10.0??????????????1.10.1??????????????bug??????1.10.1??????????
&gt;&gt; 
&gt;&gt; Zhou Zach <wander...@163.com&gt; ??2020??6??12?????? ????3:47??????
&gt;&gt; 
&gt;&gt;&gt; ??????????
&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt; 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt; 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt; SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
&gt;&gt;&gt; explanation.
&gt;&gt;&gt; SLF4J: Actual binding is of type
&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file found. Using 
default
&gt;&gt;&gt; configuration: logging only errors to the console.
&gt;&gt;&gt; Exception in thread "main" 
org.apache.flink.table.api.SqlParserException:
&gt;&gt;&gt; SQL parse failed. Encountered "time FROM" at line 1, column 44.
&gt;&gt;&gt; Was expecting one of:
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURSOR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXISTS" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NOT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "(" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "+" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "-" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNSIGNED_INTEGER_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <DECIMAL_NUMERIC_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <APPROX_NUMERIC_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BINARY_STRING_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <PREFIXED_STRING_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_STRING&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_STRING_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FALSE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UNKNOWN" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULL" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_D&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_T&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_TS&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIME" <QUOTED_STRING&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIMESTAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "INTERVAL" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "?" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CAST" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXTRACT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POSITION" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CONVERT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRANSLATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OVERLAY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FLOOR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEIL" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEILING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUBSTRING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRIM" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CLASSIFIER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MATCH_NUMBER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RUNNING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PREV" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEXT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_EXISTS" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_QUERY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECTAGG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAYAGG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_FN&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MULTISET" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ARRAY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERIOD" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SPECIFIC" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BACK_QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BRACKET_QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ABS" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "AVG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CARDINALITY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHAR_LENGTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHARACTER_LENGTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COALESCE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COLLECT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_POP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_SAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CUME_DIST" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COUNT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIME" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIMESTAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DENSE_RANK" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ELEMENT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FIRST_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FUSION" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "GROUPING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "HOUR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEAD" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEFT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAST_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LN" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIME" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIMESTAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOWER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAX" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MIN" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MINUTE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MOD" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MONTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTH_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTILE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULLIF" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OCTET_LENGTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERCENT_RANK" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POWER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RANK" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_COUNT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SXX" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SYY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RIGHT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW_NUMBER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SECOND" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SQRT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_POP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_SAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUM" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UPPER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUNCATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_POP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_SAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "YEAR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_CATALOG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_PATH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_ROLE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_SCHEMA" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SESSION_USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SYSTEM_USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEW" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CASE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT" ...
&gt;&gt;&gt; 
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
&gt;&gt;&gt; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
&gt;&gt;&gt; 
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
&gt;&gt;&gt; at
&gt;&gt;&gt; 
org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; query:
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType VARCHAR,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount INT,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; proctime AS PROCTIME(),
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` TIMESTAMP(3)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.version' = 
'universal',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.topic' = 
'user_behavior',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 
'earliest-offset',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' 
= 'zookeeper.connect',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' 
= 'bootstrap.servers',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 
'true'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as string), u.age
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR SYSTEM_TIME AS OF 
b.`proctime` AS u
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; ??????PROCTIME() AS proctime ????select ??????????????????proctime 
AS PROCTIME()
&gt;&gt;&gt; ????select ????????????
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; ?? 2020-06-12 15:29:49??"Benchao Li" <libenc...@apache.org&gt; 
??????
&gt;&gt;&gt;&gt; ????????????proctime AS PROCTIME()??
&gt;&gt;&gt;&gt; ????????????query??????AS??????????
&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt; Zhou Zach <wander...@163.com&gt; ??2020??6??12?????? 
????2:24??????
&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; flink 1.10.0:
&gt;&gt;&gt;&gt;&gt; ??create table??????PROCTIME() AS proctime????????
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; ?? 2020-06-12 14:08:11??"Benchao Li" 
<libenc...@apache.org&gt; ??????
&gt;&gt;&gt;&gt;&gt;&gt; Hi??
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt; Temporal Table 
join????????????????????????????????b.`time`????????????????????????????????????
&gt;&gt;&gt;&gt;&gt;&gt; ??????????[1]
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt; [1]
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt; Zhou Zach <wander...@163.com&gt; ??2020??6??12?????? 
????1:33??????
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: See 
http://www.slf4j.org/codes.html#multiple_bindings for an
&gt;&gt;&gt;&gt;&gt;&gt;&gt; explanation.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Actual binding is of type
&gt;&gt;&gt;&gt;&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file 
found. Using default
&gt;&gt;&gt;&gt;&gt;&gt;&gt; configuration: logging only errors to the console.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Exception in thread "main" 
org.apache.flink.table.api.TableException:
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Cannot generate a valid execution plan for the 
given query:
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
&gt;&gt;&gt;&gt;&gt;&gt;&gt; fields=[time, sum_age])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, 
age])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp; +- FlinkLogicalJoin(condition=[=($0, 
$2)], joinType=[inner])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; :- 
FlinkLogicalCalc(select=[uid, time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; :&nbsp; +- 
FlinkLogicalTableSourceScan(table=[[default_catalog,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; default_database, user_behavior, source: 
[KafkaTableSource(uid,
&gt;&gt;&gt;&gt;&gt; phoneType,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; clickCount, time)]]], fields=[uid, phoneType, 
clickCount, time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- 
FlinkLogicalSnapshot(period=[$cor0.time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- 
FlinkLogicalCalc(select=[uid, age])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 +- FlinkLogicalTableSourceScan(table=[[default_catalog,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; default_database, users, source: 
[MysqlAsyncLookupTableSource(uid,
&gt;&gt;&gt; sex,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; age, created_time)]]], fields=[uid, sex, age, 
created_time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Temporal table join currently only supports 'FOR 
SYSTEM_TIME AS OF'
&gt;&gt;&gt; left
&gt;&gt;&gt;&gt;&gt;&gt;&gt; table's proctime field, doesn't support 
'PROCTIME()'
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Please check the documentation for the set of 
currently supported SQL
&gt;&gt;&gt;&gt;&gt;&gt;&gt; features.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt; 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Caused by: 
org.apache.flink.table.api.TableException: Temporal table
&gt;&gt;&gt;&gt;&gt; join
&gt;&gt;&gt;&gt;&gt;&gt;&gt; currently only supports 'FOR SYSTEM_TIME AS OF' 
left table's proctime
&gt;&gt;&gt;&gt;&gt;&gt;&gt; field, doesn't support 'PROCTIME()'
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt; 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; ... 20 more
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; query:
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val streamExecutionEnv =
&gt;&gt;&gt;&gt;&gt; StreamExecutionEnvironment.getExecutionEnvironment
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val blinkEnvSettings =
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val streamTableEnv =
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType 
VARCHAR,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount 
INT,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` 
TIMESTAMP(3)
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.type' = 'kafka',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.version' = 'universal',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.topic' = 'user_behavior',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.startup-mode' = 'earliest-offset',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.properties.0.key' = 'zookeeper.connect',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.properties.0.value' =
&gt;&gt;&gt;&gt;&gt; 'cdh1:2181,cdh2:2181,cdh3:2181',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.properties.1.key' = 'bootstrap.servers',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.properties.1.value' =
&gt;&gt;&gt;&gt;&gt; 'cdh1:9092,cdh2:9092,cdh3:9092',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'update-mode' = 'append',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'format.type' = 'json',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'format.derive-schema' = 'true'
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_cnt (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` 
VARCHAR,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; sum_age INT
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.type' = 'jdbc',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.table' = 'user_cnt',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.username' = 'root',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.password' = '123456',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 
'connector.write.flush.max-rows' = '1'
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val userTableSource = new 
MysqlAsyncLookupTableSource(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array("uid", "sex", "age", "created_time"),
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array(),
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array(Types.STRING, Types.STRING, Types.INT, 
Types.STRING))
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.registerTableSource("users", 
userTableSource)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as 
string), u.age
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR 
SYSTEM_TIME AS OF b.`time` AS u
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.execute("Temporal table join")
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;

回复