[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1259127175

## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ##
@@ -433,6 +461,13 @@ public static class RocksdbCompactFilterCleanupStrategy implements CleanupStrategies.CleanupStrategy {
         private static final long serialVersionUID = 3109278796506988980L;
+        /**
+         * Default value lets RocksDB control this feature as needed. For now, RocksDB will change
+         * this value to 30 days (i.e 30 * 24 * 60 * 60) so that every file goes through the
+         * compaction process at least once every 30 days if not compacted sooner.
+         */
+        static final Time DEFAULT_PERIODIC_COMPACTION_TIME = Time.seconds(0xfffeL);

Review Comment:
   As discussed offline, we will set the default value to 30 days in Flink to avoid unpredictable changes in RocksDB.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1241839333

## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ##
@@ -433,6 +461,13 @@ public static class RocksdbCompactFilterCleanupStrategy implements CleanupStrategies.CleanupStrategy {
         private static final long serialVersionUID = 3109278796506988980L;
+        /**
+         * Default value lets RocksDB control this feature as needed. For now, RocksDB will change
+         * this value to 30 days (i.e 30 * 24 * 60 * 60) so that every file goes through the
+         * compaction process at least once every 30 days if not compacted sooner.
+         */
+        static final Time DEFAULT_PERIODIC_COMPACTION_TIME = Time.seconds(0xfffeL);

Review Comment:
   I have used the `long` type in the inner class to avoid the confusing semantics and have also updated the doc. Please take another look, thanks! I will squash some commits once you approve.
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1241087044

## docs/content/docs/dev/datastream/fault-tolerance/state.md: ##
@@ -633,6 +633,15 @@ Updating the timestamp more often can improve cleanup speed
 but it decreases compaction performance because it uses JNI call from native code.
 The default background cleanup for RocksDB backend queries the current timestamp each time 1000 entries have been processed.
+Periodic compaction could speed up expired state entries cleanup, especially for state entries rarely accessed.

Review Comment:
   I have added the description to the `Note`.
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1138293917

## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ##
@@ -437,18 +465,43 @@ public boolean runCleanupForEveryRecord() {
         DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY = new RocksdbCompactFilterCleanupStrategy(1000L);
+        /**
+         * Default value lets RocksDB control this feature as needed. For now, RocksDB will change
+         * this value to 30 days (i.e 30 * 24 * 60 * 60) so that every file goes through the
+         * compaction process at least once every 30 days if not compacted sooner.
+         */
+        static final long DEFAULT_PERIODIC_COMPACTION_SECONDS = 0xfffeL;

Review Comment:
   Oh, I have fixed it. It should run normally now.
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1128942248

## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ##
@@ -437,18 +465,43 @@ public boolean runCleanupForEveryRecord() {
         DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY = new RocksdbCompactFilterCleanupStrategy(1000L);
+        /**
+         * Default value lets RocksDB control this feature as needed. For now, RocksDB will change
+         * this value to 30 days (i.e 30 * 24 * 60 * 60) so that every file goes through the
+         * compaction process at least once every 30 days if not compacted sooner.
+         */
+        static final long DEFAULT_PERIODIC_COMPACTION_SECONDS = 0xfffeL;

Review Comment:
   As you commented in frocksdb:
   `Default: 0xfffe (allow RocksDB to auto-tune) For now, RocksDB will change this value to 30 days (i.e 30 * 24 * 60 * 60) so that every file goes through the compaction process at least once every 30 days if not compacted sooner.`
   IIUC, this is the default value of this feature in RocksDB? Or do we need to introduce a specific default value in Flink that means this option is not set? WDYT?
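
The question above turns on the difference between a sentinel value and a real duration. A minimal sketch of that distinction, assuming the semantics described in the quoted frocksdb comment (the sentinel `0xfffe` is resolved to 30 days, anything else is used as given). The class, constant, and method names are hypothetical; this is not RocksDB's or Flink's actual code.

```java
/** Hypothetical illustration of the sentinel semantics quoted in the review comment. */
final class PeriodicCompactionSentinelSketch {
    /** Sentinel exactly as written in the quoted diff: "allow RocksDB to auto-tune". */
    static final long AUTO_TUNE_SENTINEL = 0xfffeL;

    /** 30 days expressed in seconds, i.e. 30 * 24 * 60 * 60, per the quoted comment. */
    static final long THIRTY_DAYS_SECONDS = 30L * 24 * 60 * 60;

    /**
     * Assumed resolution rule: the sentinel maps to 30 days,
     * every other value (including 0) passes through unchanged.
     */
    static long effectiveSeconds(long configuredSeconds) {
        return configuredSeconds == AUTO_TUNE_SENTINEL
                ? THIRTY_DAYS_SECONDS
                : configuredSeconds;
    }

    public static void main(String[] args) {
        System.out.println(effectiveSeconds(AUTO_TUNE_SENTINEL)); // sentinel resolved to 30 days
        System.out.println(effectiveSeconds(3600L));              // explicit value kept as-is
        System.out.println(effectiveSeconds(0L));                 // 0 kept as-is
    }
}
```

Read literally as a number of seconds, `0xfffe` is 65534, roughly 18 hours, so a constant named `..._SECONDS` holding it does not mean what it appears to mean. An explicit Flink-side default would avoid that ambiguity.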
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1128940896

## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ##
@@ -298,12 +298,40 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
             return this;
         }
+        /**
+         * Cleanup expired state while Rocksdb compaction is running.
+         *
+         * RocksDB compaction filter will query current timestamp, used to check expiration, from
+         * Flink every time after processing {@code queryTimeAfterNumEntries} number of state
+         * entries. Updating the timestamp more often can improve cleanup speed but it decreases
+         * compaction performance because it uses JNI call from native code.
+         *
+         * Periodic compaction could speed up expired state entries cleanup, especially for state
+         * entries rarely accessed. Files older than this value will be picked up for compaction,
+         * and re-written to the same level as they were before. It makes sure a file goes through
+         * compaction filters periodically.
+         *
+         * @param queryTimeAfterNumEntries number of state entries to process by compaction filter
+         *     before updating current timestamp
+         * @param periodicCompactionSeconds periodic compaction per seconds which could speed up
+         *     expired state cleanup. 0 means turning off periodic compaction.
+         */
+        @Nonnull
+        public Builder cleanupInRocksdbCompactFilter(
+                long queryTimeAfterNumEntries, long periodicCompactionSeconds) {

Review Comment:
   Sorry for the delayed update; I was tied up with personal matters last month.
   Thanks for the suggestion. I replaced `long` with `Time`, which is also the type used for the TTL.
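
The two-argument overload described in the Javadoc above can be sketched in isolation. This is a minimal stand-in, not Flink's `StateTtlConfig`: the class name is hypothetical, `java.time.Duration` is swapped in for Flink's `Time` so the example runs with the JDK alone, and the 30-day default is an assumption taken from the later discussion in this thread. The only semantics taken from the quoted Javadoc are that the timestamp is queried every `queryTimeAfterNumEntries` entries and that a periodic compaction value of 0 turns the feature off.

```java
import java.time.Duration;

/** Hypothetical stand-in for the cleanup strategy under review; not Flink's class. */
final class CompactFilterCleanupSketch {
    /** Assumed default (30 days), used when the caller does not pass a period. */
    static final Duration ASSUMED_DEFAULT_PERIODIC_COMPACTION = Duration.ofDays(30);

    private final long queryTimeAfterNumEntries;
    private final Duration periodicCompactionTime;

    /** One-argument form: delegates to the two-argument form with the assumed default. */
    CompactFilterCleanupSketch(long queryTimeAfterNumEntries) {
        this(queryTimeAfterNumEntries, ASSUMED_DEFAULT_PERIODIC_COMPACTION);
    }

    CompactFilterCleanupSketch(long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
        if (queryTimeAfterNumEntries <= 0) {
            throw new IllegalArgumentException("queryTimeAfterNumEntries must be positive");
        }
        if (periodicCompactionTime == null || periodicCompactionTime.isNegative()) {
            throw new IllegalArgumentException("periodicCompactionTime must be non-negative");
        }
        this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
        this.periodicCompactionTime = periodicCompactionTime;
    }

    long queryTimeAfterNumEntries() {
        return queryTimeAfterNumEntries;
    }

    /** A zero duration means periodic compaction is turned off, as in the quoted Javadoc. */
    boolean isPeriodicCompactionEnabled() {
        return !periodicCompactionTime.isZero();
    }

    /** The period converted to whole seconds, the unit the quoted Javadoc refers to. */
    long periodicCompactionSeconds() {
        return periodicCompactionTime.getSeconds();
    }
}
```

Using a duration type at the API boundary keeps the unit explicit for callers, while the conversion to seconds happens in one place.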