Hi, 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下: 1. 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升?
瓜牛 <[email protected]> 于2020年5月26日周二 下午6:07写道: > hi,大家好! > > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加 > > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和 > role_id 的去重数 > > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state > 大小却一直是增加的,是 SQL 写得有问题吗? > > 麻烦大家帮我看一下 > > 谢谢! > > ---------------- > > CREATE TABLE source_kafka ( > dtime string, > wm as cast(dtime as TIMESTAMP(3)), > server string, > reason string, > role_id string, > WATERMARK FOR wm AS wm - INTERVAL '5' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'xxx', > 'connector.properties.bootstrap.servers' = 'xxx', > 'connector.properties.zookeeper.connect' = 'xxx', > 'connector.properties.group.id' = 'xxx', > 'format.type' = 'json', > ) > ----------------- > > CREATE TABLE sink_kafka ( > window_time string, > server string, > reason string, > role_id_distinct_cnt BIGINT, > log_cnt BIGINT > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'xxx', > 'connector.properties.bootstrap.servers' = 'xxx', > 'connector.properties.zookeeper.connect' = 'xxx', > 'format.type' = 'json' > ) > ----------------- > > INSERT INTO sink_kafka > SELECT > DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') > AS window_time, > server, > reason, > COUNT(DISTINCT role_id) AS role_id_distinct_cnt, > COUNT(1) AS log_cnt > FROM source_kafka > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason -- Best, Benchao Li
