[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liangxiaokun updated FLINK-23721:
---------------------------------
    Attachment: image-2022-06-02-17-59-38-542.png

> Flink SQL state TTL has no effect when using non-incremental 
> RocksDBStateBackend
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-23721
>                 URL: https://issues.apache.org/jira/browse/FLINK-23721
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends, Table SQL / Runtime
>    Affects Versions: 1.13.0
>            Reporter: Q Kang
>            Priority: Major
>         Attachments: image-2022-06-02-17-59-38-542.png
>
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) 
> AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoint enabled, the size 
> of deduplication state seems OK.
> FlinkCompactionFilter is also working, regarding to logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value 
> type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Last access timestamp: 
> 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value 
> type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Last access timestamp: 
> 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value 
> type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Last access timestamp: 
> 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                     
>        [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoint, the state TTL seems not 
> effective at all: FlinkCompactionFilter logs are not printed, and the size of 
> deduplication state grows steadily up to several GBs (Kafka traffic is 
> somewhat heavy, at about 1K records per sec).
> In contrast, FsStateBackend always works well.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to