Implementing an interval join + hop window with Blink SQL exceeds the configured memory by a wide margin.


【join】

> SELECT `b`.`rowtime`,
> `a`.`c_id`,
> `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> AND `a`.`openid` = `b`.`openid`
> AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> AND `a`.`rowtime` + INTERVAL '6' HOUR
>
>
【window】

> SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS `rowtime`,
> HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS `__windoow_start__`,
> HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS `__window_end__`,
> `c_id`,
> COUNT(`openid`) AS `cnt`
> FROM `test_table_in_6h`
> GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> `c_id`
>
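For context on the window query: with a 5-minute slide and a 6-hour size, each row falls into 6 h / 5 min = 72 overlapping windows. The sketch below only illustrates that arithmetic; the function name `hop_window_starts` is hypothetical, and it is not a claim about how the Blink planner actually lays out window state.

```python
# Illustrative arithmetic only: which hop windows contain a given event time.
# hop_window_starts is a hypothetical helper, not a Flink API.

def hop_window_starts(ts_ms, slide_ms, size_ms):
    """Return the start times (ms) of all hop windows that contain ts_ms."""
    # Latest window start at or before the timestamp, aligned to the slide.
    start = ts_ms - (ts_ms % slide_ms)
    starts = []
    # Walk backwards while the window [start, start + size) still covers ts_ms.
    while start > ts_ms - size_ms:
        starts.append(start)
        start -= slide_ms
    return starts


SLIDE_MS = 5 * 60 * 1000        # INTERVAL '5' MINUTE
SIZE_MS = 6 * 60 * 60 * 1000    # INTERVAL '6' HOUR

windows = hop_window_starts(1_600_000_000_123, SLIDE_MS, SIZE_MS)
print(len(windows))  # number of windows a single row belongs to
```

If every window keeps its own accumulator per `c_id`, the aggregate state grows with this factor, which may be one place to look when comparing against the Stream API job.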


I configured Flink with 4 GB of memory, but actual usage reached 6.8 GB.
When I implement the same logic with the Stream API, it uses only about 4.5 GB.

【Configuration】

> cat conf/flink-conf.yaml
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 6144m
> taskmanager.memory.process.size: 4g
> taskmanager.memory.jvm-overhead.min: 1024m
> taskmanager.memory.jvm-overhead.max: 2048m
> taskmanager.debug.memory.log-interval: 10000
> env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>
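As a rough reading of the memory settings above: Flink sizes JVM overhead as a fraction of the total process memory, clamped to the configured min/max. The sketch below assumes the default `taskmanager.memory.jvm-overhead.fraction` of 0.1, which is not set in the posted config; the helper name `jvm_overhead_mb` is hypothetical.

```python
# Rough arithmetic for the posted TaskManager settings.
# Assumption: jvm-overhead.fraction is left at its default of 0.1.

def jvm_overhead_mb(process_mb, min_mb, max_mb, fraction=0.1):
    """Clamp fraction * process size into the [min, max] overhead range."""
    return min(max(process_mb * fraction, min_mb), max_mb)


PROCESS_MB = 4096  # taskmanager.memory.process.size: 4g

overhead_mb = jvm_overhead_mb(PROCESS_MB, min_mb=1024, max_mb=2048)
# What is left for heap, managed memory, network buffers and metaspace.
remaining_mb = PROCESS_MB - overhead_mb

print(overhead_mb, remaining_mb)
```

Under that assumption the 1024m minimum wins over the 10% fraction, so about a quarter of the 4g budget is reserved as overhead before heap, managed, and network memory are derived.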



-- 
**************************************
 tivanli
**************************************
