When I implement an interval join followed by a hop window in Blink SQL, the job uses considerably more memory than I configured.

[join]
> SELECT `b`.`rowtime`,
>        `a`.`c_id`,
>        `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
>     AND `a`.`openid` = `b`.`openid`
>     AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
>     AND `a`.`rowtime` + INTERVAL '6' HOUR

[window]
> SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS `rowtime`,
>        HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS `__windoow_start__`,
>        HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS `__window_end__`,
>        `c_id`,
>        COUNT(`openid`) AS `cnt`
> FROM `test_table_in_6h`
> GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
>          `c_id`

I configured Flink with 4 GB of memory, but actual usage reached 6.8 GB. When I implement the same logic with the Stream API, memory usage is only about 4.5 GB.

[configuration]
> cat conf/flink-conf.yaml
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 6144m
> taskmanager.memory.process.size: 4g
> taskmanager.memory.jvm-overhead.min: 1024m
> taskmanager.memory.jvm-overhead.max: 2048m
> taskmanager.debug.memory.log-interval: 10000
> env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"

--
************************************** tivanli **************************************
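
P.S. As a rough aid for reasoning about the window query above (a sketch, not a measurement of this job): with a hop window of size 6 hours and slide 5 minutes, each record logically belongs to size / slide overlapping windows. The helper name `windows_per_record` is hypothetical, and this says nothing about how the planner actually lays out its state.

```python
from datetime import timedelta


def windows_per_record(size: timedelta, slide: timedelta) -> int:
    """Count the hopping windows a single record logically falls into.

    Assumes the window size is an exact multiple of the slide, as it is
    for the HOP(rowtime, 5 MINUTE, 6 HOUR) call in the query.
    """
    if slide <= timedelta(0):
        raise ValueError("slide must be positive")
    if size % slide != timedelta(0):
        raise ValueError("size must be an exact multiple of slide")
    # timedelta // timedelta yields a plain integer
    return size // slide


if __name__ == "__main__":
    n = windows_per_record(timedelta(hours=6), timedelta(minutes=5))
    print(n)  # 72
```

So every incoming row contributes to 72 windows per `c_id` key, which may be worth keeping in mind when comparing against the Stream API version.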
