https://issues.apache.org/jira/browse/FLINK-16068
https://issues.apache.org/jira/browse/FLINK-16345

Have the fixes for the two issues above both been applied to 1.10? If so, this may be yet another bug. If you can reproduce the problem on the latest code of the 1.10 branch or the master branch, please open an issue to track it.
111 <[email protected]> wrote on Thu, Apr 16, 2020 at 10:46 AM:
> Hi,
> Doesn't applying the PR from the JIRA on top of the 1.10 source fix it?
> It should have little to do with the HBase DDL, so here is just the Kafka one.
>
> Flink SQL> CREATE TABLE kafka_test1 (
>   id varchar,
>   a varchar,
>   b int,
>   ts as PROCTIME()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.11',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localnode2:2181',
>   'connector.properties.bootstrap.servers' = 'localnode2:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'latest-offset',
>   'format.type' = 'json'
> );
> [INFO] Table has been created.
>
> Flink SQL> select a.*, b.* from kafka_test1 a join hbase_test1
>   FOR SYSTEM_TIME AS OF a.ts as b on a.id = b.rowkey;
>
> Exception:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT
> NULL ts, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey,
> RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a,
> INTEGER b) f) NOT NULL
> converted type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIME
> ATTRIBUTE(PROCTIME) NOT NULL ts, VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER
> SET "UTF-16LE" a, INTEGER b) f) NOT NULL
> rel:
> LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5])
>   LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}])
>     LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()])
>       LogicalTableScan(table=[[tgou, collie, kafka_test1, source: [Kafka011TableSource(id, a, b)]]])
>     LogicalFilter(condition=[=($cor1.id, $0)])
>       LogicalSnapshot(period=[$cor1.ts])
>         LogicalTableScan(table=[[tgou, collie, hbase_test1, source: [HBaseTableSource[schema=[rowkey, f], projectFields=null]]]])
>
> Best,
> Xinghalo

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: [email protected]; [email protected]
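For context, the type mismatch in the trace is between a validated `TIMESTAMP(3)` and a converted `TIME ATTRIBUTE(PROCTIME)` for the `ts` column, and the failing statement is a processing-time temporal table join. A minimal sketch of that query pattern follows (table and column names are illustrative, connector options elided):

```sql
-- Probe side: declares a computed processing-time attribute via PROCTIME().
CREATE TABLE probe_side (
  id varchar,
  proc_time AS PROCTIME()
) WITH ( ... );

-- Temporal table join: for each probe row, look up the build side
-- (e.g. an HBase-backed table) as of the row's processing time.
SELECT p.id, l.*
FROM probe_side AS p
JOIN lookup_side FOR SYSTEM_TIME AS OF p.proc_time AS l
  ON p.id = l.rowkey;
```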
