Hi! This looks like a bug. I have filed an issue [1], and you can follow its progress there.
As described in the issue, it currently looks like the problem can be avoided if the constant strings all have the same length, or if they are all cast to VARCHAR. You can use either of these as a workaround for now.

[1] https://issues.apache.org/jira/browse/FLINK-24537

On Wed, Oct 13, 2021 at 5:29 PM, kcz wrote:

> The select produces several sum values, and each sum is the data for one type. In the end I insert them into a MySQL table whose schema is (id, type, value), so I thought of handling it as a column-to-row conversion.
> The SQL is as follows:
> CREATE TABLE kafka_table (
>   vin STRING,
>   speed DOUBLE,
>   brake DOUBLE,
>   hard_to DOUBLE,
>   distance DOUBLE,
>   times TIMESTAMP(3),
>   WATERMARK FOR times AS times - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
>
> select window_start, window_end,vin,array[row('brakes',sum(if(brake > 3.0451,1,0))),row('hard_tos',sum(if(hard_to > 3.0451,1,0)))]
> from TABLE(
>   TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES)) group by window_start, window_end,vin;
>
> The error is as follows:
> Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> converted type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> rel:
> LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
>   LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
>     LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3, 3.0451:DECIMAL(5, 4)), 1, 0)])
>       LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 600000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
>         LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
>           LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)])
>             LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])
>
>     at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>     at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
>     at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
> Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'
>
> Process finished with exit code 1
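
For the query quoted above, the cast workaround mentioned in this reply might look roughly like the sketch below. It is untested against the affected Flink version, so treat it as an illustration rather than a confirmed fix. One assumption on my side: the cast target is written as STRING, which Flink documents as a synonym for VARCHAR(2147483647), so that both constants end up with the same type regardless of their length.

```sql
-- Sketch only: cast both string constants to the same type so the
-- two ROW values in the ARRAY no longer differ in VARCHAR length.
-- Assumption: STRING is used as the cast target (not stated in the thread).
SELECT
  window_start,
  window_end,
  vin,
  ARRAY[
    ROW(CAST('brakes' AS STRING), SUM(IF(brake > 3.0451, 1, 0))),
    ROW(CAST('hard_tos' AS STRING), SUM(IF(hard_to > 3.0451, 1, 0)))
  ]
FROM TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, vin;
```

The other option from the issue, making the constants the same length, needs no cast at all, but it changes the type labels that end up in the MySQL table, so the cast is probably the less intrusive choice here.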
