My SELECT computes several sums, and each sum corresponds to a different type. I want to write the result into MySQL, where the MySQL table has the columns (id, type, value), so that each sum becomes its own (id, type, value) row. How can I achieve this?
The SQL is:
CREATE TABLE kafka_table (
  vin STRING,
  speed DOUBLE,
  brake DOUBLE,
  hard_to DOUBLE,
  distance DOUBLE,
  times TIMESTAMP(3),
  WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
select
  window_start,
  window_end,
  vin,
  array[
    row('brakes', sum(if(brake > 3.0451, 1, 0))),
    row('hard_tos', sum(if(hard_to > 3.0451, 1, 0)))
  ]
from TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
group by window_start, window_end, vin;
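For reference, the MySQL target I have in mind is a plain (id, type, value) table. A rough sketch of the corresponding Flink JDBC sink is below; this is only an illustration, and the table name, URL and credentials are placeholders, not part of the failing job:

CREATE TABLE mysql_sink (
  id STRING,       -- key for the row, e.g. vin plus window bounds
  `type` STRING,   -- which counter this row represents, e.g. 'brake' or 'hard_to'
  `value` INT      -- the corresponding sum for the window
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',  -- placeholder
  'table-name' = 'metric_counts',              -- placeholder
  'username' = '...',
  'password' = '...'
);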
The exception thrown is:
Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
  LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
    LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3, 3.0451:DECIMAL(5, 4)), 1, 0)])
      LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 600000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
        LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
          LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)])
            LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])
at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'
Process finished with exit code 1