如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发: 1. 保证所有 partition 都有数据。 2. 且每个 partition 数据的 event time 都在前进 3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s = 11s
以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。 Best, Jark On Sat, 14 Nov 2020 at 15:11, 李世钰 <[email protected]> wrote: > flink版本 flink1.11 > > > flink sql连接kafka > create table kafka_table ( > log_id string, > event_time bigint, > process_time as PROCTIME(), > ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)), > watermark for ts as ts - interval '1' second > ) with ( > 'connector' = 'kafka', > 'topic' = 'kafka_table', > 'properties.bootstrap.servers' = '10.2.12.3:9092', > 'properties.group.id' = 'tmp-log-consumer003', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > > > > > > 使用窗口聚合的代码 > val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL > '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' > SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group > by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' > SECOND),kafka_table.src_ip") > > > 相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发, > 系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的 > 求问是什么原因不能触发窗口或者我的用法有什么问题吗
