Hi Chris,

The interval join cleans up its own state once buffered records can no longer 
be joined within the interval, so you don't need to set a state TTL. (In fact, 
the state used by the interval join is internal and not exposed, so you cannot 
set a TTL on it; TTL can only be configured for user-defined state.)
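
To make the cleanup idea concrete, here is a rough single-key model in plain 
Java. It is only a sketch and not Flink's operator code: the class 
IntervalJoinModel, its methods, and the explicit onWatermark call are all 
hypothetical names, and it assumes event-time semantics where a watermark w 
means no record with timestamp <= w will arrive anymore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified single-key model of interval-join buffering (hypothetical, not Flink code).
// A left record at time t joins right records with timestamps in [t + lower, t + upper].
class IntervalJoinModel {
    private final long lowerBound;
    private final long upperBound;
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    final List<String> joined = new ArrayList<>();

    IntervalJoinModel(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void onLeft(long ts, String value) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // Join with every buffered right record inside the interval.
        rightBuffer.subMap(ts + lowerBound, true, ts + upperBound, true)
                .values()
                .forEach(rs -> rs.forEach(r -> joined.add(value + "+" + r)));
    }

    void onRight(long ts, String value) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // Left records that can see this right record lie in [ts - upper, ts - lower].
        leftBuffer.subMap(ts - upperBound, true, ts - lowerBound, true)
                .values()
                .forEach(ls -> ls.forEach(l -> joined.add(l + "+" + value)));
    }

    void onWatermark(long watermark) {
        // A left record at t is useless once t + upper <= watermark.
        leftBuffer.headMap(watermark - upperBound, true).clear();
        // A right record at t is useless once t - lower <= watermark.
        rightBuffer.headMap(watermark + lowerBound, true).clear();
    }

    int bufferedCount() {
        int n = 0;
        for (List<String> vs : leftBuffer.values()) n += vs.size();
        for (List<String> vs : rightBuffer.values()) n += vs.size();
        return n;
    }

    public static void main(String[] args) {
        IntervalJoinModel m = new IntervalJoinModel(-2, 1);
        m.onLeft(10, "L10");
        m.onRight(9, "R9");
        m.onRight(11, "R11");
        System.out.println(m.joined + " buffered=" + m.bufferedCount());
        m.onWatermark(11);
        System.out.println("after watermark 11: buffered=" + m.bufferedCount());
        m.onWatermark(13);
        System.out.println("after watermark 13: buffered=" + m.bufferedCount());
    }
}
```

The point of the sketch is that buffered records are released only as the 
watermark advances, so it is worth checking that watermarks are progressing 
on all of the joined inputs.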

A growing checkpoint size does not necessarily mean that your actual state is 
growing as well. When an element is removed, RocksDB writes a delete marker 
(tombstone), and the obsolete data is only physically removed after compaction. 
You can check whether the state is really growing by switching to 
non-incremental checkpoints and looking at how large the state actually is.
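
As a toy illustration of why the physical size can lag behind the logical 
size, the following plain-Java sketch models an append-only store with 
tombstones. It is not RocksDB's implementation or API; TombstoneStoreModel 
and its methods are hypothetical names, and real compaction is far more 
involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy append-only store (hypothetical, not RocksDB): a delete appends a tombstone
// instead of removing data, and only compact() reclaims the space.
class TombstoneStoreModel {
    // Each entry is {key, value}; a null value marks a tombstone.
    private List<String[]> log = new ArrayList<>();

    void put(String key, String value) {
        log.add(new String[] {key, value});
    }

    void delete(String key) {
        log.add(new String[] {key, null});
    }

    // Latest entry per key wins, as later writes shadow earlier ones.
    private Map<String, String> latest() {
        Map<String, String> view = new LinkedHashMap<>();
        for (String[] e : log) {
            view.put(e[0], e[1]);
        }
        return view;
    }

    // Logical size: keys whose latest entry is not a tombstone.
    int liveKeys() {
        int n = 0;
        for (String v : latest().values()) {
            if (v != null) n++;
        }
        return n;
    }

    // Physical size: everything still stored, including shadowed data and tombstones.
    int physicalEntries() {
        return log.size();
    }

    // Rewrite the log keeping only the latest live value per key.
    void compact() {
        List<String[]> compacted = new ArrayList<>();
        latest().forEach((k, v) -> {
            if (v != null) compacted.add(new String[] {k, v});
        });
        log = compacted;
    }
}
```

In this model, deleting two of three keys leaves one live key but five 
physical entries until compact() runs, which is the same kind of gap between 
logical and physical size described above.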

Moreover, the OOM should not be related to RocksDB, as it uses off-heap native 
memory. You may need to dig into what is occupying the JVM memory during 
checkpoints.

Best
Yun Tang
________________________________
From: McBride, Chris <m...@amazon.com>
Sent: Saturday, June 5, 2021 3:17
To: user@flink.apache.org <user@flink.apache.org>
Subject: Question about State TTL and Interval Join


We currently have a Flink 1.8 application deployed on Kinesis Data Analytics 
using the RocksDB state backend. Our application joins across 3 different 
Kinesis streams using an interval join. We noticed that our checkpoint sizes 
continue to increase over time. We eventually have OOM failures writing 
checkpoints and need to restart the application without restoring from a 
savepoint.



Does this kind of application require a state TTL on the join operator? I 
assumed that, since it is an interval join, events that fall outside of the 
lower time bound would automatically be expired from the state. Is that a 
correct assumption?



Thanks,

Chris

