Hi Tao,

My intuition is that compaction of the SST files is simply not being
triggered. By default, leveled compaction is triggered only by the size
ratios of the different levels [1], and the TTL mechanism has no effect on
when compactions run.
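
For context, those size-ratio triggers map to a handful of
ColumnFamilyOptions knobs that you can inspect or tune through a custom
options factory. A minimal sketch (the class name is mine, and the values
merely restate the usual RocksDB defaults, not a recommendation):

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class CompactionTriggerOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // no DB-level changes needed here
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                // an L0 -> L1 compaction starts once this many L0 files exist
                .setLevel0FileNumCompactionTrigger(4)
                // target size of L1; level n+1 targets level n * multiplier
                .setMaxBytesForLevelBase(256 * 1024 * 1024)
                .setMaxBytesForLevelMultiplier(10);
    }
}

Such a factory is registered via RocksDBStateBackend#setRocksDBOptions.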

Some reasoning from Stephan:

> It's very likely that you have large files in the higher levels that
> haven't been compacted in a long time and thus just stay around.
>
> This might be especially likely if you insert a lot in the beginning
> (building up many levels) and then have a moderate rate of modifications,
> so the changes and expirations keep happening purely in the merges /
> compactions of the first levels. Then the later levels may stay unchanged
> for quite some time.
>
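
If that theory holds, one knob that can force stale files back through
compaction is RocksDB's periodic compaction. Treat this as an assumption to
verify: newer RocksDB versions expose setPeriodicCompactionSeconds, but I am
not sure the FRocksDB build bundled with Flink 1.12 already has it. A rough
sketch:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        // ASSUMPTION: requires a RocksDB build with periodic compaction.
        // Any SST file older than this is re-scheduled for compaction, which
        // also gives the TTL compaction filter a chance to drop expired
        // entries sitting in the higher levels.
        return currentOptions.setPeriodicCompactionSeconds(2L * 24 * 60 * 60);
    }
}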

You should be able to see the compaction details by setting RocksDB's
logging level to INFO [2]. Can you please check those logs and validate
whether this really is the case?
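
Since Flink silences RocksDB's native LOG by default, getting it back also
goes through a custom options factory, roughly along the lines of [2]. A
sketch (the class name and the size/retention values are illustrative):

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class VerboseRocksDbLogFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                // re-enable the LOG file so compaction activity is recorded
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                // keep the log bounded (values illustrative)
                .setMaxLogFileSize(64 * 1024 * 1024)
                .setKeepLogFileNum(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}

The LOG file then appears in the RocksDB working directory and records every
flush and compaction, including which files each compaction read and wrote.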

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2]
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

Best,
D.

On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:

> Hi team
>
> We have a job that uses value state with RocksDB and a TTL set to 1 day.
> The TTL update type is OnCreateAndWrite. We set the value state when it
> doesn't exist and never update it again once it is set. The key of the
> value state is a timestamp. My understanding of these TTL settings is
> that the total size of all SST files should stay flat after 1 day
> (disregarding the impact of space amplification), as the daily data
> volume is more or less the same. However, the RocksDB native metrics show
> that the SST files have kept growing since I started the job. I checked
> the SST files in local storage and can see files that are 1 month old
> (from when I started the job). What could be the reason the SST files are
> not cleaned up?
>
> The Flink version is 1.12.1.
> The state backend is RocksDB with incremental checkpoints.
> All RocksDB configuration is left at the defaults.
> The job runs in per-job mode on YARN and checkpoints to S3.
>
>
> Here is the code that sets the value state:
>
> public void open(Configuration parameters) {
>     StateTtlConfig ttlConfigClick = StateTtlConfig
>             .newBuilder(Time.days(1))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .cleanupInRocksdbCompactFilter(300_000)
>             .build();
>     ValueStateDescriptor<Click> clickStateDescriptor =
>             new ValueStateDescriptor<>("click", Click.class);
>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>
>     StateTtlConfig ttlConfigAds = StateTtlConfig
>             .newBuilder(Time.days(1))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .cleanupInRocksdbCompactFilter(30_000_000)
>             .build();
>     ValueStateDescriptor<A> adsStateDescriptor =
>             new ValueStateDescriptor<>("ads", slimAdsClass);
>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>     adsState = getRuntimeContext().getState(adsStateDescriptor);
> }
>
> @Override
> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>                            Collector<A> collector) throws Exception {
>     if (tuple.f1 != null) {
>         Click click = tuple.f1;
>
>         if (clickState.value() != null) {
>             return;
>         }
>
>         clickState.update(click);
>
>         A adsFromState = adsState.value();
>         if (adsFromState != null) {
>             collector.collect(adsFromState);
>         }
>     } else {
>         A ads = tuple.f2;
>
>         if (adsState.value() != null) {
>             return;
>         }
>
>         adsState.update(ads);
>
>         Click clickFromState = clickState.value();
>         if (clickFromState != null) {
>             collector.collect(ads);
>         }
>     }
> }
>
>
> Here is a snippet of the SST files in local storage:
>
> [root@xxxx db]# ll | head -n10
> total 76040068
> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
> --
> Regards,
> Tao
>
