Did you set the time characteristic to EventTime?
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
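
To check whether any event-time window should have fired at all, it can help to work through the arithmetic by hand. Below is a minimal plain-Java sketch (no Flink dependency) that takes the six sample `ts` values from the quoted message, assigns each to an epoch-aligned 5-second tumbling window, and tracks a watermark of "largest rowtime seen minus 1 second". The class and method names are made up for illustration, and two things are assumptions: that `rowtime` is `ts` truncated to whole seconds, and that there is a single in-order stream. It does not model late records or how Flink combines watermarks across parallel inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of Flink.
class EventTimeWindowSketch {
    static final long WINDOW_MS = 5_000L; // TUMBLE(rowtime, INTERVAL '5' SECOND)
    static final long DELAY_MS = 1_000L;  // WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND

    // Assumption: rowtime is ts truncated to whole seconds, matching the printed rows.
    static long rowtime(long tsMillis) {
        return (tsMillis / 1000L) * 1000L;
    }

    // Start of the epoch-aligned window [start, start + WINDOW_MS) containing rowtime.
    static long windowStart(long rowtimeMillis) {
        return rowtimeMillis - (rowtimeMillis % WINDOW_MS);
    }

    // For each record in arrival order, returns the starts of the windows
    // that the watermark closes once that record has been seen.
    static List<List<Long>> firedWindows(long[] tsValues) {
        TreeSet<Long> open = new TreeSet<>();
        List<List<Long>> result = new ArrayList<>();
        long maxRowtime = Long.MIN_VALUE;
        for (long ts : tsValues) {
            long rt = rowtime(ts);
            open.add(windowStart(rt));
            maxRowtime = Math.max(maxRowtime, rt);
            long watermark = maxRowtime - DELAY_MS;
            List<Long> fired = new ArrayList<>();
            // A window fires when the watermark reaches its last millisecond (end - 1).
            while (!open.isEmpty() && open.first() + WINDOW_MS - 1 <= watermark) {
                fired.add(open.pollFirst());
            }
            result.add(fired);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {
            1595949526874L, 1595949528874L, 1595949530880L,
            1595949532883L, 1595949534888L, 1595949536892L
        };
        List<List<Long>> fired = firedWindows(ts);
        for (int i = 0; i < ts.length; i++) {
            System.out.println("ts=" + ts[i]
                    + " window=" + windowStart(rowtime(ts[i]))
                    + " fired=" + fired.get(i));
        }
    }
}
```

Under these assumptions the first window closes only when the fourth record arrives and the second when the sixth arrives, so a window produces output only after a later record pushes the watermark past its end.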


> On July 29, 2020, at 9:56 AM, 111 <taochangl...@163.com> wrote:
>
> Hello, I have a question and would appreciate your help:
> The JSON is very simple:
> {"num":100,"ts":1595949526874,"vin":"DDDD"}
> {"num":200,"ts":1595949528874,"vin":"AAAA"}
> {"num":200,"ts":1595949530880,"vin":"CCCC"}
> {"num":300,"ts":1595949532883,"vin":"CCCC"}
> {"num":100,"ts":1595949534888,"vin":"AAAA"}
> {"num":300,"ts":1595949536892,"vin":"DDDD"}
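> I just want to use event time for windowing. In my tests, processing time works, but event time does not work no matter what I try. Any advice would be appreciated.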
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class FlinkKafka {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         final EnvironmentSettings settings =
>                 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         final StreamTableEnvironment tableEnv =
>                 StreamTableEnvironment.create(env, settings);
>
>         // Kafka source table with a processing-time and an event-time attribute
>         String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
>                 " ts BIGINT,\n" +
>                 " num INT,\n" +
>                 " vin STRING,\n" +
>                 " pts AS PROCTIME(),\n" +  // processing time
>                 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +  // event time from ts (epoch millis)
>                 " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n" +
>                 ") WITH (\n" +
>                 " 'connector' = 'kafka',\n" +
>                 " 'topic' = 'kkb',\n" +
>                 " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
>                 " 'properties.group.id' = 'mm',\n" +
>                 " 'format' = 'json',\n" +
>                 " 'scan.startup.mode' = 'latest-offset'\n" +
>                 ")";
>         tableEnv.executeSql(kafkaSourceTable);
>
>         // Tumbling 5-second window, here on the processing-time attribute pts
>         String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by " +
>                 "ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
>         final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);
>
>         windowAllTable.printSchema();
>         tableEnv.toAppendStream(windowAllTable, Row.class).print();
>
>         System.out.println("------------------------------------------------------");
>         env.execute("job");
>     }
> }
> 
> 
> ---------------------------
> As shown above, the query is String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)".
> With TUMBLE(pts, INTERVAL '5' SECOND), that is, with processing time, there is no problem at all: every few seconds all the records are printed.
> Printed output:
> root
> |-- ts: BIGINT
> |-- num: INT
> |-- vin: STRING
> |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
> |-- rowtime: TIMESTAMP(3) *ROWTIME*
> 
> 
> ------------------------------------------------------
> 11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29
> 7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27
> 7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31
> 12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33
> 11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37
> 2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35
> 11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39
> 1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43
> 
> 
> But if I use TUMBLE(rowtime, INTERVAL '5' SECOND), that is, event-time windowing, nothing is printed and the job just keeps waiting.
> The version is Flink 1.11.0.
> 
> 
> Any advice would be appreciated. Thanks!
> 
> 
