Hi Benchao, 感谢你的回复 我使用的RocksDB,内存 overhead 太多了, 什么地方超用了那么多内存,好像不受控制了。 另外,我也测试过hop窗口,也是超用内存比较。没有使用增量checkpoint。
最后,我这边的interval join 是 inner join ,使用的 b 表的rowtime作为时间,没有观察到延迟数据的情况。 [image: image.png] Benchao Li <[email protected]> 于2020年9月23日周三 上午10:50写道: > Hi Tianwang, > > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 > > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 > `Math.max(leftRelativeSize, rightRelativeSize) + > allowedLateness`,根据你的SQL,这个值应该是6h > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / > 2;`,在你的SQL来讲,就是3h,也就是说 > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] > > 希望这个可以解答你的疑惑~ > > [1] https://issues.apache.org/jira/browse/FLINK-18996 > > Tianwang Li <[email protected]> 于2020年9月22日周二 下午8:26写道: > > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > > > > 【join】 > > > > > SELECT `b`.`rowtime`, > > > `a`.`c_id`, > > > `b`.`openid` > > > FROM `test_table_a` AS `a` > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > > AND `a`.`openid` = `b`.`openid` > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' > SECOND > > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > > > > 【window】 > > > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) > AS > > > `rowtime`, > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__windoow_start__`, > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__window_end__`, > > > `c_id`, > > > COUNT(`openid`) AS `cnt` > > > FROM `test_table_in_6h` > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > > `c_id` > > > > > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > > > 【配置】 > > > > > cat conf/flink-conf.yaml > > > jobmanager.rpc.address: flink-jobmanager > > > taskmanager.numberOfTaskSlots: 1 > > > blob.server.port: 6124 > > > jobmanager.rpc.port: 6123 > > > taskmanager.rpc.port: 6122 > > > jobmanager.heap.size: 6144m > > > taskmanager.memory.process.size: 4g > > > taskmanager.memory.jvm-overhead.min: 1024m > > > taskmanager.memory.jvm-overhead.max: 2048m > > > taskmanager.debug.memory.log-interval: 10000 > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > > -XX:NumberOfGCLogFiles=10 > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > > > > > > -- > > ************************************** > > tivanli > > ************************************** > > > > > -- > > Best, > Benchao Li > -- ************************************** tivanli **************************************
