????select??????sum????????????sum????????????type????????????????????????????MySQL????????MySQL????????
 (id,type,value)????????????????????????????????
SQL??????
CREATE TABLE kafka_table (
                      
       vin STRING,
                      
       speed DOUBLE,
                      
       brake DOUBLE,
                      
       hard_to DOUBLE,
                      
       distance DOUBLE,
                      
       times TIMESTAMP(3),
                      
       WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);




select window_start, window_end,vin,array[row('brakes',sum(if(brake > 
3.0451,1,0))),row('hard_tos',sum(if(hard_to > 3.0451,1,0)))]
from TABLE(
    TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' 
MINUTES)) group by window_start, window_end,vin;


??????????
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL 
window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, 
RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) 
NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL 
window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, 
RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT 
NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], 
EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER 
SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, 
CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET 
"UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
  LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
    LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], 
$f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3, 
3.0451:DECIMAL(5, 4)), 1, 0)])
      LogicalTableFunctionScan(invocation=[TUMBLE($5, 
DESCRIPTOR($5), 600000:INTERVAL MINUTE)], 
rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE 
hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
        LogicalProject(vin=[$0], speed=[$1], brake=[$2], 
hard_to=[$3], distance=[$4], times=[$5])
          LogicalWatermarkAssigner(rowtime=[times], 
watermark=[-($5, 5000:INTERVAL SECOND)])
            
LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])


        at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
        at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
        at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'


Process finished with exit code 1

回复