Cleaning up with timers should solve this. Both approaches have some
advantages and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get
compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for
the timer + writing a tombstone marker.
- Has "side effects", because it works in processing time. This is just
something to keep in mind, e.g. when bootstrapping the state from historical
data (large event time / processing time skew).
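
To make the processing-time side effect concrete, here is a minimal,
self-contained Java sketch. It is a toy model, not Flink code: the class
TtlSkewSketch, its Entry type, and the replay numbers (30 days of history
replayed at one record per minute of wall-clock time) are assumptions made
up for illustration. It only counts how many entries would still be live
under each expiry rule right after a bootstrap.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink API): compares processing-time TTL with event-time timers. */
class TtlSkewSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final long TTL_MS = DAY_MS; // 1 day, as in the job discussed in this thread

    /** One state entry: when the event happened vs. when the job wrote it. */
    static final class Entry {
        final long eventTime;
        final long processingTime;

        Entry(long eventTime, long processingTime) {
            this.eventTime = eventTime;
            this.processingTime = processingTime;
        }
    }

    /** Replays one event per day of history, spaced wallClockPerEventMs apart in wall-clock time. */
    static List<Entry> replay(int days, long wallClockPerEventMs) {
        List<Entry> entries = new ArrayList<>();
        for (int d = 0; d < days; d++) {
            entries.add(new Entry(d * DAY_MS, d * wallClockPerEventMs));
        }
        return entries;
    }

    /** Processing-time TTL: an entry is live until TTL_MS of wall-clock time has passed since the write. */
    static long liveWithProcessingTimeTtl(List<Entry> entries, long nowProcessingTime) {
        return entries.stream()
                .filter(e -> nowProcessingTime - e.processingTime < TTL_MS)
                .count();
    }

    /** Event-time timer: an entry is cleared once the watermark reaches eventTime + TTL_MS. */
    static long liveWithEventTimeTimers(List<Entry> entries, long watermark) {
        return entries.stream()
                .filter(e -> e.eventTime + TTL_MS > watermark)
                .count();
    }

    public static void main(String[] args) {
        // 30 days of history bootstrapped in roughly half an hour of wall-clock time.
        List<Entry> entries = replay(30, 60_000L);
        long now = 29 * 60_000L;      // processing time of the last write
        long watermark = 29 * DAY_MS; // event time of the last record

        System.out.println("live with processing-time TTL: " + liveWithProcessingTimeTtl(entries, now));
        System.out.println("live with event-time timers:   " + liveWithEventTimeTimers(entries, watermark));
    }
}
```

In this model all 30 entries are still live under processing-time TTL right
after the replay, while the event-time rule leaves only the most recent one.
A real job would see the same effect in proportion to its event time /
processing time skew.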

With the 1.14 release, we've bumped the RocksDB version, so it may be possible
to use "periodic compaction" [1], but nobody has tried that so far. In
the meantime I think there is no real workaround, because we don't expose a
way to trigger manual compaction.

I'm off on vacation until the 27th and I won't be responsive during that time.
I'd like to pull Yun into the conversation as he's super familiar with the
RocksDB state backend.

[1]
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:

> Hi David,
>
> The RocksDB log confirms Stephan's observation as the root cause:
> compaction doesn't clean up the high-level SST files fast enough. Do you
> think manual cleanup by registering a timer is the way to go, or can any
> RocksDB parameter be tuned to mitigate this issue?
>
> On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi David,
>>
>> If I read Stephan's comment correctly, TTL doesn't work well for cases
>> where we have too many levels, like fast-growing state, as compaction
>> doesn't clean up high-level SST files in time. Is this correct? If yes,
>> should we register a timer with the TTL time and manually clean up the state
>> (state.clear()) when the timer fires?
>>
>> I will turn on RocksDB logging as well as compaction logging [1] to
>> verify this.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>
>>
>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Tao,
>>>
>>> my intuition is that the compaction of SST files is not triggering. By
>>> default, it's only triggered by the size ratios of different levels [1] and
>>> the TTL mechanism has no effect on it.
>>>
>>> Some reasoning from Stephan:
>>>
>>>> It's very likely to have large files in higher levels that haven't been
>>>> compacted in a long time and thus just stay around.
>>>>
>>>> This might be especially possible if you insert a lot in the beginning
>>>> (build up many levels) and then have a moderate rate of modifications, so
>>>> the changes and expiration keep happening purely in the merges /
>>>> compactions of the first levels. Then the later levels may stay unchanged
>>>> for quite some time.
>>>>
>>>
>>> You should be able to see compaction details by setting RocksDB logging
>>> to INFO [2]. Can you please check these and validate whether this really is
>>> the case?
>>>
>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>> [2]
>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>>>
>>>> Hi team
>>>>
>>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>>> value state doesn't exist and we never update it again after the state is
>>>> not empty. The key of the value state is a timestamp. My understanding of
>>>> such TTL settings is that the size of all SST files remains flat (let's
>>>> disregard the impact space amplification brings) after 1 day as the daily
>>>> data volume is more or less the same. However the RocksDB native metrics
>>>> show that the SST files have continued to grow since I started the job. I
>>>> checked the SST files in local storage and I can see SST files from 1 month
>>>> ago (when I started the job). What is the possible reason for the SST files
>>>> not being cleaned up?
>>>>
>>>> The Flink version is 1.12.1
>>>> State backend is RocksDB with incremental checkpoint
>>>> All default configuration for RocksDB
>>>> Per job mode in Yarn and checkpoint to S3
>>>>
>>>>
>>>> Here is the code to set value state
>>>>
>>>> public void open(Configuration parameters) {
>>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>>             .newBuilder(Time.days(1))
>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>>             .build();
>>>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>>>             new ValueStateDescriptor<>("click", Click.class);
>>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>>
>>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>>             .newBuilder(Time.days(1))
>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>>             .build();
>>>>     ValueStateDescriptor<A> adsStateDescriptor =
>>>>             new ValueStateDescriptor<>("ads", slimAdsClass);
>>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>>> }
>>>>
>>>> @Override
>>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>>>>         Collector<A> collector) throws Exception {
>>>>     if (tuple.f1 != null) {
>>>>         Click click = tuple.f1;
>>>>
>>>>         if (clickState.value() != null) {
>>>>             return;
>>>>         }
>>>>
>>>>         clickState.update(click);
>>>>
>>>>         A adsFromState = adsState.value();
>>>>         if (adsFromState != null) {
>>>>             collector.collect(adsFromState);
>>>>         }
>>>>     } else {
>>>>         A ads = tuple.f2;
>>>>
>>>>         if (adsState.value() != null) {
>>>>             return;
>>>>         }
>>>>
>>>>         adsState.update(ads);
>>>>
>>>>         Click clickFromState = clickState.value();
>>>>         if (clickFromState != null) {
>>>>             collector.collect(ads);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> Here is the snippet of sst files in local storage
>>>>
>>>> [root@xxxx db]# ll | head -n10
>>>> total 76040068
>>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>> --
>>>> Regards,
>>>> Tao
>>>>
>>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>
