Re: flink sql Temporal table join failed
你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。 祝好 Leonard Xu > 在 2020年6月12日,17:38,Zhou Zach 写道: > > > > > 是的,1.10.0版本 > > > > > > > > > 在 2020-06-12 16:28:15,"Benchao Li" 写道: >> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 >> >> Zhou Zach 于2020年6月12日周五 下午3:47写道: >> >>> 还是不行, >>> SLF4J: Class path contains multiple SLF4J bindings. >>> SLF4J: Found binding in >>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: Found binding in >>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>> explanation. >>> SLF4J: Actual binding is of type >>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>> ERROR StatusLogger No log4j2 configuration file found. Using default >>> configuration: logging only errors to the console. >>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: >>> SQL parse failed. Encountered "time FROM" at line 1, column 44. >>> Was expecting one of: >>>"CURSOR" ... >>>"EXISTS" ... >>>"NOT" ... >>>"ROW" ... >>>"(" ... >>>"+" ... >>>"-" ... >>> ... >>> ... >>> ... >>> ... >>> ... >>> ... >>> ... >>>"TRUE" ... >>>"FALSE" ... >>>"UNKNOWN" ... >>>"NULL" ... >>> ... >>> ... >>> ... >>>"DATE" ... >>>"TIME" ... >>>"TIMESTAMP" ... >>>"INTERVAL" ... >>>"?" ... >>>"CAST" ... >>>"EXTRACT" ... >>>"POSITION" ... >>>"CONVERT" ... >>>"TRANSLATE" ... >>>"OVERLAY" ... >>>"FLOOR" ... >>>"CEIL" ... >>>"CEILING" ... >>>"SUBSTRING" ... >>>"TRIM" ... >>>"CLASSIFIER" ... >>>"MATCH_NUMBER" ... >>>"RUNNING" ... >>>"PREV" ... >>>"NEXT" ... >>>"JSON_EXISTS" ... >>>"JSON_VALUE" ... >>>"JSON_QUERY" ... >>>"JSON_OBJECT" ... >>>"JSON_OBJECTAGG" ... >>>"JSON_ARRAY" ... >>>"JSON_ARRAYAGG" ... >>> ... >>>"MULTISET" ... >>>"ARRAY" ... >>>"MAP" ... >>>"PERIOD" ... >>>"SPECIFIC" ... >>> ... >>> ... >>> ... >>> ... >>> ... >>>"ABS" ... >>>"AVG" ... >>>"CARDINALITY" ... >>>"CHAR_LENGTH" ... >>>"CHARACTER_LENGTH" ... >>>"COALESCE" ... >>>"COLLECT" ... >>>"COVAR_POP" ... >>>"COVAR_SAMP" ... >>>"CUME_DIST" ... >>>"COUNT" ... >>>"CURRENT_DATE" ... >>>"CURRENT_TIME" ... >>>"CURRENT_TIMESTAMP" ... >>>"DENSE_RANK" ... >>>"ELEMENT" ... >>>"EXP" ... >>>"FIRST_VALUE" ... >>>"FUSION" ... >>>"GROUPING" ... >>>"HOUR" ... >>>"LAG" ... >>>"LEAD" ... >>>"LEFT" ... >>>"LAST_VALUE" ... >>>"LN" ... >>>"LOCALTIME" ... >>>"LOCALTIMESTAMP" ... >>>"LOWER" ... >>>"MAX" ... >>>"MIN" ... >>>"MINUTE" ... >>>"MOD" ... >>>"MONTH" ... >>>"NTH_VALUE" ... >>>"NTILE" ... >>>"NULLIF" ... >>>"OCTET_LENGTH" ... >>>"PERCENT_RANK" ... >>>"POWER" ... >>>"RANK" ... >>>"REGR_COUNT" ... >>>"REGR_SXX" ... >>>"REGR_SYY" ... >>>"RIGHT" ... >>>"ROW_NUMBER" ... >>>"SECOND" ... >>>"SQRT" ... >>>"STDDEV_POP" ... >>>"STDDEV_SAMP" ... >>>"SUM" ... >>>"UPPER" ... >>>"TRUNCATE" ... >>>"USER" ... >>>"VAR_POP" ... >>>"VAR_SAMP" ... >>>"YEAR" ... >>>"CURRENT_CATALOG" ... >>>"CURRENT_DEFAULT_TRANSFORM_GROUP" ... >>>"CURRENT_PATH" ... >>>"CURRENT_ROLE" ... >>>"CURRENT_SCHEMA" ... >>>"CURRENT_USER" ... >>>"SESSION_USER" ... >>>"SYSTEM_USER" ... >>>"NEW" ... >>>"CASE" ... >>>"CURRENT" ... >>> >>> at >>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >>> at >>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >>> at >>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>> at >>>
Re:Re: Re: Re: flink sql Temporal table join failed
是的,1.10.0版本 在 2020-06-12 16:28:15,"Benchao Li" 写道: >看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 > >Zhou Zach 于2020年6月12日周五 下午3:47写道: > >> 还是不行, >> SLF4J: Class path contains multiple SLF4J bindings. >> SLF4J: Found binding in >> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> SLF4J: Found binding in >> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >> explanation. >> SLF4J: Actual binding is of type >> [org.apache.logging.slf4j.Log4jLoggerFactory] >> ERROR StatusLogger No log4j2 configuration file found. Using default >> configuration: logging only errors to the console. >> Exception in thread "main" org.apache.flink.table.api.SqlParserException: >> SQL parse failed. Encountered "time FROM" at line 1, column 44. >> Was expecting one of: >> "CURSOR" ... >> "EXISTS" ... >> "NOT" ... >> "ROW" ... >> "(" ... >> "+" ... >> "-" ... >> ... >> ... >> ... >> ... >> ... >> ... >> ... >> "TRUE" ... >> "FALSE" ... >> "UNKNOWN" ... >> "NULL" ... >> ... >> ... >> ... >> "DATE" ... >> "TIME" ... >> "TIMESTAMP" ... >> "INTERVAL" ... >> "?" ... >> "CAST" ... >> "EXTRACT" ... >> "POSITION" ... >> "CONVERT" ... >> "TRANSLATE" ... >> "OVERLAY" ... >> "FLOOR" ... >> "CEIL" ... >> "CEILING" ... >> "SUBSTRING" ... >> "TRIM" ... >> "CLASSIFIER" ... >> "MATCH_NUMBER" ... >> "RUNNING" ... >> "PREV" ... >> "NEXT" ... >> "JSON_EXISTS" ... >> "JSON_VALUE" ... >> "JSON_QUERY" ... >> "JSON_OBJECT" ... >> "JSON_OBJECTAGG" ... >> "JSON_ARRAY" ... >> "JSON_ARRAYAGG" ... >> ... >> "MULTISET" ... >> "ARRAY" ... >> "MAP" ... >> "PERIOD" ... >> "SPECIFIC" ... >> ... >> ... >> ... >> ... >> ... >> "ABS" ... >> "AVG" ... >> "CARDINALITY" ... >> "CHAR_LENGTH" ... >> "CHARACTER_LENGTH" ... >> "COALESCE" ... >> "COLLECT" ... >> "COVAR_POP" ... >> "COVAR_SAMP" ... >> "CUME_DIST" ... >> "COUNT" ... >> "CURRENT_DATE" ... >> "CURRENT_TIME" ... >> "CURRENT_TIMESTAMP" ... >> "DENSE_RANK" ... >> "ELEMENT" ... >> "EXP" ... >> "FIRST_VALUE" ... >> "FUSION" ... >> "GROUPING" ... >> "HOUR" ... >> "LAG" ... >> "LEAD" ... >> "LEFT" ... >> "LAST_VALUE" ... >> "LN" ... >> "LOCALTIME" ... >> "LOCALTIMESTAMP" ... >> "LOWER" ... >> "MAX" ... >> "MIN" ... >> "MINUTE" ... >> "MOD" ... >> "MONTH" ... >> "NTH_VALUE" ... >> "NTILE" ... >> "NULLIF" ... >> "OCTET_LENGTH" ... >> "PERCENT_RANK" ... >> "POWER" ... >> "RANK" ... >> "REGR_COUNT" ... >> "REGR_SXX" ... >> "REGR_SYY" ... >> "RIGHT" ... >> "ROW_NUMBER" ... >> "SECOND" ... >> "SQRT" ... >> "STDDEV_POP" ... >> "STDDEV_SAMP" ... >> "SUM" ... >> "UPPER" ... >> "TRUNCATE" ... >> "USER" ... >> "VAR_POP" ... >> "VAR_SAMP" ... >> "YEAR" ... >> "CURRENT_CATALOG" ... >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ... >> "CURRENT_PATH" ... >> "CURRENT_ROLE" ... >> "CURRENT_SCHEMA" ... >> "CURRENT_USER" ... >> "SESSION_USER" ... >> "SYSTEM_USER" ... >> "NEW" ... >> "CASE" ... >> "CURRENT" ... >> >> at >> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) >> at >> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) >> at >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >> at
Re: Re: Re: flink sql Temporal table join failed
看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 Zhou Zach 于2020年6月12日周五 下午3:47写道: > 还是不行, > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > ERROR StatusLogger No log4j2 configuration file found. Using default > configuration: logging only errors to the console. > Exception in thread "main" org.apache.flink.table.api.SqlParserException: > SQL parse failed. Encountered "time FROM" at line 1, column 44. > Was expecting one of: > "CURSOR" ... > "EXISTS" ... > "NOT" ... > "ROW" ... > "(" ... > "+" ... > "-" ... > ... > ... > ... > ... > ... > ... > ... > "TRUE" ... > "FALSE" ... > "UNKNOWN" ... > "NULL" ... > ... > ... > ... > "DATE" ... > "TIME" ... > "TIMESTAMP" ... > "INTERVAL" ... > "?" ... > "CAST" ... > "EXTRACT" ... > "POSITION" ... > "CONVERT" ... > "TRANSLATE" ... > "OVERLAY" ... > "FLOOR" ... > "CEIL" ... > "CEILING" ... > "SUBSTRING" ... > "TRIM" ... > "CLASSIFIER" ... > "MATCH_NUMBER" ... > "RUNNING" ... > "PREV" ... > "NEXT" ... > "JSON_EXISTS" ... > "JSON_VALUE" ... > "JSON_QUERY" ... > "JSON_OBJECT" ... > "JSON_OBJECTAGG" ... > "JSON_ARRAY" ... > "JSON_ARRAYAGG" ... > ... > "MULTISET" ... > "ARRAY" ... > "MAP" ... > "PERIOD" ... > "SPECIFIC" ... > ... > ... > ... > ... > ... > "ABS" ... > "AVG" ... > "CARDINALITY" ... > "CHAR_LENGTH" ... > "CHARACTER_LENGTH" ... > "COALESCE" ... > "COLLECT" ... > "COVAR_POP" ... > "COVAR_SAMP" ... > "CUME_DIST" ... > "COUNT" ... > "CURRENT_DATE" ... > "CURRENT_TIME" ... > "CURRENT_TIMESTAMP" ... > "DENSE_RANK" ... > "ELEMENT" ... > "EXP" ... > "FIRST_VALUE" ... > "FUSION" ... > "GROUPING" ... > "HOUR" ... > "LAG" ... > "LEAD" ... > "LEFT" ... > "LAST_VALUE" ... > "LN" ... > "LOCALTIME" ... > "LOCALTIMESTAMP" ... > "LOWER" ... > "MAX" ... > "MIN" ... > "MINUTE" ... > "MOD" ... > "MONTH" ... > "NTH_VALUE" ... > "NTILE" ... > "NULLIF" ... > "OCTET_LENGTH" ... > "PERCENT_RANK" ... > "POWER" ... > "RANK" ... > "REGR_COUNT" ... > "REGR_SXX" ... > "REGR_SYY" ... > "RIGHT" ... > "ROW_NUMBER" ... > "SECOND" ... > "SQRT" ... > "STDDEV_POP" ... > "STDDEV_SAMP" ... > "SUM" ... > "UPPER" ... > "TRUNCATE" ... > "USER" ... > "VAR_POP" ... > "VAR_SAMP" ... > "YEAR" ... > "CURRENT_CATALOG" ... > "CURRENT_DEFAULT_TRANSFORM_GROUP" ... > "CURRENT_PATH" ... > "CURRENT_ROLE" ... > "CURRENT_SCHEMA" ... > "CURRENT_USER" ... > "SESSION_USER" ... > "SYSTEM_USER" ... > "NEW" ... > "CASE" ... > "CURRENT" ... > > at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) > at > org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) > at >
Re:Re: Re: flink sql Temporal table join failed
还是不行, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time FROM" at line 1, column 44. Was expecting one of: "CURSOR" ... "EXISTS" ... "NOT" ... "ROW" ... "(" ... "+" ... "-" ... ... ... ... ... ... ... ... "TRUE" ... "FALSE" ... "UNKNOWN" ... "NULL" ... ... ... ... "DATE" ... "TIME" ... "TIMESTAMP" ... "INTERVAL" ... "?" ... "CAST" ... "EXTRACT" ... "POSITION" ... "CONVERT" ... "TRANSLATE" ... "OVERLAY" ... "FLOOR" ... "CEIL" ... "CEILING" ... "SUBSTRING" ... "TRIM" ... "CLASSIFIER" ... "MATCH_NUMBER" ... "RUNNING" ... "PREV" ... "NEXT" ... "JSON_EXISTS" ... "JSON_VALUE" ... "JSON_QUERY" ... "JSON_OBJECT" ... "JSON_OBJECTAGG" ... "JSON_ARRAY" ... "JSON_ARRAYAGG" ... ... "MULTISET" ... "ARRAY" ... "MAP" ... "PERIOD" ... "SPECIFIC" ... ... ... ... ... ... "ABS" ... "AVG" ... "CARDINALITY" ... "CHAR_LENGTH" ... "CHARACTER_LENGTH" ... "COALESCE" ... "COLLECT" ... "COVAR_POP" ... "COVAR_SAMP" ... "CUME_DIST" ... "COUNT" ... "CURRENT_DATE" ... "CURRENT_TIME" ... "CURRENT_TIMESTAMP" ... "DENSE_RANK" ... "ELEMENT" ... "EXP" ... "FIRST_VALUE" ... "FUSION" ... "GROUPING" ... "HOUR" ... "LAG" ... "LEAD" ... "LEFT" ... "LAST_VALUE" ... "LN" ... "LOCALTIME" ... "LOCALTIMESTAMP" ... "LOWER" ... "MAX" ... "MIN" ... "MINUTE" ... "MOD" ... "MONTH" ... "NTH_VALUE" ... "NTILE" ... "NULLIF" ... "OCTET_LENGTH" ... "PERCENT_RANK" ... "POWER" ... "RANK" ... "REGR_COUNT" ... "REGR_SXX" ... "REGR_SYY" ... "RIGHT" ... "ROW_NUMBER" ... "SECOND" ... "SQRT" ... "STDDEV_POP" ... "STDDEV_SAMP" ... "SUM" ... "UPPER" ... "TRUNCATE" ... "USER" ... "VAR_POP" ... "VAR_SAMP" ... "YEAR" ... "CURRENT_CATALOG" ... "CURRENT_DEFAULT_TRANSFORM_GROUP" ... "CURRENT_PATH" ... "CURRENT_ROLE" ... "CURRENT_SCHEMA" ... "CURRENT_USER" ... "SESSION_USER" ... "SYSTEM_USER" ... "NEW" ... "CASE" ... "CURRENT" ... at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) at
Re: Re: flink sql Temporal table join failed
你写反了,是proctime AS PROCTIME()。 计算列跟普通query里面的AS是反着的。 Zhou Zach 于2020年6月12日周五 下午2:24写道: > flink 1.10.0: > 在create table中,加PROCTIME() AS proctime字段报错 > > > > > > > > > > > > > > > > > > 在 2020-06-12 14:08:11,"Benchao Li" 写道: > >Hi, > > > >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 > >可以参考下[1] > > > >[1] > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html > > > >Zhou Zach 于2020年6月12日周五 下午1:33写道: > > > >> SLF4J: Class path contains multiple SLF4J bindings. > >> > >> SLF4J: Found binding in > >> > [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > >> > >> SLF4J: Found binding in > >> > [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > >> > >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > >> explanation. > >> > >> SLF4J: Actual binding is of type > >> [org.apache.logging.slf4j.Log4jLoggerFactory] > >> > >> ERROR StatusLogger No log4j2 configuration file found. Using default > >> configuration: logging only errors to the console. > >> > >> Exception in thread "main" org.apache.flink.table.api.TableException: > >> Cannot generate a valid execution plan for the given query: > >> > >> > >> > >> > >> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], > >> fields=[time, sum_age]) > >> > >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) > >> > >>+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) > >> > >> :- FlinkLogicalCalc(select=[uid, time]) > >> > >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > >> default_database, user_behavior, source: [KafkaTableSource(uid, > phoneType, > >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) > >> > >> +- FlinkLogicalSnapshot(period=[$cor0.time]) > >> > >> +- FlinkLogicalCalc(select=[uid, age]) > >> > >> +- FlinkLogicalTableSourceScan(table=[[default_catalog, > >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, > >> age, created_time)]]], fields=[uid, sex, age, created_time]) > >> > >> > >> > >> > >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left > >> table's proctime field, doesn't support 'PROCTIME()' > >> > >> Please check the documentation for the set of currently supported SQL > >> features. > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > >> > >> at > >> > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > >> > >> at > >> > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > >> > >> at scala.collection.Iterator$class.foreach(Iterator.scala:891) > >> > >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > >> > >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > >> > >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > >> > >> at > >> > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > >> > >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > >> > >> at > >> > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > >> > >> at > >> > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > >> > >> at > >> > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > >> > >> at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > >> > >> at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > >> > >> at > >> > org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) > >> > >> at > >> > org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) > >> > >> Caused by: org.apache.flink.table.api.TableException: Temporal table > join > >> currently only supports 'FOR
Re: flink sql Temporal table join failed
需要使用Proctime才可以关联,参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html > 在 2020年6月12日,下午2:24,Zhou Zach 写道: > > flink 1.10.0: > 在create table中,加PROCTIME() AS proctime字段报错 > > > > > > > > > > > > > > > > > >> 在 2020-06-12 14:08:11,"Benchao Li" 写道: >> Hi, >> >> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 >> 可以参考下[1] >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html >> >> Zhou Zach 于2020年6月12日周五 下午1:33写道: >> >>> SLF4J: Class path contains multiple SLF4J bindings. >>> >>> SLF4J: Found binding in >>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> >>> SLF4J: Found binding in >>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> >>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>> explanation. >>> >>> SLF4J: Actual binding is of type >>> [org.apache.logging.slf4j.Log4jLoggerFactory] >>> >>> ERROR StatusLogger No log4j2 configuration file found. Using default >>> configuration: logging only errors to the console. >>> >>> Exception in thread "main" org.apache.flink.table.api.TableException: >>> Cannot generate a valid execution plan for the given query: >>> >>> >>> >>> >>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], >>> fields=[time, sum_age]) >>> >>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) >>> >>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) >>> >>> :- FlinkLogicalCalc(select=[uid, time]) >>> >>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, >>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, >>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) >>> >>> +- FlinkLogicalSnapshot(period=[$cor0.time]) >>> >>> +- FlinkLogicalCalc(select=[uid, age]) >>> >>>+- FlinkLogicalTableSourceScan(table=[[default_catalog, >>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, >>> age, created_time)]]], fields=[uid, sex, age, created_time]) >>> >>> >>> >>> >>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left >>> table's proctime field, doesn't support 'PROCTIME()' >>> >>> Please check the documentation for the set of currently supported SQL >>> features. >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >>> >>> at >>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>> >>> at >>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >>> >>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>> >>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>> >>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>> >>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>> >>> at >>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >>> >>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) >>> >>> at >>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >>> >>> at >>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) >>> >>> at >>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) >>> >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>> >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >>> >>> at >>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) >>> >>> at >>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >>> >>> Caused by: org.apache.flink.table.api.TableException: Temporal table join >>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
Re: flink sql Temporal table join failed
Hi, Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 可以参考下[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html Zhou Zach 于2020年6月12日周五 下午1:33写道: > SLF4J: Class path contains multiple SLF4J bindings. > > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > > ERROR StatusLogger No log4j2 configuration file found. Using default > configuration: logging only errors to the console. > > Exception in thread "main" org.apache.flink.table.api.TableException: > Cannot generate a valid execution plan for the given query: > > > > > FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], > fields=[time, sum_age]) > > +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) > >+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) > > :- FlinkLogicalCalc(select=[uid, time]) > > : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, > clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) > > +- FlinkLogicalSnapshot(period=[$cor0.time]) > > +- FlinkLogicalCalc(select=[uid, age]) > > +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, > age, created_time)]]], fields=[uid, sex, age, created_time]) > > > > > Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left > table's proctime field, doesn't support 'PROCTIME()' > > Please check the documentation for the set of currently supported SQL > features. > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > at > org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) > > at > org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) > > Caused by: org.apache.flink.table.api.TableException: Temporal table join > currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime > field, doesn't support 'PROCTIME()' > > at > org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) > > at > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) > > at > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) > > at >