Re: Reply: flink sql Temporal table join failed

2020-06-12, post by Zhou Zach
OK.
On 2020-06-12 17:46:22, "咖啡泡油条" <9329...@qq.com> wrote:
>You can refer to this earlier mailing-list thread:
>https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
>
>
>
>
>-- Original message --
>From: "Leonard Xu"    Sent: Friday, June 12, 2020, 5:43 PM
>To: "user-zh"
>Subject: Re: flink sql Temporal table join failed
>
>
>
>
>You have hit exactly this pitfall: it is a bug in escaping the Flink reserved keyword (time), and it is fixed in 1.10.1 and later versions (including the upcoming 1.11).
>
>Best regards,
>Leonard Xu
>
> On June 12, 2020, at 17:38, Zhou Zach wrote:
> 
> 
> 
> Yes, version 1.10.0.
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-12 16:28:15, "Benchao Li" wrote: It looks like you have hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed there.
> 
> Zhou Zach wrote:
> Still not working:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.

Reply: flink sql Temporal table join failed

2020-06-12, post by 咖啡泡油条
You can refer to this earlier mailing-list thread:
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E

-- Original message --
From: "Leonard Xu"

Re:Re: flink sql Temporal table join failed

2020-06-12, post by Zhou Zach
Thanks for the reminder.

On 2020-06-12 17:43:20, "Leonard Xu" wrote:
>
>You have hit exactly this pitfall: it is a bug in escaping the Flink reserved keyword (time), and it is fixed in 1.10.1 and later versions (including the upcoming 1.11).
>
>Best regards,
>Leonard Xu
>
>> On June 12, 2020, at 17:38, Zhou Zach wrote:
>> 
>> 
>> 
>> 
>> Yes, version 1.10.0.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-06-12 16:28:15, "Benchao Li" wrote:
>>> It looks like you have hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed there.
>>> 
>>> Zhou Zach wrote on Friday, June 12, 2020 at 3:47 PM:
>>> 
 Still not working:
 Exception in thread "main" org.apache.flink.table.api.SqlParserException:
 SQL parse failed. Encountered "time FROM" at line 1, column 44.

Re: flink sql Temporal table join failed

2020-06-12, post by Leonard Xu


You have hit exactly this pitfall: it is a bug in escaping the Flink reserved keyword (time), and it is fixed in 1.10.1 and later versions (including the upcoming 1.11).

Best regards,
Leonard Xu
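
On unaffected versions, the workaround is to escape the reserved column name with backticks everywhere it appears. A minimal, untested sketch — the column types and connector properties are assumptions, with names borrowed loosely from the query plan in this thread:

```sql
-- Hypothetical DDL sketch: quote the reserved word `time` with backticks.
-- On 1.10.0 the escaping itself is buggy, so upgrading to 1.10.1+ is the real fix.
CREATE TABLE user_behavior (
    uid VARCHAR,
    phoneType VARCHAR,
    clickCount INT,
    `time` TIMESTAMP(3)          -- reserved keyword, must stay backquoted
) WITH (
    'connector.type' = 'kafka'
    -- remaining connector properties elided
);

-- Every later reference needs the backticks as well:
SELECT uid, `time` FROM user_behavior;
```

Renaming the column (e.g. to `ts`) sidesteps the reserved-word handling entirely and is often the simpler choice.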

> On June 12, 2020, at 17:38, Zhou Zach wrote:
> 
> 
> 
> 
> Yes, version 1.10.0.
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-12 16:28:15, "Benchao Li" wrote:
>> It looks like you have hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed there.
>> 
>> Zhou Zach wrote on Friday, June 12, 2020 at 3:47 PM:
>> 
>>> Still not working:
>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.

Re:Re: Re: Re: flink sql Temporal table join failed

2020-06-12, post by Zhou Zach

Yes, version 1.10.0.

On 2020-06-12 16:28:15, "Benchao Li" wrote:
>It looks like you have hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed there.
>
>Zhou Zach wrote on Friday, June 12, 2020 at 3:47 PM:
>
>> Still not working:
>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>> SQL parse failed. Encountered "time FROM" at line 1, column 44.

Re: Re: Re: flink sql Temporal table join failed

2020-06-12, post by Benchao Li
It looks like you have hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed there.

Zhou Zach wrote on Friday, June 12, 2020 at 3:47 PM:

> Still not working:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.

Re:Re: Re: flink sql Temporal table join failed

2020-06-12, post by Zhou Zach
Still not working:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "time FROM" at line 1, column 44.
Was expecting one of:
"CURSOR" ...
"EXISTS" ...
"NOT" ...
"ROW" ...
"(" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
"NULL" ...
 ...
 ...
 ...
"DATE" ...
"TIME"  ...
"TIMESTAMP" ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
 ...
"MULTISET" ...
"ARRAY" ...
"MAP" ...
"PERIOD" ...
"SPECIFIC" ...
 ...
 ...
 ...
 ...
 ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"USER" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"CURRENT_CATALOG" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_USER" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"NEW" ...
"CASE" ...
"CURRENT" ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 

Re:Re:Re: flink sql Temporal table join failed

2020-06-12, post by 陈邵瑾
Take a look at the documentation on SQL time attributes; judging from your description, the usage is incorrect: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
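Following that documentation, the probe-side table declares a processing-time attribute as a computed column in the DDL, and the temporal join then references that column after FOR SYSTEM_TIME AS OF, rather than calling PROCTIME() inline. A rough sketch — table and column names are borrowed from the query plan in this thread, while types, connector settings, and the join shape are assumptions:

```sql
-- Hypothetical sketch: declare a proctime attribute on the left table ...
CREATE TABLE user_behavior (
    uid VARCHAR,
    phoneType VARCHAR,
    clickCount INT,
    proctime AS PROCTIME()       -- processing-time attribute, declared in DDL
) WITH (
    -- Kafka connector properties elided
);

-- ... and reference that column in the temporal table join:
INSERT INTO user_cnt
SELECT CAST(b.proctime AS VARCHAR) AS `time`, u.age AS sum_age
FROM user_behavior AS b
JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
ON b.uid = u.uid;
```

The point is that `b.proctime` must be a declared time attribute of the left table; writing `FOR SYSTEM_TIME AS OF PROCTIME()` directly is what triggers the "doesn't support 'PROCTIME()'" TableException quoted below.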
At 2020-06-12 14:24:07, "Zhou Zach"  wrote:
>flink 1.10.0:
>Adding a PROCTIME() AS proctime computed column in CREATE TABLE raises an error.
>
>On 2020-06-12 14:08:11, "Benchao Li" wrote:
>>Hi,
>>
>>A temporal table join requires a processing-time attribute; your b.`time` here is a plain timestamp column, not a time attribute.
>>See [1] for reference.
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>
>>Zhou Zach wrote on Friday, June 12, 2020 at 1:33 PM:
>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>>
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Cannot generate a valid execution plan for the given query:
>>>
>>>
>>>
>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>> fields=[time, sum_age])
>>>
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>
>>>+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>
>>>   :- FlinkLogicalCalc(select=[uid, time])
>>>
>>>   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>
>>>   +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>
>>>  +- FlinkLogicalCalc(select=[uid, age])
>>>
>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>
>>>
>>>
>>>
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>>> table's proctime field, doesn't support 'PROCTIME()'
>>>
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>> field, doesn't support 'PROCTIME()'
>>>
>>> at
>>> 

Re: Re: flink sql Temporal table join failed

2020-06-12 Post by Benchao Li
You have it backwards: it should be proctime AS PROCTIME().
For computed columns, AS works in the opposite direction from AS in an ordinary query.
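In other words, the fix is to swap the two sides of AS in the DDL. A minimal sketch (connector properties omitted):

```sql
CREATE TABLE user_behavior (
  uid BIGINT,
  `time` TIMESTAMP(3),
  -- Wrong:  PROCTIME() AS proctime   (query-style aliasing, expression first)
  -- Right: for a computed column the new column name comes first,
  -- the expression comes after AS:
  proctime AS PROCTIME()
) WITH (
  -- connector properties omitted
);
```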

Zhou Zach wrote on Fri, Jun 12, 2020 at 2:24 PM:

> flink 1.10.0:
> Adding a PROCTIME() AS proctime column in CREATE TABLE raises an error
>
> At 2020-06-12 14:08:11, "Benchao Li" wrote:
> >Hi,
> >
> >A temporal table join needs a processing-time attribute; your b.`time` here is an ordinary timestamp, not an event-time attribute.
> >See [1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html

Re: flink sql Temporal table join failed

2020-06-12 Post by 李奇
You need a proctime attribute for the join to work; see: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
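Once the left table declares such a proctime column, the temporal join references it after FOR SYSTEM_TIME AS OF. A hedged sketch based on the tables in the quoted plan (assuming user_behavior declares `proctime AS PROCTIME()`):

```sql
-- Lookup join against the users table at the row's processing time.
SELECT b.`time`, u.age
FROM user_behavior AS b
JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
  ON b.uid = u.uid;
```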
 

> On Jun 12, 2020, at 2:24 PM, Zhou Zach wrote:
> 
> flink 1.10.0:
> Adding a PROCTIME() AS proctime column in CREATE TABLE raises an error
> 
>> At 2020-06-12 14:08:11, "Benchao Li" wrote:
>> Hi,
>> 
>> A temporal table join needs a processing-time attribute; your b.`time` here is an ordinary timestamp, not an event-time attribute.
>> See [1]
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> 

Re:Re: flink sql Temporal table join failed

2020-06-12 Post by Zhou Zach
flink 1.10.0:
Adding a PROCTIME() AS proctime column in CREATE TABLE raises an error

At 2020-06-12 14:08:11, "Benchao Li" wrote:
>Hi,
>
>A temporal table join needs a processing-time attribute; your b.`time` here is an ordinary timestamp, not an event-time attribute.
>See [1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>

Re: flink sql Temporal table join failed

2020-06-12 Post by Benchao Li
Hi,

A temporal table join needs a processing-time attribute; your b.`time` here is an ordinary timestamp, not an event-time attribute.
See [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
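The error in the original post says the snapshot period must be the left table's declared proctime field, not the PROCTIME() expression or an ordinary timestamp column. A hedged sketch of a form the planner accepts, shaped after the `user_cnt` sink in the plan (assumes user_behavior declares `proctime AS PROCTIME()`; this is an illustration, not the poster's actual query):

```sql
-- Join on the declared processing-time attribute, not on b.`time`,
-- which is a plain TIMESTAMP(3) column and not a time attribute.
SELECT b.`time` AS `time`, SUM(u.age) AS sum_age
FROM user_behavior AS b
JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
  ON b.uid = u.uid
GROUP BY b.`time`;
```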


flink sql Temporal table join failed

2020-06-11 Post by Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 




FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], 
fields=[time, sum_age])

+- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])

   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])

  :- FlinkLogicalCalc(select=[uid, time])

  :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, 
clickCount, time)]]], fields=[uid, phoneType, clickCount, time])

  +- FlinkLogicalSnapshot(period=[$cor0.time])

 +- FlinkLogicalCalc(select=[uid, age])

+- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, 
created_time)]]], fields=[uid, sex, age, created_time])




Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left 
table's proctime field, doesn't support 'PROCTIME()'

Please check the documentation for the set of currently supported SQL features.

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)

at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)

at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)

at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

at 
org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)

at 
org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)

Caused by: org.apache.flink.table.api.TableException: Temporal table join 
currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, 
doesn't support 'PROCTIME()'

at 
org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)

at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)

at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at