Cleaning up with timers should solve this, though both approaches have their advantages and disadvantages.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get compacted later on.

TTL:
- Performance. It costs virtually nothing, compared to keeping an extra state entry for the timer plus writing a tombstone marker.
- Has "side effects", because it works in processing time. This is just something to keep in mind, e.g. when bootstrapping the state from historical data (large event-time / processing-time skew).
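For illustration, the timer-based variant could look roughly like the sketch below. This is untested: it reuses the Click / A types and the 1-day retention from your job, the class name is made up, and your existing join logic is elided.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ClickAdsJoinWithTimerCleanup
        extends KeyedProcessFunction<String, Tuple3<String, Click, A>, A> {

    private transient ValueState<Click> clickState;
    private transient ValueState<A> adsState;

    @Override
    public void open(Configuration parameters) {
        // No StateTtlConfig here; the timer below takes over the cleanup.
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", Click.class));
        adsState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("ads", A.class));
    }

    @Override
    public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
            Collector<A> collector) throws Exception {
        // ... your existing join logic goes here ...

        // Schedule the cleanup one day after the first write. This assumes
        // event-time timestamps are assigned; use registerProcessingTimeTimer
        // if you want to mimic the processing-time behaviour of TTL.
        ctx.timerService().registerEventTimeTimer(
                ctx.timestamp() + Time.days(1).toMilliseconds());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<A> out) {
        // clear() writes ordinary tombstones that later compactions drop.
        clickState.clear();
        adsState.clear();
    }
}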
With the 1.14 release we've bumped the RocksDB version, so it may be possible to use "periodic compaction" [1], but nobody has tried that so far.
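If you want to experiment with it, it should be configurable through a custom options factory along these lines (again untested: the factory name and the two-day interval are made up, and this assumes the bundled RocksDB version exposes setPeriodicCompactionSeconds):

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class PeriodicCompactionFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Rewrite any SST file older than two days, so that files in the
        // higher levels pass through the TTL compaction filter even when
        // the level size ratios never select them for compaction.
        return currentOptions.setPeriodicCompactionSeconds(TimeUnit.DAYS.toSeconds(2));
    }
}

You would register it on the state backend, e.g. via EmbeddedRocksDBStateBackend#setRocksDBOptions(new PeriodicCompactionFactory()).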
In the meantime I think there is no real workaround, because we don't expose a way to trigger a manual compaction.

I'm off on vacation until the 27th and I won't be responsive during that time. I'd like to pull Yun into the conversation, as he's super familiar with the RocksDB state backend.

[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:

> Hi David,
>
> Confirmed with the RocksDB log that Stephan's observation is the root cause:
> compaction doesn't clean up the high-level SST files fast enough. Do you
> think manual cleanup by registering a timer is the way to go, or can any
> RocksDB parameter be tuned to mitigate this issue?
>
> On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi David,
>>
>> If I read Stephan's comment correctly, TTL doesn't work well for cases
>> where we have too many levels, like fast-growing state, as compaction
>> doesn't clean up the high-level SST files in time. Is this correct? If yes,
>> should we register a timer with the TTL time and manually clean up the
>> state (state.clear()) when the timer fires?
>>
>> I will turn on RocksDB logging as well as compaction logging [1] to
>> verify this.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>
>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Tao,
>>>
>>> My intuition is that the compaction of the SST files is not triggering.
>>> By default, it's only triggered by the size ratios of the different
>>> levels [1], and the TTL mechanism has no effect on it.
>>>
>>> Some reasoning from Stephan:
>>>
>>>> It's very likely to have large files in higher levels that haven't been
>>>> compacted in a long time and thus just stay around.
>>>>
>>>> This might be especially possible if you insert a lot in the beginning
>>>> (build up many levels) and then have a moderate rate of modifications, so
>>>> the changes and expiration keep happening purely in the merges /
>>>> compactions of the first levels. Then the later levels may stay unchanged
>>>> for quite some time.
>>>
>>> You should be able to see compaction details by setting RocksDB logging
>>> to INFO [2]. Can you please check these and validate whether this really
>>> is the case?
>>>
>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>> [2]
>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>>>
>>>> Hi team,
>>>>
>>>> We have a job that uses value state with RocksDB and a TTL set to 1 day.
>>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>>> value state doesn't exist, and we never update it again after the state
>>>> is no longer empty. The key of the value state is a timestamp. My
>>>> understanding of such TTL settings is that the total size of all SST
>>>> files should remain flat after 1 day (disregarding the impact of space
>>>> amplification), as the daily data volume is more or less the same.
>>>> However, the RocksDB native metrics show that the SST files have kept
>>>> growing since I started the job. I checked the SST files in local
>>>> storage and I can see SST files that are 1 month old (from when I
>>>> started the job). What is the possible reason for the SST files not
>>>> being cleaned up?
>>>>
>>>> The Flink version is 1.12.1.
>>>> The state backend is RocksDB with incremental checkpoints.
>>>> All RocksDB configuration is left at the defaults.
>>>> Per-job mode on YARN, checkpointing to S3.
>>>>
>>>> Here is the code that sets the value state:
>>>>
>>>> @Override
>>>> public void open(Configuration parameters) {
>>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>>             .newBuilder(Time.days(1))
>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>>             .build();
>>>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>>>             new ValueStateDescriptor<>("click", Click.class);
>>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>>
>>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>>             .newBuilder(Time.days(1))
>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>>             .build();
>>>>     ValueStateDescriptor<A> adsStateDescriptor =
>>>>             new ValueStateDescriptor<>("ads", slimAdsClass);
>>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>>> }
>>>>
>>>> @Override
>>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>>>>         Collector<A> collector) throws Exception {
>>>>     if (tuple.f1 != null) {
>>>>         Click click = tuple.f1;
>>>>
>>>>         if (clickState.value() != null) {
>>>>             return;
>>>>         }
>>>>
>>>>         clickState.update(click);
>>>>
>>>>         A adsFromState = adsState.value();
>>>>         if (adsFromState != null) {
>>>>             collector.collect(adsFromState);
>>>>         }
>>>>     } else {
>>>>         A ads = tuple.f2;
>>>>
>>>>         if (adsState.value() != null) {
>>>>             return;
>>>>         }
>>>>
>>>>         adsState.update(ads);
>>>>
>>>>         Click clickFromState = clickState.value();
>>>>         if (clickFromState != null) {
>>>>             collector.collect(ads);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Here is a snippet of the SST files in local storage:
>>>>
>>>> [root@xxxx db]# ll | head -n10
>>>> total 76040068
>>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>>
>>>> --
>>>> Regards,
>>>> Tao
>>>
>>
>> --
>> Regards,
>> Tao
>
> --
> Regards,
> Tao