您好,请教您一个问题
flink1.11 sql ddl连接kafka,使用事件时间(event time)时无法触发窗口,改用process_time(系统处理时间)就可以正常触发
-- Source table backed by the Kafka topic 'kafka_table'; records are decoded as JSON
-- and consumption starts from the latest offset.
CREATE TABLE kafka_table (
    log_id       STRING,
    event_date   TIMESTAMP(3),
    -- Processing-time attribute (wall-clock time at which the row is processed).
    process_time AS PROCTIME(),
    -- Computed copy of event_date; the WATERMARK clause below makes it the
    -- event-time (rowtime) attribute of the table.
    ts           AS event_date,
    -- Watermark trails ts by 1 second.
    WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_table',
    'properties.bootstrap.servers' = '10.2.12.3:9092',
    'properties.group.id' = 'tmp-log-consumer003',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
执行的sql是
-- Counts dest_ip per src_ip in 10-second tumbling event-time windows.
--
-- The window is defined on `ts`, the column the table's WATERMARK clause is
-- declared on. `event_date` is a plain TIMESTAMP(3) column, not a time
-- attribute, so it cannot drive an event-time group window.
--
-- An event-time window emits its result only once the watermark
-- (ts - 1 second) reaches the window end. If every incoming row carries the
-- same ts, the watermark never advances past the window and nothing is emitted
-- until a row with a later ts arrives.
-- NOTE(review): source subtasks/partitions that receive no data may also hold
-- the watermark back -- confirm against the job's parallelism and partition count.
-- NOTE(review): src_ip and dest_ip are not declared in the CREATE TABLE shown in
-- this post -- presumably omitted for brevity; confirm they exist in the real table.
SELECT
    TUMBLE_START(ts, INTERVAL '10' SECOND),
    TUMBLE_END(ts, INTERVAL '10' SECOND),
    src_ip,
    COUNT(dest_ip)
FROM kafka_table
GROUP BY
    TUMBLE(ts, INTERVAL '10' SECOND),
    src_ip




执行 select log_id,process_time,ts from kafka_table 查询,得到的表结构如下:
root
 |-- log_id: STRING
 |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- ts: TIMESTAMP(3) *ROWTIME*


 输入数据为
log_id,process_time,ts
13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806

回复