????select??????sum????????????sum????????????type????????????????????????????MySQL????????MySQL???????? (id,type,value)???????????????????????????????? SQL??????

-- Source table backed by the Kafka topic 'user_behavior'; records are decoded as JSON.
CREATE TABLE kafka_table (
    vin      STRING,
    speed    DOUBLE,
    brake    DOUBLE,
    hard_to  DOUBLE,
    distance DOUBLE,
    times    TIMESTAMP(3),
    -- `times` is the event-time attribute; its watermark trails it by 5 seconds.
    WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id'          = 'testGroup',
    -- Consumption starts at the latest offsets, per the option value.
    'scan.startup.mode'            = 'latest-offset',
    'format'                       = 'json'
);
-- Per vin and per 10-minute tumbling window on `times`, count the rows whose
-- `brake` / `hard_to` value exceeds 3.0451 and return both counts as an array
-- of (label, count) rows.
-- NOTE(review): the meaning/unit of the 3.0451 threshold is not shown here — confirm.
-- NOTE(review): in the planner error reported below, the validated and converted
-- types differ only in the nullability of the ROW's second field
-- (INTEGER EXPR$1 vs INTEGER NOT NULL EXPR$1).
-- NOTE(review): the plan in that error shows the labels 'brake' / 'hard_to'
-- (VARCHAR(7)), not 'brakes' / 'hard_tos' as written here; presumably the query
-- was edited after the failing run — confirm.
-- Possible workaround (untested): compute the two SUMs in a subquery and build
-- ARRAY[ROW(...)] from plain column references in an outer SELECT.
SELECT
    window_start,
    window_end,
    vin,
    ARRAY[
        ROW('brakes',   SUM(IF(brake > 3.0451, 1, 0))),
        ROW('hard_tos', SUM(IF(hard_to > 3.0451, 1, 0)))
    ]
FROM TABLE(
    TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES)
)
GROUP BY
    window_start,
    window_end,
    vin;

??????????
Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3, 3.0451:DECIMAL(5, 4)), 1, 0)])
LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 600000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)])
LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])
    at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
    at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
    at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'
Process finished with exit code 1