junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r701240936
##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -37,6 +40,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.OptionalLong;
Review comment:
Sorry, I meant adding a description of how tombstones are handled to the
comment of LogCleaner.
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) ||
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
+
if(cleanableLogs.isEmpty) {
- None
+ val logsWithTombstonesExpired = dirtyLogs.filter {
+ case ltc =>
+ // in this case, we are probably in a low throughput situation
+ // therefore, we should take advantage of this fact and remove tombstones if we can
+ // under the condition that the log's latest delete horizon is less than the current time
+ // tracked
+ ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()
Review comment:
Related to this, I am a bit concerned about the extra cleaning this could
cause. If we have just one tombstone record, this can force a round of
cleaning on idle partitions. An alternative is to track the number of total
surviving records and tombstone records during cleaning, and only trigger a
cleaning if #tombstone/#totalRecords > minCleanableRatio. @hachikuji What do
you think?
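
To make the alternative concrete, here is a minimal sketch of the ratio check,
written in plain Java so it stands alone. Everything in it is hypothetical:
the class `TombstoneRatioSketch`, the method `shouldCleanForTombstones`, and
the two counters are illustrative names, not existing Kafka APIs. It assumes
the cleaner records both counts at the end of each cleaning pass.

```java
// Hypothetical sketch: none of these names exist in Kafka.
public final class TombstoneRatioSketch {

    private TombstoneRatioSketch() {}

    /**
     * Returns true when the fraction of tombstones among the records that
     * survived the previous cleaning strictly exceeds minCleanableRatio.
     * Both counts are assumed to be tracked by the cleaner (an assumption
     * of this sketch, not something the current code does).
     */
    public static boolean shouldCleanForTombstones(long tombstoneCount,
                                                   long totalSurvivingRecords,
                                                   double minCleanableRatio) {
        // Nothing to gain from cleaning an empty log or one without tombstones.
        if (totalSurvivingRecords <= 0 || tombstoneCount <= 0) {
            return false;
        }
        double tombstoneRatio = (double) tombstoneCount / totalSurvivingRecords;
        return tombstoneRatio > minCleanableRatio;
    }

    public static void main(String[] args) {
        // A single tombstone in a large idle partition does not trigger cleaning.
        System.out.println(shouldCleanForTombstones(1L, 1_000_000L, 0.5));
        // A log dominated by tombstones does.
        System.out.println(shouldCleanForTombstones(600_000L, 1_000_000L, 0.5));
    }
}
```

With a check like this, one stray tombstone on an idle partition would no
longer force a cleaning round, while a log made up mostly of tombstones would
still be picked up.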