flink版本 flink1.11

flink sql连接kafka
create table kafka_table (
log_id  string,
event_time bigint,
process_time as PROCTIME(),
ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
watermark for ts as ts - interval '1' second
) with (
 'connector' = 'kafka',
 'topic' = 'kafka_table',
 'properties.bootstrap.servers' = '10.2.12.3:9092',
 'properties.group.id' = 'tmp-log-consumer003',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)





使用窗口聚合的代码
val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL '10' 
SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' SECOND, 
INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group by 
HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' 
SECOND),kafka_table.src_ip")


相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发,
系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的
求问是什么原因不能触发窗口或者我的用法有什么问题吗

回复